client_bus_http.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
// Package client_bus_http implements a client for the HTTP bus.
  2. package client_bus_http
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "strings"
  9. "github.com/google/uuid"
  10. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs/topic"
  12. mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  13. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub"
  15. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve"
  16. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub"
  17. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub"
  18. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/log_buf"
  19. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_bool"
  20. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_string"
  21. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus_http"
  22. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  23. )
  24. // ClientBusHttp -- клиент HTTP-шины.
  25. type ClientBusHttp struct {
  26. bus mKs.IKernelBus
  27. lCtx mKs.ILocalCtx
  28. log mKs.ILogBuf
  29. isWork mKs.ISafeBool
  30. urlRemote string // URL дистанционной шины
  31. urlLocal string // URL локальной шины для веб-хуков
  32. }
  33. // NewClientBusHttp - -возвращает новый клиент HTTP-шины.
  34. func NewClientBusHttp(urlRemote string) mKs.IBusClient {
  35. param := &log_buf.LogBufParam{
  36. IsTerm_: safe_bool.NewSafeBool(true),
  37. Prefix_: "ClientBusHttp",
  38. }
  39. log := mL1.NewLogBuf(param)
  40. log.Debug("NewClientBusHttp()")
  41. mL0.Hassert(urlRemote != "", "NewClientBusHttp(): urlRemote is empty")
  42. kCtx := kctx.GetKernelCtx()
  43. resLocal := safe_string.NewSafeStringGetenv("LOCAL_HTTP_URL")
  44. resLocal.Hassert("NewClientBusHttp(): in get env LOCAL_HTTP_URL)")
  45. urlLocal := resLocal.Ok()
  46. kBusHttp := kbus_http.GetKernelBusHttp()
  47. sf := &ClientBusHttp{
  48. lCtx: mL1.NewLocalCtx(kCtx.Ctx()),
  49. log: log,
  50. bus: kBusHttp,
  51. isWork: mL1.NewSafeBool(),
  52. urlRemote: strings.TrimSuffix(urlRemote, "/"),
  53. urlLocal: strings.TrimSuffix(urlLocal.Get(), "/"),
  54. }
  55. return sf
  56. }
  57. // Unsubscribe -- отписывается от топика в дистанционной шине.
  58. func (sf *ClientBusHttp) Unsubscribe(handler mKs.IBusHandlerSubscribe) {
  59. _uuid, err := uuid.NewV6()
  60. mL1.Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in generate UUID v6, err=\n\t%v", err)
  61. req := &msg_unsub.UnsubReq{
  62. Name_: handler.Name(),
  63. Uuid_: _uuid.String(),
  64. }
  65. req.SelfCheck()
  66. binReq, err := json.MarshalIndent(req, "", " ")
  67. mL1.Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in marshal to JSON unsubscribe request, err=\n\t%v", err)
  68. reader := strings.NewReader(string(binReq))
  69. hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/unsub", reader)
  70. mL1.Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in new request, err=\n\t%v", err)
  71. resBody := sf.makePost(hReq)
  72. if resBody.IsErr() {
  73. return
  74. }
  75. resp := &msg_unsub.UnsubResp{}
  76. err = json.Unmarshal(resBody.Ok(), resp)
  77. if err != nil {
  78. sf.log.Err("Unsubscribe(): in unmarshal response, err=\n\t%v", err)
  79. return
  80. }
  81. if resp.Status_ != "ok" {
  82. sf.log.Err("Unsubscribe(): resp!='ok', err=\n\t%v", resp.Status_)
  83. }
  84. if resp.Uuid_ != req.Uuid_ {
  85. sf.log.Err("Unsubscribe(): resp uuid(%v) bad", resp.Uuid_)
  86. }
  87. }
  88. // Subscribe -- подписывается на топик в дистанционной шине.
  89. func (sf *ClientBusHttp) Subscribe(handler mKs.IBusHandlerSubscribe) mKs.IResult[bool] {
  90. _uuid, err := uuid.NewV6()
  91. mL0.Hassert(err == nil, "ClientBusHttp.Subscribe(): in generate UUID v6, err=\n\t%v", err)
  92. req := &msg_sub.SubscribeReq{
  93. Topic_: handler.Topic(),
  94. Uuid_: _uuid.String(),
  95. WebHook_: sf.urlLocal + "/bus/pub", // Веб-хук всегда на своей стороне
  96. }
  97. req.SelfCheck()
  98. binReq, _ := json.MarshalIndent(req, "", " ")
  99. body := strings.NewReader(string(binReq))
  100. hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/sub", body)
  101. mL0.Hassert(err != nil, "ClientBusHttp.Subscribe(): in new request, err=\n\t%v")
  102. resBody := sf.makePost(hReq)
  103. if resBody.IsErr() {
  104. err := fmt.Errorf("ClientBusHttp.Subscribe(): in make POST-request, err=\n\t%w", err)
  105. return mL0.NewErr[bool](err)
  106. }
  107. resp := &msg_sub.SubscribeResp{}
  108. err = json.Unmarshal(resBody.Ok(), resp)
  109. if err != nil {
  110. err := fmt.Errorf("ClientBusHttp.Subscribe(): in unmarshal response, err=\n\t%v", err)
  111. return mL0.NewErr[bool](err)
  112. }
  113. if resp.Status_ != "ok" {
  114. err := fmt.Errorf("ClientBusHttp.Subscribe(): resp!='ok', err=\n\t%v", resp.Status_)
  115. return mL0.NewErr[bool](err)
  116. }
  117. if resp.Uuid_ != req.Uuid_ {
  118. err := fmt.Errorf("ClientBusHttp.Subscribe(): resp uuid(%v) bad", resp.Uuid_)
  119. return mL0.NewErr[bool](err)
  120. }
  121. res := sf.bus.Subscribe(handler)
  122. return res
  123. }
  124. // SendRequest -- отправляет в дистанционную шину запрос.
  125. func (sf *ClientBusHttp) SendRequest(topic *topic.ATopic, binReq []byte) mL0.IResult[[]byte] {
  126. _uuid, err := uuid.NewV6()
  127. mL1.Hassert()(err == nil, "ClientBusHttp.SendRequest(): in generate UUID v6, err=\n\t%v", err)
  128. req := &msg_serve.ServeReq{
  129. Topic_: topic,
  130. Uuid_: _uuid.String(),
  131. BinReq_: binReq,
  132. }
  133. req.SelfCheck()
  134. _binReq, _ := json.MarshalIndent(req, "", " ")
  135. body := strings.NewReader(string(_binReq))
  136. hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/request", body)
  137. mL1.Hassert()(err == nil, "ClientBusHttp.SendRequest(): in new request, err=\n\t%v")
  138. resBody := sf.makePost(hReq)
  139. if resBody.IsErr() {
  140. err := fmt.Errorf("ClientBusHttp.SendRequest(): in make request, err=\n\t%w", err)
  141. return mL0.WrapErr(resBody, err)
  142. }
  143. resp := &msg_serve.ServeResp{}
  144. err = json.Unmarshal(resBody.Ok(), resp)
  145. if err != nil {
  146. err := fmt.Errorf("ClientBusHttp.SendRequest(): in unmarshal response, err=\n\t%v", err)
  147. return mL0.NewErr[[]byte](err)
  148. }
  149. if resp.Status_ != "ok" {
  150. err := fmt.Errorf("ClientBusHttp.SendRequest(): resp!='ok', err=\n\t%v", resp.Status_)
  151. return mL0.NewErr[[]byte](err)
  152. }
  153. if resp.Uuid_ != req.Uuid_ {
  154. err := fmt.Errorf("ClientBusHttp.SendRequest(): resp uuid(%v) bad", resp.Uuid_)
  155. return mL0.NewErr[[]byte](err)
  156. }
  157. return mL0.NewOk(resp.BinResp_)
  158. }
  159. // RegisterServe -- регистрирует в локальной шине обработчик.
  160. func (sf *ClientBusHttp) RegisterServe(handler mKs.IBusHandlerServe) mL0.IResult[bool] {
  161. if handler == nil {
  162. return mL0.NewErr[bool](fmt.Errorf("ClientBusHttp.RegisterServe(): handler==nil"))
  163. }
  164. res := sf.bus.RegisterServe(handler)
  165. if res.IsErr() {
  166. err := fmt.Errorf("ClientBusHttp.RegisterServe(): in register serve to bus, err=\n\t%v", res.Err())
  167. return mL0.NewErr[bool](err)
  168. }
  169. return mL0.NewOk(true)
  170. }
  171. // Publish -- публикует сообщение в дистанционной шину.
  172. func (sf *ClientBusHttp) Publish(topic *topic.ATopic, binMsg []byte) mL0.IResult[bool] {
  173. _uuid, err := uuid.NewV6()
  174. mL1.Hassert()(err == nil, "ClientBusHttp.Publish(): in generate UUID v6, err=\n\t%v", err)
  175. req := &msg_pub.PublishReq{
  176. Topic_: topic,
  177. Uuid_: _uuid.String(),
  178. BinMsg_: binMsg,
  179. }
  180. req.SelfCheck()
  181. binReq, _ := json.MarshalIndent(req, "", " ")
  182. body := strings.NewReader(string(binReq))
  183. hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/pub", body)
  184. mL1.Hassert()(err == nil, "ClientBusHttp.Publish(): in new request, err=\n\t%v")
  185. resBody := sf.makePost(hReq)
  186. if resBody.IsErr() {
  187. err := fmt.Errorf("ClientBusHttp.Publish(): in make request, err=\n\t%w", err)
  188. return mL0.NewErr[bool](err)
  189. }
  190. resp := &msg_pub.PublishResp{}
  191. err = json.Unmarshal(resBody.Ok(), resp)
  192. if err != nil {
  193. err := fmt.Errorf("ClientBusHttp.Publish(): in unmarshal response, err=\n\t%v", err)
  194. return mL0.NewErr[bool](err)
  195. }
  196. if resp.Status_ != "ok" {
  197. err := fmt.Errorf("ClientBusHttp.Publish(): resp!='ok', err=\n\t%v", resp.Status_)
  198. return mL0.NewErr[bool](err)
  199. }
  200. if resp.Uuid_ != req.Uuid_ {
  201. err := fmt.Errorf("ClientBusHttp.Publish(): resp uuid(%v) bad", resp.Uuid_)
  202. return mL0.NewErr[bool](err)
  203. }
  204. return mL0.NewOk(true)
  205. }
  206. // Единый обработчик запросов.
  207. func (sf *ClientBusHttp) makePost(hReq *http.Request) mKs.IResult[[]byte] {
  208. hReq.Header.Add("Content-Type", "application/json")
  209. _resp, err := http.DefaultClient.Do(hReq)
  210. if err != nil {
  211. err := fmt.Errorf("ClientBusHttp.makePost(): after request, err=\n\t%w", err)
  212. return mL0.NewErr[[]byte](err)
  213. }
  214. defer func() {
  215. _ = _resp.Body.Close()
  216. }()
  217. if _resp.StatusCode != http.StatusOK {
  218. err := fmt.Errorf("ClientBusHttp.makePost(): url=%q, status=%q",
  219. hReq.URL, _resp.Status)
  220. return mL0.NewErr[[]byte](err)
  221. }
  222. binBody, err := io.ReadAll(_resp.Body)
  223. if err != nil {
  224. err := fmt.Errorf("ClientBusHttp.makePost(): after read body, err=\n\t%v",
  225. err)
  226. return mL0.NewErr[[]byte](err)
  227. }
  228. return mL0.NewOk(binBody)
  229. }
  230. // Log -- возвращает локальный лог клиента.
  231. func (sf *ClientBusHttp) Log() mKs.ILogBuf {
  232. return sf.log
  233. }
  234. // IsWork -- возвращает признак работы.
  235. func (sf *ClientBusHttp) IsWork() mKs.EBool {
  236. return sf.bus.IsWork()
  237. }