client_bus_http.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. // package client_bus_http -- клиент HTTP-шины.
  2. package client_bus_http
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "strings"
  9. "github.com/google/uuid"
  10. mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0"
  11. mKd "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs"
  12. mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec"
  13. mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub"
  15. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve"
  16. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub"
  17. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub"
  18. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_string"
  19. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus_http"
  20. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  21. )
  22. // ClientBusHttp -- клиент HTTP-шины.
  23. type ClientBusHttp struct {
  24. bus mKs.IKernelBus
  25. lCtx mKs.ILocalCtx
  26. log mKs.ILogBuf
  27. isWork mKs.ISafeBool
  28. urlRemote string // URL дистанционной шины
  29. urlLocal string // URL локальной шины для веб-хуков
  30. }
  31. // NewClientBusHttp - -возвращает новый клиент HTTP-шины.
  32. func NewClientBusHttp(urlRemote string) mKs.IBusClient {
  33. log := mL1.NewLogBuf(mL1.OptIsTerm(true), mL1.OptPrefix("ClientBusHttp"))
  34. log.Debug("NewClientBusHttp()")
  35. mL0.Hassert(urlRemote != "", "NewClientBusHttp(): urlRemote is empty")
  36. kCtx := kctx.GetKernelCtx()
  37. resLocal := safe_string.NewSafeStringGetenv("LOCAL_HTTP_URL")
  38. resLocal.Hassert("NewClientBusHttp(): in get env LOCAL_HTTP_URL)")
  39. urlLocal := resLocal.Ok()
  40. kBusHttp := kbus_http.GetKernelBusHttp()
  41. sf := &ClientBusHttp{
  42. lCtx: mL1.NewLocalCtx(kCtx.Ctx()),
  43. log: log,
  44. bus: kBusHttp,
  45. isWork: mL1.NewSafeBool(),
  46. urlRemote: strings.TrimSuffix(urlRemote, "/"),
  47. urlLocal: strings.TrimSuffix(urlLocal.Get(), "/"),
  48. }
  49. return sf
  50. }
  51. // Unsubscribe -- отписывается от топика в дистанционной шине.
  52. func (sf *ClientBusHttp) Unsubscribe(handler mKs.IBusHandlerSubscribe) {
  53. _uuid, err := uuid.NewV6()
  54. mL1.Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in generate UUID v6, err=\n\t%v", err)
  55. req := &msg_unsub.UnsubReq{
  56. Name_: handler.Name(),
  57. Uuid_: _uuid.String(),
  58. }
  59. req.SelfCheck()
  60. binReq, err := json.MarshalIndent(req, "", " ")
  61. mL1.Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in marshal to JSON unsubscribe request, err=\n\t%v", err)
  62. reader := strings.NewReader(string(binReq))
  63. hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/unsub", reader)
  64. mL1.Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in new request, err=\n\t%v", err)
  65. resBody := sf.makePost(hReq)
  66. if resBody.IsErr() {
  67. return
  68. }
  69. resp := &msg_unsub.UnsubResp{}
  70. err = json.Unmarshal(resBody.Ok(), resp)
  71. if err != nil {
  72. sf.log.Err("Unsubscribe(): in unmarshal response, err=\n\t%v", err)
  73. return
  74. }
  75. if resp.Status_ != "ok" {
  76. sf.log.Err("Unsubscribe(): resp!='ok', err=\n\t%v", resp.Status_)
  77. }
  78. if resp.Uuid_ != req.Uuid_ {
  79. sf.log.Err("Unsubscribe(): resp uuid(%v) bad", resp.Uuid_)
  80. }
  81. }
  82. // Subscribe -- подписывается на топик в дистанционной шине.
  83. func (sf *ClientBusHttp) Subscribe(handler mKs.IBusHandlerSubscribe) mKs.IResult[bool] {
  84. _uuid, err := uuid.NewV6()
  85. mL0.Hassert(err == nil, "ClientBusHttp.Subscribe(): in generate UUID v6, err=\n\t%v", err)
  86. req := &msg_sub.SubscribeReq{
  87. Topic_: handler.Topic(),
  88. Uuid_: _uuid.String(),
  89. WebHook_: sf.urlLocal + "/bus/pub", // Веб-хук всегда на своей стороне
  90. }
  91. req.SelfCheck()
  92. binReq, _ := json.MarshalIndent(req, "", " ")
  93. body := strings.NewReader(string(binReq))
  94. hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/sub", body)
  95. mL0.Hassert(err != nil, "ClientBusHttp.Subscribe(): in new request, err=\n\t%v")
  96. resBody := sf.makePost(hReq)
  97. if resBody.IsErr() {
  98. err := fmt.Errorf("ClientBusHttp.Subscribe(): in make POST-request, err=\n\t%w", err)
  99. return mL0.NewErr[bool](err)
  100. }
  101. resp := &msg_sub.SubscribeResp{}
  102. err = json.Unmarshal(resBody.Ok(), resp)
  103. if err != nil {
  104. err := fmt.Errorf("ClientBusHttp.Subscribe(): in unmarshal response, err=\n\t%v", err)
  105. return mL0.NewErr[bool](err)
  106. }
  107. if resp.Status_ != "ok" {
  108. err := fmt.Errorf("ClientBusHttp.Subscribe(): resp!='ok', err=\n\t%v", resp.Status_)
  109. return mL0.NewErr[bool](err)
  110. }
  111. if resp.Uuid_ != req.Uuid_ {
  112. err := fmt.Errorf("ClientBusHttp.Subscribe(): resp uuid(%v) bad", resp.Uuid_)
  113. return mL0.NewErr[bool](err)
  114. }
  115. res := sf.bus.Subscribe(handler)
  116. return res
  117. }
  118. // SendRequest -- отправляет в дистанционную шину запрос.
  119. func (sf *ClientBusHttp) SendRequest(topic *mKd.Topic, binReq []byte) mL0.IResult[[]byte] {
  120. _uuid, err := uuid.NewV6()
  121. mL1.Hassert()(err == nil, "ClientBusHttp.SendRequest(): in generate UUID v6, err=\n\t%v", err)
  122. req := &msg_serve.ServeReq{
  123. Topic_: topic,
  124. Uuid_: _uuid.String(),
  125. BinReq_: binReq,
  126. }
  127. req.SelfCheck()
  128. _binReq, _ := json.MarshalIndent(req, "", " ")
  129. body := strings.NewReader(string(_binReq))
  130. hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/request", body)
  131. mL1.Hassert()(err == nil, "ClientBusHttp.SendRequest(): in new request, err=\n\t%v")
  132. resBody := sf.makePost(hReq)
  133. if resBody.IsErr() {
  134. err := fmt.Errorf("ClientBusHttp.SendRequest(): in make request, err=\n\t%w", err)
  135. return mL0.WrapErr(resBody, err)
  136. }
  137. resp := &msg_serve.ServeResp{}
  138. err = json.Unmarshal(resBody.Ok(), resp)
  139. if err != nil {
  140. err := fmt.Errorf("ClientBusHttp.SendRequest(): in unmarshal response, err=\n\t%v", err)
  141. return mL0.NewErr[[]byte](err)
  142. }
  143. if resp.Status_ != "ok" {
  144. err := fmt.Errorf("ClientBusHttp.SendRequest(): resp!='ok', err=\n\t%v", resp.Status_)
  145. return mL0.NewErr[[]byte](err)
  146. }
  147. if resp.Uuid_ != req.Uuid_ {
  148. err := fmt.Errorf("ClientBusHttp.SendRequest(): resp uuid(%v) bad", resp.Uuid_)
  149. return mL0.NewErr[[]byte](err)
  150. }
  151. return mL0.NewRes(resp.BinResp_)
  152. }
  153. // RegisterServe -- регистрирует в локальной шине обработчик.
  154. func (sf *ClientBusHttp) RegisterServe(handler mKs.IBusHandlerServe) mL0.IResult[bool] {
  155. if handler == nil {
  156. return mL0.NewErr[bool](fmt.Errorf("ClientBusHttp.RegisterServe(): handler==nil"))
  157. }
  158. res := sf.bus.RegisterServe(handler)
  159. if res.IsErr() {
  160. err := fmt.Errorf("ClientBusHttp.RegisterServe(): in register serve to bus, err=\n\t%v", res.Err())
  161. return mL0.NewErr[bool](err)
  162. }
  163. return mL0.NewRes(true)
  164. }
  165. // Publish -- публикует сообщение в дистанционной шину.
  166. func (sf *ClientBusHttp) Publish(topic *mKd.Topic, binMsg []byte) mL0.IResult[bool] {
  167. _uuid, err := uuid.NewV6()
  168. mL1.Hassert()(err == nil, "ClientBusHttp.Publish(): in generate UUID v6, err=\n\t%v", err)
  169. req := &msg_pub.PublishReq{
  170. Topic_: topic,
  171. Uuid_: _uuid.String(),
  172. BinMsg_: binMsg,
  173. }
  174. req.SelfCheck()
  175. binReq, _ := json.MarshalIndent(req, "", " ")
  176. body := strings.NewReader(string(binReq))
  177. hReq, err := http.NewRequest(http.MethodPost, sf.urlRemote+"/bus/pub", body)
  178. mL1.Hassert()(err == nil, "ClientBusHttp.Publish(): in new request, err=\n\t%v")
  179. resBody := sf.makePost(hReq)
  180. if resBody.IsErr() {
  181. err := fmt.Errorf("ClientBusHttp.Publish(): in make request, err=\n\t%w", err)
  182. return mL0.NewErr[bool](err)
  183. }
  184. resp := &msg_pub.PublishResp{}
  185. err = json.Unmarshal(resBody.Ok(), resp)
  186. if err != nil {
  187. err := fmt.Errorf("ClientBusHttp.Publish(): in unmarshal response, err=\n\t%v", err)
  188. return mL0.NewErr[bool](err)
  189. }
  190. if resp.Status_ != "ok" {
  191. err := fmt.Errorf("ClientBusHttp.Publish(): resp!='ok', err=\n\t%v", resp.Status_)
  192. return mL0.NewErr[bool](err)
  193. }
  194. if resp.Uuid_ != req.Uuid_ {
  195. err := fmt.Errorf("ClientBusHttp.Publish(): resp uuid(%v) bad", resp.Uuid_)
  196. return mL0.NewErr[bool](err)
  197. }
  198. return mL0.NewRes(true)
  199. }
  200. // Единый обработчик запросов.
  201. func (sf *ClientBusHttp) makePost(hReq *http.Request) mKs.IResult[[]byte] {
  202. hReq.Header.Add("Content-Type", "application/json")
  203. _resp, err := http.DefaultClient.Do(hReq)
  204. if err != nil {
  205. err := fmt.Errorf("ClientBusHttp.makePost(): after request, err=\n\t%w", err)
  206. return mL0.NewErr[[]byte](err)
  207. }
  208. defer func() {
  209. _ = _resp.Body.Close()
  210. }()
  211. if _resp.StatusCode != http.StatusOK {
  212. err := fmt.Errorf("ClientBusHttp.makePost(): url=%q, status=%q",
  213. hReq.URL, _resp.Status)
  214. return mL0.NewErr[[]byte](err)
  215. }
  216. binBody, err := io.ReadAll(_resp.Body)
  217. if err != nil {
  218. err := fmt.Errorf("ClientBusHttp.makePost(): after read body, err=\n\t%v",
  219. err)
  220. return mL0.NewErr[[]byte](err)
  221. }
  222. return mL0.NewRes(binBody)
  223. }
  224. // Log -- возвращает локальный лог клиента.
  225. func (sf *ClientBusHttp) Log() mKs.ILogBuf {
  226. return sf.log
  227. }
  228. // IsWork -- возвращает признак работы.
  229. func (sf *ClientBusHttp) IsWork() bool {
  230. return sf.bus.IsWork()
  231. }