kbus_base.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. // package kbus_base -- базовая часть шины данных
  2. package kbus_base
  3. import (
  4. "fmt"
  5. "sync"
  6. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/dict_topic_serve"
  7. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/dict_topic_sub"
  8. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kctx"
  9. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
  10. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  11. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/helpers"
  12. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/local_ctx"
  13. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/result"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_bool"
  15. )
  16. const (
  17. strBusBaseStream = "bus_base"
  18. )
  19. // KBusBase -- базовая часть шины данных
  20. type KBusBase struct {
  21. Ctx_ IKernelCtx
  22. IsWork_ ISafeBool
  23. ctx ILocalCtx
  24. log ILogBuf
  25. dictSub IDictTopicSub
  26. dictServe IDictTopicServe
  27. }
  28. var (
  29. Bus_ *KBusBase
  30. block sync.Mutex
  31. )
  32. // GetKernelBusBase -- возвращает базовую шину сообщений
  33. func GetKernelBusBase() *KBusBase {
  34. block.Lock()
  35. defer block.Unlock()
  36. if Bus_ != nil {
  37. return Bus_
  38. }
  39. ctx := kctx.GetKernelCtx()
  40. Bus_ = &KBusBase{
  41. Ctx_: ctx,
  42. IsWork_: safe_bool.NewSafeBool(),
  43. dictSub: dict_topic_sub.NewDictTopicSub(),
  44. dictServe: dict_topic_serve.NewDictServe(),
  45. ctx: local_ctx.NewLocalCtx(ctx.Ctx()),
  46. }
  47. Bus_.log = Bus_.ctx.Log()
  48. go Bus_.close()
  49. go Bus_.run()
  50. Bus_.IsWork_.Set()
  51. res := Bus_.Ctx_.Wg().Add(strBusBaseStream)
  52. res.Hassert("GetKernelBusBase(): in add name stream(%v)", strBusBaseStream)
  53. ctx.Set("kernBusBase", Bus_, "base of data bus")
  54. _ = IKernelBus(Bus_)
  55. return Bus_
  56. }
  57. // Log -- возвращает лог шины
  58. func (sf *KBusBase) Log() ILogBuf {
  59. return sf.log
  60. }
  61. func (sf *KBusBase) run() {
  62. sf.log.Debug("KBusBase.run()")
  63. for {
  64. break
  65. }
  66. }
  67. // Unsubscribe -- отписывает обработчик от топика
  68. func (sf *KBusBase) Unsubscribe(handler IBusHandlerSubscribe) {
  69. sf.log.Debug("KBusBase.Unsubscribe(): handler='%v'", handler.Name())
  70. sf.dictSub.Unsubscribe(handler)
  71. }
  72. // Subscribe -- подписывает обработчик на топик
  73. func (sf *KBusBase) Subscribe(handler IBusHandlerSubscribe) IResult[bool] {
  74. sf.log.Debug("KBusBase.Subscribe(): handler='%v'", handler.Name())
  75. if !sf.IsWork_.Get() {
  76. err := fmt.Errorf("KBusBase.Subscribe(): handler='%v', bus already closed", handler.Name())
  77. sf.log.Err(err.Error())
  78. return NewErr[bool](err)
  79. }
  80. sf.dictSub.Subscribe(handler)
  81. return NewRes(true)
  82. }
  83. // SendRequest -- отправляет запрос в шину данных
  84. func (sf *KBusBase) SendRequest(topic ATopic, binReq []byte) IResult[[]byte] {
  85. sf.log.Debug("KBusBase.SendRequest(): topic='%v'", topic)
  86. if !sf.IsWork_.Get() {
  87. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', bus already closed", topic)
  88. sf.log.Err(err.Error())
  89. return NewErr[[]byte](err)
  90. }
  91. res := sf.dictServe.SendRequest(topic, binReq)
  92. if res.IsErr() {
  93. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', err=\n\t%w", topic, res.Err())
  94. sf.log.Err(err.Error())
  95. return NewErr[[]byte](err)
  96. }
  97. return res
  98. }
  99. // RegisterServe -- регистрирует обработчики входящих запросов
  100. func (sf *KBusBase) RegisterServe(handler IBusHandlerServe) IResult[bool] {
  101. Hassert(handler != nil, "KBusBase.RegisterServe(): IBusHandlerSubscribe==nil")
  102. sf.log.Debug("KBusBase.RegisterServe(): handler='%v'", handler.Name())
  103. res := sf.dictServe.Register(handler)
  104. if res.IsErr() {
  105. err := fmt.Errorf("KBusBase.RegisterServe(): handler='%v', err=\n\t%w", handler.Name(), res.Err())
  106. sf.log.Err(err.Error())
  107. return NewErr[bool](err)
  108. }
  109. return NewRes(true)
  110. }
  111. // Publish -- публикует сообщение в шину
  112. func (sf *KBusBase) Publish(topic ATopic, binMsg []byte) IResult[bool] {
  113. sf.log.Debug("KBusBase.Publish(): topic='%v'", topic)
  114. if !sf.IsWork_.Get() {
  115. err := fmt.Errorf("KBusBase.Publish(): topic='%v',bus already closed", topic)
  116. sf.log.Err(err.Error())
  117. return NewErr[bool](err)
  118. }
  119. // Асинхронный запуск чтения
  120. go sf.dictSub.Read(topic, binMsg)
  121. return NewRes(true)
  122. }
  123. // IsWork -- возвращает признак работы шины
  124. func (sf *KBusBase) IsWork() bool {
  125. return sf.IsWork_.Get()
  126. }
  127. // Ожидает закрытия шины в отдельном потоке
  128. func (sf *KBusBase) close() {
  129. sf.Ctx_.Done()
  130. sf.Ctx_.Lock()
  131. defer sf.Ctx_.Unlock()
  132. if !sf.IsWork_.Get() {
  133. return
  134. }
  135. sf.IsWork_.Reset()
  136. sf.Ctx_.Wg().Done(strBusBaseStream)
  137. sf.log.Debug("KBusBase.close(): done")
  138. }