client_bus_http.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
// Package client_bus_http implements a client for the HTTP bus.
  2. package client_bus_http
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "os"
  9. "strings"
  10. "github.com/google/uuid"
  11. . "gitp78su.ipnodns.ru/svi/kern/v2/kc/helpers"
  12. "gitp78su.ipnodns.ru/svi/kern/v2/kc/local_ctx"
  13. "gitp78su.ipnodns.ru/svi/kern/v2/kc/safe_bool"
  14. . "gitp78su.ipnodns.ru/svi/kern/v2/krn/kalias"
  15. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_http"
  16. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_msg/msg_pub"
  17. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_msg/msg_serve"
  18. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_msg/msg_sub"
  19. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_msg/msg_unsub"
  20. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kctx"
  21. . "gitp78su.ipnodns.ru/svi/kern/v2/krn/ktypes"
  22. )
  23. // ClientBusHttp -- клиент HTTP-шины
  24. type ClientBusHttp struct {
  25. bus IKernelBus
  26. ctx ILocalCtx
  27. log ILogBuf
  28. isWork ISafeBool
  29. urlRemote string // URL дистанционной шины
  30. urlLocal string // URL локальной шины
  31. }
  32. // NewClientBusHttp - -возвращает новый клиент HTTP-шины
  33. func NewClientBusHttp(url string) IBusClient {
  34. Hassert(url != "", "NewClientBusHttp(): url is empty")
  35. kCtx := kctx.GetKernelCtx()
  36. urlLocal := os.Getenv("LOCAL_HTTP_URL")
  37. Hassert(urlLocal != "", "NewClientBusHttp(): env LOCAL_HTTP_URL not set")
  38. sf := &ClientBusHttp{
  39. ctx: local_ctx.NewLocalCtx(kCtx.Ctx()),
  40. bus: kbus_http.GetKernelBusHttp(),
  41. isWork: safe_bool.NewSafeBool(),
  42. urlRemote: strings.TrimSuffix(url, "/"),
  43. urlLocal: strings.TrimSuffix(urlLocal, "/"),
  44. }
  45. sf.log = sf.ctx.Log()
  46. return sf
  47. }
  48. // Unsubscribe -- отписывается от топика в дистанционной шине
  49. func (sf *ClientBusHttp) Unsubscribe(handler IBusHandlerSubscribe) {
  50. _uuid, err := uuid.NewV6()
  51. Hassert(err == nil, "ClientBusHttp.Unsubscribe(): in generate UUID v6, err=\n\t%v", err)
  52. req := &msg_unsub.UnsubReq{
  53. Name_: handler.Name(),
  54. Uuid_: _uuid.String(),
  55. }
  56. req.SelfCheck()
  57. binReq, _ := json.MarshalIndent(req, "", " ")
  58. body := strings.NewReader(string(binReq))
  59. hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/unsub", body)
  60. Hassert(err == nil, "ClientBusHttp.Unsubscribe(): in new request, err=\n\t%v")
  61. binBody, err := sf.makePost(hReq)
  62. if err != nil {
  63. sf.log.Err("ClientBusHttp.Unsubscribe(): in make request, err=\n\t%v")
  64. return
  65. }
  66. resp := &msg_unsub.UnsubResp{}
  67. err = json.Unmarshal(binBody, resp)
  68. Hassert(err == nil, "ClientBusHttp.Unsubscribe(): in unmarshal response, err=\n\t%v", err)
  69. if string(resp.Status_) != "ok" {
  70. sf.log.Err("ClientBusHttp.Unsubscribe(): resp!='ok', err=\n\t%v", resp.Status_)
  71. }
  72. Hassert(resp.Uuid_ == req.Uuid_, "ClientBusHttp.Unsubscribe(): resp uuid(%v) bad", resp.Uuid_)
  73. }
  74. // Subscribe -- подписывается на топик в дистанционной шине
  75. func (sf *ClientBusHttp) Subscribe(handler IBusHandlerSubscribe) Result[bool] {
  76. _uuid, err := uuid.NewV6()
  77. Hassert(err == nil, "ClientBusHttp.Subscribe(): in generate UUID v6, err=\n\t%v", err)
  78. req := &msg_sub.SubscribeReq{
  79. Topic_: handler.Topic(),
  80. Uuid_: _uuid.String(),
  81. WebHook_: sf.urlLocal + "/bus/pub",
  82. }
  83. req.SelfCheck()
  84. binReq, _ := json.MarshalIndent(req, "", " ")
  85. body := strings.NewReader(string(binReq))
  86. hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/sub", body)
  87. Hassert(err == nil, "ClientBusHttp.Subscribe(): in new request, err=\n\t%v")
  88. binBody, err := sf.makePost(hReq)
  89. if err != nil {
  90. err := fmt.Errorf("ClientBusHttp.Subscribe(): in make request, err=\n\t%w", err)
  91. sf.log.Err(err.Error())
  92. return NewErr[bool](err)
  93. }
  94. resp := &msg_sub.SubscribeResp{}
  95. err = json.Unmarshal(binBody, resp)
  96. Hassert(err == nil, "ClientBusHttp.Subscribe(): in unmarshal response, err=\n\t%v", err)
  97. if string(resp.Status_) != "ok" {
  98. err := fmt.Errorf("ClientBusHttp.Subscribe(): resp!='ok', err=\n\t%v", resp.Status_)
  99. sf.log.Err(err.Error())
  100. return NewErr[bool](err)
  101. }
  102. Hassert(resp.Uuid_ == req.Uuid_, "ClientBusHttp.Subscribe(): resp uuid(%v) bad", resp.Uuid_)
  103. res := sf.bus.Subscribe(handler)
  104. return res
  105. }
  106. // SendRequest -- отправляет в дистанционную шину запрос
  107. func (sf *ClientBusHttp) SendRequest(topic ATopic, binReq []byte) Result[[]byte] {
  108. _uuid, err := uuid.NewV6()
  109. Hassert(err == nil, "ClientBusHttp.SendRequest(): in generate UUID v6, err=\n\t%v", err)
  110. req := &msg_serve.ServeReq{
  111. Topic_: topic,
  112. Uuid_: _uuid.String(),
  113. BinReq_: binReq,
  114. }
  115. req.SelfCheck()
  116. _binReq, _ := json.MarshalIndent(req, "", " ")
  117. body := strings.NewReader(string(_binReq))
  118. hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/request", body)
  119. Hassert(err == nil, "ClientBusHttp.SendRequest(): in new request, err=\n\t%v")
  120. binBody, err := sf.makePost(hReq)
  121. if err != nil {
  122. err := fmt.Errorf("ClientBusHttp.SendRequest(): in make request, err=\n\t%w", err)
  123. sf.log.Err(err.Error())
  124. return NewErr[[]byte](err)
  125. }
  126. resp := &msg_serve.ServeResp{}
  127. err = json.Unmarshal(binBody, resp)
  128. Hassert(err == nil, "ClientBusHttp.SendRequest(): in unmarshal response, err=\n\t%v", err)
  129. if string(resp.Status_) != "ok" {
  130. err := fmt.Errorf("ClientBusHttp.SendRequest(): resp!='ok', err=\n\t%v", resp.Status_)
  131. sf.log.Err(err.Error())
  132. return NewErr[[]byte](err)
  133. }
  134. Hassert(resp.Uuid_ == req.Uuid_, "ClientBusHttp.SendRequest(): resp uuid(%v) bad", resp.Uuid_)
  135. return NewOk(resp.BinResp_)
  136. }
  137. // RegisterServe -- регистрирует в локальной шине обработчик
  138. func (sf *ClientBusHttp) RegisterServe(handler IBusHandlerServe) {
  139. Hassert(handler != nil, "ClientBusHttp.RegisterServe(): handler==nil")
  140. sf.bus.RegisterServe(handler)
  141. }
  142. // Publish -- публикует сообщение в дистанционной шину
  143. func (sf *ClientBusHttp) Publish(topic ATopic, binMsg []byte) Result[bool] {
  144. _uuid, err := uuid.NewV6()
  145. Hassert(err == nil, "ClientBusHttp.Publish(): in generate UUID v6, err=\n\t%v", err)
  146. req := &msg_pub.PublishReq{
  147. Topic_: topic,
  148. Uuid_: _uuid.String(),
  149. BinMsg_: binMsg,
  150. }
  151. req.SelfCheck()
  152. binReq, _ := json.MarshalIndent(req, "", " ")
  153. body := strings.NewReader(string(binReq))
  154. hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/pub", body)
  155. Hassert(err == nil, "ClientBusHttp.Publish(): in new request, err=\n\t%v")
  156. binBody, err := sf.makePost(hReq)
  157. if err != nil {
  158. err := fmt.Errorf("ClientBusHttp.Publish(): in make request, err=\n\t%w", err)
  159. sf.log.Err(err.Error())
  160. return NewErr[bool](err)
  161. }
  162. resp := &msg_pub.PublishResp{}
  163. err = json.Unmarshal(binBody, resp)
  164. Hassert(err == nil, "ClientBusHttp.Publish(): in unmarshal response, err=\n\t%v", err)
  165. if string(resp.Status_) != "ok" {
  166. err := fmt.Errorf("ClientBusHttp.Publish(): resp!='ok', err=\n\t%v", resp.Status_)
  167. sf.log.Err(err.Error())
  168. return NewErr[bool](err)
  169. }
  170. Hassert(resp.Uuid_ == req.Uuid_, "ClientBusHttp.Publish(): resp uuid(%v) bad", resp.Uuid_)
  171. return NewOk(true)
  172. }
  173. // Единый обработчик запросов
  174. func (sf *ClientBusHttp) makePost(hReq *http.Request) ([]byte, error) {
  175. hReq.Header.Add("Content-Type", "application/json")
  176. _resp, err := http.DefaultClient.Do(hReq)
  177. if err != nil {
  178. err := fmt.Errorf("ClientBusHttp.makePost(): after request, err=\n\t%w", err)
  179. sf.log.Err(err.Error())
  180. return nil, err
  181. }
  182. defer _resp.Body.Close()
  183. binBody, _ := io.ReadAll(_resp.Body)
  184. return binBody, nil
  185. }
  186. // Log -- возвращает локальный лог клиента
  187. func (sf *ClientBusHttp) Log() ILogBuf {
  188. return sf.log
  189. }
  190. // IsWork -- возвращает признак работы
  191. func (sf *ClientBusHttp) IsWork() bool {
  192. return sf.bus.IsWork()
  193. }