// package kstore_kv -- локальное быстрое key-value хранилище ядра. package kstore_kv import ( "fmt" "os" "sync" "time" "github.com/dgraph-io/badger/v4" mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0" "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs" mKh "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers" mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec" mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx" ) var ( storeStreamName = defs.NewStreamName("kstore_kv") // Имя потока для ожидателя потоков ) // kStoreKv -- локальное хранилище ядра. type kStoreKv struct { sync.RWMutex kCtx mKt.IKernelCtx lCtx mKt.ILocalCtx log mKt.ILogBuf wg mKt.IKernelWg storePath string db *badger.DB isWork mKt.ISafeBool } var ( kernStore *kStoreKv // Глобальный объект block sync.Mutex ) // GetKernelStore -- возвращает новое локальное хранилище ядра. func GetKernelStore() mKt.IKernelStoreKv { block.Lock() defer block.Unlock() if kernStore != nil { kernStore.log.Debug("GetKernelStore()") return kernStore } kCtx := kctx.GetKernelCtx() log := mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kStoreKv")) sf := &kStoreKv{ kCtx: kCtx, lCtx: mL1.NewLocalCtx(kCtx.Ctx()), wg: kCtx.Wg(), isWork: mL1.NewSafeBool(), log: log, } sf.open() kernStore = sf kCtx.Set("kernStoreKV", kernStore, "fast KV store on Badger") return kernStore } // Log -- возвращает локальный лог. func (sf *kStoreKv) Log() mKt.ILogBuf { return sf.log } // Set -- устанавливает значение по ключу. func (sf *kStoreKv) Set(key string, val []byte) mL0.IResult[bool] { sf.Lock() defer sf.Unlock() sf.log.Debug("Set(): key='%v'", key) fnSet := func(txn *badger.Txn) error { err := txn.Set([]byte(key), val) return err } err := sf.db.Update(fnSet) if err != nil { err := fmt.Errorf("Set(): key=%v, err=\n\t%w", key, err) sf.log.Err(err.Error()) return mL0.NewErr[bool](err) } return mL0.NewRes(true) } // Get -- возвращает значение по ключу. func (sf *kStoreKv) Get(key string) mL0.IResult[[]byte] { sf.RLock() defer sf.RUnlock() sf.log.Debug("Get(): key='%v'", key) var binVal []byte fnGet := func(txn *badger.Txn) error { item, err := txn.Get([]byte(key)) if err != nil { return err } binVal, err = item.ValueCopy(binVal) return err } err := sf.db.View(fnGet) if err != nil { err := fmt.Errorf("Get(): key=%v, err=\n\t%v", key, err) sf.log.Err(err.Error()) return mL0.NewErr[[]byte](err) } return mL0.NewRes(binVal) } // ByPrefix -- фильтрует ключи по префиксу. func (sf *kStoreKv) ByPrefix(prefix string) mL0.IResult[[]string] { var ( binKey []byte lstKey = []string{} ) // fnValue := func(v []byte) error { // fmt.Printf("key=%s, value=%s\n", key, v) // return nil // } fnPrefix := func(txn *badger.Txn) error { it := txn.NewIterator(badger.DefaultIteratorOptions) defer it.Close() binPref := []byte(prefix) for it.Seek(binPref); it.ValidForPrefix(binPref); it.Next() { item := it.Item() binKey = item.Key() // err := item.Value(fnValue) // if err != nil { // return err // } lstKey = append(lstKey, string(binKey)) } return nil } err := sf.db.View(fnPrefix) if err != nil { err := fmt.Errorf("ByPrefix(): in find, err=\n\t%w", err) return mL0.NewErr[[]string](err) } return mL0.NewRes(lstKey) } // Delete -- удалить ключ из хранилища. func (sf *kStoreKv) Delete(key string) mL0.IResult[bool] { sf.Lock() defer sf.Unlock() sf.log.Debug("Delete(): key='%v'", key) fnDelete := func(txn *badger.Txn) error { err := txn.Delete([]byte(key)) return err } err := sf.db.Update(fnDelete) if err != nil { err := fmt.Errorf("Delete(): key=%v, err=\n\t%w", key, err) sf.log.Err(err.Error()) return mL0.NewErr[bool](err) } return mL0.NewRes(true) } // Открывает базу при создании. func (sf *kStoreKv) open() { sf.Lock() defer sf.Unlock() sf.log.Debug("open()") strPath := os.Getenv("LOCAL_STORE_PATH") mKh.Hassert(strPath != "", "open(): env LOCAL_STORE_PATH not set") pwd, err := os.Getwd() mKh.Hassert(err == nil, "open(): in get PWD, err=\n\t%v", err) sf.storePath = pwd + strPath + "/db_local" err = os.MkdirAll(sf.storePath, 0750) mKh.Hassert(err == nil, "open(): in make dir %v, err=\n\t%v", sf.storePath, err) sf.db, err = badger.Open(badger.DefaultOptions(sf.storePath)) mKh.Hassert(err == nil, "open(): in open DB %v, err=\n\t%v", sf.storePath, err) sf.wg.Add(storeStreamName) sf.isWork.Set() go sf.close() go sf.clean() } // Выполняет периодическую сборку мусора в файле. func (sf *kStoreKv) clean() { chRun := make(chan int, 2) defer close(chRun) fnClean := func() { sf.Lock() defer sf.Unlock() _ = sf.db.RunValueLogGC(0.7) } chRun <- 1 for { select { case <-sf.kCtx.Ctx().Done(): // надо прекратить работу return case <-chRun: // Пора поработать fnClean() } time.Sleep(time.Second * 1) } } // Ожидает последнего потока под отдельной блокировкой. func (sf *kStoreKv) wait(chWait chan int) { for { time.Sleep(time.Millisecond * 5) if sf.wg.Len() <= 1 { break } } close(chWait) } // Ожидает закрытия контекста ядра, закрывает хранилище. func (sf *kStoreKv) close() { sf.kCtx.Wait() sf.Lock() defer sf.Unlock() if !sf.isWork.Get() { return } chWait := make(chan int, 2) go sf.wait(chWait) <-chWait sf.isWork.Reset() err := sf.db.Close() mKh.Assert(err == nil, "kStoreKv.close(): in close DB, err=\n\t%v", err) sf.wg.Done(storeStreamName) sf.log.Debug("close(): done") }