// package kstore_kv -- локальное быстрое key-value хранилище ядра. package kstore_kv import ( "fmt" "os" "sync" "time" "github.com/dgraph-io/badger/v4" . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers" . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes" "gitp78su.ipnodns.ru/svi/kern/v4/lev1/local_ctx" "gitp78su.ipnodns.ru/svi/kern/v4/lev1/log_buf" . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/result" "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_bool" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx" ) const ( storeStreamName = "kstore_kv" // Имя потока для ожидателя потоков ) // kStoreKv -- локальное хранилище ядра. type kStoreKv struct { sync.RWMutex kCtx IKernelCtx lCtx ILocalCtx log ILogBuf wg IKernelWg storePath string db *badger.DB isWork ISafeBool } var ( kernStore *kStoreKv // Глобальный объект block sync.Mutex ) // GetKernelStore -- возвращает новое локальное хранилище ядра. func GetKernelStore() IResult[*kStoreKv] { block.Lock() defer block.Unlock() if kernStore != nil { kernStore.log.Debug("GetKernelStore()") return NewRes(kernStore) } resKernCtx := kctx.GetKernelCtx() if resKernCtx.IsErr() { err := fmt.Errorf("GetKernelStore(): in get kernel ctx, err=\n\t%w", resKernCtx.Err()) return NewErr[*kStoreKv](err) } kCtx := resKernCtx.Val() log := log_buf.NewLogBuf(log_buf.OptIsTerm(true), log_buf.OptPrefix("kStoreKv")) resLocCtx := local_ctx.NewLocalCtx(kCtx.Ctx()) if resLocCtx.IsErr() { err := fmt.Errorf("GetKernelStore(): in get local ctx, err=\n\t%w", resLocCtx.Err()) return NewErr[*kStoreKv](err) } sf := &kStoreKv{ kCtx: kCtx, lCtx: resLocCtx.Val(), wg: kCtx.Wg(), isWork: safe_bool.NewSafeBool(), log: log, } sf.open() kernStore = sf resSet := kCtx.Set("kernStoreKV", kernStore, "fast KV store on Badger") if resSet.IsErr() { err := fmt.Errorf("GetKernelStore(): in set kernel store KV to kernel ctx, err=\n\t%w", resSet.Err()) return NewErr[*kStoreKv](err) } return NewRes(kernStore) } // Log -- возвращает локальный лог. func (sf *kStoreKv) Log() ILogBuf { return sf.log } // Set -- устанавливает значение по ключу. func (sf *kStoreKv) Set(key string, val []byte) IResult[bool] { sf.Lock() defer sf.Unlock() sf.log.Debug("Set(): key='%v'", key) fnSet := func(txn *badger.Txn) error { err := txn.Set([]byte(key), val) return err } err := sf.db.Update(fnSet) if err != nil { err := fmt.Errorf("Set(): key=%v, err=\n\t%w", key, err) sf.log.Err(err.Error()) return NewErr[bool](err) } return NewRes(true) } // Get -- возвращает значение по ключу. func (sf *kStoreKv) Get(key string) IResult[[]byte] { sf.RLock() defer sf.RUnlock() sf.log.Debug("Get(): key='%v'", key) var binVal []byte fnGet := func(txn *badger.Txn) error { item, err := txn.Get([]byte(key)) if err != nil { return err } binVal, err = item.ValueCopy(binVal) return err } err := sf.db.View(fnGet) if err != nil { err := fmt.Errorf("Get(): key=%v, err=\n\t%v", key, err) sf.log.Err(err.Error()) return NewErr[[]byte](err) } return NewRes(binVal) } // ByPrefix -- фильтрует ключи по префиксу. func (sf *kStoreKv) ByPrefix(prefix string) IResult[[]string] { var ( binKey []byte lstKey = []string{} ) // fnValue := func(v []byte) error { // fmt.Printf("key=%s, value=%s\n", key, v) // return nil // } fnPrefix := func(txn *badger.Txn) error { it := txn.NewIterator(badger.DefaultIteratorOptions) defer it.Close() binPref := []byte(prefix) for it.Seek(binPref); it.ValidForPrefix(binPref); it.Next() { item := it.Item() binKey = item.Key() // err := item.Value(fnValue) // if err != nil { // return err // } lstKey = append(lstKey, string(binKey)) } return nil } err := sf.db.View(fnPrefix) if err != nil { err := fmt.Errorf("ByPrefix(): in find, err=\n\t%w", err) return NewErr[[]string](err) } return NewRes(lstKey) } // Delete -- удалить ключ из хранилища. func (sf *kStoreKv) Delete(key string) IResult[bool] { sf.Lock() defer sf.Unlock() sf.log.Debug("Delete(): key='%v'", key) fnDelete := func(txn *badger.Txn) error { err := txn.Delete([]byte(key)) return err } err := sf.db.Update(fnDelete) if err != nil { err := fmt.Errorf("Delete(): key=%v, err=\n\t%w", key, err) sf.log.Err(err.Error()) return NewErr[bool](err) } return NewRes(true) } // Открывает базу при создании. func (sf *kStoreKv) open() { sf.Lock() defer sf.Unlock() sf.log.Debug("open()") strPath := os.Getenv("LOCAL_STORE_PATH") Hassert(strPath != "", "open(): env LOCAL_STORE_PATH not set") pwd, err := os.Getwd() Hassert(err == nil, "open(): in get PWD, err=\n\t%v", err) sf.storePath = pwd + strPath + "/db_local" err = os.MkdirAll(sf.storePath, 0750) Hassert(err == nil, "open(): in make dir %v, err=\n\t%v", sf.storePath, err) sf.db, err = badger.Open(badger.DefaultOptions(sf.storePath)) Hassert(err == nil, "open(): in open DB %v, err=\n\t%v", sf.storePath, err) res := sf.wg.Add(storeStreamName) res.Hassert("open(): in add name stream to IKernelWg") sf.isWork.Set() go sf.close() go sf.clean() } // Выполняет периодическую сборку мусора в файле. func (sf *kStoreKv) clean() { chRun := make(chan int, 2) defer close(chRun) fnClean := func() { sf.Lock() defer sf.Unlock() _ = sf.db.RunValueLogGC(0.7) } chRun <- 1 for { select { case <-sf.kCtx.Ctx().Done(): // надо прекратить работу return case <-chRun: // Пора поработать fnClean() } time.Sleep(time.Second * 1) } } // Ожидает последнего потока под отдельной блокировкой. func (sf *kStoreKv) wait(chWait chan int) { for { time.Sleep(time.Millisecond * 5) if sf.wg.Len() <= 1 { break } } close(chWait) } // Ожидает закрытия контекста ядра, закрывает хранилище. func (sf *kStoreKv) close() { sf.kCtx.Wait() sf.Lock() defer sf.Unlock() if !sf.isWork.Get() { return } chWait := make(chan int, 2) go sf.wait(chWait) <-chWait sf.isWork.Reset() err := sf.db.Close() Assert(err == nil, "kStoreKv.close(): in close DB, err=\n\t%v", err) sf.wg.Done(storeStreamName) sf.log.Debug("close(): done") }