kstore_kv.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. // package kstore_kv -- локальное быстрое key-value хранилище ядра.
  2. package kstore_kv
  3. import (
  4. "fmt"
  5. "os"
  6. "sync"
  7. "time"
  8. "github.com/dgraph-io/badger/v4"
  9. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs"
  11. mKh "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers"
  12. mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  13. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  15. )
  16. var (
  17. storeStreamName = defs.NewStreamName("kstore_kv") // Имя потока для ожидателя потоков
  18. )
  19. // kStoreKv -- локальное хранилище ядра.
  20. type kStoreKv struct {
  21. sync.RWMutex
  22. kCtx mKs.IKernelCtx
  23. lCtx mKs.ILocalCtx
  24. log mKs.ILogBuf
  25. wg mKs.IKernelWg
  26. storePath string
  27. db *badger.DB
  28. isWork mKs.ISafeBool
  29. }
  30. var (
  31. kernStore *kStoreKv // Глобальный объект
  32. block sync.Mutex
  33. )
  34. // GetKernelStore -- возвращает новое локальное хранилище ядра.
  35. func GetKernelStore() mKs.IKernelStoreKv {
  36. block.Lock()
  37. defer block.Unlock()
  38. if kernStore != nil {
  39. kernStore.log.Debug("GetKernelStore()")
  40. return kernStore
  41. }
  42. kCtx := kctx.GetKernelCtx()
  43. log := mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kStoreKv"))
  44. sf := &kStoreKv{
  45. kCtx: kCtx,
  46. lCtx: mL1.NewLocalCtx(kCtx.Ctx()),
  47. wg: kCtx.Wg(),
  48. isWork: mL1.NewSafeBool(),
  49. log: log,
  50. }
  51. sf.open()
  52. kernStore = sf
  53. kCtx.Set("kernStoreKV", kernStore, "fast KV store on Badger")
  54. return kernStore
  55. }
  56. // Log -- возвращает локальный лог.
  57. func (sf *kStoreKv) Log() mKs.ILogBuf {
  58. return sf.log
  59. }
  60. // Set -- устанавливает значение по ключу.
  61. func (sf *kStoreKv) Set(key string, val []byte) mL0.IResult[bool] {
  62. sf.Lock()
  63. defer sf.Unlock()
  64. sf.log.Debug("Set(): key='%v'", key)
  65. fnSet := func(txn *badger.Txn) error {
  66. err := txn.Set([]byte(key), val)
  67. return err
  68. }
  69. err := sf.db.Update(fnSet)
  70. if err != nil {
  71. err := fmt.Errorf("Set(): key=%v, err=\n\t%w", key, err)
  72. sf.log.Err(err.Error())
  73. return mL0.NewErr[bool](err)
  74. }
  75. return mL0.NewRes(true)
  76. }
  77. // Get -- возвращает значение по ключу.
  78. func (sf *kStoreKv) Get(key string) mL0.IResult[[]byte] {
  79. sf.RLock()
  80. defer sf.RUnlock()
  81. sf.log.Debug("Get(): key='%v'", key)
  82. var binVal []byte
  83. fnGet := func(txn *badger.Txn) error {
  84. item, err := txn.Get([]byte(key))
  85. if err != nil {
  86. return err
  87. }
  88. binVal, err = item.ValueCopy(binVal)
  89. return err
  90. }
  91. err := sf.db.View(fnGet)
  92. if err != nil {
  93. err := fmt.Errorf("Get(): key=%v, err=\n\t%v", key, err)
  94. sf.log.Err(err.Error())
  95. return mL0.NewErr[[]byte](err)
  96. }
  97. return mL0.NewRes(binVal)
  98. }
  99. // ByPrefix -- фильтрует ключи по префиксу.
  100. func (sf *kStoreKv) ByPrefix(prefix string) mL0.IResult[[]string] {
  101. var (
  102. binKey []byte
  103. lstKey = []string{}
  104. )
  105. // fnValue := func(v []byte) error {
  106. // fmt.Printf("key=%s, value=%s\n", key, v)
  107. // return nil
  108. // }
  109. fnPrefix := func(txn *badger.Txn) error {
  110. it := txn.NewIterator(badger.DefaultIteratorOptions)
  111. defer it.Close()
  112. binPref := []byte(prefix)
  113. for it.Seek(binPref); it.ValidForPrefix(binPref); it.Next() {
  114. item := it.Item()
  115. binKey = item.Key()
  116. // err := item.Value(fnValue)
  117. // if err != nil {
  118. // return err
  119. // }
  120. lstKey = append(lstKey, string(binKey))
  121. }
  122. return nil
  123. }
  124. err := sf.db.View(fnPrefix)
  125. if err != nil {
  126. err := fmt.Errorf("ByPrefix(): in find, err=\n\t%w", err)
  127. return mL0.NewErr[[]string](err)
  128. }
  129. return mL0.NewRes(lstKey)
  130. }
  131. // Delete -- удалить ключ из хранилища.
  132. func (sf *kStoreKv) Delete(key string) mL0.IResult[bool] {
  133. sf.Lock()
  134. defer sf.Unlock()
  135. sf.log.Debug("Delete(): key='%v'", key)
  136. fnDelete := func(txn *badger.Txn) error {
  137. err := txn.Delete([]byte(key))
  138. return err
  139. }
  140. err := sf.db.Update(fnDelete)
  141. if err != nil {
  142. err := fmt.Errorf("Delete(): key=%v, err=\n\t%w", key, err)
  143. sf.log.Err(err.Error())
  144. return mL0.NewErr[bool](err)
  145. }
  146. return mL0.NewRes(true)
  147. }
  148. // Открывает базу при создании.
  149. func (sf *kStoreKv) open() {
  150. sf.Lock()
  151. defer sf.Unlock()
  152. sf.log.Debug("open()")
  153. strPath := os.Getenv("LOCAL_STORE_PATH")
  154. mKh.Hassert(strPath != "", "open(): env LOCAL_STORE_PATH not set")
  155. pwd, err := os.Getwd()
  156. mKh.Hassert(err == nil, "open(): in get PWD, err=\n\t%v", err)
  157. sf.storePath = pwd + strPath + "/db_local"
  158. err = os.MkdirAll(sf.storePath, 0750)
  159. mKh.Hassert(err == nil, "open(): in make dir %v, err=\n\t%v", sf.storePath, err)
  160. sf.db, err = badger.Open(badger.DefaultOptions(sf.storePath))
  161. mKh.Hassert(err == nil, "open(): in open DB %v, err=\n\t%v", sf.storePath, err)
  162. sf.wg.Add(storeStreamName)
  163. sf.isWork.Set()
  164. go sf.close()
  165. go sf.clean()
  166. }
  167. // Выполняет периодическую сборку мусора в файле.
  168. func (sf *kStoreKv) clean() {
  169. chRun := make(chan int, 2)
  170. defer close(chRun)
  171. fnClean := func() {
  172. sf.Lock()
  173. defer sf.Unlock()
  174. _ = sf.db.RunValueLogGC(0.7)
  175. }
  176. chRun <- 1
  177. for {
  178. select {
  179. case <-sf.kCtx.Ctx().Done(): // надо прекратить работу
  180. return
  181. case <-chRun: // Пора поработать
  182. fnClean()
  183. }
  184. time.Sleep(time.Second * 1)
  185. }
  186. }
  187. // Ожидает последнего потока под отдельной блокировкой.
  188. func (sf *kStoreKv) wait(chWait chan int) {
  189. for {
  190. time.Sleep(time.Millisecond * 5)
  191. if sf.wg.Len() <= 1 {
  192. break
  193. }
  194. }
  195. close(chWait)
  196. }
  197. // Ожидает закрытия контекста ядра, закрывает хранилище.
  198. func (sf *kStoreKv) close() {
  199. sf.kCtx.Wait()
  200. sf.Lock()
  201. defer sf.Unlock()
  202. if !sf.isWork.Get() {
  203. return
  204. }
  205. chWait := make(chan int, 2)
  206. go sf.wait(chWait)
  207. <-chWait
  208. sf.isWork.Reset()
  209. err := sf.db.Close()
  210. mKh.Assert(err == nil, "kStoreKv.close(): in close DB, err=\n\t%v", err)
  211. sf.wg.Done(storeStreamName)
  212. sf.log.Debug("close(): done")
  213. }