| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- // package dict_topic_sub -- потокобезопасный словарь подписчиков локальной шины
- package dict_topic_sub
- import (
- "fmt"
- "sync"
- "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/dict_sub_hook"
- "gitp78su.ipnodns.ru/svi/kern/v4/krn/kctx"
- . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
- . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
- . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/helpers"
- . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/result"
- )
- type tReadReq struct {
- topic ATopic
- binMsg []byte
- }
- // dictTopicSub -- потокобезопасный словарь подписчиков
- type dictTopicSub struct {
- sync.RWMutex
- ctx IKernelCtx
- dictTopicHook map[ATopic]IDictSubHook
- }
- // NewDictTopicSub -- возвращает потокобезопасный словарь подписчиков
- func NewDictTopicSub() IDictTopicSub {
- sf := &dictTopicSub{
- ctx: kctx.GetKernelCtx(),
- dictTopicHook: map[ATopic]IDictSubHook{},
- }
- return sf
- }
- // Read -- вызывает обработчики при поступлении события
- func (sf *dictTopicSub) Read(topic ATopic, binMsg []byte) {
- sf.RLock()
- defer sf.RUnlock()
- Hassert(topic != "", "dictTopicSub.Read(): topic is empty")
- msg := &tReadReq{
- topic: topic,
- binMsg: binMsg,
- }
- dictHook := sf.dictTopicHook[msg.topic]
- if dictHook == nil {
- return
- }
- dictHook.Read(msg.binMsg)
- }
- // Subscribe -- подписывает обработчик на топик
- func (sf *dictTopicSub) Subscribe(handler IBusHandlerSubscribe) IResult[bool] {
- sf.Lock()
- defer sf.Unlock()
- if handler == nil {
- err := fmt.Errorf("dictTopicSub.Subscribe(): handler==nil")
- return NewErr[bool](err)
- }
- topic := handler.Topic()
- if topic == "" {
- err := fmt.Errorf("dictTopicSub.Subscribe(): topic==\"\"")
- return NewErr[bool](err)
- }
- dictSubHook := sf.dictTopicHook[topic]
- if dictSubHook == nil {
- dictSubHook = dict_sub_hook.NewDictSubHook()
- sf.dictTopicHook[topic] = dictSubHook
- }
- res := dictSubHook.Subscribe(handler)
- if res.IsErr() {
- err := fmt.Errorf("dictTopicSub.Subscribe(): in add subscribe dict handler, err=\n\t%w", res.Err())
- return NewErr[bool](err)
- }
- return NewRes(true)
- }
- // Unsubscribe -- отписывает обработчик
- func (sf *dictTopicSub) Unsubscribe(handler IBusHandlerSubscribe) {
- sf.Lock()
- defer sf.Unlock()
- if handler == nil {
- return
- }
- topic := handler.Topic()
- dictSubHook := sf.dictTopicHook[topic]
- if dictSubHook == nil {
- return
- }
- dictSubHook.Unsubscribe(handler)
- }
|