client.go 77 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887
  1. // go:build !windows || !race
  2. package fasthttp
  3. import (
  4. "bufio"
  5. "crypto/tls"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. )
// Do performs the given http request and fills the given http response.
//
// It forwards the call to the Do method of the package-level default client.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
//   - from RequestURI if it contains full url with scheme and host;
//   - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended to obtain req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func Do(req *Request, resp *Response) error {
	return defaultClient.Do(req, resp)
}
// DoTimeout performs the given request and waits for response during
// the given timeout duration.
//
// It forwards the call to the DoTimeout method of the package-level
// default client.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
//   - from RequestURI if it contains full url with scheme and host;
//   - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned during
// the given timeout.
//
// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended to obtain req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
//
// Warning: DoTimeout does not terminate the request itself. The request will
// continue in the background and the response will be discarded.
// If requests take too long and the connection pool gets filled up please
// try using a Client and setting a ReadTimeout.
func DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
	return defaultClient.DoTimeout(req, resp, timeout)
}
// DoDeadline performs the given request and waits for response until
// the given deadline.
//
// It forwards the call to the DoDeadline method of the package-level
// default client.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
//   - from RequestURI if it contains full url with scheme and host;
//   - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned until
// the given deadline.
//
// ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended to obtain req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func DoDeadline(req *Request, resp *Response, deadline time.Time) error {
	return defaultClient.DoDeadline(req, resp, deadline)
}
  95. // DoRedirects performs the given http request and fills the given http response,
  96. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  97. // maxRedirectsCount, ErrTooManyRedirects is returned.
  98. //
  99. // Request must contain at least non-zero RequestURI with full url (including
  100. // scheme and host) or non-zero Host header + RequestURI.
  101. //
  102. // Client determines the server to be requested in the following order:
  103. //
  104. // - from RequestURI if it contains full url with scheme and host;
  105. // - from Host header otherwise.
  106. //
  107. // Response is ignored if resp is nil.
  108. //
  109. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  110. // to the requested host are busy.
  111. //
  112. // It is recommended obtaining req and resp via AcquireRequest
  113. // and AcquireResponse in performance-critical code.
  114. func DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  115. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, &defaultClient)
  116. return err
  117. }
// Get returns the status code and body of url.
//
// It forwards the call to the Get method of the package-level default client.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
	return defaultClient.Get(dst, url)
}
// GetTimeout returns the status code and body of url.
//
// It forwards the call to the GetTimeout method of the package-level
// default client.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
//
// ErrTimeout error is returned if url contents couldn't be fetched
// during the given timeout.
func GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
	return defaultClient.GetTimeout(dst, url, timeout)
}
// GetDeadline returns the status code and body of url.
//
// It forwards the call to the GetDeadline method of the package-level
// default client.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
//
// ErrTimeout error is returned if url contents couldn't be fetched
// until the given deadline.
func GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
	return defaultClient.GetDeadline(dst, url, deadline)
}
// Post sends POST request to the given url with the given POST arguments.
//
// It forwards the call to the Post method of the package-level default client.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
//
// Empty POST body is sent if postArgs is nil.
func Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
	return defaultClient.Post(dst, url, postArgs)
}
// defaultClient is the zero-value Client that backs the package-level
// request helpers (Do, DoTimeout, DoDeadline, DoRedirects, Get, GetTimeout,
// GetDeadline and Post).
var defaultClient Client
// Client implements http client.
//
// Copying Client by value is prohibited. Create new instance instead.
//
// It is safe to call Client methods from concurrently running goroutines.
//
// The fields of a Client should not be changed while it is in use.
type Client struct {
	noCopy noCopy //nolint:unused,structcheck

	// Client name. Used in User-Agent request header.
	//
	// Default client name is used if not set.
	Name string

	// NoDefaultUserAgentHeader when set to true, causes the default
	// User-Agent header to be excluded from the Request.
	NoDefaultUserAgentHeader bool

	// Callback for establishing new connections to hosts.
	//
	// Default Dial is used if not set.
	Dial DialFunc

	// Attempt to connect to both ipv4 and ipv6 addresses if set to true.
	//
	// This option is used only if default TCP dialer is used,
	// i.e. if Dial is blank.
	//
	// By default client connects only to ipv4 addresses,
	// since unfortunately ipv6 remains broken in many networks worldwide :)
	DialDualStack bool

	// TLS config for https connections.
	//
	// Default TLS config is used if not set.
	TLSConfig *tls.Config

	// Maximum number of connections per each host which may be established.
	//
	// DefaultMaxConnsPerHost is used if not set.
	MaxConnsPerHost int

	// Idle keep-alive connections are closed after this duration.
	//
	// By default idle connections are closed
	// after DefaultMaxIdleConnDuration.
	MaxIdleConnDuration time.Duration

	// Keep-alive connections are closed after this duration.
	//
	// By default connection duration is unlimited.
	MaxConnDuration time.Duration

	// Maximum number of attempts for idempotent calls.
	//
	// DefaultMaxIdemponentCallAttempts is used if not set.
	MaxIdemponentCallAttempts int

	// Per-connection buffer size for responses' reading.
	// This also limits the maximum header size.
	//
	// Default buffer size is used if 0.
	ReadBufferSize int

	// Per-connection buffer size for requests' writing.
	//
	// Default buffer size is used if 0.
	WriteBufferSize int

	// Maximum duration for full response reading (including body).
	//
	// By default response read timeout is unlimited.
	ReadTimeout time.Duration

	// Maximum duration for full request writing (including body).
	//
	// By default request write timeout is unlimited.
	WriteTimeout time.Duration

	// Maximum response body size.
	//
	// The client returns ErrBodyTooLarge if this limit is greater than 0
	// and response body is greater than the limit.
	//
	// By default response body size is unlimited.
	MaxResponseBodySize int

	// Header names are passed as-is without normalization
	// if this option is set.
	//
	// Disabled header names' normalization may be useful only for proxying
	// responses to other clients expecting case-sensitive
	// header names. See https://github.com/valyala/fasthttp/issues/57
	// for details.
	//
	// By default request and response header names are normalized, i.e.
	// the first letter and the first letters following dashes
	// are uppercased, while all the other letters are lowercased.
	// Examples:
	//
	//   * HOST -> Host
	//   * content-type -> Content-Type
	//   * cONTENT-lenGTH -> Content-Length
	DisableHeaderNamesNormalizing bool

	// Path values are sent as-is without normalization
	// if this option is set.
	//
	// Disabled path normalization may be useful for proxying incoming requests
	// to servers that are expecting paths to be forwarded as-is.
	//
	// By default path values are normalized, i.e.
	// extra slashes are removed, special characters are encoded.
	DisablePathNormalizing bool

	// Maximum duration for waiting for a free connection.
	//
	// By default the client does not wait and returns ErrNoFreeConns
	// immediately.
	MaxConnWaitTimeout time.Duration

	// RetryIf controls whether a retry should be attempted after an error.
	//
	// By default the isIdempotent function is used.
	RetryIf RetryIfFunc

	// Connection pool strategy. Can be either LIFO or FIFO (default).
	ConnPoolStrategy ConnPoolStrategyType

	// ConfigureClient, if set, is called by Do with every newly created
	// HostClient before that HostClient is stored and used. A non-nil error
	// is returned from Do as-is and the HostClient is discarded.
	ConfigureClient func(hc *HostClient) error

	// mLock guards m and ms.
	mLock sync.Mutex

	// m holds the per-host clients for plain http requests, keyed by the
	// host taken from the request URI. It is created lazily by Do.
	m map[string]*HostClient

	// ms is the https counterpart of m.
	ms map[string]*HostClient

	// readerPool and writerPool are handed to every HostClient created by
	// Do (as clientReaderPool / clientWriterPool), so all hosts of this
	// Client share the same pools.
	readerPool sync.Pool
	writerPool sync.Pool
}
// Get returns the status code and body of url.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
func (c *Client) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
	return clientGetURL(dst, url, c)
}
// GetTimeout returns the status code and body of url.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
//
// ErrTimeout error is returned if url contents couldn't be fetched
// during the given timeout.
func (c *Client) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
	return clientGetURLTimeout(dst, url, timeout, c)
}
// GetDeadline returns the status code and body of url.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
//
// ErrTimeout error is returned if url contents couldn't be fetched
// until the given deadline.
func (c *Client) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
	return clientGetURLDeadline(dst, url, deadline, c)
}
// Post sends POST request to the given url with the given POST arguments.
//
// The contents of dst will be replaced by the body and returned. If dst
// is too small, a new slice will be allocated.
//
// The function follows redirects. Use Do* for manually handling redirects.
//
// Empty POST body is sent if postArgs is nil.
func (c *Client) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
	return clientPostURL(dst, url, postArgs, c)
}
// DoTimeout performs the given request and waits for response during
// the given timeout duration.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
//   - from RequestURI if it contains full url with scheme and host;
//   - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned during
// the given timeout.
//
// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended to obtain req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
//
// Warning: DoTimeout does not terminate the request itself. The request will
// continue in the background and the response will be discarded.
// If requests take too long and the connection pool gets filled up please
// try setting a ReadTimeout.
//
// NOTE(review): this method only stores the timeout on req and then calls Do
// synchronously; whether anything continues in the background depends on
// HostClient.Do, which is not visible here. Confirm the warning above is
// still accurate.
func (c *Client) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
	// The timeout is carried on the request itself rather than passed as an
	// argument to Do.
	req.timeout = timeout
	return c.Do(req, resp)
}
// DoDeadline performs the given request and waits for response until
// the given deadline.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// Client determines the server to be requested in the following order:
//
//   - from RequestURI if it contains full url with scheme and host;
//   - from Host header otherwise.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned until
// the given deadline.
//
// ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
// to the requested host are busy.
//
// It is recommended to obtain req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
func (c *Client) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
	// The absolute deadline is turned into a timeout relative to now and
	// carried on the request itself.
	//
	// NOTE(review): a deadline that has already passed yields a non-positive
	// timeout; how that value is treated downstream is not visible here —
	// confirm it results in ErrTimeout rather than "no timeout".
	req.timeout = time.Until(deadline)
	return c.Do(req, resp)
}
  382. // DoRedirects performs the given http request and fills the given http response,
  383. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  384. // maxRedirectsCount, ErrTooManyRedirects is returned.
  385. //
  386. // Request must contain at least non-zero RequestURI with full url (including
  387. // scheme and host) or non-zero Host header + RequestURI.
  388. //
  389. // Client determines the server to be requested in the following order:
  390. //
  391. // - from RequestURI if it contains full url with scheme and host;
  392. // - from Host header otherwise.
  393. //
  394. // Response is ignored if resp is nil.
  395. //
  396. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  397. // to the requested host are busy.
  398. //
  399. // It is recommended obtaining req and resp via AcquireRequest
  400. // and AcquireResponse in performance-critical code.
  401. func (c *Client) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  402. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  403. return err
  404. }
  405. // Do performs the given http request and fills the given http response.
  406. //
  407. // Request must contain at least non-zero RequestURI with full url (including
  408. // scheme and host) or non-zero Host header + RequestURI.
  409. //
  410. // Client determines the server to be requested in the following order:
  411. //
  412. // - from RequestURI if it contains full url with scheme and host;
  413. // - from Host header otherwise.
  414. //
  415. // Response is ignored if resp is nil.
  416. //
  417. // The function doesn't follow redirects. Use Get* for following redirects.
  418. //
  419. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  420. // to the requested host are busy.
  421. //
  422. // It is recommended obtaining req and resp via AcquireRequest
  423. // and AcquireResponse in performance-critical code.
  424. func (c *Client) Do(req *Request, resp *Response) error {
  425. uri := req.URI()
  426. if uri == nil {
  427. return ErrorInvalidURI
  428. }
  429. host := uri.Host()
  430. isTLS := false
  431. if uri.isHttps() {
  432. isTLS = true
  433. } else if !uri.isHttp() {
  434. return fmt.Errorf("unsupported protocol %q. http and https are supported", uri.Scheme())
  435. }
  436. startCleaner := false
  437. c.mLock.Lock()
  438. m := c.m
  439. if isTLS {
  440. m = c.ms
  441. }
  442. if m == nil {
  443. m = make(map[string]*HostClient)
  444. if isTLS {
  445. c.ms = m
  446. } else {
  447. c.m = m
  448. }
  449. }
  450. hc := m[string(host)]
  451. if hc == nil {
  452. hc = &HostClient{
  453. Addr: addMissingPort(string(host), isTLS),
  454. Name: c.Name,
  455. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  456. Dial: c.Dial,
  457. DialDualStack: c.DialDualStack,
  458. IsTLS: isTLS,
  459. TLSConfig: c.TLSConfig,
  460. MaxConns: c.MaxConnsPerHost,
  461. MaxIdleConnDuration: c.MaxIdleConnDuration,
  462. MaxConnDuration: c.MaxConnDuration,
  463. MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts,
  464. ReadBufferSize: c.ReadBufferSize,
  465. WriteBufferSize: c.WriteBufferSize,
  466. ReadTimeout: c.ReadTimeout,
  467. WriteTimeout: c.WriteTimeout,
  468. MaxResponseBodySize: c.MaxResponseBodySize,
  469. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  470. DisablePathNormalizing: c.DisablePathNormalizing,
  471. MaxConnWaitTimeout: c.MaxConnWaitTimeout,
  472. RetryIf: c.RetryIf,
  473. ConnPoolStrategy: c.ConnPoolStrategy,
  474. clientReaderPool: &c.readerPool,
  475. clientWriterPool: &c.writerPool,
  476. }
  477. if c.ConfigureClient != nil {
  478. if err := c.ConfigureClient(hc); err != nil {
  479. return err
  480. }
  481. }
  482. m[string(host)] = hc
  483. if len(m) == 1 {
  484. startCleaner = true
  485. }
  486. }
  487. atomic.AddInt32(&hc.pendingClientRequests, 1)
  488. defer atomic.AddInt32(&hc.pendingClientRequests, -1)
  489. c.mLock.Unlock()
  490. if startCleaner {
  491. go c.mCleaner(m)
  492. }
  493. return hc.Do(req, resp)
  494. }
  495. // CloseIdleConnections closes any connections which were previously
  496. // connected from previous requests but are now sitting idle in a
  497. // "keep-alive" state. It does not interrupt any connections currently
  498. // in use.
  499. func (c *Client) CloseIdleConnections() {
  500. c.mLock.Lock()
  501. for _, v := range c.m {
  502. v.CloseIdleConnections()
  503. }
  504. for _, v := range c.ms {
  505. v.CloseIdleConnections()
  506. }
  507. c.mLock.Unlock()
  508. }
  509. func (c *Client) mCleaner(m map[string]*HostClient) {
  510. mustStop := false
  511. sleep := c.MaxIdleConnDuration
  512. if sleep < time.Second {
  513. sleep = time.Second
  514. } else if sleep > 10*time.Second {
  515. sleep = 10 * time.Second
  516. }
  517. for {
  518. c.mLock.Lock()
  519. for k, v := range m {
  520. v.connsLock.Lock()
  521. if v.connsCount == 0 && atomic.LoadInt32(&v.pendingClientRequests) == 0 {
  522. delete(m, k)
  523. }
  524. v.connsLock.Unlock()
  525. }
  526. if len(m) == 0 {
  527. mustStop = true
  528. }
  529. c.mLock.Unlock()
  530. if mustStop {
  531. break
  532. }
  533. time.Sleep(sleep)
  534. }
  535. }
  536. // DefaultMaxConnsPerHost is the maximum number of concurrent connections
  537. // http client may establish per host by default (i.e. if
  538. // Client.MaxConnsPerHost isn't set).
  539. const DefaultMaxConnsPerHost = 512
  540. // DefaultMaxIdleConnDuration is the default duration before idle keep-alive
  541. // connection is closed.
  542. const DefaultMaxIdleConnDuration = 10 * time.Second
  543. // DefaultMaxIdemponentCallAttempts is the default idempotent calls attempts count.
  544. const DefaultMaxIdemponentCallAttempts = 5
// DialFunc must establish a connection to addr.
//
// There is no need to establish a TLS (SSL) connection for https.
// The client automatically converts the connection to TLS
// if HostClient.IsTLS is set.
//
// The TCP address passed to DialFunc always contains host and port.
// Example TCP addr values:
//
//   - foobar.com:80
//   - foobar.com:443
//   - foobar.com:8080
type DialFunc func(addr string) (net.Conn, error)
// RetryIfFunc is the signature of the retry-decision callback.
//
// The failed request is passed to RetryIfFunc when a request error occurs;
// returning true marks the request as eligible for another attempt.
type RetryIfFunc func(request *Request) bool
// TransportFunc wraps every request/response.
//
// It receives the outgoing request and the response to fill and returns
// the error of the exchange, if any.
type TransportFunc func(*Request, *Response) error
// ConnPoolStrategyType defines the strategy used to enqueue/dequeue
// connections of the connection pool.
type ConnPoolStrategyType int

const (
	// FIFO takes the connection at the front of the idle list.
	// It is the zero value and therefore the default strategy.
	FIFO ConnPoolStrategyType = iota
	// LIFO takes the connection at the end of the idle list.
	LIFO
)
// HostClient balances http requests among hosts listed in Addr.
//
// HostClient may be used for balancing load among multiple upstream hosts.
// While multiple addresses passed to HostClient.Addr may be used for balancing
// load among them, it would be better to use LBClient instead, since HostClient
// may unevenly balance load among upstream hosts.
//
// It is forbidden to copy HostClient instances. Create new instances instead.
//
// It is safe to call HostClient methods from concurrently running goroutines.
type HostClient struct {
	noCopy noCopy //nolint:unused,structcheck

	// Comma-separated list of upstream HTTP server host addresses,
	// which are passed to Dial in a round-robin manner.
	//
	// Each address may contain port if default dialer is used.
	// For example,
	//
	//    - foobar.com:80
	//    - foobar.com:443
	//    - foobar.com:8080
	Addr string

	// Client name. Used in User-Agent request header.
	Name string

	// NoDefaultUserAgentHeader when set to true, causes the default
	// User-Agent header to be excluded from the Request.
	NoDefaultUserAgentHeader bool

	// Callback for establishing new connection to the host.
	//
	// Default Dial is used if not set.
	Dial DialFunc

	// Attempt to connect to both ipv4 and ipv6 host addresses
	// if set to true.
	//
	// This option is used only if default TCP dialer is used,
	// i.e. if Dial is blank.
	//
	// By default client connects only to ipv4 addresses,
	// since unfortunately ipv6 remains broken in many networks worldwide :)
	DialDualStack bool

	// Whether to use TLS (aka SSL or HTTPS) for host connections.
	IsTLS bool

	// Optional TLS config.
	TLSConfig *tls.Config

	// Maximum number of connections which may be established to all hosts
	// listed in Addr.
	//
	// You can change this value while the HostClient is being used
	// using HostClient.SetMaxConns(value)
	//
	// DefaultMaxConnsPerHost is used if not set.
	MaxConns int

	// Keep-alive connections are closed after this duration.
	//
	// By default connection duration is unlimited.
	MaxConnDuration time.Duration

	// Idle keep-alive connections are closed after this duration.
	//
	// By default idle connections are closed
	// after DefaultMaxIdleConnDuration.
	MaxIdleConnDuration time.Duration

	// Maximum number of attempts for idempotent calls.
	//
	// DefaultMaxIdemponentCallAttempts is used if not set.
	MaxIdemponentCallAttempts int

	// Per-connection buffer size for responses' reading.
	// This also limits the maximum header size.
	//
	// Default buffer size is used if 0.
	ReadBufferSize int

	// Per-connection buffer size for requests' writing.
	//
	// Default buffer size is used if 0.
	WriteBufferSize int

	// Maximum duration for full response reading (including body).
	//
	// By default response read timeout is unlimited.
	ReadTimeout time.Duration

	// Maximum duration for full request writing (including body).
	//
	// By default request write timeout is unlimited.
	WriteTimeout time.Duration

	// Maximum response body size.
	//
	// The client returns ErrBodyTooLarge if this limit is greater than 0
	// and response body is greater than the limit.
	//
	// By default response body size is unlimited.
	MaxResponseBodySize int

	// Header names are passed as-is without normalization
	// if this option is set.
	//
	// Disabled header names' normalization may be useful only for proxying
	// responses to other clients expecting case-sensitive
	// header names. See https://github.com/valyala/fasthttp/issues/57
	// for details.
	//
	// By default request and response header names are normalized, i.e.
	// The first letter and the first letters following dashes
	// are uppercased, while all the other letters are lowercased.
	// Examples:
	//
	//     * HOST -> Host
	//     * content-type -> Content-Type
	//     * cONTENT-lenGTH -> Content-Length
	DisableHeaderNamesNormalizing bool

	// Path values are sent as-is without normalization.
	//
	// Disabled path normalization may be useful for proxying incoming requests
	// to servers that are expecting paths to be forwarded as-is.
	//
	// By default path values are normalized, i.e.
	// extra slashes are removed, special characters are encoded.
	DisablePathNormalizing bool

	// Will not log potentially sensitive content in error logs.
	//
	// This option is useful for servers that handle sensitive data
	// in the request/response.
	//
	// Client logs full errors by default.
	SecureErrorLogMessage bool

	// Maximum duration for waiting for a free connection.
	//
	// By default the client does not wait and returns ErrNoFreeConns
	// immediately.
	MaxConnWaitTimeout time.Duration

	// RetryIf controls whether a retry should be attempted after an error.
	//
	// By default the isIdempotent function is used.
	RetryIf RetryIfFunc

	// Transport defines a transport-like mechanism that wraps every request/response.
	Transport TransportFunc

	// Connection pool strategy. Can be either LIFO or FIFO (default).
	ConnPoolStrategy ConnPoolStrategyType

	// presumably caches the value returned by getClientName — TODO confirm.
	clientName atomic.Value
	// lastUseTime is the number of seconds between startTimeUnix and the
	// most recent request; it is read and written atomically.
	lastUseTime uint32

	// connsLock guards connsCount, conns and connsCleanerRun, as well as
	// MaxConns updates made through SetMaxConns.
	connsLock sync.Mutex
	// connsCount is the number of connections counted against MaxConns.
	connsCount int
	// conns holds the idle connections available for reuse.
	conns []*clientConn
	// presumably the queue of callers waiting for a free connection
	// (see queueForIdle) — TODO confirm.
	connsWait *wantConnQueue

	addrsLock sync.Mutex
	addrs     []string
	addrIdx   uint32

	tlsConfigMap     map[string]*tls.Config
	tlsConfigMapLock sync.Mutex

	readerPool sync.Pool
	writerPool sync.Pool

	// clientReaderPool and clientWriterPool are set by Client so that the
	// HostClient instances it creates point at the Client's own pools.
	clientReaderPool *sync.Pool
	clientWriterPool *sync.Pool

	// pendingRequests is the number of Do calls currently in flight;
	// it is updated atomically.
	pendingRequests int32

	// pendingClientRequests counts the number of requests that a Client is currently running using this HostClient.
	// It will be incremented earlier than pendingRequests and will be used by Client to see if the HostClient is still in use.
	pendingClientRequests int32

	// connsCleanerRun is set (under connsLock) once the connsCleaner
	// goroutine has been started.
	connsCleanerRun bool
}
// clientConn is a pooled upstream connection together with the timestamps
// the pool keeps for it.
type clientConn struct {
	// c is the underlying network connection.
	c net.Conn

	// createdTime is compared against HostClient.MaxConnDuration to decide
	// when the connection must be closed after the current request.
	createdTime time.Time
	// presumably the time the connection was last returned to the pool,
	// used for idle expiry — TODO confirm.
	lastUseTime time.Time
}
// startTimeUnix is the Unix time (in seconds) captured at package
// initialization. HostClient.lastUseTime is stored as an offset from it.
var startTimeUnix = time.Now().Unix()
  730. // LastUseTime returns time the client was last used
  731. func (c *HostClient) LastUseTime() time.Time {
  732. n := atomic.LoadUint32(&c.lastUseTime)
  733. return time.Unix(startTimeUnix+int64(n), 0)
  734. }
  735. // Get returns the status code and body of url.
  736. //
  737. // The contents of dst will be replaced by the body and returned, if the dst
  738. // is too small a new slice will be allocated.
  739. //
  740. // The function follows redirects. Use Do* for manually handling redirects.
  741. func (c *HostClient) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  742. return clientGetURL(dst, url, c)
  743. }
  744. // GetTimeout returns the status code and body of url.
  745. //
  746. // The contents of dst will be replaced by the body and returned, if the dst
  747. // is too small a new slice will be allocated.
  748. //
  749. // The function follows redirects. Use Do* for manually handling redirects.
  750. //
  751. // ErrTimeout error is returned if url contents couldn't be fetched
  752. // during the given timeout.
  753. func (c *HostClient) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  754. return clientGetURLTimeout(dst, url, timeout, c)
  755. }
  756. // GetDeadline returns the status code and body of url.
  757. //
  758. // The contents of dst will be replaced by the body and returned, if the dst
  759. // is too small a new slice will be allocated.
  760. //
  761. // The function follows redirects. Use Do* for manually handling redirects.
  762. //
  763. // ErrTimeout error is returned if url contents couldn't be fetched
  764. // until the given deadline.
  765. func (c *HostClient) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  766. return clientGetURLDeadline(dst, url, deadline, c)
  767. }
  768. // Post sends POST request to the given url with the given POST arguments.
  769. //
  770. // The contents of dst will be replaced by the body and returned, if the dst
  771. // is too small a new slice will be allocated.
  772. //
  773. // The function follows redirects. Use Do* for manually handling redirects.
  774. //
  775. // Empty POST body is sent if postArgs is nil.
  776. func (c *HostClient) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  777. return clientPostURL(dst, url, postArgs, c)
  778. }
// clientDoer is the minimal client interface required by the URL helper
// functions in this file: something able to perform a single request.
type clientDoer interface {
	// Do performs req and fills resp with the response.
	Do(req *Request, resp *Response) error
}
  782. func clientGetURL(dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  783. req := AcquireRequest()
  784. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  785. ReleaseRequest(req)
  786. return statusCode, body, err
  787. }
  788. func clientGetURLTimeout(dst []byte, url string, timeout time.Duration, c clientDoer) (statusCode int, body []byte, err error) {
  789. deadline := time.Now().Add(timeout)
  790. return clientGetURLDeadline(dst, url, deadline, c)
  791. }
// clientURLResponse carries the result of a background URL fetch from the
// worker goroutine in clientGetURLDeadline back to its caller.
type clientURLResponse struct {
	statusCode int
	body       []byte
	err        error
}
// clientGetURLDeadline fetches url through c, following redirects, and stops
// waiting for the result once deadline has passed.
//
// If the deadline has already expired, (0, dst, ErrTimeout) is returned
// without issuing a request. Otherwise the request runs in a separate
// goroutine and the caller waits for either its result or the timer,
// whichever comes first. On timeout, ErrTimeout is returned with body set
// to dst.
//
// NOTE(review): on timeout the background goroutine keeps running with dst
// as its body buffer while dst is also handed back to the caller — confirm
// callers do not reuse dst concurrently.
func clientGetURLDeadline(dst []byte, url string, deadline time.Time, c clientDoer) (statusCode int, body []byte, err error) {
	// Remaining time until the deadline (negative once it has passed).
	timeout := -time.Since(deadline)
	if timeout <= 0 {
		return 0, dst, ErrTimeout
	}

	// Reuse a result channel from the pool when one is available.
	// The channel has capacity 1, so the worker's send never blocks.
	var ch chan clientURLResponse
	chv := clientURLResponseChPool.Get()
	if chv == nil {
		chv = make(chan clientURLResponse, 1)
	}
	ch = chv.(chan clientURLResponse)

	// Note that the request continues execution on ErrTimeout until
	// client-specific ReadTimeout exceeds. This helps limiting load
	// on slow hosts by MaxConns* concurrent requests.
	//
	// Without this 'hack' the load on slow host could exceed MaxConns*
	// concurrent requests, since timed out requests on client side
	// usually continue execution on the host.

	// mu guards timedout and responded, which let the worker and the caller
	// agree on who "won": either the worker delivered a result or the caller
	// gave up first.
	var mu sync.Mutex
	var timedout, responded bool

	go func() {
		req := AcquireRequest()

		statusCodeCopy, bodyCopy, errCopy := doRequestFollowRedirectsBuffer(req, dst, url, c)
		mu.Lock()
		{
			// Only publish the result if the caller is still waiting;
			// after a timeout the channel has been returned to the pool
			// and must not be written to.
			if !timedout {
				ch <- clientURLResponse{
					statusCode: statusCodeCopy,
					body:       bodyCopy,
					err:        errCopy,
				}
				responded = true
			}
		}
		mu.Unlock()

		ReleaseRequest(req)
	}()

	tc := AcquireTimer(timeout)
	select {
	case resp := <-ch:
		statusCode = resp.statusCode
		body = resp.body
		err = resp.err
	case <-tc.C:
		mu.Lock()
		{
			if responded {
				// The worker finished at about the same moment the timer
				// fired: take its result and leave the channel empty
				// before it goes back to the pool.
				resp := <-ch
				statusCode = resp.statusCode
				body = resp.body
				err = resp.err
			} else {
				timedout = true
				err = ErrTimeout
				body = dst
			}
		}
		mu.Unlock()
	}
	ReleaseTimer(tc)

	clientURLResponseChPool.Put(chv)

	return statusCode, body, err
}
// clientURLResponseChPool recycles the `chan clientURLResponse` values
// (capacity 1) used by clientGetURLDeadline.
var clientURLResponseChPool sync.Pool
  861. func clientPostURL(dst []byte, url string, postArgs *Args, c clientDoer) (statusCode int, body []byte, err error) {
  862. req := AcquireRequest()
  863. req.Header.SetMethod(MethodPost)
  864. req.Header.SetContentTypeBytes(strPostArgsContentType)
  865. if postArgs != nil {
  866. if _, err := postArgs.WriteTo(req.BodyWriter()); err != nil {
  867. return 0, nil, err
  868. }
  869. }
  870. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  871. ReleaseRequest(req)
  872. return statusCode, body, err
  873. }
var (
	// ErrMissingLocation is returned by clients when the Location header is missing on
	// an HTTP response with a redirect status code.
	ErrMissingLocation = errors.New("missing Location header for http redirect")
	// ErrTooManyRedirects is returned by clients when the number of redirects followed
	// exceeds the max count.
	ErrTooManyRedirects = errors.New("too many redirects detected when doing the request")

	// ErrHostClientRedirectToDifferentScheme is returned by HostClient when
	// the request scheme does not match HostClient.IsTLS. HostClients are
	// only able to follow redirects to the same protocol.
	ErrHostClientRedirectToDifferentScheme = errors.New("HostClient can't follow redirects to a different protocol, please use Client instead")
)
// defaultMaxRedirectsCount is the redirect limit used by the buffer-based
// URL helpers (see doRequestFollowRedirectsBuffer).
const defaultMaxRedirectsCount = 16
  885. func doRequestFollowRedirectsBuffer(req *Request, dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  886. resp := AcquireResponse()
  887. bodyBuf := resp.bodyBuffer()
  888. resp.keepBodyBuffer = true
  889. oldBody := bodyBuf.B
  890. bodyBuf.B = dst
  891. statusCode, _, err = doRequestFollowRedirects(req, resp, url, defaultMaxRedirectsCount, c)
  892. body = bodyBuf.B
  893. bodyBuf.B = oldBody
  894. resp.keepBodyBuffer = false
  895. ReleaseResponse(resp)
  896. return statusCode, body, err
  897. }
  898. func doRequestFollowRedirects(req *Request, resp *Response, url string, maxRedirectsCount int, c clientDoer) (statusCode int, body []byte, err error) {
  899. redirectsCount := 0
  900. for {
  901. req.SetRequestURI(url)
  902. if err := req.parseURI(); err != nil {
  903. return 0, nil, err
  904. }
  905. if err = c.Do(req, resp); err != nil {
  906. break
  907. }
  908. statusCode = resp.Header.StatusCode()
  909. if !StatusCodeIsRedirect(statusCode) {
  910. break
  911. }
  912. redirectsCount++
  913. if redirectsCount > maxRedirectsCount {
  914. err = ErrTooManyRedirects
  915. break
  916. }
  917. location := resp.Header.peek(strLocation)
  918. if len(location) == 0 {
  919. err = ErrMissingLocation
  920. break
  921. }
  922. url = getRedirectURL(url, location)
  923. }
  924. return statusCode, body, err
  925. }
  926. func getRedirectURL(baseURL string, location []byte) string {
  927. u := AcquireURI()
  928. u.Update(baseURL)
  929. u.UpdateBytes(location)
  930. redirectURL := u.String()
  931. ReleaseURI(u)
  932. return redirectURL
  933. }
  934. // StatusCodeIsRedirect returns true if the status code indicates a redirect.
  935. func StatusCodeIsRedirect(statusCode int) bool {
  936. return statusCode == StatusMovedPermanently ||
  937. statusCode == StatusFound ||
  938. statusCode == StatusSeeOther ||
  939. statusCode == StatusTemporaryRedirect ||
  940. statusCode == StatusPermanentRedirect
  941. }
var (
	// requestPool backs AcquireRequest / ReleaseRequest.
	requestPool sync.Pool
	// responsePool backs AcquireResponse / ReleaseResponse.
	responsePool sync.Pool
)
  946. // AcquireRequest returns an empty Request instance from request pool.
  947. //
  948. // The returned Request instance may be passed to ReleaseRequest when it is
  949. // no longer needed. This allows Request recycling, reduces GC pressure
  950. // and usually improves performance.
  951. func AcquireRequest() *Request {
  952. v := requestPool.Get()
  953. if v == nil {
  954. return &Request{}
  955. }
  956. return v.(*Request)
  957. }
// ReleaseRequest returns req acquired via AcquireRequest to the request pool.
//
// The request is reset before it is put back into the pool.
//
// It is forbidden to access req and/or its members after returning
// it to the request pool.
func ReleaseRequest(req *Request) {
	req.Reset()
	requestPool.Put(req)
}
  966. // AcquireResponse returns an empty Response instance from response pool.
  967. //
  968. // The returned Response instance may be passed to ReleaseResponse when it is
  969. // no longer needed. This allows Response recycling, reduces GC pressure
  970. // and usually improves performance.
  971. func AcquireResponse() *Response {
  972. v := responsePool.Get()
  973. if v == nil {
  974. return &Response{}
  975. }
  976. return v.(*Response)
  977. }
// ReleaseResponse returns resp acquired via AcquireResponse to the response pool.
//
// The response is reset before it is put back into the pool.
//
// It is forbidden to access resp and/or its members after returning
// it to the response pool.
func ReleaseResponse(resp *Response) {
	resp.Reset()
	responsePool.Put(resp)
}
  986. // DoTimeout performs the given request and waits for response during
  987. // the given timeout duration.
  988. //
  989. // Request must contain at least non-zero RequestURI with full url (including
  990. // scheme and host) or non-zero Host header + RequestURI.
  991. //
  992. // The function doesn't follow redirects. Use Get* for following redirects.
  993. //
  994. // Response is ignored if resp is nil.
  995. //
  996. // ErrTimeout is returned if the response wasn't returned during
  997. // the given timeout.
  998. //
  999. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1000. // to the host are busy.
  1001. //
  1002. // It is recommended obtaining req and resp via AcquireRequest
  1003. // and AcquireResponse in performance-critical code.
  1004. //
  1005. // Warning: DoTimeout does not terminate the request itself. The request will
  1006. // continue in the background and the response will be discarded.
  1007. // If requests take too long and the connection pool gets filled up please
  1008. // try setting a ReadTimeout.
  1009. func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  1010. req.timeout = timeout
  1011. return c.Do(req, resp)
  1012. }
  1013. // DoDeadline performs the given request and waits for response until
  1014. // the given deadline.
  1015. //
  1016. // Request must contain at least non-zero RequestURI with full url (including
  1017. // scheme and host) or non-zero Host header + RequestURI.
  1018. //
  1019. // The function doesn't follow redirects. Use Get* for following redirects.
  1020. //
  1021. // Response is ignored if resp is nil.
  1022. //
  1023. // ErrTimeout is returned if the response wasn't returned until
  1024. // the given deadline.
  1025. //
  1026. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1027. // to the host are busy.
  1028. //
  1029. // It is recommended obtaining req and resp via AcquireRequest
  1030. // and AcquireResponse in performance-critical code.
  1031. func (c *HostClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  1032. req.timeout = time.Until(deadline)
  1033. return c.Do(req, resp)
  1034. }
  1035. // DoRedirects performs the given http request and fills the given http response,
  1036. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  1037. // maxRedirectsCount, ErrTooManyRedirects is returned.
  1038. //
  1039. // Request must contain at least non-zero RequestURI with full url (including
  1040. // scheme and host) or non-zero Host header + RequestURI.
  1041. //
  1042. // Client determines the server to be requested in the following order:
  1043. //
  1044. // - from RequestURI if it contains full url with scheme and host;
  1045. // - from Host header otherwise.
  1046. //
  1047. // Response is ignored if resp is nil.
  1048. //
  1049. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  1050. // to the requested host are busy.
  1051. //
  1052. // It is recommended obtaining req and resp via AcquireRequest
  1053. // and AcquireResponse in performance-critical code.
  1054. func (c *HostClient) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  1055. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  1056. return err
  1057. }
  1058. // Do performs the given http request and sets the corresponding response.
  1059. //
  1060. // Request must contain at least non-zero RequestURI with full url (including
  1061. // scheme and host) or non-zero Host header + RequestURI.
  1062. //
  1063. // The function doesn't follow redirects. Use Get* for following redirects.
  1064. //
  1065. // Response is ignored if resp is nil.
  1066. //
  1067. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1068. // to the host are busy.
  1069. //
  1070. // It is recommended obtaining req and resp via AcquireRequest
  1071. // and AcquireResponse in performance-critical code.
  1072. func (c *HostClient) Do(req *Request, resp *Response) error {
  1073. var err error
  1074. var retry bool
  1075. maxAttempts := c.MaxIdemponentCallAttempts
  1076. if maxAttempts <= 0 {
  1077. maxAttempts = DefaultMaxIdemponentCallAttempts
  1078. }
  1079. isRequestRetryable := isIdempotent
  1080. if c.RetryIf != nil {
  1081. isRequestRetryable = c.RetryIf
  1082. }
  1083. attempts := 0
  1084. hasBodyStream := req.IsBodyStream()
  1085. atomic.AddInt32(&c.pendingRequests, 1)
  1086. for {
  1087. retry, err = c.do(req, resp)
  1088. if err == nil || !retry {
  1089. break
  1090. }
  1091. if hasBodyStream {
  1092. break
  1093. }
  1094. if !isRequestRetryable(req) {
  1095. // Retry non-idempotent requests if the server closes
  1096. // the connection before sending the response.
  1097. //
  1098. // This case is possible if the server closes the idle
  1099. // keep-alive connection on timeout.
  1100. //
  1101. // Apache and nginx usually do this.
  1102. if err != io.EOF {
  1103. break
  1104. }
  1105. }
  1106. attempts++
  1107. if attempts >= maxAttempts {
  1108. break
  1109. }
  1110. }
  1111. atomic.AddInt32(&c.pendingRequests, -1)
  1112. if err == io.EOF {
  1113. err = ErrConnectionClosed
  1114. }
  1115. return err
  1116. }
  1117. // PendingRequests returns the current number of requests the client
  1118. // is executing.
  1119. //
  1120. // This function may be used for balancing load among multiple HostClient
  1121. // instances.
  1122. func (c *HostClient) PendingRequests() int {
  1123. return int(atomic.LoadInt32(&c.pendingRequests))
  1124. }
  1125. func isIdempotent(req *Request) bool {
  1126. return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut()
  1127. }
  1128. func (c *HostClient) do(req *Request, resp *Response) (bool, error) {
  1129. nilResp := false
  1130. if resp == nil {
  1131. nilResp = true
  1132. resp = AcquireResponse()
  1133. }
  1134. ok, err := c.doNonNilReqResp(req, resp)
  1135. if nilResp {
  1136. ReleaseResponse(resp)
  1137. }
  1138. return ok, err
  1139. }
// doNonNilReqResp performs one request/response exchange over a pooled or
// freshly dialed connection.
//
// It returns (retry, err): err is the outcome of the attempt and retry tells
// the caller whether repeating the attempt may make sense. Both req and resp
// must be non-nil; the function panics otherwise.
func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
	if req == nil {
		panic("BUG: req cannot be nil")
	}
	if resp == nil {
		panic("BUG: resp cannot be nil")
	}

	// Secure header error logs configuration
	resp.secureErrorLogMessage = c.SecureErrorLogMessage
	resp.Header.secureErrorLogMessage = c.SecureErrorLogMessage
	req.secureErrorLogMessage = c.SecureErrorLogMessage
	req.Header.secureErrorLogMessage = c.SecureErrorLogMessage

	// A HostClient speaks either plain HTTP or TLS, never both.
	if c.IsTLS != req.URI().isHttps() {
		return false, ErrHostClientRedirectToDifferentScheme
	}

	// Stored as seconds elapsed since startTimeUnix; see LastUseTime.
	atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix))

	// Free up resources occupied by response before sending the request,
	// so the GC may reclaim these resources (e.g. response body).
	// backing up SkipBody in case it was set explicitly
	customSkipBody := resp.SkipBody
	resp.Reset()
	resp.SkipBody = customSkipBody

	req.URI().DisablePathNormalizing = c.DisablePathNormalizing

	// Fill in the client name as User-Agent when the request has none.
	userAgentOld := req.Header.UserAgent()
	if len(userAgentOld) == 0 {
		req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...)
	}

	// A custom Transport takes over the whole exchange.
	// NOTE(review): the retry flag is true on success and false on failure
	// here; HostClient.Do only consults it when err != nil, so Transport
	// errors are never retried — confirm this is intended.
	if c.Transport != nil {
		err := c.Transport(req, resp)
		return err == nil, err
	}

	// The per-request timeout, when set, becomes an absolute deadline shared
	// by the write and the read phase of this attempt.
	var deadline time.Time
	if req.timeout > 0 {
		deadline = time.Now().Add(req.timeout)
	}

	cc, err := c.acquireConn(req.timeout, req.ConnectionClose())
	if err != nil {
		return false, err
	}
	conn := cc.c

	resp.parseNetConn(conn)

	// The effective write deadline is the earlier of the request deadline
	// and now+WriteTimeout.
	writeDeadline := deadline
	if c.WriteTimeout > 0 {
		tmpWriteDeadline := time.Now().Add(c.WriteTimeout)
		if writeDeadline.IsZero() || tmpWriteDeadline.Before(writeDeadline) {
			writeDeadline = tmpWriteDeadline
		}
	}

	if !writeDeadline.IsZero() {
		// Set Deadline every time, since golang has fixed the performance issue
		// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
		if err = conn.SetWriteDeadline(writeDeadline); err != nil {
			c.closeConn(cc)
			return true, err
		}
	}

	// When the connection has outlived MaxConnDuration, ask the server to
	// close it after this request. The header is only set temporarily and is
	// removed again right after the request has been written.
	resetConnection := false
	if c.MaxConnDuration > 0 && time.Since(cc.createdTime) > c.MaxConnDuration && !req.ConnectionClose() {
		req.SetConnectionClose()
		resetConnection = true
	}

	bw := c.acquireWriter(conn)
	err = req.Write(bw)

	if resetConnection {
		req.Header.ResetConnectionClose()
	}

	if err == nil {
		err = bw.Flush()
	}
	c.releaseWriter(bw)

	// Return ErrTimeout on any timeout.
	if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
		err = ErrTimeout
	}

	// A connection reset during the write does not abort the attempt: the
	// response is still read below, and the connection is closed afterwards.
	isConnRST := isConnectionReset(err)
	if err != nil && !isConnRST {
		c.closeConn(cc)
		return true, err
	}

	// The effective read deadline is the earlier of the request deadline
	// and now+ReadTimeout.
	readDeadline := deadline
	if c.ReadTimeout > 0 {
		tmpReadDeadline := time.Now().Add(c.ReadTimeout)
		if readDeadline.IsZero() || tmpReadDeadline.Before(readDeadline) {
			readDeadline = tmpReadDeadline
		}
	}

	if !readDeadline.IsZero() {
		// Set Deadline every time, since golang has fixed the performance issue
		// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
		if err = conn.SetReadDeadline(readDeadline); err != nil {
			c.closeConn(cc)
			return true, err
		}
	}

	// Skip the body when the caller asked for it or for HEAD requests.
	if customSkipBody || req.Header.IsHead() {
		resp.SkipBody = true
	}
	if c.DisableHeaderNamesNormalizing {
		resp.Header.DisableNormalizing()
	}

	br := c.acquireReader(conn)
	err = resp.ReadLimitBody(br, c.MaxResponseBodySize)
	c.releaseReader(br)
	if err != nil {
		c.closeConn(cc)
		// Don't retry in case of ErrBodyTooLarge since we will just get the same again.
		retry := err != ErrBodyTooLarge
		return retry, err
	}

	// Keep the connection for reuse only when neither side asked to close
	// it and the write did not hit a connection reset.
	if resetConnection || req.ConnectionClose() || resp.ConnectionClose() || isConnRST {
		c.closeConn(cc)
	} else {
		c.releaseConn(cc)
	}

	return false, nil
}
var (
	// ErrNoFreeConns is returned when no free connections are available
	// to the given host.
	//
	// Increase the allowed number of connections per host if you
	// see this error.
	ErrNoFreeConns = errors.New("no free connections available to host")

	// ErrConnectionClosed may be returned from client methods if the server
	// closes connection before returning the first response byte.
	//
	// If you see this error, then either fix the server by returning
	// 'Connection: close' response header before closing the connection
	// or add 'Connection: close' request header before sending requests
	// to broken server.
	ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. " +
		"Make sure the server returns 'Connection: close' response header before closing the connection")

	// ErrConnPoolStrategyNotImpl is returned when HostClient.ConnPoolStrategy
	// holds a strategy that is not implemented.
	// If you see this error, then you need to check your HostClient configuration.
	ErrConnPoolStrategyNotImpl = errors.New("connection pool strategy is not implement")
)
  1276. type timeoutError struct{}
  1277. func (e *timeoutError) Error() string {
  1278. return "timeout"
  1279. }
  1280. // Only implement the Timeout() function of the net.Error interface.
  1281. // This allows for checks like:
  1282. //
  1283. // if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  1284. func (e *timeoutError) Timeout() bool {
  1285. return true
  1286. }
  1287. // ErrTimeout is returned from timed out calls.
  1288. var ErrTimeout = &timeoutError{}
  1289. // SetMaxConns sets up the maximum number of connections which may be established to all hosts listed in Addr.
  1290. func (c *HostClient) SetMaxConns(newMaxConns int) {
  1291. c.connsLock.Lock()
  1292. c.MaxConns = newMaxConns
  1293. c.connsLock.Unlock()
  1294. }
  1295. func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) (cc *clientConn, err error) {
  1296. createConn := false
  1297. startCleaner := false
  1298. var n int
  1299. c.connsLock.Lock()
  1300. n = len(c.conns)
  1301. if n == 0 {
  1302. maxConns := c.MaxConns
  1303. if maxConns <= 0 {
  1304. maxConns = DefaultMaxConnsPerHost
  1305. }
  1306. if c.connsCount < maxConns {
  1307. c.connsCount++
  1308. createConn = true
  1309. if !c.connsCleanerRun && !connectionClose {
  1310. startCleaner = true
  1311. c.connsCleanerRun = true
  1312. }
  1313. }
  1314. } else {
  1315. switch c.ConnPoolStrategy {
  1316. case LIFO:
  1317. n--
  1318. cc = c.conns[n]
  1319. c.conns[n] = nil
  1320. c.conns = c.conns[:n]
  1321. case FIFO:
  1322. cc = c.conns[0]
  1323. copy(c.conns, c.conns[1:])
  1324. c.conns[n-1] = nil
  1325. c.conns = c.conns[:n-1]
  1326. default:
  1327. return nil, ErrConnPoolStrategyNotImpl
  1328. }
  1329. }
  1330. c.connsLock.Unlock()
  1331. if cc != nil {
  1332. return cc, nil
  1333. }
  1334. if !createConn {
  1335. if c.MaxConnWaitTimeout <= 0 {
  1336. return nil, ErrNoFreeConns
  1337. }
  1338. // reqTimeout c.MaxConnWaitTimeout wait duration
  1339. // d1 d2 min(d1, d2)
  1340. // 0(not set) d2 d2
  1341. // d1 0(don't wait) 0(don't wait)
  1342. // 0(not set) d2 d2
  1343. timeout := c.MaxConnWaitTimeout
  1344. timeoutOverridden := false
  1345. // reqTimeout == 0 means not set
  1346. if reqTimeout > 0 && reqTimeout < timeout {
  1347. timeout = reqTimeout
  1348. timeoutOverridden = true
  1349. }
  1350. // wait for a free connection
  1351. tc := AcquireTimer(timeout)
  1352. defer ReleaseTimer(tc)
  1353. w := &wantConn{
  1354. ready: make(chan struct{}, 1),
  1355. }
  1356. defer func() {
  1357. if err != nil {
  1358. w.cancel(c, err)
  1359. }
  1360. }()
  1361. c.queueForIdle(w)
  1362. select {
  1363. case <-w.ready:
  1364. return w.conn, w.err
  1365. case <-tc.C:
  1366. if timeoutOverridden {
  1367. return nil, ErrTimeout
  1368. }
  1369. return nil, ErrNoFreeConns
  1370. }
  1371. }
  1372. if startCleaner {
  1373. go c.connsCleaner()
  1374. }
  1375. conn, err := c.dialHostHard()
  1376. if err != nil {
  1377. c.decConnsCount()
  1378. return nil, err
  1379. }
  1380. cc = acquireClientConn(conn)
  1381. return cc, nil
  1382. }
  1383. func (c *HostClient) queueForIdle(w *wantConn) {
  1384. c.connsLock.Lock()
  1385. defer c.connsLock.Unlock()
  1386. if c.connsWait == nil {
  1387. c.connsWait = &wantConnQueue{}
  1388. }
  1389. c.connsWait.clearFront()
  1390. c.connsWait.pushBack(w)
  1391. }
  1392. func (c *HostClient) dialConnFor(w *wantConn) {
  1393. conn, err := c.dialHostHard()
  1394. if err != nil {
  1395. w.tryDeliver(nil, err)
  1396. c.decConnsCount()
  1397. return
  1398. }
  1399. cc := acquireClientConn(conn)
  1400. delivered := w.tryDeliver(cc, nil)
  1401. if !delivered {
  1402. // not delivered, return idle connection
  1403. c.releaseConn(cc)
  1404. }
  1405. }
  1406. // CloseIdleConnections closes any connections which were previously
  1407. // connected from previous requests but are now sitting idle in a
  1408. // "keep-alive" state. It does not interrupt any connections currently
  1409. // in use.
  1410. func (c *HostClient) CloseIdleConnections() {
  1411. c.connsLock.Lock()
  1412. scratch := append([]*clientConn{}, c.conns...)
  1413. for i := range c.conns {
  1414. c.conns[i] = nil
  1415. }
  1416. c.conns = c.conns[:0]
  1417. c.connsLock.Unlock()
  1418. for _, cc := range scratch {
  1419. c.closeConn(cc)
  1420. }
  1421. }
  1422. func (c *HostClient) connsCleaner() {
  1423. var (
  1424. scratch []*clientConn
  1425. maxIdleConnDuration = c.MaxIdleConnDuration
  1426. )
  1427. if maxIdleConnDuration <= 0 {
  1428. maxIdleConnDuration = DefaultMaxIdleConnDuration
  1429. }
  1430. for {
  1431. currentTime := time.Now()
  1432. // Determine idle connections to be closed.
  1433. c.connsLock.Lock()
  1434. conns := c.conns
  1435. n := len(conns)
  1436. i := 0
  1437. for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
  1438. i++
  1439. }
  1440. sleepFor := maxIdleConnDuration
  1441. if i < n {
  1442. // + 1 so we actually sleep past the expiration time and not up to it.
  1443. // Otherwise the > check above would still fail.
  1444. sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1
  1445. }
  1446. scratch = append(scratch[:0], conns[:i]...)
  1447. if i > 0 {
  1448. m := copy(conns, conns[i:])
  1449. for i = m; i < n; i++ {
  1450. conns[i] = nil
  1451. }
  1452. c.conns = conns[:m]
  1453. }
  1454. c.connsLock.Unlock()
  1455. // Close idle connections.
  1456. for i, cc := range scratch {
  1457. c.closeConn(cc)
  1458. scratch[i] = nil
  1459. }
  1460. // Determine whether to stop the connsCleaner.
  1461. c.connsLock.Lock()
  1462. mustStop := c.connsCount == 0
  1463. if mustStop {
  1464. c.connsCleanerRun = false
  1465. }
  1466. c.connsLock.Unlock()
  1467. if mustStop {
  1468. break
  1469. }
  1470. time.Sleep(sleepFor)
  1471. }
  1472. }
  1473. func (c *HostClient) closeConn(cc *clientConn) {
  1474. c.decConnsCount()
  1475. cc.c.Close()
  1476. releaseClientConn(cc)
  1477. }
  1478. func (c *HostClient) decConnsCount() {
  1479. if c.MaxConnWaitTimeout <= 0 {
  1480. c.connsLock.Lock()
  1481. c.connsCount--
  1482. c.connsLock.Unlock()
  1483. return
  1484. }
  1485. c.connsLock.Lock()
  1486. defer c.connsLock.Unlock()
  1487. dialed := false
  1488. if q := c.connsWait; q != nil && q.len() > 0 {
  1489. for q.len() > 0 {
  1490. w := q.popFront()
  1491. if w.waiting() {
  1492. go c.dialConnFor(w)
  1493. dialed = true
  1494. break
  1495. }
  1496. }
  1497. }
  1498. if !dialed {
  1499. c.connsCount--
  1500. }
  1501. }
  1502. // ConnsCount returns connection count of HostClient
  1503. func (c *HostClient) ConnsCount() int {
  1504. c.connsLock.Lock()
  1505. defer c.connsLock.Unlock()
  1506. return c.connsCount
  1507. }
  1508. func acquireClientConn(conn net.Conn) *clientConn {
  1509. v := clientConnPool.Get()
  1510. if v == nil {
  1511. v = &clientConn{}
  1512. }
  1513. cc := v.(*clientConn)
  1514. cc.c = conn
  1515. cc.createdTime = time.Now()
  1516. return cc
  1517. }
  1518. func releaseClientConn(cc *clientConn) {
  1519. // Reset all fields.
  1520. *cc = clientConn{}
  1521. clientConnPool.Put(cc)
  1522. }
  1523. var clientConnPool sync.Pool
  1524. func (c *HostClient) releaseConn(cc *clientConn) {
  1525. cc.lastUseTime = time.Now()
  1526. if c.MaxConnWaitTimeout <= 0 {
  1527. c.connsLock.Lock()
  1528. c.conns = append(c.conns, cc)
  1529. c.connsLock.Unlock()
  1530. return
  1531. }
  1532. // try to deliver an idle connection to a *wantConn
  1533. c.connsLock.Lock()
  1534. defer c.connsLock.Unlock()
  1535. delivered := false
  1536. if q := c.connsWait; q != nil && q.len() > 0 {
  1537. for q.len() > 0 {
  1538. w := q.popFront()
  1539. if w.waiting() {
  1540. delivered = w.tryDeliver(cc, nil)
  1541. break
  1542. }
  1543. }
  1544. }
  1545. if !delivered {
  1546. c.conns = append(c.conns, cc)
  1547. }
  1548. }
  1549. func (c *HostClient) acquireWriter(conn net.Conn) *bufio.Writer {
  1550. var v interface{}
  1551. if c.clientWriterPool != nil {
  1552. v = c.clientWriterPool.Get()
  1553. if v == nil {
  1554. n := c.WriteBufferSize
  1555. if n <= 0 {
  1556. n = defaultWriteBufferSize
  1557. }
  1558. return bufio.NewWriterSize(conn, n)
  1559. }
  1560. } else {
  1561. v = c.writerPool.Get()
  1562. if v == nil {
  1563. n := c.WriteBufferSize
  1564. if n <= 0 {
  1565. n = defaultWriteBufferSize
  1566. }
  1567. return bufio.NewWriterSize(conn, n)
  1568. }
  1569. }
  1570. bw := v.(*bufio.Writer)
  1571. bw.Reset(conn)
  1572. return bw
  1573. }
  1574. func (c *HostClient) releaseWriter(bw *bufio.Writer) {
  1575. if c.clientWriterPool != nil {
  1576. c.clientWriterPool.Put(bw)
  1577. } else {
  1578. c.writerPool.Put(bw)
  1579. }
  1580. }
  1581. func (c *HostClient) acquireReader(conn net.Conn) *bufio.Reader {
  1582. var v interface{}
  1583. if c.clientReaderPool != nil {
  1584. v = c.clientReaderPool.Get()
  1585. if v == nil {
  1586. n := c.ReadBufferSize
  1587. if n <= 0 {
  1588. n = defaultReadBufferSize
  1589. }
  1590. return bufio.NewReaderSize(conn, n)
  1591. }
  1592. } else {
  1593. v = c.readerPool.Get()
  1594. if v == nil {
  1595. n := c.ReadBufferSize
  1596. if n <= 0 {
  1597. n = defaultReadBufferSize
  1598. }
  1599. return bufio.NewReaderSize(conn, n)
  1600. }
  1601. }
  1602. br := v.(*bufio.Reader)
  1603. br.Reset(conn)
  1604. return br
  1605. }
  1606. func (c *HostClient) releaseReader(br *bufio.Reader) {
  1607. if c.clientReaderPool != nil {
  1608. c.clientReaderPool.Put(br)
  1609. } else {
  1610. c.readerPool.Put(br)
  1611. }
  1612. }
  1613. func newClientTLSConfig(c *tls.Config, addr string) *tls.Config {
  1614. if c == nil {
  1615. c = &tls.Config{}
  1616. } else {
  1617. c = c.Clone()
  1618. }
  1619. if len(c.ServerName) == 0 {
  1620. serverName := tlsServerName(addr)
  1621. if serverName == "*" {
  1622. c.InsecureSkipVerify = true
  1623. } else {
  1624. c.ServerName = serverName
  1625. }
  1626. }
  1627. return c
  1628. }
  // tlsServerName extracts the host part of addr for use as TLS server name.
  //
  // An addr without any colon is returned unchanged. Otherwise addr is split
  // with net.SplitHostPort and the host is returned; "*" is returned when addr
  // cannot be split.
  func tlsServerName(addr string) string {
  	if strings.IndexByte(addr, ':') < 0 {
  		return addr
  	}
  	if host, _, err := net.SplitHostPort(addr); err == nil {
  		return host
  	}
  	return "*"
  }
  1639. func (c *HostClient) nextAddr() string {
  1640. c.addrsLock.Lock()
  1641. if c.addrs == nil {
  1642. c.addrs = strings.Split(c.Addr, ",")
  1643. }
  1644. addr := c.addrs[0]
  1645. if len(c.addrs) > 1 {
  1646. addr = c.addrs[c.addrIdx%uint32(len(c.addrs))]
  1647. c.addrIdx++
  1648. }
  1649. c.addrsLock.Unlock()
  1650. return addr
  1651. }
  1652. func (c *HostClient) dialHostHard() (conn net.Conn, err error) {
  1653. // attempt to dial all the available hosts before giving up.
  1654. c.addrsLock.Lock()
  1655. n := len(c.addrs)
  1656. c.addrsLock.Unlock()
  1657. if n == 0 {
  1658. // It looks like c.addrs isn't initialized yet.
  1659. n = 1
  1660. }
  1661. timeout := c.ReadTimeout + c.WriteTimeout
  1662. if timeout <= 0 {
  1663. timeout = DefaultDialTimeout
  1664. }
  1665. deadline := time.Now().Add(timeout)
  1666. for n > 0 {
  1667. addr := c.nextAddr()
  1668. tlsConfig := c.cachedTLSConfig(addr)
  1669. conn, err = dialAddr(addr, c.Dial, c.DialDualStack, c.IsTLS, tlsConfig, c.WriteTimeout)
  1670. if err == nil {
  1671. return conn, nil
  1672. }
  1673. if time.Since(deadline) >= 0 {
  1674. break
  1675. }
  1676. n--
  1677. }
  1678. return nil, err
  1679. }
  1680. func (c *HostClient) cachedTLSConfig(addr string) *tls.Config {
  1681. if !c.IsTLS {
  1682. return nil
  1683. }
  1684. c.tlsConfigMapLock.Lock()
  1685. if c.tlsConfigMap == nil {
  1686. c.tlsConfigMap = make(map[string]*tls.Config)
  1687. }
  1688. cfg := c.tlsConfigMap[addr]
  1689. if cfg == nil {
  1690. cfg = newClientTLSConfig(c.TLSConfig, addr)
  1691. c.tlsConfigMap[addr] = cfg
  1692. }
  1693. c.tlsConfigMapLock.Unlock()
  1694. return cfg
  1695. }
  // ErrTLSHandshakeTimeout indicates there is a timeout from tls handshake.
  var ErrTLSHandshakeTimeout = errors.New("tls handshake timed out")

  // tlsClientHandshake wraps rawConn into a TLS client connection and performs
  // the handshake, which must complete before deadline.
  //
  // The deadline is cleared again after a successful handshake. On any error
  // rawConn is closed; a handshake error that reports a timeout is returned as
  // ErrTLSHandshakeTimeout.
  func tlsClientHandshake(rawConn net.Conn, tlsConfig *tls.Config, deadline time.Time) (_ net.Conn, retErr error) {
  	defer func() {
  		if retErr != nil {
  			rawConn.Close()
  		}
  	}()

  	tlsConn := tls.Client(rawConn, tlsConfig)
  	if err := tlsConn.SetDeadline(deadline); err != nil {
  		return nil, err
  	}

  	if err := tlsConn.Handshake(); err != nil {
  		if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  			return nil, ErrTLSHandshakeTimeout
  		}
  		return nil, err
  	}

  	// Remove the handshake deadline so it does not affect later I/O.
  	if err := tlsConn.SetDeadline(time.Time{}); err != nil {
  		return nil, err
  	}
  	return tlsConn, nil
  }
  1722. func dialAddr(addr string, dial DialFunc, dialDualStack, isTLS bool, tlsConfig *tls.Config, timeout time.Duration) (net.Conn, error) {
  1723. deadline := time.Now().Add(timeout)
  1724. if dial == nil {
  1725. if dialDualStack {
  1726. dial = DialDualStack
  1727. } else {
  1728. dial = Dial
  1729. }
  1730. addr = addMissingPort(addr, isTLS)
  1731. }
  1732. conn, err := dial(addr)
  1733. if err != nil {
  1734. return nil, err
  1735. }
  1736. if conn == nil {
  1737. panic("BUG: DialFunc returned (nil, nil)")
  1738. }
  1739. // We assume that any conn that has the Handshake() method is a TLS conn already.
  1740. // This doesn't cover just tls.Conn but also other TLS implementations.
  1741. _, isTLSAlready := conn.(interface{ Handshake() error })
  1742. if isTLS && !isTLSAlready {
  1743. if timeout == 0 {
  1744. return tls.Client(conn, tlsConfig), nil
  1745. }
  1746. return tlsClientHandshake(conn, tlsConfig, deadline)
  1747. }
  1748. return conn, nil
  1749. }
  1750. func (c *HostClient) getClientName() []byte {
  1751. v := c.clientName.Load()
  1752. var clientName []byte
  1753. if v == nil {
  1754. clientName = []byte(c.Name)
  1755. if len(clientName) == 0 && !c.NoDefaultUserAgentHeader {
  1756. clientName = defaultUserAgent
  1757. }
  1758. c.clientName.Store(clientName)
  1759. } else {
  1760. clientName = v.([]byte)
  1761. }
  1762. return clientName
  1763. }
  // addMissingPort appends the default port to addr when addr contains no colon:
  // 443 if isTLS is set, 80 otherwise. An addr that already contains a colon is
  // returned unchanged.
  func addMissingPort(addr string, isTLS bool) string {
  	if strings.Contains(addr, ":") {
  		return addr
  	}

  	var defaultPort int
  	switch {
  	case isTLS:
  		defaultPort = 443
  	default:
  		defaultPort = 80
  	}
  	return net.JoinHostPort(addr, strconv.Itoa(defaultPort))
  }
  // A wantConn records state about a wanted connection
  // (that is, an active call to getConn).
  // The conn may be gotten by dialing or by finding an idle connection,
  // or a cancellation may make the conn no longer wanted.
  // These three options are racing against each other and use
  // wantConn to coordinate and agree about the winning outcome.
  //
  // inspired by net/http/transport.go
  type wantConn struct {
  	// ready is closed by tryDeliver or cancel once an outcome is decided;
  	// waiting() reports false from then on.
  	ready chan struct{}
  	mu    sync.Mutex // protects conn, err, close(ready)
  	// conn is the delivered connection, if any.
  	conn *clientConn
  	// err is the delivered error, or the error passed to cancel.
  	err error
  }
  1789. // waiting reports whether w is still waiting for an answer (connection or error).
  1790. func (w *wantConn) waiting() bool {
  1791. select {
  1792. case <-w.ready:
  1793. return false
  1794. default:
  1795. return true
  1796. }
  1797. }
  1798. // tryDeliver attempts to deliver conn, err to w and reports whether it succeeded.
  1799. func (w *wantConn) tryDeliver(conn *clientConn, err error) bool {
  1800. w.mu.Lock()
  1801. defer w.mu.Unlock()
  1802. if w.conn != nil || w.err != nil {
  1803. return false
  1804. }
  1805. w.conn = conn
  1806. w.err = err
  1807. if w.conn == nil && w.err == nil {
  1808. panic("fasthttp: internal error: misuse of tryDeliver")
  1809. }
  1810. close(w.ready)
  1811. return true
  1812. }
  1813. // cancel marks w as no longer wanting a result (for example, due to cancellation).
  1814. // If a connection has been delivered already, cancel returns it with c.releaseConn.
  1815. func (w *wantConn) cancel(c *HostClient, err error) {
  1816. w.mu.Lock()
  1817. if w.conn == nil && w.err == nil {
  1818. close(w.ready) // catch misbehavior in future delivery
  1819. }
  1820. conn := w.conn
  1821. w.conn = nil
  1822. w.err = err
  1823. w.mu.Unlock()
  1824. if conn != nil {
  1825. c.releaseConn(conn)
  1826. }
  1827. }
  // A wantConnQueue is a queue of wantConns.
  //
  // inspired by net/http/transport.go
  type wantConnQueue struct {
  	// This is a queue, not a deque.
  	// It is split into two stages - head[headPos:] and tail.
  	// popFront is trivial (headPos++) on the first stage, and
  	// pushBack is trivial (append) on the second stage.
  	// If the first stage is empty, popFront can swap the
  	// first and second stages to remedy the situation.
  	//
  	// This two-stage split is analogous to the use of two lists
  	// in Okasaki's purely functional queue but without the
  	// overhead of reversing the list when swapping stages.

  	// head is the first stage; entries before headPos were already popped.
  	head []*wantConn
  	// headPos is the index of the current front element within head.
  	headPos int
  	// tail is the second stage, to which pushBack appends.
  	tail []*wantConn
  }
  1846. // len returns the number of items in the queue.
  1847. func (q *wantConnQueue) len() int {
  1848. return len(q.head) - q.headPos + len(q.tail)
  1849. }
  1850. // pushBack adds w to the back of the queue.
  1851. func (q *wantConnQueue) pushBack(w *wantConn) {
  1852. q.tail = append(q.tail, w)
  1853. }
  1854. // popFront removes and returns the wantConn at the front of the queue.
  1855. func (q *wantConnQueue) popFront() *wantConn {
  1856. if q.headPos >= len(q.head) {
  1857. if len(q.tail) == 0 {
  1858. return nil
  1859. }
  1860. // Pick up tail as new head, clear tail.
  1861. q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
  1862. }
  1863. w := q.head[q.headPos]
  1864. q.head[q.headPos] = nil
  1865. q.headPos++
  1866. return w
  1867. }
  1868. // peekFront returns the wantConn at the front of the queue without removing it.
  1869. func (q *wantConnQueue) peekFront() *wantConn {
  1870. if q.headPos < len(q.head) {
  1871. return q.head[q.headPos]
  1872. }
  1873. if len(q.tail) > 0 {
  1874. return q.tail[0]
  1875. }
  1876. return nil
  1877. }
  1878. // cleanFront pops any wantConns that are no longer waiting from the head of the
  1879. // queue, reporting whether any were popped.
  1880. func (q *wantConnQueue) clearFront() (cleaned bool) {
  1881. for {
  1882. w := q.peekFront()
  1883. if w == nil || w.waiting() {
  1884. return cleaned
  1885. }
  1886. q.popFront()
  1887. cleaned = true
  1888. }
  1889. }
  // PipelineClient pipelines requests over a limited set of concurrent
  // connections to the given Addr.
  //
  // This client may be used in highly loaded HTTP-based RPC systems for reducing
  // context switches and network level overhead.
  // See https://en.wikipedia.org/wiki/HTTP_pipelining for details.
  //
  // Copying PipelineClient instances is forbidden. Create new instances
  // instead.
  //
  // It is safe to call PipelineClient methods from concurrently running
  // goroutines.
  type PipelineClient struct {
  	noCopy noCopy //nolint:unused,structcheck

  	// Address of the host to connect to.
  	Addr string

  	// PipelineClient name. Used in User-Agent request header.
  	Name string

  	// NoDefaultUserAgentHeader when set to true, causes the default
  	// User-Agent header to be excluded from the Request.
  	NoDefaultUserAgentHeader bool

  	// The maximum number of concurrent connections to the Addr.
  	//
  	// A single connection is used by default.
  	MaxConns int

  	// The maximum number of pending pipelined requests over
  	// a single connection to Addr.
  	//
  	// DefaultMaxPendingRequests is used by default.
  	MaxPendingRequests int

  	// The maximum delay before sending pipelined requests as a batch
  	// to the server.
  	//
  	// By default requests are sent immediately to the server.
  	MaxBatchDelay time.Duration

  	// Callback for connection establishing to the host.
  	//
  	// Default Dial is used if not set.
  	Dial DialFunc

  	// Attempt to connect to both ipv4 and ipv6 host addresses
  	// if set to true.
  	//
  	// This option is used only if default TCP dialer is used,
  	// i.e. if Dial is blank.
  	//
  	// By default client connects only to ipv4 addresses,
  	// since unfortunately ipv6 remains broken in many networks worldwide :)
  	DialDualStack bool

  	// Response header names are passed as-is without normalization
  	// if this option is set.
  	//
  	// Disabled header names' normalization may be useful only for proxying
  	// responses to other clients expecting case-sensitive
  	// header names. See https://github.com/valyala/fasthttp/issues/57
  	// for details.
  	//
  	// By default request and response header names are normalized, i.e.
  	// The first letter and the first letters following dashes
  	// are uppercased, while all the other letters are lowercased.
  	// Examples:
  	//
  	//	* HOST -> Host
  	//	* content-type -> Content-Type
  	//	* cONTENT-lenGTH -> Content-Length
  	DisableHeaderNamesNormalizing bool

  	// Path values are sent as-is without normalization
  	//
  	// Disabled path normalization may be useful for proxying incoming requests
  	// to servers that are expecting paths to be forwarded as-is.
  	//
  	// By default path values are normalized, i.e.
  	// extra slashes are removed, special characters are encoded.
  	DisablePathNormalizing bool

  	// Whether to use TLS (aka SSL or HTTPS) for host connections.
  	IsTLS bool

  	// Optional TLS config.
  	TLSConfig *tls.Config

  	// Idle connection to the host is closed after this duration.
  	//
  	// By default idle connection is closed after
  	// DefaultMaxIdleConnDuration.
  	MaxIdleConnDuration time.Duration

  	// Buffer size for responses' reading.
  	// This also limits the maximum header size.
  	//
  	// Default buffer size is used if 0.
  	ReadBufferSize int

  	// Buffer size for requests' writing.
  	//
  	// Default buffer size is used if 0.
  	WriteBufferSize int

  	// Maximum duration for full response reading (including body).
  	//
  	// By default response read timeout is unlimited.
  	ReadTimeout time.Duration

  	// Maximum duration for full request writing (including body).
  	//
  	// By default request write timeout is unlimited.
  	WriteTimeout time.Duration

  	// Logger for logging client errors.
  	//
  	// By default standard logger from log package is used.
  	Logger Logger

  	// connClients holds the per-connection clients created by newConnClient.
  	connClients []*pipelineConnClient
  	// connClientsLock is held by getConnClient while a connection client is
  	// selected or created.
  	connClientsLock sync.Mutex
  }
  // pipelineConnClient serves pipelined requests for a PipelineClient.
  //
  // The exported fields are copied from the owning PipelineClient by
  // newConnClient; see PipelineClient for their meaning.
  type pipelineConnClient struct {
  	noCopy noCopy //nolint:unused,structcheck

  	Addr                          string
  	Name                          string
  	NoDefaultUserAgentHeader      bool
  	MaxPendingRequests            int
  	MaxBatchDelay                 time.Duration
  	Dial                          DialFunc
  	DialDualStack                 bool
  	DisableHeaderNamesNormalizing bool
  	DisablePathNormalizing        bool
  	IsTLS                         bool
  	TLSConfig                     *tls.Config
  	MaxIdleConnDuration           time.Duration
  	ReadBufferSize                int
  	WriteBufferSize               int
  	ReadTimeout                   time.Duration
  	WriteTimeout                  time.Duration
  	Logger                        Logger

  	// workPool is passed to acquirePipelineWork/releasePipelineWork.
  	workPool sync.Pool

  	// chLock guards creation and reset of chW and chR in init.
  	chLock sync.Mutex
  	// chW is the outgoing queue that Do and DoDeadline push work onto.
  	chW chan *pipelineWork
  	// chR holds work whose callers worker() notifies with
  	// errPipelineConnStopped when the connection stops.
  	chR chan *pipelineWork

  	// tlsConfig is the config cached by cachedTLSConfig under tlsConfigLock.
  	tlsConfigLock sync.Mutex
  	tlsConfig     *tls.Config

  	// clientName presumably caches the User-Agent value returned by
  	// getClientName — that method is not visible here; verify.
  	clientName atomic.Value
  }
  // pipelineWork is a single queued request/response exchange of a
  // pipelineConnClient.
  type pipelineWork struct {
  	// reqCopy and respCopy are private copies used by DoDeadline, so the
  	// caller's request and response are not touched after a timeout.
  	reqCopy  Request
  	respCopy Response
  	// req and resp point either at the caller's objects (Do) or at the
  	// copies above (DoDeadline).
  	req  *Request
  	resp *Response
  	// t is the timer whose channel DoDeadline selects on for timeouts.
  	t *time.Timer
  	// deadline is presumably set by acquirePipelineWork from the timeout —
  	// not visible here; verify.
  	deadline time.Time
  	// err is the outcome reported to the waiting caller.
  	err error
  	// done receives a value once the work item is finished or aborted.
  	done chan struct{}
  }
  2033. // DoTimeout performs the given request and waits for response during
  2034. // the given timeout duration.
  2035. //
  2036. // Request must contain at least non-zero RequestURI with full url (including
  2037. // scheme and host) or non-zero Host header + RequestURI.
  2038. //
  2039. // The function doesn't follow redirects.
  2040. //
  2041. // Response is ignored if resp is nil.
  2042. //
  2043. // ErrTimeout is returned if the response wasn't returned during
  2044. // the given timeout.
  2045. //
  2046. // It is recommended obtaining req and resp via AcquireRequest
  2047. // and AcquireResponse in performance-critical code.
  2048. //
  2049. // Warning: DoTimeout does not terminate the request itself. The request will
  2050. // continue in the background and the response will be discarded.
  2051. // If requests take too long and the connection pool gets filled up please
  2052. // try setting a ReadTimeout.
  2053. func (c *PipelineClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  2054. return c.DoDeadline(req, resp, time.Now().Add(timeout))
  2055. }
  2056. // DoDeadline performs the given request and waits for response until
  2057. // the given deadline.
  2058. //
  2059. // Request must contain at least non-zero RequestURI with full url (including
  2060. // scheme and host) or non-zero Host header + RequestURI.
  2061. //
  2062. // The function doesn't follow redirects.
  2063. //
  2064. // Response is ignored if resp is nil.
  2065. //
  2066. // ErrTimeout is returned if the response wasn't returned until
  2067. // the given deadline.
  2068. //
  2069. // It is recommended obtaining req and resp via AcquireRequest
  2070. // and AcquireResponse in performance-critical code.
  2071. func (c *PipelineClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  2072. return c.getConnClient().DoDeadline(req, resp, deadline)
  2073. }
  2074. func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  2075. c.init()
  2076. timeout := -time.Since(deadline)
  2077. if timeout < 0 {
  2078. return ErrTimeout
  2079. }
  2080. if c.DisablePathNormalizing {
  2081. req.URI().DisablePathNormalizing = true
  2082. }
  2083. userAgentOld := req.Header.UserAgent()
  2084. if len(userAgentOld) == 0 {
  2085. req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...)
  2086. }
  2087. w := acquirePipelineWork(&c.workPool, timeout)
  2088. w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
  2089. w.req = &w.reqCopy
  2090. w.resp = &w.respCopy
  2091. // Make a copy of the request in order to avoid data races on timeouts
  2092. req.copyToSkipBody(&w.reqCopy)
  2093. swapRequestBody(req, &w.reqCopy)
  2094. // Put the request to outgoing queue
  2095. select {
  2096. case c.chW <- w:
  2097. // Fast path: len(c.ch) < cap(c.ch)
  2098. default:
  2099. // Slow path
  2100. select {
  2101. case c.chW <- w:
  2102. case <-w.t.C:
  2103. releasePipelineWork(&c.workPool, w)
  2104. return ErrTimeout
  2105. }
  2106. }
  2107. // Wait for the response
  2108. var err error
  2109. select {
  2110. case <-w.done:
  2111. if resp != nil {
  2112. w.respCopy.copyToSkipBody(resp)
  2113. swapResponseBody(resp, &w.respCopy)
  2114. }
  2115. err = w.err
  2116. releasePipelineWork(&c.workPool, w)
  2117. case <-w.t.C:
  2118. err = ErrTimeout
  2119. }
  2120. return err
  2121. }
  2122. // Do performs the given http request and sets the corresponding response.
  2123. //
  2124. // Request must contain at least non-zero RequestURI with full url (including
  2125. // scheme and host) or non-zero Host header + RequestURI.
  2126. //
  2127. // The function doesn't follow redirects. Use Get* for following redirects.
  2128. //
  2129. // Response is ignored if resp is nil.
  2130. //
  2131. // It is recommended obtaining req and resp via AcquireRequest
  2132. // and AcquireResponse in performance-critical code.
  2133. func (c *PipelineClient) Do(req *Request, resp *Response) error {
  2134. return c.getConnClient().Do(req, resp)
  2135. }
  2136. func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
  2137. c.init()
  2138. if c.DisablePathNormalizing {
  2139. req.URI().DisablePathNormalizing = true
  2140. }
  2141. userAgentOld := req.Header.UserAgent()
  2142. if len(userAgentOld) == 0 {
  2143. req.Header.userAgent = append(req.Header.userAgent[:0], c.getClientName()...)
  2144. }
  2145. w := acquirePipelineWork(&c.workPool, 0)
  2146. w.req = req
  2147. if resp != nil {
  2148. resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
  2149. w.resp = resp
  2150. } else {
  2151. w.resp = &w.respCopy
  2152. }
  2153. // Put the request to outgoing queue
  2154. select {
  2155. case c.chW <- w:
  2156. default:
  2157. // Try substituting the oldest w with the current one.
  2158. select {
  2159. case wOld := <-c.chW:
  2160. wOld.err = ErrPipelineOverflow
  2161. wOld.done <- struct{}{}
  2162. default:
  2163. }
  2164. select {
  2165. case c.chW <- w:
  2166. default:
  2167. releasePipelineWork(&c.workPool, w)
  2168. return ErrPipelineOverflow
  2169. }
  2170. }
  2171. // Wait for the response
  2172. <-w.done
  2173. err := w.err
  2174. releasePipelineWork(&c.workPool, w)
  2175. return err
  2176. }
  2177. func (c *PipelineClient) getConnClient() *pipelineConnClient {
  2178. c.connClientsLock.Lock()
  2179. cc := c.getConnClientUnlocked()
  2180. c.connClientsLock.Unlock()
  2181. return cc
  2182. }
  2183. func (c *PipelineClient) getConnClientUnlocked() *pipelineConnClient {
  2184. if len(c.connClients) == 0 {
  2185. return c.newConnClient()
  2186. }
  2187. // Return the client with the minimum number of pending requests.
  2188. minCC := c.connClients[0]
  2189. minReqs := minCC.PendingRequests()
  2190. if minReqs == 0 {
  2191. return minCC
  2192. }
  2193. for i := 1; i < len(c.connClients); i++ {
  2194. cc := c.connClients[i]
  2195. reqs := cc.PendingRequests()
  2196. if reqs == 0 {
  2197. return cc
  2198. }
  2199. if reqs < minReqs {
  2200. minCC = cc
  2201. minReqs = reqs
  2202. }
  2203. }
  2204. maxConns := c.MaxConns
  2205. if maxConns <= 0 {
  2206. maxConns = 1
  2207. }
  2208. if len(c.connClients) < maxConns {
  2209. return c.newConnClient()
  2210. }
  2211. return minCC
  2212. }
  2213. func (c *PipelineClient) newConnClient() *pipelineConnClient {
  2214. cc := &pipelineConnClient{
  2215. Addr: c.Addr,
  2216. Name: c.Name,
  2217. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  2218. MaxPendingRequests: c.MaxPendingRequests,
  2219. MaxBatchDelay: c.MaxBatchDelay,
  2220. Dial: c.Dial,
  2221. DialDualStack: c.DialDualStack,
  2222. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  2223. DisablePathNormalizing: c.DisablePathNormalizing,
  2224. IsTLS: c.IsTLS,
  2225. TLSConfig: c.TLSConfig,
  2226. MaxIdleConnDuration: c.MaxIdleConnDuration,
  2227. ReadBufferSize: c.ReadBufferSize,
  2228. WriteBufferSize: c.WriteBufferSize,
  2229. ReadTimeout: c.ReadTimeout,
  2230. WriteTimeout: c.WriteTimeout,
  2231. Logger: c.Logger,
  2232. }
  2233. c.connClients = append(c.connClients, cc)
  2234. return cc
  2235. }
  // ErrPipelineOverflow may be returned from PipelineClient.Do*
  // if the requests' queue has overflowed.
  var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflown. Increase MaxConns and/or MaxPendingRequests")

  // DefaultMaxPendingRequests is the default value
  // for PipelineClient.MaxPendingRequests.
  const DefaultMaxPendingRequests = 1024
  // init lazily creates the work queues and starts the background goroutine
  // that runs worker(). Do and DoDeadline call it before queueing work.
  //
  // The goroutine is started only when chR is nil; both queues are sized by
  // MaxPendingRequests (DefaultMaxPendingRequests if <= 0). All checks and
  // updates of chR/chW made here happen under chLock.
  func (c *pipelineConnClient) init() {
  	c.chLock.Lock()
  	if c.chR == nil {
  		maxPendingRequests := c.MaxPendingRequests
  		if maxPendingRequests <= 0 {
  			maxPendingRequests = DefaultMaxPendingRequests
  		}
  		c.chR = make(chan *pipelineWork, maxPendingRequests)
  		// chW is created only if it does not exist yet.
  		if c.chW == nil {
  			c.chW = make(chan *pipelineWork, maxPendingRequests)
  		}
  		go func() {
  			// Keep restarting the worker if it fails (connection errors for example).
  			for {
  				if err := c.worker(); err != nil {
  					c.logger().Printf("error in PipelineClient(%q): %v", c.Addr, err)
  					if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  						// Throttle client reconnections on timeout errors
  						time.Sleep(time.Second)
  					}
  				} else {
  					// worker() ended without an error: exit the goroutine if
  					// both queues are empty, otherwise drop the queues and
  					// run the worker again.
  					//
  					// NOTE(review): when the goroutine exits, chR stays
  					// non-nil, so the `c.chR == nil` check above would not
  					// start a new goroutine; and when work is still queued the
  					// queues are discarded. The `!stop` condition looks
  					// inverted — confirm the intended behavior.
  					c.chLock.Lock()
  					stop := len(c.chR) == 0 && len(c.chW) == 0
  					if !stop {
  						c.chR = nil
  						c.chW = nil
  					}
  					c.chLock.Unlock()

  					if stop {
  						break
  					}
  				}
  			}
  		}()
  	}
  	c.chLock.Unlock()
  }
  2279. func (c *pipelineConnClient) worker() error {
  2280. tlsConfig := c.cachedTLSConfig()
  2281. conn, err := dialAddr(c.Addr, c.Dial, c.DialDualStack, c.IsTLS, tlsConfig, c.WriteTimeout)
  2282. if err != nil {
  2283. return err
  2284. }
  2285. // Start reader and writer
  2286. stopW := make(chan struct{})
  2287. doneW := make(chan error)
  2288. go func() {
  2289. doneW <- c.writer(conn, stopW)
  2290. }()
  2291. stopR := make(chan struct{})
  2292. doneR := make(chan error)
  2293. go func() {
  2294. doneR <- c.reader(conn, stopR)
  2295. }()
  2296. // Wait until reader and writer are stopped
  2297. select {
  2298. case err = <-doneW:
  2299. conn.Close()
  2300. close(stopR)
  2301. <-doneR
  2302. case err = <-doneR:
  2303. conn.Close()
  2304. close(stopW)
  2305. <-doneW
  2306. }
  2307. // Notify pending readers
  2308. for len(c.chR) > 0 {
  2309. w := <-c.chR
  2310. w.err = errPipelineConnStopped
  2311. w.done <- struct{}{}
  2312. }
  2313. return err
  2314. }
  2315. func (c *pipelineConnClient) cachedTLSConfig() *tls.Config {
  2316. if !c.IsTLS {
  2317. return nil
  2318. }
  2319. c.tlsConfigLock.Lock()
  2320. cfg := c.tlsConfig
  2321. if cfg == nil {
  2322. cfg = newClientTLSConfig(c.TLSConfig, c.Addr)
  2323. c.tlsConfig = cfg
  2324. }
  2325. c.tlsConfigLock.Unlock()
  2326. return cfg
  2327. }
  2328. func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
  2329. writeBufferSize := c.WriteBufferSize
  2330. if writeBufferSize <= 0 {
  2331. writeBufferSize = defaultWriteBufferSize
  2332. }
  2333. bw := bufio.NewWriterSize(conn, writeBufferSize)
  2334. defer bw.Flush()
  2335. chR := c.chR
  2336. chW := c.chW
  2337. writeTimeout := c.WriteTimeout
  2338. maxIdleConnDuration := c.MaxIdleConnDuration
  2339. if maxIdleConnDuration <= 0 {
  2340. maxIdleConnDuration = DefaultMaxIdleConnDuration
  2341. }
  2342. maxBatchDelay := c.MaxBatchDelay
  2343. var (
  2344. stopTimer = time.NewTimer(time.Hour)
  2345. flushTimer = time.NewTimer(time.Hour)
  2346. flushTimerCh <-chan time.Time
  2347. instantTimerCh = make(chan time.Time)
  2348. w *pipelineWork
  2349. err error
  2350. )
  2351. close(instantTimerCh)
  2352. for {
  2353. againChW:
  2354. select {
  2355. case w = <-chW:
  2356. // Fast path: len(chW) > 0
  2357. default:
  2358. // Slow path
  2359. stopTimer.Reset(maxIdleConnDuration)
  2360. select {
  2361. case w = <-chW:
  2362. case <-stopTimer.C:
  2363. return nil
  2364. case <-stopCh:
  2365. return nil
  2366. case <-flushTimerCh:
  2367. if err = bw.Flush(); err != nil {
  2368. return err
  2369. }
  2370. flushTimerCh = nil
  2371. goto againChW
  2372. }
  2373. }
  2374. if !w.deadline.IsZero() && time.Since(w.deadline) >= 0 {
  2375. w.err = ErrTimeout
  2376. w.done <- struct{}{}
  2377. continue
  2378. }
  2379. w.resp.parseNetConn(conn)
  2380. if writeTimeout > 0 {
  2381. // Set Deadline every time, since golang has fixed the performance issue
  2382. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2383. currentTime := time.Now()
  2384. if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
  2385. w.err = err
  2386. w.done <- struct{}{}
  2387. return err
  2388. }
  2389. }
  2390. if err = w.req.Write(bw); err != nil {
  2391. w.err = err
  2392. w.done <- struct{}{}
  2393. return err
  2394. }
  2395. if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) {
  2396. if maxBatchDelay > 0 {
  2397. flushTimer.Reset(maxBatchDelay)
  2398. flushTimerCh = flushTimer.C
  2399. } else {
  2400. flushTimerCh = instantTimerCh
  2401. }
  2402. }
  2403. againChR:
  2404. select {
  2405. case chR <- w:
  2406. // Fast path: len(chR) < cap(chR)
  2407. default:
  2408. // Slow path
  2409. select {
  2410. case chR <- w:
  2411. case <-stopCh:
  2412. w.err = errPipelineConnStopped
  2413. w.done <- struct{}{}
  2414. return nil
  2415. case <-flushTimerCh:
  2416. if err = bw.Flush(); err != nil {
  2417. w.err = err
  2418. w.done <- struct{}{}
  2419. return err
  2420. }
  2421. flushTimerCh = nil
  2422. goto againChR
  2423. }
  2424. }
  2425. }
  2426. }
  2427. func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error {
  2428. readBufferSize := c.ReadBufferSize
  2429. if readBufferSize <= 0 {
  2430. readBufferSize = defaultReadBufferSize
  2431. }
  2432. br := bufio.NewReaderSize(conn, readBufferSize)
  2433. chR := c.chR
  2434. readTimeout := c.ReadTimeout
  2435. var (
  2436. w *pipelineWork
  2437. err error
  2438. )
  2439. for {
  2440. select {
  2441. case w = <-chR:
  2442. // Fast path: len(chR) > 0
  2443. default:
  2444. // Slow path
  2445. select {
  2446. case w = <-chR:
  2447. case <-stopCh:
  2448. return nil
  2449. }
  2450. }
  2451. if readTimeout > 0 {
  2452. // Set Deadline every time, since golang has fixed the performance issue
  2453. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2454. currentTime := time.Now()
  2455. if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil {
  2456. w.err = err
  2457. w.done <- struct{}{}
  2458. return err
  2459. }
  2460. }
  2461. if err = w.resp.Read(br); err != nil {
  2462. w.err = err
  2463. w.done <- struct{}{}
  2464. return err
  2465. }
  2466. w.done <- struct{}{}
  2467. }
  2468. }
  2469. func (c *pipelineConnClient) logger() Logger {
  2470. if c.Logger != nil {
  2471. return c.Logger
  2472. }
  2473. return defaultLogger
  2474. }
  2475. // PendingRequests returns the current number of pending requests pipelined
  2476. // to the server.
  2477. //
  2478. // This number may exceed MaxPendingRequests*MaxConns by up to two times, since
  2479. // each connection to the server may keep up to MaxPendingRequests requests
  2480. // in the queue before sending them to the server.
  2481. //
  2482. // This function may be used for balancing load among multiple PipelineClient
  2483. // instances.
  2484. func (c *PipelineClient) PendingRequests() int {
  2485. c.connClientsLock.Lock()
  2486. n := 0
  2487. for _, cc := range c.connClients {
  2488. n += cc.PendingRequests()
  2489. }
  2490. c.connClientsLock.Unlock()
  2491. return n
  2492. }
  2493. func (c *pipelineConnClient) PendingRequests() int {
  2494. c.init()
  2495. c.chLock.Lock()
  2496. n := len(c.chR) + len(c.chW)
  2497. c.chLock.Unlock()
  2498. return n
  2499. }
  2500. func (c *pipelineConnClient) getClientName() []byte {
  2501. v := c.clientName.Load()
  2502. var clientName []byte
  2503. if v == nil {
  2504. clientName = []byte(c.Name)
  2505. if len(clientName) == 0 && !c.NoDefaultUserAgentHeader {
  2506. clientName = defaultUserAgent
  2507. }
  2508. c.clientName.Store(clientName)
  2509. } else {
  2510. clientName = v.([]byte)
  2511. }
  2512. return clientName
  2513. }
// errPipelineConnStopped is set on work items that could not be completed
// because the pipeline connection serving them was stopped.
var errPipelineConnStopped = errors.New("pipeline connection has been stopped")
  2515. func acquirePipelineWork(pool *sync.Pool, timeout time.Duration) (w *pipelineWork) {
  2516. v := pool.Get()
  2517. if v != nil {
  2518. w = v.(*pipelineWork)
  2519. } else {
  2520. w = &pipelineWork{
  2521. done: make(chan struct{}, 1),
  2522. }
  2523. }
  2524. if timeout > 0 {
  2525. if w.t == nil {
  2526. w.t = time.NewTimer(timeout)
  2527. } else {
  2528. w.t.Reset(timeout)
  2529. }
  2530. w.deadline = time.Now().Add(timeout)
  2531. } else {
  2532. w.deadline = zeroTime
  2533. }
  2534. return w
  2535. }
  2536. func releasePipelineWork(pool *sync.Pool, w *pipelineWork) {
  2537. if w.t != nil {
  2538. w.t.Stop()
  2539. }
  2540. w.reqCopy.Reset()
  2541. w.respCopy.Reset()
  2542. w.req = nil
  2543. w.resp = nil
  2544. w.err = nil
  2545. pool.Put(w)
  2546. }