// package kernel_wg -- именованный ожидатель потоков ядра // // Не позволяет завершиться ядру, если есть хоть один работающий поток package kernel_wg import ( "context" "fmt" "log" "sync" "time" . "wartank/kernel/internal/kernel_alias" "wartank/kernel/internal/safe_bool" . "wartank/kernel/kernel_types" . "wartank/pkg/helpers" ) // ядроОп -- именованный ожидатель потоков ядра type ядроОп struct { ctx context.Context dictStream map[AStreamName]bool // Словарь имён потоков с признаком работы еслиРаботает ИБезопБул block sync.RWMutex } var ( kernWg *ядроОп // Глобальный объект ) // GetKernelWg -- возвращает новый именованный ожидатель потоков ядра func GetKernelWg(ctx context.Context) IKernelWg { log.Println("NewKernelWg()") if kernWg != nil { return kernWg } Паника(ctx != nil, "GetKernelWg(): ctx==nil") сам := &ядроОп{ ctx: ctx, dictStream: map[AStreamName]bool{}, еслиРаботает: safe_bool.НовБезопБул_(), } go сам.close() сам.еслиРаботает.Уст() kernWg = сам return kernWg } // Len -- возвращает размер списка ожидания потоков func (сам *ядроОп) Len() int { сам.block.RLock() defer сам.block.RUnlock() return len(сам.dictStream) } // IsWork -- возвращает признак работы ядра func (сам *ядроОп) IsWork() bool { return сам.еслиРаботает.Получ() } // List -- возвращает список имён потоков на ожидании func (сам *ядроОп) List() []AStreamName { сам.block.RLock() defer сам.block.RUnlock() lst := []AStreamName{} for name := range сам.dictStream { lst = append(lst, name) } return lst } // Done -- удаляет поток из ожидания func (сам *ядроОп) Done(name AStreamName) { сам.block.Lock() defer сам.block.Unlock() delete(сам.dictStream, name) } // Wait -- блокирующий вызов; возвращает управление, только когда все потоки завершили работу func (сам *ядроОп) Wait() { for { time.Sleep(time.Millisecond * 5) if !сам.еслиРаботает.Получ() { break } } log.Println("ядроОп.Wait(): done") } // Add -- добавляет поток в ожидание func (сам *ядроОп) Add(name AStreamName) error { log.Printf("ядроОп.Add(): stream='%v'\n", name) сам.block.Lock() defer сам.block.Unlock() if !сам.еслиРаботает.Получ() { return fmt.Errorf("ядроОп.Add(): stream=%v, work end", name) } Паника(name != "", "ядроОп.Add(): name stream is empty") _, isOk := сам.dictStream[name] Паника(!isOk, "ядроОп.Add(): stream '%v' already exists", name) сам.dictStream[name] = true return nil } // Ожидает окончания работы ожидателя групп func (сам *ядроОп) close() { <-сам.ctx.Done() fnDone := func() bool { сам.block.Lock() defer сам.block.Unlock() return len(сам.dictStream) == 0 } for { time.Sleep(time.Millisecond * 1) if fnDone() { break } } сам.block.Lock() defer сам.block.Unlock() сам.еслиРаботает.Сброс() log.Println("ядроОп.close(): done") }