| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- // package dict_topic_serve -- словарь топиков обработчиков запросов
- package dict_topic_serve
- import (
- "context"
- "fmt"
- "time"
- . "gitp78su.ipnodns.ru/svi/kern/kc/helpers"
- . "gitp78su.ipnodns.ru/svi/kern/krn/kalias"
- "gitp78su.ipnodns.ru/svi/kern/krn/kbus/kbus_msg/msg_serve"
- "gitp78su.ipnodns.ru/svi/kern/krn/kctx"
- . "gitp78su.ipnodns.ru/svi/kern/krn/ktypes"
- )
- // dictServe -- потокобезопасный словарь обработчиков запросов
- //
- // Допускается только один обработчик запросов на один топик
- type dictServe struct {
- ctx IKernelCtx
- chUnregisterIn chan IBusHandlerServe
- dictServe map[ATopic]IBusHandlerServe
- chSendRequestIn chan *msg_serve.ServeReq
- chSendRequestOut chan *serveResp
- chRegisterIn chan IBusHandlerServe
- chRegisterOut chan bool
- }
- // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов
- func NewDictServe() IDictTopicServe {
- sf := &dictServe{
- ctx: kctx.GetKernelCtx(),
- chUnregisterIn: make(chan IBusHandlerServe, 5),
- dictServe: map[ATopic]IBusHandlerServe{},
- chSendRequestIn: make(chan *msg_serve.ServeReq, 5),
- chSendRequestOut: make(chan *serveResp, 5),
- chRegisterIn: make(chan IBusHandlerServe, 5),
- chRegisterOut: make(chan bool, 5),
- }
- go sf.run()
- return sf
- }
- // Register -- регистрирует обработчик запросов
- func (sf *dictServe) Register(handler IBusHandlerServe) {
- Hassert(handler != nil, "dictServe.Register(): IBusHandlerSubscribe==nil")
- topic := handler.Topic()
- Hassert(topic != "", "dictServe.Register(): empty topic of handler")
- sf.chRegisterIn <- handler
- isTwinRegister := <-sf.chRegisterOut
- Hassert(!isTwinRegister, "dictServe.Register(): handler of topic (%v) already register", handler.Topic())
- }
- // Unregister -- удаляет обработчик запросов из словаря
- func (sf *dictServe) Unregister(handler IBusHandlerServe) {
- Hassert(handler != nil, "dictServe.Unregister(): IBusHandlerSubscribe==nil")
- sf.chUnregisterIn <- handler
- }
- type serveResp struct {
- binResp []byte
- err error
- }
- // SendRequest -- вызывает обработчик при поступлении запроса
- func (sf *dictServe) SendRequest(topic ATopic, binReq []byte) ([]byte, error) {
- req := &msg_serve.ServeReq{
- Topic_: topic,
- BinReq_: binReq,
- }
- sf.chSendRequestIn <- req
- resp := <-sf.chSendRequestOut
- return resp.binResp, resp.err
- }
- func (sf *dictServe) run() {
- for {
- select {
- case handler := <-sf.chUnregisterIn:
- delete(sf.dictServe, handler.Topic())
- case reqServe := <-sf.chSendRequestIn:
- binResp, err := sf.sendRequest(reqServe)
- resp := &serveResp{
- err: err,
- binResp: binResp,
- }
- sf.chSendRequestOut <- resp
- case handler := <-sf.chRegisterIn:
- sf.chRegisterOut <- sf.register(handler)
- }
- }
- }
- var TimeoutDefault = 15000
- // вызывает обработчик при поступлении запроса
- func (sf *dictServe) sendRequest(req *msg_serve.ServeReq) ([]byte, error) {
- handler, isOk := sf.dictServe[req.Topic_]
- if !isOk {
- return nil, fmt.Errorf("dictServe.sendRequest(): handler for topic (%v) not exists", req.Topic_)
- }
- var (
- chErr = make(chan error, 2)
- binRes []byte
- )
- ctx, fnCancel := context.WithTimeout(sf.ctx.BaseCtx(), time.Millisecond*time.Duration(TimeoutDefault))
- defer fnCancel()
- fnCall := func() {
- defer close(chErr)
- var err error
- binRes, err = handler.FnBack(req.BinReq_)
- if err != nil {
- chErr <- err
- }
- }
- go fnCall()
- select {
- case <-ctx.Done():
- return nil, fmt.Errorf("dictServe.sendRequest(): in call for topic (%v), err=\n\t%w", req.Topic_, ctx.Err())
- case err := <-chErr:
- if err != nil {
- return nil, fmt.Errorf("dictServe.sendRequest(): error in call for topic (%v), err=\n\t%w", req.Topic_, err)
- }
- }
- return binRes, nil
- }
- // регистрирует обработчик запросов
- func (sf *dictServe) register(handler IBusHandlerServe) bool {
- topic := handler.Topic()
- _, isOk := sf.dictServe[topic]
- if isOk {
- return true
- }
- sf.dictServe[topic] = handler
- return false
- }
|