// package kwg -- именованный ожидатель потоков ядра. // // Не позволяет завершиться ядру, если есть хоть один работающий поток package kwg import ( "context" "fmt" "sync" mKh "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers" mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias" mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes" mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1" ) // kernelWg -- именованный ожидатель потоков ядра. type kernelWg struct { sync.RWMutex ctx context.Context dictStream map[mKa.AStreamName]bool // Словарь имён потоков с признаком работы isWork mKt.ISafeBool log mKt.ILogBuf } var ( kernWg *kernelWg // Глобальный объект block sync.Mutex ) // GetKernelWg -- возвращает новый именованный ожидатель потоков ядра. func GetKernelWg(ctx context.Context) mKt.IKernelWg { block.Lock() defer block.Unlock() if kernWg != nil { kernWg.log.Debug("GetKernelWg()") return kernWg } mKh.Hassert(ctx != nil, "GetKernelWg(): ctx==nil") sf := &kernelWg{ ctx: ctx, dictStream: map[mKa.AStreamName]bool{}, isWork: mL1.NewSafeBool(), log: mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kernelWg")), } sf.log.Debug("GetKernelWg(): run") go sf.close() sf.isWork.Set() kernWg = sf return kernWg } // Log -- возвращает лог ожидателя потоков. func (sf *kernelWg) Log() mKt.ILogBuf { return sf.log } // Len -- возвращает размер списка ожидания потоков. func (sf *kernelWg) Len() int { sf.RLock() defer sf.RUnlock() return len(sf.dictStream) } // IsWork -- возвращает признак работы ядра. func (sf *kernelWg) IsWork() bool { return sf.isWork.Get() } // List -- возвращает список имён потоков на ожидании. func (sf *kernelWg) List() []mKa.AStreamName { sf.RLock() defer sf.RUnlock() lst := make([]mKa.AStreamName, 0, len(sf.dictStream)) for name := range sf.dictStream { lst = append(lst, name) } return lst } // Done -- удаляет поток из ожидания. func (sf *kernelWg) Done(name mKa.AStreamName) { sf.Lock() defer sf.Unlock() delete(sf.dictStream, name) sf.log.Debug("Done(): stream(%v) done", name) } // Wait -- блокирующий вызов; возвращает управление, только когда все потоки завершили работу. func (sf *kernelWg) Wait() { for { mKh.SleepMs() if !sf.isWork.Get() { break } } sf.log.Debug("Wait(): done") } // Add -- добавляет поток в ожидание. func (sf *kernelWg) Add(name mKa.AStreamName) mKt.IResult[bool] { sf.Lock() defer sf.Unlock() sf.log.Debug("Add(): stream='%v'", name) if !sf.isWork.Get() { err := fmt.Errorf("Add(): stream=%v, work end", name) return mL1.NewErr[bool](err) } mKh.Hassert(name != "", "Add(): name stream is empty") _, isOk := sf.dictStream[name] mKh.Hassert(!isOk, "Add(): stream '%v' already exists", name) sf.dictStream[name] = true return mL1.NewRes(true) } // Ожидает окончания работы ожидателя групп. func (sf *kernelWg) close() { <-sf.ctx.Done() fnDone := func() bool { sf.Lock() defer sf.Unlock() return len(sf.dictStream) == 0 } for { mKh.SleepMs() if fnDone() { break } } sf.Lock() defer sf.Unlock() if !sf.isWork.Get() { return } sf.isWork.Reset() sf.log.Debug("close(): end") }