kwg.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. // package kwg -- именованный ожидатель потоков ядра
  2. //
  3. // Не позволяет завершиться ядру, если есть хоть один работающий поток
  4. package kwg
  5. import (
  6. "context"
  7. "fmt"
  8. "sync"
  9. "gitp78su.ipnodns.ru/svi/kern/v4/kc/log_buf"
  10. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
  11. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  12. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/helpers"
  13. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/result"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_bool"
  15. )
  16. // kernelWg -- именованный ожидатель потоков ядра
  17. type kernelWg struct {
  18. sync.RWMutex
  19. ctx context.Context
  20. dictStream map[AStreamName]bool // Словарь имён потоков с признаком работы
  21. isWork ISafeBool
  22. log ILogBuf
  23. }
  24. var (
  25. kernWg *kernelWg // Глобальный объект
  26. block sync.Mutex
  27. )
  28. // GetKernelWg -- возвращает новый именованный ожидатель потоков ядра
  29. func GetKernelWg(ctx context.Context) IKernelWg {
  30. block.Lock()
  31. defer block.Unlock()
  32. if kernWg != nil {
  33. kernWg.log.Debug("GetKernelWg()")
  34. return kernWg
  35. }
  36. Hassert(ctx != nil, "GetKernelWg(): ctx==nil")
  37. sf := &kernelWg{
  38. ctx: ctx,
  39. dictStream: map[AStreamName]bool{},
  40. isWork: safe_bool.NewSafeBool(),
  41. log: log_buf.NewLogBuf(log_buf.OptIsTerm(true), log_buf.OptPrefix("kernelWg")),
  42. }
  43. sf.log.Debug("GetKernelWg(): run")
  44. go sf.close()
  45. sf.isWork.Set()
  46. kernWg = sf
  47. return kernWg
  48. }
  49. // Log -- возвращает лог ожидателя потоков
  50. func (sf *kernelWg) Log() ILogBuf {
  51. return sf.log
  52. }
  53. // Len -- возвращает размер списка ожидания потоков
  54. func (sf *kernelWg) Len() int {
  55. sf.RLock()
  56. defer sf.RUnlock()
  57. return len(sf.dictStream)
  58. }
  59. // IsWork -- возвращает признак работы ядра
  60. func (sf *kernelWg) IsWork() bool {
  61. return sf.isWork.Get()
  62. }
  63. // List -- возвращает список имён потоков на ожидании
  64. func (sf *kernelWg) List() []AStreamName {
  65. sf.RLock()
  66. defer sf.RUnlock()
  67. lst := []AStreamName{}
  68. for name := range sf.dictStream {
  69. lst = append(lst, name)
  70. }
  71. return lst
  72. }
  73. // Done -- удаляет поток из ожидания
  74. func (sf *kernelWg) Done(name AStreamName) {
  75. sf.Lock()
  76. defer sf.Unlock()
  77. delete(sf.dictStream, name)
  78. sf.log.Debug("Done(): stream(%v) done", name)
  79. }
  80. // Wait -- блокирующий вызов; возвращает управление, только когда все потоки завершили работу
  81. func (sf *kernelWg) Wait() {
  82. for {
  83. SleepMs()
  84. if !sf.isWork.Get() {
  85. break
  86. }
  87. }
  88. sf.log.Debug("Wait(): done")
  89. }
  90. // Add -- добавляет поток в ожидание
  91. func (sf *kernelWg) Add(name AStreamName) IResult[bool] {
  92. sf.Lock()
  93. defer sf.Unlock()
  94. sf.log.Debug("Add(): stream='%v'", name)
  95. if !sf.isWork.Get() {
  96. err := fmt.Errorf("Add(): stream=%v, work end", name)
  97. return NewErr[bool](err)
  98. }
  99. Hassert(name != "", "Add(): name stream is empty")
  100. _, isOk := sf.dictStream[name]
  101. Hassert(!isOk, "Add(): stream '%v' already exists", name)
  102. sf.dictStream[name] = true
  103. return NewRes(true)
  104. }
  105. // Ожидает окончания работы ожидателя групп
  106. func (sf *kernelWg) close() {
  107. <-sf.ctx.Done()
  108. fnDone := func() bool {
  109. sf.Lock()
  110. defer sf.Unlock()
  111. return len(sf.dictStream) == 0
  112. }
  113. for {
  114. SleepMs()
  115. if fnDone() {
  116. break
  117. }
  118. }
  119. sf.Lock()
  120. defer sf.Unlock()
  121. if !sf.isWork.Get() {
  122. return
  123. }
  124. sf.isWork.Reset()
  125. sf.log.Debug("close(): end")
  126. }