kbus_http.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. // package kbus_http -- шина сообщений поверх HTTP
  2. package kbus_http
  3. import (
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "github.com/gofiber/fiber/v2"
  8. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  9. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve"
  12. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub"
  13. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus_base"
  15. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  16. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kserv_http"
  17. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/mock_hand_sub_http"
  18. )
  19. // kBusHttp -- шина данных поверх HTTP
  20. type kBusHttp struct {
  21. *kbus_base.KBusBase
  22. log ILogBuf
  23. }
  24. var (
  25. Bus_ *kBusHttp
  26. block sync.Mutex
  27. )
  28. // GetKernelBusHttp -- возвращает шину HTTP
  29. func GetKernelBusHttp() IResult[IKernelBus] {
  30. block.Lock()
  31. defer block.Unlock()
  32. if Bus_ != nil {
  33. return NewRes(IKernelBus(Bus_))
  34. }
  35. log := NewLogBuf(OptIsTerm(true), OptPrefix("kBusHttp"))
  36. log.Debug("GetKernelBusHttp(): new")
  37. resKernCtx := kctx.GetKernelCtx()
  38. if resKernCtx.IsErr() {
  39. err := fmt.Errorf("GetKernelBusHttp(): err=\n\t%v", resKernCtx.Err())
  40. return NewErr[IKernelBus](err)
  41. }
  42. kCtx := resKernCtx.Val()
  43. resBus := kbus_base.GetKernelBusBase()
  44. if resBus.IsErr() {
  45. err := fmt.Errorf("GetKernelBusHttp(): err=\n\t%v", resBus.Err())
  46. return NewErr[IKernelBus](err)
  47. }
  48. sf := &kBusHttp{
  49. KBusBase: resBus.Val(),
  50. log: log,
  51. }
  52. resServ := kserv_http.GetKernelServHttp()
  53. if resServ.IsErr() {
  54. err := fmt.Errorf("GetKernelBusHttp(): err=\n\t%v", resServ.Err())
  55. return NewErr[IKernelBus](err)
  56. }
  57. serv := resServ.Val()
  58. fibApp := serv.Fiber()
  59. fibApp.Post("/bus/sub", sf.postSub) // Топик подписки, IN
  60. fibApp.Post("/bus/unsub", sf.postUnsub) // Топик отписки, IN
  61. fibApp.Post("/bus/request", sf.postSendRequest) // Топик входящих запросов, IN
  62. fibApp.Post("/bus/pub", sf.postPublish) // Топик публикаций подписки, IN
  63. resSet := kCtx.Set("kernBus", sf, "GetKernelBusHttp(): http data bus")
  64. if resSet.IsErr() {
  65. sf.log.Err("GetKernelBusHttp(): err=\n\t%v", resSet.Err())
  66. return NewErr[IKernelBus](resSet.Err())
  67. }
  68. Bus_ = sf
  69. return NewRes(IKernelBus(Bus_))
  70. }
  71. // Входящий запрос HTTP на подписку
  72. func (sf *kBusHttp) postSub(ctx *fiber.Ctx) error {
  73. sf.log.Debug("postSub()")
  74. ctx.Set("Content-type", "text/html; charset=utf8")
  75. ctx.Set("Content-type", "text/json")
  76. ctx.Set("Cache-Control", "no-cache")
  77. sf.log.Debug("postSub()")
  78. req := &msg_sub.SubscribeReq{}
  79. err := ctx.BodyParser(req)
  80. if err != nil {
  81. resp := &msg_sub.SubscribeResp{
  82. Status_: fmt.Sprintf("kernelBusHttp.postSub(): in parse request, err=\n\t%v\n", err),
  83. Uuid_: req.Uuid_,
  84. }
  85. resp.SelfCheck()
  86. ctx.Response().SetStatusCode(http.StatusBadRequest)
  87. sf.log.Err("postSub(): in body parser, status=%q", resp.Status_)
  88. return ctx.JSON(resp)
  89. }
  90. resp := sf.processSubscribe(req)
  91. resp.SelfCheck()
  92. return ctx.JSON(resp)
  93. }
  94. // Процесс подписки веб-хука
  95. func (sf *kBusHttp) processSubscribe(req *msg_sub.SubscribeReq) *msg_sub.SubscribeResp {
  96. req.SelfCheck()
  97. handler := mock_hand_sub_http.NewMockHandSubHttp(req.Topic_, req.WebHook_)
  98. resp := &msg_sub.SubscribeResp{
  99. Status_: "ok",
  100. Uuid_: req.Uuid_,
  101. Name_: handler.Name(),
  102. }
  103. res := sf.Subscribe(handler)
  104. if res.IsErr() {
  105. resp.Status_ = fmt.Sprintf("kernelBusHttp.processSubscribe(): err=\n\t%v", res.Err())
  106. return resp
  107. }
  108. return resp
  109. }
  110. // Входящая публикация
  111. func (sf *kBusHttp) postPublish(ctx *fiber.Ctx) error {
  112. sf.log.Debug("postPublish()")
  113. ctx.Set("Content-type", "text/html; charset=utf8")
  114. ctx.Set("Content-type", "text/json")
  115. ctx.Set("Cache-Control", "no-cache")
  116. req := &msg_pub.PublishReq{}
  117. err := ctx.BodyParser(req)
  118. if err != nil {
  119. resp := &msg_pub.PublishResp{
  120. Status_: fmt.Sprintf("kernelBusHttp.postPublish(): in parse request, err=\n\t%v\n", err),
  121. Uuid_: req.Uuid_,
  122. }
  123. resp.SelfCheck()
  124. ctx.Response().SetStatusCode(http.StatusBadRequest)
  125. sf.log.Err("postPublish(): in body parser, status=%v", resp.Status_)
  126. return ctx.JSON(resp)
  127. }
  128. resp := sf.processPublish(req)
  129. resp.SelfCheck()
  130. return ctx.JSON(resp)
  131. }
  132. // Выполняет процесс публикации
  133. func (sf *kBusHttp) processPublish(req *msg_pub.PublishReq) *msg_pub.PublishResp {
  134. req.SelfCheck()
  135. res := sf.Publish(req.Topic_, req.BinMsg_)
  136. resp := &msg_pub.PublishResp{
  137. Status_: "ok",
  138. Uuid_: req.Uuid_,
  139. }
  140. if res.IsErr() {
  141. resp.Status_ = fmt.Sprintf("kernelBusHttp.processPublish(): err=\n\t%v", res.Err())
  142. return resp
  143. }
  144. return resp
  145. }
  146. // Входящий запрос
  147. func (sf *kBusHttp) postSendRequest(ctx *fiber.Ctx) error {
  148. sf.log.Debug("postSendRequest()")
  149. ctx.Set("Content-type", "text/html; charset=utf8")
  150. ctx.Set("Content-type", "text/json")
  151. ctx.Set("Cache-Control", "no-cache")
  152. req := &msg_serve.ServeReq{}
  153. err := ctx.BodyParser(req)
  154. if err != nil {
  155. resp := &msg_serve.ServeResp{
  156. Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
  157. Uuid_: req.Uuid_,
  158. }
  159. resp.SelfCheck()
  160. ctx.Response().SetStatusCode(http.StatusBadRequest)
  161. sf.log.Err("postSendRequest(): in body parser, status=%v", resp.Status_)
  162. return ctx.JSON(resp)
  163. }
  164. resp := sf.processSendRequest(req)
  165. resp.SelfCheck()
  166. return ctx.JSON(resp)
  167. }
  168. // Обрабатывает входящий запрос
  169. func (sf *kBusHttp) processSendRequest(req *msg_serve.ServeReq) *msg_serve.ServeResp {
  170. req.SelfCheck()
  171. res := sf.SendRequest(req.Topic_, req.BinReq_)
  172. resp := &msg_serve.ServeResp{
  173. Status_: "ok",
  174. Uuid_: req.Uuid_,
  175. }
  176. if res.IsErr() {
  177. resp.Status_ = fmt.Sprintf("kernelBusHttp.processSendRequest(): err=\n\t%v", res.Err())
  178. return resp
  179. }
  180. resp.BinResp_ = res.Val()
  181. return resp
  182. }
  183. // Входящая отписка от топика по HTTP
  184. func (sf *kBusHttp) postUnsub(ctx *fiber.Ctx) error {
  185. sf.log.Debug("postUnsub()")
  186. ctx.Set("Content-type", "text/html; charset=utf8")
  187. ctx.Set("Content-type", "text/json")
  188. ctx.Set("Cache-Control", "no-cache")
  189. req := &msg_unsub.UnsubReq{}
  190. err := ctx.BodyParser(req)
  191. if err != nil {
  192. resp := &msg_serve.ServeResp{
  193. Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
  194. Uuid_: req.Uuid_,
  195. }
  196. resp.SelfCheck()
  197. ctx.Response().SetStatusCode(http.StatusBadRequest)
  198. sf.log.Err("postUnsub(): in body_parser, status=%q", resp.Status_)
  199. return ctx.JSON(resp)
  200. }
  201. resp := sf.processUnsubRequest(req)
  202. resp.SelfCheck()
  203. return ctx.JSON(resp)
  204. }
  205. // Процесс отписки от топика
  206. func (sf *kBusHttp) processUnsubRequest(req *msg_unsub.UnsubReq) *msg_unsub.UnsubResp {
  207. req.SelfCheck()
  208. resp := &msg_unsub.UnsubResp{
  209. Status_: "ok",
  210. Uuid_: req.Uuid_,
  211. }
  212. optHandler := sf.Ctx_.Get(string(req.Name_))
  213. if optHandler.IsNone() {
  214. resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): not get handler(%v) from kernel ctx",
  215. req.Name_)
  216. return resp
  217. }
  218. if optHandler == nil {
  219. resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): handler(%v) not exists", req.Name_)
  220. return resp
  221. }
  222. hand := optHandler.Val().Val().(IBusHandlerSubscribe)
  223. sf.Unsubscribe(hand)
  224. return resp
  225. }