dict_topic_serve.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. // package dict_topic_serve -- словарь топиков обработчиков запросов.
  2. package dict_topic_serve
  3. import (
  4. "context"
  5. "fmt"
  6. "sync"
  7. "time"
  8. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  9. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs/topic"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/etypes/ebool"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  12. mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  13. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  14. )
  15. // dictServe -- потокобезопасный словарь обработчиков запросов.
  16. //
  17. // Допускается только один обработчик запросов на один топик.
  18. type dictServe struct {
  19. sync.RWMutex
  20. kCtx mKs.IKernelCtx
  21. dictServe map[*topic.ATopic]mKs.IBusHandlerServe
  22. }
  23. // NewDictServe -- возвращает потокобезопасный словарь обработчиков запросов.
  24. func NewDictServe() *dictServe {
  25. sf := &dictServe{
  26. kCtx: kctx.GetKernelCtx(),
  27. dictServe: map[*topic.ATopic]mKs.IBusHandlerServe{},
  28. }
  29. return sf
  30. }
  31. // Register -- регистрирует обработчик запросов.
  32. func (sf *dictServe) Register(handler mKs.IBusHandlerServe) mL0.IResult[kspec.EBool] {
  33. sf.Lock()
  34. defer sf.Unlock()
  35. if handler == nil {
  36. err := fmt.Errorf("dictServe.Register(): IBusHandlerSubscribe==nil")
  37. return mL0.NewErr[kspec.EBool](err)
  38. }
  39. topic := handler.Topic()
  40. isTwinRegister := sf.register(handler)
  41. if isTwinRegister {
  42. err := fmt.Errorf("dictServe.Register(): handler of topic (%v) already register", topic)
  43. return mL0.NewErr[kspec.EBool](err)
  44. }
  45. return mL0.NewOk(ebool.NewEBool(true))
  46. }
  47. // Unregister -- удаляет обработчик запросов из словаря.
  48. func (sf *dictServe) Unregister(handler mKs.IBusHandlerServe) {
  49. sf.Lock()
  50. defer sf.Unlock()
  51. if handler == nil {
  52. sf.kCtx.Log().Err("dictServe.Unregister(): IBusHandlerSubscribe==nil")
  53. return
  54. }
  55. delete(sf.dictServe, handler.Topic())
  56. }
  57. // SendRequest -- вызывает обработчик при поступлении запроса.
  58. func (sf *dictServe) SendRequest(topic *topic.ATopic, binReq []byte) mL0.IResult[[]byte] {
  59. sf.RLock()
  60. defer sf.RUnlock()
  61. handler, isOk := sf.dictServe[topic]
  62. if !isOk {
  63. err := fmt.Errorf("dictServe.SendRequest(): handler for topic (%v) not exists", topic)
  64. return mL0.NewErr[[]byte](err)
  65. }
  66. var (
  67. chRes = make(chan mL0.IResult[[]byte], 2)
  68. )
  69. ctx, fnCancel := context.WithTimeout(sf.kCtx.Ctx(), time.Millisecond*time.Duration(TimeoutDefault))
  70. defer fnCancel()
  71. fnCall := func() {
  72. defer close(chRes)
  73. res := handler.FnBack(binReq)
  74. chRes <- res
  75. }
  76. go fnCall()
  77. select {
  78. case <-ctx.Done():
  79. err := fmt.Errorf("dictServe.SendRequest(): in call for topic (%v), err=\n\t%w", topic, ctx.Err())
  80. return mL0.NewErr[[]byte](err)
  81. case res := <-chRes:
  82. return res
  83. }
  84. }
  85. var TimeoutDefault = 15000
  86. // регистрирует обработчик запросов.
  87. func (sf *dictServe) register(handler mKs.IBusHandlerServe) bool {
  88. topic := handler.Topic()
  89. _, isOk := sf.dictServe[topic]
  90. if isOk {
  91. return true
  92. }
  93. sf.dictServe[topic] = handler
  94. return false
  95. }