kbus_http.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. // package kbus_http -- шина сообщений поверх HTTP.
  2. package kbus_http
  3. import (
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "github.com/gofiber/fiber/v3"
  8. mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  9. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve"
  12. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub"
  13. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus_base"
  15. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  16. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kserv_http"
  17. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/mock_hand_sub_http"
  18. )
  19. // kBusHttp -- шина данных поверх HTTP.
  20. type kBusHttp struct {
  21. *kbus_base.KBusBase
  22. log mKs.ILogBuf
  23. }
  24. var (
  25. Bus_ *kBusHttp
  26. block sync.Mutex
  27. )
  28. // GetKernelBusHttp -- возвращает шину HTTP.
  29. func GetKernelBusHttp() mKs.IKernelBus {
  30. block.Lock()
  31. defer block.Unlock()
  32. if Bus_ != nil {
  33. return Bus_
  34. }
  35. log := mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kBusHttp"))
  36. log.Debug("GetKernelBusHttp(): new")
  37. kCtx := kctx.GetKernelCtx()
  38. kBus := kbus_base.GetKernelBusBase()
  39. sf := &kBusHttp{
  40. KBusBase: kBus,
  41. log: log,
  42. }
  43. kServHttp := kserv_http.GetKernelServHttp()
  44. fibApp := kServHttp.Fiber()
  45. fibApp.Post("/bus/sub", sf.postSub) // Топик подписки, IN
  46. fibApp.Post("/bus/unsub", sf.postUnsub) // Топик отписки, IN
  47. fibApp.Post("/bus/request", sf.postSendRequest) // Топик входящих запросов, IN
  48. fibApp.Post("/bus/pub", sf.postPublish) // Топик публикаций подписки, IN
  49. kCtx.Set("kernBus", sf, "GetKernelBusHttp(): http data bus")
  50. Bus_ = sf
  51. return Bus_
  52. }
  53. // Входящий запрос HTTP на подписку.
  54. func (sf *kBusHttp) postSub(ctx fiber.Ctx) error {
  55. sf.log.Debug("postSub()")
  56. ctx.Set("Content-type", "text/html; charset=utf8")
  57. ctx.Set("Content-type", "text/json")
  58. ctx.Set("Cache-Control", "no-cache")
  59. sf.log.Debug("postSub()")
  60. req := &msg_sub.SubscribeReq{}
  61. err := ctx.Bind().Body(req)
  62. if err != nil {
  63. resp := &msg_sub.SubscribeResp{
  64. Status_: fmt.Sprintf("kernelBusHttp.postSub(): in parse request, err=\n\t%v\n", err),
  65. Uuid_: req.Uuid_,
  66. }
  67. resp.SelfCheck()
  68. ctx.Response().SetStatusCode(http.StatusBadRequest)
  69. sf.log.Err("postSub(): in body parser, status=%q", resp.Status_)
  70. return ctx.JSON(resp)
  71. }
  72. resp := sf.processSubscribe(req)
  73. resp.SelfCheck()
  74. return ctx.JSON(resp)
  75. }
  76. // Процесс подписки веб-хука.
  77. func (sf *kBusHttp) processSubscribe(req *msg_sub.SubscribeReq) *msg_sub.SubscribeResp {
  78. req.SelfCheck()
  79. handler := mock_hand_sub_http.NewMockHandSubHttp(req.Topic_, req.WebHook_)
  80. resp := &msg_sub.SubscribeResp{
  81. Status_: "ok",
  82. Uuid_: req.Uuid_,
  83. Name_: handler.Name(),
  84. }
  85. res := sf.Subscribe(handler)
  86. if res.IsErr() {
  87. resp.Status_ = fmt.Sprintf("kernelBusHttp.processSubscribe(): err=\n\t%v", res.Err())
  88. return resp
  89. }
  90. return resp
  91. }
  92. // Входящая публикация.
  93. func (sf *kBusHttp) postPublish(ctx fiber.Ctx) error {
  94. sf.log.Debug("postPublish()")
  95. ctx.Set("Content-type", "text/html; charset=utf8")
  96. ctx.Set("Content-type", "text/json")
  97. ctx.Set("Cache-Control", "no-cache")
  98. req := &msg_pub.PublishReq{}
  99. err := ctx.Bind().Body(req)
  100. if err != nil {
  101. resp := &msg_pub.PublishResp{
  102. Status_: fmt.Sprintf("kernelBusHttp.postPublish(): in parse request, err=\n\t%v\n", err),
  103. Uuid_: req.Uuid_,
  104. }
  105. resp.SelfCheck()
  106. ctx.Response().SetStatusCode(http.StatusBadRequest)
  107. sf.log.Err("postPublish(): in body parser, status=%v", resp.Status_)
  108. return ctx.JSON(resp)
  109. }
  110. resp := sf.processPublish(req)
  111. resp.SelfCheck()
  112. return ctx.JSON(resp)
  113. }
  114. // Выполняет процесс публикации.
  115. func (sf *kBusHttp) processPublish(req *msg_pub.PublishReq) *msg_pub.PublishResp {
  116. req.SelfCheck()
  117. res := sf.Publish(req.Topic_, req.BinMsg_)
  118. resp := &msg_pub.PublishResp{
  119. Status_: "ok",
  120. Uuid_: req.Uuid_,
  121. }
  122. if res.IsErr() {
  123. resp.Status_ = fmt.Sprintf("kernelBusHttp.processPublish(): err=\n\t%v", res.Err())
  124. return resp
  125. }
  126. return resp
  127. }
  128. // Входящий запрос.
  129. func (sf *kBusHttp) postSendRequest(ctx fiber.Ctx) error {
  130. sf.log.Debug("postSendRequest()")
  131. ctx.Set("Content-type", "text/html; charset=utf8")
  132. ctx.Set("Content-type", "text/json")
  133. ctx.Set("Cache-Control", "no-cache")
  134. req := &msg_serve.ServeReq{}
  135. err := ctx.Bind().Body(req)
  136. if err != nil {
  137. resp := &msg_serve.ServeResp{
  138. Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
  139. Uuid_: req.Uuid_,
  140. }
  141. resp.SelfCheck()
  142. ctx.Response().SetStatusCode(http.StatusBadRequest)
  143. sf.log.Err("postSendRequest(): in body parser, status=%v", resp.Status_)
  144. return ctx.JSON(resp)
  145. }
  146. resp := sf.processSendRequest(req)
  147. resp.SelfCheck()
  148. return ctx.JSON(resp)
  149. }
  150. // Обрабатывает входящий запрос.
  151. func (sf *kBusHttp) processSendRequest(req *msg_serve.ServeReq) *msg_serve.ServeResp {
  152. req.SelfCheck()
  153. res := sf.SendRequest(req.Topic_, req.BinReq_)
  154. resp := &msg_serve.ServeResp{
  155. Status_: "ok",
  156. Uuid_: req.Uuid_,
  157. }
  158. if res.IsErr() {
  159. resp.Status_ = fmt.Sprintf("kernelBusHttp.processSendRequest(): err=\n\t%v", res.Err())
  160. return resp
  161. }
  162. resp.BinResp_ = res.Ok()
  163. return resp
  164. }
  165. // Входящая отписка от топика по HTTP.
  166. func (sf *kBusHttp) postUnsub(ctx fiber.Ctx) error {
  167. sf.log.Debug("postUnsub()")
  168. ctx.Set("Content-type", "text/html; charset=utf8")
  169. ctx.Set("Content-type", "text/json")
  170. ctx.Set("Cache-Control", "no-cache")
  171. req := &msg_unsub.UnsubReq{}
  172. err := ctx.Bind().Body(req)
  173. if err != nil {
  174. resp := &msg_serve.ServeResp{
  175. Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
  176. Uuid_: req.Uuid_,
  177. }
  178. resp.SelfCheck()
  179. ctx.Response().SetStatusCode(http.StatusBadRequest)
  180. sf.log.Err("postUnsub(): in body_parser, status=%q", resp.Status_)
  181. return ctx.JSON(resp)
  182. }
  183. resp := sf.processUnsubRequest(req)
  184. resp.SelfCheck()
  185. return ctx.JSON(resp)
  186. }
  187. // Процесс отписки от топика.
  188. func (sf *kBusHttp) processUnsubRequest(req *msg_unsub.UnsubReq) *msg_unsub.UnsubResp {
  189. req.SelfCheck()
  190. resp := &msg_unsub.UnsubResp{
  191. Status_: "ok",
  192. Uuid_: req.Uuid_,
  193. }
  194. optHandler := sf.KCtx_.Get(req.Name_.Get())
  195. if optHandler.IsNone() {
  196. resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): not get handler(%v) from kernel ctx",
  197. req.Name_)
  198. return resp
  199. }
  200. if optHandler == nil {
  201. resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): handler(%v) not exists", req.Name_)
  202. return resp
  203. }
  204. hand := optHandler.Some().Val().(mKs.IBusHandlerSubscribe)
  205. sf.Unsubscribe(hand)
  206. return resp
  207. }