kernel_store.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. // package kernel_store -- локальное хранилище ядра
  2. package kernel_store
  3. import (
  4. "fmt"
  5. "log"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/dgraph-io/badger/v4"
  10. "wartank/kernel/internal/safe_bool"
  11. . "wartank/kernel/kernel_types"
  12. . "wartank/pkg/helpers"
  13. )
  14. const (
  15. storeStreamName = "kernel_store" // Имя потока для ожидателя потоков
  16. )
  17. // kernelStore -- локальное хранилище ядра
  18. type kernelStore struct {
  19. ctx ИЯдроКонтекст
  20. wg IKernelWg
  21. storePath string
  22. db *badger.DB
  23. isWork ИБезопБул
  24. block sync.RWMutex
  25. blockClose sync.Mutex
  26. }
  27. var (
  28. kernStore *kernelStore // Глобальный объект
  29. block sync.Mutex
  30. )
  31. // GetKernelStore -- возвращает новое локальное хранилище ядра
  32. func GetKernelStore(ctx ИЯдроКонтекст) ИЯдроХранилище {
  33. log.Println("GetKernelStore()")
  34. block.Lock()
  35. defer block.Unlock()
  36. if kernStore != nil {
  37. return kernStore
  38. }
  39. Паника(ctx != nil, "GetKernelStore(): IKernelCtx==nil")
  40. sf := &kernelStore{
  41. ctx: ctx,
  42. wg: ctx.Оп(),
  43. isWork: safe_bool.НовБезопБул_(),
  44. }
  45. sf.open()
  46. kernStore = sf
  47. ctx.Уст("kernStore", kernStore)
  48. return kernStore
  49. }
  50. // Set -- устанавливает значение по ключу
  51. func (sf *kernelStore) Set(key string, val []byte) error {
  52. sf.block.Lock()
  53. defer sf.block.Unlock()
  54. // if !sf.isWork.Get() {
  55. // return fmt.Errorf("kernelStore.Set(): DB already close")
  56. // }
  57. fnSet := func(txn *badger.Txn) error {
  58. err := txn.Set([]byte(key), val)
  59. return err
  60. }
  61. err := sf.db.Update(fnSet)
  62. if err != nil {
  63. return fmt.Errorf("kernelStore.Set(): key=%v, err=\n\t%w", key, err)
  64. }
  65. return nil
  66. }
  67. // Get -- возвращает значение по ключу
  68. func (sf *kernelStore) Get(key string) ([]byte, error) {
  69. sf.block.RLock()
  70. defer sf.block.RUnlock()
  71. var binVal []byte
  72. fnGet := func(txn *badger.Txn) error {
  73. item, err := txn.Get([]byte(key))
  74. if err != nil {
  75. return err
  76. }
  77. binVal, err = item.ValueCopy(binVal)
  78. return err
  79. }
  80. err := sf.db.View(fnGet)
  81. if err != nil {
  82. return nil, fmt.Errorf("kernelStore.Delete(): key=%v, err=\n\t%w", key, err)
  83. }
  84. return binVal, nil
  85. }
  86. // Delete -- удалить ключ из хранилища
  87. func (sf *kernelStore) Delete(key string) error {
  88. sf.block.Lock()
  89. defer sf.block.Unlock()
  90. fnDelete := func(txn *badger.Txn) error {
  91. err := txn.Delete([]byte(key))
  92. return err
  93. }
  94. err := sf.db.Update(fnDelete)
  95. if err != nil {
  96. return fmt.Errorf("kernelStore.Delete(): key=%v, err=\n\t%w", key, err)
  97. }
  98. return nil
  99. }
  100. // Открывает базу при создании
  101. func (sf *kernelStore) open() {
  102. sf.block.Lock()
  103. defer sf.block.Unlock()
  104. strPath := os.Getenv("LOCAL_STORE_PATH")
  105. Паника(strPath != "", "kernelStore.open(): env LOCAL_STORE_PATH not set")
  106. pwd, err := os.Getwd()
  107. Паника(err == nil, "kernelStore.open(): in get PWD, err=\n\t%v", err)
  108. sf.storePath = pwd + strPath + "/db_local"
  109. err = os.MkdirAll(sf.storePath, 0750)
  110. Паника(err == nil, "kernelStore.open(): in make dir %v, err=\n\t%v", sf.storePath, err)
  111. sf.db, err = badger.Open(badger.DefaultOptions(sf.storePath))
  112. Паника(err == nil, "kernelStore.open(): in open DB %v, err=\n\t%v", sf.storePath, err)
  113. err = sf.wg.Add(storeStreamName)
  114. Паника(err == nil, "kernelStore.open(): in add name stream to IKernelWg, err=\n\t%v", err)
  115. sf.isWork.Уст()
  116. go sf.close()
  117. go sf.clean()
  118. }
  119. // Выполняет периодическую сборку мусора в файле
  120. func (sf *kernelStore) clean() {
  121. chRun := make(chan int, 2)
  122. defer close(chRun)
  123. fnClean := func() {
  124. sf.block.Lock()
  125. defer sf.block.Unlock()
  126. _ = sf.db.RunValueLogGC(0.7)
  127. }
  128. chRun <- 1
  129. for {
  130. select {
  131. case <-sf.ctx.Конт().Done(): // надо прекратить работу
  132. return
  133. case <-chRun: // Пора поработать
  134. fnClean()
  135. }
  136. time.Sleep(time.Second * 1)
  137. }
  138. }
  139. // Ожидает последнего потока под отдельной блокировкой
  140. func (sf *kernelStore) wait(chWait chan int) {
  141. for {
  142. time.Sleep(time.Millisecond * 5)
  143. if sf.wg.Len() <= 1 {
  144. break
  145. }
  146. }
  147. close(chWait)
  148. }
  149. // Ожидает закрытия контекста ядра, закрывает хранилище
  150. func (sf *kernelStore) close() {
  151. <-sf.ctx.Конт().Done()
  152. sf.blockClose.Lock()
  153. defer sf.blockClose.Unlock()
  154. if !sf.isWork.Получ() {
  155. return
  156. }
  157. chWait := make(chan int, 2)
  158. go sf.wait(chWait)
  159. <-chWait
  160. sf.isWork.Сброс()
  161. err := sf.db.Close()
  162. Провер(err == nil, "kernelStore.close(): in close DB, err=\n\t%v", err)
  163. sf.wg.Done(storeStreamName)
  164. log.Println("kernelStore.close(): done")
  165. }