kbus_http_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. package kbus_http
  2. import (
  3. "encoding/json"
  4. "io"
  5. "net/http"
  6. "os"
  7. "strings"
  8. "testing"
  9. . "gitp78su.ipnodns.ru/svi/kern/v2/kc/helpers"
  10. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_msg/msg_pub"
  11. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_msg/msg_serve"
  12. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_msg/msg_sub"
  13. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kbus/kbus_msg/msg_unsub"
  14. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kctx"
  15. "gitp78su.ipnodns.ru/svi/kern/v2/krn/kserv_http"
  16. "gitp78su.ipnodns.ru/svi/kern/v2/mock/mock_env"
  17. "gitp78su.ipnodns.ru/svi/kern/v2/mock/mock_hand_serve"
  18. "gitp78su.ipnodns.ru/svi/kern/v2/mock/mock_hand_sub_local"
  19. )
  20. type tester struct {
  21. t *testing.T
  22. handSub *mock_hand_sub_local.MockHandlerSub
  23. handServ *mock_hand_serve.MockHandlerServe
  24. }
  25. func TestKernelBusHttp(t *testing.T) {
  26. sf := &tester{
  27. t: t,
  28. handSub: mock_hand_sub_local.NewMockHandlerSub("topic_sub", "http://localhost:18200/bus/pub"),
  29. handServ: mock_hand_serve.NewMockHandlerServe("topic_serv", "name_serv"),
  30. }
  31. ctx := kctx.GetKernelCtx()
  32. ctx.Set("monolitName", "test_monolit", "comment")
  33. sf.get()
  34. sf.req()
  35. sf.sub()
  36. sf.pub()
  37. sf.unsub()
  38. }
  39. // Запрос на отписку
  40. func (sf *tester) unsub() {
  41. sf.t.Log("unsub")
  42. sf.unsubBad1()
  43. sf.unsubBad2()
  44. sf.unsubGood1()
  45. sf.unsubBad3()
  46. sf.unsubGood2()
  47. }
  48. func (sf *tester) unsubGood2() {
  49. sf.t.Log("unsubGood2")
  50. res := bus.Subscribe(sf.handSub)
  51. if res.IsErr() {
  52. sf.t.Fatalf("unsubGood1(): err=%v", res.Err())
  53. }
  54. req := &msg_unsub.UnsubReq{
  55. Name_: sf.handSub.Name_,
  56. Uuid_: "test_uuid",
  57. }
  58. binReq, _ := json.MarshalIndent(req, "", " ")
  59. body := strings.NewReader(string(binReq))
  60. fibApp := kserv_http.GetKernelServHttp().Fiber()
  61. hReq, err := http.NewRequest("POST", "/bus/unsub", body)
  62. if err != nil {
  63. sf.t.Fatalf("unsubGood2(): err=%v", err)
  64. }
  65. hReq.Header.Add("Content-Type", "application/json")
  66. _resp, err := fibApp.Test(hReq)
  67. if err != nil {
  68. sf.t.Fatalf("unsubGood2(): after request, err=%v", err)
  69. }
  70. if _resp.StatusCode != 200 {
  71. sf.t.Fatalf("unsubGood2(): statusCode(%v)!=200", _resp.StatusCode)
  72. }
  73. defer _resp.Body.Close()
  74. binBody, _ := io.ReadAll(_resp.Body)
  75. resp := &msg_unsub.UnsubResp{}
  76. err = json.Unmarshal(binBody, resp)
  77. if err != nil {
  78. sf.t.Fatalf("unsubGood2(): err=%v", err)
  79. }
  80. if string(resp.Status_) != "ok" {
  81. sf.t.Fatalf("unsubGood2(): resp(%v)!='ok'", string(resp.Status_))
  82. }
  83. }
  84. // Кривой запрос
  85. func (sf *tester) unsubBad3() {
  86. sf.t.Log("unsubBad3")
  87. req := "tra-la-la"
  88. binReq, _ := json.MarshalIndent(req, "", " ")
  89. body := strings.NewReader(string(binReq))
  90. fibApp := kserv_http.GetKernelServHttp().Fiber()
  91. hReq, err := http.NewRequest("POST", "/bus/unsub", body)
  92. hReq.Header.Add("Content-Type", "application/json")
  93. if err != nil {
  94. sf.t.Fatalf("unsubBad3(): err=%v", err)
  95. }
  96. _resp, err := fibApp.Test(hReq)
  97. if err != nil {
  98. sf.t.Fatalf("unsubBad3(): after request, err=%v", err)
  99. }
  100. if _resp.StatusCode == 200 {
  101. sf.t.Fatalf("unsubBad3(): statusCode(%v)==200", _resp.StatusCode)
  102. }
  103. }
  104. func (sf *tester) unsubGood1() {
  105. sf.t.Log("unsubGood1")
  106. res := bus.Subscribe(sf.handSub)
  107. if res.IsErr() {
  108. sf.t.Fatalf("unsubGood1(): err=%v", res.Err())
  109. }
  110. count := 0
  111. for count < 100 {
  112. SleepMs()
  113. count++
  114. }
  115. req := &msg_unsub.UnsubReq{
  116. Name_: sf.handSub.Name_,
  117. Uuid_: "test_uuid",
  118. }
  119. resp := bus.processUnsubRequest(req)
  120. if resp.Status_ != "ok" {
  121. sf.t.Fatalf("unsubGood1(): status(%v)!='ok'", resp.Status_)
  122. }
  123. }
  124. // Все поля на месте, нет подписчика
  125. func (sf *tester) unsubBad2() {
  126. sf.t.Log("unsubBad2")
  127. bus.Unsubscribe(sf.handSub)
  128. count := 0
  129. for count < 100 {
  130. SleepMs()
  131. count++
  132. }
  133. req := &msg_unsub.UnsubReq{
  134. Name_: sf.handSub.Name_,
  135. Uuid_: "test_uuid",
  136. }
  137. resp := bus.processUnsubRequest(req)
  138. if resp.Status_ == "ok" {
  139. sf.t.Fatalf("unsubBad2(): status(%v)=='ok'", resp.Status_)
  140. }
  141. }
  142. // Нет полей для процесса отписки
  143. func (sf *tester) unsubBad1() {
  144. sf.t.Log("unsubBad1")
  145. defer func() {
  146. if _panic := recover(); _panic == nil {
  147. sf.t.Fatalf("unsubBad1(): panic==nil")
  148. }
  149. }()
  150. req := &msg_unsub.UnsubReq{}
  151. _ = bus.processUnsubRequest(req)
  152. }
  153. // Запрос на публикацию
  154. func (sf *tester) pub() {
  155. sf.t.Log("pub")
  156. sf.pubBad1()
  157. sf.pubGood1()
  158. sf.pubBad2()
  159. sf.pubBad3()
  160. sf.pubGood2()
  161. }
  162. func (sf *tester) pubGood2() {
  163. sf.t.Log("pubGood2")
  164. req := &msg_pub.PublishReq{
  165. Topic_: "topic_sub",
  166. Uuid_: "test_uuid",
  167. BinMsg_: []byte("http_pub"),
  168. }
  169. binReq, _ := json.MarshalIndent(req, "", " ")
  170. body := strings.NewReader(string(binReq))
  171. fibApp := kserv_http.GetKernelServHttp().Fiber()
  172. hReq, err := http.NewRequest("POST", "/bus/pub", body)
  173. hReq.Header.Add("Content-Type", "application/json")
  174. if err != nil {
  175. sf.t.Fatalf("pubGood2(): err=%v", err)
  176. }
  177. _resp, err := fibApp.Test(hReq)
  178. if err != nil {
  179. sf.t.Fatalf("pubGood2(): after request, err=%v", err)
  180. }
  181. if _resp.StatusCode != 200 {
  182. sf.t.Fatalf("pubGood2(): statusCode(%v)!=200", _resp.StatusCode)
  183. }
  184. defer _resp.Body.Close()
  185. binBody, _ := io.ReadAll(_resp.Body)
  186. resp := &msg_pub.PublishResp{}
  187. err = json.Unmarshal(binBody, resp)
  188. if err != nil {
  189. sf.t.Fatalf("pubGood2(): err=%v", err)
  190. }
  191. if string(resp.Status_) != "ok" {
  192. sf.t.Fatalf("pubGood2(): resp(%v)!='ok'", string(resp.Status_))
  193. }
  194. }
  195. // Кривой запрос
  196. func (sf *tester) pubBad3() {
  197. sf.t.Log("pubBad3")
  198. req := "tra-la-la"
  199. binReq, _ := json.MarshalIndent(req, "", " ")
  200. body := strings.NewReader(string(binReq))
  201. fibApp := kserv_http.GetKernelServHttp().Fiber()
  202. hReq, err := http.NewRequest("POST", "/bus/pub", body)
  203. hReq.Header.Add("Content-Type", "application/json")
  204. if err != nil {
  205. sf.t.Fatalf("pubBad3(): err=%v", err)
  206. }
  207. _resp, err := fibApp.Test(hReq)
  208. if err != nil {
  209. sf.t.Fatalf("pubBad3(): after request, err=%v", err)
  210. }
  211. if _resp.StatusCode == 200 {
  212. sf.t.Fatalf("pubBad3(): statusCode(%v)==200", _resp.StatusCode)
  213. }
  214. }
  215. // Что-то случилось внутри шины
  216. func (sf *tester) pubBad2() {
  217. sf.t.Log("pubBad2")
  218. bus.IsWork_.Reset()
  219. defer bus.IsWork_.Set()
  220. req := &msg_pub.PublishReq{
  221. Topic_: "topic_sub",
  222. Uuid_: "test_uuid",
  223. BinMsg_: []byte("test_pub"),
  224. }
  225. resp := bus.processPublish(req)
  226. if resp.Status_ == "ok" {
  227. sf.t.Fatalf("pubBad2(): status(%v)=='ok'", resp.Status_)
  228. }
  229. }
  230. // Все поля на месте
  231. func (sf *tester) pubGood1() {
  232. sf.t.Log("pubGood1")
  233. res := bus.Subscribe(sf.handSub)
  234. if res.IsErr() {
  235. sf.t.Fatalf("pubGood1(): err=%v", res.Err())
  236. }
  237. req := &msg_pub.PublishReq{
  238. Topic_: "topic_sub",
  239. Uuid_: "test_uuid",
  240. BinMsg_: []byte("test_pub"),
  241. }
  242. _ = bus.processPublish(req)
  243. for {
  244. SleepMs()
  245. msg := string(sf.handSub.Msg())
  246. if msg != "" {
  247. break
  248. }
  249. }
  250. msg := string(sf.handSub.Msg())
  251. if msg != "test_pub" {
  252. sf.t.Fatalf("pubGood1(): msg(%v)!='test_pub'", msg)
  253. }
  254. }
  255. // Нет полей для процесса публикации
  256. func (sf *tester) pubBad1() {
  257. sf.t.Log("pubBad1")
  258. defer func() {
  259. if _panic := recover(); _panic == nil {
  260. sf.t.Fatalf("pubBad1(): panic==nil")
  261. }
  262. }()
  263. req := &msg_pub.PublishReq{}
  264. _ = bus.processPublish(req)
  265. }
  266. // Запрос подписки на топик
  267. func (sf *tester) sub() {
  268. sf.t.Log("sub")
  269. sf.subBad1()
  270. sf.subBad2()
  271. sf.subGood1()
  272. sf.subBad3()
  273. sf.subGood2()
  274. }
  275. // Полный процесс подписки
  276. func (sf *tester) subGood2() {
  277. sf.t.Log("subGood2")
  278. req := &msg_sub.SubscribeReq{
  279. Topic_: "topic_serv",
  280. Uuid_: "test_uuid",
  281. WebHook_: "http://localhost:18200/bus/pub/",
  282. }
  283. binReq, _ := json.MarshalIndent(req, "", " ")
  284. body := strings.NewReader(string(binReq))
  285. fibApp := kserv_http.GetKernelServHttp().Fiber()
  286. hReq, err := http.NewRequest("POST", "/bus/sub", body)
  287. hReq.Header.Add("Content-Type", "application/json")
  288. if err != nil {
  289. sf.t.Fatalf("subBad1(): err=%v", err)
  290. }
  291. _resp, err := fibApp.Test(hReq)
  292. if err != nil {
  293. sf.t.Fatalf("subBad1(): after request, err=%v", err)
  294. }
  295. if _resp.StatusCode != 200 {
  296. sf.t.Fatalf("subBad1(): statusCode(%v)!=200", _resp.StatusCode)
  297. }
  298. defer _resp.Body.Close()
  299. binBody, _ := io.ReadAll(_resp.Body)
  300. resp := &msg_sub.SubscribeResp{}
  301. err = json.Unmarshal(binBody, resp)
  302. if err != nil {
  303. sf.t.Fatalf("subBad1(): err=%v", err)
  304. }
  305. if string(resp.Status_) != "ok" {
  306. sf.t.Fatalf("subBad1(): resp(%v)!='ok'", string(resp.Status_))
  307. }
  308. }
  309. // Отключена базовая шина
  310. func (sf *tester) subBad3() {
  311. sf.t.Log("subBad3")
  312. req := &msg_sub.SubscribeReq{
  313. Topic_: "topic_serv",
  314. Uuid_: "test_uuid",
  315. WebHook_: "http://localhost:18200/bus/pub/",
  316. }
  317. defer func() {
  318. if _panic := recover(); _panic != nil {
  319. sf.t.Fatalf("subBad3(): panic!=nil")
  320. }
  321. }()
  322. // _bus := kernel_bus_base.GetKernelBusBase()
  323. bus.IsWork_.Reset()
  324. defer bus.IsWork_.Set()
  325. resp := bus.processSubscribe(req)
  326. if resp.Status_ == "ok" {
  327. sf.t.Fatalf("subBad3(): resp==ok")
  328. }
  329. }
  330. // Проверка полей запроса в процессе подписки
  331. func (sf *tester) subGood1() {
  332. sf.t.Log("subGood1")
  333. req := &msg_sub.SubscribeReq{
  334. Topic_: "topic_serv",
  335. Uuid_: "test_uuid",
  336. WebHook_: "http://localhost:18200/bus/",
  337. }
  338. defer func() {
  339. if _panic := recover(); _panic != nil {
  340. sf.t.Fatalf("subGood1(): panic!=nil")
  341. }
  342. }()
  343. _ = bus.processSubscribe(req)
  344. }
  345. // Проверка кривых полей запроса в процессе подписки
  346. func (sf *tester) subBad2() {
  347. sf.t.Log("subBad2")
  348. req := &msg_sub.SubscribeReq{
  349. Topic_: "",
  350. }
  351. defer func() {
  352. if _panic := recover(); _panic == nil {
  353. sf.t.Fatalf("subBad2(): panic==nil")
  354. }
  355. }()
  356. _ = bus.processSubscribe(req)
  357. }
  358. // Кривой запрос
  359. func (sf *tester) subBad1() {
  360. sf.t.Log("subBad1")
  361. req := "tra-ta-ta"
  362. binReq, _ := json.MarshalIndent(req, "", " ")
  363. body := strings.NewReader(string(binReq))
  364. fibApp := kserv_http.GetKernelServHttp().Fiber()
  365. hReq, err := http.NewRequest("POST", "/bus/sub", body)
  366. hReq.Header.Add("Content-Type", "application/json")
  367. if err != nil {
  368. sf.t.Fatalf("subBad1(): err=%v", err)
  369. }
  370. _resp, err := fibApp.Test(hReq)
  371. if err != nil {
  372. sf.t.Fatalf("subBad1(): after request, err=%v", err)
  373. }
  374. if _resp.StatusCode != 400 {
  375. sf.t.Fatalf("subBad1(): statusCode(%v)!=400", _resp.StatusCode)
  376. }
  377. }
  378. // Входящий запрос
  379. func (sf *tester) req() {
  380. sf.t.Log("req")
  381. sf.reqBad1()
  382. sf.reqBad2()
  383. sf.reqBad3()
  384. sf.reqGood1()
  385. sf.reqBad4()
  386. }
  387. // Что-то с обработчиком
  388. func (sf *tester) reqBad4() {
  389. sf.t.Log("reqBad4")
  390. sf.handServ.IsBad_.Set()
  391. defer sf.handServ.IsBad_.Reset()
  392. req := &msg_serve.ServeReq{
  393. Topic_: sf.handServ.Topic_,
  394. Uuid_: "test_uuid",
  395. BinReq_: []byte("test_msg"),
  396. }
  397. binReq, _ := json.MarshalIndent(req, "", " ")
  398. body := strings.NewReader(string(binReq))
  399. fibApp := kserv_http.GetKernelServHttp().Fiber()
  400. hReq, err := http.NewRequest("POST", "/bus/request", body)
  401. hReq.Header.Add("Content-Type", "application/json")
  402. if err != nil {
  403. sf.t.Fatalf("reqBad4(): err=%v", err)
  404. }
  405. _resp, err := fibApp.Test(hReq)
  406. if err != nil {
  407. sf.t.Fatalf("reqBad4(): after request, err=%v", err)
  408. }
  409. if _resp.StatusCode != 200 {
  410. sf.t.Fatalf("reqBad4(): statusCode(%v)!=200", _resp.StatusCode)
  411. }
  412. defer _resp.Body.Close()
  413. binBody, _ := io.ReadAll(_resp.Body)
  414. resp := &msg_serve.ServeResp{}
  415. err = json.Unmarshal(binBody, resp)
  416. if err != nil {
  417. sf.t.Fatalf("reqBad4(): err=%v", err)
  418. }
  419. if string(resp.Status_) == "ok" {
  420. sf.t.Fatalf("reqBad4(): status(%v)=='ok'", string(resp.Status_))
  421. }
  422. }
  423. func (sf *tester) reqGood1() {
  424. sf.t.Log("reqGood1")
  425. req := &msg_serve.ServeReq{
  426. Topic_: sf.handServ.Topic_,
  427. Uuid_: "test_uuid",
  428. BinReq_: []byte("test_msg"),
  429. }
  430. binReq, _ := json.MarshalIndent(req, "", " ")
  431. body := strings.NewReader(string(binReq))
  432. fibApp := kserv_http.GetKernelServHttp().Fiber()
  433. hReq, err := http.NewRequest("POST", "/bus/request", body)
  434. hReq.Header.Add("Content-Type", "application/json")
  435. if err != nil {
  436. sf.t.Fatalf("reqGood1(): err=%v", err)
  437. }
  438. _resp, err := fibApp.Test(hReq)
  439. if err != nil {
  440. sf.t.Fatalf("reqGood1(): after request, err=%v", err)
  441. }
  442. if _resp.StatusCode != 200 {
  443. sf.t.Fatalf("reqGood1(): statusCode(%v)!=200", _resp.StatusCode)
  444. }
  445. defer _resp.Body.Close()
  446. binBody, _ := io.ReadAll(_resp.Body)
  447. resp := &msg_serve.ServeResp{}
  448. err = json.Unmarshal(binBody, resp)
  449. if err != nil {
  450. sf.t.Fatalf("reqGood1(): err=%v", err)
  451. }
  452. if string(resp.BinResp_) != "test_msg" {
  453. sf.t.Fatalf("reqGood1(): resp(%v)!='test_msg'", string(resp.BinResp_))
  454. }
  455. }
  456. // Нет такого топика для запросов
  457. func (sf *tester) reqBad3() {
  458. sf.t.Log("reqBad3")
  459. req := &msg_serve.ServeReq{
  460. Topic_: "bad_topic",
  461. Uuid_: "test_uuid",
  462. BinReq_: []byte("test_msg"),
  463. }
  464. binReq, _ := json.MarshalIndent(req, "", " ")
  465. body := strings.NewReader(string(binReq))
  466. fibApp := kserv_http.GetKernelServHttp().Fiber()
  467. hReq, err := http.NewRequest("POST", "/bus/request", body)
  468. if err != nil {
  469. sf.t.Fatalf("reqBad3(): err=%v", err)
  470. }
  471. resp, err := fibApp.Test(hReq)
  472. if err != nil {
  473. sf.t.Fatalf("reqBad3(): after request, err=%v", err)
  474. }
  475. if resp.StatusCode != 400 {
  476. sf.t.Fatalf("reqBad3(): statusCode(%v)!=400", resp.StatusCode)
  477. }
  478. }
  479. // Нет тела запроса
  480. func (sf *tester) reqBad2() {
  481. sf.t.Log("reqBad1")
  482. body := strings.NewReader("test_msg")
  483. fibApp := kserv_http.GetKernelServHttp().Fiber()
  484. hReq, err := http.NewRequest("POST", "/bus/request", body)
  485. if err != nil {
  486. sf.t.Fatalf("reqBad1(): err=%v", err)
  487. }
  488. resp, err := fibApp.Test(hReq)
  489. if err != nil {
  490. sf.t.Fatalf("reqBad1(): after request, err=%v", err)
  491. }
  492. if resp.StatusCode != 400 {
  493. sf.t.Fatalf("reqBad1(): statusCode(%v)!=400", resp.StatusCode)
  494. }
  495. }
  496. // Отсутствуют поля в запросе
  497. func (sf *tester) reqBad1() {
  498. sf.t.Log("reqBad1")
  499. defer func() {
  500. if _panic := recover(); _panic == nil {
  501. sf.t.Fatalf("reqBad1(): panic==nil")
  502. }
  503. }()
  504. bus.processSendRequest(nil)
  505. }
  506. // Получает локальную шину
  507. func (sf *tester) get() {
  508. sf.t.Log("get")
  509. _ = mock_env.MakeEnv()
  510. _ = os.Unsetenv("LOCAL_HTTP_URL")
  511. os.Setenv("LOCAL_HTTP_URL", "http://localhost:18312/")
  512. _ = GetKernelBusHttp()
  513. if bus == nil {
  514. sf.t.Fatalf("get(): bus==nil")
  515. }
  516. _ = GetKernelBusHttp()
  517. bus.RegisterServe(sf.handServ)
  518. }