| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- // package kernel_store -- локальное хранилище ядра
- package kernel_store
- import (
- "fmt"
- "log"
- "os"
- "sync"
- "time"
- "github.com/dgraph-io/badger/v4"
- "wartank/kernel/internal/safe_bool"
- . "wartank/kernel/kernel_types"
- . "wartank/pkg/helpers"
- )
- const (
- storeStreamName = "kernel_store" // Имя потока для ожидателя потоков
- )
- // kernelStore -- локальное хранилище ядра
- type kernelStore struct {
- ctx ИЯдроКонтекст
- wg IKernelWg
- storePath string
- db *badger.DB
- isWork ИБезопБул
- block sync.RWMutex
- blockClose sync.Mutex
- }
- var (
- kernStore *kernelStore // Глобальный объект
- block sync.Mutex
- )
- // GetKernelStore -- возвращает новое локальное хранилище ядра
- func GetKernelStore(ctx ИЯдроКонтекст) ИЯдроХранилище {
- log.Println("GetKernelStore()")
- block.Lock()
- defer block.Unlock()
- if kernStore != nil {
- return kernStore
- }
- Паника(ctx != nil, "GetKernelStore(): IKernelCtx==nil")
- sf := &kernelStore{
- ctx: ctx,
- wg: ctx.Оп(),
- isWork: safe_bool.НовБезопБул_(),
- }
- sf.open()
- kernStore = sf
- ctx.Уст("kernStore", kernStore)
- return kernStore
- }
- // Set -- устанавливает значение по ключу
- func (sf *kernelStore) Set(key string, val []byte) error {
- sf.block.Lock()
- defer sf.block.Unlock()
- // if !sf.isWork.Get() {
- // return fmt.Errorf("kernelStore.Set(): DB already close")
- // }
- fnSet := func(txn *badger.Txn) error {
- err := txn.Set([]byte(key), val)
- return err
- }
- err := sf.db.Update(fnSet)
- if err != nil {
- return fmt.Errorf("kernelStore.Set(): key=%v, err=\n\t%w", key, err)
- }
- return nil
- }
- // Get -- возвращает значение по ключу
- func (sf *kernelStore) Get(key string) ([]byte, error) {
- sf.block.RLock()
- defer sf.block.RUnlock()
- var binVal []byte
- fnGet := func(txn *badger.Txn) error {
- item, err := txn.Get([]byte(key))
- if err != nil {
- return err
- }
- binVal, err = item.ValueCopy(binVal)
- return err
- }
- err := sf.db.View(fnGet)
- if err != nil {
- return nil, fmt.Errorf("kernelStore.Delete(): key=%v, err=\n\t%w", key, err)
- }
- return binVal, nil
- }
- // Delete -- удалить ключ из хранилища
- func (sf *kernelStore) Delete(key string) error {
- sf.block.Lock()
- defer sf.block.Unlock()
- fnDelete := func(txn *badger.Txn) error {
- err := txn.Delete([]byte(key))
- return err
- }
- err := sf.db.Update(fnDelete)
- if err != nil {
- return fmt.Errorf("kernelStore.Delete(): key=%v, err=\n\t%w", key, err)
- }
- return nil
- }
- // Открывает базу при создании
- func (sf *kernelStore) open() {
- sf.block.Lock()
- defer sf.block.Unlock()
- strPath := os.Getenv("LOCAL_STORE_PATH")
- Паника(strPath != "", "kernelStore.open(): env LOCAL_STORE_PATH not set")
- pwd, err := os.Getwd()
- Паника(err == nil, "kernelStore.open(): in get PWD, err=\n\t%v", err)
- sf.storePath = pwd + strPath + "/db_local"
- err = os.MkdirAll(sf.storePath, 0750)
- Паника(err == nil, "kernelStore.open(): in make dir %v, err=\n\t%v", sf.storePath, err)
- sf.db, err = badger.Open(badger.DefaultOptions(sf.storePath))
- Паника(err == nil, "kernelStore.open(): in open DB %v, err=\n\t%v", sf.storePath, err)
- err = sf.wg.Add(storeStreamName)
- Паника(err == nil, "kernelStore.open(): in add name stream to IKernelWg, err=\n\t%v", err)
- sf.isWork.Уст()
- go sf.close()
- go sf.clean()
- }
- // Выполняет периодическую сборку мусора в файле
- func (sf *kernelStore) clean() {
- chRun := make(chan int, 2)
- defer close(chRun)
- fnClean := func() {
- sf.block.Lock()
- defer sf.block.Unlock()
- _ = sf.db.RunValueLogGC(0.7)
- }
- chRun <- 1
- for {
- select {
- case <-sf.ctx.Конт().Done(): // надо прекратить работу
- return
- case <-chRun: // Пора поработать
- fnClean()
- }
- time.Sleep(time.Second * 1)
- }
- }
- // Ожидает последнего потока под отдельной блокировкой
- func (sf *kernelStore) wait(chWait chan int) {
- for {
- time.Sleep(time.Millisecond * 5)
- if sf.wg.Len() <= 1 {
- break
- }
- }
- close(chWait)
- }
- // Ожидает закрытия контекста ядра, закрывает хранилище
- func (sf *kernelStore) close() {
- <-sf.ctx.Конт().Done()
- sf.blockClose.Lock()
- defer sf.blockClose.Unlock()
- if !sf.isWork.Получ() {
- return
- }
- chWait := make(chan int, 2)
- go sf.wait(chWait)
- <-chWait
- sf.isWork.Сброс()
- err := sf.db.Close()
- Провер(err == nil, "kernelStore.close(): in close DB, err=\n\t%v", err)
- sf.wg.Done(storeStreamName)
- log.Println("kernelStore.close(): done")
- }
|