kbus_base.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. // package kbus_base -- базовая часть шины данных.
  2. package kbus_base
  3. import (
  4. "fmt"
  5. "sync"
  6. mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
  7. mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  8. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  9. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_serve"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_sub"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  12. )
  13. const (
  14. strBusBaseStream = "bus_base"
  15. )
  16. // KBusBase -- базовая часть шины данных.
  17. type KBusBase struct {
  18. Ctx_ mKt.IKernelCtx
  19. IsWork_ mKt.ISafeBool
  20. ctx mKt.ILocalCtx
  21. log mKt.ILogBuf
  22. dictSub mKt.IDictTopicSub
  23. dictServe mKt.IDictTopicServe
  24. }
  25. var (
  26. Bus_ *KBusBase
  27. block sync.Mutex
  28. )
  29. // GetKernelBusBase -- возвращает базовую шину сообщений.
  30. func GetKernelBusBase() mKt.IResult[*KBusBase] {
  31. block.Lock()
  32. defer block.Unlock()
  33. if Bus_ != nil {
  34. return mL1.NewRes(Bus_)
  35. }
  36. resKernCtx := kctx.GetKernelCtx()
  37. if resKernCtx.IsErr() {
  38. err := fmt.Errorf("GetKernelBusBase(): in get from kernCtx, err=\n\t%w", resKernCtx.Err())
  39. return mL1.NewErr[*KBusBase](err)
  40. }
  41. kCtx := resKernCtx.Val()
  42. resLocCtx := mL1.NewLocalCtx(kCtx.Ctx())
  43. if resLocCtx.IsErr() {
  44. err := fmt.Errorf("GetKernelBusBase(): in get from localCtx, err=\n\t%w", resLocCtx.Err())
  45. return mL1.NewErr[*KBusBase](err)
  46. }
  47. resDictServe := dict_topic_serve.NewDictServe()
  48. if resDictServe.IsErr() {
  49. err := fmt.Errorf("GetKernelBusBase(): in get from dictServe, err=\n\t%w", resDictServe.Err())
  50. return mL1.NewErr[*KBusBase](err)
  51. }
  52. resDictTopicSu := dict_topic_sub.NewDictTopicSub()
  53. if resDictTopicSu.IsErr() {
  54. err := fmt.Errorf("GetKernelBusBase(): in get from dictTopicSu, err=\n\t%w", resDictTopicSu.Err())
  55. return mL1.NewErr[*KBusBase](err)
  56. }
  57. Bus_ = &KBusBase{
  58. Ctx_: resKernCtx.Val(),
  59. IsWork_: mL1.NewSafeBool(),
  60. dictSub: resDictTopicSu.Val(),
  61. dictServe: mKt.IDictTopicServe(resDictServe.Val()),
  62. ctx: resLocCtx.Val(),
  63. }
  64. Bus_.log = Bus_.ctx.Log()
  65. go Bus_.close()
  66. go Bus_.run()
  67. Bus_.IsWork_.Set()
  68. res := Bus_.Ctx_.Wg().Add(strBusBaseStream)
  69. res.Hassert("GetKernelBusBase(): in add name stream(%v)", strBusBaseStream)
  70. resSet := Bus_.Ctx_.Set("kernBusBase", Bus_, "base of data bus")
  71. if resSet.IsErr() {
  72. err := fmt.Errorf("GetKernelBusBase(): in set kernel bus base to kernel ctx, err=\n\t%w", resSet.Err())
  73. return mL1.NewErr[*KBusBase](err)
  74. }
  75. _ = mKt.IKernelBus(Bus_)
  76. return mL1.NewRes(Bus_)
  77. }
  78. // Log -- возвращает лог шины.
  79. func (sf *KBusBase) Log() mKt.ILogBuf {
  80. return sf.log
  81. }
  82. func (sf *KBusBase) run() {
  83. sf.log.Debug("KBusBase.run()")
  84. for {
  85. break
  86. }
  87. }
  88. // Unsubscribe -- отписывает обработчик от топика.
  89. func (sf *KBusBase) Unsubscribe(handler mKt.IBusHandlerSubscribe) {
  90. sf.log.Debug("KBusBase.Unsubscribe(): handler='%v'", handler.Name())
  91. sf.dictSub.Unsubscribe(handler)
  92. }
  93. // Subscribe -- подписывает обработчик на топик.
  94. func (sf *KBusBase) Subscribe(handler mKt.IBusHandlerSubscribe) mKt.IResult[bool] {
  95. sf.log.Debug("KBusBase.Subscribe(): handler='%v'", handler.Name())
  96. if !sf.IsWork_.Get() {
  97. err := fmt.Errorf("KBusBase.Subscribe(): handler='%v', bus already closed", handler.Name())
  98. sf.log.Err(err.Error())
  99. return mL1.NewErr[bool](err)
  100. }
  101. res := sf.dictSub.Subscribe(handler)
  102. if res.IsErr() {
  103. err := fmt.Errorf("KBusBase.Subscribe(): handler='%v', err=\n\t%w", handler.Name(), res.Err())
  104. sf.log.Err(err.Error())
  105. return mL1.NewErr[bool](err)
  106. }
  107. return mL1.NewRes(true)
  108. }
  109. // SendRequest -- отправляет запрос в шину данных.
  110. func (sf *KBusBase) SendRequest(topic mKa.ATopic, binReq []byte) mKt.IResult[[]byte] {
  111. sf.log.Debug("KBusBase.SendRequest(): topic='%v'", topic)
  112. if !sf.IsWork_.Get() {
  113. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', bus already closed", topic)
  114. sf.log.Err(err.Error())
  115. return mL1.NewErr[[]byte](err)
  116. }
  117. res := sf.dictServe.SendRequest(topic, binReq)
  118. if res.IsErr() {
  119. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', err=\n\t%w", topic, res.Err())
  120. sf.log.Err(err.Error())
  121. return mL1.NewErr[[]byte](err)
  122. }
  123. return res
  124. }
  125. // RegisterServe -- регистрирует обработчики входящих запросов.
  126. func (sf *KBusBase) RegisterServe(handler mKt.IBusHandlerServe) mKt.IResult[bool] {
  127. if handler == nil {
  128. return mL1.NewErr[bool](fmt.Errorf("KBusBase.RegisterServe(): IBusHandlerServe==nil"))
  129. }
  130. sf.log.Debug("KBusBase.RegisterServe(): handler='%v'", handler.Name())
  131. res := sf.dictServe.Register(handler)
  132. if res.IsErr() {
  133. err := fmt.Errorf("KBusBase.RegisterServe(): handler='%v', err=\n\t%w", handler.Name(), res.Err())
  134. sf.log.Err(err.Error())
  135. return mL1.NewErr[bool](err)
  136. }
  137. return mL1.NewRes(true)
  138. }
  139. // Publish -- публикует сообщение в шину.
  140. func (sf *KBusBase) Publish(topic mKa.ATopic, binMsg []byte) mKt.IResult[bool] {
  141. sf.log.Debug("KBusBase.Publish(): topic='%v'", topic)
  142. if !sf.IsWork_.Get() {
  143. err := fmt.Errorf("KBusBase.Publish(): topic='%v',bus already closed", topic)
  144. sf.log.Err(err.Error())
  145. return mL1.NewErr[bool](err)
  146. }
  147. // Асинхронный запуск чтения
  148. go sf.dictSub.Read(topic, binMsg)
  149. return mL1.NewRes(true)
  150. }
  151. // IsWork -- возвращает признак работы шины.
  152. func (sf *KBusBase) IsWork() bool {
  153. return sf.IsWork_.Get()
  154. }
  155. // Ожидает закрытия шины в отдельном потоке.
  156. func (sf *KBusBase) close() {
  157. sf.Ctx_.Wait()
  158. sf.Ctx_.Lock()
  159. defer sf.Ctx_.Unlock()
  160. if !sf.IsWork_.Get() {
  161. return
  162. }
  163. sf.IsWork_.Reset()
  164. sf.Ctx_.Wg().Done(strBusBaseStream)
  165. sf.log.Debug("KBusBase.close(): done")
  166. }