dict_topic_sub.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. // package dict_topic_sub -- потокобезопасный словарь подписчиков локальной шины.
  2. package dict_topic_sub
  3. import (
  4. "fmt"
  5. "sync"
  6. mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
  7. mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  8. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  9. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_sub_hook"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  11. )
  12. type tReadReq struct {
  13. topic mKa.ATopic
  14. binMsg []byte
  15. }
  16. // dictTopicSub -- потокобезопасный словарь подписчиков.
  17. type dictTopicSub struct {
  18. sync.RWMutex
  19. kCtx mKt.IKernelCtx
  20. dictTopicHook map[mKa.ATopic]mKt.IDictSubHook
  21. }
  22. // NewDictTopicSub -- возвращает потокобезопасный словарь подписчиков.
  23. func NewDictTopicSub() mKt.IResult[*dictTopicSub] {
  24. resCtx := kctx.GetKernelCtx()
  25. if resCtx.IsErr() {
  26. err := fmt.Errorf("NewDictTopicSub(): kCtx==nil")
  27. return mL1.NewErr[*dictTopicSub](err)
  28. }
  29. kCtx := resCtx.Val()
  30. sf := &dictTopicSub{
  31. kCtx: kCtx,
  32. dictTopicHook: map[mKa.ATopic]mKt.IDictSubHook{},
  33. }
  34. return mL1.NewRes(sf)
  35. }
  36. // Read -- вызывает обработчики при поступлении события.
  37. func (sf *dictTopicSub) Read(topic mKa.ATopic, binMsg []byte) {
  38. sf.RLock()
  39. defer sf.RUnlock()
  40. if topic == "" {
  41. sf.kCtx.Log().Err("dictTopicSub.Read(): topic is empty")
  42. return
  43. }
  44. msg := &tReadReq{
  45. topic: topic,
  46. binMsg: binMsg,
  47. }
  48. dictHook := sf.dictTopicHook[msg.topic]
  49. if dictHook == nil {
  50. return
  51. }
  52. dictHook.Read(msg.binMsg)
  53. }
  54. // Subscribe -- подписывает обработчик на топик.
  55. func (sf *dictTopicSub) Subscribe(handler mKt.IBusHandlerSubscribe) mKt.IResult[bool] {
  56. sf.Lock()
  57. defer sf.Unlock()
  58. if handler == nil {
  59. err := fmt.Errorf("dictTopicSub.Subscribe(): handler==nil")
  60. return mL1.NewErr[bool](err)
  61. }
  62. topic := handler.Topic()
  63. if topic == "" {
  64. err := fmt.Errorf("dictTopicSub.Subscribe(): topic==\"\"")
  65. return mL1.NewErr[bool](err)
  66. }
  67. dictSubHook := sf.dictTopicHook[topic]
  68. if dictSubHook == nil {
  69. resNewDictTopicSub := dict_sub_hook.NewDictSubHook()
  70. if resNewDictTopicSub.IsErr() {
  71. err := fmt.Errorf("dictTopicSub.Subscribe(): NewDictSubHook(), err=\n\t%w", resNewDictTopicSub.Err())
  72. return mL1.NewErr[bool](err)
  73. }
  74. dictSubHook = resNewDictTopicSub.Val()
  75. sf.dictTopicHook[topic] = dictSubHook
  76. }
  77. res := dictSubHook.Subscribe(handler)
  78. if res.IsErr() {
  79. err := fmt.Errorf("dictTopicSub.Subscribe(): in add subscribe dict handler, err=\n\t%w", res.Err())
  80. return mL1.NewErr[bool](err)
  81. }
  82. return mL1.NewRes(true)
  83. }
  84. // Unsubscribe -- отписывает обработчик.
  85. func (sf *dictTopicSub) Unsubscribe(handler mKt.IBusHandlerSubscribe) {
  86. sf.Lock()
  87. defer sf.Unlock()
  88. if handler == nil {
  89. return
  90. }
  91. topic := handler.Topic()
  92. dictSubHook := sf.dictTopicHook[topic]
  93. if dictSubHook == nil {
  94. return
  95. }
  96. dictSubHook.Unsubscribe(handler)
  97. }