kbus_http.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. // package kbus_http -- шина сообщений поверх HTTP
  2. package kbus_http
  3. import (
  4. "fmt"
  5. "net/http"
  6. "github.com/gofiber/fiber/v2"
  7. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_base"
  8. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_msg/msg_pub"
  9. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_msg/msg_serve"
  10. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_msg/msg_sub"
  11. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_msg/msg_unsub"
  12. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kctx"
  13. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kserv_http"
  14. . "gitp78su.ipnodns.ru/svi/kern/v2/krn/ktypes"
  15. "gitp78su.ipnodns.ru/svi/kern/v2/mock/mock_hand_sub_http"
  16. )
  17. // kBusHttp -- шина данных поверх HTTP
  18. type kBusHttp struct {
  19. *kbus_base.KBusBase
  20. log ILogBuf
  21. }
  22. var (
  23. bus *kBusHttp
  24. )
  25. // GetKernelBusHttp -- возвращает шину HTTP
  26. func GetKernelBusHttp() IKernelBus {
  27. if bus != nil {
  28. return bus
  29. }
  30. ctx := kctx.GetKernelCtx()
  31. bus = &kBusHttp{
  32. KBusBase: kbus_base.GetKernelBusBase(),
  33. }
  34. bus.log = bus.Log()
  35. ctx.Set("kernBus", bus, "http data bus")
  36. fibApp := kserv_http.GetKernelServHttp().Fiber()
  37. fibApp.Post("/bus/sub", bus.postSub) // Топик подписки, IN
  38. fibApp.Post("/bus/unsub", bus.postUnsub) // Топик отписки, IN
  39. fibApp.Post("/bus/request", bus.postSendRequest) // Топик входящих запросов, IN
  40. fibApp.Post("/bus/pub", bus.postPublish) // Топик публикаций подписки, IN
  41. return bus
  42. }
  43. // Входящий запрос HTTP на подписку
  44. func (sf *kBusHttp) postSub(ctx *fiber.Ctx) error {
  45. ctx.Set("Content-type", "text/html; charset=utf8")
  46. ctx.Set("Content-type", "text/json")
  47. ctx.Set("Cache-Control", "no-cache")
  48. sf.log.Debug("kBusHttp.postSub()")
  49. req := &msg_sub.SubscribeReq{}
  50. err := ctx.BodyParser(req)
  51. if err != nil {
  52. resp := &msg_sub.SubscribeResp{
  53. Status_: fmt.Sprintf("kernelBusHttp.postSub(): in parse request, err=\n\t%v\n", err),
  54. Uuid_: req.Uuid_,
  55. }
  56. resp.SelfCheck()
  57. ctx.Response().SetStatusCode(http.StatusBadRequest)
  58. sf.log.Err(resp.Status_)
  59. return ctx.JSON(resp)
  60. }
  61. resp := sf.processSubscribe(req)
  62. resp.SelfCheck()
  63. return ctx.JSON(resp)
  64. }
  65. // Процесс подписки веб-хука
  66. func (sf *kBusHttp) processSubscribe(req *msg_sub.SubscribeReq) *msg_sub.SubscribeResp {
  67. req.SelfCheck()
  68. handler := mock_hand_sub_http.NewMockHandSubHttp(req.Topic_, req.WebHook_)
  69. resp := &msg_sub.SubscribeResp{
  70. Status_: "ok",
  71. Uuid_: req.Uuid_,
  72. Name_: handler.Name(),
  73. }
  74. res := sf.Subscribe(handler)
  75. if res.IsErr() {
  76. resp.Status_ = fmt.Sprintf("kernelBusHttp.processSubscribe(): err=\n\t%v", res.Err())
  77. return resp
  78. }
  79. return resp
  80. }
  81. // Входящая публикация
  82. func (sf *kBusHttp) postPublish(ctx *fiber.Ctx) error {
  83. sf.log.Debug("kBusHttp.postPublish()")
  84. ctx.Set("Content-type", "text/html; charset=utf8")
  85. ctx.Set("Content-type", "text/json")
  86. ctx.Set("Cache-Control", "no-cache")
  87. req := &msg_pub.PublishReq{}
  88. err := ctx.BodyParser(req)
  89. if err != nil {
  90. resp := &msg_pub.PublishResp{
  91. Status_: fmt.Sprintf("kernelBusHttp.postPublish(): in parse request, err=\n\t%v\n", err),
  92. Uuid_: req.Uuid_,
  93. }
  94. resp.SelfCheck()
  95. ctx.Response().SetStatusCode(http.StatusBadRequest)
  96. sf.log.Err(resp.Status_)
  97. return ctx.JSON(resp)
  98. }
  99. resp := sf.processPublish(req)
  100. resp.SelfCheck()
  101. return ctx.JSON(resp)
  102. }
  103. // Выполняет процесс публикации
  104. func (sf *kBusHttp) processPublish(req *msg_pub.PublishReq) *msg_pub.PublishResp {
  105. req.SelfCheck()
  106. res := sf.Publish(req.Topic_, req.BinMsg_)
  107. resp := &msg_pub.PublishResp{
  108. Status_: "ok",
  109. Uuid_: req.Uuid_,
  110. }
  111. if res.IsErr() {
  112. resp.Status_ = fmt.Sprintf("kernelBusHttp.processPublish(): err=\n\t%v", res.Err())
  113. return resp
  114. }
  115. return resp
  116. }
  117. // Входящий запрос
  118. func (sf *kBusHttp) postSendRequest(ctx *fiber.Ctx) error {
  119. sf.log.Debug("kBusHttp.postSendRequest()")
  120. ctx.Set("Content-type", "text/html; charset=utf8")
  121. ctx.Set("Content-type", "text/json")
  122. ctx.Set("Cache-Control", "no-cache")
  123. req := &msg_serve.ServeReq{}
  124. err := ctx.BodyParser(req)
  125. if err != nil {
  126. resp := &msg_serve.ServeResp{
  127. Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
  128. Uuid_: req.Uuid_,
  129. }
  130. resp.SelfCheck()
  131. ctx.Response().SetStatusCode(http.StatusBadRequest)
  132. sf.log.Err(resp.Status_)
  133. return ctx.JSON(resp)
  134. }
  135. resp := sf.processSendRequest(req)
  136. resp.SelfCheck()
  137. return ctx.JSON(resp)
  138. }
  139. // Обрабатывает входящий запрос
  140. func (sf *kBusHttp) processSendRequest(req *msg_serve.ServeReq) *msg_serve.ServeResp {
  141. req.SelfCheck()
  142. res := sf.SendRequest(req.Topic_, req.BinReq_)
  143. resp := &msg_serve.ServeResp{
  144. Status_: "ok",
  145. Uuid_: req.Uuid_,
  146. }
  147. if res.IsErr() {
  148. resp.Status_ = fmt.Sprintf("kernelBusHttp.processSendRequest(): err=\n\t%v", res.Err())
  149. return resp
  150. }
  151. resp.BinResp_ = res.Val()
  152. return resp
  153. }
  154. // Входящая отписка от топика по HTTP
  155. func (sf *kBusHttp) postUnsub(ctx *fiber.Ctx) error {
  156. sf.log.Debug("kBusHttp.postUnsub()")
  157. ctx.Set("Content-type", "text/html; charset=utf8")
  158. ctx.Set("Content-type", "text/json")
  159. ctx.Set("Cache-Control", "no-cache")
  160. req := &msg_unsub.UnsubReq{}
  161. err := ctx.BodyParser(req)
  162. if err != nil {
  163. resp := &msg_serve.ServeResp{
  164. Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
  165. Uuid_: req.Uuid_,
  166. }
  167. resp.SelfCheck()
  168. ctx.Response().SetStatusCode(http.StatusBadRequest)
  169. sf.log.Err(resp.Status_)
  170. return ctx.JSON(resp)
  171. }
  172. resp := sf.processUnsubRequest(req)
  173. resp.SelfCheck()
  174. return ctx.JSON(resp)
  175. }
  176. // Процесс отписки от топика
  177. func (sf *kBusHttp) processUnsubRequest(req *msg_unsub.UnsubReq) *msg_unsub.UnsubResp {
  178. req.SelfCheck()
  179. _hand := sf.Ctx_.Get(string(req.Name_))
  180. resp := &msg_unsub.UnsubResp{
  181. Status_: "ok",
  182. Uuid_: req.Uuid_,
  183. }
  184. if _hand == nil {
  185. resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): handler name(%v) not exists", req.Name_)
  186. return resp
  187. }
  188. hand := _hand.Val().(IBusHandlerSubscribe)
  189. sf.Unsubscribe(hand)
  190. return resp
  191. }