| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- // package kbus_http -- шина сообщений поверх HTTP
- package kbus_http
- import (
- "fmt"
- "net/http"
- "sync"
- "github.com/gofiber/fiber/v2"
- "gitp78su.ipnodns.ru/svi/kern/v3/kc/log_buf"
- "gitp78su.ipnodns.ru/svi/kern/v3/krn/kbus/kbus_base"
- "gitp78su.ipnodns.ru/svi/kern/v3/krn/kbus/kbus_msg/msg_pub"
- "gitp78su.ipnodns.ru/svi/kern/v3/krn/kbus/kbus_msg/msg_serve"
- "gitp78su.ipnodns.ru/svi/kern/v3/krn/kbus/kbus_msg/msg_sub"
- "gitp78su.ipnodns.ru/svi/kern/v3/krn/kbus/kbus_msg/msg_unsub"
- "gitp78su.ipnodns.ru/svi/kern/v3/krn/kctx"
- "gitp78su.ipnodns.ru/svi/kern/v3/krn/kserv_http"
- . "gitp78su.ipnodns.ru/svi/kern/v3/krn/ktypes"
- "gitp78su.ipnodns.ru/svi/kern/v3/mock/mock_hand_sub_http"
- )
- // kBusHttp -- шина данных поверх HTTP
- type kBusHttp struct {
- *kbus_base.KBusBase
- log ILogBuf
- }
- var (
- Bus_ *kBusHttp
- block sync.Mutex
- )
- // GetKernelBusHttp -- возвращает шину HTTP
- func GetKernelBusHttp() IKernelBus {
- block.Lock()
- defer block.Unlock()
- if Bus_ != nil {
- return Bus_
- }
- log := log_buf.NewLogBuf(log_buf.OptIsTerm(true), log_buf.OptPrefix("kBusHttp"))
- log.Debug("GetKernelBusHttp(): new")
- ctx := kctx.GetKernelCtx()
- sf := &kBusHttp{
- KBusBase: kbus_base.GetKernelBusBase(),
- log: log,
- }
- servHttp := kserv_http.GetKernelServHttp()
- fibApp := servHttp.Fiber()
- fibApp.Post("/bus/sub", sf.postSub) // Топик подписки, IN
- fibApp.Post("/bus/unsub", sf.postUnsub) // Топик отписки, IN
- fibApp.Post("/bus/request", sf.postSendRequest) // Топик входящих запросов, IN
- fibApp.Post("/bus/pub", sf.postPublish) // Топик публикаций подписки, IN
- ctx.Set("kernBus", sf, "GetKernelBusHttp(): http data bus")
- Bus_ = sf
- return Bus_
- }
- // Входящий запрос HTTP на подписку
- func (sf *kBusHttp) postSub(ctx *fiber.Ctx) error {
- sf.log.Debug("postSub()")
- ctx.Set("Content-type", "text/html; charset=utf8")
- ctx.Set("Content-type", "text/json")
- ctx.Set("Cache-Control", "no-cache")
- sf.log.Debug("postSub()")
- req := &msg_sub.SubscribeReq{}
- err := ctx.BodyParser(req)
- if err != nil {
- resp := &msg_sub.SubscribeResp{
- Status_: fmt.Sprintf("kernelBusHttp.postSub(): in parse request, err=\n\t%v\n", err),
- Uuid_: req.Uuid_,
- }
- resp.SelfCheck()
- ctx.Response().SetStatusCode(http.StatusBadRequest)
- sf.log.Err("postSub(): in body parser, status=%q", resp.Status_)
- return ctx.JSON(resp)
- }
- resp := sf.processSubscribe(req)
- resp.SelfCheck()
- return ctx.JSON(resp)
- }
- // Процесс подписки веб-хука
- func (sf *kBusHttp) processSubscribe(req *msg_sub.SubscribeReq) *msg_sub.SubscribeResp {
- req.SelfCheck()
- handler := mock_hand_sub_http.NewMockHandSubHttp(req.Topic_, req.WebHook_)
- resp := &msg_sub.SubscribeResp{
- Status_: "ok",
- Uuid_: req.Uuid_,
- Name_: handler.Name(),
- }
- res := sf.Subscribe(handler)
- if res.IsErr() {
- resp.Status_ = fmt.Sprintf("kernelBusHttp.processSubscribe(): err=\n\t%v", res.Err())
- return resp
- }
- return resp
- }
- // Входящая публикация
- func (sf *kBusHttp) postPublish(ctx *fiber.Ctx) error {
- sf.log.Debug("postPublish()")
- ctx.Set("Content-type", "text/html; charset=utf8")
- ctx.Set("Content-type", "text/json")
- ctx.Set("Cache-Control", "no-cache")
- req := &msg_pub.PublishReq{}
- err := ctx.BodyParser(req)
- if err != nil {
- resp := &msg_pub.PublishResp{
- Status_: fmt.Sprintf("kernelBusHttp.postPublish(): in parse request, err=\n\t%v\n", err),
- Uuid_: req.Uuid_,
- }
- resp.SelfCheck()
- ctx.Response().SetStatusCode(http.StatusBadRequest)
- sf.log.Err("postPublish(): in body parser, status=%v", resp.Status_)
- return ctx.JSON(resp)
- }
- resp := sf.processPublish(req)
- resp.SelfCheck()
- return ctx.JSON(resp)
- }
- // Выполняет процесс публикации
- func (sf *kBusHttp) processPublish(req *msg_pub.PublishReq) *msg_pub.PublishResp {
- req.SelfCheck()
- res := sf.Publish(req.Topic_, req.BinMsg_)
- resp := &msg_pub.PublishResp{
- Status_: "ok",
- Uuid_: req.Uuid_,
- }
- if res.IsErr() {
- resp.Status_ = fmt.Sprintf("kernelBusHttp.processPublish(): err=\n\t%v", res.Err())
- return resp
- }
- return resp
- }
- // Входящий запрос
- func (sf *kBusHttp) postSendRequest(ctx *fiber.Ctx) error {
- sf.log.Debug("postSendRequest()")
- ctx.Set("Content-type", "text/html; charset=utf8")
- ctx.Set("Content-type", "text/json")
- ctx.Set("Cache-Control", "no-cache")
- req := &msg_serve.ServeReq{}
- err := ctx.BodyParser(req)
- if err != nil {
- resp := &msg_serve.ServeResp{
- Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
- Uuid_: req.Uuid_,
- }
- resp.SelfCheck()
- ctx.Response().SetStatusCode(http.StatusBadRequest)
- sf.log.Err("postSendRequest(): in body parser, status=%v", resp.Status_)
- return ctx.JSON(resp)
- }
- resp := sf.processSendRequest(req)
- resp.SelfCheck()
- return ctx.JSON(resp)
- }
- // Обрабатывает входящий запрос
- func (sf *kBusHttp) processSendRequest(req *msg_serve.ServeReq) *msg_serve.ServeResp {
- req.SelfCheck()
- res := sf.SendRequest(req.Topic_, req.BinReq_)
- resp := &msg_serve.ServeResp{
- Status_: "ok",
- Uuid_: req.Uuid_,
- }
- if res.IsErr() {
- resp.Status_ = fmt.Sprintf("kernelBusHttp.processSendRequest(): err=\n\t%v", res.Err())
- return resp
- }
- resp.BinResp_ = res.Val()
- return resp
- }
- // Входящая отписка от топика по HTTP
- func (sf *kBusHttp) postUnsub(ctx *fiber.Ctx) error {
- sf.log.Debug("postUnsub()")
- ctx.Set("Content-type", "text/html; charset=utf8")
- ctx.Set("Content-type", "text/json")
- ctx.Set("Cache-Control", "no-cache")
- req := &msg_unsub.UnsubReq{}
- err := ctx.BodyParser(req)
- if err != nil {
- resp := &msg_serve.ServeResp{
- Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
- Uuid_: req.Uuid_,
- }
- resp.SelfCheck()
- ctx.Response().SetStatusCode(http.StatusBadRequest)
- sf.log.Err("postUnsub(): in body_parser, status=%q", resp.Status_)
- return ctx.JSON(resp)
- }
- resp := sf.processUnsubRequest(req)
- resp.SelfCheck()
- return ctx.JSON(resp)
- }
- // Процесс отписки от топика
- func (sf *kBusHttp) processUnsubRequest(req *msg_unsub.UnsubReq) *msg_unsub.UnsubResp {
- req.SelfCheck()
- _hand := sf.Ctx_.Get(string(req.Name_))
- resp := &msg_unsub.UnsubResp{
- Status_: "ok",
- Uuid_: req.Uuid_,
- }
- if _hand == nil {
- resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): handler name(%v) not exists", req.Name_)
- return resp
- }
- hand := _hand.Val().(IBusHandlerSubscribe)
- sf.Unsubscribe(hand)
- return resp
- }
|