| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- // package dict_topic_serve -- словарь топиков обработчиков запросов
- package dict_topic_serve
- import (
- "context"
- "fmt"
- "sync"
- "time"
- . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
- . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
- . "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
- )
- // dictServe -- потокобезопасный словарь обработчиков запросов
- //
- // Допускается только один обработчик запросов на один топик
- type dictServe struct {
- sync.RWMutex
- kCtx IKernelCtx
- dictServe map[ATopic]IBusHandlerServe
- }
- // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов
- func NewDictServe() IResult[*dictServe] {
- resKernCtx := kctx.GetKernelCtx()
- if resKernCtx.IsErr() {
- err := fmt.Errorf("NewDictServe(): in get kernel ctx, err=\n\t%w", resKernCtx.Err())
- return NewErr[*dictServe](err)
- }
- sf := &dictServe{
- kCtx: resKernCtx.Val(),
- dictServe: map[ATopic]IBusHandlerServe{},
- }
- return NewRes(sf)
- }
- // Register -- регистрирует обработчик запросов
- func (sf *dictServe) Register(handler IBusHandlerServe) IResult[bool] {
- sf.Lock()
- defer sf.Unlock()
- if handler == nil {
- return NewErr[bool](fmt.Errorf("dictServe.Register(): IBusHandlerSubscribe==nil"))
- }
- topic := handler.Topic()
- if topic == "" {
- return NewErr[bool](fmt.Errorf("dictServe.Register(): empty topic of handler"))
- }
- isTwinRegister := sf.register(handler)
- if isTwinRegister {
- return NewErr[bool](fmt.Errorf("dictServe.Register(): handler of topic (%v) already register", topic))
- }
- return NewRes(true)
- }
- // Unregister -- удаляет обработчик запросов из словаря
- func (sf *dictServe) Unregister(handler IBusHandlerServe) {
- sf.Lock()
- defer sf.Unlock()
- if handler == nil {
- sf.kCtx.Log().Err("dictServe.Unregister(): IBusHandlerSubscribe==nil")
- return
- }
- delete(sf.dictServe, handler.Topic())
- }
- // SendRequest -- вызывает обработчик при поступлении запроса
- func (sf *dictServe) SendRequest(topic ATopic, binReq []byte) IResult[[]byte] {
- sf.RLock()
- defer sf.RUnlock()
- handler, isOk := sf.dictServe[topic]
- if !isOk {
- err := fmt.Errorf("dictServe.SendRequest(): handler for topic (%v) not exists", topic)
- return NewErr[[]byte](err)
- }
- var (
- chRes = make(chan IResult[[]byte], 2)
- )
- ctx, fnCancel := context.WithTimeout(sf.kCtx.Ctx(), time.Millisecond*time.Duration(TimeoutDefault))
- defer fnCancel()
- fnCall := func() {
- defer close(chRes)
- res := handler.FnBack(binReq)
- chRes <- res
- }
- go fnCall()
- select {
- case <-ctx.Done():
- err := fmt.Errorf("dictServe.SendRequest(): in call for topic (%v), err=\n\t%w", topic, ctx.Err())
- return NewErr[[]byte](err)
- case res := <-chRes:
- return res
- }
- }
- var TimeoutDefault = 15000
- // регистрирует обработчик запросов
- func (sf *dictServe) register(handler IBusHandlerServe) bool {
- topic := handler.Topic()
- _, isOk := sf.dictServe[topic]
- if isOk {
- return true
- }
- sf.dictServe[topic] = handler
- return false
- }
|