// package kwg -- именованный ожидатель потоков ядра. // // Не позволяет завершиться ядру, если есть хоть один работающий поток package kwg import ( "context" "sync" mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0" mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs" mKh "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers" mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec" mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1" ) // kernelWg -- именованный ожидатель потоков ядра. type kernelWg struct { sync.RWMutex ctx context.Context dictStream map[*mKa.StreamName]bool // Словарь имён потоков с признаком работы isWork mKt.ISafeBool log mKt.ILogBuf } var ( kernWg *kernelWg // Глобальный объект block sync.Mutex ) // GetKernelWg -- возвращает новый именованный ожидатель потоков ядра. func GetKernelWg(ctx context.Context) mKt.IKernelWg { block.Lock() defer block.Unlock() if kernWg != nil { kernWg.log.Debug("GetKernelWg()") return kernWg } mKh.Hassert(ctx != nil, "GetKernelWg(): ctx==nil") sf := &kernelWg{ ctx: ctx, dictStream: map[*mKa.StreamName]bool{}, isWork: mL1.NewSafeBool(), log: mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kernelWg")), } sf.log.Debug("GetKernelWg(): run") go sf.close() sf.isWork.Set() kernWg = sf return kernWg } // Log -- возвращает лог ожидателя потоков. func (sf *kernelWg) Log() mKt.ILogBuf { return sf.log } // Len -- возвращает размер списка ожидания потоков. func (sf *kernelWg) Len() int { sf.RLock() defer sf.RUnlock() return len(sf.dictStream) } // IsWork -- возвращает признак работы ядра. func (sf *kernelWg) IsWork() bool { return sf.isWork.Get() } // List -- возвращает список имён потоков на ожидании. func (sf *kernelWg) List() []*mKa.StreamName { sf.RLock() defer sf.RUnlock() lst := make([]*mKa.StreamName, 0, len(sf.dictStream)) for name := range sf.dictStream { lst = append(lst, name) } return lst } // Done -- удаляет поток из ожидания. func (sf *kernelWg) Done(name *mKa.StreamName) { sf.Lock() defer sf.Unlock() delete(sf.dictStream, name) sf.log.Debug("Done(): stream(%v) done", name) } // Wait -- блокирующий вызов; возвращает управление, только когда все потоки завершили работу. func (sf *kernelWg) Wait() { for { mKh.SleepMs() if !sf.isWork.Get() { break } } sf.log.Debug("Wait(): done") } // Add -- добавляет поток в ожидание. func (sf *kernelWg) Add(name *mKa.StreamName) { sf.Lock() defer sf.Unlock() sf.log.Debug("Add(): stream='%v'", name) mL0.Hassert(sf.isWork.Get(), "Add(): stream=%v, work end", name) mKh.Hassert(name.Get() != "", "Add(): name stream is empty") _, isOk := sf.dictStream[name] mKh.Hassert(!isOk, "Add(): stream '%v' already exists", name) sf.dictStream[name] = true } // Ожидает окончания работы ожидателя групп. func (sf *kernelWg) close() { <-sf.ctx.Done() fnDone := func() bool { sf.Lock() defer sf.Unlock() return len(sf.dictStream) == 0 } for { mKh.SleepMs() if fnDone() { break } } sf.Lock() defer sf.Unlock() if !sf.isWork.Get() { return } sf.isWork.Reset() sf.log.Debug("close(): end") }