kbus_base.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. // package kbus_base -- базовая часть шины данных.
  2. package kbus_base
  3. import (
  4. "fmt"
  5. "sync"
  6. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  7. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/alias"
  8. mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/alias"
  9. mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  10. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_serve"
  12. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_sub"
  13. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  14. )
  15. var (
  16. busBaseStreamName = alias.NewAStreamName("bus_base")
  17. )
  18. // KBusBase -- базовая часть шины данных.
  19. type KBusBase struct {
  20. Ctx_ mKt.IKernelCtx
  21. IsWork_ mKt.ISafeBool
  22. ctx mKt.ILocalCtx
  23. log mKt.ILogBuf
  24. dictSub mKt.IDictTopicSub
  25. dictServe mKt.IDictTopicServe
  26. }
  27. var (
  28. Bus_ *KBusBase
  29. block sync.Mutex
  30. )
  31. // GetKernelBusBase -- возвращает базовую шину сообщений.
  32. func GetKernelBusBase() *mL0.Result[*KBusBase] {
  33. block.Lock()
  34. defer block.Unlock()
  35. if Bus_ != nil {
  36. return mL0.NewRes(Bus_)
  37. }
  38. resKernCtx := kctx.GetKernelCtx()
  39. if resKernCtx.IsErr() {
  40. err := fmt.Errorf("GetKernelBusBase(): in get from kernCtx, err=\n\t%w", resKernCtx.Err())
  41. return mL0.NewErr[*KBusBase](err)
  42. }
  43. kCtx := resKernCtx.Val()
  44. resLocCtx := mL1.NewLocalCtx(kCtx.Ctx())
  45. if resLocCtx.IsErr() {
  46. err := fmt.Errorf("GetKernelBusBase(): in get from localCtx, err=\n\t%w", resLocCtx.Err())
  47. return mL0.NewErr[*KBusBase](err)
  48. }
  49. resDictServe := dict_topic_serve.NewDictServe()
  50. if resDictServe.IsErr() {
  51. err := fmt.Errorf("GetKernelBusBase(): in get from dictServe, err=\n\t%w", resDictServe.Err())
  52. return mL0.NewErr[*KBusBase](err)
  53. }
  54. resDictTopicSu := dict_topic_sub.NewDictTopicSub()
  55. if resDictTopicSu.IsErr() {
  56. err := fmt.Errorf("GetKernelBusBase(): in get from dictTopicSu, err=\n\t%w", resDictTopicSu.Err())
  57. return mL0.NewErr[*KBusBase](err)
  58. }
  59. Bus_ = &KBusBase{
  60. Ctx_: resKernCtx.Val(),
  61. IsWork_: mL1.NewSafeBool(),
  62. dictSub: resDictTopicSu.Val(),
  63. dictServe: mKt.IDictTopicServe(resDictServe.Val()),
  64. ctx: resLocCtx.Val(),
  65. }
  66. Bus_.log = Bus_.ctx.Log()
  67. go Bus_.close()
  68. go Bus_.run()
  69. Bus_.IsWork_.Set()
  70. res := Bus_.Ctx_.Wg().Add(busBaseStreamName)
  71. res.Hassert("GetKernelBusBase(): in add name stream(%v)", busBaseStreamName)
  72. resSet := Bus_.Ctx_.Set("kernBusBase", Bus_, "base of data bus")
  73. if resSet.IsErr() {
  74. err := fmt.Errorf("GetKernelBusBase(): in set kernel bus base to kernel ctx, err=\n\t%w", resSet.Err())
  75. return mL0.NewErr[*KBusBase](err)
  76. }
  77. _ = mKt.IKernelBus(Bus_)
  78. return mL0.NewRes(Bus_)
  79. }
  80. // Log -- возвращает лог шины.
  81. func (sf *KBusBase) Log() mKt.ILogBuf {
  82. return sf.log
  83. }
  84. func (sf *KBusBase) run() {
  85. sf.log.Debug("KBusBase.run()")
  86. for {
  87. break
  88. }
  89. }
  90. // Unsubscribe -- отписывает обработчик от топика.
  91. func (sf *KBusBase) Unsubscribe(handler mKt.IBusHandlerSubscribe) {
  92. sf.log.Debug("KBusBase.Unsubscribe(): handler='%v'", handler.Name())
  93. sf.dictSub.Unsubscribe(handler)
  94. }
  95. // Subscribe -- подписывает обработчик на топик.
  96. func (sf *KBusBase) Subscribe(handler mKt.IBusHandlerSubscribe) *mL0.Result[bool] {
  97. sf.log.Debug("KBusBase.Subscribe(): handler='%v'", handler.Name())
  98. if !sf.IsWork_.Get() {
  99. err := fmt.Errorf("KBusBase.Subscribe(): handler='%v', bus already closed", handler.Name())
  100. sf.log.Err(err.Error())
  101. return mL0.NewErr[bool](err)
  102. }
  103. res := sf.dictSub.Subscribe(handler)
  104. if res.IsErr() {
  105. err := fmt.Errorf("KBusBase.Subscribe(): handler='%v', err=\n\t%w", handler.Name(), res.Err())
  106. sf.log.Err(err.Error())
  107. return mL0.NewErr[bool](err)
  108. }
  109. return mL0.NewRes(true)
  110. }
  111. // SendRequest -- отправляет запрос в шину данных.
  112. func (sf *KBusBase) SendRequest(topic *mKa.ATopic, binReq []byte) *mL0.Result[[]byte] {
  113. sf.log.Debug("KBusBase.SendRequest(): topic='%v'", topic)
  114. if !sf.IsWork_.Get() {
  115. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', bus already closed", topic)
  116. sf.log.Err(err.Error())
  117. return mL0.NewErr[[]byte](err)
  118. }
  119. res := sf.dictServe.SendRequest(topic, binReq)
  120. if res.IsErr() {
  121. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', err=\n\t%w", topic, res.Err())
  122. sf.log.Err(err.Error())
  123. return mL0.NewErr[[]byte](err)
  124. }
  125. return res
  126. }
  127. // RegisterServe -- регистрирует обработчики входящих запросов.
  128. func (sf *KBusBase) RegisterServe(handler mKt.IBusHandlerServe) *mL0.Result[bool] {
  129. if handler == nil {
  130. return mL0.NewErr[bool](fmt.Errorf("KBusBase.RegisterServe(): IBusHandlerServe==nil"))
  131. }
  132. sf.log.Debug("KBusBase.RegisterServe(): handler='%v'", handler.Name())
  133. res := sf.dictServe.Register(handler)
  134. if res.IsErr() {
  135. err := fmt.Errorf("KBusBase.RegisterServe(): handler='%v', err=\n\t%w", handler.Name(), res.Err())
  136. sf.log.Err(err.Error())
  137. return mL0.NewErr[bool](err)
  138. }
  139. return mL0.NewRes(true)
  140. }
  141. // Publish -- публикует сообщение в шину.
  142. func (sf *KBusBase) Publish(topic *mKa.ATopic, binMsg []byte) *mL0.Result[bool] {
  143. sf.log.Debug("KBusBase.Publish(): topic='%v'", topic)
  144. if !sf.IsWork_.Get() {
  145. err := fmt.Errorf("KBusBase.Publish(): topic='%v',bus already closed", topic)
  146. sf.log.Err(err.Error())
  147. return mL0.NewErr[bool](err)
  148. }
  149. // Асинхронный запуск чтения
  150. go sf.dictSub.Read(topic, binMsg)
  151. return mL0.NewRes(true)
  152. }
  153. // IsWork -- возвращает признак работы шины.
  154. func (sf *KBusBase) IsWork() bool {
  155. return sf.IsWork_.Get()
  156. }
  157. // Ожидает закрытия шины в отдельном потоке.
  158. func (sf *KBusBase) close() {
  159. sf.Ctx_.Wait()
  160. sf.Ctx_.Lock()
  161. defer sf.Ctx_.Unlock()
  162. if !sf.IsWork_.Get() {
  163. return
  164. }
  165. sf.IsWork_.Reset()
  166. sf.Ctx_.Wg().Done(busBaseStreamName)
  167. sf.log.Debug("KBusBase.close(): done")
  168. }