dict_topic_serve.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. // package dict_topic_serve -- словарь топиков обработчиков запросов
  2. package dict_topic_serve
  3. import (
  4. "context"
  5. "fmt"
  6. "time"
  7. . "gitp78su.ipnodns.ru/svi/kern/kc/helpers"
  8. . "gitp78su.ipnodns.ru/svi/kern/krn/kalias"
  9. "gitp78su.ipnodns.ru/svi/kern/krn/kbus/kbus_msg/msg_serve"
  10. "gitp78su.ipnodns.ru/svi/kern/krn/kctx"
  11. . "gitp78su.ipnodns.ru/svi/kern/krn/ktypes"
  12. )
  13. // dictServe -- потокобезопасный словарь обработчиков запросов
  14. //
  15. // Допускается только один обработчик запросов на один топик
  16. type dictServe struct {
  17. ctx IKernelCtx
  18. chUnregisterIn chan IBusHandlerServe
  19. dictServe map[ATopic]IBusHandlerServe
  20. chSendRequestIn chan *msg_serve.ServeReq
  21. chSendRequestOut chan *serveResp
  22. chRegisterIn chan IBusHandlerServe
  23. chRegisterOut chan bool
  24. }
  25. // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов
  26. func NewDictServe() IDictTopicServe {
  27. sf := &dictServe{
  28. ctx: kctx.GetKernelCtx(),
  29. chUnregisterIn: make(chan IBusHandlerServe, 5),
  30. dictServe: map[ATopic]IBusHandlerServe{},
  31. chSendRequestIn: make(chan *msg_serve.ServeReq, 5),
  32. chSendRequestOut: make(chan *serveResp, 5),
  33. chRegisterIn: make(chan IBusHandlerServe, 5),
  34. chRegisterOut: make(chan bool, 5),
  35. }
  36. go sf.run()
  37. return sf
  38. }
  39. // Register -- регистрирует обработчик запросов
  40. func (sf *dictServe) Register(handler IBusHandlerServe) {
  41. Hassert(handler != nil, "dictServe.Register(): IBusHandlerSubscribe==nil")
  42. topic := handler.Topic()
  43. Hassert(topic != "", "dictServe.Register(): empty topic of handler")
  44. sf.chRegisterIn <- handler
  45. isTwinRegister := <-sf.chRegisterOut
  46. Hassert(!isTwinRegister, "dictServe.Register(): handler of topic (%v) already register", handler.Topic())
  47. }
  48. // Unregister -- удаляет обработчик запросов из словаря
  49. func (sf *dictServe) Unregister(handler IBusHandlerServe) {
  50. Hassert(handler != nil, "dictServe.Unregister(): IBusHandlerSubscribe==nil")
  51. sf.chUnregisterIn <- handler
  52. }
  53. type serveResp struct {
  54. binResp []byte
  55. err error
  56. }
  57. // SendRequest -- вызывает обработчик при поступлении запроса
  58. func (sf *dictServe) SendRequest(topic ATopic, binReq []byte) ([]byte, error) {
  59. req := &msg_serve.ServeReq{
  60. Topic_: topic,
  61. BinReq_: binReq,
  62. }
  63. sf.chSendRequestIn <- req
  64. resp := <-sf.chSendRequestOut
  65. return resp.binResp, resp.err
  66. }
  67. func (sf *dictServe) run() {
  68. for {
  69. select {
  70. case handler := <-sf.chUnregisterIn:
  71. delete(sf.dictServe, handler.Topic())
  72. case reqServe := <-sf.chSendRequestIn:
  73. binResp, err := sf.sendRequest(reqServe)
  74. resp := &serveResp{
  75. err: err,
  76. binResp: binResp,
  77. }
  78. sf.chSendRequestOut <- resp
  79. case handler := <-sf.chRegisterIn:
  80. sf.chRegisterOut <- sf.register(handler)
  81. }
  82. }
  83. }
  84. var TimeoutDefault = 15000
  85. // вызывает обработчик при поступлении запроса
  86. func (sf *dictServe) sendRequest(req *msg_serve.ServeReq) ([]byte, error) {
  87. handler, isOk := sf.dictServe[req.Topic_]
  88. if !isOk {
  89. return nil, fmt.Errorf("dictServe.sendRequest(): handler for topic (%v) not exists", req.Topic_)
  90. }
  91. var (
  92. chErr = make(chan error, 2)
  93. binRes []byte
  94. )
  95. ctx, fnCancel := context.WithTimeout(sf.ctx.BaseCtx(), time.Millisecond*time.Duration(TimeoutDefault))
  96. defer fnCancel()
  97. fnCall := func() {
  98. defer close(chErr)
  99. var err error
  100. binRes, err = handler.FnBack(req.BinReq_)
  101. if err != nil {
  102. chErr <- err
  103. }
  104. }
  105. go fnCall()
  106. select {
  107. case <-ctx.Done():
  108. return nil, fmt.Errorf("dictServe.sendRequest(): in call for topic (%v), err=\n\t%w", req.Topic_, ctx.Err())
  109. case err := <-chErr:
  110. if err != nil {
  111. return nil, fmt.Errorf("dictServe.sendRequest(): error in call for topic (%v), err=\n\t%w", req.Topic_, err)
  112. }
  113. }
  114. return binRes, nil
  115. }
  116. // регистрирует обработчик запросов
  117. func (sf *dictServe) register(handler IBusHandlerServe) bool {
  118. topic := handler.Topic()
  119. _, isOk := sf.dictServe[topic]
  120. if isOk {
  121. return true
  122. }
  123. sf.dictServe[topic] = handler
  124. return false
  125. }