dict_topic_sub.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. // package dict_topic_sub -- потокобезопасный словарь подписчиков локальной шины
  2. package dict_topic_sub
  3. import (
  4. "sync"
  5. . "gitp78su.ipnodns.ru/svi/kern/v3/kc/helpers"
  6. . "gitp78su.ipnodns.ru/svi/kern/v3/krn/kalias"
  7. "gitp78su.ipnodns.ru/svi/kern/v3/krn/kbus/dict_sub_hook"
  8. "gitp78su.ipnodns.ru/svi/kern/v3/krn/kctx"
  9. . "gitp78su.ipnodns.ru/svi/kern/v3/krn/ktypes"
  10. )
  11. type tReadReq struct {
  12. topic ATopic
  13. binMsg []byte
  14. }
  15. // dictTopicSub -- потокобезопасный словарь подписчиков
  16. type dictTopicSub struct {
  17. sync.RWMutex
  18. ctx IKernelCtx
  19. dictTopicHook map[ATopic]IDictSubHook
  20. }
  21. // NewDictTopicSub -- возвращает потокобезопасный словарь подписчиков
  22. func NewDictTopicSub() IDictTopicSub {
  23. sf := &dictTopicSub{
  24. ctx: kctx.GetKernelCtx(),
  25. dictTopicHook: map[ATopic]IDictSubHook{},
  26. }
  27. return sf
  28. }
  29. // Read -- вызывает обработчики при поступлении события
  30. func (sf *dictTopicSub) Read(topic ATopic, binMsg []byte) {
  31. sf.RLock()
  32. defer sf.RUnlock()
  33. Hassert(topic != "", "dictTopicSub.Read(): topic is empty")
  34. msg := &tReadReq{
  35. topic: topic,
  36. binMsg: binMsg,
  37. }
  38. dictHook := sf.dictTopicHook[msg.topic]
  39. if dictHook == nil {
  40. return
  41. }
  42. dictHook.Read(msg.binMsg)
  43. }
  44. // Subscribe -- подписывает обработчик на топик
  45. func (sf *dictTopicSub) Subscribe(handler IBusHandlerSubscribe) {
  46. sf.Lock()
  47. defer sf.Unlock()
  48. Hassert(handler != nil, "dictTopicSub.Subscribe(): IBusHandlerSubscribe==nil")
  49. topic := handler.Topic()
  50. Hassert(topic != "", "dictTopicSub.Subscribe(): topic is empty")
  51. dictSubHook := sf.dictTopicHook[topic]
  52. if dictSubHook == nil {
  53. dictSubHook = dict_sub_hook.NewDictSubHook()
  54. sf.dictTopicHook[topic] = dictSubHook
  55. }
  56. dictSubHook.Subscribe(handler)
  57. }
  58. // Unsubscribe -- отписывает обработчик
  59. func (sf *dictTopicSub) Unsubscribe(handler IBusHandlerSubscribe) {
  60. sf.Lock()
  61. defer sf.Unlock()
  62. if handler == nil {
  63. return
  64. }
  65. topic := handler.Topic()
  66. dictSubHook := sf.dictTopicHook[topic]
  67. if dictSubHook == nil {
  68. return
  69. }
  70. dictSubHook.Unsubscribe(handler)
  71. }