kstore_kv.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. // package kstore_kv -- локальное быстрое key-value хранилище ядра.
  2. package kstore_kv
  3. import (
  4. "fmt"
  5. "os"
  6. "sync"
  7. "time"
  8. "github.com/dgraph-io/badger/v4"
  9. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers"
  10. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/local_ctx"
  12. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/log_buf"
  13. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/result"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_bool"
  15. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  16. )
  17. const (
  18. storeStreamName = "kstore_kv" // Имя потока для ожидателя потоков
  19. )
  20. // kStoreKv -- локальное хранилище ядра.
  21. type kStoreKv struct {
  22. sync.RWMutex
  23. kCtx IKernelCtx
  24. lCtx ILocalCtx
  25. log ILogBuf
  26. wg IKernelWg
  27. storePath string
  28. db *badger.DB
  29. isWork ISafeBool
  30. }
  31. var (
  32. kernStore *kStoreKv // Глобальный объект
  33. block sync.Mutex
  34. )
  35. // GetKernelStore -- возвращает новое локальное хранилище ядра.
  36. func GetKernelStore() IResult[*kStoreKv] {
  37. block.Lock()
  38. defer block.Unlock()
  39. if kernStore != nil {
  40. kernStore.log.Debug("GetKernelStore()")
  41. return NewRes(kernStore)
  42. }
  43. resKernCtx := kctx.GetKernelCtx()
  44. if resKernCtx.IsErr() {
  45. err := fmt.Errorf("GetKernelStore(): in get kernel ctx, err=\n\t%w", resKernCtx.Err())
  46. return NewErr[*kStoreKv](err)
  47. }
  48. kCtx := resKernCtx.Val()
  49. log := log_buf.NewLogBuf(log_buf.OptIsTerm(true), log_buf.OptPrefix("kStoreKv"))
  50. resLocCtx := local_ctx.NewLocalCtx(kCtx.Ctx())
  51. if resLocCtx.IsErr() {
  52. err := fmt.Errorf("GetKernelStore(): in get local ctx, err=\n\t%w", resLocCtx.Err())
  53. return NewErr[*kStoreKv](err)
  54. }
  55. sf := &kStoreKv{
  56. kCtx: kCtx,
  57. lCtx: resLocCtx.Val(),
  58. wg: kCtx.Wg(),
  59. isWork: safe_bool.NewSafeBool(),
  60. log: log,
  61. }
  62. sf.open()
  63. kernStore = sf
  64. resSet := kCtx.Set("kernStoreKV", kernStore, "fast KV store on Badger")
  65. if resSet.IsErr() {
  66. err := fmt.Errorf("GetKernelStore(): in set kernel store KV to kernel ctx, err=\n\t%w", resSet.Err())
  67. return NewErr[*kStoreKv](err)
  68. }
  69. return NewRes(kernStore)
  70. }
  71. // Log -- возвращает локальный лог.
  72. func (sf *kStoreKv) Log() ILogBuf {
  73. return sf.log
  74. }
  75. // Set -- устанавливает значение по ключу.
  76. func (sf *kStoreKv) Set(key string, val []byte) IResult[bool] {
  77. sf.Lock()
  78. defer sf.Unlock()
  79. sf.log.Debug("Set(): key='%v'", key)
  80. fnSet := func(txn *badger.Txn) error {
  81. err := txn.Set([]byte(key), val)
  82. return err
  83. }
  84. err := sf.db.Update(fnSet)
  85. if err != nil {
  86. err := fmt.Errorf("Set(): key=%v, err=\n\t%w", key, err)
  87. sf.log.Err(err.Error())
  88. return NewErr[bool](err)
  89. }
  90. return NewRes(true)
  91. }
  92. // Get -- возвращает значение по ключу.
  93. func (sf *kStoreKv) Get(key string) IResult[[]byte] {
  94. sf.RLock()
  95. defer sf.RUnlock()
  96. sf.log.Debug("Get(): key='%v'", key)
  97. var binVal []byte
  98. fnGet := func(txn *badger.Txn) error {
  99. item, err := txn.Get([]byte(key))
  100. if err != nil {
  101. return err
  102. }
  103. binVal, err = item.ValueCopy(binVal)
  104. return err
  105. }
  106. err := sf.db.View(fnGet)
  107. if err != nil {
  108. err := fmt.Errorf("Get(): key=%v, err=\n\t%v", key, err)
  109. sf.log.Err(err.Error())
  110. return NewErr[[]byte](err)
  111. }
  112. return NewRes(binVal)
  113. }
  114. // ByPrefix -- фильтрует ключи по префиксу.
  115. func (sf *kStoreKv) ByPrefix(prefix string) IResult[[]string] {
  116. var (
  117. binKey []byte
  118. lstKey = []string{}
  119. )
  120. // fnValue := func(v []byte) error {
  121. // fmt.Printf("key=%s, value=%s\n", key, v)
  122. // return nil
  123. // }
  124. fnPrefix := func(txn *badger.Txn) error {
  125. it := txn.NewIterator(badger.DefaultIteratorOptions)
  126. defer it.Close()
  127. binPref := []byte(prefix)
  128. for it.Seek(binPref); it.ValidForPrefix(binPref); it.Next() {
  129. item := it.Item()
  130. binKey = item.Key()
  131. // err := item.Value(fnValue)
  132. // if err != nil {
  133. // return err
  134. // }
  135. lstKey = append(lstKey, string(binKey))
  136. }
  137. return nil
  138. }
  139. err := sf.db.View(fnPrefix)
  140. if err != nil {
  141. err := fmt.Errorf("ByPrefix(): in find, err=\n\t%w", err)
  142. return NewErr[[]string](err)
  143. }
  144. return NewRes(lstKey)
  145. }
  146. // Delete -- удалить ключ из хранилища.
  147. func (sf *kStoreKv) Delete(key string) IResult[bool] {
  148. sf.Lock()
  149. defer sf.Unlock()
  150. sf.log.Debug("Delete(): key='%v'", key)
  151. fnDelete := func(txn *badger.Txn) error {
  152. err := txn.Delete([]byte(key))
  153. return err
  154. }
  155. err := sf.db.Update(fnDelete)
  156. if err != nil {
  157. err := fmt.Errorf("Delete(): key=%v, err=\n\t%w", key, err)
  158. sf.log.Err(err.Error())
  159. return NewErr[bool](err)
  160. }
  161. return NewRes(true)
  162. }
  163. // Открывает базу при создании.
  164. func (sf *kStoreKv) open() {
  165. sf.Lock()
  166. defer sf.Unlock()
  167. sf.log.Debug("open()")
  168. strPath := os.Getenv("LOCAL_STORE_PATH")
  169. Hassert(strPath != "", "open(): env LOCAL_STORE_PATH not set")
  170. pwd, err := os.Getwd()
  171. Hassert(err == nil, "open(): in get PWD, err=\n\t%v", err)
  172. sf.storePath = pwd + strPath + "/db_local"
  173. err = os.MkdirAll(sf.storePath, 0750)
  174. Hassert(err == nil, "open(): in make dir %v, err=\n\t%v", sf.storePath, err)
  175. sf.db, err = badger.Open(badger.DefaultOptions(sf.storePath))
  176. Hassert(err == nil, "open(): in open DB %v, err=\n\t%v", sf.storePath, err)
  177. res := sf.wg.Add(storeStreamName)
  178. res.Hassert("open(): in add name stream to IKernelWg")
  179. sf.isWork.Set()
  180. go sf.close()
  181. go sf.clean()
  182. }
  183. // Выполняет периодическую сборку мусора в файле.
  184. func (sf *kStoreKv) clean() {
  185. chRun := make(chan int, 2)
  186. defer close(chRun)
  187. fnClean := func() {
  188. sf.Lock()
  189. defer sf.Unlock()
  190. _ = sf.db.RunValueLogGC(0.7)
  191. }
  192. chRun <- 1
  193. for {
  194. select {
  195. case <-sf.kCtx.Ctx().Done(): // надо прекратить работу
  196. return
  197. case <-chRun: // Пора поработать
  198. fnClean()
  199. }
  200. time.Sleep(time.Second * 1)
  201. }
  202. }
  203. // Ожидает последнего потока под отдельной блокировкой.
  204. func (sf *kStoreKv) wait(chWait chan int) {
  205. for {
  206. time.Sleep(time.Millisecond * 5)
  207. if sf.wg.Len() <= 1 {
  208. break
  209. }
  210. }
  211. close(chWait)
  212. }
  213. // Ожидает закрытия контекста ядра, закрывает хранилище.
  214. func (sf *kStoreKv) close() {
  215. sf.kCtx.Wait()
  216. sf.Lock()
  217. defer sf.Unlock()
  218. if !sf.isWork.Get() {
  219. return
  220. }
  221. chWait := make(chan int, 2)
  222. go sf.wait(chWait)
  223. <-chWait
  224. sf.isWork.Reset()
  225. err := sf.db.Close()
  226. Assert(err == nil, "kStoreKv.close(): in close DB, err=\n\t%v", err)
  227. sf.wg.Done(storeStreamName)
  228. sf.log.Debug("close(): done")
  229. }