kbus_http_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. package kbus_http
  2. import (
  3. "encoding/json"
  4. "io"
  5. "net/http"
  6. "os"
  7. "strings"
  8. "testing"
  9. . "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
  10. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_pub"
  11. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_serve"
  12. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_sub"
  13. "gitp78su.ipnodns.ru/svi/kern/v4/lev1/kbus_msg/msg_unsub"
  14. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
  15. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kserv_http"
  16. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/mock_hand_serve"
  17. "gitp78su.ipnodns.ru/svi/kern/v4/lev2/mock_hand_sub_local"
  18. )
  19. type tester struct {
  20. t *testing.T
  21. handSub *mock_hand_sub_local.MockHandlerSub
  22. handServ *mock_hand_serve.MockHandlerServe
  23. }
  24. func TestKernelBusHttp(t *testing.T) {
  25. sf := &tester{
  26. t: t,
  27. handSub: mock_hand_sub_local.NewMockHandlerSub("topic_sub", "http://localhost:18200/bus/pub"),
  28. handServ: mock_hand_serve.NewMockHandlerServe("topic_serv", "name_serv"),
  29. }
  30. ctx := kctx.GetKernelCtx().Hassert("TestKernelBusHttp()")
  31. ctx.Set("monolitName", "test_monolit", "comment").Hassert("TestKernelBusHttp()")
  32. sf.get()
  33. sf.req()
  34. sf.sub()
  35. sf.pub()
  36. sf.unsub()
  37. }
  38. // Запрос на отписку
  39. func (sf *tester) unsub() {
  40. sf.t.Log("unsub")
  41. sf.unsubBad1()
  42. sf.unsubBad2()
  43. sf.unsubGood1()
  44. sf.unsubBad3()
  45. sf.unsubGood2()
  46. }
  47. func (sf *tester) unsubGood2() {
  48. sf.t.Log("unsubGood2")
  49. res := Bus_.Subscribe(sf.handSub)
  50. if res.IsErr() {
  51. sf.t.Fatalf("unsubGood1(): err=%v", res.Err())
  52. }
  53. req := &msg_unsub.UnsubReq{
  54. Name_: sf.handSub.Name_,
  55. Uuid_: "test_uuid",
  56. }
  57. binReq, _ := json.MarshalIndent(req, "", " ")
  58. body := strings.NewReader(string(binReq))
  59. resServ := kserv_http.GetKernelServHttp()
  60. fibApp := resServ.Hassert("unsubGood2").Fiber()
  61. hReq, err := http.NewRequest("POST", "/bus/unsub", body)
  62. if err != nil {
  63. sf.t.Fatalf("unsubGood2(): err=%v", err)
  64. }
  65. hReq.Header.Add("Content-Type", "application/json")
  66. _resp, err := fibApp.Test(hReq)
  67. if err != nil {
  68. sf.t.Fatalf("unsubGood2(): after request, err=%v", err)
  69. }
  70. if _resp.StatusCode != 200 {
  71. sf.t.Fatalf("unsubGood2(): statusCode(%v)!=200", _resp.StatusCode)
  72. }
  73. defer _resp.Body.Close()
  74. binBody, _ := io.ReadAll(_resp.Body)
  75. resp := &msg_unsub.UnsubResp{}
  76. err = json.Unmarshal(binBody, resp)
  77. if err != nil {
  78. sf.t.Fatalf("unsubGood2(): err=%v", err)
  79. }
  80. if string(resp.Status_) != "ok" {
  81. sf.t.Fatalf("unsubGood2(): resp(%v)!='ok'", string(resp.Status_))
  82. }
  83. }
  84. // Кривой запрос
  85. func (sf *tester) unsubBad3() {
  86. sf.t.Log("unsubBad3")
  87. req := "tra-la-la"
  88. binReq, _ := json.MarshalIndent(req, "", " ")
  89. body := strings.NewReader(string(binReq))
  90. resServ := kserv_http.GetKernelServHttp()
  91. fibApp := resServ.Hassert("unsubBad3").Fiber()
  92. hReq, err := http.NewRequest("POST", "/bus/unsub", body)
  93. hReq.Header.Add("Content-Type", "application/json")
  94. if err != nil {
  95. sf.t.Fatalf("unsubBad3(): err=%v", err)
  96. }
  97. _resp, err := fibApp.Test(hReq)
  98. if err != nil {
  99. sf.t.Fatalf("unsubBad3(): after request, err=%v", err)
  100. }
  101. if _resp.StatusCode == 200 {
  102. sf.t.Fatalf("unsubBad3(): statusCode(%v)==200", _resp.StatusCode)
  103. }
  104. }
  105. func (sf *tester) unsubGood1() {
  106. sf.t.Log("unsubGood1")
  107. res := Bus_.Subscribe(sf.handSub)
  108. if res.IsErr() {
  109. sf.t.Fatalf("unsubGood1(): err=%v", res.Err())
  110. }
  111. count := 0
  112. for count < 100 {
  113. SleepMs()
  114. count++
  115. }
  116. req := &msg_unsub.UnsubReq{
  117. Name_: sf.handSub.Name_,
  118. Uuid_: "test_uuid",
  119. }
  120. resp := Bus_.processUnsubRequest(req)
  121. if resp.Status_ != "ok" {
  122. sf.t.Fatalf("unsubGood1(): status(%v)!='ok'", resp.Status_)
  123. }
  124. }
  125. // Все поля на месте, нет подписчика
  126. func (sf *tester) unsubBad2() {
  127. sf.t.Log("unsubBad2")
  128. Bus_.Unsubscribe(sf.handSub)
  129. count := 0
  130. for count < 100 {
  131. SleepMs()
  132. count++
  133. }
  134. req := &msg_unsub.UnsubReq{
  135. Name_: sf.handSub.Name_,
  136. Uuid_: "test_uuid",
  137. }
  138. resp := Bus_.processUnsubRequest(req)
  139. if resp.Status_ == "ok" {
  140. sf.t.Fatalf("unsubBad2(): status(%v)=='ok'", resp.Status_)
  141. }
  142. }
  143. // Нет полей для процесса отписки
  144. func (sf *tester) unsubBad1() {
  145. sf.t.Log("unsubBad1")
  146. defer func() {
  147. if _panic := recover(); _panic == nil {
  148. sf.t.Fatalf("unsubBad1(): panic==nil")
  149. }
  150. }()
  151. req := &msg_unsub.UnsubReq{}
  152. _ = Bus_.processUnsubRequest(req)
  153. }
  154. // Запрос на публикацию
  155. func (sf *tester) pub() {
  156. sf.t.Log("pub")
  157. sf.pubBad1()
  158. sf.pubGood1()
  159. sf.pubBad2()
  160. sf.pubBad3()
  161. sf.pubGood2()
  162. }
  163. func (sf *tester) pubGood2() {
  164. sf.t.Log("pubGood2")
  165. req := &msg_pub.PublishReq{
  166. Topic_: "topic_sub",
  167. Uuid_: "test_uuid",
  168. BinMsg_: []byte("http_pub"),
  169. }
  170. binReq, _ := json.MarshalIndent(req, "", " ")
  171. body := strings.NewReader(string(binReq))
  172. resServ := kserv_http.GetKernelServHttp()
  173. fibApp := resServ.Hassert("pubGood2").Fiber()
  174. hReq, err := http.NewRequest("POST", "/bus/pub", body)
  175. hReq.Header.Add("Content-Type", "application/json")
  176. if err != nil {
  177. sf.t.Fatalf("pubGood2(): err=%v", err)
  178. }
  179. _resp, err := fibApp.Test(hReq)
  180. if err != nil {
  181. sf.t.Fatalf("pubGood2(): after request, err=%v", err)
  182. }
  183. if _resp.StatusCode != 200 {
  184. sf.t.Fatalf("pubGood2(): statusCode(%v)!=200", _resp.StatusCode)
  185. }
  186. defer _resp.Body.Close()
  187. binBody, _ := io.ReadAll(_resp.Body)
  188. resp := &msg_pub.PublishResp{}
  189. err = json.Unmarshal(binBody, resp)
  190. if err != nil {
  191. sf.t.Fatalf("pubGood2(): err=%v", err)
  192. }
  193. if string(resp.Status_) != "ok" {
  194. sf.t.Fatalf("pubGood2(): resp(%v)!='ok'", string(resp.Status_))
  195. }
  196. }
  197. // Кривой запрос
  198. func (sf *tester) pubBad3() {
  199. sf.t.Log("pubBad3")
  200. req := "tra-la-la"
  201. binReq, _ := json.MarshalIndent(req, "", " ")
  202. body := strings.NewReader(string(binReq))
  203. resServ := kserv_http.GetKernelServHttp()
  204. fibApp := resServ.Hassert("pubBad3").Fiber()
  205. hReq, err := http.NewRequest("POST", "/bus/pub", body)
  206. hReq.Header.Add("Content-Type", "application/json")
  207. if err != nil {
  208. sf.t.Fatalf("pubBad3(): err=%v", err)
  209. }
  210. _resp, err := fibApp.Test(hReq)
  211. if err != nil {
  212. sf.t.Fatalf("pubBad3(): after request, err=%v", err)
  213. }
  214. if _resp.StatusCode == 200 {
  215. sf.t.Fatalf("pubBad3(): statusCode(%v)==200", _resp.StatusCode)
  216. }
  217. }
  218. // Что-то случилось внутри шины
  219. func (sf *tester) pubBad2() {
  220. sf.t.Log("pubBad2")
  221. Bus_.IsWork_.Reset()
  222. defer Bus_.IsWork_.Set()
  223. req := &msg_pub.PublishReq{
  224. Topic_: "topic_sub",
  225. Uuid_: "test_uuid",
  226. BinMsg_: []byte("test_pub"),
  227. }
  228. resp := Bus_.processPublish(req)
  229. if resp.Status_ == "ok" {
  230. sf.t.Fatalf("pubBad2(): status(%v)=='ok'", resp.Status_)
  231. }
  232. }
  233. // Все поля на месте
  234. func (sf *tester) pubGood1() {
  235. sf.t.Log("pubGood1")
  236. res := Bus_.Subscribe(sf.handSub)
  237. if res.IsErr() {
  238. sf.t.Fatalf("pubGood1(): err=%v", res.Err())
  239. }
  240. req := &msg_pub.PublishReq{
  241. Topic_: "topic_sub",
  242. Uuid_: "test_uuid",
  243. BinMsg_: []byte("test_pub"),
  244. }
  245. _ = Bus_.processPublish(req)
  246. for {
  247. SleepMs()
  248. msg := string(sf.handSub.Msg())
  249. if msg != "" {
  250. break
  251. }
  252. }
  253. msg := string(sf.handSub.Msg())
  254. if msg != "test_pub" {
  255. sf.t.Fatalf("pubGood1(): msg(%v)!='test_pub'", msg)
  256. }
  257. }
  258. // Нет полей для процесса публикации
  259. func (sf *tester) pubBad1() {
  260. sf.t.Log("pubBad1")
  261. defer func() {
  262. if _panic := recover(); _panic == nil {
  263. sf.t.Fatalf("pubBad1(): panic==nil")
  264. }
  265. }()
  266. req := &msg_pub.PublishReq{}
  267. _ = Bus_.processPublish(req)
  268. }
  269. // Запрос подписки на топик
  270. func (sf *tester) sub() {
  271. sf.t.Log("sub")
  272. sf.subBad1()
  273. sf.subBad2()
  274. sf.subGood1()
  275. sf.subBad3()
  276. sf.subGood2()
  277. }
  278. // Полный процесс подписки
  279. func (sf *tester) subGood2() {
  280. sf.t.Log("subGood2")
  281. req := &msg_sub.SubscribeReq{
  282. Topic_: "topic_serv",
  283. Uuid_: "test_uuid",
  284. WebHook_: "http://localhost:18200/bus/pub/",
  285. }
  286. binReq, _ := json.MarshalIndent(req, "", " ")
  287. body := strings.NewReader(string(binReq))
  288. resServ := kserv_http.GetKernelServHttp()
  289. fibApp := resServ.Hassert("subGood2").Fiber()
  290. hReq, err := http.NewRequest("POST", "/bus/sub", body)
  291. hReq.Header.Add("Content-Type", "application/json")
  292. if err != nil {
  293. sf.t.Fatalf("subBad1(): err=%v", err)
  294. }
  295. _resp, err := fibApp.Test(hReq)
  296. if err != nil {
  297. sf.t.Fatalf("subBad1(): after request, err=%v", err)
  298. }
  299. if _resp.StatusCode != 200 {
  300. sf.t.Fatalf("subBad1(): statusCode(%v)!=200", _resp.StatusCode)
  301. }
  302. defer _resp.Body.Close()
  303. binBody, _ := io.ReadAll(_resp.Body)
  304. resp := &msg_sub.SubscribeResp{}
  305. err = json.Unmarshal(binBody, resp)
  306. if err != nil {
  307. sf.t.Fatalf("subBad1(): err=%v", err)
  308. }
  309. if string(resp.Status_) != "ok" {
  310. sf.t.Fatalf("subBad1(): resp(%v)!='ok'", string(resp.Status_))
  311. }
  312. }
  313. // Отключена базовая шина
  314. func (sf *tester) subBad3() {
  315. sf.t.Log("subBad3")
  316. req := &msg_sub.SubscribeReq{
  317. Topic_: "topic_serv",
  318. Uuid_: "test_uuid",
  319. WebHook_: "http://localhost:18200/bus/pub/",
  320. }
  321. defer func() {
  322. if _panic := recover(); _panic != nil {
  323. sf.t.Fatalf("subBad3(): panic!=nil")
  324. }
  325. }()
  326. // _bus := kernel_bus_base.GetKernelBusBase()
  327. Bus_.IsWork_.Reset()
  328. defer Bus_.IsWork_.Set()
  329. resp := Bus_.processSubscribe(req)
  330. if resp.Status_ == "ok" {
  331. sf.t.Fatalf("subBad3(): resp==ok")
  332. }
  333. }
  334. // Проверка полей запроса в процессе подписки
  335. func (sf *tester) subGood1() {
  336. sf.t.Log("subGood1")
  337. req := &msg_sub.SubscribeReq{
  338. Topic_: "topic_serv",
  339. Uuid_: "test_uuid",
  340. WebHook_: "http://localhost:18200/bus/",
  341. }
  342. defer func() {
  343. if _panic := recover(); _panic != nil {
  344. sf.t.Fatalf("subGood1(): panic!=nil")
  345. }
  346. }()
  347. _ = Bus_.processSubscribe(req)
  348. }
  349. // Проверка кривых полей запроса в процессе подписки
  350. func (sf *tester) subBad2() {
  351. sf.t.Log("subBad2")
  352. req := &msg_sub.SubscribeReq{
  353. Topic_: "",
  354. }
  355. defer func() {
  356. if _panic := recover(); _panic == nil {
  357. sf.t.Fatalf("subBad2(): panic==nil")
  358. }
  359. }()
  360. _ = Bus_.processSubscribe(req)
  361. }
  362. // Кривой запрос
  363. func (sf *tester) subBad1() {
  364. sf.t.Log("subBad1")
  365. req := "tra-ta-ta"
  366. binReq, _ := json.MarshalIndent(req, "", " ")
  367. body := strings.NewReader(string(binReq))
  368. resServ := kserv_http.GetKernelServHttp()
  369. fibApp := resServ.Hassert("subBad1").Fiber()
  370. hReq, err := http.NewRequest("POST", "/bus/sub", body)
  371. hReq.Header.Add("Content-Type", "application/json")
  372. if err != nil {
  373. sf.t.Fatalf("subBad1(): err=%v", err)
  374. }
  375. _resp, err := fibApp.Test(hReq)
  376. if err != nil {
  377. sf.t.Fatalf("subBad1(): after request, err=%v", err)
  378. }
  379. if _resp.StatusCode != 400 {
  380. sf.t.Fatalf("subBad1(): statusCode(%v)!=400", _resp.StatusCode)
  381. }
  382. }
  383. // Входящий запрос
  384. func (sf *tester) req() {
  385. sf.t.Log("req")
  386. sf.reqBad1()
  387. sf.reqBad2()
  388. sf.reqBad3()
  389. sf.reqGood1()
  390. sf.reqBad4()
  391. }
  392. // Что-то с обработчиком
  393. func (sf *tester) reqBad4() {
  394. sf.t.Log("reqBad4")
  395. sf.handServ.IsBad_.Set()
  396. defer sf.handServ.IsBad_.Reset()
  397. req := &msg_serve.ServeReq{
  398. Topic_: sf.handServ.Topic_,
  399. Uuid_: "test_uuid",
  400. BinReq_: []byte("test_msg"),
  401. }
  402. binReq, _ := json.MarshalIndent(req, "", " ")
  403. body := strings.NewReader(string(binReq))
  404. resServ := kserv_http.GetKernelServHttp()
  405. fibApp := resServ.Hassert("reqBad4").Fiber()
  406. hReq, err := http.NewRequest("POST", "/bus/request", body)
  407. hReq.Header.Add("Content-Type", "application/json")
  408. if err != nil {
  409. sf.t.Fatalf("reqBad4(): err=%v", err)
  410. }
  411. _resp, err := fibApp.Test(hReq)
  412. if err != nil {
  413. sf.t.Fatalf("reqBad4(): after request, err=%v", err)
  414. }
  415. if _resp.StatusCode != 200 {
  416. sf.t.Fatalf("reqBad4(): statusCode(%v)!=200", _resp.StatusCode)
  417. }
  418. defer _resp.Body.Close()
  419. binBody, _ := io.ReadAll(_resp.Body)
  420. resp := &msg_serve.ServeResp{}
  421. err = json.Unmarshal(binBody, resp)
  422. if err != nil {
  423. sf.t.Fatalf("reqBad4(): err=%v", err)
  424. }
  425. if string(resp.Status_) == "ok" {
  426. sf.t.Fatalf("reqBad4(): status(%v)=='ok'", string(resp.Status_))
  427. }
  428. }
  429. func (sf *tester) reqGood1() {
  430. sf.t.Log("reqGood1")
  431. req := &msg_serve.ServeReq{
  432. Topic_: sf.handServ.Topic_,
  433. Uuid_: "test_uuid",
  434. BinReq_: []byte("test_msg"),
  435. }
  436. binReq, _ := json.MarshalIndent(req, "", " ")
  437. body := strings.NewReader(string(binReq))
  438. resServ := kserv_http.GetKernelServHttp()
  439. fibApp := resServ.Hassert("reqGood1").Fiber()
  440. hReq, err := http.NewRequest("POST", "/bus/request", body)
  441. hReq.Header.Add("Content-Type", "application/json")
  442. if err != nil {
  443. sf.t.Fatalf("reqGood1(): err=%v", err)
  444. }
  445. _resp, err := fibApp.Test(hReq)
  446. if err != nil {
  447. sf.t.Fatalf("reqGood1(): after request, err=%v", err)
  448. }
  449. if _resp.StatusCode != 200 {
  450. sf.t.Fatalf("reqGood1(): statusCode(%v)!=200", _resp.StatusCode)
  451. }
  452. defer _resp.Body.Close()
  453. binBody, _ := io.ReadAll(_resp.Body)
  454. resp := &msg_serve.ServeResp{}
  455. err = json.Unmarshal(binBody, resp)
  456. if err != nil {
  457. sf.t.Fatalf("reqGood1(): err=%v", err)
  458. }
  459. if string(resp.BinResp_) != "test_msg" {
  460. sf.t.Fatalf("reqGood1(): resp(%v)!='test_msg'", string(resp.BinResp_))
  461. }
  462. }
  463. // Нет такого топика для запросов
  464. func (sf *tester) reqBad3() {
  465. sf.t.Log("reqBad3")
  466. req := &msg_serve.ServeReq{
  467. Topic_: "bad_topic",
  468. Uuid_: "test_uuid",
  469. BinReq_: []byte("test_msg"),
  470. }
  471. binReq, _ := json.MarshalIndent(req, "", " ")
  472. body := strings.NewReader(string(binReq))
  473. resServ := kserv_http.GetKernelServHttp()
  474. fibApp := resServ.Hassert("reqBad3").Fiber()
  475. hReq, err := http.NewRequest("POST", "/bus/request", body)
  476. if err != nil {
  477. sf.t.Fatalf("reqBad3(): err=%v", err)
  478. }
  479. resp, err := fibApp.Test(hReq)
  480. if err != nil {
  481. sf.t.Fatalf("reqBad3(): after request, err=%v", err)
  482. }
  483. if resp.StatusCode != 400 {
  484. sf.t.Fatalf("reqBad3(): statusCode(%v)!=400", resp.StatusCode)
  485. }
  486. }
  487. // Нет тела запроса
  488. func (sf *tester) reqBad2() {
  489. sf.t.Log("reqBad1")
  490. body := strings.NewReader("test_msg")
  491. resServ := kserv_http.GetKernelServHttp()
  492. fibApp := resServ.Hassert("reqBad2").Fiber()
  493. hReq, err := http.NewRequest("POST", "/bus/request", body)
  494. if err != nil {
  495. sf.t.Fatalf("reqBad1(): err=%v", err)
  496. }
  497. resp, err := fibApp.Test(hReq)
  498. if err != nil {
  499. sf.t.Fatalf("reqBad1(): after request, err=%v", err)
  500. }
  501. if resp.StatusCode != 400 {
  502. sf.t.Fatalf("reqBad1(): statusCode(%v)!=400", resp.StatusCode)
  503. }
  504. }
  505. // Отсутствуют поля в запросе
  506. func (sf *tester) reqBad1() {
  507. sf.t.Log("reqBad1")
  508. defer func() {
  509. if _panic := recover(); _panic == nil {
  510. sf.t.Fatalf("reqBad1(): panic==nil")
  511. }
  512. }()
  513. Bus_.processSendRequest(nil)
  514. }
  515. // Получает локальную шину
  516. func (sf *tester) get() {
  517. sf.t.Log("get")
  518. _ = MakeEnv()
  519. _ = os.Unsetenv("LOCAL_HTTP_URL")
  520. os.Setenv("LOCAL_HTTP_URL", "http://localhost:18312/")
  521. _ = GetKernelBusHttp()
  522. if Bus_ == nil {
  523. sf.t.Fatalf("get(): bus==nil")
  524. }
  525. _ = GetKernelBusHttp()
  526. Bus_.RegisterServe(sf.handServ).Hassert("get()")
  527. }