|
|
@@ -4,11 +4,11 @@ package dict_topic_serve
|
|
|
import (
|
|
|
"context"
|
|
|
"fmt"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
|
|
|
. "gitp78su.ipnodns.ru/svi/kern/kc/helpers"
|
|
|
. "gitp78su.ipnodns.ru/svi/kern/krn/kalias"
|
|
|
- "gitp78su.ipnodns.ru/svi/kern/krn/kbus/kbus_msg/msg_serve"
|
|
|
"gitp78su.ipnodns.ru/svi/kern/krn/kctx"
|
|
|
. "gitp78su.ipnodns.ru/svi/kern/krn/ktypes"
|
|
|
)
|
|
|
@@ -17,93 +17,46 @@ import (
|
|
|
//
|
|
|
// Допускается только один обработчик запросов на один топик
|
|
|
type dictServe struct {
|
|
|
- ctx IKernelCtx
|
|
|
-
|
|
|
- chUnregisterIn chan IBusHandlerServe
|
|
|
- dictServe map[ATopic]IBusHandlerServe
|
|
|
-
|
|
|
- chSendRequestIn chan *msg_serve.ServeReq
|
|
|
- chSendRequestOut chan *serveResp
|
|
|
-
|
|
|
- chRegisterIn chan IBusHandlerServe
|
|
|
- chRegisterOut chan bool
|
|
|
+ sync.RWMutex
|
|
|
+ ctx IKernelCtx
|
|
|
+ dictServe map[ATopic]IBusHandlerServe
|
|
|
}
|
|
|
|
|
|
// NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов
|
|
|
func NewDictServe() IDictTopicServe {
|
|
|
sf := &dictServe{
|
|
|
- ctx: kctx.GetKernelCtx(),
|
|
|
-
|
|
|
- chUnregisterIn: make(chan IBusHandlerServe, 5),
|
|
|
- dictServe: map[ATopic]IBusHandlerServe{},
|
|
|
-
|
|
|
- chSendRequestIn: make(chan *msg_serve.ServeReq, 5),
|
|
|
- chSendRequestOut: make(chan *serveResp, 5),
|
|
|
-
|
|
|
- chRegisterIn: make(chan IBusHandlerServe, 5),
|
|
|
- chRegisterOut: make(chan bool, 5),
|
|
|
+ ctx: kctx.GetKernelCtx(),
|
|
|
+ dictServe: map[ATopic]IBusHandlerServe{},
|
|
|
}
|
|
|
- go sf.run()
|
|
|
return sf
|
|
|
}
|
|
|
|
|
|
// Register -- регистрирует обработчик запросов
|
|
|
func (sf *dictServe) Register(handler IBusHandlerServe) {
|
|
|
+ sf.Lock()
|
|
|
+ defer sf.Unlock()
|
|
|
Hassert(handler != nil, "dictServe.Register(): IBusHandlerSubscribe==nil")
|
|
|
topic := handler.Topic()
|
|
|
Hassert(topic != "", "dictServe.Register(): empty topic of handler")
|
|
|
- sf.chRegisterIn <- handler
|
|
|
- isTwinRegister := <-sf.chRegisterOut
|
|
|
+ isTwinRegister := sf.register(handler)
|
|
|
Hassert(!isTwinRegister, "dictServe.Register(): handler of topic (%v) already register", handler.Topic())
|
|
|
}
|
|
|
|
|
|
// Unregister -- удаляет обработчик запросов из словаря
|
|
|
func (sf *dictServe) Unregister(handler IBusHandlerServe) {
|
|
|
+ sf.Lock()
|
|
|
+ defer sf.Unlock()
|
|
|
Hassert(handler != nil, "dictServe.Unregister(): IBusHandlerSubscribe==nil")
|
|
|
- sf.chUnregisterIn <- handler
|
|
|
-}
|
|
|
-
|
|
|
-type serveResp struct {
|
|
|
- binResp []byte
|
|
|
- err error
|
|
|
+ delete(sf.dictServe, handler.Topic())
|
|
|
}
|
|
|
|
|
|
// SendRequest -- вызывает обработчик при поступлении запроса
|
|
|
func (sf *dictServe) SendRequest(topic ATopic, binReq []byte) ([]byte, error) {
|
|
|
- req := &msg_serve.ServeReq{
|
|
|
- Topic_: topic,
|
|
|
- BinReq_: binReq,
|
|
|
- }
|
|
|
- sf.chSendRequestIn <- req
|
|
|
- resp := <-sf.chSendRequestOut
|
|
|
- return resp.binResp, resp.err
|
|
|
-}
|
|
|
-
|
|
|
-func (sf *dictServe) run() {
|
|
|
- for {
|
|
|
- select {
|
|
|
- case handler := <-sf.chUnregisterIn:
|
|
|
- delete(sf.dictServe, handler.Topic())
|
|
|
- case reqServe := <-sf.chSendRequestIn:
|
|
|
- binResp, err := sf.sendRequest(reqServe)
|
|
|
- resp := &serveResp{
|
|
|
- err: err,
|
|
|
- binResp: binResp,
|
|
|
- }
|
|
|
- sf.chSendRequestOut <- resp
|
|
|
- case handler := <-sf.chRegisterIn:
|
|
|
- sf.chRegisterOut <- sf.register(handler)
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-var TimeoutDefault = 15000
|
|
|
-
|
|
|
-// вызывает обработчик при поступлении запроса
|
|
|
-func (sf *dictServe) sendRequest(req *msg_serve.ServeReq) ([]byte, error) {
|
|
|
- handler, isOk := sf.dictServe[req.Topic_]
|
|
|
+ sf.RLock()
|
|
|
+ defer sf.RUnlock()
|
|
|
+ handler, isOk := sf.dictServe[topic]
|
|
|
if !isOk {
|
|
|
- return nil, fmt.Errorf("dictServe.sendRequest(): handler for topic (%v) not exists", req.Topic_)
|
|
|
+ return nil, fmt.Errorf("dictServe.SendRequest(): handler for topic (%v) not exists", topic)
|
|
|
}
|
|
|
var (
|
|
|
chErr = make(chan error, 2)
|
|
|
@@ -114,7 +67,7 @@ func (sf *dictServe) sendRequest(req *msg_serve.ServeReq) ([]byte, error) {
|
|
|
fnCall := func() {
|
|
|
defer close(chErr)
|
|
|
var err error
|
|
|
- binRes, err = handler.FnBack(req.BinReq_)
|
|
|
+ binRes, err = handler.FnBack(binReq)
|
|
|
if err != nil {
|
|
|
chErr <- err
|
|
|
}
|
|
|
@@ -122,15 +75,17 @@ func (sf *dictServe) sendRequest(req *msg_serve.ServeReq) ([]byte, error) {
|
|
|
go fnCall()
|
|
|
select {
|
|
|
case <-ctx.Done():
|
|
|
- return nil, fmt.Errorf("dictServe.sendRequest(): in call for topic (%v), err=\n\t%w", req.Topic_, ctx.Err())
|
|
|
+ return nil, fmt.Errorf("dictServe.SendRequest(): in call for topic (%v), err=\n\t%w", topic, ctx.Err())
|
|
|
case err := <-chErr:
|
|
|
if err != nil {
|
|
|
- return nil, fmt.Errorf("dictServe.sendRequest(): error in call for topic (%v), err=\n\t%w", req.Topic_, err)
|
|
|
+ return nil, fmt.Errorf("dictServe.SendRequest(): error in call for topic (%v), err=\n\t%w", topic, err)
|
|
|
}
|
|
|
}
|
|
|
return binRes, nil
|
|
|
}
|
|
|
|
|
|
+var TimeoutDefault = 15000
|
|
|
+
|
|
|
// регистрирует обработчик запросов
|
|
|
func (sf *dictServe) register(handler IBusHandlerServe) bool {
|
|
|
topic := handler.Topic()
|