// package dict_topic_serve -- словарь топиков обработчиков запросов. package dict_topic_serve import ( "context" "fmt" "sync" "time" . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias" . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes" . "gitp78su.ipnodns.ru/svi/kern/v4/lev1" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx" ) // dictServe -- потокобезопасный словарь обработчиков запросов. // // Допускается только один обработчик запросов на один топик. type dictServe struct { sync.RWMutex kCtx IKernelCtx dictServe map[ATopic]IBusHandlerServe } // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов. func NewDictServe() IResult[*dictServe] { resKernCtx := kctx.GetKernelCtx() if resKernCtx.IsErr() { err := fmt.Errorf("NewDictServe(): in get kernel ctx, err=\n\t%w", resKernCtx.Err()) return NewErr[*dictServe](err) } sf := &dictServe{ kCtx: resKernCtx.Val(), dictServe: map[ATopic]IBusHandlerServe{}, } return NewRes(sf) } // Register -- регистрирует обработчик запросов. func (sf *dictServe) Register(handler IBusHandlerServe) IResult[bool] { sf.Lock() defer sf.Unlock() if handler == nil { return NewErr[bool](fmt.Errorf("dictServe.Register(): IBusHandlerSubscribe==nil")) } topic := handler.Topic() if topic == "" { return NewErr[bool](fmt.Errorf("dictServe.Register(): empty topic of handler")) } isTwinRegister := sf.register(handler) if isTwinRegister { return NewErr[bool](fmt.Errorf("dictServe.Register(): handler of topic (%v) already register", topic)) } return NewRes(true) } // Unregister -- удаляет обработчик запросов из словаря. func (sf *dictServe) Unregister(handler IBusHandlerServe) { sf.Lock() defer sf.Unlock() if handler == nil { sf.kCtx.Log().Err("dictServe.Unregister(): IBusHandlerSubscribe==nil") return } delete(sf.dictServe, handler.Topic()) } // SendRequest -- вызывает обработчик при поступлении запроса. func (sf *dictServe) SendRequest(topic ATopic, binReq []byte) IResult[[]byte] { sf.RLock() defer sf.RUnlock() handler, isOk := sf.dictServe[topic] if !isOk { err := fmt.Errorf("dictServe.SendRequest(): handler for topic (%v) not exists", topic) return NewErr[[]byte](err) } var ( chRes = make(chan IResult[[]byte], 2) ) ctx, fnCancel := context.WithTimeout(sf.kCtx.Ctx(), time.Millisecond*time.Duration(TimeoutDefault)) defer fnCancel() fnCall := func() { defer close(chRes) res := handler.FnBack(binReq) chRes <- res } go fnCall() select { case <-ctx.Done(): err := fmt.Errorf("dictServe.SendRequest(): in call for topic (%v), err=\n\t%w", topic, ctx.Err()) return NewErr[[]byte](err) case res := <-chRes: return res } } var TimeoutDefault = 15000 // регистрирует обработчик запросов. func (sf *dictServe) register(handler IBusHandlerServe) bool { topic := handler.Topic() _, isOk := sf.dictServe[topic] if isOk { return true } sf.dictServe[topic] = handler return false }