kbus_base.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. // package kbus_base -- базовая часть шины данных
  2. package kbus_base
  3. import (
  4. "fmt"
  5. "sync"
  6. . "gitp78su.ipnodns.ru/svi/kern/kc/helpers"
  7. "gitp78su.ipnodns.ru/svi/kern/kc/local_ctx"
  8. "gitp78su.ipnodns.ru/svi/kern/kc/safe_bool"
  9. . "gitp78su.ipnodns.ru/svi/kern/krn/kalias"
  10. "gitp78su.ipnodns.ru/svi/kern/krn/kbus/dict_topic_serve"
  11. "gitp78su.ipnodns.ru/svi/kern/krn/kbus/dict_topic_sub"
  12. "gitp78su.ipnodns.ru/svi/kern/krn/kctx"
  13. . "gitp78su.ipnodns.ru/svi/kern/krn/ktypes"
  14. )
  15. const (
  16. strBusBaseStream = "bus_base"
  17. )
  18. // KBusBase -- базовая часть шины данных
  19. type KBusBase struct {
  20. Ctx_ IKernelCtx
  21. IsWork_ ISafeBool
  22. ctx ILocalCtx
  23. log ILogBuf
  24. dictSub IDictTopicSub
  25. dictServe IDictTopicServe
  26. }
  27. var (
  28. Bus_ *KBusBase
  29. block sync.Mutex
  30. )
  31. // GetKernelBusBase -- возвращает базовую шину сообщений
  32. func GetKernelBusBase() *KBusBase {
  33. block.Lock()
  34. defer block.Unlock()
  35. if Bus_ != nil {
  36. return Bus_
  37. }
  38. ctx := kctx.GetKernelCtx()
  39. Bus_ = &KBusBase{
  40. Ctx_: ctx,
  41. IsWork_: safe_bool.NewSafeBool(),
  42. dictSub: dict_topic_sub.NewDictTopicSub(),
  43. dictServe: dict_topic_serve.NewDictServe(),
  44. ctx: local_ctx.NewLocalCtx(ctx.BaseCtx()),
  45. }
  46. Bus_.log = Bus_.ctx.Log()
  47. go Bus_.close()
  48. go Bus_.run()
  49. Bus_.IsWork_.Set()
  50. err := Bus_.Ctx_.Wg().Add(strBusBaseStream)
  51. Hassert(err == nil, "GetKernelBusBase(): in add name stream(%v), err=\n\t%v", strBusBaseStream, err)
  52. ctx.Set("kernBusBase", Bus_, "base of data bus")
  53. _ = IKernelBus(Bus_)
  54. return Bus_
  55. }
  56. // Log -- возвращает лог шины
  57. func (sf *KBusBase) Log() ILogBuf {
  58. return sf.log
  59. }
  60. func (sf *KBusBase) run() {
  61. sf.log.Debug("KBusBase.run()")
  62. for {
  63. break
  64. }
  65. }
  66. // Unsubscribe -- отписывает обработчик от топика
  67. func (sf *KBusBase) Unsubscribe(handler IBusHandlerSubscribe) {
  68. sf.log.Debug("KBusBase.Unsubscribe(): handler='%v'", handler.Name())
  69. sf.dictSub.Unsubscribe(handler)
  70. }
  71. // Subscribe -- подписывает обработчик на топик
  72. func (sf *KBusBase) Subscribe(handler IBusHandlerSubscribe) error {
  73. sf.log.Debug("KBusBase.Subscribe(): handler='%v'", handler.Name())
  74. if !sf.IsWork_.Get() {
  75. err := fmt.Errorf("KBusBase.Subscribe(): handler='%v', bus already closed", handler.Name())
  76. sf.log.Err(err.Error())
  77. return err
  78. }
  79. sf.dictSub.Subscribe(handler)
  80. return nil
  81. }
  82. // SendRequest -- отправляет запрос в шину данных
  83. func (sf *KBusBase) SendRequest(topic ATopic, binReq []byte) ([]byte, error) {
  84. sf.log.Debug("KBusBase.SendRequest(): topic='%v'", topic)
  85. if !sf.IsWork_.Get() {
  86. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', bus already closed", topic)
  87. sf.log.Err(err.Error())
  88. return nil, err
  89. }
  90. binResp, err := sf.dictServe.SendRequest(topic, binReq)
  91. if err != nil {
  92. err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', err=\n\t%w", topic, err)
  93. sf.log.Err(err.Error())
  94. return nil, err
  95. }
  96. return binResp, nil
  97. }
  98. // RegisterServe -- регистрирует обработчики входящих запросов
  99. func (sf *KBusBase) RegisterServe(handler IBusHandlerServe) {
  100. Hassert(handler != nil, "KBusBase.RegisterServe(): IBusHandlerSubscribe==nil")
  101. sf.log.Debug("KBusBase.RegisterServe(): handler='%v'", handler.Name())
  102. sf.dictServe.Register(handler)
  103. }
  104. // Publish -- публикует сообщение в шину
  105. func (sf *KBusBase) Publish(topic ATopic, binMsg []byte) (err error) {
  106. sf.log.Debug("KBusBase.Publish(): topic='%v'", topic)
  107. if !sf.IsWork_.Get() {
  108. err := fmt.Errorf("KBusBase.Publish(): topic='%v',bus already closed", topic)
  109. sf.log.Err(err.Error())
  110. return err
  111. }
  112. // Асинхронный запуск чтения
  113. go sf.dictSub.Read(topic, binMsg)
  114. return nil
  115. }
  116. // IsWork -- возвращает признак работы шины
  117. func (sf *KBusBase) IsWork() bool {
  118. return sf.IsWork_.Get()
  119. }
  120. // Ожидает закрытия шины в отдельном потоке
  121. func (sf *KBusBase) close() {
  122. sf.Ctx_.Done()
  123. sf.Ctx_.Lock()
  124. defer sf.Ctx_.Unlock()
  125. if !sf.IsWork_.Get() {
  126. return
  127. }
  128. sf.IsWork_.Reset()
  129. sf.Ctx_.Wg().Done(strBusBaseStream)
  130. sf.log.Debug("KBusBase.close(): done")
  131. }