kstore_kv.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. // package kstore_kv -- локальное быстрое key-value хранилище ядра
  2. package kstore_kv
  3. import (
  4. "fmt"
  5. "log"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/dgraph-io/badger/v4"
  10. . "gitp78su.ipnodns.ru/svi/kern/kc/helpers"
  11. "gitp78su.ipnodns.ru/svi/kern/kc/local_ctx"
  12. "gitp78su.ipnodns.ru/svi/kern/kc/safe_bool"
  13. "gitp78su.ipnodns.ru/svi/kern/krn/kctx"
  14. . "gitp78su.ipnodns.ru/svi/kern/krn/ktypes"
  15. )
  16. const (
  17. storeStreamName = "kstore_kv" // Имя потока для ожидателя потоков
  18. )
  19. // kStoreKv -- локальное хранилище ядра
  20. type kStoreKv struct {
  21. kCtx IKernelCtx
  22. ctx ILocalCtx
  23. log ILogBuf
  24. wg IKernelWg
  25. storePath string
  26. db *badger.DB
  27. isWork ISafeBool
  28. block sync.RWMutex
  29. blockClose sync.Mutex
  30. }
  31. var (
  32. kernStore *kStoreKv // Глобальный объект
  33. block sync.Mutex
  34. )
  35. // GetKernelStore -- возвращает новое локальное хранилище ядра
  36. func GetKernelStore() IKernelStoreKv {
  37. block.Lock()
  38. defer block.Unlock()
  39. log.Println("GetKernelStore()")
  40. if kernStore != nil {
  41. return kernStore
  42. }
  43. ctx := kctx.GetKernelCtx()
  44. sf := &kStoreKv{
  45. kCtx: ctx,
  46. ctx: local_ctx.NewLocalCtx(ctx.BaseCtx()),
  47. wg: ctx.Wg(),
  48. isWork: safe_bool.NewSafeBool(),
  49. }
  50. sf.log = sf.ctx.Log()
  51. sf.open()
  52. kernStore = sf
  53. ctx.Set("kernStoreKV", kernStore, "fast KV store on Badger")
  54. return kernStore
  55. }
  56. // Log -- возвращает локальный лог
  57. func (sf *kStoreKv) Log() ILogBuf {
  58. return sf.log
  59. }
  60. // Set -- устанавливает значение по ключу
  61. func (sf *kStoreKv) Set(key string, val []byte) error {
  62. sf.block.Lock()
  63. defer sf.block.Unlock()
  64. sf.log.Debug("kStoreKv.Set(): key='%v'", key)
  65. fnSet := func(txn *badger.Txn) error {
  66. err := txn.Set([]byte(key), val)
  67. return err
  68. }
  69. err := sf.db.Update(fnSet)
  70. if err != nil {
  71. strOut := fmt.Sprintf("kStoreKv.Set(): key=%v, err=\n\t%v", key, err)
  72. sf.log.Err(strOut)
  73. return fmt.Errorf(strOut, "")
  74. }
  75. return nil
  76. }
  77. // Get -- возвращает значение по ключу
  78. func (sf *kStoreKv) Get(key string) ([]byte, error) {
  79. sf.block.RLock()
  80. defer sf.block.RUnlock()
  81. sf.log.Debug("kStoreKv.Get(): key='%v'", key)
  82. var binVal []byte
  83. fnGet := func(txn *badger.Txn) error {
  84. item, err := txn.Get([]byte(key))
  85. if err != nil {
  86. return err
  87. }
  88. binVal, err = item.ValueCopy(binVal)
  89. return err
  90. }
  91. err := sf.db.View(fnGet)
  92. if err != nil {
  93. strOut := fmt.Sprintf("kStoreKv.Get(): key=%v, err=\n\t%v", key, err)
  94. sf.log.Err(strOut)
  95. return nil, fmt.Errorf(strOut, "")
  96. }
  97. return binVal, nil
  98. }
  99. // Delete -- удалить ключ из хранилища
  100. func (sf *kStoreKv) Delete(key string) error {
  101. sf.block.Lock()
  102. defer sf.block.Unlock()
  103. sf.log.Debug("kStoreKv.Delete(): key='%v'", key)
  104. fnDelete := func(txn *badger.Txn) error {
  105. err := txn.Delete([]byte(key))
  106. return err
  107. }
  108. err := sf.db.Update(fnDelete)
  109. if err != nil {
  110. strOut := fmt.Sprintf("kStoreKv.Delete(): key=%v, err=\n\t%v", key, err)
  111. sf.log.Err(strOut)
  112. return fmt.Errorf(strOut, "")
  113. }
  114. return nil
  115. }
  116. // Открывает базу при создании
  117. func (sf *kStoreKv) open() {
  118. sf.block.Lock()
  119. defer sf.block.Unlock()
  120. sf.log.Debug("kStoreKv.open()")
  121. strPath := os.Getenv("LOCAL_STORE_PATH")
  122. Hassert(strPath != "", "kStoreKv.open(): env LOCAL_STORE_PATH not set")
  123. pwd, err := os.Getwd()
  124. Hassert(err == nil, "kStoreKv.open(): in get PWD, err=\n\t%v", err)
  125. sf.storePath = pwd + strPath + "/db_local"
  126. err = os.MkdirAll(sf.storePath, 0750)
  127. Hassert(err == nil, "kStoreKv.open(): in make dir %v, err=\n\t%v", sf.storePath, err)
  128. sf.db, err = badger.Open(badger.DefaultOptions(sf.storePath))
  129. Hassert(err == nil, "kStoreKv.open(): in open DB %v, err=\n\t%v", sf.storePath, err)
  130. err = sf.wg.Add(storeStreamName)
  131. Hassert(err == nil, "kStoreKv.open(): in add name stream to IKernelWg, err=\n\t%v", err)
  132. sf.isWork.Set()
  133. go sf.close()
  134. go sf.clean()
  135. }
  136. // Выполняет периодическую сборку мусора в файле
  137. func (sf *kStoreKv) clean() {
  138. chRun := make(chan int, 2)
  139. defer close(chRun)
  140. fnClean := func() {
  141. sf.block.Lock()
  142. defer sf.block.Unlock()
  143. _ = sf.db.RunValueLogGC(0.7)
  144. }
  145. chRun <- 1
  146. for {
  147. select {
  148. case <-sf.kCtx.BaseCtx().Done(): // надо прекратить работу
  149. return
  150. case <-chRun: // Пора поработать
  151. fnClean()
  152. }
  153. time.Sleep(time.Second * 1)
  154. }
  155. }
  156. // Ожидает последнего потока под отдельной блокировкой
  157. func (sf *kStoreKv) wait(chWait chan int) {
  158. for {
  159. time.Sleep(time.Millisecond * 5)
  160. if sf.wg.Len() <= 1 {
  161. break
  162. }
  163. }
  164. close(chWait)
  165. }
  166. // Ожидает закрытия контекста ядра, закрывает хранилище
  167. func (sf *kStoreKv) close() {
  168. sf.kCtx.Done()
  169. sf.blockClose.Lock()
  170. defer sf.blockClose.Unlock()
  171. if !sf.isWork.Get() {
  172. return
  173. }
  174. chWait := make(chan int, 2)
  175. go sf.wait(chWait)
  176. <-chWait
  177. sf.isWork.Reset()
  178. err := sf.db.Close()
  179. Assert(err == nil, "kStoreKv.close(): in close DB, err=\n\t%v", err)
  180. sf.wg.Done(storeStreamName)
  181. sf.log.Debug("kStoreKv.close(): done")
  182. }