kstore_kv.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. // package kstore_kv -- локальное быстрое key-value хранилище ядра.
  2. package kstore_kv
  3. import (
  4. "fmt"
  5. "os"
  6. "sync"
  7. "time"
  8. "github.com/dgraph-io/badger/v4"
  9. mKh "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers"
  10. mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  11. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  12. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  13. )
  14. const (
  15. storeStreamName = "kstore_kv" // Имя потока для ожидателя потоков
  16. )
  17. // kStoreKv -- локальное хранилище ядра.
  18. type kStoreKv struct {
  19. sync.RWMutex
  20. kCtx mKt.IKernelCtx
  21. lCtx mKt.ILocalCtx
  22. log mKt.ILogBuf
  23. wg mKt.IKernelWg
  24. storePath string
  25. db *badger.DB
  26. isWork mKt.ISafeBool
  27. }
  28. var (
  29. kernStore *kStoreKv // Глобальный объект
  30. block sync.Mutex
  31. )
  32. // GetKernelStore -- возвращает новое локальное хранилище ядра.
  33. func GetKernelStore() mKt.IResult[*kStoreKv] {
  34. block.Lock()
  35. defer block.Unlock()
  36. if kernStore != nil {
  37. kernStore.log.Debug("GetKernelStore()")
  38. return mL1.NewRes(kernStore)
  39. }
  40. resKernCtx := kctx.GetKernelCtx()
  41. if resKernCtx.IsErr() {
  42. err := fmt.Errorf("GetKernelStore(): in get kernel ctx, err=\n\t%w", resKernCtx.Err())
  43. return mL1.NewErr[*kStoreKv](err)
  44. }
  45. kCtx := resKernCtx.Val()
  46. log := mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kStoreKv"))
  47. resLocCtx := mL1.NewLocalCtx(kCtx.Ctx())
  48. if resLocCtx.IsErr() {
  49. err := fmt.Errorf("GetKernelStore(): in get local ctx, err=\n\t%w", resLocCtx.Err())
  50. return mL1.NewErr[*kStoreKv](err)
  51. }
  52. sf := &kStoreKv{
  53. kCtx: kCtx,
  54. lCtx: resLocCtx.Val(),
  55. wg: kCtx.Wg(),
  56. isWork: mL1.NewSafeBool(),
  57. log: log,
  58. }
  59. sf.open()
  60. kernStore = sf
  61. resSet := kCtx.Set("kernStoreKV", kernStore, "fast KV store on Badger")
  62. if resSet.IsErr() {
  63. err := fmt.Errorf("GetKernelStore(): in set kernel store KV to kernel ctx, err=\n\t%w", resSet.Err())
  64. return mL1.NewErr[*kStoreKv](err)
  65. }
  66. return mL1.NewRes(kernStore)
  67. }
  68. // Log -- возвращает локальный лог.
  69. func (sf *kStoreKv) Log() mKt.ILogBuf {
  70. return sf.log
  71. }
  72. // Set -- устанавливает значение по ключу.
  73. func (sf *kStoreKv) Set(key string, val []byte) mKt.IResult[bool] {
  74. sf.Lock()
  75. defer sf.Unlock()
  76. sf.log.Debug("Set(): key='%v'", key)
  77. fnSet := func(txn *badger.Txn) error {
  78. err := txn.Set([]byte(key), val)
  79. return err
  80. }
  81. err := sf.db.Update(fnSet)
  82. if err != nil {
  83. err := fmt.Errorf("Set(): key=%v, err=\n\t%w", key, err)
  84. sf.log.Err(err.Error())
  85. return mL1.NewErr[bool](err)
  86. }
  87. return mL1.NewRes(true)
  88. }
  89. // Get -- возвращает значение по ключу.
  90. func (sf *kStoreKv) Get(key string) mKt.IResult[[]byte] {
  91. sf.RLock()
  92. defer sf.RUnlock()
  93. sf.log.Debug("Get(): key='%v'", key)
  94. var binVal []byte
  95. fnGet := func(txn *badger.Txn) error {
  96. item, err := txn.Get([]byte(key))
  97. if err != nil {
  98. return err
  99. }
  100. binVal, err = item.ValueCopy(binVal)
  101. return err
  102. }
  103. err := sf.db.View(fnGet)
  104. if err != nil {
  105. err := fmt.Errorf("Get(): key=%v, err=\n\t%v", key, err)
  106. sf.log.Err(err.Error())
  107. return mL1.NewErr[[]byte](err)
  108. }
  109. return mL1.NewRes(binVal)
  110. }
  111. // ByPrefix -- фильтрует ключи по префиксу.
  112. func (sf *kStoreKv) ByPrefix(prefix string) mKt.IResult[[]string] {
  113. var (
  114. binKey []byte
  115. lstKey = []string{}
  116. )
  117. // fnValue := func(v []byte) error {
  118. // fmt.Printf("key=%s, value=%s\n", key, v)
  119. // return nil
  120. // }
  121. fnPrefix := func(txn *badger.Txn) error {
  122. it := txn.NewIterator(badger.DefaultIteratorOptions)
  123. defer it.Close()
  124. binPref := []byte(prefix)
  125. for it.Seek(binPref); it.ValidForPrefix(binPref); it.Next() {
  126. item := it.Item()
  127. binKey = item.Key()
  128. // err := item.Value(fnValue)
  129. // if err != nil {
  130. // return err
  131. // }
  132. lstKey = append(lstKey, string(binKey))
  133. }
  134. return nil
  135. }
  136. err := sf.db.View(fnPrefix)
  137. if err != nil {
  138. err := fmt.Errorf("ByPrefix(): in find, err=\n\t%w", err)
  139. return mL1.NewErr[[]string](err)
  140. }
  141. return mL1.NewRes(lstKey)
  142. }
  143. // Delete -- удалить ключ из хранилища.
  144. func (sf *kStoreKv) Delete(key string) mKt.IResult[bool] {
  145. sf.Lock()
  146. defer sf.Unlock()
  147. sf.log.Debug("Delete(): key='%v'", key)
  148. fnDelete := func(txn *badger.Txn) error {
  149. err := txn.Delete([]byte(key))
  150. return err
  151. }
  152. err := sf.db.Update(fnDelete)
  153. if err != nil {
  154. err := fmt.Errorf("Delete(): key=%v, err=\n\t%w", key, err)
  155. sf.log.Err(err.Error())
  156. return mL1.NewErr[bool](err)
  157. }
  158. return mL1.NewRes(true)
  159. }
  160. // Открывает базу при создании.
  161. func (sf *kStoreKv) open() {
  162. sf.Lock()
  163. defer sf.Unlock()
  164. sf.log.Debug("open()")
  165. strPath := os.Getenv("LOCAL_STORE_PATH")
  166. mKh.Hassert(strPath != "", "open(): env LOCAL_STORE_PATH not set")
  167. pwd, err := os.Getwd()
  168. mKh.Hassert(err == nil, "open(): in get PWD, err=\n\t%v", err)
  169. sf.storePath = pwd + strPath + "/db_local"
  170. err = os.MkdirAll(sf.storePath, 0750)
  171. mKh.Hassert(err == nil, "open(): in make dir %v, err=\n\t%v", sf.storePath, err)
  172. sf.db, err = badger.Open(badger.DefaultOptions(sf.storePath))
  173. mKh.Hassert(err == nil, "open(): in open DB %v, err=\n\t%v", sf.storePath, err)
  174. res := sf.wg.Add(storeStreamName)
  175. res.Hassert("open(): in add name stream to IKernelWg")
  176. sf.isWork.Set()
  177. go sf.close()
  178. go sf.clean()
  179. }
  180. // Выполняет периодическую сборку мусора в файле.
  181. func (sf *kStoreKv) clean() {
  182. chRun := make(chan int, 2)
  183. defer close(chRun)
  184. fnClean := func() {
  185. sf.Lock()
  186. defer sf.Unlock()
  187. _ = sf.db.RunValueLogGC(0.7)
  188. }
  189. chRun <- 1
  190. for {
  191. select {
  192. case <-sf.kCtx.Ctx().Done(): // надо прекратить работу
  193. return
  194. case <-chRun: // Пора поработать
  195. fnClean()
  196. }
  197. time.Sleep(time.Second * 1)
  198. }
  199. }
  200. // Ожидает последнего потока под отдельной блокировкой.
  201. func (sf *kStoreKv) wait(chWait chan int) {
  202. for {
  203. time.Sleep(time.Millisecond * 5)
  204. if sf.wg.Len() <= 1 {
  205. break
  206. }
  207. }
  208. close(chWait)
  209. }
  210. // Ожидает закрытия контекста ядра, закрывает хранилище.
  211. func (sf *kStoreKv) close() {
  212. sf.kCtx.Wait()
  213. sf.Lock()
  214. defer sf.Unlock()
  215. if !sf.isWork.Get() {
  216. return
  217. }
  218. chWait := make(chan int, 2)
  219. go sf.wait(chWait)
  220. <-chWait
  221. sf.isWork.Reset()
  222. err := sf.db.Close()
  223. mKh.Assert(err == nil, "kStoreKv.close(): in close DB, err=\n\t%v", err)
  224. sf.wg.Done(storeStreamName)
  225. sf.log.Debug("close(): done")
  226. }