// package dict_topic_sub -- потокобезопасный словарь подписчиков локальной шины. package dict_topic_sub import ( "sync" mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0" "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs/topic" mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_sub_hook" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx" ) type tReadReq struct { topic *topic.ATopic binMsg []byte } // dictTopicSub -- потокобезопасный словарь подписчиков. type dictTopicSub struct { sync.RWMutex kCtx mKs.IKernelCtx dictTopicHook map[*topic.ATopic]mKs.IDictSubHook } // NewDictTopicSub -- возвращает потокобезопасный словарь подписчиков. func NewDictTopicSub() *dictTopicSub { sf := &dictTopicSub{ kCtx: kctx.GetKernelCtx(), dictTopicHook: map[*topic.ATopic]mKs.IDictSubHook{}, } return sf } // Read -- вызывает обработчики при поступлении события. func (sf *dictTopicSub) Read(topic *topic.ATopic, binMsg []byte) { sf.RLock() defer sf.RUnlock() msg := &tReadReq{ topic: topic, binMsg: binMsg, } dictHook := sf.dictTopicHook[msg.topic] if dictHook == nil { return } dictHook.Read(msg.binMsg) } // Subscribe -- подписывает обработчик на топик. func (sf *dictTopicSub) Subscribe(handler mKs.IBusHandlerSubscribe) { sf.Lock() defer sf.Unlock() mL0.Hassert(handler != nil, "dictTopicSub.Subscribe(): handler==nil") topic := handler.Topic() dictSubHook := sf.dictTopicHook[topic] if dictSubHook == nil { dictSubHook := dict_sub_hook.NewDictSubHook() sf.dictTopicHook[topic] = dictSubHook } dictSubHook.Subscribe(handler) } // Unsubscribe -- отписывает обработчик. func (sf *dictTopicSub) Unsubscribe(handler mKs.IBusHandlerSubscribe) { sf.Lock() defer sf.Unlock() mL0.Hassert(handler != nil, "dictTopicSub.Unsubscribe(): handler==nil") topic := handler.Topic() dictSubHook := sf.dictTopicHook[topic] if dictSubHook == nil { return } dictSubHook.Unsubscribe(handler) }