kbus_base.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. // package kbus_base -- базовая часть шины данных.
  2. package kbus_base
  3. import (
  4. "fmt"
  5. "sync"
  6. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  7. mKd "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs"
  8. mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  9. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_serve"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_sub"
  12. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  13. )
  14. var (
  15. busBaseStreamName = mKd.NewStreamName("bus_base")
  16. )
  17. // KBusBase -- базовая часть шины данных.
  18. type KBusBase struct {
  19. KCtx_ mKs.IKernelCtx
  20. IsWork_ mKs.ISafeBool
  21. lCtx mKs.ILocalCtx
  22. log mKs.ILogBuf
  23. dictSub mKs.IDictTopicSub
  24. dictServe mKs.IDictTopicServe
  25. }
  26. var (
  27. Bus_ *KBusBase
  28. block sync.Mutex
  29. )
  30. // GetKernelBusBase -- возвращает базовую шину сообщений.
  31. func GetKernelBusBase() *KBusBase {
  32. block.Lock()
  33. defer block.Unlock()
  34. if Bus_ != nil {
  35. return Bus_
  36. }
  37. kCtx := kctx.GetKernelCtx()
  38. lCtx := mL1.NewLocalCtx(kCtx.Ctx())
  39. Bus_ = &KBusBase{
  40. KCtx_: kCtx,
  41. IsWork_: mL1.NewSafeBool(),
  42. dictSub: dict_topic_sub.NewDictTopicSub(),
  43. dictServe: dict_topic_serve.NewDictServe(),
  44. lCtx: lCtx,
  45. }
  46. Bus_.log = Bus_.lCtx.Log()
  47. go Bus_.close()
  48. go Bus_.run()
  49. Bus_.IsWork_.Set()
  50. Bus_.KCtx_.Wg().Add(busBaseStreamName)
  51. Bus_.KCtx_.Set("kernBusBase", Bus_, "base of data bus")
  52. _ = mKs.IKernelBus(Bus_)
  53. return Bus_
  54. }
  55. // Log -- возвращает лог шины.
  56. func (sf *KBusBase) Log() mKs.ILogBuf {
  57. return sf.log
  58. }
  59. func (sf *KBusBase) run() {
  60. sf.log.Debug("KBusBase.run()")
  61. for {
  62. break
  63. }
  64. }
  65. // Unsubscribe -- отписывает обработчик от топика.
  66. func (sf *KBusBase) Unsubscribe(handler mKs.IBusHandlerSubscribe) {
  67. sf.log.Debug("KBusBase.Unsubscribe(): handler='%v'", handler.Name())
  68. sf.dictSub.Unsubscribe(handler)
  69. }
  70. // Subscribe -- подписывает обработчик на топик.
  71. func (sf *KBusBase) Subscribe(handler mKs.IBusHandlerSubscribe) mKs.IResult[bool] {
  72. sf.log.Debug("KBusBase.Subscribe(): handler='%v'", handler.Name())
  73. mL0.Hassert(!sf.IsWork_.Get(), "KBusBase.Subscribe(): handler='%v', bus already closed", handler.Name())
  74. sf.dictSub.Subscribe(handler)
  75. return mL0.NewRes(true)
  76. }
  77. // SendRequest -- отправляет запрос в шину данных.
  78. func (sf *KBusBase) SendRequest(topic *mKd.Topic, binReq []byte) mL0.IResult[[]byte] {
  79. sf.log.Debug("KBusBase.SendRequest(): topic='%v'", topic)
  80. if !sf.IsWork_.Get() {
  81. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', bus already closed", topic)
  82. sf.log.Err(err.Error())
  83. return mL0.NewErr[[]byte](err)
  84. }
  85. res := sf.dictServe.SendRequest(topic, binReq)
  86. if res.IsErr() {
  87. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', err=\n\t%w", topic, res.Err())
  88. sf.log.Err(err.Error())
  89. return mL0.NewErr[[]byte](err)
  90. }
  91. return res
  92. }
  93. // RegisterServe -- регистрирует обработчики входящих запросов.
  94. func (sf *KBusBase) RegisterServe(handler mKs.IBusHandlerServe) mL0.IResult[bool] {
  95. if handler == nil {
  96. return mL0.NewErr[bool](fmt.Errorf("KBusBase.RegisterServe(): IBusHandlerServe==nil"))
  97. }
  98. sf.log.Debug("KBusBase.RegisterServe(): handler='%v'", handler.Name())
  99. res := sf.dictServe.Register(handler)
  100. if res.IsErr() {
  101. err := fmt.Errorf("KBusBase.RegisterServe(): handler='%v', err=\n\t%w", handler.Name(), res.Err())
  102. sf.log.Err(err.Error())
  103. return mL0.NewErr[bool](err)
  104. }
  105. return mL0.NewRes(true)
  106. }
  107. // Publish -- публикует сообщение в шину.
  108. func (sf *KBusBase) Publish(topic *mKd.Topic, binMsg []byte) mL0.IResult[bool] {
  109. sf.log.Debug("KBusBase.Publish(): topic='%v'", topic)
  110. if !sf.IsWork_.Get() {
  111. err := fmt.Errorf("KBusBase.Publish(): topic='%v',bus already closed", topic)
  112. sf.log.Err(err.Error())
  113. return mL0.NewErr[bool](err)
  114. }
  115. // Асинхронный запуск чтения
  116. go sf.dictSub.Read(topic, binMsg)
  117. return mL0.NewRes(true)
  118. }
  119. // IsWork -- возвращает признак работы шины.
  120. func (sf *KBusBase) IsWork() bool {
  121. return sf.IsWork_.Get()
  122. }
  123. // Ожидает закрытия шины в отдельном потоке.
  124. func (sf *KBusBase) close() {
  125. sf.KCtx_.Wait()
  126. sf.KCtx_.Lock()
  127. defer sf.KCtx_.Unlock()
  128. if !sf.IsWork_.Get() {
  129. return
  130. }
  131. sf.IsWork_.Reset()
  132. sf.KCtx_.Wg().Done(busBaseStreamName)
  133. sf.log.Debug("KBusBase.close(): done")
  134. }