dict_topic_serve.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. // package dict_topic_serve -- словарь топиков обработчиков запросов.
  2. package dict_topic_serve
  3. import (
  4. "context"
  5. "fmt"
  6. "sync"
  7. "time"
  8. mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
  9. mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  10. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  12. )
  13. // dictServe -- потокобезопасный словарь обработчиков запросов.
  14. //
  15. // Допускается только один обработчик запросов на один топик.
  16. type dictServe struct {
  17. sync.RWMutex
  18. kCtx mKt.IKernelCtx
  19. dictServe map[mKa.ATopic]mKt.IBusHandlerServe
  20. }
  21. // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов.
  22. func NewDictServe() mKt.IResult[*dictServe] {
  23. resKernCtx := kctx.GetKernelCtx()
  24. if resKernCtx.IsErr() {
  25. err := fmt.Errorf("NewDictServe(): in get kernel ctx, err=\n\t%w", resKernCtx.Err())
  26. return mL1.NewErr[*dictServe](err)
  27. }
  28. sf := &dictServe{
  29. kCtx: resKernCtx.Val(),
  30. dictServe: map[mKa.ATopic]mKt.IBusHandlerServe{},
  31. }
  32. return mL1.NewRes(sf)
  33. }
  34. // Register -- регистрирует обработчик запросов.
  35. func (sf *dictServe) Register(handler mKt.IBusHandlerServe) mKt.IResult[bool] {
  36. sf.Lock()
  37. defer sf.Unlock()
  38. if handler == nil {
  39. return mL1.NewErr[bool](fmt.Errorf("dictServe.Register(): IBusHandlerSubscribe==nil"))
  40. }
  41. topic := handler.Topic()
  42. if topic == "" {
  43. return mL1.NewErr[bool](fmt.Errorf("dictServe.Register(): empty topic of handler"))
  44. }
  45. isTwinRegister := sf.register(handler)
  46. if isTwinRegister {
  47. return mL1.NewErr[bool](fmt.Errorf("dictServe.Register(): handler of topic (%v) already register", topic))
  48. }
  49. return mL1.NewRes(true)
  50. }
  51. // Unregister -- удаляет обработчик запросов из словаря.
  52. func (sf *dictServe) Unregister(handler mKt.IBusHandlerServe) {
  53. sf.Lock()
  54. defer sf.Unlock()
  55. if handler == nil {
  56. sf.kCtx.Log().Err("dictServe.Unregister(): IBusHandlerSubscribe==nil")
  57. return
  58. }
  59. delete(sf.dictServe, handler.Topic())
  60. }
  61. // SendRequest -- вызывает обработчик при поступлении запроса.
  62. func (sf *dictServe) SendRequest(topic mKa.ATopic, binReq []byte) mKt.IResult[[]byte] {
  63. sf.RLock()
  64. defer sf.RUnlock()
  65. handler, isOk := sf.dictServe[topic]
  66. if !isOk {
  67. err := fmt.Errorf("dictServe.SendRequest(): handler for topic (%v) not exists", topic)
  68. return mL1.NewErr[[]byte](err)
  69. }
  70. var (
  71. chRes = make(chan mKt.IResult[[]byte], 2)
  72. )
  73. ctx, fnCancel := context.WithTimeout(sf.kCtx.Ctx(), time.Millisecond*time.Duration(TimeoutDefault))
  74. defer fnCancel()
  75. fnCall := func() {
  76. defer close(chRes)
  77. res := handler.FnBack(binReq)
  78. chRes <- res
  79. }
  80. go fnCall()
  81. select {
  82. case <-ctx.Done():
  83. err := fmt.Errorf("dictServe.SendRequest(): in call for topic (%v), err=\n\t%w", topic, ctx.Err())
  84. return mL1.NewErr[[]byte](err)
  85. case res := <-chRes:
  86. return res
  87. }
  88. }
  89. var TimeoutDefault = 15000
  90. // регистрирует обработчик запросов.
  91. func (sf *dictServe) register(handler mKt.IBusHandlerServe) bool {
  92. topic := handler.Topic()
  93. _, isOk := sf.dictServe[topic]
  94. if isOk {
  95. return true
  96. }
  97. sf.dictServe[topic] = handler
  98. return false
  99. }