dict_topic_serve.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. // package dict_topic_serve -- словарь топиков обработчиков запросов
  2. package dict_topic_serve
  3. import (
  4. "context"
  5. "fmt"
  6. "sync"
  7. "time"
  8. . "gitp78su.ipnodns.ru/svi/kern/v3/kc/helpers"
  9. . "gitp78su.ipnodns.ru/svi/kern/v3/krn/kalias"
  10. "gitp78su.ipnodns.ru/svi/kern/v3/krn/kctx"
  11. . "gitp78su.ipnodns.ru/svi/kern/v3/krn/ktypes"
  12. )
  13. // dictServe -- потокобезопасный словарь обработчиков запросов
  14. //
  15. // Допускается только один обработчик запросов на один топик
  16. type dictServe struct {
  17. sync.RWMutex
  18. ctx IKernelCtx
  19. dictServe map[ATopic]IBusHandlerServe
  20. }
  21. // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов
  22. func NewDictServe() IDictTopicServe {
  23. sf := &dictServe{
  24. ctx: kctx.GetKernelCtx(),
  25. dictServe: map[ATopic]IBusHandlerServe{},
  26. }
  27. return sf
  28. }
  29. // Register -- регистрирует обработчик запросов
  30. func (sf *dictServe) Register(handler IBusHandlerServe) {
  31. sf.Lock()
  32. defer sf.Unlock()
  33. Hassert(handler != nil, "dictServe.Register(): IBusHandlerSubscribe==nil")
  34. topic := handler.Topic()
  35. Hassert(topic != "", "dictServe.Register(): empty topic of handler")
  36. isTwinRegister := sf.register(handler)
  37. Hassert(!isTwinRegister, "dictServe.Register(): handler of topic (%v) already register", handler.Topic())
  38. }
  39. // Unregister -- удаляет обработчик запросов из словаря
  40. func (sf *dictServe) Unregister(handler IBusHandlerServe) {
  41. sf.Lock()
  42. defer sf.Unlock()
  43. Hassert(handler != nil, "dictServe.Unregister(): IBusHandlerSubscribe==nil")
  44. delete(sf.dictServe, handler.Topic())
  45. }
  46. // SendRequest -- вызывает обработчик при поступлении запроса
  47. func (sf *dictServe) SendRequest(topic ATopic, binReq []byte) Result[[]byte] {
  48. sf.RLock()
  49. defer sf.RUnlock()
  50. handler, isOk := sf.dictServe[topic]
  51. if !isOk {
  52. err := fmt.Errorf("dictServe.SendRequest(): handler for topic (%v) not exists", topic)
  53. return NewErr[[]byte](err)
  54. }
  55. var (
  56. chRes = make(chan Result[[]byte], 2)
  57. )
  58. ctx, fnCancel := context.WithTimeout(sf.ctx.Ctx(), time.Millisecond*time.Duration(TimeoutDefault))
  59. defer fnCancel()
  60. fnCall := func() {
  61. defer close(chRes)
  62. res := handler.FnBack(binReq)
  63. chRes <- res
  64. }
  65. go fnCall()
  66. select {
  67. case <-ctx.Done():
  68. err := fmt.Errorf("dictServe.SendRequest(): in call for topic (%v), err=\n\t%w", topic, ctx.Err())
  69. return NewErr[[]byte](err)
  70. case res := <-chRes:
  71. return res
  72. }
  73. }
  74. var TimeoutDefault = 15000
  75. // регистрирует обработчик запросов
  76. func (sf *dictServe) register(handler IBusHandlerServe) bool {
  77. topic := handler.Topic()
  78. _, isOk := sf.dictServe[topic]
  79. if isOk {
  80. return true
  81. }
  82. sf.dictServe[topic] = handler
  83. return false
  84. }