// package kwg -- именованный ожидатель потоков ядра. // // Не позволяет завершиться ядру, если есть хоть один работающий поток package kwg import ( "context" "sync" mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0" "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs/stream_name" "gitp78su.ipnodns.ru/svi/kern/v4/lev0/etypes/ebool" mKh "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers" "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec" mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec" mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1" "gitp78su.ipnodns.ru/svi/kern/v4/lev1/log_buf" "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_bool" ) // kernelWg -- именованный ожидатель потоков ядра. type kernelWg struct { sync.RWMutex ctx context.Context dictStream map[*stream_name.AStreamName]bool // Словарь имён потоков с признаком работы isWork mKs.ISafeBool log mKs.ILogBuf } var ( kernWg *kernelWg // Глобальный объект block sync.Mutex ) // GetKernelWg -- возвращает новый именованный ожидатель потоков ядра. func GetKernelWg(ctx context.Context) mKs.IKernelWg { block.Lock() defer block.Unlock() if kernWg != nil { kernWg.log.Debug("GetKernelWg()") return kernWg } mKh.Hassert(ctx != nil, "GetKernelWg(): ctx==nil") paramLogBuf := &log_buf.LogBufParam{ IsTerm_: safe_bool.NewSafeBool(true), Prefix_: "kernelWg", } sf := &kernelWg{ ctx: ctx, dictStream: map[*stream_name.AStreamName]bool{}, isWork: mL1.NewSafeBool(), log: mL1.NewLogBuf(paramLogBuf), } sf.log.Debug("GetKernelWg(): run") go sf.close() sf.isWork.Set() kernWg = sf return kernWg } // Log -- возвращает лог ожидателя потоков. func (sf *kernelWg) Log() mKs.ILogBuf { return sf.log } // Len -- возвращает размер списка ожидания потоков. func (sf *kernelWg) Len() int { sf.RLock() defer sf.RUnlock() return len(sf.dictStream) } // IsWork -- возвращает признак работы ядра. func (sf *kernelWg) IsWork() kspec.EBool { res := sf.isWork.Get() return ebool.NewEBool(res) } // List -- возвращает список имён потоков на ожидании. func (sf *kernelWg) List() []*stream_name.AStreamName { sf.RLock() defer sf.RUnlock() lst := make([]*stream_name.AStreamName, 0, len(sf.dictStream)) for name := range sf.dictStream { lst = append(lst, name) } return lst } // Done -- удаляет поток из ожидания. func (sf *kernelWg) Done(name *stream_name.AStreamName) { sf.Lock() defer sf.Unlock() delete(sf.dictStream, name) sf.log.Debug("Done(): stream(%v) done", name) } // Wait -- блокирующий вызов; возвращает управление, только когда все потоки завершили работу. func (sf *kernelWg) Wait() { for { mKh.SleepMs() if !sf.isWork.Get() { break } } sf.log.Debug("Wait(): done") } // Add -- добавляет поток в ожидание. func (sf *kernelWg) Add(name *stream_name.AStreamName) { sf.Lock() defer sf.Unlock() sf.log.Debug("Add(): stream='%v'", name) mL0.Hassert(sf.isWork.Get(), "Add(): stream=%v, work end", name) mKh.Hassert(name.Get() != "", "Add(): name stream is empty") _, isOk := sf.dictStream[name] mKh.Hassert(!isOk, "Add(): stream '%v' already exists", name) sf.dictStream[name] = true } // Ожидает окончания работы ожидателя групп. func (sf *kernelWg) close() { <-sf.ctx.Done() fnDone := func() bool { sf.Lock() defer sf.Unlock() return len(sf.dictStream) == 0 } for { mKh.SleepMs() if fnDone() { break } } sf.Lock() defer sf.Unlock() if !sf.isWork.Get() { return } sf.isWork.Reset() sf.log.Debug("close(): end") }