kbus_http_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563
  1. package kbus_http
  2. import (
  3. "encoding/json"
  4. "io"
  5. "net/http"
  6. "os"
  7. "strings"
  8. "testing"
  9. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve"
  12. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub"
  13. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  15. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kserv_http"
  16. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/mock_hand_serve"
  17. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/mock_hand_sub_local"
  18. )
  19. type tester struct {
  20. t *testing.T
  21. handSub *mock_hand_sub_local.MockHandlerSub
  22. handServ *mock_hand_serve.MockHandlerServe
  23. }
  24. func TestKernelBusHttp(t *testing.T) {
  25. sf := &tester{
  26. t: t,
  27. handSub: mock_hand_sub_local.NewMockHandlerSub("topic_sub", "http://localhost:18200/bus/pub"),
  28. handServ: mock_hand_serve.NewMockHandlerServe("topic_serv", "name_serv"),
  29. }
  30. ctx := kctx.GetKernelCtx().Hassert("TestKernelBusHttp()")
  31. ctx.Set("monolitName", "test_monolit", "comment").Hassert("TestKernelBusHttp()")
  32. sf.get()
  33. sf.req()
  34. sf.sub()
  35. sf.pub()
  36. sf.unsub()
  37. }
  38. // Запрос на отписку.
  39. func (sf *tester) unsub() {
  40. sf.t.Log("unsub")
  41. sf.unsubBad1()
  42. sf.unsubBad2()
  43. sf.unsubGood1()
  44. sf.unsubBad3()
  45. sf.unsubGood2()
  46. }
  47. func (sf *tester) unsubGood2() {
  48. sf.t.Log("unsubGood2")
  49. res := Bus_.Subscribe(sf.handSub)
  50. if res.IsErr() {
  51. sf.t.Fatalf("unsubGood1(): err=%v", res.Err())
  52. }
  53. req := &msg_unsub.UnsubReq{
  54. Name_: sf.handSub.Name_,
  55. Uuid_: "test_uuid",
  56. }
  57. binReq, _ := json.MarshalIndent(req, "", " ")
  58. body := strings.NewReader(string(binReq))
  59. resServ := kserv_http.GetKernelServHttp()
  60. fibApp := resServ.Hassert("unsubGood2").Fiber()
  61. hReq, err := http.NewRequest("POST", "/bus/unsub", body)
  62. if err != nil {
  63. sf.t.Fatalf("unsubGood2(): err=%v", err)
  64. }
  65. hReq.Header.Add("Content-Type", "application/json")
  66. _resp, err := fibApp.Test(hReq)
  67. if err != nil {
  68. sf.t.Fatalf("unsubGood2(): after request, err=%v", err)
  69. }
  70. if _resp.StatusCode != 200 {
  71. sf.t.Fatalf("unsubGood2(): statusCode(%v)!=200", _resp.StatusCode)
  72. }
  73. defer _resp.Body.Close()
  74. binBody, _ := io.ReadAll(_resp.Body)
  75. resp := &msg_unsub.UnsubResp{}
  76. err = json.Unmarshal(binBody, resp)
  77. if err != nil {
  78. sf.t.Fatalf("unsubGood2(): err=%v", err)
  79. }
  80. if resp.Status_ != "ok" {
  81. sf.t.Fatalf("unsubGood2(): resp(%v)!='ok'", resp.Status_)
  82. }
  83. }
  84. // Кривой запрос.
  85. func (sf *tester) unsubBad3() {
  86. sf.t.Log("unsubBad3")
  87. req := "tra-la-la"
  88. binReq, _ := json.MarshalIndent(req, "", " ")
  89. body := strings.NewReader(string(binReq))
  90. resServ := kserv_http.GetKernelServHttp()
  91. fibApp := resServ.Hassert("unsubBad3").Fiber()
  92. hReq, err := http.NewRequest("POST", "/bus/unsub", body)
  93. hReq.Header.Add("Content-Type", "application/json")
  94. if err != nil {
  95. sf.t.Fatalf("unsubBad3(): err=%v", err)
  96. }
  97. _resp, err := fibApp.Test(hReq)
  98. if err != nil {
  99. sf.t.Fatalf("unsubBad3(): after request, err=%v", err)
  100. }
  101. defer _resp.Body.Close()
  102. if _resp.StatusCode == 200 {
  103. sf.t.Fatalf("unsubBad3(): statusCode(%v)==200", _resp.StatusCode)
  104. }
  105. }
  106. func (sf *tester) unsubGood1() {
  107. sf.t.Log("unsubGood1")
  108. res := Bus_.Subscribe(sf.handSub)
  109. if res.IsErr() {
  110. sf.t.Fatalf("unsubGood1(): err=%v", res.Err())
  111. }
  112. count := 0
  113. for count < 100 {
  114. SleepMs()
  115. count++
  116. }
  117. req := &msg_unsub.UnsubReq{
  118. Name_: sf.handSub.Name_,
  119. Uuid_: "test_uuid",
  120. }
  121. resp := Bus_.processUnsubRequest(req)
  122. if resp.Status_ != "ok" {
  123. sf.t.Fatalf("unsubGood1(): status(%v)!='ok'", resp.Status_)
  124. }
  125. }
  126. // Все поля на месте, нет подписчика.
  127. func (sf *tester) unsubBad2() {
  128. sf.t.Log("unsubBad2")
  129. Bus_.Unsubscribe(sf.handSub)
  130. count := 0
  131. for count < 100 {
  132. SleepMs()
  133. count++
  134. }
  135. req := &msg_unsub.UnsubReq{
  136. Name_: sf.handSub.Name_,
  137. Uuid_: "test_uuid",
  138. }
  139. resp := Bus_.processUnsubRequest(req)
  140. if resp.Status_ == "ok" {
  141. sf.t.Fatalf("unsubBad2(): status(%v)=='ok'", resp.Status_)
  142. }
  143. }
  144. // Нет полей для процесса отписки.
  145. func (sf *tester) unsubBad1() {
  146. sf.t.Log("unsubBad1")
  147. defer func() {
  148. if _panic := recover(); _panic == nil {
  149. sf.t.Fatalf("unsubBad1(): panic==nil")
  150. }
  151. }()
  152. req := &msg_unsub.UnsubReq{}
  153. _ = Bus_.processUnsubRequest(req)
  154. }
  155. // Запрос на публикацию.
  156. func (sf *tester) pub() {
  157. sf.t.Log("pub")
  158. sf.pubBad1()
  159. sf.pubGood1()
  160. sf.pubBad2()
  161. sf.pubBad3()
  162. sf.pubGood2()
  163. }
  164. func (sf *tester) pubGood2() {
  165. sf.t.Log("pubGood2")
  166. req := &msg_pub.PublishReq{
  167. Topic_: "topic_sub",
  168. Uuid_: "test_uuid",
  169. BinMsg_: []byte("http_pub"),
  170. }
  171. binReq, _ := json.MarshalIndent(req, "", " ")
  172. body := strings.NewReader(string(binReq))
  173. resServ := kserv_http.GetKernelServHttp()
  174. fibApp := resServ.Hassert("pubGood2").Fiber()
  175. hReq, err := http.NewRequest("POST", "/bus/pub", body)
  176. hReq.Header.Add("Content-Type", "application/json")
  177. if err != nil {
  178. sf.t.Fatalf("pubGood2(): err=%v", err)
  179. }
  180. _resp, err := fibApp.Test(hReq)
  181. if err != nil {
  182. sf.t.Fatalf("pubGood2(): after request, err=%v", err)
  183. }
  184. if _resp.StatusCode != 200 {
  185. sf.t.Fatalf("pubGood2(): statusCode(%v)!=200", _resp.StatusCode)
  186. }
  187. defer _resp.Body.Close()
  188. binBody, _ := io.ReadAll(_resp.Body)
  189. resp := &msg_pub.PublishResp{}
  190. err = json.Unmarshal(binBody, resp)
  191. if err != nil {
  192. sf.t.Fatalf("pubGood2(): err=%v", err)
  193. }
  194. if resp.Status_ != "ok" {
  195. sf.t.Fatalf("pubGood2(): resp(%v)!='ok'", resp.Status_)
  196. }
  197. }
  198. // Кривой запрос.
  199. func (sf *tester) pubBad3() {
  200. sf.t.Log("pubBad3")
  201. req := "tra-la-la"
  202. binReq, _ := json.MarshalIndent(req, "", " ")
  203. body := strings.NewReader(string(binReq))
  204. resServ := kserv_http.GetKernelServHttp()
  205. fibApp := resServ.Hassert("pubBad3").Fiber()
  206. hReq, err := http.NewRequest("POST", "/bus/pub", body)
  207. hReq.Header.Add("Content-Type", "application/json")
  208. if err != nil {
  209. sf.t.Fatalf("pubBad3(): err=%v", err)
  210. }
  211. _resp, err := fibApp.Test(hReq)
  212. if err != nil {
  213. sf.t.Fatalf("pubBad3(): after request, err=%v", err)
  214. }
  215. defer _resp.Body.Close()
  216. if _resp.StatusCode == 200 {
  217. sf.t.Fatalf("pubBad3(): statusCode(%v)==200", _resp.StatusCode)
  218. }
  219. }
  220. // Что-то случилось внутри шины.
  221. func (sf *tester) pubBad2() {
  222. sf.t.Log("pubBad2")
  223. Bus_.IsWork_.Reset()
  224. defer Bus_.IsWork_.Set()
  225. req := &msg_pub.PublishReq{
  226. Topic_: "topic_sub",
  227. Uuid_: "test_uuid",
  228. BinMsg_: []byte("test_pub"),
  229. }
  230. resp := Bus_.processPublish(req)
  231. if resp.Status_ == "ok" {
  232. sf.t.Fatalf("pubBad2(): status(%v)=='ok'", resp.Status_)
  233. }
  234. }
  235. // Все поля на месте.
  236. func (sf *tester) pubGood1() {
  237. sf.t.Log("pubGood1")
  238. res := Bus_.Subscribe(sf.handSub)
  239. if res.IsErr() {
  240. sf.t.Fatalf("pubGood1(): err=%v", res.Err())
  241. }
  242. req := &msg_pub.PublishReq{
  243. Topic_: "topic_sub",
  244. Uuid_: "test_uuid",
  245. BinMsg_: []byte("test_pub"),
  246. }
  247. _ = Bus_.processPublish(req)
  248. for {
  249. SleepMs()
  250. msg := sf.handSub.Msg()
  251. if msg != "" {
  252. break
  253. }
  254. }
  255. msg := sf.handSub.Msg()
  256. if msg != "test_pub" {
  257. sf.t.Fatalf("pubGood1(): msg(%v)!='test_pub'", msg)
  258. }
  259. }
  260. // Нет полей для процесса публикации.
  261. func (sf *tester) pubBad1() {
  262. sf.t.Log("pubBad1")
  263. defer func() {
  264. if _panic := recover(); _panic == nil {
  265. sf.t.Fatalf("pubBad1(): panic==nil")
  266. }
  267. }()
  268. req := &msg_pub.PublishReq{}
  269. _ = Bus_.processPublish(req)
  270. }
  271. // Запрос подписки на топик.
  272. func (sf *tester) sub() {
  273. sf.t.Log("sub")
  274. sf.subBad1()
  275. sf.subBad2()
  276. sf.subGood1()
  277. sf.subBad3()
  278. sf.subGood2()
  279. }
  280. // Полный процесс подписки.
  281. func (sf *tester) subGood2() {
  282. sf.t.Log("subGood2")
  283. req := &msg_sub.SubscribeReq{
  284. Topic_: "topic_serv",
  285. Uuid_: "test_uuid",
  286. WebHook_: "http://localhost:18200/bus/pub/",
  287. }
  288. binReq, _ := json.MarshalIndent(req, "", " ")
  289. body := strings.NewReader(string(binReq))
  290. resServ := kserv_http.GetKernelServHttp()
  291. fibApp := resServ.Hassert("subGood2").Fiber()
  292. hReq, err := http.NewRequest("POST", "/bus/sub", body)
  293. hReq.Header.Add("Content-Type", "application/json")
  294. if err != nil {
  295. sf.t.Fatalf("subBad1(): err=%v", err)
  296. }
  297. _resp, err := fibApp.Test(hReq)
  298. if err != nil {
  299. sf.t.Fatalf("subBad1(): after request, err=%v", err)
  300. }
  301. if _resp.StatusCode != 200 {
  302. sf.t.Fatalf("subBad1(): statusCode(%v)!=200", _resp.StatusCode)
  303. }
  304. defer _resp.Body.Close()
  305. binBody, _ := io.ReadAll(_resp.Body)
  306. resp := &msg_sub.SubscribeResp{}
  307. err = json.Unmarshal(binBody, resp)
  308. if err != nil {
  309. sf.t.Fatalf("subBad1(): err=%v", err)
  310. }
  311. if resp.Status_ != "ok" {
  312. sf.t.Fatalf("subBad1(): resp(%v)!='ok'", resp.Status_)
  313. }
  314. }
  315. // Отключена базовая шина.
  316. func (sf *tester) subBad3() {
  317. sf.t.Log("subBad3")
  318. req := &msg_sub.SubscribeReq{
  319. Topic_: "topic_serv",
  320. Uuid_: "test_uuid",
  321. WebHook_: "http://localhost:18200/bus/pub/",
  322. }
  323. defer func() {
  324. if _panic := recover(); _panic != nil {
  325. sf.t.Fatalf("subBad3(): panic!=nil")
  326. }
  327. }()
  328. // _bus := kernel_bus_base.GetKernelBusBase()
  329. Bus_.IsWork_.Reset()
  330. defer Bus_.IsWork_.Set()
  331. resp := Bus_.processSubscribe(req)
  332. if resp.Status_ == "ok" {
  333. sf.t.Fatalf("subBad3(): resp==ok")
  334. }
  335. }
  336. // Проверка полей запроса в процессе подписки.
  337. func (sf *tester) subGood1() {
  338. sf.t.Log("subGood1")
  339. req := &msg_sub.SubscribeReq{
  340. Topic_: "topic_serv",
  341. Uuid_: "test_uuid",
  342. WebHook_: "http://localhost:18200/bus/",
  343. }
  344. defer func() {
  345. if _panic := recover(); _panic != nil {
  346. sf.t.Fatalf("subGood1(): panic!=nil")
  347. }
  348. }()
  349. _ = Bus_.processSubscribe(req)
  350. }
  351. // Проверка кривых полей запроса в процессе подписки.
  352. func (sf *tester) subBad2() {
  353. sf.t.Log("subBad2")
  354. req := &msg_sub.SubscribeReq{
  355. Topic_: "",
  356. }
  357. defer func() {
  358. if _panic := recover(); _panic == nil {
  359. sf.t.Fatalf("subBad2(): panic==nil")
  360. }
  361. }()
  362. _ = Bus_.processSubscribe(req)
  363. }
  364. // Кривой запрос.
  365. func (sf *tester) subBad1() {
  366. sf.t.Log("subBad1")
  367. req := "tra-ta-ta"
  368. binReq, _ := json.MarshalIndent(req, "", " ")
  369. body := strings.NewReader(string(binReq))
  370. resServ := kserv_http.GetKernelServHttp()
  371. fibApp := resServ.Hassert("subBad1").Fiber()
  372. hReq, err := http.NewRequest("POST", "/bus/sub", body)
  373. hReq.Header.Add("Content-Type", "application/json")
  374. if err != nil {
  375. sf.t.Fatalf("subBad1(): err=%v", err)
  376. }
  377. _resp, err := fibApp.Test(hReq)
  378. if err != nil {
  379. sf.t.Fatalf("subBad1(): after request, err=%v", err)
  380. }
  381. defer _resp.Body.Close()
  382. if _resp.StatusCode != 400 {
  383. sf.t.Fatalf("subBad1(): statusCode(%v)!=400", _resp.StatusCode)
  384. }
  385. }
  386. // Входящий запрос.
  387. func (sf *tester) req() {
  388. sf.t.Log("req")
  389. sf.reqBad1()
  390. sf.reqBad2()
  391. sf.reqBad3()
  392. sf.reqGood1()
  393. sf.reqBad4()
  394. }
  395. // Что-то с обработчиком.
  396. func (sf *tester) reqBad4() {
  397. sf.t.Log("reqBad4")
  398. sf.handServ.IsBad_.Set()
  399. defer sf.handServ.IsBad_.Reset()
  400. req := &msg_serve.ServeReq{
  401. Topic_: sf.handServ.Topic_,
  402. Uuid_: "test_uuid",
  403. BinReq_: []byte("test_msg"),
  404. }
  405. binReq, _ := json.MarshalIndent(req, "", " ")
  406. body := strings.NewReader(string(binReq))
  407. resServ := kserv_http.GetKernelServHttp()
  408. fibApp := resServ.Hassert("reqBad4").Fiber()
  409. hReq, err := http.NewRequest("POST", "/bus/request", body)
  410. hReq.Header.Add("Content-Type", "application/json")
  411. if err != nil {
  412. sf.t.Fatalf("reqBad4(): err=%v", err)
  413. }
  414. _resp, err := fibApp.Test(hReq)
  415. if err != nil {
  416. sf.t.Fatalf("reqBad4(): after request, err=%v", err)
  417. }
  418. if _resp.StatusCode != 200 {
  419. sf.t.Fatalf("reqBad4(): statusCode(%v)!=200", _resp.StatusCode)
  420. }
  421. defer _resp.Body.Close()
  422. binBody, _ := io.ReadAll(_resp.Body)
  423. resp := &msg_serve.ServeResp{}
  424. err = json.Unmarshal(binBody, resp)
  425. if err != nil {
  426. sf.t.Fatalf("reqBad4(): err=%v", err)
  427. }
  428. if resp.Status_ == "ok" {
  429. sf.t.Fatalf("reqBad4(): status(%v)=='ok'", resp.Status_)
  430. }
  431. }
  432. func (sf *tester) reqGood1() {
  433. sf.t.Log("reqGood1")
  434. req := &msg_serve.ServeReq{
  435. Topic_: sf.handServ.Topic_,
  436. Uuid_: "test_uuid",
  437. BinReq_: []byte("test_msg"),
  438. }
  439. binReq, _ := json.MarshalIndent(req, "", " ")
  440. body := strings.NewReader(string(binReq))
  441. resServ := kserv_http.GetKernelServHttp()
  442. fibApp := resServ.Hassert("reqGood1").Fiber()
  443. hReq, err := http.NewRequest("POST", "/bus/request", body)
  444. hReq.Header.Add("Content-Type", "application/json")
  445. if err != nil {
  446. sf.t.Fatalf("reqGood1(): err=%v", err)
  447. }
  448. _resp, err := fibApp.Test(hReq)
  449. if err != nil {
  450. sf.t.Fatalf("reqGood1(): after request, err=%v", err)
  451. }
  452. if _resp.StatusCode != 200 {
  453. sf.t.Fatalf("reqGood1(): statusCode(%v)!=200", _resp.StatusCode)
  454. }
  455. defer _resp.Body.Close()
  456. binBody, _ := io.ReadAll(_resp.Body)
  457. resp := &msg_serve.ServeResp{}
  458. err = json.Unmarshal(binBody, resp)
  459. if err != nil {
  460. sf.t.Fatalf("reqGood1(): err=%v", err)
  461. }
  462. if string(resp.BinResp_) != "test_msg" {
  463. sf.t.Fatalf("reqGood1(): resp(%v)!='test_msg'", string(resp.BinResp_))
  464. }
  465. }
  466. // Нет такого топика для запросов.
  467. func (sf *tester) reqBad3() {
  468. sf.t.Log("reqBad3")
  469. req := &msg_serve.ServeReq{
  470. Topic_: "bad_topic",
  471. Uuid_: "test_uuid",
  472. BinReq_: []byte("test_msg"),
  473. }
  474. binReq, _ := json.MarshalIndent(req, "", " ")
  475. body := strings.NewReader(string(binReq))
  476. resServ := kserv_http.GetKernelServHttp()
  477. fibApp := resServ.Hassert("reqBad3").Fiber()
  478. hReq, err := http.NewRequest("POST", "/bus/request", body)
  479. if err != nil {
  480. sf.t.Fatalf("reqBad3(): err=%v", err)
  481. }
  482. resp, err := fibApp.Test(hReq)
  483. if err != nil {
  484. sf.t.Fatalf("reqBad3(): after request, err=%v", err)
  485. }
  486. defer resp.Body.Close()
  487. if resp.StatusCode != 400 {
  488. sf.t.Fatalf("reqBad3(): statusCode(%v)!=400", resp.StatusCode)
  489. }
  490. }
  491. // Нет тела запроса.
  492. func (sf *tester) reqBad2() {
  493. sf.t.Log("reqBad1")
  494. body := strings.NewReader("test_msg")
  495. resServ := kserv_http.GetKernelServHttp()
  496. fibApp := resServ.Hassert("reqBad2").Fiber()
  497. hReq, err := http.NewRequest("POST", "/bus/request", body)
  498. if err != nil {
  499. sf.t.Fatalf("reqBad1(): err=%v", err)
  500. }
  501. resp, err := fibApp.Test(hReq)
  502. if err != nil {
  503. sf.t.Fatalf("reqBad1(): after request, err=%v", err)
  504. }
  505. defer resp.Body.Close()
  506. if resp.StatusCode != 400 {
  507. sf.t.Fatalf("reqBad1(): statusCode(%v)!=400", resp.StatusCode)
  508. }
  509. }
  510. // Отсутствуют поля в запросе.
  511. func (sf *tester) reqBad1() {
  512. sf.t.Log("reqBad1")
  513. defer func() {
  514. if _panic := recover(); _panic == nil {
  515. sf.t.Fatalf("reqBad1(): panic==nil")
  516. }
  517. }()
  518. Bus_.processSendRequest(nil)
  519. }
  520. // Получает локальную шину.
  521. func (sf *tester) get() {
  522. sf.t.Log("get")
  523. _ = MakeEnv()
  524. _ = os.Unsetenv("LOCAL_HTTP_URL")
  525. os.Setenv("LOCAL_HTTP_URL", "http://localhost:18312/")
  526. _ = GetKernelBusHttp()
  527. if Bus_ == nil {
  528. sf.t.Fatalf("get(): bus==nil")
  529. }
  530. _ = GetKernelBusHttp()
  531. Bus_.RegisterServe(sf.handServ).Hassert("get()")
  532. }