| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- // package dict_topic_sub -- потокобезопасный словарь подписчиков локальной шины.
- package dict_topic_sub
- import (
- "fmt"
- "sync"
- mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
- mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
- mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_sub_hook"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
- )
- type tReadReq struct {
- topic mKa.ATopic
- binMsg []byte
- }
- // dictTopicSub -- потокобезопасный словарь подписчиков.
- type dictTopicSub struct {
- sync.RWMutex
- kCtx mKt.IKernelCtx
- dictTopicHook map[mKa.ATopic]mKt.IDictSubHook
- }
- // NewDictTopicSub -- возвращает потокобезопасный словарь подписчиков.
- func NewDictTopicSub() mKt.IResult[*dictTopicSub] {
- resCtx := kctx.GetKernelCtx()
- if resCtx.IsErr() {
- err := fmt.Errorf("NewDictTopicSub(): kCtx==nil")
- return mL1.NewErr[*dictTopicSub](err)
- }
- kCtx := resCtx.Val()
- sf := &dictTopicSub{
- kCtx: kCtx,
- dictTopicHook: map[mKa.ATopic]mKt.IDictSubHook{},
- }
- return mL1.NewRes(sf)
- }
- // Read -- вызывает обработчики при поступлении события.
- func (sf *dictTopicSub) Read(topic mKa.ATopic, binMsg []byte) {
- sf.RLock()
- defer sf.RUnlock()
- if topic == "" {
- sf.kCtx.Log().Err("dictTopicSub.Read(): topic is empty")
- return
- }
- msg := &tReadReq{
- topic: topic,
- binMsg: binMsg,
- }
- dictHook := sf.dictTopicHook[msg.topic]
- if dictHook == nil {
- return
- }
- dictHook.Read(msg.binMsg)
- }
- // Subscribe -- подписывает обработчик на топик.
- func (sf *dictTopicSub) Subscribe(handler mKt.IBusHandlerSubscribe) mKt.IResult[bool] {
- sf.Lock()
- defer sf.Unlock()
- if handler == nil {
- err := fmt.Errorf("dictTopicSub.Subscribe(): handler==nil")
- return mL1.NewErr[bool](err)
- }
- topic := handler.Topic()
- if topic == "" {
- err := fmt.Errorf("dictTopicSub.Subscribe(): topic==\"\"")
- return mL1.NewErr[bool](err)
- }
- dictSubHook := sf.dictTopicHook[topic]
- if dictSubHook == nil {
- resNewDictTopicSub := dict_sub_hook.NewDictSubHook()
- if resNewDictTopicSub.IsErr() {
- err := fmt.Errorf("dictTopicSub.Subscribe(): NewDictSubHook(), err=\n\t%w", resNewDictTopicSub.Err())
- return mL1.NewErr[bool](err)
- }
- dictSubHook = resNewDictTopicSub.Val()
- sf.dictTopicHook[topic] = dictSubHook
- }
- res := dictSubHook.Subscribe(handler)
- if res.IsErr() {
- err := fmt.Errorf("dictTopicSub.Subscribe(): in add subscribe dict handler, err=\n\t%w", res.Err())
- return mL1.NewErr[bool](err)
- }
- return mL1.NewRes(true)
- }
- // Unsubscribe -- отписывает обработчик.
- func (sf *dictTopicSub) Unsubscribe(handler mKt.IBusHandlerSubscribe) {
- sf.Lock()
- defer sf.Unlock()
- if handler == nil {
- return
- }
- topic := handler.Topic()
- dictSubHook := sf.dictTopicHook[topic]
- if dictSubHook == nil {
- return
- }
- dictSubHook.Unsubscribe(handler)
- }
|