dict_topic_serve.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. // package dict_topic_serve -- словарь топиков обработчиков запросов
  2. package dict_topic_serve
  3. import (
  4. "context"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kctx"
  9. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
  10. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  11. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/helpers"
  12. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/result"
  13. )
  14. // dictServe -- потокобезопасный словарь обработчиков запросов
  15. //
  16. // Допускается только один обработчик запросов на один топик
  17. type dictServe struct {
  18. sync.RWMutex
  19. ctx IKernelCtx
  20. dictServe map[ATopic]IBusHandlerServe
  21. }
  22. // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов
  23. func NewDictServe() IDictTopicServe {
  24. sf := &dictServe{
  25. ctx: kctx.GetKernelCtx(),
  26. dictServe: map[ATopic]IBusHandlerServe{},
  27. }
  28. return sf
  29. }
  30. // Register -- регистрирует обработчик запросов
  31. func (sf *dictServe) Register(handler IBusHandlerServe) IResult[bool] {
  32. sf.Lock()
  33. defer sf.Unlock()
  34. if handler == nil {
  35. return NewErr[bool](fmt.Errorf("dictServe.Register(): IBusHandlerSubscribe==nil"))
  36. }
  37. topic := handler.Topic()
  38. if topic == "" {
  39. return NewErr[bool](fmt.Errorf("dictServe.Register(): empty topic of handler"))
  40. }
  41. isTwinRegister := sf.register(handler)
  42. if isTwinRegister {
  43. return NewErr[bool](fmt.Errorf("dictServe.Register(): handler of topic (%v) already register", topic))
  44. }
  45. return NewRes(true)
  46. }
  47. // Unregister -- удаляет обработчик запросов из словаря
  48. func (sf *dictServe) Unregister(handler IBusHandlerServe) {
  49. sf.Lock()
  50. defer sf.Unlock()
  51. Hassert(handler != nil, "dictServe.Unregister(): IBusHandlerSubscribe==nil")
  52. delete(sf.dictServe, handler.Topic())
  53. }
  54. // SendRequest -- вызывает обработчик при поступлении запроса
  55. func (sf *dictServe) SendRequest(topic ATopic, binReq []byte) IResult[[]byte] {
  56. sf.RLock()
  57. defer sf.RUnlock()
  58. handler, isOk := sf.dictServe[topic]
  59. if !isOk {
  60. err := fmt.Errorf("dictServe.SendRequest(): handler for topic (%v) not exists", topic)
  61. return NewErr[[]byte](err)
  62. }
  63. var (
  64. chRes = make(chan IResult[[]byte], 2)
  65. )
  66. ctx, fnCancel := context.WithTimeout(sf.ctx.Ctx(), time.Millisecond*time.Duration(TimeoutDefault))
  67. defer fnCancel()
  68. fnCall := func() {
  69. defer close(chRes)
  70. res := handler.FnBack(binReq)
  71. chRes <- res
  72. }
  73. go fnCall()
  74. select {
  75. case <-ctx.Done():
  76. err := fmt.Errorf("dictServe.SendRequest(): in call for topic (%v), err=\n\t%w", topic, ctx.Err())
  77. return NewErr[[]byte](err)
  78. case res := <-chRes:
  79. return res
  80. }
  81. }
  82. var TimeoutDefault = 15000
  83. // регистрирует обработчик запросов
  84. func (sf *dictServe) register(handler IBusHandlerServe) bool {
  85. topic := handler.Topic()
  86. _, isOk := sf.dictServe[topic]
  87. if isOk {
  88. return true
  89. }
  90. sf.dictServe[topic] = handler
  91. return false
  92. }