| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- // package kbus_http -- шина сообщений поверх HTTP.
- package kbus_http
- import (
- "fmt"
- "net/http"
- "sync"
- "github.com/gofiber/fiber/v3"
- mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
- mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus_base"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kserv_http"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/mock_hand_sub_http"
- )
- // kBusHttp -- шина данных поверх HTTP.
- type kBusHttp struct {
- *kbus_base.KBusBase
- log mKs.ILogBuf
- }
- var (
- Bus_ *kBusHttp
- block sync.Mutex
- )
- // GetKernelBusHttp -- возвращает шину HTTP.
- func GetKernelBusHttp() mKs.IKernelBus {
- block.Lock()
- defer block.Unlock()
- if Bus_ != nil {
- return Bus_
- }
- log := mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("kBusHttp"))
- log.Debug("GetKernelBusHttp(): new")
- kCtx := kctx.GetKernelCtx()
- kBus := kbus_base.GetKernelBusBase()
- sf := &kBusHttp{
- KBusBase: kBus,
- log: log,
- }
- kServHttp := kserv_http.GetKernelServHttp()
- fibApp := kServHttp.Fiber()
- fibApp.Post("/bus/sub", sf.postSub) // Топик подписки, IN
- fibApp.Post("/bus/unsub", sf.postUnsub) // Топик отписки, IN
- fibApp.Post("/bus/request", sf.postSendRequest) // Топик входящих запросов, IN
- fibApp.Post("/bus/pub", sf.postPublish) // Топик публикаций подписки, IN
- kCtx.Set("kernBus", sf, "GetKernelBusHttp(): http data bus")
- Bus_ = sf
- return Bus_
- }
- // Входящий запрос HTTP на подписку.
- func (sf *kBusHttp) postSub(ctx fiber.Ctx) error {
- sf.log.Debug("postSub()")
- ctx.Set("Content-type", "text/html; charset=utf8")
- ctx.Set("Content-type", "text/json")
- ctx.Set("Cache-Control", "no-cache")
- sf.log.Debug("postSub()")
- req := &msg_sub.SubscribeReq{}
- err := ctx.Bind().Body(req)
- if err != nil {
- resp := &msg_sub.SubscribeResp{
- Status_: fmt.Sprintf("kernelBusHttp.postSub(): in parse request, err=\n\t%v\n", err),
- Uuid_: req.Uuid_,
- }
- resp.SelfCheck()
- ctx.Response().SetStatusCode(http.StatusBadRequest)
- sf.log.Err("postSub(): in body parser, status=%q", resp.Status_)
- return ctx.JSON(resp)
- }
- resp := sf.processSubscribe(req)
- resp.SelfCheck()
- return ctx.JSON(resp)
- }
- // Процесс подписки веб-хука.
- func (sf *kBusHttp) processSubscribe(req *msg_sub.SubscribeReq) *msg_sub.SubscribeResp {
- req.SelfCheck()
- handler := mock_hand_sub_http.NewMockHandSubHttp(req.Topic_, req.WebHook_)
- resp := &msg_sub.SubscribeResp{
- Status_: "ok",
- Uuid_: req.Uuid_,
- Name_: handler.Name(),
- }
- res := sf.Subscribe(handler)
- if res.IsErr() {
- resp.Status_ = fmt.Sprintf("kernelBusHttp.processSubscribe(): err=\n\t%v", res.Err())
- return resp
- }
- return resp
- }
- // Входящая публикация.
- func (sf *kBusHttp) postPublish(ctx fiber.Ctx) error {
- sf.log.Debug("postPublish()")
- ctx.Set("Content-type", "text/html; charset=utf8")
- ctx.Set("Content-type", "text/json")
- ctx.Set("Cache-Control", "no-cache")
- req := &msg_pub.PublishReq{}
- err := ctx.Bind().Body(req)
- if err != nil {
- resp := &msg_pub.PublishResp{
- Status_: fmt.Sprintf("kernelBusHttp.postPublish(): in parse request, err=\n\t%v\n", err),
- Uuid_: req.Uuid_,
- }
- resp.SelfCheck()
- ctx.Response().SetStatusCode(http.StatusBadRequest)
- sf.log.Err("postPublish(): in body parser, status=%v", resp.Status_)
- return ctx.JSON(resp)
- }
- resp := sf.processPublish(req)
- resp.SelfCheck()
- return ctx.JSON(resp)
- }
- // Выполняет процесс публикации.
- func (sf *kBusHttp) processPublish(req *msg_pub.PublishReq) *msg_pub.PublishResp {
- req.SelfCheck()
- res := sf.Publish(req.Topic_, req.BinMsg_)
- resp := &msg_pub.PublishResp{
- Status_: "ok",
- Uuid_: req.Uuid_,
- }
- if res.IsErr() {
- resp.Status_ = fmt.Sprintf("kernelBusHttp.processPublish(): err=\n\t%v", res.Err())
- return resp
- }
- return resp
- }
- // Входящий запрос.
- func (sf *kBusHttp) postSendRequest(ctx fiber.Ctx) error {
- sf.log.Debug("postSendRequest()")
- ctx.Set("Content-type", "text/html; charset=utf8")
- ctx.Set("Content-type", "text/json")
- ctx.Set("Cache-Control", "no-cache")
- req := &msg_serve.ServeReq{}
- err := ctx.Bind().Body(req)
- if err != nil {
- resp := &msg_serve.ServeResp{
- Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
- Uuid_: req.Uuid_,
- }
- resp.SelfCheck()
- ctx.Response().SetStatusCode(http.StatusBadRequest)
- sf.log.Err("postSendRequest(): in body parser, status=%v", resp.Status_)
- return ctx.JSON(resp)
- }
- resp := sf.processSendRequest(req)
- resp.SelfCheck()
- return ctx.JSON(resp)
- }
- // Обрабатывает входящий запрос.
- func (sf *kBusHttp) processSendRequest(req *msg_serve.ServeReq) *msg_serve.ServeResp {
- req.SelfCheck()
- res := sf.SendRequest(req.Topic_, req.BinReq_)
- resp := &msg_serve.ServeResp{
- Status_: "ok",
- Uuid_: req.Uuid_,
- }
- if res.IsErr() {
- resp.Status_ = fmt.Sprintf("kernelBusHttp.processSendRequest(): err=\n\t%v", res.Err())
- return resp
- }
- resp.BinResp_ = res.Ok()
- return resp
- }
- // Входящая отписка от топика по HTTP.
- func (sf *kBusHttp) postUnsub(ctx fiber.Ctx) error {
- sf.log.Debug("postUnsub()")
- ctx.Set("Content-type", "text/html; charset=utf8")
- ctx.Set("Content-type", "text/json")
- ctx.Set("Cache-Control", "no-cache")
- req := &msg_unsub.UnsubReq{}
- err := ctx.Bind().Body(req)
- if err != nil {
- resp := &msg_serve.ServeResp{
- Status_: fmt.Sprintf("kernelBusHttp.postSendRequest(): err=\n\t%v", err),
- Uuid_: req.Uuid_,
- }
- resp.SelfCheck()
- ctx.Response().SetStatusCode(http.StatusBadRequest)
- sf.log.Err("postUnsub(): in body_parser, status=%q", resp.Status_)
- return ctx.JSON(resp)
- }
- resp := sf.processUnsubRequest(req)
- resp.SelfCheck()
- return ctx.JSON(resp)
- }
- // Процесс отписки от топика.
- func (sf *kBusHttp) processUnsubRequest(req *msg_unsub.UnsubReq) *msg_unsub.UnsubResp {
- req.SelfCheck()
- resp := &msg_unsub.UnsubResp{
- Status_: "ok",
- Uuid_: req.Uuid_,
- }
- optHandler := sf.KCtx_.Get(req.Name_.Get())
- if optHandler.IsNone() {
- resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): not get handler(%v) from kernel ctx",
- req.Name_)
- return resp
- }
- if optHandler == nil {
- resp.Status_ = fmt.Sprintf("kernelBusHttp.processUnsubRequest(): handler(%v) not exists", req.Name_)
- return resp
- }
- hand := optHandler.Some().Val().(mKs.IBusHandlerSubscribe)
- sf.Unsubscribe(hand)
- return resp
- }
|