dict_topic_sub.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. // package dict_topic_sub -- потокобезопасный словарь подписчиков локальной шины.
  2. package dict_topic_sub
  3. import (
  4. "sync"
  5. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  6. mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/alias"
  7. mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  8. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_sub_hook"
  9. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  10. )
  11. type tReadReq struct {
  12. topic *mKa.ATopic
  13. binMsg []byte
  14. }
  15. // dictTopicSub -- потокобезопасный словарь подписчиков.
  16. type dictTopicSub struct {
  17. sync.RWMutex
  18. kCtx mKt.IKernelCtx
  19. dictTopicHook map[*mKa.ATopic]mKt.IDictSubHook
  20. }
  21. // NewDictTopicSub -- возвращает потокобезопасный словарь подписчиков.
  22. func NewDictTopicSub() *dictTopicSub {
  23. sf := &dictTopicSub{
  24. kCtx: kctx.GetKernelCtx(),
  25. dictTopicHook: map[*mKa.ATopic]mKt.IDictSubHook{},
  26. }
  27. return sf
  28. }
  29. // Read -- вызывает обработчики при поступлении события.
  30. func (sf *dictTopicSub) Read(topic *mKa.ATopic, binMsg []byte) {
  31. sf.RLock()
  32. defer sf.RUnlock()
  33. msg := &tReadReq{
  34. topic: topic,
  35. binMsg: binMsg,
  36. }
  37. dictHook := sf.dictTopicHook[msg.topic]
  38. if dictHook == nil {
  39. return
  40. }
  41. dictHook.Read(msg.binMsg)
  42. }
  43. // Subscribe -- подписывает обработчик на топик.
  44. func (sf *dictTopicSub) Subscribe(handler mKt.IBusHandlerSubscribe){
  45. sf.Lock()
  46. defer sf.Unlock()
  47. mL0.Hassert(handler!=nil,"dictTopicSub.Subscribe(): handler==nil")
  48. topic := handler.Topic()
  49. dictSubHook := sf.dictTopicHook[topic]
  50. if dictSubHook == nil {
  51. dictSubHook := dict_sub_hook.NewDictSubHook()
  52. sf.dictTopicHook[topic] = dictSubHook
  53. }
  54. dictSubHook.Subscribe(handler)
  55. }
  56. // Unsubscribe -- отписывает обработчик.
  57. func (sf *dictTopicSub) Unsubscribe(handler mKt.IBusHandlerSubscribe) {
  58. sf.Lock()
  59. defer sf.Unlock()
  60. mL0.Hassert(handler!=nil, "dictTopicSub.Unsubscribe(): handler==nil")
  61. topic := handler.Topic()
  62. dictSubHook := sf.dictTopicHook[topic]
  63. if dictSubHook == nil {
  64. return
  65. }
  66. dictSubHook.Unsubscribe(handler)
  67. }