dict_topic_sub.go 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. // package dict_topic_sub -- потокобезопасный словарь подписчиков локальной шины
  2. package dict_topic_sub
  3. import (
  4. "fmt"
  5. "sync"
  6. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/dict_sub_hook"
  7. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kctx"
  8. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
  9. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  10. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/helpers"
  11. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/result"
  12. )
  13. type tReadReq struct {
  14. topic ATopic
  15. binMsg []byte
  16. }
  17. // dictTopicSub -- потокобезопасный словарь подписчиков
  18. type dictTopicSub struct {
  19. sync.RWMutex
  20. ctx IKernelCtx
  21. dictTopicHook map[ATopic]IDictSubHook
  22. }
  23. // NewDictTopicSub -- возвращает потокобезопасный словарь подписчиков
  24. func NewDictTopicSub() IDictTopicSub {
  25. sf := &dictTopicSub{
  26. ctx: kctx.GetKernelCtx(),
  27. dictTopicHook: map[ATopic]IDictSubHook{},
  28. }
  29. return sf
  30. }
  31. // Read -- вызывает обработчики при поступлении события
  32. func (sf *dictTopicSub) Read(topic ATopic, binMsg []byte) {
  33. sf.RLock()
  34. defer sf.RUnlock()
  35. Hassert(topic != "", "dictTopicSub.Read(): topic is empty")
  36. msg := &tReadReq{
  37. topic: topic,
  38. binMsg: binMsg,
  39. }
  40. dictHook := sf.dictTopicHook[msg.topic]
  41. if dictHook == nil {
  42. return
  43. }
  44. dictHook.Read(msg.binMsg)
  45. }
  46. // Subscribe -- подписывает обработчик на топик
  47. func (sf *dictTopicSub) Subscribe(handler IBusHandlerSubscribe) IResult[bool] {
  48. sf.Lock()
  49. defer sf.Unlock()
  50. if handler == nil {
  51. err := fmt.Errorf("dictTopicSub.Subscribe(): handler==nil")
  52. return NewErr[bool](err)
  53. }
  54. topic := handler.Topic()
  55. if topic == "" {
  56. err := fmt.Errorf("dictTopicSub.Subscribe(): topic==\"\"")
  57. return NewErr[bool](err)
  58. }
  59. dictSubHook := sf.dictTopicHook[topic]
  60. if dictSubHook == nil {
  61. dictSubHook = dict_sub_hook.NewDictSubHook()
  62. sf.dictTopicHook[topic] = dictSubHook
  63. }
  64. res := dictSubHook.Subscribe(handler)
  65. if res.IsErr() {
  66. err := fmt.Errorf("dictTopicSub.Subscribe(): in add subscribe dict handler, err=\n\t%w", res.Err())
  67. return NewErr[bool](err)
  68. }
  69. return NewRes(true)
  70. }
  71. // Unsubscribe -- отписывает обработчик
  72. func (sf *dictTopicSub) Unsubscribe(handler IBusHandlerSubscribe) {
  73. sf.Lock()
  74. defer sf.Unlock()
  75. if handler == nil {
  76. return
  77. }
  78. topic := handler.Topic()
  79. dictSubHook := sf.dictTopicHook[topic]
  80. if dictSubHook == nil {
  81. return
  82. }
  83. dictSubHook.Unsubscribe(handler)
  84. }