// package dict_topic_serve -- словарь топиков обработчиков запросов. package dict_topic_serve import ( "context" "fmt" "sync" "time" mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0" "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs/topic" "gitp78su.ipnodns.ru/svi/kern/v4/lev0/etypes/ebool" "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec" mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx" ) // dictServe -- потокобезопасный словарь обработчиков запросов. // // Допускается только один обработчик запросов на один топик. type dictServe struct { sync.RWMutex kCtx mKs.IKernelCtx dictServe map[*topic.ATopic]mKs.IBusHandlerServe } // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов. func NewDictServe() *dictServe { sf := &dictServe{ kCtx: kctx.GetKernelCtx(), dictServe: map[*topic.ATopic]mKs.IBusHandlerServe{}, } return sf } // Register -- регистрирует обработчик запросов. func (sf *dictServe) Register(handler mKs.IBusHandlerServe) mL0.IResult[kspec.EBool] { sf.Lock() defer sf.Unlock() if handler == nil { err := fmt.Errorf("dictServe.Register(): IBusHandlerSubscribe==nil") return mL0.NewErr[kspec.EBool](err) } topic := handler.Topic() isTwinRegister := sf.register(handler) if isTwinRegister { err := fmt.Errorf("dictServe.Register(): handler of topic (%v) already register", topic) return mL0.NewErr[kspec.EBool](err) } return mL0.NewOk(ebool.NewEBool(true)) } // Unregister -- удаляет обработчик запросов из словаря. func (sf *dictServe) Unregister(handler mKs.IBusHandlerServe) { sf.Lock() defer sf.Unlock() if handler == nil { sf.kCtx.Log().Err("dictServe.Unregister(): IBusHandlerSubscribe==nil") return } delete(sf.dictServe, handler.Topic()) } // SendRequest -- вызывает обработчик при поступлении запроса. func (sf *dictServe) SendRequest(topic *topic.ATopic, binReq []byte) mL0.IResult[[]byte] { sf.RLock() defer sf.RUnlock() handler, isOk := sf.dictServe[topic] if !isOk { err := fmt.Errorf("dictServe.SendRequest(): handler for topic (%v) not exists", topic) return mL0.NewErr[[]byte](err) } var ( chRes = make(chan mL0.IResult[[]byte], 2) ) ctx, fnCancel := context.WithTimeout(sf.kCtx.Ctx(), time.Millisecond*time.Duration(TimeoutDefault)) defer fnCancel() fnCall := func() { defer close(chRes) res := handler.FnBack(binReq) chRes <- res } go fnCall() select { case <-ctx.Done(): err := fmt.Errorf("dictServe.SendRequest(): in call for topic (%v), err=\n\t%w", topic, ctx.Err()) return mL0.NewErr[[]byte](err) case res := <-chRes: return res } } var TimeoutDefault = 15000 // регистрирует обработчик запросов. func (sf *dictServe) register(handler mKs.IBusHandlerServe) bool { topic := handler.Topic() _, isOk := sf.dictServe[topic] if isOk { return true } sf.dictServe[topic] = handler return false }