dict_topic_sub.go 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. // package dict_topic_sub -- потокобезопасный словарь подписчиков локальной шины.
  2. package dict_topic_sub
  3. import (
  4. "fmt"
  5. "sync"
  6. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  7. mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/alias"
  8. mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  9. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_sub_hook"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  11. )
  12. type tReadReq struct {
  13. topic *mKa.ATopic
  14. binMsg []byte
  15. }
  16. // dictTopicSub -- потокобезопасный словарь подписчиков.
  17. type dictTopicSub struct {
  18. sync.RWMutex
  19. kCtx mKt.IKernelCtx
  20. dictTopicHook map[*mKa.ATopic]mKt.IDictSubHook
  21. }
  22. // NewDictTopicSub -- возвращает потокобезопасный словарь подписчиков.
  23. func NewDictTopicSub() *mL0.Result[*dictTopicSub] {
  24. resCtx := kctx.GetKernelCtx()
  25. if resCtx.IsErr() {
  26. err := fmt.Errorf("NewDictTopicSub(): kCtx==nil")
  27. return mL0.NewErr[*dictTopicSub](err)
  28. }
  29. kCtx := resCtx.Val()
  30. sf := &dictTopicSub{
  31. kCtx: kCtx,
  32. dictTopicHook: map[*mKa.ATopic]mKt.IDictSubHook{},
  33. }
  34. return mL0.NewRes(sf)
  35. }
  36. // Read -- вызывает обработчики при поступлении события.
  37. func (sf *dictTopicSub) Read(topic *mKa.ATopic, binMsg []byte) {
  38. sf.RLock()
  39. defer sf.RUnlock()
  40. msg := &tReadReq{
  41. topic: topic,
  42. binMsg: binMsg,
  43. }
  44. dictHook := sf.dictTopicHook[msg.topic]
  45. if dictHook == nil {
  46. return
  47. }
  48. dictHook.Read(msg.binMsg)
  49. }
  50. // Subscribe -- подписывает обработчик на топик.
  51. func (sf *dictTopicSub) Subscribe(handler mKt.IBusHandlerSubscribe) *mL0.Result[bool] {
  52. sf.Lock()
  53. defer sf.Unlock()
  54. if handler == nil {
  55. err := fmt.Errorf("dictTopicSub.Subscribe(): handler==nil")
  56. return mL0.NewErr[bool](err)
  57. }
  58. topic := handler.Topic()
  59. dictSubHook := sf.dictTopicHook[topic]
  60. if dictSubHook == nil {
  61. resNewDictTopicSub := dict_sub_hook.NewDictSubHook()
  62. if resNewDictTopicSub.IsErr() {
  63. err := fmt.Errorf("dictTopicSub.Subscribe(): NewDictSubHook(), err=\n\t%w", resNewDictTopicSub.Err())
  64. return mL0.NewErr[bool](err)
  65. }
  66. dictSubHook = resNewDictTopicSub.Val()
  67. sf.dictTopicHook[topic] = dictSubHook
  68. }
  69. res := dictSubHook.Subscribe(handler)
  70. if res.IsErr() {
  71. err := fmt.Errorf("dictTopicSub.Subscribe(): in add subscribe dict handler, err=\n\t%w", res.Err())
  72. return mL0.NewErr[bool](err)
  73. }
  74. return mL0.NewRes(true)
  75. }
  76. // Unsubscribe -- отписывает обработчик.
  77. func (sf *dictTopicSub) Unsubscribe(handler mKt.IBusHandlerSubscribe) {
  78. sf.Lock()
  79. defer sf.Unlock()
  80. if handler == nil {
  81. return
  82. }
  83. topic := handler.Topic()
  84. dictSubHook := sf.dictTopicHook[topic]
  85. if dictSubHook == nil {
  86. return
  87. }
  88. dictSubHook.Unsubscribe(handler)
  89. }