| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- // package kernel_wg -- именованный ожидатель потоков ядра
- //
- // Не позволяет завершиться ядру, если есть хоть один работающий поток
- package kernel_wg
- import (
- "context"
- "fmt"
- "log"
- "sync"
- "time"
- . "wartank/kernel/internal/kernel_alias"
- "wartank/kernel/internal/safe_bool"
- . "wartank/kernel/kernel_types"
- . "wartank/pkg/helpers"
- )
- // ядроОп -- именованный ожидатель потоков ядра
- type ядроОп struct {
- ctx context.Context
- dictStream map[AStreamName]bool // Словарь имён потоков с признаком работы
- еслиРаботает ИБезопБул
- block sync.RWMutex
- }
- var (
- kernWg *ядроОп // Глобальный объект
- )
- // GetKernelWg -- возвращает новый именованный ожидатель потоков ядра
- func GetKernelWg(ctx context.Context) IKernelWg {
- log.Println("NewKernelWg()")
- if kernWg != nil {
- return kernWg
- }
- Паника(ctx != nil, "GetKernelWg(): ctx==nil")
- сам := &ядроОп{
- ctx: ctx,
- dictStream: map[AStreamName]bool{},
- еслиРаботает: safe_bool.НовБезопБул_(),
- }
- go сам.close()
- сам.еслиРаботает.Уст()
- kernWg = сам
- return kernWg
- }
- // Len -- возвращает размер списка ожидания потоков
- func (сам *ядроОп) Len() int {
- сам.block.RLock()
- defer сам.block.RUnlock()
- return len(сам.dictStream)
- }
- // IsWork -- возвращает признак работы ядра
- func (сам *ядроОп) IsWork() bool {
- return сам.еслиРаботает.Получ()
- }
- // List -- возвращает список имён потоков на ожидании
- func (сам *ядроОп) List() []AStreamName {
- сам.block.RLock()
- defer сам.block.RUnlock()
- lst := []AStreamName{}
- for name := range сам.dictStream {
- lst = append(lst, name)
- }
- return lst
- }
- // Done -- удаляет поток из ожидания
- func (сам *ядроОп) Done(name AStreamName) {
- сам.block.Lock()
- defer сам.block.Unlock()
- delete(сам.dictStream, name)
- }
- // Wait -- блокирующий вызов; возвращает управление, только когда все потоки завершили работу
- func (сам *ядроОп) Wait() {
- for {
- time.Sleep(time.Millisecond * 5)
- if !сам.еслиРаботает.Получ() {
- break
- }
- }
- log.Println("ядроОп.Wait(): done")
- }
- // Add -- добавляет поток в ожидание
- func (сам *ядроОп) Add(name AStreamName) error {
- log.Printf("ядроОп.Add(): stream='%v'\n", name)
- сам.block.Lock()
- defer сам.block.Unlock()
- if !сам.еслиРаботает.Получ() {
- return fmt.Errorf("ядроОп.Add(): stream=%v, work end", name)
- }
- Паника(name != "", "ядроОп.Add(): name stream is empty")
- _, isOk := сам.dictStream[name]
- Паника(!isOk, "ядроОп.Add(): stream '%v' already exists", name)
- сам.dictStream[name] = true
- return nil
- }
- // Ожидает окончания работы ожидателя групп
- func (сам *ядроОп) close() {
- <-сам.ctx.Done()
- fnDone := func() bool {
- сам.block.Lock()
- defer сам.block.Unlock()
- return len(сам.dictStream) == 0
- }
- for {
- time.Sleep(time.Millisecond * 1)
- if fnDone() {
- break
- }
- }
- сам.block.Lock()
- defer сам.block.Unlock()
- сам.еслиРаботает.Сброс()
- log.Println("ядроОп.close(): done")
- }
|