kstore_kv.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. // package kstore_kv -- локальное быстрое key-value хранилище ядра
  2. package kstore_kv
  3. import (
  4. "fmt"
  5. "os"
  6. "sync"
  7. "time"
  8. "github.com/dgraph-io/badger/v4"
  9. "gitp78su.ipnodns.ru/svi/kern/v4/kc/log_buf"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kctx"
  11. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  12. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/helpers"
  13. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/local_ctx"
  14. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/result"
  15. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_bool"
  16. )
  17. const (
  18. storeStreamName = "kstore_kv" // Имя потока для ожидателя потоков
  19. )
  20. // kStoreKv -- локальное хранилище ядра
  21. type kStoreKv struct {
  22. sync.RWMutex
  23. kCtx IKernelCtx
  24. ctx ILocalCtx
  25. log ILogBuf
  26. wg IKernelWg
  27. storePath string
  28. db *badger.DB
  29. isWork ISafeBool
  30. }
  31. var (
  32. kernStore *kStoreKv // Глобальный объект
  33. block sync.Mutex
  34. )
  35. // GetKernelStore -- возвращает новое локальное хранилище ядра
  36. func GetKernelStore() IKernelStoreKv {
  37. block.Lock()
  38. defer block.Unlock()
  39. if kernStore != nil {
  40. kernStore.log.Debug("GetKernelStore()")
  41. return kernStore
  42. }
  43. ctx := kctx.GetKernelCtx()
  44. log := log_buf.NewLogBuf(log_buf.OptIsTerm(true), log_buf.OptPrefix("kStoreKv"))
  45. sf := &kStoreKv{
  46. kCtx: ctx,
  47. ctx: local_ctx.NewLocalCtx(ctx.Ctx()),
  48. wg: ctx.Wg(),
  49. isWork: safe_bool.NewSafeBool(),
  50. log: log,
  51. }
  52. sf.open()
  53. kernStore = sf
  54. ctx.Set("kernStoreKV", kernStore, "fast KV store on Badger")
  55. return kernStore
  56. }
  57. // Log -- возвращает локальный лог
  58. func (sf *kStoreKv) Log() ILogBuf {
  59. return sf.log
  60. }
  61. // Set -- устанавливает значение по ключу
  62. func (sf *kStoreKv) Set(key string, val []byte) IResult[bool] {
  63. sf.Lock()
  64. defer sf.Unlock()
  65. sf.log.Debug("Set(): key='%v'", key)
  66. fnSet := func(txn *badger.Txn) error {
  67. err := txn.Set([]byte(key), val)
  68. return err
  69. }
  70. err := sf.db.Update(fnSet)
  71. if err != nil {
  72. err := fmt.Errorf("Set(): key=%v, err=\n\t%w", key, err)
  73. sf.log.Err(err.Error())
  74. return NewErr[bool](err)
  75. }
  76. return NewRes(true)
  77. }
  78. // Get -- возвращает значение по ключу
  79. func (sf *kStoreKv) Get(key string) IResult[[]byte] {
  80. sf.RLock()
  81. defer sf.RUnlock()
  82. sf.log.Debug("Get(): key='%v'", key)
  83. var binVal []byte
  84. fnGet := func(txn *badger.Txn) error {
  85. item, err := txn.Get([]byte(key))
  86. if err != nil {
  87. return err
  88. }
  89. binVal, err = item.ValueCopy(binVal)
  90. return err
  91. }
  92. err := sf.db.View(fnGet)
  93. if err != nil {
  94. err := fmt.Errorf("Get(): key=%v, err=\n\t%v", key, err)
  95. sf.log.Err(err.Error())
  96. return NewErr[[]byte](err)
  97. }
  98. return NewRes(binVal)
  99. }
  100. // ByPrefix -- фильтрует ключи по префиксу
  101. func (sf *kStoreKv) ByPrefix(prefix string) IResult[[]string] {
  102. var (
  103. binKey []byte
  104. lstKey = []string{}
  105. )
  106. // fnValue := func(v []byte) error {
  107. // fmt.Printf("key=%s, value=%s\n", key, v)
  108. // return nil
  109. // }
  110. fnPrefix := func(txn *badger.Txn) error {
  111. it := txn.NewIterator(badger.DefaultIteratorOptions)
  112. defer it.Close()
  113. binPref := []byte(prefix)
  114. for it.Seek(binPref); it.ValidForPrefix(binPref); it.Next() {
  115. item := it.Item()
  116. binKey = item.Key()
  117. // err := item.Value(fnValue)
  118. // if err != nil {
  119. // return err
  120. // }
  121. lstKey = append(lstKey, string(binKey))
  122. }
  123. return nil
  124. }
  125. err := sf.db.View(fnPrefix)
  126. if err != nil {
  127. err := fmt.Errorf("ByPrefix(): in find, err=\n\t%w", err)
  128. return NewErr[[]string](err)
  129. }
  130. return NewRes(lstKey)
  131. }
  132. // Delete -- удалить ключ из хранилища
  133. func (sf *kStoreKv) Delete(key string) IResult[bool] {
  134. sf.Lock()
  135. defer sf.Unlock()
  136. sf.log.Debug("Delete(): key='%v'", key)
  137. fnDelete := func(txn *badger.Txn) error {
  138. err := txn.Delete([]byte(key))
  139. return err
  140. }
  141. err := sf.db.Update(fnDelete)
  142. if err != nil {
  143. err := fmt.Errorf("Delete(): key=%v, err=\n\t%w", key, err)
  144. sf.log.Err(err.Error())
  145. return NewErr[bool](err)
  146. }
  147. return NewRes(true)
  148. }
  149. // Открывает базу при создании
  150. func (sf *kStoreKv) open() {
  151. sf.Lock()
  152. defer sf.Unlock()
  153. sf.log.Debug("open()")
  154. strPath := os.Getenv("LOCAL_STORE_PATH")
  155. Hassert(strPath != "", "open(): env LOCAL_STORE_PATH not set")
  156. pwd, err := os.Getwd()
  157. Hassert(err == nil, "open(): in get PWD, err=\n\t%v", err)
  158. sf.storePath = pwd + strPath + "/db_local"
  159. err = os.MkdirAll(sf.storePath, 0750)
  160. Hassert(err == nil, "open(): in make dir %v, err=\n\t%v", sf.storePath, err)
  161. sf.db, err = badger.Open(badger.DefaultOptions(sf.storePath))
  162. Hassert(err == nil, "open(): in open DB %v, err=\n\t%v", sf.storePath, err)
  163. res := sf.wg.Add(storeStreamName)
  164. res.Hassert("open(): in add name stream to IKernelWg")
  165. sf.isWork.Set()
  166. go sf.close()
  167. go sf.clean()
  168. }
  169. // Выполняет периодическую сборку мусора в файле
  170. func (sf *kStoreKv) clean() {
  171. chRun := make(chan int, 2)
  172. defer close(chRun)
  173. fnClean := func() {
  174. sf.Lock()
  175. defer sf.Unlock()
  176. _ = sf.db.RunValueLogGC(0.7)
  177. }
  178. chRun <- 1
  179. for {
  180. select {
  181. case <-sf.kCtx.Ctx().Done(): // надо прекратить работу
  182. return
  183. case <-chRun: // Пора поработать
  184. fnClean()
  185. }
  186. time.Sleep(time.Second * 1)
  187. }
  188. }
  189. // Ожидает последнего потока под отдельной блокировкой
  190. func (sf *kStoreKv) wait(chWait chan int) {
  191. for {
  192. time.Sleep(time.Millisecond * 5)
  193. if sf.wg.Len() <= 1 {
  194. break
  195. }
  196. }
  197. close(chWait)
  198. }
  199. // Ожидает закрытия контекста ядра, закрывает хранилище
  200. func (sf *kStoreKv) close() {
  201. sf.kCtx.Done()
  202. sf.Lock()
  203. defer sf.Unlock()
  204. if !sf.isWork.Get() {
  205. return
  206. }
  207. chWait := make(chan int, 2)
  208. go sf.wait(chWait)
  209. <-chWait
  210. sf.isWork.Reset()
  211. err := sf.db.Close()
  212. Assert(err == nil, "kStoreKv.close(): in close DB, err=\n\t%v", err)
  213. sf.wg.Done(storeStreamName)
  214. sf.log.Debug("close(): done")
  215. }