dict_topic_serve.go 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. // package dict_topic_serve -- словарь топиков обработчиков запросов
  2. package dict_topic_serve
  3. import (
  4. "context"
  5. "fmt"
  6. "sync"
  7. "time"
  8. . "gitp78su.ipnodns.ru/svi/kern/kc/helpers"
  9. . "gitp78su.ipnodns.ru/svi/kern/krn/kalias"
  10. "gitp78su.ipnodns.ru/svi/kern/krn/kctx"
  11. . "gitp78su.ipnodns.ru/svi/kern/krn/ktypes"
  12. )
  13. // dictServe -- потокобезопасный словарь обработчиков запросов
  14. //
  15. // Допускается только один обработчик запросов на один топик
  16. type dictServe struct {
  17. sync.RWMutex
  18. ctx IKernelCtx
  19. dictServe map[ATopic]IBusHandlerServe
  20. }
  21. // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов
  22. func NewDictServe() IDictTopicServe {
  23. sf := &dictServe{
  24. ctx: kctx.GetKernelCtx(),
  25. dictServe: map[ATopic]IBusHandlerServe{},
  26. }
  27. return sf
  28. }
  29. // Register -- регистрирует обработчик запросов
  30. func (sf *dictServe) Register(handler IBusHandlerServe) {
  31. sf.Lock()
  32. defer sf.Unlock()
  33. Hassert(handler != nil, "dictServe.Register(): IBusHandlerSubscribe==nil")
  34. topic := handler.Topic()
  35. Hassert(topic != "", "dictServe.Register(): empty topic of handler")
  36. isTwinRegister := sf.register(handler)
  37. Hassert(!isTwinRegister, "dictServe.Register(): handler of topic (%v) already register", handler.Topic())
  38. }
  39. // Unregister -- удаляет обработчик запросов из словаря
  40. func (sf *dictServe) Unregister(handler IBusHandlerServe) {
  41. sf.Lock()
  42. defer sf.Unlock()
  43. Hassert(handler != nil, "dictServe.Unregister(): IBusHandlerSubscribe==nil")
  44. delete(sf.dictServe, handler.Topic())
  45. }
  46. // SendRequest -- вызывает обработчик при поступлении запроса
  47. func (sf *dictServe) SendRequest(topic ATopic, binReq []byte) ([]byte, error) {
  48. sf.RLock()
  49. defer sf.RUnlock()
  50. handler, isOk := sf.dictServe[topic]
  51. if !isOk {
  52. return nil, fmt.Errorf("dictServe.SendRequest(): handler for topic (%v) not exists", topic)
  53. }
  54. var (
  55. chErr = make(chan error, 2)
  56. binRes []byte
  57. )
  58. ctx, fnCancel := context.WithTimeout(sf.ctx.BaseCtx(), time.Millisecond*time.Duration(TimeoutDefault))
  59. defer fnCancel()
  60. fnCall := func() {
  61. defer close(chErr)
  62. var err error
  63. binRes, err = handler.FnBack(binReq)
  64. if err != nil {
  65. chErr <- err
  66. }
  67. }
  68. go fnCall()
  69. select {
  70. case <-ctx.Done():
  71. return nil, fmt.Errorf("dictServe.SendRequest(): in call for topic (%v), err=\n\t%w", topic, ctx.Err())
  72. case err := <-chErr:
  73. if err != nil {
  74. return nil, fmt.Errorf("dictServe.SendRequest(): error in call for topic (%v), err=\n\t%w", topic, err)
  75. }
  76. }
  77. return binRes, nil
  78. }
  79. var TimeoutDefault = 15000
  80. // регистрирует обработчик запросов
  81. func (sf *dictServe) register(handler IBusHandlerServe) bool {
  82. topic := handler.Topic()
  83. _, isOk := sf.dictServe[topic]
  84. if isOk {
  85. return true
  86. }
  87. sf.dictServe[topic] = handler
  88. return false
  89. }