// package kbus_http -- шина сообщений поверх HTTP package kbus_http import ( "fmt" "net/http" "sync" "github.com/gofiber/fiber/v2" . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes" . "gitp78su.ipnodns.ru/svi/kern/v4/lev1" "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub" "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve" "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub" "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus_base" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kserv_http" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/mock_hand_sub_http" ) // kBusHttp -- шина данных поверх HTTP type kBusHttp struct { *kbus_base.KBusBase log ILogBuf } var ( Bus_ *kBusHttp block sync.Mutex ) // GetKernelBusHttp -- возвращает шину HTTP func GetKernelBusHttp() IResult[IKernelBus] { block.Lock() defer block.Unlock() if Bus_ != nil { return NewRes(IKernelBus(Bus_)) } log := NewLogBuf(OptIsTerm(true), OptPrefix("kBusHttp")) log.Debug("GetKernelBusHttp(): new") resKernCtx := kctx.GetKernelCtx() if resKernCtx.IsErr() { err := fmt.Errorf("GetKernelBusHttp(): err=\n\t%v", resKernCtx.Err()) return NewErr[IKernelBus](err) } kCtx := resKernCtx.Val() resBus := kbus_base.GetKernelBusBase() if resBus.IsErr() { err := fmt.Errorf("GetKernelBusHttp(): err=\n\t%v", resBus.Err()) return NewErr[IKernelBus](err) } sf := &kBusHttp{ KBusBase: resBus.Val(), log: log, } resServ := kserv_http.GetKernelServHttp() if resServ.IsErr() { err := fmt.Errorf("GetKernelBusHttp(): err=\n\t%v", resServ.Err()) return NewErr[IKernelBus](err) } serv := resServ.Val() fibApp := serv.Fiber() fibApp.Post("/bus/sub", sf.postSub) // Топик подписки, IN fibApp.Post("/bus/unsub", sf.postUnsub) // Топик отписки, IN fibApp.Post("/bus/request", sf.postSendRequest) // Топик входящих запросов, IN fibApp.Post("/bus/pub", sf.postPublish) // Топик публикаций подписки, IN resSet := kCtx.Set("kernBus", sf, "GetKernelBusHttp(): http data bus") if resSet.IsErr() { sf.log.Err("GetKernelBusHttp(): err=\n\t%v", resSet.Err()) return NewErr[IKernelBus](resSet.Err()) } Bus_ = sf return NewRes(IKernelBus(Bus_)) } // Входящий запрос HTTP на подписку func (sf *kBusHttp) postSub(ctx *fiber.Ctx) error { sf.log.Debug("postSub()") ctx.Set("Content-type", "text/html; charset=utf8") ctx.Set("Content-type", "text/json") ctx.Set("Cache-Control", "no-cache") sf.log.Debug("postSub()") req := &msg_sub.SubscribeReq{} err := ctx.BodyParser(req) if err != nil { resp := &msg_sub.SubscribeResp{ Status_: fmt.Sprintf("kernelBusHttp.postSub(): in parse request, err=\n\t%v\n", err), Uuid_: req.Uuid_, } resp.SelfCheck() ctx.Response().SetStatusCode(http.StatusBadRequest) sf.log.Err("postSub(): in body parser, status=%q", resp.Status_) return ctx.JSON(resp) } resp := sf.processSubscribe(req) resp.SelfCheck() return ctx.JSON(resp) } // Процесс подписки веб-хука func (sf *kBusHttp) processSubscribe(req *msg_sub.SubscribeReq) *msg_sub.SubscribeResp { req.SelfCheck() handler := mock_hand_sub_http.NewMockHandSubHttp(req.Topic_, req.WebHook_) resp := &msg_sub.SubscribeResp{ Status_: "ok", Uuid_: req.Uuid_, Name_: handler.Name(), } res := sf.Subscribe(handler) if res.IsErr() { resp.Status_ = fmt.Sprintf("kernelBusHttp.processSubscribe(): err=\n\t%v", res.Err()) return resp } return resp } // Входящая публикация func (sf *kBusHttp) postPublish(ctx *fiber.Ctx) error { sf.log.Debug("postPublish()") ctx.Set("Content-type", "text/html; charset=utf8") ctx.Set("Content-type", "text/json") ctx.Set("Cache-Control", "no-cache") req := &msg_pub.PublishReq{} err := ctx.BodyParser(req) if err != nil { resp := &msg_pub.PublishResp{ Status_: fmt.Sprintf("kernelBusHttp.postPublish(): in parse request, err=\n\t%v\n", err), Uuid_: req.Uuid_, } resp.SelfCheck() ctx.Response().SetStatusCode(http.StatusBadRequest) sf.log.Err("postPublish(): in body parser, status=%v", resp.Status_) return ctx.JSON(resp) } resp := sf.processPublish(req) resp.SelfCheck() return ctx.JSON(resp) } // Выполняет процесс публикации func (sf *kBusHttp) processPublish(req *msg_pub.PublishReq) *msg_pub.PublishResp { req.SelfCheck() res := sf.Publish(req.Topic_, req.BinMsg_) resp := &msg_pub.PublishResp{ Status_: "ok", Uuid_: req.Uuid_, } if res.IsErr() { resp.Status_ = fmt.Sprintf("kernelBusHttp.processPublish(): err=\n\t%v", res.Err()) return resp } return resp } // Входящий запрос func (sf *kBusHttp) postSendRequest(ctx *fiber.Ctx) error { sf.log.Debug("postSendRequest()") ctx.Set("Content-type", "text/html; charset=utf8") ctx.Set("Content-type", "text/json") ctx.Set("Cache-Control", "no-cache") req := &msg_serve.ServeReq{} err := ctx.BodyParser(req) if err != nil { resp := &msg_serve.ServeResp{ Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err), Uuid_: req.Uuid_, } resp.SelfCheck() ctx.Response().SetStatusCode(http.StatusBadRequest) sf.log.Err("postSendRequest(): in body parser, status=%v", resp.Status_) return ctx.JSON(resp) } resp := sf.processSendRequest(req) resp.SelfCheck() return ctx.JSON(resp) } // Обрабатывает входящий запрос func (sf *kBusHttp) processSendRequest(req *msg_serve.ServeReq) *msg_serve.ServeResp { req.SelfCheck() res := sf.SendRequest(req.Topic_, req.BinReq_) resp := &msg_serve.ServeResp{ Status_: "ok", Uuid_: req.Uuid_, } if res.IsErr() { resp.Status_ = fmt.Sprintf("kernelBusHttp.processSendRequest(): err=\n\t%v", res.Err()) return resp } resp.BinResp_ = res.Val() return resp } // Входящая отписка от топика по HTTP func (sf *kBusHttp) postUnsub(ctx *fiber.Ctx) error { sf.log.Debug("postUnsub()") ctx.Set("Content-type", "text/html; charset=utf8") ctx.Set("Content-type", "text/json") ctx.Set("Cache-Control", "no-cache") req := &msg_unsub.UnsubReq{} err := ctx.BodyParser(req) if err != nil { resp := &msg_serve.ServeResp{ Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err), Uuid_: req.Uuid_, } resp.SelfCheck() ctx.Response().SetStatusCode(http.StatusBadRequest) sf.log.Err("postUnsub(): in body_parser, status=%q", resp.Status_) return ctx.JSON(resp) } resp := sf.processUnsubRequest(req) resp.SelfCheck() return ctx.JSON(resp) } // Процесс отписки от топика func (sf *kBusHttp) processUnsubRequest(req *msg_unsub.UnsubReq) *msg_unsub.UnsubResp { req.SelfCheck() resp := &msg_unsub.UnsubResp{ Status_: "ok", Uuid_: req.Uuid_, } optHandler := sf.Ctx_.Get(string(req.Name_)) if optHandler.IsNone() { resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): not get handler(%v) from kernel ctx", req.Name_) return resp } if optHandler == nil { resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): handler(%v) not exists", req.Name_) return resp } hand := optHandler.Val().Val().(IBusHandlerSubscribe) sf.Unsubscribe(hand) return resp }