|
|
@@ -26,7 +26,7 @@ import (
|
|
|
// ClientBusHttp -- клиент HTTP-шины.
|
|
|
type ClientBusHttp struct {
|
|
|
bus mKt.IKernelBus
|
|
|
- ctx mKt.ILocalCtx
|
|
|
+ lCtx mKt.ILocalCtx
|
|
|
log mKt.ILogBuf
|
|
|
isWork mKt.ISafeBool
|
|
|
urlRemote string // URL дистанционной шины
|
|
|
@@ -34,41 +34,24 @@ type ClientBusHttp struct {
|
|
|
}
|
|
|
|
|
|
// NewClientBusHttp - -возвращает новый клиент HTTP-шины.
|
|
|
-func NewClientBusHttp(urlRemote string) *mL0.Result[mKt.IBusClient] {
|
|
|
+func NewClientBusHttp(urlRemote string) mKt.IBusClient {
|
|
|
log := mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("ClientBusHttp"))
|
|
|
log.Debug("NewClientBusHttp()")
|
|
|
- if urlRemote == "" {
|
|
|
- err := fmt.Errorf("NewClientBusHttp(): urlRemote is empty")
|
|
|
- return mL0.NewErr[mKt.IBusClient](err)
|
|
|
- }
|
|
|
- resKernCtx := kctx.GetKernelCtx()
|
|
|
- if resKernCtx.IsErr() {
|
|
|
- err := fmt.Errorf("NewClientBusHttp(): in get ctx, err=\n\t%w", resKernCtx.Err())
|
|
|
- return mL0.NewErr[mKt.IBusClient](err)
|
|
|
- }
|
|
|
- kCtx := resKernCtx.Val()
|
|
|
+ mL0.Hassert(urlRemote != "", "NewClientBusHttp(): urlRemote is empty")
|
|
|
+ kCtx := kctx.GetKernelCtx()
|
|
|
resLocal := safe_string.NewSafeStringGetenv("LOCAL_HTTP_URL")
|
|
|
- if resLocal.IsErr() {
|
|
|
- err := fmt.Errorf("NewClientBusHttp(): in get env LOCAL_HTTP_URL, err=\n\t%w", resLocal.Err())
|
|
|
- return mL0.NewErr[mKt.IBusClient](err)
|
|
|
- }
|
|
|
- urlLocal := resLocal.Val()
|
|
|
- resBus := kbus_http.GetKernelBusHttp()
|
|
|
- if resBus.IsErr() {
|
|
|
- err := fmt.Errorf("NewClientBusHttp(): in get bus, err=\n\t%w", resBus.Err())
|
|
|
- return mL0.NewErr[mKt.IBusClient](err)
|
|
|
- }
|
|
|
- kBusHttp := resBus.Val()
|
|
|
- lCtx := mL1.NewLocalCtx(kCtx.Ctx())
|
|
|
+ resLocal.Hassert("NewClientBusHttp(): in get env LOCAL_HTTP_URL)")
|
|
|
+ urlLocal := resLocal.Ok()
|
|
|
+ kBusHttp := kbus_http.GetKernelBusHttp()
|
|
|
sf := &ClientBusHttp{
|
|
|
- ctx: lCtx.Val(),
|
|
|
+ lCtx: mL1.NewLocalCtx(kCtx.Ctx()),
|
|
|
log: log,
|
|
|
bus: kBusHttp,
|
|
|
isWork: mL1.NewSafeBool(),
|
|
|
urlRemote: strings.TrimSuffix(urlRemote, "/"),
|
|
|
urlLocal: strings.TrimSuffix(urlLocal.Get(), "/"),
|
|
|
}
|
|
|
- return mL0.NewRes(mKt.IBusClient(sf))
|
|
|
+ return sf
|
|
|
}
|
|
|
|
|
|
// Unsubscribe -- отписывается от топика в дистанционной шине.
|
|
|
@@ -88,13 +71,12 @@ func (sf *ClientBusHttp) Unsubscribe(handler mKt.IBusHandlerSubscribe) {
|
|
|
hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/unsub", reader)
|
|
|
mL1.Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in new request, err=\n\t%v", err)
|
|
|
|
|
|
- binBody, err := sf.makePost(hReq)
|
|
|
- if err != nil {
|
|
|
- sf.log.Err("Unsubscribe(): in make request, err=\n\t%v", err)
|
|
|
+ resBody := sf.makePost(hReq)
|
|
|
+ if resBody.IsErr() {
|
|
|
return
|
|
|
}
|
|
|
resp := &msg_unsub.UnsubResp{}
|
|
|
- err = json.Unmarshal(binBody, resp)
|
|
|
+ err = json.Unmarshal(resBody.Ok(), resp)
|
|
|
if err != nil {
|
|
|
sf.log.Err("Unsubscribe(): in unmarshal response, err=\n\t%v", err)
|
|
|
return
|
|
|
@@ -108,9 +90,9 @@ func (sf *ClientBusHttp) Unsubscribe(handler mKt.IBusHandlerSubscribe) {
|
|
|
}
|
|
|
|
|
|
// Subscribe -- подписывается на топик в дистанционной шине.
|
|
|
-func (sf *ClientBusHttp) Subscribe(handler mKt.IBusHandlerSubscribe) *mL0.Result[bool] {
|
|
|
+func (sf *ClientBusHttp) Subscribe(handler mKt.IBusHandlerSubscribe) mKt.IResult[bool] {
|
|
|
_uuid, err := uuid.NewV6()
|
|
|
- mL1.Hassert()(err == nil, "ClientBusHttp.Subscribe(): in generate UUID v6, err=\n\t%v", err)
|
|
|
+ mL0.Hassert(err == nil, "ClientBusHttp.Subscribe(): in generate UUID v6, err=\n\t%v", err)
|
|
|
req := &msg_sub.SubscribeReq{
|
|
|
Topic_: handler.Topic(),
|
|
|
Uuid_: _uuid.String(),
|
|
|
@@ -121,15 +103,15 @@ func (sf *ClientBusHttp) Subscribe(handler mKt.IBusHandlerSubscribe) *mL0.Result
|
|
|
body := strings.NewReader(string(binReq))
|
|
|
|
|
|
hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/sub", body)
|
|
|
- mL1.Hassert()(err == nil, "ClientBusHttp.Subscribe(): in new request, err=\n\t%v")
|
|
|
+ mL0.Hassert(err != nil, "ClientBusHttp.Subscribe(): in new request, err=\n\t%v")
|
|
|
|
|
|
- binBody, err := sf.makePost(hReq)
|
|
|
- if err != nil {
|
|
|
- err := fmt.Errorf("ClientBusHttp.Subscribe(): in make request, err=\n\t%w", err)
|
|
|
+ resBody := sf.makePost(hReq)
|
|
|
+ if resBody.IsErr() {
|
|
|
+ err := fmt.Errorf("ClientBusHttp.Subscribe(): in make POST-request, err=\n\t%w", err)
|
|
|
return mL0.NewErr[bool](err)
|
|
|
}
|
|
|
resp := &msg_sub.SubscribeResp{}
|
|
|
- err = json.Unmarshal(binBody, resp)
|
|
|
+ err = json.Unmarshal(resBody.Ok(), resp)
|
|
|
if err != nil {
|
|
|
err := fmt.Errorf("ClientBusHttp.Subscribe(): in unmarshal response, err=\n\t%v", err)
|
|
|
return mL0.NewErr[bool](err)
|
|
|
@@ -147,7 +129,7 @@ func (sf *ClientBusHttp) Subscribe(handler mKt.IBusHandlerSubscribe) *mL0.Result
|
|
|
}
|
|
|
|
|
|
// SendRequest -- отправляет в дистанционную шину запрос.
|
|
|
-func (sf *ClientBusHttp) SendRequest(topic *mKa.ATopic, binReq []byte) *mL0.Result[[]byte] {
|
|
|
+func (sf *ClientBusHttp) SendRequest(topic *mKa.ATopic, binReq []byte) mL0.IResult[[]byte] {
|
|
|
_uuid, err := uuid.NewV6()
|
|
|
mL1.Hassert()(err == nil, "ClientBusHttp.SendRequest(): in generate UUID v6, err=\n\t%v", err)
|
|
|
req := &msg_serve.ServeReq{
|
|
|
@@ -162,13 +144,13 @@ func (sf *ClientBusHttp) SendRequest(topic *mKa.ATopic, binReq []byte) *mL0.Resu
|
|
|
hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/request", body)
|
|
|
mL1.Hassert()(err == nil, "ClientBusHttp.SendRequest(): in new request, err=\n\t%v")
|
|
|
|
|
|
- binBody, err := sf.makePost(hReq)
|
|
|
- if err != nil {
|
|
|
+ resBody := sf.makePost(hReq)
|
|
|
+ if resBody.IsErr() {
|
|
|
err := fmt.Errorf("ClientBusHttp.SendRequest(): in make request, err=\n\t%w", err)
|
|
|
- return mL0.NewErr[[]byte](err)
|
|
|
+ return mL0.WrapErr(resBody, err)
|
|
|
}
|
|
|
resp := &msg_serve.ServeResp{}
|
|
|
- err = json.Unmarshal(binBody, resp)
|
|
|
+ err = json.Unmarshal(resBody.Ok(), resp)
|
|
|
if err != nil {
|
|
|
err := fmt.Errorf("ClientBusHttp.SendRequest(): in unmarshal response, err=\n\t%v", err)
|
|
|
return mL0.NewErr[[]byte](err)
|
|
|
@@ -185,7 +167,7 @@ func (sf *ClientBusHttp) SendRequest(topic *mKa.ATopic, binReq []byte) *mL0.Resu
|
|
|
}
|
|
|
|
|
|
// RegisterServe -- регистрирует в локальной шине обработчик.
|
|
|
-func (sf *ClientBusHttp) RegisterServe(handler mKt.IBusHandlerServe) *mL0.Result[bool] {
|
|
|
+func (sf *ClientBusHttp) RegisterServe(handler mKt.IBusHandlerServe) mL0.IResult[bool] {
|
|
|
if handler == nil {
|
|
|
return mL0.NewErr[bool](fmt.Errorf("ClientBusHttp.RegisterServe(): handler==nil"))
|
|
|
}
|
|
|
@@ -198,7 +180,7 @@ func (sf *ClientBusHttp) RegisterServe(handler mKt.IBusHandlerServe) *mL0.Result
|
|
|
}
|
|
|
|
|
|
// Publish -- публикует сообщение в дистанционной шину.
|
|
|
-func (sf *ClientBusHttp) Publish(topic *mKa.ATopic, binMsg []byte) *mL0.Result[bool] {
|
|
|
+func (sf *ClientBusHttp) Publish(topic *mKa.ATopic, binMsg []byte) mL0.IResult[bool] {
|
|
|
_uuid, err := uuid.NewV6()
|
|
|
mL1.Hassert()(err == nil, "ClientBusHttp.Publish(): in generate UUID v6, err=\n\t%v", err)
|
|
|
req := &msg_pub.PublishReq{
|
|
|
@@ -213,13 +195,13 @@ func (sf *ClientBusHttp) Publish(topic *mKa.ATopic, binMsg []byte) *mL0.Result[b
|
|
|
hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/pub", body)
|
|
|
mL1.Hassert()(err == nil, "ClientBusHttp.Publish(): in new request, err=\n\t%v")
|
|
|
|
|
|
- binBody, err := sf.makePost(hReq)
|
|
|
- if err != nil {
|
|
|
+ resBody := sf.makePost(hReq)
|
|
|
+ if resBody.IsErr() {
|
|
|
err := fmt.Errorf("ClientBusHttp.Publish(): in make request, err=\n\t%w", err)
|
|
|
return mL0.NewErr[bool](err)
|
|
|
}
|
|
|
resp := &msg_pub.PublishResp{}
|
|
|
- err = json.Unmarshal(binBody, resp)
|
|
|
+ err = json.Unmarshal(resBody.Ok(), resp)
|
|
|
if err != nil {
|
|
|
err := fmt.Errorf("ClientBusHttp.Publish(): in unmarshal response, err=\n\t%v", err)
|
|
|
return mL0.NewErr[bool](err)
|
|
|
@@ -236,21 +218,29 @@ func (sf *ClientBusHttp) Publish(topic *mKa.ATopic, binMsg []byte) *mL0.Result[b
|
|
|
}
|
|
|
|
|
|
// Единый обработчик запросов.
|
|
|
-func (sf *ClientBusHttp) makePost(hReq *http.Request) ([]byte, error) {
|
|
|
+func (sf *ClientBusHttp) makePost(hReq *http.Request) mKt.IResult[[]byte] {
|
|
|
hReq.Header.Add("Content-Type", "application/json")
|
|
|
_resp, err := http.DefaultClient.Do(hReq)
|
|
|
if err != nil {
|
|
|
err := fmt.Errorf("ClientBusHttp.makePost(): after request, err=\n\t%w", err)
|
|
|
- return nil, err
|
|
|
+ return mL0.NewErr[[]byte](err)
|
|
|
}
|
|
|
defer func() {
|
|
|
_ = _resp.Body.Close()
|
|
|
}()
|
|
|
if _resp.StatusCode != http.StatusOK {
|
|
|
- return nil, fmt.Errorf("ClientBusHttp.makePost(): url=%q, status=%q", hReq.URL, _resp.Status)
|
|
|
+ err := fmt.Errorf("ClientBusHttp.makePost(): url=%q, status=%q",
|
|
|
+ hReq.URL, _resp.Status)
|
|
|
+ return mL0.NewErr[[]byte](err)
|
|
|
}
|
|
|
+
|
|
|
binBody, err := io.ReadAll(_resp.Body)
|
|
|
- return binBody, err
|
|
|
+ if err != nil {
|
|
|
+ err := fmt.Errorf("ClientBusHttp.makePost(): after read body, err=\n\t%v",
|
|
|
+ err)
|
|
|
+ return mL0.NewErr[[]byte](err)
|
|
|
+ }
|
|
|
+ return mL0.NewRes(binBody)
|
|
|
}
|
|
|
|
|
|
// Log -- возвращает локальный лог клиента.
|