// package kbus_http -- шина сообщений поверх HTTP. package kbus_http import ( "fmt" "net/http" "sync" "github.com/gofiber/fiber/v3" mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec" mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1" "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub" "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve" "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub" "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus_base" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kserv_http" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/mock_hand_sub_http" ) // kBusHttp -- шина данных поверх HTTP. type kBusHttp struct { *kbus_base.KBusBase log mKs.ILogBuf } var ( Bus_ *kBusHttp block sync.Mutex ) // GetKernelBusHttp -- возвращает шину HTTP. func GetKernelBusHttp() mKs.IKernelBus { block.Lock() defer block.Unlock() if Bus_ != nil { return Bus_ } log := mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kBusHttp")) log.Debug("GetKernelBusHttp(): new") kCtx := kctx.GetKernelCtx() kBus := kbus_base.GetKernelBusBase() sf := &kBusHttp{ KBusBase: kBus, log: log, } kServHttp := kserv_http.GetKernelServHttp() fibApp := kServHttp.Fiber() fibApp.Post("/bus/sub", sf.postSub) // Топик подписки, IN fibApp.Post("/bus/unsub", sf.postUnsub) // Топик отписки, IN fibApp.Post("/bus/request", sf.postSendRequest) // Топик входящих запросов, IN fibApp.Post("/bus/pub", sf.postPublish) // Топик публикаций подписки, IN kCtx.Set("kernBus", sf, "GetKernelBusHttp(): http data bus") Bus_ = sf return Bus_ } // Входящий запрос HTTP на подписку. func (sf *kBusHttp) postSub(ctx fiber.Ctx) error { sf.log.Debug("postSub()") ctx.Set("Content-type", "text/html; charset=utf8") ctx.Set("Content-type", "text/json") ctx.Set("Cache-Control", "no-cache") sf.log.Debug("postSub()") req := &msg_sub.SubscribeReq{} err := ctx.Bind().Body(req) if err != nil { resp := &msg_sub.SubscribeResp{ Status_: fmt.Sprintf("kernelBusHttp.postSub(): in parse request, err=\n\t%v\n", err), Uuid_: req.Uuid_, } resp.SelfCheck() ctx.Response().SetStatusCode(http.StatusBadRequest) sf.log.Err("postSub(): in body parser, status=%q", resp.Status_) return ctx.JSON(resp) } resp := sf.processSubscribe(req) resp.SelfCheck() return ctx.JSON(resp) } // Процесс подписки веб-хука. func (sf *kBusHttp) processSubscribe(req *msg_sub.SubscribeReq) *msg_sub.SubscribeResp { req.SelfCheck() handler := mock_hand_sub_http.NewMockHandSubHttp(req.Topic_, req.WebHook_) resp := &msg_sub.SubscribeResp{ Status_: "ok", Uuid_: req.Uuid_, Name_: handler.Name(), } res := sf.Subscribe(handler) if res.IsErr() { resp.Status_ = fmt.Sprintf("kernelBusHttp.processSubscribe(): err=\n\t%v", res.Err()) return resp } return resp } // Входящая публикация. func (sf *kBusHttp) postPublish(ctx fiber.Ctx) error { sf.log.Debug("postPublish()") ctx.Set("Content-type", "text/html; charset=utf8") ctx.Set("Content-type", "text/json") ctx.Set("Cache-Control", "no-cache") req := &msg_pub.PublishReq{} err := ctx.Bind().Body(req) if err != nil { resp := &msg_pub.PublishResp{ Status_: fmt.Sprintf("kernelBusHttp.postPublish(): in parse request, err=\n\t%v\n", err), Uuid_: req.Uuid_, } resp.SelfCheck() ctx.Response().SetStatusCode(http.StatusBadRequest) sf.log.Err("postPublish(): in body parser, status=%v", resp.Status_) return ctx.JSON(resp) } resp := sf.processPublish(req) resp.SelfCheck() return ctx.JSON(resp) } // Выполняет процесс публикации. func (sf *kBusHttp) processPublish(req *msg_pub.PublishReq) *msg_pub.PublishResp { req.SelfCheck() res := sf.Publish(req.Topic_, req.BinMsg_) resp := &msg_pub.PublishResp{ Status_: "ok", Uuid_: req.Uuid_, } if res.IsErr() { resp.Status_ = fmt.Sprintf("kernelBusHttp.processPublish(): err=\n\t%v", res.Err()) return resp } return resp } // Входящий запрос. func (sf *kBusHttp) postSendRequest(ctx fiber.Ctx) error { sf.log.Debug("postSendRequest()") ctx.Set("Content-type", "text/html; charset=utf8") ctx.Set("Content-type", "text/json") ctx.Set("Cache-Control", "no-cache") req := &msg_serve.ServeReq{} err := ctx.Bind().Body(req) if err != nil { resp := &msg_serve.ServeResp{ Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err), Uuid_: req.Uuid_, } resp.SelfCheck() ctx.Response().SetStatusCode(http.StatusBadRequest) sf.log.Err("postSendRequest(): in body parser, status=%v", resp.Status_) return ctx.JSON(resp) } resp := sf.processSendRequest(req) resp.SelfCheck() return ctx.JSON(resp) } // Обрабатывает входящий запрос. func (sf *kBusHttp) processSendRequest(req *msg_serve.ServeReq) *msg_serve.ServeResp { req.SelfCheck() res := sf.SendRequest(req.Topic_, req.BinReq_) resp := &msg_serve.ServeResp{ Status_: "ok", Uuid_: req.Uuid_, } if res.IsErr() { resp.Status_ = fmt.Sprintf("kernelBusHttp.processSendRequest(): err=\n\t%v", res.Err()) return resp } resp.BinResp_ = res.Ok() return resp } // Входящая отписка от топика по HTTP. func (sf *kBusHttp) postUnsub(ctx fiber.Ctx) error { sf.log.Debug("postUnsub()") ctx.Set("Content-type", "text/html; charset=utf8") ctx.Set("Content-type", "text/json") ctx.Set("Cache-Control", "no-cache") req := &msg_unsub.UnsubReq{} err := ctx.Bind().Body(req) if err != nil { resp := &msg_serve.ServeResp{ Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err), Uuid_: req.Uuid_, } resp.SelfCheck() ctx.Response().SetStatusCode(http.StatusBadRequest) sf.log.Err("postUnsub(): in body_parser, status=%q", resp.Status_) return ctx.JSON(resp) } resp := sf.processUnsubRequest(req) resp.SelfCheck() return ctx.JSON(resp) } // Процесс отписки от топика. func (sf *kBusHttp) processUnsubRequest(req *msg_unsub.UnsubReq) *msg_unsub.UnsubResp { req.SelfCheck() resp := &msg_unsub.UnsubResp{ Status_: "ok", Uuid_: req.Uuid_, } optHandler := sf.KCtx_.Get(req.Name_.Get()) if optHandler.IsNone() { resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): not get handler(%v) from kernel ctx", req.Name_) return resp } if optHandler == nil { resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): handler(%v) not exists", req.Name_) return resp } hand := optHandler.Some().Val().(mKs.IBusHandlerSubscribe) sf.Unsubscribe(hand) return resp }