| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- // package dict_topic_serve -- словарь топиков обработчиков запросов
- package dict_topic_serve
- import (
- "context"
- "fmt"
- "sync"
- "time"
- . "gitp78su.ipnodns.ru/svi/kern/kc/helpers"
- . "gitp78su.ipnodns.ru/svi/kern/krn/kalias"
- "gitp78su.ipnodns.ru/svi/kern/krn/kctx"
- . "gitp78su.ipnodns.ru/svi/kern/krn/ktypes"
- )
- // dictServe -- потокобезопасный словарь обработчиков запросов
- //
- // Допускается только один обработчик запросов на один топик
- type dictServe struct {
- sync.RWMutex
- ctx IKernelCtx
- dictServe map[ATopic]IBusHandlerServe
- }
- // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов
- func NewDictServe() IDictTopicServe {
- sf := &dictServe{
- ctx: kctx.GetKernelCtx(),
- dictServe: map[ATopic]IBusHandlerServe{},
- }
- return sf
- }
- // Register -- регистрирует обработчик запросов
- func (sf *dictServe) Register(handler IBusHandlerServe) {
- sf.Lock()
- defer sf.Unlock()
- Hassert(handler != nil, "dictServe.Register(): IBusHandlerSubscribe==nil")
- topic := handler.Topic()
- Hassert(topic != "", "dictServe.Register(): empty topic of handler")
- isTwinRegister := sf.register(handler)
- Hassert(!isTwinRegister, "dictServe.Register(): handler of topic (%v) already register", handler.Topic())
- }
- // Unregister -- удаляет обработчик запросов из словаря
- func (sf *dictServe) Unregister(handler IBusHandlerServe) {
- sf.Lock()
- defer sf.Unlock()
- Hassert(handler != nil, "dictServe.Unregister(): IBusHandlerSubscribe==nil")
- delete(sf.dictServe, handler.Topic())
- }
- // SendRequest -- вызывает обработчик при поступлении запроса
- func (sf *dictServe) SendRequest(topic ATopic, binReq []byte) ([]byte, error) {
- sf.RLock()
- defer sf.RUnlock()
- handler, isOk := sf.dictServe[topic]
- if !isOk {
- return nil, fmt.Errorf("dictServe.SendRequest(): handler for topic (%v) not exists", topic)
- }
- var (
- chErr = make(chan error, 2)
- binRes []byte
- )
- ctx, fnCancel := context.WithTimeout(sf.ctx.BaseCtx(), time.Millisecond*time.Duration(TimeoutDefault))
- defer fnCancel()
- fnCall := func() {
- defer close(chErr)
- var err error
- binRes, err = handler.FnBack(binReq)
- if err != nil {
- chErr <- err
- }
- }
- go fnCall()
- select {
- case <-ctx.Done():
- return nil, fmt.Errorf("dictServe.SendRequest(): in call for topic (%v), err=\n\t%w", topic, ctx.Err())
- case err := <-chErr:
- if err != nil {
- return nil, fmt.Errorf("dictServe.SendRequest(): error in call for topic (%v), err=\n\t%w", topic, err)
- }
- }
- return binRes, nil
- }
- var TimeoutDefault = 15000
- // регистрирует обработчик запросов
- func (sf *dictServe) register(handler IBusHandlerServe) bool {
- topic := handler.Topic()
- _, isOk := sf.dictServe[topic]
- if isOk {
- return true
- }
- sf.dictServe[topic] = handler
- return false
- }
|