dict_topic_sub.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. // package dict_topic_sub -- потокобезопасный словарь подписчиков локальной шины.
  2. package dict_topic_sub
  3. import (
  4. "sync"
  5. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  6. mKd "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs"
  7. mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  8. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_sub_hook"
  9. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  10. )
  11. type tReadReq struct {
  12. topic *mKd.Topic
  13. binMsg []byte
  14. }
  15. // dictTopicSub -- потокобезопасный словарь подписчиков.
  16. type dictTopicSub struct {
  17. sync.RWMutex
  18. kCtx mKs.IKernelCtx
  19. dictTopicHook map[*mKd.Topic]mKs.IDictSubHook
  20. }
  21. // NewDictTopicSub -- возвращает потокобезопасный словарь подписчиков.
  22. func NewDictTopicSub() *dictTopicSub {
  23. sf := &dictTopicSub{
  24. kCtx: kctx.GetKernelCtx(),
  25. dictTopicHook: map[*mKd.Topic]mKs.IDictSubHook{},
  26. }
  27. return sf
  28. }
  29. // Read -- вызывает обработчики при поступлении события.
  30. func (sf *dictTopicSub) Read(topic *mKd.Topic, binMsg []byte) {
  31. sf.RLock()
  32. defer sf.RUnlock()
  33. msg := &tReadReq{
  34. topic: topic,
  35. binMsg: binMsg,
  36. }
  37. dictHook := sf.dictTopicHook[msg.topic]
  38. if dictHook == nil {
  39. return
  40. }
  41. dictHook.Read(msg.binMsg)
  42. }
  43. // Subscribe -- подписывает обработчик на топик.
  44. func (sf *dictTopicSub) Subscribe(handler mKs.IBusHandlerSubscribe) {
  45. sf.Lock()
  46. defer sf.Unlock()
  47. mL0.Hassert(handler != nil, "dictTopicSub.Subscribe(): handler==nil")
  48. topic := handler.Topic()
  49. dictSubHook := sf.dictTopicHook[topic]
  50. if dictSubHook == nil {
  51. dictSubHook := dict_sub_hook.NewDictSubHook()
  52. sf.dictTopicHook[topic] = dictSubHook
  53. }
  54. dictSubHook.Subscribe(handler)
  55. }
  56. // Unsubscribe -- отписывает обработчик.
  57. func (sf *dictTopicSub) Unsubscribe(handler mKs.IBusHandlerSubscribe) {
  58. sf.Lock()
  59. defer sf.Unlock()
  60. mL0.Hassert(handler != nil, "dictTopicSub.Unsubscribe(): handler==nil")
  61. topic := handler.Topic()
  62. dictSubHook := sf.dictTopicHook[topic]
  63. if dictSubHook == nil {
  64. return
  65. }
  66. dictSubHook.Unsubscribe(handler)
  67. }