| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- // package kwg -- именованный ожидатель потоков ядра.
- //
- // Не позволяет завершиться ядру, если есть хоть один работающий поток
- package kwg
- import (
- "context"
- "sync"
- mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
- mKd "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs"
- mKh "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers"
- mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
- mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
- )
- // kernelWg -- именованный ожидатель потоков ядра.
- type kernelWg struct {
- sync.RWMutex
- ctx context.Context
- dictStream map[*mKd.StreamName]bool // Словарь имён потоков с признаком работы
- isWork mKs.ISafeBool
- log mKs.ILogBuf
- }
- var (
- kernWg *kernelWg // Глобальный объект
- block sync.Mutex
- )
- // GetKernelWg -- возвращает новый именованный ожидатель потоков ядра.
- func GetKernelWg(ctx context.Context) mKs.IKernelWg {
- block.Lock()
- defer block.Unlock()
- if kernWg != nil {
- kernWg.log.Debug("GetKernelWg()")
- return kernWg
- }
- mKh.Hassert(ctx != nil, "GetKernelWg(): ctx==nil")
- sf := &kernelWg{
- ctx: ctx,
- dictStream: map[*mKd.StreamName]bool{},
- isWork: mL1.NewSafeBool(),
- log: mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kernelWg")),
- }
- sf.log.Debug("GetKernelWg(): run")
- go sf.close()
- sf.isWork.Set()
- kernWg = sf
- return kernWg
- }
- // Log -- возвращает лог ожидателя потоков.
- func (sf *kernelWg) Log() mKs.ILogBuf {
- return sf.log
- }
- // Len -- возвращает размер списка ожидания потоков.
- func (sf *kernelWg) Len() int {
- sf.RLock()
- defer sf.RUnlock()
- return len(sf.dictStream)
- }
- // IsWork -- возвращает признак работы ядра.
- func (sf *kernelWg) IsWork() bool {
- return sf.isWork.Get()
- }
- // List -- возвращает список имён потоков на ожидании.
- func (sf *kernelWg) List() []*mKd.StreamName {
- sf.RLock()
- defer sf.RUnlock()
- lst := make([]*mKd.StreamName, 0, len(sf.dictStream))
- for name := range sf.dictStream {
- lst = append(lst, name)
- }
- return lst
- }
- // Done -- удаляет поток из ожидания.
- func (sf *kernelWg) Done(name *mKd.StreamName) {
- sf.Lock()
- defer sf.Unlock()
- delete(sf.dictStream, name)
- sf.log.Debug("Done(): stream(%v) done", name)
- }
- // Wait -- блокирующий вызов; возвращает управление, только когда все потоки завершили работу.
- func (sf *kernelWg) Wait() {
- for {
- mKh.SleepMs()
- if !sf.isWork.Get() {
- break
- }
- }
- sf.log.Debug("Wait(): done")
- }
- // Add -- добавляет поток в ожидание.
- func (sf *kernelWg) Add(name *mKd.StreamName) {
- sf.Lock()
- defer sf.Unlock()
- sf.log.Debug("Add(): stream='%v'", name)
- mL0.Hassert(sf.isWork.Get(), "Add(): stream=%v, work end", name)
- mKh.Hassert(name.Get() != "", "Add(): name stream is empty")
- _, isOk := sf.dictStream[name]
- mKh.Hassert(!isOk, "Add(): stream '%v' already exists", name)
- sf.dictStream[name] = true
- }
- // Ожидает окончания работы ожидателя групп.
- func (sf *kernelWg) close() {
- <-sf.ctx.Done()
- fnDone := func() bool {
- sf.Lock()
- defer sf.Unlock()
- return len(sf.dictStream) == 0
- }
- for {
- mKh.SleepMs()
- if fnDone() {
- break
- }
- }
- sf.Lock()
- defer sf.Unlock()
- if !sf.isWork.Get() {
- return
- }
- sf.isWork.Reset()
- sf.log.Debug("close(): end")
- }
|