| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- // package dict_topic_sub -- потокобезопасный словарь подписчиков локальной шины.
- package dict_topic_sub
- import (
- "fmt"
- "sync"
- mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
- mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/alias"
- mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_sub_hook"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
- )
- type tReadReq struct {
- topic *mKa.ATopic
- binMsg []byte
- }
- // dictTopicSub -- потокобезопасный словарь подписчиков.
- type dictTopicSub struct {
- sync.RWMutex
- kCtx mKt.IKernelCtx
- dictTopicHook map[*mKa.ATopic]mKt.IDictSubHook
- }
- // NewDictTopicSub -- возвращает потокобезопасный словарь подписчиков.
- func NewDictTopicSub() *mL0.Result[*dictTopicSub] {
- resCtx := kctx.GetKernelCtx()
- if resCtx.IsErr() {
- err := fmt.Errorf("NewDictTopicSub(): kCtx==nil")
- return mL0.NewErr[*dictTopicSub](err)
- }
- kCtx := resCtx.Val()
- sf := &dictTopicSub{
- kCtx: kCtx,
- dictTopicHook: map[*mKa.ATopic]mKt.IDictSubHook{},
- }
- return mL0.NewRes(sf)
- }
- // Read -- вызывает обработчики при поступлении события.
- func (sf *dictTopicSub) Read(topic *mKa.ATopic, binMsg []byte) {
- sf.RLock()
- defer sf.RUnlock()
- msg := &tReadReq{
- topic: topic,
- binMsg: binMsg,
- }
- dictHook := sf.dictTopicHook[msg.topic]
- if dictHook == nil {
- return
- }
- dictHook.Read(msg.binMsg)
- }
- // Subscribe -- подписывает обработчик на топик.
- func (sf *dictTopicSub) Subscribe(handler mKt.IBusHandlerSubscribe) *mL0.Result[bool] {
- sf.Lock()
- defer sf.Unlock()
- if handler == nil {
- err := fmt.Errorf("dictTopicSub.Subscribe(): handler==nil")
- return mL0.NewErr[bool](err)
- }
- topic := handler.Topic()
- dictSubHook := sf.dictTopicHook[topic]
- if dictSubHook == nil {
- resNewDictTopicSub := dict_sub_hook.NewDictSubHook()
- if resNewDictTopicSub.IsErr() {
- err := fmt.Errorf("dictTopicSub.Subscribe(): NewDictSubHook(), err=\n\t%w", resNewDictTopicSub.Err())
- return mL0.NewErr[bool](err)
- }
- dictSubHook = resNewDictTopicSub.Val()
- sf.dictTopicHook[topic] = dictSubHook
- }
- res := dictSubHook.Subscribe(handler)
- if res.IsErr() {
- err := fmt.Errorf("dictTopicSub.Subscribe(): in add subscribe dict handler, err=\n\t%w", res.Err())
- return mL0.NewErr[bool](err)
- }
- return mL0.NewRes(true)
- }
- // Unsubscribe -- отписывает обработчик.
- func (sf *dictTopicSub) Unsubscribe(handler mKt.IBusHandlerSubscribe) {
- sf.Lock()
- defer sf.Unlock()
- if handler == nil {
- return
- }
- topic := handler.Topic()
- dictSubHook := sf.dictTopicHook[topic]
- if dictSubHook == nil {
- return
- }
- dictSubHook.Unsubscribe(handler)
- }
|