kwg.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. // package kwg -- именованный ожидатель потоков ядра.
  2. //
  3. // Не позволяет завершиться ядру, если есть хоть один работающий поток
  4. package kwg
  5. import (
  6. "context"
  7. "sync"
  8. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  9. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs/stream_name"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/etypes/ebool"
  11. mKh "gitp78su.ipnodns.ru/svi/kern/v4/lev0/helpers"
  12. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  13. mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  14. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  15. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/log_buf"
  16. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_bool"
  17. )
  18. // kernelWg -- именованный ожидатель потоков ядра.
  19. type kernelWg struct {
  20. sync.RWMutex
  21. ctx context.Context
  22. dictStream map[*stream_name.AStreamName]bool // Словарь имён потоков с признаком работы
  23. isWork mKs.ISafeBool
  24. log mKs.ILogBuf
  25. }
  26. var (
  27. kernWg *kernelWg // Глобальный объект
  28. block sync.Mutex
  29. )
  30. // GetKernelWg -- возвращает новый именованный ожидатель потоков ядра.
  31. func GetKernelWg(ctx context.Context) mKs.IKernelWg {
  32. block.Lock()
  33. defer block.Unlock()
  34. if kernWg != nil {
  35. kernWg.log.Debug("GetKernelWg()")
  36. return kernWg
  37. }
  38. mKh.Hassert(ctx != nil, "GetKernelWg(): ctx==nil")
  39. paramLogBuf := &log_buf.LogBufParam{
  40. IsTerm_: safe_bool.NewSafeBool(true),
  41. Prefix_: "kernelWg",
  42. }
  43. sf := &kernelWg{
  44. ctx: ctx,
  45. dictStream: map[*stream_name.AStreamName]bool{},
  46. isWork: mL1.NewSafeBool(),
  47. log: mL1.NewLogBuf(paramLogBuf),
  48. }
  49. sf.log.Debug("GetKernelWg(): run")
  50. go sf.close()
  51. sf.isWork.Set()
  52. kernWg = sf
  53. return kernWg
  54. }
  55. // Log -- возвращает лог ожидателя потоков.
  56. func (sf *kernelWg) Log() mKs.ILogBuf {
  57. return sf.log
  58. }
  59. // Len -- возвращает размер списка ожидания потоков.
  60. func (sf *kernelWg) Len() int {
  61. sf.RLock()
  62. defer sf.RUnlock()
  63. return len(sf.dictStream)
  64. }
  65. // IsWork -- возвращает признак работы ядра.
  66. func (sf *kernelWg) IsWork() kspec.EBool {
  67. res := sf.isWork.Get()
  68. return ebool.NewEBool(res)
  69. }
  70. // List -- возвращает список имён потоков на ожидании.
  71. func (sf *kernelWg) List() []*stream_name.AStreamName {
  72. sf.RLock()
  73. defer sf.RUnlock()
  74. lst := make([]*stream_name.AStreamName, 0, len(sf.dictStream))
  75. for name := range sf.dictStream {
  76. lst = append(lst, name)
  77. }
  78. return lst
  79. }
  80. // Done -- удаляет поток из ожидания.
  81. func (sf *kernelWg) Done(name *stream_name.AStreamName) {
  82. sf.Lock()
  83. defer sf.Unlock()
  84. delete(sf.dictStream, name)
  85. sf.log.Debug("Done(): stream(%v) done", name)
  86. }
  87. // Wait -- блокирующий вызов; возвращает управление, только когда все потоки завершили работу.
  88. func (sf *kernelWg) Wait() {
  89. for {
  90. mKh.SleepMs()
  91. if !sf.isWork.Get() {
  92. break
  93. }
  94. }
  95. sf.log.Debug("Wait(): done")
  96. }
  97. // Add -- добавляет поток в ожидание.
  98. func (sf *kernelWg) Add(name *stream_name.AStreamName) {
  99. sf.Lock()
  100. defer sf.Unlock()
  101. sf.log.Debug("Add(): stream='%v'", name)
  102. mL0.Hassert(sf.isWork.Get(), "Add(): stream=%v, work end", name)
  103. mKh.Hassert(name.Get() != "", "Add(): name stream is empty")
  104. _, isOk := sf.dictStream[name]
  105. mKh.Hassert(!isOk, "Add(): stream '%v' already exists", name)
  106. sf.dictStream[name] = true
  107. }
  108. // Ожидает окончания работы ожидателя групп.
  109. func (sf *kernelWg) close() {
  110. <-sf.ctx.Done()
  111. fnDone := func() bool {
  112. sf.Lock()
  113. defer sf.Unlock()
  114. return len(sf.dictStream) == 0
  115. }
  116. for {
  117. mKh.SleepMs()
  118. if fnDone() {
  119. break
  120. }
  121. }
  122. sf.Lock()
  123. defer sf.Unlock()
  124. if !sf.isWork.Get() {
  125. return
  126. }
  127. sf.isWork.Reset()
  128. sf.log.Debug("close(): end")
  129. }