// package dict_topic_sub -- потокобезопасный словарь подписчиков локальной шины. package dict_topic_sub import ( "fmt" "sync" mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias" mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes" mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_sub_hook" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx" ) type tReadReq struct { topic mKa.ATopic binMsg []byte } // dictTopicSub -- потокобезопасный словарь подписчиков. type dictTopicSub struct { sync.RWMutex kCtx mKt.IKernelCtx dictTopicHook map[mKa.ATopic]mKt.IDictSubHook } // NewDictTopicSub -- возвращает потокобезопасный словарь подписчиков. func NewDictTopicSub() mKt.IResult[*dictTopicSub] { resCtx := kctx.GetKernelCtx() if resCtx.IsErr() { err := fmt.Errorf("NewDictTopicSub(): kCtx==nil") return mL1.NewErr[*dictTopicSub](err) } kCtx := resCtx.Val() sf := &dictTopicSub{ kCtx: kCtx, dictTopicHook: map[mKa.ATopic]mKt.IDictSubHook{}, } return mL1.NewRes(sf) } // Read -- вызывает обработчики при поступлении события. func (sf *dictTopicSub) Read(topic mKa.ATopic, binMsg []byte) { sf.RLock() defer sf.RUnlock() if topic == "" { sf.kCtx.Log().Err("dictTopicSub.Read(): topic is empty") return } msg := &tReadReq{ topic: topic, binMsg: binMsg, } dictHook := sf.dictTopicHook[msg.topic] if dictHook == nil { return } dictHook.Read(msg.binMsg) } // Subscribe -- подписывает обработчик на топик. func (sf *dictTopicSub) Subscribe(handler mKt.IBusHandlerSubscribe) mKt.IResult[bool] { sf.Lock() defer sf.Unlock() if handler == nil { err := fmt.Errorf("dictTopicSub.Subscribe(): handler==nil") return mL1.NewErr[bool](err) } topic := handler.Topic() if topic == "" { err := fmt.Errorf("dictTopicSub.Subscribe(): topic==\"\"") return mL1.NewErr[bool](err) } dictSubHook := sf.dictTopicHook[topic] if dictSubHook == nil { resNewDictTopicSub := dict_sub_hook.NewDictSubHook() if resNewDictTopicSub.IsErr() { err := fmt.Errorf("dictTopicSub.Subscribe(): NewDictSubHook(), err=\n\t%w", resNewDictTopicSub.Err()) return mL1.NewErr[bool](err) } dictSubHook = resNewDictTopicSub.Val() sf.dictTopicHook[topic] = dictSubHook } res := dictSubHook.Subscribe(handler) if res.IsErr() { err := fmt.Errorf("dictTopicSub.Subscribe(): in add subscribe dict handler, err=\n\t%w", res.Err()) return mL1.NewErr[bool](err) } return mL1.NewRes(true) } // Unsubscribe -- отписывает обработчик. func (sf *dictTopicSub) Unsubscribe(handler mKt.IBusHandlerSubscribe) { sf.Lock() defer sf.Unlock() if handler == nil { return } topic := handler.Topic() dictSubHook := sf.dictTopicHook[topic] if dictSubHook == nil { return } dictSubHook.Unsubscribe(handler) }