kbus_http.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. // package kbus_http -- шина сообщений поверх HTTP.
  2. package kbus_http
  3. import (
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "github.com/gofiber/fiber/v3"
  8. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  9. mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  10. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub"
  12. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve"
  13. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub"
  15. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus_base"
  16. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  17. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kserv_http"
  18. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/mock_hand_sub_http"
  19. )
  20. // kBusHttp -- шина данных поверх HTTP.
  21. type kBusHttp struct {
  22. *kbus_base.KBusBase
  23. log mKt.ILogBuf
  24. }
  25. var (
  26. Bus_ *kBusHttp
  27. block sync.Mutex
  28. )
  29. // GetKernelBusHttp -- возвращает шину HTTP.
  30. func GetKernelBusHttp() *mL0.Result[mKt.IKernelBus] {
  31. block.Lock()
  32. defer block.Unlock()
  33. if Bus_ != nil {
  34. return mL0.NewRes(mKt.IKernelBus(Bus_))
  35. }
  36. log := mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kBusHttp"))
  37. log.Debug("GetKernelBusHttp(): new")
  38. resKernCtx := kctx.GetKernelCtx()
  39. if resKernCtx.IsErr() {
  40. err := fmt.Errorf("GetKernelBusHttp(): err=\n\t%v", resKernCtx.Err())
  41. return mL0.NewErr[mKt.IKernelBus](err)
  42. }
  43. kCtx := resKernCtx.Val()
  44. resBus := kbus_base.GetKernelBusBase()
  45. if resBus.IsErr() {
  46. err := fmt.Errorf("GetKernelBusHttp(): err=\n\t%v", resBus.Err())
  47. return mL0.NewErr[mKt.IKernelBus](err)
  48. }
  49. sf := &kBusHttp{
  50. KBusBase: resBus.Val(),
  51. log: log,
  52. }
  53. resServ := kserv_http.GetKernelServHttp()
  54. if resServ.IsErr() {
  55. err := fmt.Errorf("GetKernelBusHttp(): err=\n\t%v", resServ.Err())
  56. return mL0.NewErr[mKt.IKernelBus](err)
  57. }
  58. serv := resServ.Val()
  59. fibApp := serv.Fiber()
  60. fibApp.Post("/bus/sub", sf.postSub) // Топик подписки, IN
  61. fibApp.Post("/bus/unsub", sf.postUnsub) // Топик отписки, IN
  62. fibApp.Post("/bus/request", sf.postSendRequest) // Топик входящих запросов, IN
  63. fibApp.Post("/bus/pub", sf.postPublish) // Топик публикаций подписки, IN
  64. resSet := kCtx.Set("kernBus", sf, "GetKernelBusHttp(): http data bus")
  65. if resSet.IsErr() {
  66. sf.log.Err("GetKernelBusHttp(): err=\n\t%v", resSet.Err())
  67. return mL0.NewErr[mKt.IKernelBus](resSet.Err())
  68. }
  69. Bus_ = sf
  70. return mL0.NewRes(mKt.IKernelBus(Bus_))
  71. }
  72. // Входящий запрос HTTP на подписку.
  73. func (sf *kBusHttp) postSub(ctx fiber.Ctx) error {
  74. sf.log.Debug("postSub()")
  75. ctx.Set("Content-type", "text/html; charset=utf8")
  76. ctx.Set("Content-type", "text/json")
  77. ctx.Set("Cache-Control", "no-cache")
  78. sf.log.Debug("postSub()")
  79. req := &msg_sub.SubscribeReq{}
  80. err := ctx.Bind().Body(req)
  81. if err != nil {
  82. resp := &msg_sub.SubscribeResp{
  83. Status_: fmt.Sprintf("kernelBusHttp.postSub(): in parse request, err=\n\t%v\n", err),
  84. Uuid_: req.Uuid_,
  85. }
  86. resp.SelfCheck()
  87. ctx.Response().SetStatusCode(http.StatusBadRequest)
  88. sf.log.Err("postSub(): in body parser, status=%q", resp.Status_)
  89. return ctx.JSON(resp)
  90. }
  91. resp := sf.processSubscribe(req)
  92. resp.SelfCheck()
  93. return ctx.JSON(resp)
  94. }
  95. // Процесс подписки веб-хука.
  96. func (sf *kBusHttp) processSubscribe(req *msg_sub.SubscribeReq) *msg_sub.SubscribeResp {
  97. req.SelfCheck()
  98. handler := mock_hand_sub_http.NewMockHandSubHttp(req.Topic_, req.WebHook_)
  99. resp := &msg_sub.SubscribeResp{
  100. Status_: "ok",
  101. Uuid_: req.Uuid_,
  102. Name_: handler.Name(),
  103. }
  104. res := sf.Subscribe(handler)
  105. if res.IsErr() {
  106. resp.Status_ = fmt.Sprintf("kernelBusHttp.processSubscribe(): err=\n\t%v", res.Err())
  107. return resp
  108. }
  109. return resp
  110. }
  111. // Входящая публикация.
  112. func (sf *kBusHttp) postPublish(ctx fiber.Ctx) error {
  113. sf.log.Debug("postPublish()")
  114. ctx.Set("Content-type", "text/html; charset=utf8")
  115. ctx.Set("Content-type", "text/json")
  116. ctx.Set("Cache-Control", "no-cache")
  117. req := &msg_pub.PublishReq{}
  118. err := ctx.Bind().Body(req)
  119. if err != nil {
  120. resp := &msg_pub.PublishResp{
  121. Status_: fmt.Sprintf("kernelBusHttp.postPublish(): in parse request, err=\n\t%v\n", err),
  122. Uuid_: req.Uuid_,
  123. }
  124. resp.SelfCheck()
  125. ctx.Response().SetStatusCode(http.StatusBadRequest)
  126. sf.log.Err("postPublish(): in body parser, status=%v", resp.Status_)
  127. return ctx.JSON(resp)
  128. }
  129. resp := sf.processPublish(req)
  130. resp.SelfCheck()
  131. return ctx.JSON(resp)
  132. }
  133. // Выполняет процесс публикации.
  134. func (sf *kBusHttp) processPublish(req *msg_pub.PublishReq) *msg_pub.PublishResp {
  135. req.SelfCheck()
  136. res := sf.Publish(req.Topic_, req.BinMsg_)
  137. resp := &msg_pub.PublishResp{
  138. Status_: "ok",
  139. Uuid_: req.Uuid_,
  140. }
  141. if res.IsErr() {
  142. resp.Status_ = fmt.Sprintf("kernelBusHttp.processPublish(): err=\n\t%v", res.Err())
  143. return resp
  144. }
  145. return resp
  146. }
  147. // Входящий запрос.
  148. func (sf *kBusHttp) postSendRequest(ctx fiber.Ctx) error {
  149. sf.log.Debug("postSendRequest()")
  150. ctx.Set("Content-type", "text/html; charset=utf8")
  151. ctx.Set("Content-type", "text/json")
  152. ctx.Set("Cache-Control", "no-cache")
  153. req := &msg_serve.ServeReq{}
  154. err := ctx.Bind().Body(req)
  155. if err != nil {
  156. resp := &msg_serve.ServeResp{
  157. Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
  158. Uuid_: req.Uuid_,
  159. }
  160. resp.SelfCheck()
  161. ctx.Response().SetStatusCode(http.StatusBadRequest)
  162. sf.log.Err("postSendRequest(): in body parser, status=%v", resp.Status_)
  163. return ctx.JSON(resp)
  164. }
  165. resp := sf.processSendRequest(req)
  166. resp.SelfCheck()
  167. return ctx.JSON(resp)
  168. }
  169. // Обрабатывает входящий запрос.
  170. func (sf *kBusHttp) processSendRequest(req *msg_serve.ServeReq) *msg_serve.ServeResp {
  171. req.SelfCheck()
  172. res := sf.SendRequest(req.Topic_, req.BinReq_)
  173. resp := &msg_serve.ServeResp{
  174. Status_: "ok",
  175. Uuid_: req.Uuid_,
  176. }
  177. if res.IsErr() {
  178. resp.Status_ = fmt.Sprintf("kernelBusHttp.processSendRequest(): err=\n\t%v", res.Err())
  179. return resp
  180. }
  181. resp.BinResp_ = res.Val()
  182. return resp
  183. }
  184. // Входящая отписка от топика по HTTP.
  185. func (sf *kBusHttp) postUnsub(ctx fiber.Ctx) error {
  186. sf.log.Debug("postUnsub()")
  187. ctx.Set("Content-type", "text/html; charset=utf8")
  188. ctx.Set("Content-type", "text/json")
  189. ctx.Set("Cache-Control", "no-cache")
  190. req := &msg_unsub.UnsubReq{}
  191. err := ctx.Bind().Body(req)
  192. if err != nil {
  193. resp := &msg_serve.ServeResp{
  194. Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
  195. Uuid_: req.Uuid_,
  196. }
  197. resp.SelfCheck()
  198. ctx.Response().SetStatusCode(http.StatusBadRequest)
  199. sf.log.Err("postUnsub(): in body_parser, status=%q", resp.Status_)
  200. return ctx.JSON(resp)
  201. }
  202. resp := sf.processUnsubRequest(req)
  203. resp.SelfCheck()
  204. return ctx.JSON(resp)
  205. }
  206. // Процесс отписки от топика.
  207. func (sf *kBusHttp) processUnsubRequest(req *msg_unsub.UnsubReq) *msg_unsub.UnsubResp {
  208. req.SelfCheck()
  209. resp := &msg_unsub.UnsubResp{
  210. Status_: "ok",
  211. Uuid_: req.Uuid_,
  212. }
  213. optHandler := sf.Ctx_.Get(req.Name_.Get())
  214. if optHandler.IsNone() {
  215. resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): not get handler(%v) from kernel ctx",
  216. req.Name_)
  217. return resp
  218. }
  219. if optHandler == nil {
  220. resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): handler(%v) not exists", req.Name_)
  221. return resp
  222. }
  223. hand := optHandler.Val().Val().(mKt.IBusHandlerSubscribe)
  224. sf.Unsubscribe(hand)
  225. return resp
  226. }