| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- // package client_bus_http -- клиент HTTP-шины.
- package client_bus_http
- import (
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "strings"
- "github.com/google/uuid"
- mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
- mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
- mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_string"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus_http"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
- )
- // ClientBusHttp -- клиент HTTP-шины.
- type ClientBusHttp struct {
- bus mKt.IKernelBus
- ctx mKt.ILocalCtx
- log mKt.ILogBuf
- isWork mKt.ISafeBool
- urlRemote string // URL дистанционной шины
- urlLocal string // URL локальной шины для веб-хуков
- }
- // NewClientBusHttp - -возвращает новый клиент HTTP-шины.
- func NewClientBusHttp(urlRemote string) mKt.IResult[mKt.IBusClient] {
- log := mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("ClientBusHttp"))
- log.Debug("NewClientBusHttp()")
- if urlRemote == "" {
- err := fmt.Errorf("NewClientBusHttp(): urlRemote is empty")
- return mL1.NewErr[mKt.IBusClient](err)
- }
- resKernCtx := kctx.GetKernelCtx()
- if resKernCtx.IsErr() {
- err := fmt.Errorf("NewClientBusHttp(): in get ctx, err=\n\t%w", resKernCtx.Err())
- return mL1.NewErr[mKt.IBusClient](err)
- }
- kCtx := resKernCtx.Val()
- resLocal := safe_string.NewSafeStringGetenv("LOCAL_HTTP_URL")
- if resLocal.IsErr() {
- err := fmt.Errorf("NewClientBusHttp(): in get env LOCAL_HTTP_URL, err=\n\t%w", resLocal.Err())
- return mL1.NewErr[mKt.IBusClient](err)
- }
- urlLocal := resLocal.Val()
- resBus := kbus_http.GetKernelBusHttp()
- if resBus.IsErr() {
- err := fmt.Errorf("NewClientBusHttp(): in get bus, err=\n\t%w", resBus.Err())
- return mL1.NewErr[mKt.IBusClient](err)
- }
- kBusHttp := resBus.Val()
- lCtx := mL1.NewLocalCtx(kCtx.Ctx())
- sf := &ClientBusHttp{
- ctx: lCtx.Val(),
- log: log,
- bus: kBusHttp,
- isWork: mL1.NewSafeBool(),
- urlRemote: strings.TrimSuffix(urlRemote, "/"),
- urlLocal: strings.TrimSuffix(urlLocal.Get(), "/"),
- }
- return mL1.NewRes(mKt.IBusClient(sf))
- }
- // Unsubscribe -- отписывается от топика в дистанционной шине.
- func (sf *ClientBusHttp) Unsubscribe(handler mKt.IBusHandlerSubscribe) {
- _uuid, err := uuid.NewV6()
- mL1.Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in generate UUID v6, err=\n\t%v", err)
- req := &msg_unsub.UnsubReq{
- Name_: handler.Name(),
- Uuid_: _uuid.String(),
- }
- req.SelfCheck()
- binReq, err := json.MarshalIndent(req, "", " ")
- mL1.Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in marshal to JSON unsubscribe request, err=\n\t%v", err)
- reader := strings.NewReader(string(binReq))
- hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/unsub", reader)
- mL1.Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in new request, err=\n\t%v", err)
- binBody, err := sf.makePost(hReq)
- if err != nil {
- sf.log.Err("Unsubscribe(): in make request, err=\n\t%v", err)
- return
- }
- resp := &msg_unsub.UnsubResp{}
- err = json.Unmarshal(binBody, resp)
- if err != nil {
- sf.log.Err("Unsubscribe(): in unmarshal response, err=\n\t%v", err)
- return
- }
- if resp.Status_ != "ok" {
- sf.log.Err("Unsubscribe(): resp!='ok', err=\n\t%v", resp.Status_)
- }
- if resp.Uuid_ != req.Uuid_ {
- sf.log.Err("Unsubscribe(): resp uuid(%v) bad", resp.Uuid_)
- }
- }
- // Subscribe -- подписывается на топик в дистанционной шине.
- func (sf *ClientBusHttp) Subscribe(handler mKt.IBusHandlerSubscribe) mKt.IResult[bool] {
- _uuid, err := uuid.NewV6()
- mL1.Hassert()(err == nil, "ClientBusHttp.Subscribe(): in generate UUID v6, err=\n\t%v", err)
- req := &msg_sub.SubscribeReq{
- Topic_: handler.Topic(),
- Uuid_: _uuid.String(),
- WebHook_: sf.urlLocal + "/bus/pub", // Веб-хук всегда на своей стороне
- }
- req.SelfCheck()
- binReq, _ := json.MarshalIndent(req, "", " ")
- body := strings.NewReader(string(binReq))
- hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/sub", body)
- mL1.Hassert()(err == nil, "ClientBusHttp.Subscribe(): in new request, err=\n\t%v")
- binBody, err := sf.makePost(hReq)
- if err != nil {
- err := fmt.Errorf("ClientBusHttp.Subscribe(): in make request, err=\n\t%w", err)
- return mL1.NewErr[bool](err)
- }
- resp := &msg_sub.SubscribeResp{}
- err = json.Unmarshal(binBody, resp)
- if err != nil {
- err := fmt.Errorf("ClientBusHttp.Subscribe(): in unmarshal response, err=\n\t%v", err)
- return mL1.NewErr[bool](err)
- }
- if resp.Status_ != "ok" {
- err := fmt.Errorf("ClientBusHttp.Subscribe(): resp!='ok', err=\n\t%v", resp.Status_)
- return mL1.NewErr[bool](err)
- }
- if resp.Uuid_ != req.Uuid_ {
- err := fmt.Errorf("ClientBusHttp.Subscribe(): resp uuid(%v) bad", resp.Uuid_)
- return mL1.NewErr[bool](err)
- }
- res := sf.bus.Subscribe(handler)
- return res
- }
- // SendRequest -- отправляет в дистанционную шину запрос.
- func (sf *ClientBusHttp) SendRequest(topic mKa.ATopic, binReq []byte) mKt.IResult[[]byte] {
- _uuid, err := uuid.NewV6()
- mL1.Hassert()(err == nil, "ClientBusHttp.SendRequest(): in generate UUID v6, err=\n\t%v", err)
- req := &msg_serve.ServeReq{
- Topic_: topic,
- Uuid_: _uuid.String(),
- BinReq_: binReq,
- }
- req.SelfCheck()
- _binReq, _ := json.MarshalIndent(req, "", " ")
- body := strings.NewReader(string(_binReq))
- hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/request", body)
- mL1.Hassert()(err == nil, "ClientBusHttp.SendRequest(): in new request, err=\n\t%v")
- binBody, err := sf.makePost(hReq)
- if err != nil {
- err := fmt.Errorf("ClientBusHttp.SendRequest(): in make request, err=\n\t%w", err)
- return mL1.NewErr[[]byte](err)
- }
- resp := &msg_serve.ServeResp{}
- err = json.Unmarshal(binBody, resp)
- if err != nil {
- err := fmt.Errorf("ClientBusHttp.SendRequest(): in unmarshal response, err=\n\t%v", err)
- return mL1.NewErr[[]byte](err)
- }
- if resp.Status_ != "ok" {
- err := fmt.Errorf("ClientBusHttp.SendRequest(): resp!='ok', err=\n\t%v", resp.Status_)
- return mL1.NewErr[[]byte](err)
- }
- if resp.Uuid_ != req.Uuid_ {
- err := fmt.Errorf("ClientBusHttp.SendRequest(): resp uuid(%v) bad", resp.Uuid_)
- return mL1.NewErr[[]byte](err)
- }
- return mL1.NewRes(resp.BinResp_)
- }
- // RegisterServe -- регистрирует в локальной шине обработчик.
- func (sf *ClientBusHttp) RegisterServe(handler mKt.IBusHandlerServe) mKt.IResult[bool] {
- if handler == nil {
- return mL1.NewErr[bool](fmt.Errorf("ClientBusHttp.RegisterServe(): handler==nil"))
- }
- res := sf.bus.RegisterServe(handler)
- if res.IsErr() {
- err := fmt.Errorf("ClientBusHttp.RegisterServe(): in register serve to bus, err=\n\t%v", res.Err())
- return mL1.NewErr[bool](err)
- }
- return mL1.NewRes(true)
- }
- // Publish -- публикует сообщение в дистанционной шину.
- func (sf *ClientBusHttp) Publish(topic mKa.ATopic, binMsg []byte) mKt.IResult[bool] {
- _uuid, err := uuid.NewV6()
- mL1.Hassert()(err == nil, "ClientBusHttp.Publish(): in generate UUID v6, err=\n\t%v", err)
- req := &msg_pub.PublishReq{
- Topic_: topic,
- Uuid_: _uuid.String(),
- BinMsg_: binMsg,
- }
- req.SelfCheck()
- binReq, _ := json.MarshalIndent(req, "", " ")
- body := strings.NewReader(string(binReq))
- hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/pub", body)
- mL1.Hassert()(err == nil, "ClientBusHttp.Publish(): in new request, err=\n\t%v")
- binBody, err := sf.makePost(hReq)
- if err != nil {
- err := fmt.Errorf("ClientBusHttp.Publish(): in make request, err=\n\t%w", err)
- return mL1.NewErr[bool](err)
- }
- resp := &msg_pub.PublishResp{}
- err = json.Unmarshal(binBody, resp)
- if err != nil {
- err := fmt.Errorf("ClientBusHttp.Publish(): in unmarshal response, err=\n\t%v", err)
- return mL1.NewErr[bool](err)
- }
- if resp.Status_ != "ok" {
- err := fmt.Errorf("ClientBusHttp.Publish(): resp!='ok', err=\n\t%v", resp.Status_)
- return mL1.NewErr[bool](err)
- }
- if resp.Uuid_ != req.Uuid_ {
- err := fmt.Errorf("ClientBusHttp.Publish(): resp uuid(%v) bad", resp.Uuid_)
- return mL1.NewErr[bool](err)
- }
- return mL1.NewRes(true)
- }
- // Единый обработчик запросов.
- func (sf *ClientBusHttp) makePost(hReq *http.Request) ([]byte, error) {
- hReq.Header.Add("Content-Type", "application/json")
- _resp, err := http.DefaultClient.Do(hReq)
- if err != nil {
- err := fmt.Errorf("ClientBusHttp.makePost(): after request, err=\n\t%w", err)
- return nil, err
- }
- defer func() {
- _ = _resp.Body.Close()
- }()
- if _resp.StatusCode != http.StatusOK {
- return nil, fmt.Errorf("ClientBusHttp.makePost(): url=%q, status=%q", hReq.URL, _resp.Status)
- }
- binBody, err := io.ReadAll(_resp.Body)
- return binBody, err
- }
- // Log -- возвращает локальный лог клиента.
- func (sf *ClientBusHttp) Log() mKt.ILogBuf {
- return sf.log
- }
- // IsWork -- возвращает признак работы.
- func (sf *ClientBusHttp) IsWork() bool {
- return sf.bus.IsWork()
- }
|