// package client_bus_http -- HTTP bus client.
package client_bus_http

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"

	"github.com/google/uuid"
	mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
	mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/alias"
	mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
	mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
	"gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub"
	"gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve"
	"gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub"
	"gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub"
	"gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_string"
	"gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus_http"
	"gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
)

// ClientBusHttp -- HTTP bus client.
//
// It sends subscribe/unsubscribe/publish/request messages to a remote bus over
// HTTP (JSON bodies, POST) and delegates local registration to the kernel bus
// stored in the bus field.
type ClientBusHttp struct {
	bus       mKt.IKernelBus // kernel HTTP bus, obtained in the constructor via kbus_http.GetKernelBusHttp()
	ctx       mKt.ILocalCtx  // local context created from the kernel context in the constructor
	log       mKt.ILogBuf    // client log, created with the "ClientBusHttp" prefix
	isWork    mKt.ISafeBool  // created in the constructor; NOTE(review): not read by any method in this file
	urlRemote string         // URL of the remote bus (stored without a trailing "/")
	urlLocal  string         // URL of the local bus for web hooks (stored without a trailing "/")
}

// NewClientBusHttp -- returns a new HTTP bus client.
func NewClientBusHttp(urlRemote string) *mL0.Result[mKt.IBusClient] { log := mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("ClientBusHttp")) log.Debug("NewClientBusHttp()") if urlRemote == "" { err := fmt.Errorf("NewClientBusHttp(): urlRemote is empty") return mL0.NewErr[mKt.IBusClient](err) } resKernCtx := kctx.GetKernelCtx() if resKernCtx.IsErr() { err := fmt.Errorf("NewClientBusHttp(): in get ctx, err=\n\t%w", resKernCtx.Err()) return mL0.NewErr[mKt.IBusClient](err) } kCtx := resKernCtx.Val() resLocal := safe_string.NewSafeStringGetenv("LOCAL_HTTP_URL") if resLocal.IsErr() { err := fmt.Errorf("NewClientBusHttp(): in get env LOCAL_HTTP_URL, err=\n\t%w", resLocal.Err()) return mL0.NewErr[mKt.IBusClient](err) } urlLocal := resLocal.Val() resBus := kbus_http.GetKernelBusHttp() if resBus.IsErr() { err := fmt.Errorf("NewClientBusHttp(): in get bus, err=\n\t%w", resBus.Err()) return mL0.NewErr[mKt.IBusClient](err) } kBusHttp := resBus.Val() lCtx := mL1.NewLocalCtx(kCtx.Ctx()) sf := &ClientBusHttp{ ctx: lCtx.Val(), log: log, bus: kBusHttp, isWork: mL1.NewSafeBool(), urlRemote: strings.TrimSuffix(urlRemote, "/"), urlLocal: strings.TrimSuffix(urlLocal.Get(), "/"), } return mL0.NewRes(mKt.IBusClient(sf)) } // Unsubscribe -- отписывается от топика в дистанционной шине. 
func (sf *ClientBusHttp) Unsubscribe(handler mKt.IBusHandlerSubscribe) { _uuid, err := uuid.NewV6() mL1.Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in generate UUID v6, err=\n\t%v", err) req := &msg_unsub.UnsubReq{ Name_: handler.Name(), Uuid_: _uuid.String(), } req.SelfCheck() binReq, err := json.MarshalIndent(req, "", " ") mL1.Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in marshal to JSON unsubscribe request, err=\n\t%v", err) reader := strings.NewReader(string(binReq)) hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/unsub", reader) mL1.Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in new request, err=\n\t%v", err) binBody, err := sf.makePost(hReq) if err != nil { sf.log.Err("Unsubscribe(): in make request, err=\n\t%v", err) return } resp := &msg_unsub.UnsubResp{} err = json.Unmarshal(binBody, resp) if err != nil { sf.log.Err("Unsubscribe(): in unmarshal response, err=\n\t%v", err) return } if resp.Status_ != "ok" { sf.log.Err("Unsubscribe(): resp!='ok', err=\n\t%v", resp.Status_) } if resp.Uuid_ != req.Uuid_ { sf.log.Err("Unsubscribe(): resp uuid(%v) bad", resp.Uuid_) } } // Subscribe -- подписывается на топик в дистанционной шине. 
func (sf *ClientBusHttp) Subscribe(handler mKt.IBusHandlerSubscribe) *mL0.Result[bool] { _uuid, err := uuid.NewV6() mL1.Hassert()(err == nil, "ClientBusHttp.Subscribe(): in generate UUID v6, err=\n\t%v", err) req := &msg_sub.SubscribeReq{ Topic_: handler.Topic(), Uuid_: _uuid.String(), WebHook_: sf.urlLocal + "/bus/pub", // Веб-хук всегда на своей стороне } req.SelfCheck() binReq, _ := json.MarshalIndent(req, "", " ") body := strings.NewReader(string(binReq)) hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/sub", body) mL1.Hassert()(err == nil, "ClientBusHttp.Subscribe(): in new request, err=\n\t%v") binBody, err := sf.makePost(hReq) if err != nil { err := fmt.Errorf("ClientBusHttp.Subscribe(): in make request, err=\n\t%w", err) return mL0.NewErr[bool](err) } resp := &msg_sub.SubscribeResp{} err = json.Unmarshal(binBody, resp) if err != nil { err := fmt.Errorf("ClientBusHttp.Subscribe(): in unmarshal response, err=\n\t%v", err) return mL0.NewErr[bool](err) } if resp.Status_ != "ok" { err := fmt.Errorf("ClientBusHttp.Subscribe(): resp!='ok', err=\n\t%v", resp.Status_) return mL0.NewErr[bool](err) } if resp.Uuid_ != req.Uuid_ { err := fmt.Errorf("ClientBusHttp.Subscribe(): resp uuid(%v) bad", resp.Uuid_) return mL0.NewErr[bool](err) } res := sf.bus.Subscribe(handler) return res } // SendRequest -- отправляет в дистанционную шину запрос. 
func (sf *ClientBusHttp) SendRequest(topic *mKa.ATopic, binReq []byte) *mL0.Result[[]byte] { _uuid, err := uuid.NewV6() mL1.Hassert()(err == nil, "ClientBusHttp.SendRequest(): in generate UUID v6, err=\n\t%v", err) req := &msg_serve.ServeReq{ Topic_: topic, Uuid_: _uuid.String(), BinReq_: binReq, } req.SelfCheck() _binReq, _ := json.MarshalIndent(req, "", " ") body := strings.NewReader(string(_binReq)) hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/request", body) mL1.Hassert()(err == nil, "ClientBusHttp.SendRequest(): in new request, err=\n\t%v") binBody, err := sf.makePost(hReq) if err != nil { err := fmt.Errorf("ClientBusHttp.SendRequest(): in make request, err=\n\t%w", err) return mL0.NewErr[[]byte](err) } resp := &msg_serve.ServeResp{} err = json.Unmarshal(binBody, resp) if err != nil { err := fmt.Errorf("ClientBusHttp.SendRequest(): in unmarshal response, err=\n\t%v", err) return mL0.NewErr[[]byte](err) } if resp.Status_ != "ok" { err := fmt.Errorf("ClientBusHttp.SendRequest(): resp!='ok', err=\n\t%v", resp.Status_) return mL0.NewErr[[]byte](err) } if resp.Uuid_ != req.Uuid_ { err := fmt.Errorf("ClientBusHttp.SendRequest(): resp uuid(%v) bad", resp.Uuid_) return mL0.NewErr[[]byte](err) } return mL0.NewRes(resp.BinResp_) } // RegisterServe -- регистрирует в локальной шине обработчик. func (sf *ClientBusHttp) RegisterServe(handler mKt.IBusHandlerServe) *mL0.Result[bool] { if handler == nil { return mL0.NewErr[bool](fmt.Errorf("ClientBusHttp.RegisterServe(): handler==nil")) } res := sf.bus.RegisterServe(handler) if res.IsErr() { err := fmt.Errorf("ClientBusHttp.RegisterServe(): in register serve to bus, err=\n\t%v", res.Err()) return mL0.NewErr[bool](err) } return mL0.NewRes(true) } // Publish -- публикует сообщение в дистанционной шину. 
func (sf *ClientBusHttp) Publish(topic *mKa.ATopic, binMsg []byte) *mL0.Result[bool] { _uuid, err := uuid.NewV6() mL1.Hassert()(err == nil, "ClientBusHttp.Publish(): in generate UUID v6, err=\n\t%v", err) req := &msg_pub.PublishReq{ Topic_: topic, Uuid_: _uuid.String(), BinMsg_: binMsg, } req.SelfCheck() binReq, _ := json.MarshalIndent(req, "", " ") body := strings.NewReader(string(binReq)) hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/pub", body) mL1.Hassert()(err == nil, "ClientBusHttp.Publish(): in new request, err=\n\t%v") binBody, err := sf.makePost(hReq) if err != nil { err := fmt.Errorf("ClientBusHttp.Publish(): in make request, err=\n\t%w", err) return mL0.NewErr[bool](err) } resp := &msg_pub.PublishResp{} err = json.Unmarshal(binBody, resp) if err != nil { err := fmt.Errorf("ClientBusHttp.Publish(): in unmarshal response, err=\n\t%v", err) return mL0.NewErr[bool](err) } if resp.Status_ != "ok" { err := fmt.Errorf("ClientBusHttp.Publish(): resp!='ok', err=\n\t%v", resp.Status_) return mL0.NewErr[bool](err) } if resp.Uuid_ != req.Uuid_ { err := fmt.Errorf("ClientBusHttp.Publish(): resp uuid(%v) bad", resp.Uuid_) return mL0.NewErr[bool](err) } return mL0.NewRes(true) } // Единый обработчик запросов. func (sf *ClientBusHttp) makePost(hReq *http.Request) ([]byte, error) { hReq.Header.Add("Content-Type", "application/json") _resp, err := http.DefaultClient.Do(hReq) if err != nil { err := fmt.Errorf("ClientBusHttp.makePost(): after request, err=\n\t%w", err) return nil, err } defer func() { _ = _resp.Body.Close() }() if _resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("ClientBusHttp.makePost(): url=%q, status=%q", hReq.URL, _resp.Status) } binBody, err := io.ReadAll(_resp.Body) return binBody, err } // Log -- возвращает локальный лог клиента. func (sf *ClientBusHttp) Log() mKt.ILogBuf { return sf.log } // IsWork -- возвращает признак работы. func (sf *ClientBusHttp) IsWork() bool { return sf.bus.IsWork() }