dict_topic_sub.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. // package dict_topic_sub -- потокобезопасный словарь подписчиков локальной шины
  2. package dict_topic_sub
  3. import (
  4. "fmt"
  5. "sync"
  6. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
  7. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  8. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  9. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_sub_hook"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  11. )
  12. type tReadReq struct {
  13. topic ATopic
  14. binMsg []byte
  15. }
  16. // dictTopicSub -- потокобезопасный словарь подписчиков
  17. type dictTopicSub struct {
  18. sync.RWMutex
  19. kCtx IKernelCtx
  20. dictTopicHook map[ATopic]IDictSubHook
  21. }
  22. // NewDictTopicSub -- возвращает потокобезопасный словарь подписчиков
  23. func NewDictTopicSub() IResult[*dictTopicSub] {
  24. resCtx := kctx.GetKernelCtx()
  25. if resCtx.IsErr() {
  26. err := fmt.Errorf("NewDictTopicSub(): kCtx==nil")
  27. return NewErr[*dictTopicSub](err)
  28. }
  29. kCtx := resCtx.Val()
  30. sf := &dictTopicSub{
  31. kCtx: kCtx,
  32. dictTopicHook: map[ATopic]IDictSubHook{},
  33. }
  34. return NewRes(sf)
  35. }
  36. // Read -- вызывает обработчики при поступлении события
  37. func (sf *dictTopicSub) Read(topic ATopic, binMsg []byte) {
  38. sf.RLock()
  39. defer sf.RUnlock()
  40. if topic == "" {
  41. sf.kCtx.Log().Err("dictTopicSub.Read(): topic is empty")
  42. return
  43. }
  44. msg := &tReadReq{
  45. topic: topic,
  46. binMsg: binMsg,
  47. }
  48. dictHook := sf.dictTopicHook[msg.topic]
  49. if dictHook == nil {
  50. return
  51. }
  52. dictHook.Read(msg.binMsg)
  53. }
  54. // Subscribe -- подписывает обработчик на топик
  55. func (sf *dictTopicSub) Subscribe(handler IBusHandlerSubscribe) IResult[bool] {
  56. sf.Lock()
  57. defer sf.Unlock()
  58. if handler == nil {
  59. err := fmt.Errorf("dictTopicSub.Subscribe(): handler==nil")
  60. return NewErr[bool](err)
  61. }
  62. topic := handler.Topic()
  63. if topic == "" {
  64. err := fmt.Errorf("dictTopicSub.Subscribe(): topic==\"\"")
  65. return NewErr[bool](err)
  66. }
  67. dictSubHook := sf.dictTopicHook[topic]
  68. if dictSubHook == nil {
  69. resNewDictTopicSub := dict_sub_hook.NewDictSubHook()
  70. if resNewDictTopicSub.IsErr() {
  71. err := fmt.Errorf("dictTopicSub.Subscribe(): NewDictSubHook(), err=\n\t%w", resNewDictTopicSub.Err())
  72. return NewErr[bool](err)
  73. }
  74. dictSubHook = resNewDictTopicSub.Val()
  75. sf.dictTopicHook[topic] = dictSubHook
  76. }
  77. res := dictSubHook.Subscribe(handler)
  78. if res.IsErr() {
  79. err := fmt.Errorf("dictTopicSub.Subscribe(): in add subscribe dict handler, err=\n\t%w", res.Err())
  80. return NewErr[bool](err)
  81. }
  82. return NewRes(true)
  83. }
  84. // Unsubscribe -- отписывает обработчик
  85. func (sf *dictTopicSub) Unsubscribe(handler IBusHandlerSubscribe) {
  86. sf.Lock()
  87. defer sf.Unlock()
  88. if handler == nil {
  89. return
  90. }
  91. topic := handler.Topic()
  92. dictSubHook := sf.dictTopicHook[topic]
  93. if dictSubHook == nil {
  94. return
  95. }
  96. dictSubHook.Unsubscribe(handler)
  97. }