client_bus_http.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. // package client_bus_http -- клиент HTTP-шины
  2. package client_bus_http
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "os"
  9. "strings"
  10. "github.com/google/uuid"
  11. . "gitp78su.ipnodns.ru/svi/kern/kc/helpers"
  12. "gitp78su.ipnodns.ru/svi/kern/kc/local_ctx"
  13. "gitp78su.ipnodns.ru/svi/kern/kc/safe_bool"
  14. . "gitp78su.ipnodns.ru/svi/kern/krn/kalias"
  15. "gitp78su.ipnodns.ru/svi/kern/krn/kbus/kbus_http"
  16. "gitp78su.ipnodns.ru/svi/kern/krn/kbus/kbus_msg/msg_pub"
  17. "gitp78su.ipnodns.ru/svi/kern/krn/kbus/kbus_msg/msg_serve"
  18. "gitp78su.ipnodns.ru/svi/kern/krn/kbus/kbus_msg/msg_sub"
  19. "gitp78su.ipnodns.ru/svi/kern/krn/kbus/kbus_msg/msg_unsub"
  20. "gitp78su.ipnodns.ru/svi/kern/krn/kctx"
  21. . "gitp78su.ipnodns.ru/svi/kern/krn/ktypes"
  22. "gitp78su.ipnodns.ru/svi/kern/mock/mock_hand_sub_http"
  23. )
  24. // ClientBusHttp -- клиент HTTP-шины
  25. type ClientBusHttp struct {
  26. bus IKernelBus
  27. ctx ILocalCtx
  28. log ILogBuf
  29. isWork ISafeBool
  30. urlRemote string // URL дистанционной шины
  31. urlLocal string // URL локальной шины
  32. }
  33. // NewClientBusHttp - -возвращает новый клиент HTTP-шины
  34. func NewClientBusHttp(url string) IBusClient {
  35. Hassert(url != "", "NewClientBusHttp(): url is empty")
  36. kCtx := kctx.GetKernelCtx()
  37. urlLocal := os.Getenv("LOCAL_HTTP_URL")
  38. Hassert(urlLocal != "", "NewClientBusHttp(): env LOCAL_HTTP_URL not set")
  39. sf := &ClientBusHttp{
  40. ctx: local_ctx.NewLocalCtx(kCtx.Ctx()),
  41. bus: kbus_http.GetKernelBusHttp(),
  42. isWork: safe_bool.NewSafeBool(),
  43. urlRemote: strings.TrimSuffix(url, "/"),
  44. urlLocal: strings.TrimSuffix(urlLocal, "/"),
  45. }
  46. sf.log = sf.ctx.Log()
  47. return sf
  48. }
  49. // Unsubscribe -- отписывается от топика в дистанционной шине
  50. func (sf *ClientBusHttp) Unsubscribe(handler IBusHandlerSubscribe) {
  51. _uuid, err := uuid.NewV6()
  52. Hassert(err == nil, "ClientBusHttp.Unsubscribe(): in generate UUID v6, err=\n\t%v", err)
  53. req := &msg_unsub.UnsubReq{
  54. Name_: handler.Name(),
  55. Uuid_: _uuid.String(),
  56. }
  57. req.SelfCheck()
  58. binReq, _ := json.MarshalIndent(req, "", " ")
  59. body := strings.NewReader(string(binReq))
  60. hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/unsub", body)
  61. Hassert(err == nil, "ClientBusHttp.Unsubscribe(): in new request, err=\n\t%v")
  62. binBody, err := sf.makePost(hReq)
  63. if err != nil {
  64. sf.log.Err("ClientBusHttp.Unsubscribe(): in make request, err=\n\t%v")
  65. return
  66. }
  67. resp := &msg_unsub.UnsubResp{}
  68. err = json.Unmarshal(binBody, resp)
  69. Hassert(err == nil, "ClientBusHttp.Unsubscribe(): in unmarshal response, err=\n\t%v", err)
  70. if string(resp.Status_) != "ok" {
  71. sf.log.Err("ClientBusHttp.Unsubscribe(): resp!='ok', err=\n\t%v", resp.Status_)
  72. }
  73. Hassert(resp.Uuid_ == req.Uuid_, "ClientBusHttp.Unsubscribe(): resp uuid(%v) bad", resp.Uuid_)
  74. }
  75. // Subscribe -- подписывается на топик в дистанционной шине
  76. func (sf *ClientBusHttp) Subscribe(handler IBusHandlerSubscribe) error {
  77. _uuid, err := uuid.NewV6()
  78. Hassert(err == nil, "ClientBusHttp.Subscribe(): in generate UUID v6, err=\n\t%v", err)
  79. req := &msg_sub.SubscribeReq{
  80. Topic_: handler.Topic(),
  81. Uuid_: _uuid.String(),
  82. WebHook_: sf.urlLocal + "/bus/pub",
  83. }
  84. req.SelfCheck()
  85. binReq, _ := json.MarshalIndent(req, "", " ")
  86. body := strings.NewReader(string(binReq))
  87. hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/sub", body)
  88. Hassert(err == nil, "ClientBusHttp.Subscribe(): in new request, err=\n\t%v")
  89. binBody, err := sf.makePost(hReq)
  90. if err != nil {
  91. err := fmt.Errorf("ClientBusHttp.Subscribe(): in make request, err=\n\t%w", err)
  92. sf.log.Err(err.Error())
  93. return err
  94. }
  95. resp := &msg_sub.SubscribeResp{}
  96. err = json.Unmarshal(binBody, resp)
  97. Hassert(err == nil, "ClientBusHttp.Subscribe(): in unmarshal response, err=\n\t%v", err)
  98. if string(resp.Status_) != "ok" {
  99. err := fmt.Errorf("ClientBusHttp.Subscribe(): resp!='ok', err=\n\t%v", resp.Status_)
  100. sf.log.Err(err.Error())
  101. return err
  102. }
  103. Hassert(resp.Uuid_ == req.Uuid_, "ClientBusHttp.Subscribe(): resp uuid(%v) bad", resp.Uuid_)
  104. // FIXME: вот тут похоже дичь
  105. _handler := handler.(*mock_hand_sub_http.MockHandSubHttp)
  106. _handler.SetName(resp.Name_)
  107. err = sf.bus.Subscribe(_handler)
  108. return err
  109. }
  110. // SendRequest -- отправляет в дистанционную шину запрос
  111. func (sf *ClientBusHttp) SendRequest(topic ATopic, binReq []byte) ([]byte, error) {
  112. _uuid, err := uuid.NewV6()
  113. Hassert(err == nil, "ClientBusHttp.SendRequest(): in generate UUID v6, err=\n\t%v", err)
  114. req := &msg_serve.ServeReq{
  115. Topic_: topic,
  116. Uuid_: _uuid.String(),
  117. BinReq_: binReq,
  118. }
  119. req.SelfCheck()
  120. _binReq, _ := json.MarshalIndent(req, "", " ")
  121. body := strings.NewReader(string(_binReq))
  122. hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/request", body)
  123. Hassert(err == nil, "ClientBusHttp.SendRequest(): in new request, err=\n\t%v")
  124. binBody, err := sf.makePost(hReq)
  125. if err != nil {
  126. err := fmt.Errorf("ClientBusHttp.SendRequest(): in make request, err=\n\t%w", err)
  127. sf.log.Err(err.Error())
  128. return nil, err
  129. }
  130. resp := &msg_serve.ServeResp{}
  131. err = json.Unmarshal(binBody, resp)
  132. Hassert(err == nil, "ClientBusHttp.SendRequest(): in unmarshal response, err=\n\t%v", err)
  133. if string(resp.Status_) != "ok" {
  134. err := fmt.Errorf("ClientBusHttp.SendRequest(): resp!='ok', err=\n\t%v", resp.Status_)
  135. sf.log.Err(err.Error())
  136. return nil, err
  137. }
  138. Hassert(resp.Uuid_ == req.Uuid_, "ClientBusHttp.SendRequest(): resp uuid(%v) bad", resp.Uuid_)
  139. return resp.BinResp_, nil
  140. }
  141. // RegisterServe -- регистрирует в локальной шине обработчик
  142. func (sf *ClientBusHttp) RegisterServe(handler IBusHandlerServe) {
  143. Hassert(handler != nil, "ClientBusHttp.RegisterServe(): handler==nil")
  144. sf.bus.RegisterServe(handler)
  145. }
  146. // Publish -- публикует сообщение в дистанционной шину
  147. func (sf *ClientBusHttp) Publish(topic ATopic, binMsg []byte) error {
  148. _uuid, err := uuid.NewV6()
  149. Hassert(err == nil, "ClientBusHttp.Publish(): in generate UUID v6, err=\n\t%v", err)
  150. req := &msg_pub.PublishReq{
  151. Topic_: topic,
  152. Uuid_: _uuid.String(),
  153. BinMsg_: binMsg,
  154. }
  155. req.SelfCheck()
  156. binReq, _ := json.MarshalIndent(req, "", " ")
  157. body := strings.NewReader(string(binReq))
  158. hReq, err := http.NewRequest("POST", sf.urlRemote+"/bus/pub", body)
  159. Hassert(err == nil, "ClientBusHttp.Publish(): in new request, err=\n\t%v")
  160. binBody, err := sf.makePost(hReq)
  161. if err != nil {
  162. err := fmt.Errorf("ClientBusHttp.Publish(): in make request, err=\n\t%w", err)
  163. sf.log.Err(err.Error())
  164. return err
  165. }
  166. resp := &msg_pub.PublishResp{}
  167. err = json.Unmarshal(binBody, resp)
  168. Hassert(err == nil, "ClientBusHttp.Publish(): in unmarshal response, err=\n\t%v", err)
  169. if string(resp.Status_) != "ok" {
  170. err := fmt.Errorf("ClientBusHttp.Publish(): resp!='ok', err=\n\t%v", resp.Status_)
  171. sf.log.Err(err.Error())
  172. return err
  173. }
  174. Hassert(resp.Uuid_ == req.Uuid_, "ClientBusHttp.Publish(): resp uuid(%v) bad", resp.Uuid_)
  175. return nil
  176. }
  177. // Единый обработчик запросов
  178. func (sf *ClientBusHttp) makePost(hReq *http.Request) ([]byte, error) {
  179. hReq.Header.Add("Content-Type", "application/json")
  180. _resp, err := http.DefaultClient.Do(hReq)
  181. if err != nil {
  182. err := fmt.Errorf("ClientBusHttp.makePost(): after request, err=\n\t%w", err)
  183. sf.log.Err(err.Error())
  184. return nil, err
  185. }
  186. defer _resp.Body.Close()
  187. binBody, _ := io.ReadAll(_resp.Body)
  188. return binBody, nil
  189. }
  190. // Log -- возвращает локальный лог клиента
  191. func (sf *ClientBusHttp) Log() ILogBuf {
  192. return sf.log
  193. }
  194. // IsWork -- возвращает признак работы
  195. func (sf *ClientBusHttp) IsWork() bool {
  196. return sf.bus.IsWork()
  197. }