| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- // package kstore_kv -- локальное быстрое key-value хранилище ядра.
- package kstore_kv
- import (
- "fmt"
- "os"
- "sync"
- "time"
- "github.com/dgraph-io/badger/v4"
- mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs"
- mKh "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers"
- mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
- mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
- )
- var (
- storeStreamName = defs.NewStreamName("kstore_kv") // Имя потока для ожидателя потоков
- )
- // kStoreKv -- локальное хранилище ядра.
- type kStoreKv struct {
- sync.RWMutex
- kCtx mKs.IKernelCtx
- lCtx mKs.ILocalCtx
- log mKs.ILogBuf
- wg mKs.IKernelWg
- storePath string
- db *badger.DB
- isWork mKs.ISafeBool
- }
- var (
- kernStore *kStoreKv // Глобальный объект
- block sync.Mutex
- )
- // GetKernelStore -- возвращает новое локальное хранилище ядра.
- func GetKernelStore() mKs.IKernelStoreKv {
- block.Lock()
- defer block.Unlock()
- if kernStore != nil {
- kernStore.log.Debug("GetKernelStore()")
- return kernStore
- }
- kCtx := kctx.GetKernelCtx()
- log := mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kStoreKv"))
- sf := &kStoreKv{
- kCtx: kCtx,
- lCtx: mL1.NewLocalCtx(kCtx.Ctx()),
- wg: kCtx.Wg(),
- isWork: mL1.NewSafeBool(),
- log: log,
- }
- sf.open()
- kernStore = sf
- kCtx.Set("kernStoreKV", kernStore, "fast KV store on Badger")
- return kernStore
- }
- // Log -- возвращает локальный лог.
- func (sf *kStoreKv) Log() mKs.ILogBuf {
- return sf.log
- }
- // Set -- устанавливает значение по ключу.
- func (sf *kStoreKv) Set(key string, val []byte) mL0.IResult[bool] {
- sf.Lock()
- defer sf.Unlock()
- sf.log.Debug("Set(): key='%v'", key)
- fnSet := func(txn *badger.Txn) error {
- err := txn.Set([]byte(key), val)
- return err
- }
- err := sf.db.Update(fnSet)
- if err != nil {
- err := fmt.Errorf("Set(): key=%v, err=\n\t%w", key, err)
- sf.log.Err(err.Error())
- return mL0.NewErr[bool](err)
- }
- return mL0.NewRes(true)
- }
- // Get -- возвращает значение по ключу.
- func (sf *kStoreKv) Get(key string) mL0.IResult[[]byte] {
- sf.RLock()
- defer sf.RUnlock()
- sf.log.Debug("Get(): key='%v'", key)
- var binVal []byte
- fnGet := func(txn *badger.Txn) error {
- item, err := txn.Get([]byte(key))
- if err != nil {
- return err
- }
- binVal, err = item.ValueCopy(binVal)
- return err
- }
- err := sf.db.View(fnGet)
- if err != nil {
- err := fmt.Errorf("Get(): key=%v, err=\n\t%v", key, err)
- sf.log.Err(err.Error())
- return mL0.NewErr[[]byte](err)
- }
- return mL0.NewRes(binVal)
- }
- // ByPrefix -- фильтрует ключи по префиксу.
- func (sf *kStoreKv) ByPrefix(prefix string) mL0.IResult[[]string] {
- var (
- binKey []byte
- lstKey = []string{}
- )
- // fnValue := func(v []byte) error {
- // fmt.Printf("key=%s, value=%s\n", key, v)
- // return nil
- // }
- fnPrefix := func(txn *badger.Txn) error {
- it := txn.NewIterator(badger.DefaultIteratorOptions)
- defer it.Close()
- binPref := []byte(prefix)
- for it.Seek(binPref); it.ValidForPrefix(binPref); it.Next() {
- item := it.Item()
- binKey = item.Key()
- // err := item.Value(fnValue)
- // if err != nil {
- // return err
- // }
- lstKey = append(lstKey, string(binKey))
- }
- return nil
- }
- err := sf.db.View(fnPrefix)
- if err != nil {
- err := fmt.Errorf("ByPrefix(): in find, err=\n\t%w", err)
- return mL0.NewErr[[]string](err)
- }
- return mL0.NewRes(lstKey)
- }
- // Delete -- удалить ключ из хранилища.
- func (sf *kStoreKv) Delete(key string) mL0.IResult[bool] {
- sf.Lock()
- defer sf.Unlock()
- sf.log.Debug("Delete(): key='%v'", key)
- fnDelete := func(txn *badger.Txn) error {
- err := txn.Delete([]byte(key))
- return err
- }
- err := sf.db.Update(fnDelete)
- if err != nil {
- err := fmt.Errorf("Delete(): key=%v, err=\n\t%w", key, err)
- sf.log.Err(err.Error())
- return mL0.NewErr[bool](err)
- }
- return mL0.NewRes(true)
- }
- // Открывает базу при создании.
- func (sf *kStoreKv) open() {
- sf.Lock()
- defer sf.Unlock()
- sf.log.Debug("open()")
- strPath := os.Getenv("LOCAL_STORE_PATH")
- mKh.Hassert(strPath != "", "open(): env LOCAL_STORE_PATH not set")
- pwd, err := os.Getwd()
- mKh.Hassert(err == nil, "open(): in get PWD, err=\n\t%v", err)
- sf.storePath = pwd + strPath + "/db_local"
- err = os.MkdirAll(sf.storePath, 0750)
- mKh.Hassert(err == nil, "open(): in make dir %v, err=\n\t%v", sf.storePath, err)
- sf.db, err = badger.Open(badger.DefaultOptions(sf.storePath))
- mKh.Hassert(err == nil, "open(): in open DB %v, err=\n\t%v", sf.storePath, err)
- sf.wg.Add(storeStreamName)
- sf.isWork.Set()
- go sf.close()
- go sf.clean()
- }
- // Выполняет периодическую сборку мусора в файле.
- func (sf *kStoreKv) clean() {
- chRun := make(chan int, 2)
- defer close(chRun)
- fnClean := func() {
- sf.Lock()
- defer sf.Unlock()
- _ = sf.db.RunValueLogGC(0.7)
- }
- chRun <- 1
- for {
- select {
- case <-sf.kCtx.Ctx().Done(): // надо прекратить работу
- return
- case <-chRun: // Пора поработать
- fnClean()
- }
- time.Sleep(time.Second * 1)
- }
- }
- // Ожидает последнего потока под отдельной блокировкой.
- func (sf *kStoreKv) wait(chWait chan int) {
- for {
- time.Sleep(time.Millisecond * 5)
- if sf.wg.Len() <= 1 {
- break
- }
- }
- close(chWait)
- }
- // Ожидает закрытия контекста ядра, закрывает хранилище.
- func (sf *kStoreKv) close() {
- sf.kCtx.Wait()
- sf.Lock()
- defer sf.Unlock()
- if !sf.isWork.Get() {
- return
- }
- chWait := make(chan int, 2)
- go sf.wait(chWait)
- <-chWait
- sf.isWork.Reset()
- err := sf.db.Close()
- mKh.Assert(err == nil, "kStoreKv.close(): in close DB, err=\n\t%v", err)
- sf.wg.Done(storeStreamName)
- sf.log.Debug("close(): done")
- }
|