kbus_http.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. // package kbus_http -- шина сообщений поверх HTTP
  2. package kbus_http
  3. import (
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "github.com/gofiber/fiber/v2"
  8. "gitp78su.ipnodns.ru/svi/kern/v4/kc/log_buf"
  9. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/kbus_base"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/kbus_msg/msg_pub"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/kbus_msg/msg_serve"
  12. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/kbus_msg/msg_sub"
  13. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/kbus_msg/msg_unsub"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kctx"
  15. "gitp78su.ipnodns.ru/svi/kern/v4/krn/kserv_http"
  16. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  17. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/result"
  18. "gitp78su.ipnodns.ru/svi/kern/v4/mock/mock_hand_sub_http"
  19. )
  20. // kBusHttp -- шина данных поверх HTTP
  21. type kBusHttp struct {
  22. *kbus_base.KBusBase
  23. log ILogBuf
  24. }
  25. var (
  26. Bus_ *kBusHttp
  27. block sync.Mutex
  28. )
  29. // GetKernelBusHttp -- возвращает шину HTTP
  30. func GetKernelBusHttp() IResult[IKernelBus] {
  31. block.Lock()
  32. defer block.Unlock()
  33. if Bus_ != nil {
  34. return NewRes(IKernelBus(Bus_))
  35. }
  36. log := log_buf.NewLogBuf(log_buf.OptIsTerm(true), log_buf.OptPrefix("kBusHttp"))
  37. log.Debug("GetKernelBusHttp(): new")
  38. ctx := kctx.GetKernelCtx()
  39. sf := &kBusHttp{
  40. KBusBase: kbus_base.GetKernelBusBase(),
  41. log: log,
  42. }
  43. resServ := kserv_http.GetKernelServHttp()
  44. if resServ.IsErr() {
  45. err := fmt.Errorf("GetKernelBusHttp(): err=\n\t%v", resServ.Err())
  46. return NewErr[IKernelBus](err)
  47. }
  48. serv := resServ.Val()
  49. fibApp := serv.Fiber()
  50. fibApp.Post("/bus/sub", sf.postSub) // Топик подписки, IN
  51. fibApp.Post("/bus/unsub", sf.postUnsub) // Топик отписки, IN
  52. fibApp.Post("/bus/request", sf.postSendRequest) // Топик входящих запросов, IN
  53. fibApp.Post("/bus/pub", sf.postPublish) // Топик публикаций подписки, IN
  54. ctx.Set("kernBus", sf, "GetKernelBusHttp(): http data bus")
  55. Bus_ = sf
  56. return NewRes(IKernelBus(Bus_))
  57. }
  58. // Входящий запрос HTTP на подписку
  59. func (sf *kBusHttp) postSub(ctx *fiber.Ctx) error {
  60. sf.log.Debug("postSub()")
  61. ctx.Set("Content-type", "text/html; charset=utf8")
  62. ctx.Set("Content-type", "text/json")
  63. ctx.Set("Cache-Control", "no-cache")
  64. sf.log.Debug("postSub()")
  65. req := &msg_sub.SubscribeReq{}
  66. err := ctx.BodyParser(req)
  67. if err != nil {
  68. resp := &msg_sub.SubscribeResp{
  69. Status_: fmt.Sprintf("kernelBusHttp.postSub(): in parse request, err=\n\t%v\n", err),
  70. Uuid_: req.Uuid_,
  71. }
  72. resp.SelfCheck()
  73. ctx.Response().SetStatusCode(http.StatusBadRequest)
  74. sf.log.Err("postSub(): in body parser, status=%q", resp.Status_)
  75. return ctx.JSON(resp)
  76. }
  77. resp := sf.processSubscribe(req)
  78. resp.SelfCheck()
  79. return ctx.JSON(resp)
  80. }
  81. // Процесс подписки веб-хука
  82. func (sf *kBusHttp) processSubscribe(req *msg_sub.SubscribeReq) *msg_sub.SubscribeResp {
  83. req.SelfCheck()
  84. handler := mock_hand_sub_http.NewMockHandSubHttp(req.Topic_, req.WebHook_)
  85. resp := &msg_sub.SubscribeResp{
  86. Status_: "ok",
  87. Uuid_: req.Uuid_,
  88. Name_: handler.Name(),
  89. }
  90. res := sf.Subscribe(handler)
  91. if res.IsErr() {
  92. resp.Status_ = fmt.Sprintf("kernelBusHttp.processSubscribe(): err=\n\t%v", res.Err())
  93. return resp
  94. }
  95. return resp
  96. }
  97. // Входящая публикация
  98. func (sf *kBusHttp) postPublish(ctx *fiber.Ctx) error {
  99. sf.log.Debug("postPublish()")
  100. ctx.Set("Content-type", "text/html; charset=utf8")
  101. ctx.Set("Content-type", "text/json")
  102. ctx.Set("Cache-Control", "no-cache")
  103. req := &msg_pub.PublishReq{}
  104. err := ctx.BodyParser(req)
  105. if err != nil {
  106. resp := &msg_pub.PublishResp{
  107. Status_: fmt.Sprintf("kernelBusHttp.postPublish(): in parse request, err=\n\t%v\n", err),
  108. Uuid_: req.Uuid_,
  109. }
  110. resp.SelfCheck()
  111. ctx.Response().SetStatusCode(http.StatusBadRequest)
  112. sf.log.Err("postPublish(): in body parser, status=%v", resp.Status_)
  113. return ctx.JSON(resp)
  114. }
  115. resp := sf.processPublish(req)
  116. resp.SelfCheck()
  117. return ctx.JSON(resp)
  118. }
  119. // Выполняет процесс публикации
  120. func (sf *kBusHttp) processPublish(req *msg_pub.PublishReq) *msg_pub.PublishResp {
  121. req.SelfCheck()
  122. res := sf.Publish(req.Topic_, req.BinMsg_)
  123. resp := &msg_pub.PublishResp{
  124. Status_: "ok",
  125. Uuid_: req.Uuid_,
  126. }
  127. if res.IsErr() {
  128. resp.Status_ = fmt.Sprintf("kernelBusHttp.processPublish(): err=\n\t%v", res.Err())
  129. return resp
  130. }
  131. return resp
  132. }
  133. // Входящий запрос
  134. func (sf *kBusHttp) postSendRequest(ctx *fiber.Ctx) error {
  135. sf.log.Debug("postSendRequest()")
  136. ctx.Set("Content-type", "text/html; charset=utf8")
  137. ctx.Set("Content-type", "text/json")
  138. ctx.Set("Cache-Control", "no-cache")
  139. req := &msg_serve.ServeReq{}
  140. err := ctx.BodyParser(req)
  141. if err != nil {
  142. resp := &msg_serve.ServeResp{
  143. Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
  144. Uuid_: req.Uuid_,
  145. }
  146. resp.SelfCheck()
  147. ctx.Response().SetStatusCode(http.StatusBadRequest)
  148. sf.log.Err("postSendRequest(): in body parser, status=%v", resp.Status_)
  149. return ctx.JSON(resp)
  150. }
  151. resp := sf.processSendRequest(req)
  152. resp.SelfCheck()
  153. return ctx.JSON(resp)
  154. }
  155. // Обрабатывает входящий запрос
  156. func (sf *kBusHttp) processSendRequest(req *msg_serve.ServeReq) *msg_serve.ServeResp {
  157. req.SelfCheck()
  158. res := sf.SendRequest(req.Topic_, req.BinReq_)
  159. resp := &msg_serve.ServeResp{
  160. Status_: "ok",
  161. Uuid_: req.Uuid_,
  162. }
  163. if res.IsErr() {
  164. resp.Status_ = fmt.Sprintf("kernelBusHttp.processSendRequest(): err=\n\t%v", res.Err())
  165. return resp
  166. }
  167. resp.BinResp_ = res.Val()
  168. return resp
  169. }
  170. // Входящая отписка от топика по HTTP
  171. func (sf *kBusHttp) postUnsub(ctx *fiber.Ctx) error {
  172. sf.log.Debug("postUnsub()")
  173. ctx.Set("Content-type", "text/html; charset=utf8")
  174. ctx.Set("Content-type", "text/json")
  175. ctx.Set("Cache-Control", "no-cache")
  176. req := &msg_unsub.UnsubReq{}
  177. err := ctx.BodyParser(req)
  178. if err != nil {
  179. resp := &msg_serve.ServeResp{
  180. Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
  181. Uuid_: req.Uuid_,
  182. }
  183. resp.SelfCheck()
  184. ctx.Response().SetStatusCode(http.StatusBadRequest)
  185. sf.log.Err("postUnsub(): in body_parser, status=%q", resp.Status_)
  186. return ctx.JSON(resp)
  187. }
  188. resp := sf.processUnsubRequest(req)
  189. resp.SelfCheck()
  190. return ctx.JSON(resp)
  191. }
  192. // Процесс отписки от топика
  193. func (sf *kBusHttp) processUnsubRequest(req *msg_unsub.UnsubReq) *msg_unsub.UnsubResp {
  194. req.SelfCheck()
  195. _hand := sf.Ctx_.Get(string(req.Name_))
  196. resp := &msg_unsub.UnsubResp{
  197. Status_: "ok",
  198. Uuid_: req.Uuid_,
  199. }
  200. if _hand == nil {
  201. resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): handler name(%v) not exists", req.Name_)
  202. return resp
  203. }
  204. hand := _hand.Val().(IBusHandlerSubscribe)
  205. sf.Unsubscribe(hand)
  206. return resp
  207. }