kwg.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. // package kwg -- именованный ожидатель потоков ядра.
  2. //
  3. // Не позволяет завершиться ядру, если есть хоть один работающий поток
  4. package kwg
  5. import (
  6. "context"
  7. "fmt"
  8. "sync"
  9. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  10. mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/alias"
  11. mKh "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers"
  12. mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  13. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  14. )
  15. // kernelWg -- именованный ожидатель потоков ядра.
  16. type kernelWg struct {
  17. sync.RWMutex
  18. ctx context.Context
  19. dictStream map[*mKa.AStreamName]bool // Словарь имён потоков с признаком работы
  20. isWork mKt.ISafeBool
  21. log mKt.ILogBuf
  22. }
  23. var (
  24. kernWg *kernelWg // Глобальный объект
  25. block sync.Mutex
  26. )
  27. // GetKernelWg -- возвращает новый именованный ожидатель потоков ядра.
  28. func GetKernelWg(ctx context.Context) mKt.IKernelWg {
  29. block.Lock()
  30. defer block.Unlock()
  31. if kernWg != nil {
  32. kernWg.log.Debug("GetKernelWg()")
  33. return kernWg
  34. }
  35. mKh.Hassert(ctx != nil, "GetKernelWg(): ctx==nil")
  36. sf := &kernelWg{
  37. ctx: ctx,
  38. dictStream: map[*mKa.AStreamName]bool{},
  39. isWork: mL1.NewSafeBool(),
  40. log: mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kernelWg")),
  41. }
  42. sf.log.Debug("GetKernelWg(): run")
  43. go sf.close()
  44. sf.isWork.Set()
  45. kernWg = sf
  46. return kernWg
  47. }
  48. // Log -- возвращает лог ожидателя потоков.
  49. func (sf *kernelWg) Log() mKt.ILogBuf {
  50. return sf.log
  51. }
  52. // Len -- возвращает размер списка ожидания потоков.
  53. func (sf *kernelWg) Len() int {
  54. sf.RLock()
  55. defer sf.RUnlock()
  56. return len(sf.dictStream)
  57. }
  58. // IsWork -- возвращает признак работы ядра.
  59. func (sf *kernelWg) IsWork() bool {
  60. return sf.isWork.Get()
  61. }
  62. // List -- возвращает список имён потоков на ожидании.
  63. func (sf *kernelWg) List() []*mKa.AStreamName {
  64. sf.RLock()
  65. defer sf.RUnlock()
  66. lst := make([]*mKa.AStreamName, 0, len(sf.dictStream))
  67. for name := range sf.dictStream {
  68. lst = append(lst, name)
  69. }
  70. return lst
  71. }
  72. // Done -- удаляет поток из ожидания.
  73. func (sf *kernelWg) Done(name *mKa.AStreamName) {
  74. sf.Lock()
  75. defer sf.Unlock()
  76. delete(sf.dictStream, name)
  77. sf.log.Debug("Done(): stream(%v) done", name)
  78. }
  79. // Wait -- блокирующий вызов; возвращает управление, только когда все потоки завершили работу.
  80. func (sf *kernelWg) Wait() {
  81. for {
  82. mKh.SleepMs()
  83. if !sf.isWork.Get() {
  84. break
  85. }
  86. }
  87. sf.log.Debug("Wait(): done")
  88. }
  89. // Add -- добавляет поток в ожидание.
  90. func (sf *kernelWg) Add(name *mKa.AStreamName) *mL0.Result[bool] {
  91. sf.Lock()
  92. defer sf.Unlock()
  93. sf.log.Debug("Add(): stream='%v'", name)
  94. if !sf.isWork.Get() {
  95. err := fmt.Errorf("Add(): stream=%v, work end", name)
  96. return mL0.NewErr[bool](err)
  97. }
  98. mKh.Hassert(name.Get() != "", "Add(): name stream is empty")
  99. _, isOk := sf.dictStream[name]
  100. mKh.Hassert(!isOk, "Add(): stream '%v' already exists", name)
  101. sf.dictStream[name] = true
  102. return mL0.NewRes(true)
  103. }
  104. // Ожидает окончания работы ожидателя групп.
  105. func (sf *kernelWg) close() {
  106. <-sf.ctx.Done()
  107. fnDone := func() bool {
  108. sf.Lock()
  109. defer sf.Unlock()
  110. return len(sf.dictStream) == 0
  111. }
  112. for {
  113. mKh.SleepMs()
  114. if fnDone() {
  115. break
  116. }
  117. }
  118. sf.Lock()
  119. defer sf.Unlock()
  120. if !sf.isWork.Get() {
  121. return
  122. }
  123. sf.isWork.Reset()
  124. sf.log.Debug("close(): end")
  125. }