kstore_kv.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. // package kstore_kv -- локальное быстрое key-value хранилище ядра.
  2. package kstore_kv
  3. import (
  4. "fmt"
  5. "os"
  6. "sync"
  7. "time"
  8. "github.com/dgraph-io/badger/v4"
  9. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs/stream_name"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/etypes/ebool"
  12. mKh "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers"
  13. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  14. mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  15. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  16. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/log_buf"
  17. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_bool"
  18. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  19. )
  20. var (
  21. storeStreamName = stream_name.NewAStreamName("kstore_kv") // Имя потока для ожидателя потоков
  22. )
  23. // kStoreKv -- локальное хранилище ядра.
  24. type kStoreKv struct {
  25. sync.RWMutex
  26. kCtx mKs.IKernelCtx
  27. lCtx mKs.ILocalCtx
  28. log mKs.ILogBuf
  29. wg mKs.IKernelWg
  30. storePath string
  31. db *badger.DB
  32. isWork mKs.ISafeBool
  33. }
  34. var (
  35. kernStore *kStoreKv // Глобальный объект
  36. block sync.Mutex
  37. )
  38. // GetKernelStore -- возвращает новое локальное хранилище ядра.
  39. func GetKernelStore() mKs.IKernelStoreKv {
  40. block.Lock()
  41. defer block.Unlock()
  42. if kernStore != nil {
  43. kernStore.log.Debug("GetKernelStore()")
  44. return kernStore
  45. }
  46. kCtx := kctx.GetKernelCtx()
  47. param := &log_buf.LogBufParam{
  48. Prefix_: "kStoreKv",
  49. IsTerm_: safe_bool.NewSafeBool(true),
  50. }
  51. log := mL1.NewLogBuf(param)
  52. sf := &kStoreKv{
  53. kCtx: kCtx,
  54. lCtx: mL1.NewLocalCtx(kCtx.Ctx()),
  55. wg: kCtx.Wg(),
  56. isWork: mL1.NewSafeBool(),
  57. log: log,
  58. }
  59. sf.open()
  60. kernStore = sf
  61. kCtx.Set("kernStoreKV", kernStore, "fast KV store on Badger")
  62. return kernStore
  63. }
  64. // Log -- возвращает локальный лог.
  65. func (sf *kStoreKv) Log() mKs.ILogBuf {
  66. return sf.log
  67. }
  68. // Set -- устанавливает значение по ключу.
  69. func (sf *kStoreKv) Set(key string, val []byte) mL0.IResult[kspec.EBool] {
  70. sf.Lock()
  71. defer sf.Unlock()
  72. sf.log.Debug("Set(): key='%v'", key)
  73. fnSet := func(txn *badger.Txn) error {
  74. err := txn.Set([]byte(key), val)
  75. return err
  76. }
  77. err := sf.db.Update(fnSet)
  78. if err != nil {
  79. err := fmt.Errorf("Set(): key=%v, err=\n\t%w", key, err)
  80. sf.log.Err(err.Error())
  81. return mL0.NewErr[kspec.EBool](err)
  82. }
  83. return mL0.NewOk(ebool.NewEBool(true))
  84. }
  85. // Get -- возвращает значение по ключу.
  86. func (sf *kStoreKv) Get(key string) mL0.IResult[[]byte] {
  87. sf.RLock()
  88. defer sf.RUnlock()
  89. sf.log.Debug("Get(): key='%v'", key)
  90. var binVal []byte
  91. fnGet := func(txn *badger.Txn) error {
  92. item, err := txn.Get([]byte(key))
  93. if err != nil {
  94. return err
  95. }
  96. binVal, err = item.ValueCopy(binVal)
  97. return err
  98. }
  99. err := sf.db.View(fnGet)
  100. if err != nil {
  101. err := fmt.Errorf("Get(): key=%v, err=\n\t%v", key, err)
  102. sf.log.Err(err.Error())
  103. return mL0.NewErr[[]byte](err)
  104. }
  105. return mL0.NewOk(binVal)
  106. }
  107. // ByPrefix -- фильтрует ключи по префиксу.
  108. func (sf *kStoreKv) ByPrefix(prefix string) mL0.IResult[[]string] {
  109. var (
  110. binKey []byte
  111. lstKey = []string{}
  112. )
  113. // fnValue := func(v []byte) error {
  114. // fmt.Printf("key=%s, value=%s\n", key, v)
  115. // return nil
  116. // }
  117. fnPrefix := func(txn *badger.Txn) error {
  118. it := txn.NewIterator(badger.DefaultIteratorOptions)
  119. defer it.Close()
  120. binPref := []byte(prefix)
  121. for it.Seek(binPref); it.ValidForPrefix(binPref); it.Next() {
  122. item := it.Item()
  123. binKey = item.Key()
  124. // err := item.Value(fnValue)
  125. // if err != nil {
  126. // return err
  127. // }
  128. lstKey = append(lstKey, string(binKey))
  129. }
  130. return nil
  131. }
  132. err := sf.db.View(fnPrefix)
  133. if err != nil {
  134. err := fmt.Errorf("ByPrefix(): in find, err=\n\t%w", err)
  135. return mL0.NewErr[[]string](err)
  136. }
  137. return mL0.NewOk(lstKey)
  138. }
  139. // Delete -- удалить ключ из хранилища.
  140. func (sf *kStoreKv) Delete(key string) mL0.IResult[kspec.EBool] {
  141. sf.Lock()
  142. defer sf.Unlock()
  143. sf.log.Debug("Delete(): key='%v'", key)
  144. fnDelete := func(txn *badger.Txn) error {
  145. err := txn.Delete([]byte(key))
  146. return err
  147. }
  148. err := sf.db.Update(fnDelete)
  149. if err != nil {
  150. err := fmt.Errorf("Delete(): key=%v, err=\n\t%w", key, err)
  151. sf.log.Err(err.Error())
  152. return mL0.NewErr[kspec.EBool](err)
  153. }
  154. return mL0.NewOk(ebool.NewEBool(true))
  155. }
  156. // Открывает базу при создании.
  157. func (sf *kStoreKv) open() {
  158. sf.Lock()
  159. defer sf.Unlock()
  160. sf.log.Debug("open()")
  161. strPath := os.Getenv("LOCAL_STORE_PATH")
  162. mKh.Hassert(strPath != "", "open(): env LOCAL_STORE_PATH not set")
  163. pwd, err := os.Getwd()
  164. mKh.Hassert(err == nil, "open(): in get PWD, err=\n\t%v", err)
  165. sf.storePath = pwd + strPath + "/db_local"
  166. err = os.MkdirAll(sf.storePath, 0750)
  167. mKh.Hassert(err == nil, "open(): in make dir %v, err=\n\t%v", sf.storePath, err)
  168. sf.db, err = badger.Open(badger.DefaultOptions(sf.storePath))
  169. mKh.Hassert(err == nil, "open(): in open DB %v, err=\n\t%v", sf.storePath, err)
  170. sf.wg.Add(storeStreamName)
  171. sf.isWork.Set()
  172. go sf.close()
  173. go sf.clean()
  174. }
  175. // Выполняет периодическую сборку мусора в файле.
  176. func (sf *kStoreKv) clean() {
  177. chRun := make(chan int, 2)
  178. defer close(chRun)
  179. fnClean := func() {
  180. sf.Lock()
  181. defer sf.Unlock()
  182. _ = sf.db.RunValueLogGC(0.7)
  183. }
  184. chRun <- 1
  185. for {
  186. select {
  187. case <-sf.kCtx.Ctx().Done(): // надо прекратить работу
  188. return
  189. case <-chRun: // Пора поработать
  190. fnClean()
  191. }
  192. time.Sleep(time.Second * 1)
  193. }
  194. }
  195. // Ожидает последнего потока под отдельной блокировкой.
  196. func (sf *kStoreKv) wait(chWait chan int) {
  197. for {
  198. time.Sleep(time.Millisecond * 5)
  199. if sf.wg.Len() <= 1 {
  200. break
  201. }
  202. }
  203. close(chWait)
  204. }
  205. // Ожидает закрытия контекста ядра, закрывает хранилище.
  206. func (sf *kStoreKv) close() {
  207. sf.kCtx.Wait()
  208. sf.Lock()
  209. defer sf.Unlock()
  210. if !sf.isWork.Get() {
  211. return
  212. }
  213. chWait := make(chan int, 2)
  214. go sf.wait(chWait)
  215. <-chWait
  216. sf.isWork.Reset()
  217. err := sf.db.Close()
  218. mKh.Assert(err == nil, "kStoreKv.close(): in close DB, err=\n\t%v", err)
  219. sf.wg.Done(storeStreamName)
  220. sf.log.Debug("close(): done")
  221. }