kernel_wg.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. // package kernel_wg -- именованный ожидатель потоков ядра
  2. //
  3. // Не позволяет завершиться ядру, если есть хоть один работающий поток
  4. package kernel_wg
  5. import (
  6. "context"
  7. "fmt"
  8. "log"
  9. "sync"
  10. "time"
  11. . "wartank/kernel/internal/kernel_alias"
  12. "wartank/kernel/internal/safe_bool"
  13. . "wartank/kernel/kernel_types"
  14. . "wartank/pkg/helpers"
  15. )
  16. // ядроОп -- именованный ожидатель потоков ядра
  17. type ядроОп struct {
  18. ctx context.Context
  19. dictStream map[AStreamName]bool // Словарь имён потоков с признаком работы
  20. еслиРаботает ИБезопБул
  21. block sync.RWMutex
  22. }
  23. var (
  24. kernWg *ядроОп // Глобальный объект
  25. )
  26. // GetKernelWg -- возвращает новый именованный ожидатель потоков ядра
  27. func GetKernelWg(ctx context.Context) IKernelWg {
  28. log.Println("NewKernelWg()")
  29. if kernWg != nil {
  30. return kernWg
  31. }
  32. Паника(ctx != nil, "GetKernelWg(): ctx==nil")
  33. сам := &ядроОп{
  34. ctx: ctx,
  35. dictStream: map[AStreamName]bool{},
  36. еслиРаботает: safe_bool.НовБезопБул_(),
  37. }
  38. go сам.close()
  39. сам.еслиРаботает.Уст()
  40. kernWg = сам
  41. return kernWg
  42. }
  43. // Len -- возвращает размер списка ожидания потоков
  44. func (сам *ядроОп) Len() int {
  45. сам.block.RLock()
  46. defer сам.block.RUnlock()
  47. return len(сам.dictStream)
  48. }
  49. // IsWork -- возвращает признак работы ядра
  50. func (сам *ядроОп) IsWork() bool {
  51. return сам.еслиРаботает.Получ()
  52. }
  53. // List -- возвращает список имён потоков на ожидании
  54. func (сам *ядроОп) List() []AStreamName {
  55. сам.block.RLock()
  56. defer сам.block.RUnlock()
  57. lst := []AStreamName{}
  58. for name := range сам.dictStream {
  59. lst = append(lst, name)
  60. }
  61. return lst
  62. }
  63. // Done -- удаляет поток из ожидания
  64. func (сам *ядроОп) Done(name AStreamName) {
  65. сам.block.Lock()
  66. defer сам.block.Unlock()
  67. delete(сам.dictStream, name)
  68. }
  69. // Wait -- блокирующий вызов; возвращает управление, только когда все потоки завершили работу
  70. func (сам *ядроОп) Wait() {
  71. for {
  72. time.Sleep(time.Millisecond * 5)
  73. if !сам.еслиРаботает.Получ() {
  74. break
  75. }
  76. }
  77. log.Println("ядроОп.Wait(): done")
  78. }
  79. // Add -- добавляет поток в ожидание
  80. func (сам *ядроОп) Add(name AStreamName) error {
  81. log.Printf("ядроОп.Add(): stream='%v'\n", name)
  82. сам.block.Lock()
  83. defer сам.block.Unlock()
  84. if !сам.еслиРаботает.Получ() {
  85. return fmt.Errorf("ядроОп.Add(): stream=%v, work end", name)
  86. }
  87. Паника(name != "", "ядроОп.Add(): name stream is empty")
  88. _, isOk := сам.dictStream[name]
  89. Паника(!isOk, "ядроОп.Add(): stream '%v' already exists", name)
  90. сам.dictStream[name] = true
  91. return nil
  92. }
  93. // Ожидает окончания работы ожидателя групп
  94. func (сам *ядроОп) close() {
  95. <-сам.ctx.Done()
  96. fnDone := func() bool {
  97. сам.block.Lock()
  98. defer сам.block.Unlock()
  99. return len(сам.dictStream) == 0
  100. }
  101. for {
  102. time.Sleep(time.Millisecond * 1)
  103. if fnDone() {
  104. break
  105. }
  106. }
  107. сам.block.Lock()
  108. defer сам.block.Unlock()
  109. сам.еслиРаботает.Сброс()
  110. log.Println("ядроОп.close(): done")
  111. }