kbus_http.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. // package kbus_http -- шина сообщений поверх HTTP
  2. package kbus_http
  3. import (
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "github.com/gofiber/fiber/v2"
  8. "gitp78su.ipnodns.ru/svi/kern/v3/kc/log_buf"
  9. "gitp78su.ipnodns.ru/svi/kern/v3/krn/kbus/kbus_base"
  10. "gitp78su.ipnodns.ru/svi/kern/v3/krn/kbus/kbus_msg/msg_pub"
  11. "gitp78su.ipnodns.ru/svi/kern/v3/krn/kbus/kbus_msg/msg_serve"
  12. "gitp78su.ipnodns.ru/svi/kern/v3/krn/kbus/kbus_msg/msg_sub"
  13. "gitp78su.ipnodns.ru/svi/kern/v3/krn/kbus/kbus_msg/msg_unsub"
  14. "gitp78su.ipnodns.ru/svi/kern/v3/krn/kctx"
  15. "gitp78su.ipnodns.ru/svi/kern/v3/krn/kserv_http"
  16. . "gitp78su.ipnodns.ru/svi/kern/v3/krn/ktypes"
  17. "gitp78su.ipnodns.ru/svi/kern/v3/mock/mock_hand_sub_http"
  18. )
  19. // kBusHttp -- шина данных поверх HTTP
  20. type kBusHttp struct {
  21. *kbus_base.KBusBase
  22. log ILogBuf
  23. }
  24. var (
  25. Bus_ *kBusHttp
  26. block sync.Mutex
  27. )
  28. // GetKernelBusHttp -- возвращает шину HTTP
  29. func GetKernelBusHttp() IKernelBus {
  30. block.Lock()
  31. defer block.Unlock()
  32. if Bus_ != nil {
  33. return Bus_
  34. }
  35. log := log_buf.NewLogBuf(log_buf.OptIsTerm(true), log_buf.OptPrefix("kBusHttp"))
  36. log.Debug("GetKernelBusHttp(): new")
  37. ctx := kctx.GetKernelCtx()
  38. sf := &kBusHttp{
  39. KBusBase: kbus_base.GetKernelBusBase(),
  40. log: log,
  41. }
  42. servHttp := kserv_http.GetKernelServHttp()
  43. fibApp := servHttp.Fiber()
  44. fibApp.Post("/bus/sub", sf.postSub) // Топик подписки, IN
  45. fibApp.Post("/bus/unsub", sf.postUnsub) // Топик отписки, IN
  46. fibApp.Post("/bus/request", sf.postSendRequest) // Топик входящих запросов, IN
  47. fibApp.Post("/bus/pub", sf.postPublish) // Топик публикаций подписки, IN
  48. ctx.Set("kernBus", sf, "GetKernelBusHttp(): http data bus")
  49. Bus_ = sf
  50. return Bus_
  51. }
  52. // Входящий запрос HTTP на подписку
  53. func (sf *kBusHttp) postSub(ctx *fiber.Ctx) error {
  54. sf.log.Debug("postSub()")
  55. ctx.Set("Content-type", "text/html; charset=utf8")
  56. ctx.Set("Content-type", "text/json")
  57. ctx.Set("Cache-Control", "no-cache")
  58. sf.log.Debug("postSub()")
  59. req := &msg_sub.SubscribeReq{}
  60. err := ctx.BodyParser(req)
  61. if err != nil {
  62. resp := &msg_sub.SubscribeResp{
  63. Status_: fmt.Sprintf("kernelBusHttp.postSub(): in parse request, err=\n\t%v\n", err),
  64. Uuid_: req.Uuid_,
  65. }
  66. resp.SelfCheck()
  67. ctx.Response().SetStatusCode(http.StatusBadRequest)
  68. sf.log.Err("postSub(): in body parser, status=%q", resp.Status_)
  69. return ctx.JSON(resp)
  70. }
  71. resp := sf.processSubscribe(req)
  72. resp.SelfCheck()
  73. return ctx.JSON(resp)
  74. }
  75. // Процесс подписки веб-хука
  76. func (sf *kBusHttp) processSubscribe(req *msg_sub.SubscribeReq) *msg_sub.SubscribeResp {
  77. req.SelfCheck()
  78. handler := mock_hand_sub_http.NewMockHandSubHttp(req.Topic_, req.WebHook_)
  79. resp := &msg_sub.SubscribeResp{
  80. Status_: "ok",
  81. Uuid_: req.Uuid_,
  82. Name_: handler.Name(),
  83. }
  84. res := sf.Subscribe(handler)
  85. if res.IsErr() {
  86. resp.Status_ = fmt.Sprintf("kernelBusHttp.processSubscribe(): err=\n\t%v", res.Err())
  87. return resp
  88. }
  89. return resp
  90. }
  91. // Входящая публикация
  92. func (sf *kBusHttp) postPublish(ctx *fiber.Ctx) error {
  93. sf.log.Debug("postPublish()")
  94. ctx.Set("Content-type", "text/html; charset=utf8")
  95. ctx.Set("Content-type", "text/json")
  96. ctx.Set("Cache-Control", "no-cache")
  97. req := &msg_pub.PublishReq{}
  98. err := ctx.BodyParser(req)
  99. if err != nil {
  100. resp := &msg_pub.PublishResp{
  101. Status_: fmt.Sprintf("kernelBusHttp.postPublish(): in parse request, err=\n\t%v\n", err),
  102. Uuid_: req.Uuid_,
  103. }
  104. resp.SelfCheck()
  105. ctx.Response().SetStatusCode(http.StatusBadRequest)
  106. sf.log.Err("postPublish(): in body parser, status=%v", resp.Status_)
  107. return ctx.JSON(resp)
  108. }
  109. resp := sf.processPublish(req)
  110. resp.SelfCheck()
  111. return ctx.JSON(resp)
  112. }
  113. // Выполняет процесс публикации
  114. func (sf *kBusHttp) processPublish(req *msg_pub.PublishReq) *msg_pub.PublishResp {
  115. req.SelfCheck()
  116. res := sf.Publish(req.Topic_, req.BinMsg_)
  117. resp := &msg_pub.PublishResp{
  118. Status_: "ok",
  119. Uuid_: req.Uuid_,
  120. }
  121. if res.IsErr() {
  122. resp.Status_ = fmt.Sprintf("kernelBusHttp.processPublish(): err=\n\t%v", res.Err())
  123. return resp
  124. }
  125. return resp
  126. }
  127. // Входящий запрос
  128. func (sf *kBusHttp) postSendRequest(ctx *fiber.Ctx) error {
  129. sf.log.Debug("postSendRequest()")
  130. ctx.Set("Content-type", "text/html; charset=utf8")
  131. ctx.Set("Content-type", "text/json")
  132. ctx.Set("Cache-Control", "no-cache")
  133. req := &msg_serve.ServeReq{}
  134. err := ctx.BodyParser(req)
  135. if err != nil {
  136. resp := &msg_serve.ServeResp{
  137. Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
  138. Uuid_: req.Uuid_,
  139. }
  140. resp.SelfCheck()
  141. ctx.Response().SetStatusCode(http.StatusBadRequest)
  142. sf.log.Err("postSendRequest(): in body parser, status=%v", resp.Status_)
  143. return ctx.JSON(resp)
  144. }
  145. resp := sf.processSendRequest(req)
  146. resp.SelfCheck()
  147. return ctx.JSON(resp)
  148. }
  149. // Обрабатывает входящий запрос
  150. func (sf *kBusHttp) processSendRequest(req *msg_serve.ServeReq) *msg_serve.ServeResp {
  151. req.SelfCheck()
  152. res := sf.SendRequest(req.Topic_, req.BinReq_)
  153. resp := &msg_serve.ServeResp{
  154. Status_: "ok",
  155. Uuid_: req.Uuid_,
  156. }
  157. if res.IsErr() {
  158. resp.Status_ = fmt.Sprintf("kernelBusHttp.processSendRequest(): err=\n\t%v", res.Err())
  159. return resp
  160. }
  161. resp.BinResp_ = res.Val()
  162. return resp
  163. }
  164. // Входящая отписка от топика по HTTP
  165. func (sf *kBusHttp) postUnsub(ctx *fiber.Ctx) error {
  166. sf.log.Debug("postUnsub()")
  167. ctx.Set("Content-type", "text/html; charset=utf8")
  168. ctx.Set("Content-type", "text/json")
  169. ctx.Set("Cache-Control", "no-cache")
  170. req := &msg_unsub.UnsubReq{}
  171. err := ctx.BodyParser(req)
  172. if err != nil {
  173. resp := &msg_serve.ServeResp{
  174. Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
  175. Uuid_: req.Uuid_,
  176. }
  177. resp.SelfCheck()
  178. ctx.Response().SetStatusCode(http.StatusBadRequest)
  179. sf.log.Err("postUnsub(): in body_parser, status=%q", resp.Status_)
  180. return ctx.JSON(resp)
  181. }
  182. resp := sf.processUnsubRequest(req)
  183. resp.SelfCheck()
  184. return ctx.JSON(resp)
  185. }
  186. // Процесс отписки от топика
  187. func (sf *kBusHttp) processUnsubRequest(req *msg_unsub.UnsubReq) *msg_unsub.UnsubResp {
  188. req.SelfCheck()
  189. _hand := sf.Ctx_.Get(string(req.Name_))
  190. resp := &msg_unsub.UnsubResp{
  191. Status_: "ok",
  192. Uuid_: req.Uuid_,
  193. }
  194. if _hand == nil {
  195. resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): handler name(%v) not exists", req.Name_)
  196. return resp
  197. }
  198. hand := _hand.Val().(IBusHandlerSubscribe)
  199. sf.Unsubscribe(hand)
  200. return resp
  201. }