| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- // package dict_topic_serve -- словарь топиков обработчиков запросов.
- package dict_topic_serve
- import (
- "context"
- "fmt"
- "sync"
- "time"
- mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
- mKd "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs"
- mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
- )
- // dictServe -- потокобезопасный словарь обработчиков запросов.
- //
- // Допускается только один обработчик запросов на один топик.
- type dictServe struct {
- sync.RWMutex
- kCtx mKs.IKernelCtx
- dictServe map[*mKd.Topic]mKs.IBusHandlerServe
- }
- // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов.
- func NewDictServe() *dictServe {
- sf := &dictServe{
- kCtx: kctx.GetKernelCtx(),
- dictServe: map[*mKd.Topic]mKs.IBusHandlerServe{},
- }
- return sf
- }
- // Register -- регистрирует обработчик запросов.
- func (sf *dictServe) Register(handler mKs.IBusHandlerServe) mL0.IResult[bool] {
- sf.Lock()
- defer sf.Unlock()
- if handler == nil {
- return mL0.NewErr[bool](fmt.Errorf("dictServe.Register(): IBusHandlerSubscribe==nil"))
- }
- topic := handler.Topic()
- isTwinRegister := sf.register(handler)
- if isTwinRegister {
- return mL0.NewErr[bool](fmt.Errorf("dictServe.Register(): handler of topic (%v) already register", topic))
- }
- return mL0.NewRes(true)
- }
- // Unregister -- удаляет обработчик запросов из словаря.
- func (sf *dictServe) Unregister(handler mKs.IBusHandlerServe) {
- sf.Lock()
- defer sf.Unlock()
- if handler == nil {
- sf.kCtx.Log().Err("dictServe.Unregister(): IBusHandlerSubscribe==nil")
- return
- }
- delete(sf.dictServe, handler.Topic())
- }
- // SendRequest -- вызывает обработчик при поступлении запроса.
- func (sf *dictServe) SendRequest(topic *mKd.Topic, binReq []byte) mL0.IResult[[]byte] {
- sf.RLock()
- defer sf.RUnlock()
- handler, isOk := sf.dictServe[topic]
- if !isOk {
- err := fmt.Errorf("dictServe.SendRequest(): handler for topic (%v) not exists", topic)
- return mL0.NewErr[[]byte](err)
- }
- var (
- chRes = make(chan mL0.IResult[[]byte], 2)
- )
- ctx, fnCancel := context.WithTimeout(sf.kCtx.Ctx(), time.Millisecond*time.Duration(TimeoutDefault))
- defer fnCancel()
- fnCall := func() {
- defer close(chRes)
- res := handler.FnBack(binReq)
- chRes <- res
- }
- go fnCall()
- select {
- case <-ctx.Done():
- err := fmt.Errorf("dictServe.SendRequest(): in call for topic (%v), err=\n\t%w", topic, ctx.Err())
- return mL0.NewErr[[]byte](err)
- case res := <-chRes:
- return res
- }
- }
- var TimeoutDefault = 15000
- // регистрирует обработчик запросов.
- func (sf *dictServe) register(handler mKs.IBusHandlerServe) bool {
- topic := handler.Topic()
- _, isOk := sf.dictServe[topic]
- if isOk {
- return true
- }
- sf.dictServe[topic] = handler
- return false
- }
|