kbus_base.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. // package kbus_base -- базовая часть шины данных.
  2. package kbus_base
  3. import (
  4. "fmt"
  5. "sync"
  6. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  7. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/alias"
  8. mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/alias"
  9. mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  10. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_serve"
  12. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_sub"
  13. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  14. )
  15. var (
  16. busBaseStreamName = alias.NewAStreamName("bus_base")
  17. )
  18. // KBusBase -- базовая часть шины данных.
  19. type KBusBase struct {
  20. KCtx_ mKt.IKernelCtx
  21. IsWork_ mKt.ISafeBool
  22. lCtx mKt.ILocalCtx
  23. log mKt.ILogBuf
  24. dictSub mKt.IDictTopicSub
  25. dictServe mKt.IDictTopicServe
  26. }
  27. var (
  28. Bus_ *KBusBase
  29. block sync.Mutex
  30. )
  31. // GetKernelBusBase -- возвращает базовую шину сообщений.
  32. func GetKernelBusBase() *KBusBase {
  33. block.Lock()
  34. defer block.Unlock()
  35. if Bus_ != nil {
  36. return Bus_
  37. }
  38. kCtx := kctx.GetKernelCtx()
  39. lCtx := mL1.NewLocalCtx(kCtx.Ctx())
  40. Bus_ = &KBusBase{
  41. KCtx_: kCtx,
  42. IsWork_: mL1.NewSafeBool(),
  43. dictSub: dict_topic_sub.NewDictTopicSub(),
  44. dictServe: dict_topic_serve.NewDictServe(),
  45. lCtx: lCtx,
  46. }
  47. Bus_.log = Bus_.lCtx.Log()
  48. go Bus_.close()
  49. go Bus_.run()
  50. Bus_.IsWork_.Set()
  51. Bus_.KCtx_.Wg().Add(busBaseStreamName)
  52. Bus_.KCtx_.Set("kernBusBase", Bus_, "base of data bus")
  53. _ = mKt.IKernelBus(Bus_)
  54. return Bus_
  55. }
  56. // Log -- возвращает лог шины.
  57. func (sf *KBusBase) Log() mKt.ILogBuf {
  58. return sf.log
  59. }
  60. func (sf *KBusBase) run() {
  61. sf.log.Debug("KBusBase.run()")
  62. for {
  63. break
  64. }
  65. }
  66. // Unsubscribe -- отписывает обработчик от топика.
  67. func (sf *KBusBase) Unsubscribe(handler mKt.IBusHandlerSubscribe) {
  68. sf.log.Debug("KBusBase.Unsubscribe(): handler='%v'", handler.Name())
  69. sf.dictSub.Unsubscribe(handler)
  70. }
  71. // Subscribe -- подписывает обработчик на топик.
  72. func (sf *KBusBase) Subscribe(handler mKt.IBusHandlerSubscribe) mKt.IResult[bool]{
  73. sf.log.Debug("KBusBase.Subscribe(): handler='%v'", handler.Name())
  74. mL0.Hassert(!sf.IsWork_.Get(), "KBusBase.Subscribe(): handler='%v', bus already closed", handler.Name())
  75. sf.dictSub.Subscribe(handler)
  76. return mL0.NewRes(true)
  77. }
  78. // SendRequest -- отправляет запрос в шину данных.
  79. func (sf *KBusBase) SendRequest(topic *mKa.ATopic, binReq []byte) mL0.IResult[[]byte] {
  80. sf.log.Debug("KBusBase.SendRequest(): topic='%v'", topic)
  81. if !sf.IsWork_.Get() {
  82. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', bus already closed", topic)
  83. sf.log.Err(err.Error())
  84. return mL0.NewErr[[]byte](err)
  85. }
  86. res := sf.dictServe.SendRequest(topic, binReq)
  87. if res.IsErr() {
  88. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', err=\n\t%w", topic, res.Err())
  89. sf.log.Err(err.Error())
  90. return mL0.NewErr[[]byte](err)
  91. }
  92. return res
  93. }
  94. // RegisterServe -- регистрирует обработчики входящих запросов.
  95. func (sf *KBusBase) RegisterServe(handler mKt.IBusHandlerServe) mL0.IResult[bool] {
  96. if handler == nil {
  97. return mL0.NewErr[bool](fmt.Errorf("KBusBase.RegisterServe(): IBusHandlerServe==nil"))
  98. }
  99. sf.log.Debug("KBusBase.RegisterServe(): handler='%v'", handler.Name())
  100. res := sf.dictServe.Register(handler)
  101. if res.IsErr() {
  102. err := fmt.Errorf("KBusBase.RegisterServe(): handler='%v', err=\n\t%w", handler.Name(), res.Err())
  103. sf.log.Err(err.Error())
  104. return mL0.NewErr[bool](err)
  105. }
  106. return mL0.NewRes(true)
  107. }
  108. // Publish -- публикует сообщение в шину.
  109. func (sf *KBusBase) Publish(topic *mKa.ATopic, binMsg []byte) mL0.IResult[bool] {
  110. sf.log.Debug("KBusBase.Publish(): topic='%v'", topic)
  111. if !sf.IsWork_.Get() {
  112. err := fmt.Errorf("KBusBase.Publish(): topic='%v',bus already closed", topic)
  113. sf.log.Err(err.Error())
  114. return mL0.NewErr[bool](err)
  115. }
  116. // Асинхронный запуск чтения
  117. go sf.dictSub.Read(topic, binMsg)
  118. return mL0.NewRes(true)
  119. }
  120. // IsWork -- возвращает признак работы шины.
  121. func (sf *KBusBase) IsWork() bool {
  122. return sf.IsWork_.Get()
  123. }
  124. // Ожидает закрытия шины в отдельном потоке.
  125. func (sf *KBusBase) close() {
  126. sf.KCtx_.Wait()
  127. sf.KCtx_.Lock()
  128. defer sf.KCtx_.Unlock()
  129. if !sf.IsWork_.Get() {
  130. return
  131. }
  132. sf.IsWork_.Reset()
  133. sf.KCtx_.Wg().Done(busBaseStreamName)
  134. sf.log.Debug("KBusBase.close(): done")
  135. }