client_bus_http.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // package client_bus_http -- клиент HTTP-шины.
  2. package client_bus_http
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "strings"
  9. "github.com/google/uuid"
  10. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
  11. . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
  12. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  13. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve"
  15. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub"
  16. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub"
  17. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/safe_string"
  18. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus_http"
  19. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  20. )
  21. // ClientBusHttp -- клиент HTTP-шины.
  22. type ClientBusHttp struct {
  23. bus IKernelBus
  24. ctx ILocalCtx
  25. log ILogBuf
  26. isWork ISafeBool
  27. urlRemote string // URL дистанционной шины
  28. urlLocal string // URL локальной шины для веб-хуков
  29. }
  30. // NewClientBusHttp - -возвращает новый клиент HTTP-шины.
  31. func NewClientBusHttp(urlRemote string) IResult[IBusClient] {
  32. log := NewLogBuf(OptIsTerm(true), OptPrefix("ClientBusHttp"))
  33. log.Debug("NewClientBusHttp()")
  34. if urlRemote == "" {
  35. err := fmt.Errorf("NewClientBusHttp(): urlRemote is empty")
  36. return NewErr[IBusClient](err)
  37. }
  38. resKernCtx := kctx.GetKernelCtx()
  39. if resKernCtx.IsErr() {
  40. err := fmt.Errorf("NewClientBusHttp(): in get ctx, err=\n\t%w", resKernCtx.Err())
  41. return NewErr[IBusClient](err)
  42. }
  43. kCtx := resKernCtx.Val()
  44. resLocal := safe_string.NewSafeStringGetenv("LOCAL_HTTP_URL")
  45. if resLocal.IsErr() {
  46. err := fmt.Errorf("NewClientBusHttp(): in get env LOCAL_HTTP_URL, err=\n\t%w", resLocal.Err())
  47. return NewErr[IBusClient](err)
  48. }
  49. urlLocal := resLocal.Val()
  50. resBus := kbus_http.GetKernelBusHttp()
  51. if resBus.IsErr() {
  52. err := fmt.Errorf("NewClientBusHttp(): in get bus, err=\n\t%w", resBus.Err())
  53. return NewErr[IBusClient](err)
  54. }
  55. kBusHttp := resBus.Val()
  56. lCtx := NewLocalCtx(kCtx.Ctx())
  57. sf := &ClientBusHttp{
  58. ctx: lCtx.Val(),
  59. log: log,
  60. bus: kBusHttp,
  61. isWork: NewSafeBool(),
  62. urlRemote: strings.TrimSuffix(urlRemote, "/"),
  63. urlLocal: strings.TrimSuffix(urlLocal.Get(), "/"),
  64. }
  65. return NewRes(IBusClient(sf))
  66. }
  67. // Unsubscribe -- отписывается от топика в дистанционной шине.
  68. func (sf *ClientBusHttp) Unsubscribe(handler IBusHandlerSubscribe) {
  69. _uuid, err := uuid.NewV6()
  70. Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in generate UUID v6, err=\n\t%v", err)
  71. req := &msg_unsub.UnsubReq{
  72. Name_: handler.Name(),
  73. Uuid_: _uuid.String(),
  74. }
  75. req.SelfCheck()
  76. binReq, err := json.MarshalIndent(req, "", " ")
  77. Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in marshal to JSON unsubscribe request, err=\n\t%v", err)
  78. reader := strings.NewReader(string(binReq))
  79. hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/unsub", reader)
  80. Hassert()(err == nil, "ClientBusHttp.Unsubscribe(): in new request, err=\n\t%v", err)
  81. binBody, err := sf.makePost(hReq)
  82. if err != nil {
  83. sf.log.Err("Unsubscribe(): in make request, err=\n\t%v", err)
  84. return
  85. }
  86. resp := &msg_unsub.UnsubResp{}
  87. err = json.Unmarshal(binBody, resp)
  88. if err != nil {
  89. sf.log.Err("Unsubscribe(): in unmarshal response, err=\n\t%v", err)
  90. return
  91. }
  92. if resp.Status_ != "ok" {
  93. sf.log.Err("Unsubscribe(): resp!='ok', err=\n\t%v", resp.Status_)
  94. }
  95. if resp.Uuid_ != req.Uuid_ {
  96. sf.log.Err("Unsubscribe(): resp uuid(%v) bad", resp.Uuid_)
  97. }
  98. }
  99. // Subscribe -- подписывается на топик в дистанционной шине.
  100. func (sf *ClientBusHttp) Subscribe(handler IBusHandlerSubscribe) IResult[bool] {
  101. _uuid, err := uuid.NewV6()
  102. Hassert()(err == nil, "ClientBusHttp.Subscribe(): in generate UUID v6, err=\n\t%v", err)
  103. req := &msg_sub.SubscribeReq{
  104. Topic_: handler.Topic(),
  105. Uuid_: _uuid.String(),
  106. WebHook_: sf.urlLocal + "/bus/pub", // Веб-хук всегда на своей стороне
  107. }
  108. req.SelfCheck()
  109. binReq, _ := json.MarshalIndent(req, "", " ")
  110. body := strings.NewReader(string(binReq))
  111. hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/sub", body)
  112. Hassert()(err == nil, "ClientBusHttp.Subscribe(): in new request, err=\n\t%v")
  113. binBody, err := sf.makePost(hReq)
  114. if err != nil {
  115. err := fmt.Errorf("ClientBusHttp.Subscribe(): in make request, err=\n\t%w", err)
  116. return NewErr[bool](err)
  117. }
  118. resp := &msg_sub.SubscribeResp{}
  119. err = json.Unmarshal(binBody, resp)
  120. if err != nil {
  121. err := fmt.Errorf("ClientBusHttp.Subscribe(): in unmarshal response, err=\n\t%v", err)
  122. return NewErr[bool](err)
  123. }
  124. if resp.Status_ != "ok" {
  125. err := fmt.Errorf("ClientBusHttp.Subscribe(): resp!='ok', err=\n\t%v", resp.Status_)
  126. return NewErr[bool](err)
  127. }
  128. if resp.Uuid_ != req.Uuid_ {
  129. err := fmt.Errorf("ClientBusHttp.Subscribe(): resp uuid(%v) bad", resp.Uuid_)
  130. return NewErr[bool](err)
  131. }
  132. res := sf.bus.Subscribe(handler)
  133. return res
  134. }
  135. // SendRequest -- отправляет в дистанционную шину запрос.
  136. func (sf *ClientBusHttp) SendRequest(topic ATopic, binReq []byte) IResult[[]byte] {
  137. _uuid, err := uuid.NewV6()
  138. Hassert()(err == nil, "ClientBusHttp.SendRequest(): in generate UUID v6, err=\n\t%v", err)
  139. req := &msg_serve.ServeReq{
  140. Topic_: topic,
  141. Uuid_: _uuid.String(),
  142. BinReq_: binReq,
  143. }
  144. req.SelfCheck()
  145. _binReq, _ := json.MarshalIndent(req, "", " ")
  146. body := strings.NewReader(string(_binReq))
  147. hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/request", body)
  148. Hassert()(err == nil, "ClientBusHttp.SendRequest(): in new request, err=\n\t%v")
  149. binBody, err := sf.makePost(hReq)
  150. if err != nil {
  151. err := fmt.Errorf("ClientBusHttp.SendRequest(): in make request, err=\n\t%w", err)
  152. return NewErr[[]byte](err)
  153. }
  154. resp := &msg_serve.ServeResp{}
  155. err = json.Unmarshal(binBody, resp)
  156. if err != nil {
  157. err := fmt.Errorf("ClientBusHttp.SendRequest(): in unmarshal response, err=\n\t%v", err)
  158. return NewErr[[]byte](err)
  159. }
  160. if resp.Status_ != "ok" {
  161. err := fmt.Errorf("ClientBusHttp.SendRequest(): resp!='ok', err=\n\t%v", resp.Status_)
  162. return NewErr[[]byte](err)
  163. }
  164. if resp.Uuid_ != req.Uuid_ {
  165. err := fmt.Errorf("ClientBusHttp.SendRequest(): resp uuid(%v) bad", resp.Uuid_)
  166. return NewErr[[]byte](err)
  167. }
  168. return NewRes(resp.BinResp_)
  169. }
  170. // RegisterServe -- регистрирует в локальной шине обработчик.
  171. func (sf *ClientBusHttp) RegisterServe(handler IBusHandlerServe) IResult[bool] {
  172. if handler == nil {
  173. return NewErr[bool](fmt.Errorf("ClientBusHttp.RegisterServe(): handler==nil"))
  174. }
  175. res := sf.bus.RegisterServe(handler)
  176. if res.IsErr() {
  177. err := fmt.Errorf("ClientBusHttp.RegisterServe(): in register serve to bus, err=\n\t%v", res.Err())
  178. return NewErr[bool](err)
  179. }
  180. return NewRes(true)
  181. }
  182. // Publish -- публикует сообщение в дистанционной шину.
  183. func (sf *ClientBusHttp) Publish(topic ATopic, binMsg []byte) IResult[bool] {
  184. _uuid, err := uuid.NewV6()
  185. Hassert()(err == nil, "ClientBusHttp.Publish(): in generate UUID v6, err=\n\t%v", err)
  186. req := &msg_pub.PublishReq{
  187. Topic_: topic,
  188. Uuid_: _uuid.String(),
  189. BinMsg_: binMsg,
  190. }
  191. req.SelfCheck()
  192. binReq, _ := json.MarshalIndent(req, "", " ")
  193. body := strings.NewReader(string(binReq))
  194. hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/pub", body)
  195. Hassert()(err == nil, "ClientBusHttp.Publish(): in new request, err=\n\t%v")
  196. binBody, err := sf.makePost(hReq)
  197. if err != nil {
  198. err := fmt.Errorf("ClientBusHttp.Publish(): in make request, err=\n\t%w", err)
  199. return NewErr[bool](err)
  200. }
  201. resp := &msg_pub.PublishResp{}
  202. err = json.Unmarshal(binBody, resp)
  203. if err != nil {
  204. err := fmt.Errorf("ClientBusHttp.Publish(): in unmarshal response, err=\n\t%v", err)
  205. return NewErr[bool](err)
  206. }
  207. if resp.Status_ != "ok" {
  208. err := fmt.Errorf("ClientBusHttp.Publish(): resp!='ok', err=\n\t%v", resp.Status_)
  209. return NewErr[bool](err)
  210. }
  211. if resp.Uuid_ != req.Uuid_ {
  212. err := fmt.Errorf("ClientBusHttp.Publish(): resp uuid(%v) bad", resp.Uuid_)
  213. return NewErr[bool](err)
  214. }
  215. return NewRes(true)
  216. }
  217. // Единый обработчик запросов.
  218. func (sf *ClientBusHttp) makePost(hReq *http.Request) ([]byte, error) {
  219. hReq.Header.Add("Content-Type", "application/json")
  220. _resp, err := http.DefaultClient.Do(hReq)
  221. if err != nil {
  222. err := fmt.Errorf("ClientBusHttp.makePost(): after request, err=\n\t%w", err)
  223. return nil, err
  224. }
  225. defer _resp.Body.Close()
  226. if _resp.StatusCode != 200 {
  227. return nil, fmt.Errorf("ClientBusHttp.makePost(): url=%q, status=%q", hReq.URL, _resp.Status)
  228. }
  229. binBody, err := io.ReadAll(_resp.Body)
  230. return binBody, err
  231. }
  232. // Log -- возвращает локальный лог клиента.
  233. func (sf *ClientBusHttp) Log() ILogBuf {
  234. return sf.log
  235. }
  236. // IsWork -- возвращает признак работы.
  237. func (sf *ClientBusHttp) IsWork() bool {
  238. return sf.bus.IsWork()
  239. }