kwg.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. // package kwg -- именованный ожидатель потоков ядра
  2. //
  3. // Не позволяет завершиться ядру, если есть хоть один работающий поток
  4. package kwg
  5. import (
  6. "context"
  7. "fmt"
  8. "sync"
  9. . "gitp78su.ipnodns.ru/svi/kern/kc/helpers"
  10. "gitp78su.ipnodns.ru/svi/kern/kc/log_buf"
  11. "gitp78su.ipnodns.ru/svi/kern/kc/safe_bool"
  12. . "gitp78su.ipnodns.ru/svi/kern/krn/kalias"
  13. . "gitp78su.ipnodns.ru/svi/kern/krn/ktypes"
  14. )
  15. // kernelWg -- именованный ожидатель потоков ядра
  16. type kernelWg struct {
  17. sync.RWMutex
  18. ctx context.Context
  19. dictStream map[AStreamName]bool // Словарь имён потоков с признаком работы
  20. isWork ISafeBool
  21. log ILogBuf
  22. }
  23. var (
  24. kernWg *kernelWg // Глобальный объект
  25. block sync.Mutex
  26. )
  27. // GetKernelWg -- возвращает новый именованный ожидатель потоков ядра
  28. func GetKernelWg(ctx context.Context) IKernelWg {
  29. block.Lock()
  30. defer block.Unlock()
  31. if kernWg != nil {
  32. kernWg.log.Debug("GetKernelWg()")
  33. return kernWg
  34. }
  35. Hassert(ctx != nil, "GetKernelWg(): ctx==nil")
  36. sf := &kernelWg{
  37. ctx: ctx,
  38. dictStream: map[AStreamName]bool{},
  39. isWork: safe_bool.NewSafeBool(),
  40. log: log_buf.NewLogBuf(),
  41. }
  42. sf.log.Debug("GetKernelWg(): run")
  43. go sf.close()
  44. sf.isWork.Set()
  45. kernWg = sf
  46. return kernWg
  47. }
  48. // Log -- возвращает лог ожидателя потоков
  49. func (sf *kernelWg) Log() ILogBuf {
  50. return sf.log
  51. }
  52. // Len -- возвращает размер списка ожидания потоков
  53. func (sf *kernelWg) Len() int {
  54. sf.RLock()
  55. defer sf.RUnlock()
  56. return len(sf.dictStream)
  57. }
  58. // IsWork -- возвращает признак работы ядра
  59. func (sf *kernelWg) IsWork() bool {
  60. return sf.isWork.Get()
  61. }
  62. // List -- возвращает список имён потоков на ожидании
  63. func (sf *kernelWg) List() []AStreamName {
  64. sf.RLock()
  65. defer sf.RUnlock()
  66. lst := []AStreamName{}
  67. for name := range sf.dictStream {
  68. lst = append(lst, name)
  69. }
  70. return lst
  71. }
  72. // Done -- удаляет поток из ожидания
  73. func (sf *kernelWg) Done(name AStreamName) {
  74. sf.Lock()
  75. defer sf.Unlock()
  76. delete(sf.dictStream, name)
  77. sf.log.Debug("kernelWg.Done(): stream(%v) done", name)
  78. }
  79. // Wait -- блокирующий вызов; возвращает управление, только когда все потоки завершили работу
  80. func (sf *kernelWg) Wait() {
  81. for {
  82. SleepMs()
  83. if !sf.isWork.Get() {
  84. break
  85. }
  86. }
  87. sf.log.Debug("kernelWg.Wait(): done")
  88. }
  89. // Add -- добавляет поток в ожидание
  90. func (sf *kernelWg) Add(name AStreamName) error {
  91. sf.Lock()
  92. defer sf.Unlock()
  93. sf.log.Debug("kernelWg.Add(): stream='%v'", name)
  94. if !sf.isWork.Get() {
  95. return fmt.Errorf("kernelWg.Add(): stream=%v, work end", name)
  96. }
  97. Hassert(name != "", "kernelWg.Add(): name stream is empty")
  98. _, isOk := sf.dictStream[name]
  99. Hassert(!isOk, "kernelWg.Add(): stream '%v' already exists", name)
  100. sf.dictStream[name] = true
  101. return nil
  102. }
  103. // Ожидает окончания работы ожидателя групп
  104. func (sf *kernelWg) close() {
  105. <-sf.ctx.Done()
  106. fnDone := func() bool {
  107. sf.Lock()
  108. defer sf.Unlock()
  109. return len(sf.dictStream) == 0
  110. }
  111. for {
  112. SleepMs()
  113. if fnDone() {
  114. break
  115. }
  116. }
  117. sf.Lock()
  118. defer sf.Unlock()
  119. if !sf.isWork.Get() {
  120. return
  121. }
  122. sf.isWork.Reset()
  123. sf.log.Debug("kernelWg.close(): end")
  124. }