dict_topic_serve.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. // package dict_topic_serve -- словарь топиков обработчиков запросов.
  2. package dict_topic_serve
  3. import (
  4. "context"
  5. "fmt"
  6. "sync"
  7. "time"
  8. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  9. mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/alias"
  10. mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  12. )
  13. // dictServe -- потокобезопасный словарь обработчиков запросов.
  14. //
  15. // Допускается только один обработчик запросов на один топик.
  16. type dictServe struct {
  17. sync.RWMutex
  18. kCtx mKt.IKernelCtx
  19. dictServe map[*mKa.ATopic]mKt.IBusHandlerServe
  20. }
  21. // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов.
  22. func NewDictServe() *mL0.Result[*dictServe] {
  23. resKernCtx := kctx.GetKernelCtx()
  24. if resKernCtx.IsErr() {
  25. err := fmt.Errorf("NewDictServe(): in get kernel ctx, err=\n\t%w", resKernCtx.Err())
  26. return mL0.NewErr[*dictServe](err)
  27. }
  28. sf := &dictServe{
  29. kCtx: resKernCtx.Val(),
  30. dictServe: map[*mKa.ATopic]mKt.IBusHandlerServe{},
  31. }
  32. return mL0.NewRes(sf)
  33. }
  34. // Register -- регистрирует обработчик запросов.
  35. func (sf *dictServe) Register(handler mKt.IBusHandlerServe) *mL0.Result[bool] {
  36. sf.Lock()
  37. defer sf.Unlock()
  38. if handler == nil {
  39. return mL0.NewErr[bool](fmt.Errorf("dictServe.Register(): IBusHandlerSubscribe==nil"))
  40. }
  41. topic := handler.Topic()
  42. isTwinRegister := sf.register(handler)
  43. if isTwinRegister {
  44. return mL0.NewErr[bool](fmt.Errorf("dictServe.Register(): handler of topic (%v) already register", topic))
  45. }
  46. return mL0.NewRes(true)
  47. }
  48. // Unregister -- удаляет обработчик запросов из словаря.
  49. func (sf *dictServe) Unregister(handler mKt.IBusHandlerServe) {
  50. sf.Lock()
  51. defer sf.Unlock()
  52. if handler == nil {
  53. sf.kCtx.Log().Err("dictServe.Unregister(): IBusHandlerSubscribe==nil")
  54. return
  55. }
  56. delete(sf.dictServe, handler.Topic())
  57. }
  58. // SendRequest -- вызывает обработчик при поступлении запроса.
  59. func (sf *dictServe) SendRequest(topic *mKa.ATopic, binReq []byte) *mL0.Result[[]byte] {
  60. sf.RLock()
  61. defer sf.RUnlock()
  62. handler, isOk := sf.dictServe[topic]
  63. if !isOk {
  64. err := fmt.Errorf("dictServe.SendRequest(): handler for topic (%v) not exists", topic)
  65. return mL0.NewErr[[]byte](err)
  66. }
  67. var (
  68. chRes = make(chan *mL0.Result[[]byte], 2)
  69. )
  70. ctx, fnCancel := context.WithTimeout(sf.kCtx.Ctx(), time.Millisecond*time.Duration(TimeoutDefault))
  71. defer fnCancel()
  72. fnCall := func() {
  73. defer close(chRes)
  74. res := handler.FnBack(binReq)
  75. chRes <- res
  76. }
  77. go fnCall()
  78. select {
  79. case <-ctx.Done():
  80. err := fmt.Errorf("dictServe.SendRequest(): in call for topic (%v), err=\n\t%w", topic, ctx.Err())
  81. return mL0.NewErr[[]byte](err)
  82. case res := <-chRes:
  83. return res
  84. }
  85. }
  86. var TimeoutDefault = 15000
  87. // регистрирует обработчик запросов.
  88. func (sf *dictServe) register(handler mKt.IBusHandlerServe) bool {
  89. topic := handler.Topic()
  90. _, isOk := sf.dictServe[topic]
  91. if isOk {
  92. return true
  93. }
  94. sf.dictServe[topic] = handler
  95. return false
  96. }