kwg.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. // package kwg -- именованный ожидатель потоков ядра.
  2. //
  3. // Не позволяет завершиться ядру, если есть хоть один работающий поток
  4. package kwg
  5. import (
  6. "context"
  7. "sync"
  8. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  9. mKd "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs"
  10. mKh "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers"
  11. mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  12. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  13. )
  14. // kernelWg -- именованный ожидатель потоков ядра.
  15. type kernelWg struct {
  16. sync.RWMutex
  17. ctx context.Context
  18. dictStream map[*mKd.StreamName]bool // Словарь имён потоков с признаком работы
  19. isWork mKs.ISafeBool
  20. log mKs.ILogBuf
  21. }
  22. var (
  23. kernWg *kernelWg // Глобальный объект
  24. block sync.Mutex
  25. )
  26. // GetKernelWg -- возвращает новый именованный ожидатель потоков ядра.
  27. func GetKernelWg(ctx context.Context) mKs.IKernelWg {
  28. block.Lock()
  29. defer block.Unlock()
  30. if kernWg != nil {
  31. kernWg.log.Debug("GetKernelWg()")
  32. return kernWg
  33. }
  34. mKh.Hassert(ctx != nil, "GetKernelWg(): ctx==nil")
  35. sf := &kernelWg{
  36. ctx: ctx,
  37. dictStream: map[*mKd.StreamName]bool{},
  38. isWork: mL1.NewSafeBool(),
  39. log: mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kernelWg")),
  40. }
  41. sf.log.Debug("GetKernelWg(): run")
  42. go sf.close()
  43. sf.isWork.Set()
  44. kernWg = sf
  45. return kernWg
  46. }
  47. // Log -- возвращает лог ожидателя потоков.
  48. func (sf *kernelWg) Log() mKs.ILogBuf {
  49. return sf.log
  50. }
  51. // Len -- возвращает размер списка ожидания потоков.
  52. func (sf *kernelWg) Len() int {
  53. sf.RLock()
  54. defer sf.RUnlock()
  55. return len(sf.dictStream)
  56. }
  57. // IsWork -- возвращает признак работы ядра.
  58. func (sf *kernelWg) IsWork() bool {
  59. return sf.isWork.Get()
  60. }
  61. // List -- возвращает список имён потоков на ожидании.
  62. func (sf *kernelWg) List() []*mKd.StreamName {
  63. sf.RLock()
  64. defer sf.RUnlock()
  65. lst := make([]*mKd.StreamName, 0, len(sf.dictStream))
  66. for name := range sf.dictStream {
  67. lst = append(lst, name)
  68. }
  69. return lst
  70. }
  71. // Done -- удаляет поток из ожидания.
  72. func (sf *kernelWg) Done(name *mKd.StreamName) {
  73. sf.Lock()
  74. defer sf.Unlock()
  75. delete(sf.dictStream, name)
  76. sf.log.Debug("Done(): stream(%v) done", name)
  77. }
  78. // Wait -- блокирующий вызов; возвращает управление, только когда все потоки завершили работу.
  79. func (sf *kernelWg) Wait() {
  80. for {
  81. mKh.SleepMs()
  82. if !sf.isWork.Get() {
  83. break
  84. }
  85. }
  86. sf.log.Debug("Wait(): done")
  87. }
  88. // Add -- добавляет поток в ожидание.
  89. func (sf *kernelWg) Add(name *mKd.StreamName) {
  90. sf.Lock()
  91. defer sf.Unlock()
  92. sf.log.Debug("Add(): stream='%v'", name)
  93. mL0.Hassert(sf.isWork.Get(), "Add(): stream=%v, work end", name)
  94. mKh.Hassert(name.Get() != "", "Add(): name stream is empty")
  95. _, isOk := sf.dictStream[name]
  96. mKh.Hassert(!isOk, "Add(): stream '%v' already exists", name)
  97. sf.dictStream[name] = true
  98. }
  99. // Ожидает окончания работы ожидателя групп.
  100. func (sf *kernelWg) close() {
  101. <-sf.ctx.Done()
  102. fnDone := func() bool {
  103. sf.Lock()
  104. defer sf.Unlock()
  105. return len(sf.dictStream) == 0
  106. }
  107. for {
  108. mKh.SleepMs()
  109. if fnDone() {
  110. break
  111. }
  112. }
  113. sf.Lock()
  114. defer sf.Unlock()
  115. if !sf.isWork.Get() {
  116. return
  117. }
  118. sf.isWork.Reset()
  119. sf.log.Debug("close(): end")
  120. }