dict_topic_serve.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. // package dict_topic_serve -- словарь топиков обработчиков запросов.
  2. package dict_topic_serve
  3. import (
  4. "context"
  5. "fmt"
  6. "sync"
  7. "time"
  8. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  9. mKd "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs"
  10. mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  12. )
  13. // dictServe -- потокобезопасный словарь обработчиков запросов.
  14. //
  15. // Допускается только один обработчик запросов на один топик.
  16. type dictServe struct {
  17. sync.RWMutex
  18. kCtx mKs.IKernelCtx
  19. dictServe map[*mKd.Topic]mKs.IBusHandlerServe
  20. }
  21. // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов.
  22. func NewDictServe() *dictServe {
  23. sf := &dictServe{
  24. kCtx: kctx.GetKernelCtx(),
  25. dictServe: map[*mKd.Topic]mKs.IBusHandlerServe{},
  26. }
  27. return sf
  28. }
  29. // Register -- регистрирует обработчик запросов.
  30. func (sf *dictServe) Register(handler mKs.IBusHandlerServe) mL0.IResult[bool] {
  31. sf.Lock()
  32. defer sf.Unlock()
  33. if handler == nil {
  34. return mL0.NewErr[bool](fmt.Errorf("dictServe.Register(): IBusHandlerSubscribe==nil"))
  35. }
  36. topic := handler.Topic()
  37. isTwinRegister := sf.register(handler)
  38. if isTwinRegister {
  39. return mL0.NewErr[bool](fmt.Errorf("dictServe.Register(): handler of topic (%v) already register", topic))
  40. }
  41. return mL0.NewRes(true)
  42. }
  43. // Unregister -- удаляет обработчик запросов из словаря.
  44. func (sf *dictServe) Unregister(handler mKs.IBusHandlerServe) {
  45. sf.Lock()
  46. defer sf.Unlock()
  47. if handler == nil {
  48. sf.kCtx.Log().Err("dictServe.Unregister(): IBusHandlerSubscribe==nil")
  49. return
  50. }
  51. delete(sf.dictServe, handler.Topic())
  52. }
  53. // SendRequest -- вызывает обработчик при поступлении запроса.
  54. func (sf *dictServe) SendRequest(topic *mKd.Topic, binReq []byte) mL0.IResult[[]byte] {
  55. sf.RLock()
  56. defer sf.RUnlock()
  57. handler, isOk := sf.dictServe[topic]
  58. if !isOk {
  59. err := fmt.Errorf("dictServe.SendRequest(): handler for topic (%v) not exists", topic)
  60. return mL0.NewErr[[]byte](err)
  61. }
  62. var (
  63. chRes = make(chan mL0.IResult[[]byte], 2)
  64. )
  65. ctx, fnCancel := context.WithTimeout(sf.kCtx.Ctx(), time.Millisecond*time.Duration(TimeoutDefault))
  66. defer fnCancel()
  67. fnCall := func() {
  68. defer close(chRes)
  69. res := handler.FnBack(binReq)
  70. chRes <- res
  71. }
  72. go fnCall()
  73. select {
  74. case <-ctx.Done():
  75. err := fmt.Errorf("dictServe.SendRequest(): in call for topic (%v), err=\n\t%w", topic, ctx.Err())
  76. return mL0.NewErr[[]byte](err)
  77. case res := <-chRes:
  78. return res
  79. }
  80. }
  81. var TimeoutDefault = 15000
  82. // регистрирует обработчик запросов.
  83. func (sf *dictServe) register(handler mKs.IBusHandlerServe) bool {
  84. topic := handler.Topic()
  85. _, isOk := sf.dictServe[topic]
  86. if isOk {
  87. return true
  88. }
  89. sf.dictServe[topic] = handler
  90. return false
  91. }