kstore_kv.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. // package kstore_kv -- локальное быстрое key-value хранилище ядра.
  2. package kstore_kv
  3. import (
  4. "fmt"
  5. "os"
  6. "sync"
  7. "time"
  8. "github.com/dgraph-io/badger/v4"
  9. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/alias"
  11. mKh "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers"
  12. mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  13. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  15. )
  16. var (
  17. storeStreamName = alias.NewAStreamName("kstore_kv") // Имя потока для ожидателя потоков
  18. )
  19. // kStoreKv -- локальное хранилище ядра.
  20. type kStoreKv struct {
  21. sync.RWMutex
  22. kCtx mKt.IKernelCtx
  23. lCtx mKt.ILocalCtx
  24. log mKt.ILogBuf
  25. wg mKt.IKernelWg
  26. storePath string
  27. db *badger.DB
  28. isWork mKt.ISafeBool
  29. }
  30. var (
  31. kernStore *kStoreKv // Глобальный объект
  32. block sync.Mutex
  33. )
  34. // GetKernelStore -- возвращает новое локальное хранилище ядра.
  35. func GetKernelStore() *mL0.Result[*kStoreKv] {
  36. block.Lock()
  37. defer block.Unlock()
  38. if kernStore != nil {
  39. kernStore.log.Debug("GetKernelStore()")
  40. return mL0.NewRes(kernStore)
  41. }
  42. resKernCtx := kctx.GetKernelCtx()
  43. if resKernCtx.IsErr() {
  44. err := fmt.Errorf("GetKernelStore(): in get kernel ctx, err=\n\t%w", resKernCtx.Err())
  45. return mL0.NewErr[*kStoreKv](err)
  46. }
  47. kCtx := resKernCtx.Val()
  48. log := mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kStoreKv"))
  49. resLocCtx := mL1.NewLocalCtx(kCtx.Ctx())
  50. if resLocCtx.IsErr() {
  51. err := fmt.Errorf("GetKernelStore(): in get local ctx, err=\n\t%w", resLocCtx.Err())
  52. return mL0.NewErr[*kStoreKv](err)
  53. }
  54. sf := &kStoreKv{
  55. kCtx: kCtx,
  56. lCtx: resLocCtx.Val(),
  57. wg: kCtx.Wg(),
  58. isWork: mL1.NewSafeBool(),
  59. log: log,
  60. }
  61. sf.open()
  62. kernStore = sf
  63. resSet := kCtx.Set("kernStoreKV", kernStore, "fast KV store on Badger")
  64. if resSet.IsErr() {
  65. err := fmt.Errorf("GetKernelStore(): in set kernel store KV to kernel ctx, err=\n\t%w", resSet.Err())
  66. return mL0.NewErr[*kStoreKv](err)
  67. }
  68. return mL0.NewRes(kernStore)
  69. }
  70. // Log -- возвращает локальный лог.
  71. func (sf *kStoreKv) Log() mKt.ILogBuf {
  72. return sf.log
  73. }
  74. // Set -- устанавливает значение по ключу.
  75. func (sf *kStoreKv) Set(key string, val []byte) *mL0.Result[bool] {
  76. sf.Lock()
  77. defer sf.Unlock()
  78. sf.log.Debug("Set(): key='%v'", key)
  79. fnSet := func(txn *badger.Txn) error {
  80. err := txn.Set([]byte(key), val)
  81. return err
  82. }
  83. err := sf.db.Update(fnSet)
  84. if err != nil {
  85. err := fmt.Errorf("Set(): key=%v, err=\n\t%w", key, err)
  86. sf.log.Err(err.Error())
  87. return mL0.NewErr[bool](err)
  88. }
  89. return mL0.NewRes(true)
  90. }
  91. // Get -- возвращает значение по ключу.
  92. func (sf *kStoreKv) Get(key string) *mL0.Result[[]byte] {
  93. sf.RLock()
  94. defer sf.RUnlock()
  95. sf.log.Debug("Get(): key='%v'", key)
  96. var binVal []byte
  97. fnGet := func(txn *badger.Txn) error {
  98. item, err := txn.Get([]byte(key))
  99. if err != nil {
  100. return err
  101. }
  102. binVal, err = item.ValueCopy(binVal)
  103. return err
  104. }
  105. err := sf.db.View(fnGet)
  106. if err != nil {
  107. err := fmt.Errorf("Get(): key=%v, err=\n\t%v", key, err)
  108. sf.log.Err(err.Error())
  109. return mL0.NewErr[[]byte](err)
  110. }
  111. return mL0.NewRes(binVal)
  112. }
  113. // ByPrefix -- фильтрует ключи по префиксу.
  114. func (sf *kStoreKv) ByPrefix(prefix string) *mL0.Result[[]string] {
  115. var (
  116. binKey []byte
  117. lstKey = []string{}
  118. )
  119. // fnValue := func(v []byte) error {
  120. // fmt.Printf("key=%s, value=%s\n", key, v)
  121. // return nil
  122. // }
  123. fnPrefix := func(txn *badger.Txn) error {
  124. it := txn.NewIterator(badger.DefaultIteratorOptions)
  125. defer it.Close()
  126. binPref := []byte(prefix)
  127. for it.Seek(binPref); it.ValidForPrefix(binPref); it.Next() {
  128. item := it.Item()
  129. binKey = item.Key()
  130. // err := item.Value(fnValue)
  131. // if err != nil {
  132. // return err
  133. // }
  134. lstKey = append(lstKey, string(binKey))
  135. }
  136. return nil
  137. }
  138. err := sf.db.View(fnPrefix)
  139. if err != nil {
  140. err := fmt.Errorf("ByPrefix(): in find, err=\n\t%w", err)
  141. return mL0.NewErr[[]string](err)
  142. }
  143. return mL0.NewRes(lstKey)
  144. }
  145. // Delete -- удалить ключ из хранилища.
  146. func (sf *kStoreKv) Delete(key string) *mL0.Result[bool] {
  147. sf.Lock()
  148. defer sf.Unlock()
  149. sf.log.Debug("Delete(): key='%v'", key)
  150. fnDelete := func(txn *badger.Txn) error {
  151. err := txn.Delete([]byte(key))
  152. return err
  153. }
  154. err := sf.db.Update(fnDelete)
  155. if err != nil {
  156. err := fmt.Errorf("Delete(): key=%v, err=\n\t%w", key, err)
  157. sf.log.Err(err.Error())
  158. return mL0.NewErr[bool](err)
  159. }
  160. return mL0.NewRes(true)
  161. }
  162. // Открывает базу при создании.
  163. func (sf *kStoreKv) open() {
  164. sf.Lock()
  165. defer sf.Unlock()
  166. sf.log.Debug("open()")
  167. strPath := os.Getenv("LOCAL_STORE_PATH")
  168. mKh.Hassert(strPath != "", "open(): env LOCAL_STORE_PATH not set")
  169. pwd, err := os.Getwd()
  170. mKh.Hassert(err == nil, "open(): in get PWD, err=\n\t%v", err)
  171. sf.storePath = pwd + strPath + "/db_local"
  172. err = os.MkdirAll(sf.storePath, 0750)
  173. mKh.Hassert(err == nil, "open(): in make dir %v, err=\n\t%v", sf.storePath, err)
  174. sf.db, err = badger.Open(badger.DefaultOptions(sf.storePath))
  175. mKh.Hassert(err == nil, "open(): in open DB %v, err=\n\t%v", sf.storePath, err)
  176. res := sf.wg.Add(storeStreamName)
  177. res.Hassert("open(): in add name stream to IKernelWg")
  178. sf.isWork.Set()
  179. go sf.close()
  180. go sf.clean()
  181. }
  182. // Выполняет периодическую сборку мусора в файле.
  183. func (sf *kStoreKv) clean() {
  184. chRun := make(chan int, 2)
  185. defer close(chRun)
  186. fnClean := func() {
  187. sf.Lock()
  188. defer sf.Unlock()
  189. _ = sf.db.RunValueLogGC(0.7)
  190. }
  191. chRun <- 1
  192. for {
  193. select {
  194. case <-sf.kCtx.Ctx().Done(): // надо прекратить работу
  195. return
  196. case <-chRun: // Пора поработать
  197. fnClean()
  198. }
  199. time.Sleep(time.Second * 1)
  200. }
  201. }
  202. // Ожидает последнего потока под отдельной блокировкой.
  203. func (sf *kStoreKv) wait(chWait chan int) {
  204. for {
  205. time.Sleep(time.Millisecond * 5)
  206. if sf.wg.Len() <= 1 {
  207. break
  208. }
  209. }
  210. close(chWait)
  211. }
  212. // Ожидает закрытия контекста ядра, закрывает хранилище.
  213. func (sf *kStoreKv) close() {
  214. sf.kCtx.Wait()
  215. sf.Lock()
  216. defer sf.Unlock()
  217. if !sf.isWork.Get() {
  218. return
  219. }
  220. chWait := make(chan int, 2)
  221. go sf.wait(chWait)
  222. <-chWait
  223. sf.isWork.Reset()
  224. err := sf.db.Close()
  225. mKh.Assert(err == nil, "kStoreKv.close(): in close DB, err=\n\t%v", err)
  226. sf.wg.Done(storeStreamName)
  227. sf.log.Debug("close(): done")
  228. }