kbus_base.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. // package kbus_base -- базовая часть шины данных.
  2. package kbus_base
  3. import (
  4. "fmt"
  5. "sync"
  6. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  7. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs/stream_name"
  8. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs/topic"
  9. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/etypes/ebool"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  11. mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  12. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  13. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_serve"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_sub"
  15. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  16. )
  17. var (
  18. busBaseStreamName = stream_name.NewAStreamName("bus_base")
  19. )
  20. // KBusBase -- базовая часть шины данных.
  21. type KBusBase struct {
  22. KCtx_ mKs.IKernelCtx
  23. IsWork_ mKs.ISafeBool
  24. lCtx mKs.ILocalCtx
  25. log mKs.ILogBuf
  26. dictSub mKs.IDictTopicSub
  27. dictServe mKs.IDictTopicServe
  28. }
  29. var (
  30. Bus_ *KBusBase
  31. block sync.Mutex
  32. )
  33. // GetKernelBusBase -- возвращает базовую шину сообщений.
  34. func GetKernelBusBase() *KBusBase {
  35. block.Lock()
  36. defer block.Unlock()
  37. if Bus_ != nil {
  38. return Bus_
  39. }
  40. kCtx := kctx.GetKernelCtx()
  41. lCtx := mL1.NewLocalCtx(kCtx.Ctx())
  42. Bus_ = &KBusBase{
  43. KCtx_: kCtx,
  44. IsWork_: mL1.NewSafeBool(false),
  45. dictSub: dict_topic_sub.NewDictTopicSub(),
  46. dictServe: dict_topic_serve.NewDictServe(),
  47. lCtx: lCtx,
  48. }
  49. Bus_.log = Bus_.lCtx.Log()
  50. go Bus_.close()
  51. go Bus_.run()
  52. Bus_.IsWork_.Set()
  53. Bus_.KCtx_.Wg().Add(busBaseStreamName)
  54. Bus_.KCtx_.Set("kernBusBase", Bus_, "base of data bus")
  55. _ = mKs.IKernelBus(Bus_)
  56. return Bus_
  57. }
  58. // Log -- возвращает лог шины.
  59. func (sf *KBusBase) Log() mKs.ILogBuf {
  60. return sf.log
  61. }
  62. func (sf *KBusBase) run() {
  63. sf.log.Debug("KBusBase.run()")
  64. for {
  65. break
  66. }
  67. }
  68. // Unsubscribe -- отписывает обработчик от топика.
  69. func (sf *KBusBase) Unsubscribe(handler mKs.IBusHandlerSubscribe) {
  70. sf.log.Debug("KBusBase.Unsubscribe(): handler='%v'", handler.Name())
  71. sf.dictSub.Unsubscribe(handler)
  72. }
  73. // Subscribe -- подписывает обработчик на топик.
  74. func (sf *KBusBase) Subscribe(handler mKs.IBusHandlerSubscribe) mKs.IResult[kspec.EBool] {
  75. sf.log.Debug("KBusBase.Subscribe(): handler='%v'", handler.Name())
  76. mL0.Hassert(!sf.IsWork_.Get(), "KBusBase.Subscribe(): handler='%v', bus already closed", handler.Name())
  77. sf.dictSub.Subscribe(handler)
  78. return mL0.NewOk(ebool.NewEBool(true))
  79. }
  80. // SendRequest -- отправляет запрос в шину данных.
  81. func (sf *KBusBase) SendRequest(topic *topic.ATopic, binReq []byte) mL0.IResult[[]byte] {
  82. sf.log.Debug("KBusBase.SendRequest(): topic='%v'", topic)
  83. if !sf.IsWork_.Get() {
  84. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', bus already closed", topic)
  85. sf.log.Err(err.Error())
  86. return mL0.NewErr[[]byte](err)
  87. }
  88. res := sf.dictServe.SendRequest(topic, binReq)
  89. if res.IsErr() {
  90. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', err=\n\t%w", topic, res.Err())
  91. sf.log.Err(err.Error())
  92. return mL0.NewErr[[]byte](err)
  93. }
  94. return res
  95. }
  96. // RegisterServe -- регистрирует обработчики входящих запросов.
  97. func (sf *KBusBase) RegisterServe(handler mKs.IBusHandlerServe) mL0.IResult[kspec.EBool] {
  98. if handler == nil {
  99. return mL0.NewErr[kspec.EBool](fmt.Errorf("KBusBase.RegisterServe(): IBusHandlerServe==nil"))
  100. }
  101. sf.log.Debug("KBusBase.RegisterServe(): handler='%v'", handler.Name())
  102. res := sf.dictServe.Register(handler)
  103. if res.IsErr() {
  104. err := fmt.Errorf("KBusBase.RegisterServe(): handler='%v', err=\n\t%w", handler.Name(), res.Err())
  105. sf.log.Err(err.Error())
  106. return mL0.NewErr[kspec.EBool](err)
  107. }
  108. return mL0.NewOk(ebool.NewEBool(true))
  109. }
  110. // Publish -- публикует сообщение в шину.
  111. func (sf *KBusBase) Publish(topic *topic.ATopic, binMsg []byte) mL0.IResult[kspec.EBool] {
  112. sf.log.Debug("KBusBase.Publish(): topic='%v'", topic)
  113. if !sf.IsWork_.Get() {
  114. err := fmt.Errorf("KBusBase.Publish(): topic='%v',bus already closed", topic)
  115. sf.log.Err(err.Error())
  116. return mL0.NewErr[kspec.EBool](err)
  117. }
  118. // Асинхронный запуск чтения
  119. go sf.dictSub.Read(topic, binMsg)
  120. return mL0.NewOk(ebool.NewEBool(true))
  121. }
  122. // IsWork -- возвращает признак работы шины.
  123. func (sf *KBusBase) IsWork() kspec.EBool {
  124. return ebool.NewEBool(sf.IsWork_.Get())
  125. }
  126. // Ожидает закрытия шины в отдельном потоке.
  127. func (sf *KBusBase) close() {
  128. sf.KCtx_.Wait()
  129. sf.KCtx_.Lock()
  130. defer sf.KCtx_.Unlock()
  131. if !sf.IsWork_.Get() {
  132. return
  133. }
  134. sf.IsWork_.Reset()
  135. sf.KCtx_.Wg().Done(busBaseStreamName)
  136. sf.log.Debug("KBusBase.close(): done")
  137. }