|
|
@@ -10,20 +10,16 @@ import (
|
|
|
|
|
|
"github.com/google/uuid"
|
|
|
|
|
|
- "gitp78su.ipnodns.ru/svi/kern/v4/kc/log_buf"
|
|
|
- "gitp78su.ipnodns.ru/svi/kern/v4/kc/safe_string"
|
|
|
- "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/kbus_http"
|
|
|
- "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/kbus_msg/msg_pub"
|
|
|
- "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/kbus_msg/msg_serve"
|
|
|
- "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/kbus_msg/msg_sub"
|
|
|
- "gitp78su.ipnodns.ru/svi/kern/v4/krn/kbus/kbus_msg/msg_unsub"
|
|
|
- "gitp78su.ipnodns.ru/svi/kern/v4/krn/kctx"
|
|
|
. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
|
|
|
. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
|
|
|
- . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/helpers"
|
|
|
- "gitp78su.ipnodns.ru/svi/kern/v4/lev1/local_ctx"
|
|
|
- . "gitp78su.ipnodns.ru/svi/kern/v4/lev1/result"
|
|
|
- "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_bool"
|
|
|
+ . "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
|
|
|
+ "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub"
|
|
|
+ "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve"
|
|
|
+ "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub"
|
|
|
+ "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub"
|
|
|
+ "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_string"
|
|
|
+ "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus_http"
|
|
|
+ "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
|
|
|
)
|
|
|
|
|
|
// ClientBusHttp -- клиент HTTP-шины
|
|
|
@@ -38,13 +34,18 @@ type ClientBusHttp struct {
|
|
|
|
|
|
// NewClientBusHttp - -возвращает новый клиент HTTP-шины
|
|
|
func NewClientBusHttp(urlRemote string) IResult[IBusClient] {
|
|
|
- log := log_buf.NewLogBuf(log_buf.OptIsTerm(true), log_buf.OptPrefix("ClientBusHttp"))
|
|
|
+ log := NewLogBuf(OptIsTerm(true), OptPrefix("ClientBusHttp"))
|
|
|
log.Debug("NewClientBusHttp()")
|
|
|
if urlRemote == "" {
|
|
|
err := fmt.Errorf("NewClientBusHttp(): urlRemote is empty")
|
|
|
return NewErr[IBusClient](err)
|
|
|
}
|
|
|
- kCtx := kctx.GetKernelCtx()
|
|
|
+ resKernCtx := kctx.GetKernelCtx()
|
|
|
+ if resKernCtx.IsErr() {
|
|
|
+ err := fmt.Errorf("NewClientBusHttp(): in get ctx, err=\n\t%w", resKernCtx.Err())
|
|
|
+ return NewErr[IBusClient](err)
|
|
|
+ }
|
|
|
+ kCtx := resKernCtx.Val()
|
|
|
resLocal := safe_string.NewSafeStringGetenv("LOCAL_HTTP_URL")
|
|
|
if resLocal.IsErr() {
|
|
|
err := fmt.Errorf("NewClientBusHttp(): in get env LOCAL_HTTP_URL, err=\n\t%w", resLocal.Err())
|
|
|
@@ -57,11 +58,12 @@ func NewClientBusHttp(urlRemote string) IResult[IBusClient] {
|
|
|
return NewErr[IBusClient](err)
|
|
|
}
|
|
|
kBusHttp := resBus.Val()
|
|
|
+ lCtx := NewLocalCtx(kCtx.Ctx())
|
|
|
sf := &ClientBusHttp{
|
|
|
- ctx: local_ctx.NewLocalCtx(kCtx.Ctx()),
|
|
|
+ ctx: lCtx.Val(),
|
|
|
log: log,
|
|
|
bus: kBusHttp,
|
|
|
- isWork: safe_bool.NewSafeBool(),
|
|
|
+ isWork: NewSafeBool(),
|
|
|
urlRemote: strings.TrimSuffix(urlRemote, "/"),
|
|
|
urlLocal: strings.TrimSuffix(urlLocal.Get(), "/"),
|
|
|
}
|
|
|
@@ -71,7 +73,7 @@ func NewClientBusHttp(urlRemote string) IResult[IBusClient] {
|
|
|
// Unsubscribe -- отписывается от топика в дистанционной шине
|
|
|
func (sf *ClientBusHttp) Unsubscribe(handler IBusHandlerSubscribe) {
|
|
|
_uuid, err := uuid.NewV6()
|
|
|
- Hassert(err == nil, "ClientBusHttp.Unsubscribe(): in generate UUID v6, err=\n\t%v", err)
|
|
|
+ Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in generate UUID v6, err=\n\t%v", err)
|
|
|
|
|
|
req := &msg_unsub.UnsubReq{
|
|
|
Name_: handler.Name(),
|
|
|
@@ -79,11 +81,11 @@ func (sf *ClientBusHttp) Unsubscribe(handler IBusHandlerSubscribe) {
|
|
|
}
|
|
|
req.SelfCheck()
|
|
|
binReq, err := json.MarshalIndent(req, "", " ")
|
|
|
- Hassert(err == nil, "ClientBusHttp.Unsubscribe(): in marshal to JSON unsubscribe request, err=\n\t%v", err)
|
|
|
+ Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in marshal to JSON unsubscribe request, err=\n\t%v", err)
|
|
|
reader := strings.NewReader(string(binReq))
|
|
|
|
|
|
hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/unsub", reader)
|
|
|
- Hassert(err == nil, "ClientBusHttp.Unsubscribe(): in new request, err=\n\t%v")
|
|
|
+ Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in new request, err=\n\t%v", err)
|
|
|
|
|
|
binBody, err := sf.makePost(hReq)
|
|
|
if err != nil {
|
|
|
@@ -92,17 +94,22 @@ func (sf *ClientBusHttp) Unsubscribe(handler IBusHandlerSubscribe) {
|
|
|
}
|
|
|
resp := &msg_unsub.UnsubResp{}
|
|
|
err = json.Unmarshal(binBody, resp)
|
|
|
- Hassert(err == nil, "ClientBusHttp.Unsubscribe(): in unmarshal response, err=\n\t%v", err)
|
|
|
+ if err != nil {
|
|
|
+ sf.log.Err("Unsubscribe(): in unmarshal response, err=\n\t%v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
if string(resp.Status_) != "ok" {
|
|
|
sf.log.Err("Unsubscribe(): resp!='ok', err=\n\t%v", resp.Status_)
|
|
|
}
|
|
|
- Hassert(resp.Uuid_ == req.Uuid_, "ClientBusHttp.Unsubscribe(): resp uuid(%v) bad", resp.Uuid_)
|
|
|
+ if resp.Uuid_ != req.Uuid_ {
|
|
|
+ sf.log.Err("Unsubscribe(): resp uuid(%v) bad", resp.Uuid_)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Subscribe -- подписывается на топик в дистанционной шине
|
|
|
func (sf *ClientBusHttp) Subscribe(handler IBusHandlerSubscribe) IResult[bool] {
|
|
|
_uuid, err := uuid.NewV6()
|
|
|
- Hassert(err == nil, "ClientBusHttp.Subscribe(): in generate UUID v6, err=\n\t%v", err)
|
|
|
+ Hassert()(err == nil, "ClientBusHttp.Subscribe(): in generate UUID v6, err=\n\t%v", err)
|
|
|
req := &msg_sub.SubscribeReq{
|
|
|
Topic_: handler.Topic(),
|
|
|
Uuid_: _uuid.String(),
|
|
|
@@ -113,7 +120,7 @@ func (sf *ClientBusHttp) Subscribe(handler IBusHandlerSubscribe) IResult[bool] {
|
|
|
body := strings.NewReader(string(binReq))
|
|
|
|
|
|
hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/sub", body)
|
|
|
- Hassert(err == nil, "ClientBusHttp.Subscribe(): in new request, err=\n\t%v")
|
|
|
+ Hassert()(err == nil, "ClientBusHttp.Subscribe(): in new request, err=\n\t%v")
|
|
|
|
|
|
binBody, err := sf.makePost(hReq)
|
|
|
if err != nil {
|
|
|
@@ -122,12 +129,18 @@ func (sf *ClientBusHttp) Subscribe(handler IBusHandlerSubscribe) IResult[bool] {
|
|
|
}
|
|
|
resp := &msg_sub.SubscribeResp{}
|
|
|
err = json.Unmarshal(binBody, resp)
|
|
|
- Hassert(err == nil, "ClientBusHttp.Subscribe(): in unmarshal response, err=\n\t%v", err)
|
|
|
+ if err != nil {
|
|
|
+ err := fmt.Errorf("ClientBusHttp.Subscribe(): in unmarshal response, err=\n\t%v", err)
|
|
|
+ return NewErr[bool](err)
|
|
|
+ }
|
|
|
if string(resp.Status_) != "ok" {
|
|
|
err := fmt.Errorf("ClientBusHttp.Subscribe(): resp!='ok', err=\n\t%v", resp.Status_)
|
|
|
return NewErr[bool](err)
|
|
|
}
|
|
|
- Hassert(resp.Uuid_ == req.Uuid_, "ClientBusHttp.Subscribe(): resp uuid(%v) bad", resp.Uuid_)
|
|
|
+ if resp.Uuid_ != req.Uuid_ {
|
|
|
+ err := fmt.Errorf("ClientBusHttp.Subscribe(): resp uuid(%v) bad", resp.Uuid_)
|
|
|
+ return NewErr[bool](err)
|
|
|
+ }
|
|
|
res := sf.bus.Subscribe(handler)
|
|
|
return res
|
|
|
}
|
|
|
@@ -135,7 +148,7 @@ func (sf *ClientBusHttp) Subscribe(handler IBusHandlerSubscribe) IResult[bool] {
|
|
|
// SendRequest -- отправляет в дистанционную шину запрос
|
|
|
func (sf *ClientBusHttp) SendRequest(topic ATopic, binReq []byte) IResult[[]byte] {
|
|
|
_uuid, err := uuid.NewV6()
|
|
|
- Hassert(err == nil, "ClientBusHttp.SendRequest(): in generate UUID v6, err=\n\t%v", err)
|
|
|
+ Hassert()(err == nil, "ClientBusHttp.SendRequest(): in generate UUID v6, err=\n\t%v", err)
|
|
|
req := &msg_serve.ServeReq{
|
|
|
Topic_: topic,
|
|
|
Uuid_: _uuid.String(),
|
|
|
@@ -146,7 +159,7 @@ func (sf *ClientBusHttp) SendRequest(topic ATopic, binReq []byte) IResult[[]byte
|
|
|
body := strings.NewReader(string(_binReq))
|
|
|
|
|
|
hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/request", body)
|
|
|
- Hassert(err == nil, "ClientBusHttp.SendRequest(): in new request, err=\n\t%v")
|
|
|
+ Hassert()(err == nil, "ClientBusHttp.SendRequest(): in new request, err=\n\t%v")
|
|
|
|
|
|
binBody, err := sf.makePost(hReq)
|
|
|
if err != nil {
|
|
|
@@ -155,12 +168,18 @@ func (sf *ClientBusHttp) SendRequest(topic ATopic, binReq []byte) IResult[[]byte
|
|
|
}
|
|
|
resp := &msg_serve.ServeResp{}
|
|
|
err = json.Unmarshal(binBody, resp)
|
|
|
- Hassert(err == nil, "ClientBusHttp.SendRequest(): in unmarshal response, err=\n\t%v", err)
|
|
|
+ if err != nil {
|
|
|
+ err := fmt.Errorf("ClientBusHttp.SendRequest(): in unmarshal response, err=\n\t%v", err)
|
|
|
+ return NewErr[[]byte](err)
|
|
|
+ }
|
|
|
if string(resp.Status_) != "ok" {
|
|
|
err := fmt.Errorf("ClientBusHttp.SendRequest(): resp!='ok', err=\n\t%v", resp.Status_)
|
|
|
return NewErr[[]byte](err)
|
|
|
}
|
|
|
- Hassert(resp.Uuid_ == req.Uuid_, "ClientBusHttp.SendRequest(): resp uuid(%v) bad", resp.Uuid_)
|
|
|
+ if resp.Uuid_ != req.Uuid_ {
|
|
|
+ err := fmt.Errorf("ClientBusHttp.SendRequest(): resp uuid(%v) bad", resp.Uuid_)
|
|
|
+ return NewErr[[]byte](err)
|
|
|
+ }
|
|
|
return NewRes(resp.BinResp_)
|
|
|
}
|
|
|
|
|
|
@@ -180,7 +199,7 @@ func (sf *ClientBusHttp) RegisterServe(handler IBusHandlerServe) IResult[bool] {
|
|
|
// Publish -- публикует сообщение в дистанционной шину
|
|
|
func (sf *ClientBusHttp) Publish(topic ATopic, binMsg []byte) IResult[bool] {
|
|
|
_uuid, err := uuid.NewV6()
|
|
|
- Hassert(err == nil, "ClientBusHttp.Publish(): in generate UUID v6, err=\n\t%v", err)
|
|
|
+ Hassert()(err == nil, "ClientBusHttp.Publish(): in generate UUID v6, err=\n\t%v", err)
|
|
|
req := &msg_pub.PublishReq{
|
|
|
Topic_: topic,
|
|
|
Uuid_: _uuid.String(),
|
|
|
@@ -191,7 +210,7 @@ func (sf *ClientBusHttp) Publish(topic ATopic, binMsg []byte) IResult[bool] {
|
|
|
body := strings.NewReader(string(binReq))
|
|
|
|
|
|
hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/pub", body)
|
|
|
- Hassert(err == nil, "ClientBusHttp.Publish(): in new request, err=\n\t%v")
|
|
|
+ Hassert()(err == nil, "ClientBusHttp.Publish(): in new request, err=\n\t%v")
|
|
|
|
|
|
binBody, err := sf.makePost(hReq)
|
|
|
if err != nil {
|
|
|
@@ -200,12 +219,18 @@ func (sf *ClientBusHttp) Publish(topic ATopic, binMsg []byte) IResult[bool] {
|
|
|
}
|
|
|
resp := &msg_pub.PublishResp{}
|
|
|
err = json.Unmarshal(binBody, resp)
|
|
|
- Hassert(err == nil, "ClientBusHttp.Publish(): in unmarshal response, err=\n\t%v", err)
|
|
|
+ if err != nil {
|
|
|
+ err := fmt.Errorf("ClientBusHttp.Publish(): in unmarshal response, err=\n\t%v", err)
|
|
|
+ return NewErr[bool](err)
|
|
|
+ }
|
|
|
if string(resp.Status_) != "ok" {
|
|
|
err := fmt.Errorf("ClientBusHttp.Publish(): resp!='ok', err=\n\t%v", resp.Status_)
|
|
|
return NewErr[bool](err)
|
|
|
}
|
|
|
- Hassert(resp.Uuid_ == req.Uuid_, "ClientBusHttp.Publish(): resp uuid(%v) bad", resp.Uuid_)
|
|
|
+ if resp.Uuid_ != req.Uuid_ {
|
|
|
+ err := fmt.Errorf("ClientBusHttp.Publish(): resp uuid(%v) bad", resp.Uuid_)
|
|
|
+ return NewErr[bool](err)
|
|
|
+ }
|
|
|
return NewRes(true)
|
|
|
}
|
|
|
|