client.go 83 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072
  1. package fasthttp
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/tls"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. )
  15. // Do performs the given http request and fills the given http response.
  16. //
  17. // Request must contain at least non-zero RequestURI with full url (including
  18. // scheme and host) or non-zero Host header + RequestURI.
  19. //
  20. // Client determines the server to be requested in the following order:
  21. //
  22. // - from RequestURI if it contains full url with scheme and host;
  23. // - from Host header otherwise.
  24. //
  25. // The function doesn't follow redirects. Use Get* for following redirects.
  26. //
  27. // Response is ignored if resp is nil.
  28. //
  29. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  30. // to the requested host are busy.
  31. //
  32. // It is recommended obtaining req and resp via AcquireRequest
  33. // and AcquireResponse in performance-critical code.
  34. func Do(req *Request, resp *Response) error {
  35. return defaultClient.Do(req, resp)
  36. }
  37. // DoTimeout performs the given request and waits for response during
  38. // the given timeout duration.
  39. //
  40. // Request must contain at least non-zero RequestURI with full url (including
  41. // scheme and host) or non-zero Host header + RequestURI.
  42. //
  43. // Client determines the server to be requested in the following order:
  44. //
  45. // - from RequestURI if it contains full url with scheme and host;
  46. // - from Host header otherwise.
  47. //
  48. // The function doesn't follow redirects. Use Get* for following redirects.
  49. //
  50. // Response is ignored if resp is nil.
  51. //
  52. // ErrTimeout is returned if the response wasn't returned during
  53. // the given timeout.
  54. //
  55. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  56. // to the requested host are busy.
  57. //
  58. // It is recommended obtaining req and resp via AcquireRequest
  59. // and AcquireResponse in performance-critical code.
  60. func DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  61. return defaultClient.DoTimeout(req, resp, timeout)
  62. }
  63. // DoDeadline performs the given request and waits for response until
  64. // the given deadline.
  65. //
  66. // Request must contain at least non-zero RequestURI with full url (including
  67. // scheme and host) or non-zero Host header + RequestURI.
  68. //
  69. // Client determines the server to be requested in the following order:
  70. //
  71. // - from RequestURI if it contains full url with scheme and host;
  72. // - from Host header otherwise.
  73. //
  74. // The function doesn't follow redirects. Use Get* for following redirects.
  75. //
  76. // Response is ignored if resp is nil.
  77. //
  78. // ErrTimeout is returned if the response wasn't returned until
  79. // the given deadline.
  80. //
  81. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  82. // to the requested host are busy.
  83. //
  84. // It is recommended obtaining req and resp via AcquireRequest
  85. // and AcquireResponse in performance-critical code.
  86. func DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  87. return defaultClient.DoDeadline(req, resp, deadline)
  88. }
  89. // DoRedirects performs the given http request and fills the given http response,
  90. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  91. // maxRedirectsCount, ErrTooManyRedirects is returned.
  92. //
  93. // Request must contain at least non-zero RequestURI with full url (including
  94. // scheme and host) or non-zero Host header + RequestURI.
  95. //
  96. // Client determines the server to be requested in the following order:
  97. //
  98. // - from RequestURI if it contains full url with scheme and host;
  99. // - from Host header otherwise.
  100. //
  101. // Response is ignored if resp is nil.
  102. //
  103. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  104. // to the requested host are busy.
  105. //
  106. // It is recommended obtaining req and resp via AcquireRequest
  107. // and AcquireResponse in performance-critical code.
  108. func DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  109. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, &defaultClient)
  110. return err
  111. }
  112. // Get returns the status code and body of url.
  113. //
  114. // The contents of dst will be replaced by the body and returned, if the dst
  115. // is too small a new slice will be allocated.
  116. //
  117. // The function follows redirects. Use Do* for manually handling redirects.
  118. func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  119. return defaultClient.Get(dst, url)
  120. }
  121. // GetTimeout returns the status code and body of url.
  122. //
  123. // The contents of dst will be replaced by the body and returned, if the dst
  124. // is too small a new slice will be allocated.
  125. //
  126. // The function follows redirects. Use Do* for manually handling redirects.
  127. //
  128. // ErrTimeout error is returned if url contents couldn't be fetched
  129. // during the given timeout.
  130. func GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  131. return defaultClient.GetTimeout(dst, url, timeout)
  132. }
  133. // GetDeadline returns the status code and body of url.
  134. //
  135. // The contents of dst will be replaced by the body and returned, if the dst
  136. // is too small a new slice will be allocated.
  137. //
  138. // The function follows redirects. Use Do* for manually handling redirects.
  139. //
  140. // ErrTimeout error is returned if url contents couldn't be fetched
  141. // until the given deadline.
  142. func GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  143. return defaultClient.GetDeadline(dst, url, deadline)
  144. }
  145. // Post sends POST request to the given url with the given POST arguments.
  146. //
  147. // The contents of dst will be replaced by the body and returned, if the dst
  148. // is too small a new slice will be allocated.
  149. //
  150. // The function follows redirects. Use Do* for manually handling redirects.
  151. //
  152. // Empty POST body is sent if postArgs is nil.
  153. func Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  154. return defaultClient.Post(dst, url, postArgs)
  155. }
// defaultClient is the zero-value Client that the package-level helpers
// (Do, DoTimeout, DoDeadline, DoRedirects, Get*, Post) delegate to.
var defaultClient Client
  157. // Client implements http client.
  158. //
  159. // Copying Client by value is prohibited. Create new instance instead.
  160. //
  161. // It is safe calling Client methods from concurrently running goroutines.
  162. //
  163. // The fields of a Client should not be changed while it is in use.
  164. type Client struct {
  165. noCopy noCopy
  166. readerPool sync.Pool
  167. writerPool sync.Pool
  168. // Callback for establishing new connections to hosts.
  169. //
  170. // Default DialTimeout is used if not set.
  171. DialTimeout DialFuncWithTimeout
  172. // Callback for establishing new connections to hosts.
  173. //
  174. // Note that if Dial is set instead of DialTimeout, Dial will ignore Request timeout.
  175. // If you want the tcp dial process to account for request timeouts, use DialTimeout instead.
  176. //
  177. // If not set, DialTimeout is used.
  178. Dial DialFunc
  179. // TLS config for https connections.
  180. //
  181. // Default TLS config is used if not set.
  182. TLSConfig *tls.Config
  183. // RetryIf controls whether a retry should be attempted after an error.
  184. //
  185. // By default will use isIdempotent function.
  186. //
  187. // Deprecated: Use RetryIfErr instead.
  188. // This field is only effective when the `RetryIfErr` field is not set.
  189. RetryIf RetryIfFunc
  190. // When the client encounters an error during a request, the behavior—whether to retry
  191. // and whether to reset the request timeout—should be determined
  192. // based on the return value of this field.
  193. // This field is only effective within the range of MaxIdemponentCallAttempts.
  194. RetryIfErr RetryIfErrFunc
  195. // ConfigureClient configures the fasthttp.HostClient.
  196. ConfigureClient func(hc *HostClient) error
  197. m map[string]*HostClient
  198. ms map[string]*HostClient
  199. // Client name. Used in User-Agent request header.
  200. //
  201. // Default client name is used if not set.
  202. Name string
  203. // Maximum number of connections per each host which may be established.
  204. //
  205. // DefaultMaxConnsPerHost is used if not set.
  206. MaxConnsPerHost int
  207. // Idle keep-alive connections are closed after this duration.
  208. //
  209. // By default idle connections are closed
  210. // after DefaultMaxIdleConnDuration.
  211. MaxIdleConnDuration time.Duration
  212. // Keep-alive connections are closed after this duration.
  213. //
  214. // By default connection duration is unlimited.
  215. MaxConnDuration time.Duration
  216. // Maximum number of attempts for idempotent calls.
  217. //
  218. // DefaultMaxIdemponentCallAttempts is used if not set.
  219. MaxIdemponentCallAttempts int
  220. // Per-connection buffer size for responses' reading.
  221. // This also limits the maximum header size.
  222. //
  223. // Default buffer size is used if 0.
  224. ReadBufferSize int
  225. // Per-connection buffer size for requests' writing.
  226. //
  227. // Default buffer size is used if 0.
  228. WriteBufferSize int
  229. // Maximum duration for full response reading (including body).
  230. //
  231. // By default response read timeout is unlimited.
  232. ReadTimeout time.Duration
  233. // Maximum duration for full request writing (including body).
  234. //
  235. // By default request write timeout is unlimited.
  236. WriteTimeout time.Duration
  237. // Maximum response body size.
  238. //
  239. // The client returns ErrBodyTooLarge if this limit is greater than 0
  240. // and response body is greater than the limit.
  241. //
  242. // By default response body size is unlimited.
  243. MaxResponseBodySize int
  244. // Maximum duration for waiting for a free connection.
  245. //
  246. // By default will not waiting, return ErrNoFreeConns immediately.
  247. MaxConnWaitTimeout time.Duration
  248. // Connection pool strategy. Can be either LIFO or FIFO (default).
  249. ConnPoolStrategy ConnPoolStrategyType
  250. mLock sync.RWMutex
  251. mOnce sync.Once
  252. // NoDefaultUserAgentHeader when set to true, causes the default
  253. // User-Agent header to be excluded from the Request.
  254. NoDefaultUserAgentHeader bool
  255. // Attempt to connect to both ipv4 and ipv6 addresses if set to true.
  256. //
  257. // This option is used only if default TCP dialer is used,
  258. // i.e. if Dial is blank.
  259. //
  260. // By default client connects only to ipv4 addresses,
  261. // since unfortunately ipv6 remains broken in many networks worldwide :)
  262. DialDualStack bool
  263. // Header names are passed as-is without normalization
  264. // if this option is set.
  265. //
  266. // Disabled header names' normalization may be useful only for proxying
  267. // responses to other clients expecting case-sensitive
  268. // header names. See https://github.com/valyala/fasthttp/issues/57
  269. // for details.
  270. //
  271. // By default request and response header names are normalized, i.e.
  272. // The first letter and the first letters following dashes
  273. // are uppercased, while all the other letters are lowercased.
  274. // Examples:
  275. //
  276. // * HOST -> Host
  277. // * content-type -> Content-Type
  278. // * cONTENT-lenGTH -> Content-Length
  279. DisableHeaderNamesNormalizing bool
  280. // Path values are sent as-is without normalization.
  281. //
  282. // Disabled path normalization may be useful for proxying incoming requests
  283. // to servers that are expecting paths to be forwarded as-is.
  284. //
  285. // By default path values are normalized, i.e.
  286. // extra slashes are removed, special characters are encoded.
  287. DisablePathNormalizing bool
  288. // StreamResponseBody enables response body streaming.
  289. StreamResponseBody bool
  290. }
  291. // Get returns the status code and body of url.
  292. //
  293. // The contents of dst will be replaced by the body and returned, if the dst
  294. // is too small a new slice will be allocated.
  295. //
  296. // The function follows redirects. Use Do* for manually handling redirects.
  297. func (c *Client) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  298. return clientGetURL(dst, url, c)
  299. }
  300. // GetTimeout returns the status code and body of url.
  301. //
  302. // The contents of dst will be replaced by the body and returned, if the dst
  303. // is too small a new slice will be allocated.
  304. //
  305. // The function follows redirects. Use Do* for manually handling redirects.
  306. //
  307. // ErrTimeout error is returned if url contents couldn't be fetched
  308. // during the given timeout.
  309. func (c *Client) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  310. return clientGetURLTimeout(dst, url, timeout, c)
  311. }
  312. // GetDeadline returns the status code and body of url.
  313. //
  314. // The contents of dst will be replaced by the body and returned, if the dst
  315. // is too small a new slice will be allocated.
  316. //
  317. // The function follows redirects. Use Do* for manually handling redirects.
  318. //
  319. // ErrTimeout error is returned if url contents couldn't be fetched
  320. // until the given deadline.
  321. func (c *Client) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  322. return clientGetURLDeadline(dst, url, deadline, c)
  323. }
  324. // Post sends POST request to the given url with the given POST arguments.
  325. //
  326. // The contents of dst will be replaced by the body and returned, if the dst
  327. // is too small a new slice will be allocated.
  328. //
  329. // The function follows redirects. Use Do* for manually handling redirects.
  330. //
  331. // Empty POST body is sent if postArgs is nil.
  332. func (c *Client) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  333. return clientPostURL(dst, url, postArgs, c)
  334. }
  335. // DoTimeout performs the given request and waits for response during
  336. // the given timeout duration.
  337. //
  338. // Request must contain at least non-zero RequestURI with full url (including
  339. // scheme and host) or non-zero Host header + RequestURI.
  340. //
  341. // Client determines the server to be requested in the following order:
  342. //
  343. // - from RequestURI if it contains full url with scheme and host;
  344. // - from Host header otherwise.
  345. //
  346. // The function doesn't follow redirects. Use Get* for following redirects.
  347. //
  348. // Response is ignored if resp is nil.
  349. //
  350. // ErrTimeout is returned if the response wasn't returned during
  351. // the given timeout.
  352. // Immediately returns ErrTimeout if timeout value is negative.
  353. //
  354. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  355. // to the requested host are busy.
  356. //
  357. // It is recommended obtaining req and resp via AcquireRequest
  358. // and AcquireResponse in performance-critical code.
  359. func (c *Client) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  360. req.timeout = timeout
  361. if req.timeout <= 0 {
  362. return ErrTimeout
  363. }
  364. return c.Do(req, resp)
  365. }
  366. // DoDeadline performs the given request and waits for response until
  367. // the given deadline.
  368. //
  369. // Request must contain at least non-zero RequestURI with full url (including
  370. // scheme and host) or non-zero Host header + RequestURI.
  371. //
  372. // Client determines the server to be requested in the following order:
  373. //
  374. // - from RequestURI if it contains full url with scheme and host;
  375. // - from Host header otherwise.
  376. //
  377. // The function doesn't follow redirects. Use Get* for following redirects.
  378. //
  379. // Response is ignored if resp is nil.
  380. //
  381. // ErrTimeout is returned if the response wasn't returned until
  382. // the given deadline.
  383. // Immediately returns ErrTimeout if the deadline has already been reached.
  384. //
  385. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  386. // to the requested host are busy.
  387. //
  388. // It is recommended obtaining req and resp via AcquireRequest
  389. // and AcquireResponse in performance-critical code.
  390. func (c *Client) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  391. req.timeout = time.Until(deadline)
  392. if req.timeout <= 0 {
  393. return ErrTimeout
  394. }
  395. return c.Do(req, resp)
  396. }
  397. // DoRedirects performs the given http request and fills the given http response,
  398. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  399. // maxRedirectsCount, ErrTooManyRedirects is returned.
  400. //
  401. // Request must contain at least non-zero RequestURI with full url (including
  402. // scheme and host) or non-zero Host header + RequestURI.
  403. //
  404. // Client determines the server to be requested in the following order:
  405. //
  406. // - from RequestURI if it contains full url with scheme and host;
  407. // - from Host header otherwise.
  408. //
  409. // Response is ignored if resp is nil.
  410. //
  411. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  412. // to the requested host are busy.
  413. //
  414. // It is recommended obtaining req and resp via AcquireRequest
  415. // and AcquireResponse in performance-critical code.
  416. func (c *Client) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  417. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  418. return err
  419. }
  420. // Do performs the given http request and fills the given http response.
  421. //
  422. // Request must contain at least non-zero RequestURI with full url (including
  423. // scheme and host) or non-zero Host header + RequestURI.
  424. //
  425. // Client determines the server to be requested in the following order:
  426. //
  427. // - from RequestURI if it contains full url with scheme and host;
  428. // - from Host header otherwise.
  429. //
  430. // Response is ignored if resp is nil.
  431. //
  432. // The function doesn't follow redirects. Use Get* for following redirects.
  433. //
  434. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  435. // to the requested host are busy.
  436. //
  437. // It is recommended obtaining req and resp via AcquireRequest
  438. // and AcquireResponse in performance-critical code.
  439. func (c *Client) Do(req *Request, resp *Response) error {
  440. uri := req.URI()
  441. if uri == nil {
  442. return ErrorInvalidURI
  443. }
  444. host := uri.Host()
  445. if bytes.ContainsRune(host, ',') {
  446. return fmt.Errorf("invalid host %q. Use HostClient for multiple hosts", host)
  447. }
  448. isTLS := false
  449. if uri.isHTTPS() {
  450. isTLS = true
  451. } else if !uri.isHTTP() {
  452. return fmt.Errorf("unsupported protocol %q. http and https are supported", uri.Scheme())
  453. }
  454. c.mOnce.Do(func() {
  455. c.mLock.Lock()
  456. c.m = make(map[string]*HostClient)
  457. c.ms = make(map[string]*HostClient)
  458. c.mLock.Unlock()
  459. })
  460. startCleaner := false
  461. c.mLock.RLock()
  462. m := c.m
  463. if isTLS {
  464. m = c.ms
  465. }
  466. hc := m[string(host)]
  467. if hc != nil {
  468. atomic.AddInt32(&hc.pendingClientRequests, 1)
  469. defer atomic.AddInt32(&hc.pendingClientRequests, -1)
  470. }
  471. c.mLock.RUnlock()
  472. if hc == nil {
  473. c.mLock.Lock()
  474. hc = m[string(host)]
  475. if hc == nil {
  476. hc = &HostClient{
  477. Addr: AddMissingPort(string(host), isTLS),
  478. Name: c.Name,
  479. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  480. Dial: c.Dial,
  481. DialTimeout: c.DialTimeout,
  482. DialDualStack: c.DialDualStack,
  483. IsTLS: isTLS,
  484. TLSConfig: c.TLSConfig,
  485. MaxConns: c.MaxConnsPerHost,
  486. MaxIdleConnDuration: c.MaxIdleConnDuration,
  487. MaxConnDuration: c.MaxConnDuration,
  488. MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts,
  489. ReadBufferSize: c.ReadBufferSize,
  490. WriteBufferSize: c.WriteBufferSize,
  491. ReadTimeout: c.ReadTimeout,
  492. WriteTimeout: c.WriteTimeout,
  493. MaxResponseBodySize: c.MaxResponseBodySize,
  494. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  495. DisablePathNormalizing: c.DisablePathNormalizing,
  496. MaxConnWaitTimeout: c.MaxConnWaitTimeout,
  497. RetryIf: c.RetryIf,
  498. RetryIfErr: c.RetryIfErr,
  499. ConnPoolStrategy: c.ConnPoolStrategy,
  500. StreamResponseBody: c.StreamResponseBody,
  501. clientReaderPool: &c.readerPool,
  502. clientWriterPool: &c.writerPool,
  503. }
  504. if c.ConfigureClient != nil {
  505. if err := c.ConfigureClient(hc); err != nil {
  506. c.mLock.Unlock()
  507. return err
  508. }
  509. }
  510. m[string(host)] = hc
  511. if len(m) == 1 {
  512. startCleaner = true
  513. }
  514. }
  515. atomic.AddInt32(&hc.pendingClientRequests, 1)
  516. defer atomic.AddInt32(&hc.pendingClientRequests, -1)
  517. c.mLock.Unlock()
  518. }
  519. if startCleaner {
  520. go c.mCleaner(m)
  521. }
  522. return hc.Do(req, resp)
  523. }
  524. // CloseIdleConnections closes any connections which were previously
  525. // connected from previous requests but are now sitting idle in a
  526. // "keep-alive" state. It does not interrupt any connections currently
  527. // in use.
  528. func (c *Client) CloseIdleConnections() {
  529. c.mLock.RLock()
  530. for _, v := range c.m {
  531. v.CloseIdleConnections()
  532. }
  533. for _, v := range c.ms {
  534. v.CloseIdleConnections()
  535. }
  536. c.mLock.RUnlock()
  537. }
  538. func (c *Client) mCleaner(m map[string]*HostClient) {
  539. mustStop := false
  540. sleep := c.MaxIdleConnDuration
  541. if sleep < time.Second {
  542. sleep = time.Second
  543. } else if sleep > 10*time.Second {
  544. sleep = 10 * time.Second
  545. }
  546. for {
  547. time.Sleep(sleep)
  548. c.mLock.Lock()
  549. for k, v := range m {
  550. v.connsLock.Lock()
  551. if v.connsCount == 0 && atomic.LoadInt32(&v.pendingClientRequests) == 0 {
  552. delete(m, k)
  553. }
  554. v.connsLock.Unlock()
  555. }
  556. if len(m) == 0 {
  557. mustStop = true
  558. }
  559. c.mLock.Unlock()
  560. if mustStop {
  561. break
  562. }
  563. }
  564. }
  565. // DefaultMaxConnsPerHost is the maximum number of concurrent connections
  566. // http client may establish per host by default (i.e. if
  567. // Client.MaxConnsPerHost isn't set).
  568. const DefaultMaxConnsPerHost = 512
  569. // DefaultMaxIdleConnDuration is the default duration before idle keep-alive
  570. // connection is closed.
  571. const DefaultMaxIdleConnDuration = 10 * time.Second
  572. // DefaultMaxIdemponentCallAttempts is the default idempotent calls attempts count.
  573. const DefaultMaxIdemponentCallAttempts = 5
  574. // DialFunc must establish connection to addr.
  575. //
  576. // There is no need in establishing TLS (SSL) connection for https.
  577. // The client automatically converts connection to TLS
  578. // if HostClient.IsTLS is set.
  579. //
  580. // TCP address passed to DialFunc always contains host and port.
  581. // Example TCP addr values:
  582. //
  583. // - foobar.com:80
  584. // - foobar.com:443
  585. // - foobar.com:8080
  586. type DialFunc func(addr string) (net.Conn, error)
  587. // DialFuncWithTimeout must establish connection to addr.
  588. // Unlike DialFunc, it also accepts a timeout.
  589. //
  590. // There is no need in establishing TLS (SSL) connection for https.
  591. // The client automatically converts connection to TLS
  592. // if HostClient.IsTLS is set.
  593. //
  594. // TCP address passed to DialFuncWithTimeout always contains host and port.
  595. // Example TCP addr values:
  596. //
  597. // - foobar.com:80
  598. // - foobar.com:443
  599. // - foobar.com:8080
  600. type DialFuncWithTimeout func(addr string, timeout time.Duration) (net.Conn, error)
  601. // RetryIfFunc defines the signature of the retry if function.
  602. // Request argument passed to RetryIfFunc, if there are any request errors.
  603. type RetryIfFunc func(request *Request) bool
  604. // RetryIfErrFunc defines an interface used for implementing the following functionality:
  605. // When the client encounters an error during a request, the behavior—whether to retry
  606. // and whether to reset the request timeout—should be determined
  607. // based on the return value of this interface.
  608. //
  609. // attempt indicates which attempt the current retry is due to a failure of.
  610. // The first request counts as the first attempt.
  611. //
  612. // err represents the error encountered while attempting the `attempts`-th request.
  613. //
  614. // resetTimeout indicates whether to reuse the `Request`'s timeout as the timeout interval,
  615. // rather than using the timeout after subtracting the time spent on previous failed requests.
  616. // This return value is meaningful only when you use `Request.SetTimeout`, `DoTimeout`, or `DoDeadline`.
  617. //
  618. // retry indicates whether to retry the current request. If it is false,
  619. // the request function will immediately return with the `err`.
  620. type RetryIfErrFunc func(request *Request, attempts int, err error) (resetTimeout bool, retry bool)
  621. // RoundTripper wraps every request/response.
  622. type RoundTripper interface {
  623. RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error)
  624. }
  625. // ConnPoolStrategyType define strategy of connection pool enqueue/dequeue.
  626. type ConnPoolStrategyType int
  627. const (
  628. FIFO ConnPoolStrategyType = iota
  629. LIFO
  630. )
  631. // HostClient balances http requests among hosts listed in Addr.
  632. //
  633. // HostClient may be used for balancing load among multiple upstream hosts.
  634. // While multiple addresses passed to HostClient.Addr may be used for balancing
  635. // load among them, it would be better using LBClient instead, since HostClient
  636. // may unevenly balance load among upstream hosts.
  637. //
  638. // It is forbidden copying HostClient instances. Create new instances instead.
  639. //
  640. // It is safe calling HostClient methods from concurrently running goroutines.
  641. type HostClient struct {
  642. noCopy noCopy
  643. readerPool sync.Pool
  644. writerPool sync.Pool
  645. // Transport defines a transport-like mechanism that wraps every request/response.
  646. Transport RoundTripper
  647. // Callback for establishing new connections to hosts.
  648. //
  649. // Default DialTimeout is used if not set.
  650. DialTimeout DialFuncWithTimeout
  651. // Callback for establishing new connections to hosts.
  652. //
  653. // Note that if Dial is set instead of DialTimeout, Dial will ignore Request timeout.
  654. // If you want the tcp dial process to account for request timeouts, use DialTimeout instead.
  655. //
  656. // If not set, DialTimeout is used.
  657. Dial DialFunc
  658. // Optional TLS config.
  659. TLSConfig *tls.Config
  660. // RetryIf controls whether a retry should be attempted after an error.
  661. // By default, it uses the isIdempotent function.
  662. //
  663. // Deprecated: Use RetryIfErr instead.
  664. // This field is only effective when the `RetryIfErr` field is not set.
  665. RetryIf RetryIfFunc
  666. // When the client encounters an error during a request, the behavior—whether to retry
  667. // and whether to reset the request timeout—should be determined
  668. // based on the return value of this field.
  669. // This field is only effective within the range of MaxIdemponentCallAttempts.
  670. RetryIfErr RetryIfErrFunc
  671. connsWait *wantConnQueue
  672. tlsConfigMap map[string]*tls.Config
  673. clientReaderPool *sync.Pool
  674. clientWriterPool *sync.Pool
  675. // Comma-separated list of upstream HTTP server host addresses,
  676. // which are passed to Dial or DialTimeout in a round-robin manner.
  677. //
  678. // Each address may contain port if default dialer is used.
  679. // For example,
  680. //
  681. // - foobar.com:80
  682. // - foobar.com:443
  683. // - foobar.com:8080
  684. Addr string
  685. // Client name. Used in User-Agent request header.
  686. Name string
  687. conns []*clientConn
  688. addrs []string
  689. // Maximum number of connections which may be established to all hosts
  690. // listed in Addr.
  691. //
  692. // You can change this value while the HostClient is being used
  693. // with HostClient.SetMaxConns(value)
  694. //
  695. // DefaultMaxConnsPerHost is used if not set.
  696. MaxConns int
  697. // Keep-alive connections are closed after this duration.
  698. //
  699. // By default connection duration is unlimited.
  700. MaxConnDuration time.Duration
  701. // Idle keep-alive connections are closed after this duration.
  702. //
  703. // By default idle connections are closed
  704. // after DefaultMaxIdleConnDuration.
  705. MaxIdleConnDuration time.Duration
  706. // Maximum number of attempts for idempotent calls.
  707. //
  708. // A value of 0 or a negative value represents using DefaultMaxIdemponentCallAttempts.
  709. // For example, a value of 1 means the request will be executed only once,
  710. // while 2 means the request will be executed at most twice.
  711. // The RetryIfErr and RetryIf fields can invalidate remaining attempts.
  712. MaxIdemponentCallAttempts int
  713. // Per-connection buffer size for responses' reading.
  714. // This also limits the maximum header size.
  715. //
  716. // Default buffer size is used if 0.
  717. ReadBufferSize int
  718. // Per-connection buffer size for requests' writing.
  719. //
  720. // Default buffer size is used if 0.
  721. WriteBufferSize int
  722. // Maximum duration for full response reading (including body).
  723. //
  724. // By default response read timeout is unlimited.
  725. ReadTimeout time.Duration
  726. // Maximum duration for full request writing (including body).
  727. //
  728. // By default request write timeout is unlimited.
  729. WriteTimeout time.Duration
  730. // Maximum response body size.
  731. //
  732. // The client returns ErrBodyTooLarge if this limit is greater than 0
  733. // and response body is greater than the limit.
  734. //
  735. // By default response body size is unlimited.
  736. MaxResponseBodySize int
  737. // Maximum duration for waiting for a free connection.
  738. //
  739. // By default will not waiting, return ErrNoFreeConns immediately
  740. MaxConnWaitTimeout time.Duration
  741. // Connection pool strategy. Can be either LIFO or FIFO (default).
  742. ConnPoolStrategy ConnPoolStrategyType
  743. connsCount int
  744. connsLock sync.Mutex
  745. addrsLock sync.Mutex
  746. tlsConfigMapLock sync.Mutex
  747. addrIdx uint32
  748. lastUseTime uint32
  749. pendingRequests int32
  750. // pendingClientRequests counts the number of requests that a Client is currently running using this HostClient.
  751. // It will be incremented earlier than pendingRequests and will be used by Client to see if the HostClient is still in use.
  752. pendingClientRequests int32
  753. // NoDefaultUserAgentHeader when set to true, causes the default
  754. // User-Agent header to be excluded from the Request.
  755. NoDefaultUserAgentHeader bool
  756. // Attempt to connect to both ipv4 and ipv6 host addresses
  757. // if set to true.
  758. //
  759. // This option is used only if default TCP dialer is used,
  760. // i.e. if Dial and DialTimeout are blank.
  761. //
  762. // By default client connects only to ipv4 addresses,
  763. // since unfortunately ipv6 remains broken in many networks worldwide :)
  764. DialDualStack bool
  765. // Whether to use TLS (aka SSL or HTTPS) for host connections.
  766. IsTLS bool
  767. // Header names are passed as-is without normalization
  768. // if this option is set.
  769. //
  770. // Disabled header names' normalization may be useful only for proxying
  771. // responses to other clients expecting case-sensitive
  772. // header names. See https://github.com/valyala/fasthttp/issues/57
  773. // for details.
  774. //
  775. // By default request and response header names are normalized, i.e.
  776. // The first letter and the first letters following dashes
  777. // are uppercased, while all the other letters are lowercased.
  778. // Examples:
  779. //
  780. // * HOST -> Host
  781. // * content-type -> Content-Type
  782. // * cONTENT-lenGTH -> Content-Length
  783. DisableHeaderNamesNormalizing bool
  784. // Path values are sent as-is without normalization.
  785. //
  786. // Disabled path normalization may be useful for proxying incoming requests
  787. // to servers that are expecting paths to be forwarded as-is.
  788. //
  789. // By default path values are normalized, i.e.
  790. // extra slashes are removed, special characters are encoded.
  791. DisablePathNormalizing bool
  792. // Will not log potentially sensitive content in error logs.
  793. //
  794. // This option is useful for servers that handle sensitive data
  795. // in the request/response.
  796. //
  797. // Client logs full errors by default.
  798. SecureErrorLogMessage bool
  799. // StreamResponseBody enables response body streaming.
  800. StreamResponseBody bool
  801. connsCleanerRun bool
  802. }
  803. type clientConn struct {
  804. c net.Conn
  805. createdTime time.Time
  806. lastUseTime time.Time
  807. }
  808. var startTimeUnix = time.Now().Unix()
  809. // LastUseTime returns time the client was last used.
  810. func (c *HostClient) LastUseTime() time.Time {
  811. n := atomic.LoadUint32(&c.lastUseTime)
  812. return time.Unix(startTimeUnix+int64(n), 0)
  813. }
  814. // Get returns the status code and body of url.
  815. //
  816. // The contents of dst will be replaced by the body and returned, if the dst
  817. // is too small a new slice will be allocated.
  818. //
  819. // The function follows redirects. Use Do* for manually handling redirects.
  820. func (c *HostClient) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  821. return clientGetURL(dst, url, c)
  822. }
  823. // GetTimeout returns the status code and body of url.
  824. //
  825. // The contents of dst will be replaced by the body and returned, if the dst
  826. // is too small a new slice will be allocated.
  827. //
  828. // The function follows redirects. Use Do* for manually handling redirects.
  829. //
  830. // ErrTimeout error is returned if url contents couldn't be fetched
  831. // during the given timeout.
  832. func (c *HostClient) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  833. return clientGetURLTimeout(dst, url, timeout, c)
  834. }
  835. // GetDeadline returns the status code and body of url.
  836. //
  837. // The contents of dst will be replaced by the body and returned, if the dst
  838. // is too small a new slice will be allocated.
  839. //
  840. // The function follows redirects. Use Do* for manually handling redirects.
  841. //
  842. // ErrTimeout error is returned if url contents couldn't be fetched
  843. // until the given deadline.
  844. func (c *HostClient) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  845. return clientGetURLDeadline(dst, url, deadline, c)
  846. }
  847. // Post sends POST request to the given url with the given POST arguments.
  848. //
  849. // The contents of dst will be replaced by the body and returned, if the dst
  850. // is too small a new slice will be allocated.
  851. //
  852. // The function follows redirects. Use Do* for manually handling redirects.
  853. //
  854. // Empty POST body is sent if postArgs is nil.
  855. func (c *HostClient) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  856. return clientPostURL(dst, url, postArgs, c)
  857. }
  858. type clientDoer interface {
  859. Do(req *Request, resp *Response) error
  860. }
  861. func clientGetURL(dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  862. req := AcquireRequest()
  863. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  864. ReleaseRequest(req)
  865. return statusCode, body, err
  866. }
  867. func clientGetURLTimeout(dst []byte, url string, timeout time.Duration, c clientDoer) (statusCode int, body []byte, err error) {
  868. deadline := time.Now().Add(timeout)
  869. return clientGetURLDeadline(dst, url, deadline, c)
  870. }
  871. type clientURLResponse struct {
  872. err error
  873. body []byte
  874. statusCode int
  875. }
  876. func clientGetURLDeadline(dst []byte, url string, deadline time.Time, c clientDoer) (statusCode int, body []byte, err error) {
  877. timeout := time.Until(deadline)
  878. if timeout <= 0 {
  879. return 0, dst, ErrTimeout
  880. }
  881. var ch chan clientURLResponse
  882. chv := clientURLResponseChPool.Get()
  883. if chv == nil {
  884. chv = make(chan clientURLResponse, 1)
  885. }
  886. ch = chv.(chan clientURLResponse)
  887. // Note that the request continues execution on ErrTimeout until
  888. // client-specific ReadTimeout exceeds. This helps limiting load
  889. // on slow hosts by MaxConns* concurrent requests.
  890. //
  891. // Without this 'hack' the load on slow host could exceed MaxConns*
  892. // concurrent requests, since timed out requests on client side
  893. // usually continue execution on the host.
  894. var mu sync.Mutex
  895. var timedout, responded bool
  896. go func() {
  897. req := AcquireRequest()
  898. statusCodeCopy, bodyCopy, errCopy := doRequestFollowRedirectsBuffer(req, dst, url, c)
  899. mu.Lock()
  900. if !timedout {
  901. ch <- clientURLResponse{
  902. statusCode: statusCodeCopy,
  903. body: bodyCopy,
  904. err: errCopy,
  905. }
  906. responded = true
  907. }
  908. mu.Unlock()
  909. ReleaseRequest(req)
  910. }()
  911. tc := AcquireTimer(timeout)
  912. select {
  913. case resp := <-ch:
  914. statusCode = resp.statusCode
  915. body = resp.body
  916. err = resp.err
  917. case <-tc.C:
  918. mu.Lock()
  919. if responded {
  920. resp := <-ch
  921. statusCode = resp.statusCode
  922. body = resp.body
  923. err = resp.err
  924. } else {
  925. timedout = true
  926. err = ErrTimeout
  927. body = dst
  928. }
  929. mu.Unlock()
  930. }
  931. ReleaseTimer(tc)
  932. clientURLResponseChPool.Put(chv)
  933. return statusCode, body, err
  934. }
  935. var clientURLResponseChPool sync.Pool
  936. func clientPostURL(dst []byte, url string, postArgs *Args, c clientDoer) (statusCode int, body []byte, err error) {
  937. req := AcquireRequest()
  938. defer ReleaseRequest(req)
  939. req.Header.SetMethod(MethodPost)
  940. req.Header.SetContentTypeBytes(strPostArgsContentType)
  941. if postArgs != nil {
  942. if _, err := postArgs.WriteTo(req.BodyWriter()); err != nil {
  943. return 0, nil, err
  944. }
  945. }
  946. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  947. return statusCode, body, err
  948. }
  949. var (
  950. // ErrMissingLocation is returned by clients when the Location header is missing on
  951. // an HTTP response with a redirect status code.
  952. ErrMissingLocation = errors.New("missing Location header for http redirect")
  953. // ErrTooManyRedirects is returned by clients when the number of redirects followed
  954. // exceed the max count.
  955. ErrTooManyRedirects = errors.New("too many redirects detected when doing the request")
  956. // HostClients are only able to follow redirects to the same protocol.
  957. ErrHostClientRedirectToDifferentScheme = errors.New("HostClient can't follow redirects to a different protocol," +
  958. " please use Client instead")
  959. )
  960. const defaultMaxRedirectsCount = 16
  961. func doRequestFollowRedirectsBuffer(req *Request, dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  962. resp := AcquireResponse()
  963. bodyBuf := resp.bodyBuffer()
  964. resp.keepBodyBuffer = true
  965. oldBody := bodyBuf.B
  966. bodyBuf.B = dst
  967. statusCode, _, err = doRequestFollowRedirects(req, resp, url, defaultMaxRedirectsCount, c)
  968. body = bodyBuf.B
  969. bodyBuf.B = oldBody
  970. resp.keepBodyBuffer = false
  971. ReleaseResponse(resp)
  972. return statusCode, body, err
  973. }
  974. func doRequestFollowRedirects(
  975. req *Request, resp *Response, url string, maxRedirectsCount int, c clientDoer,
  976. ) (statusCode int, body []byte, err error) {
  977. redirectsCount := 0
  978. for {
  979. req.SetRequestURI(url)
  980. if err := req.parseURI(); err != nil {
  981. return 0, nil, err
  982. }
  983. if err = c.Do(req, resp); err != nil {
  984. break
  985. }
  986. statusCode = resp.Header.StatusCode()
  987. if !StatusCodeIsRedirect(statusCode) {
  988. break
  989. }
  990. redirectsCount++
  991. if redirectsCount > maxRedirectsCount {
  992. err = ErrTooManyRedirects
  993. break
  994. }
  995. location := resp.Header.peek(strLocation)
  996. if len(location) == 0 {
  997. err = ErrMissingLocation
  998. break
  999. }
  1000. url = getRedirectURL(url, location, req.DisableRedirectPathNormalizing)
  1001. }
  1002. return statusCode, body, err
  1003. }
  1004. func getRedirectURL(baseURL string, location []byte, disablePathNormalizing bool) string {
  1005. u := AcquireURI()
  1006. u.Update(baseURL)
  1007. u.UpdateBytes(location)
  1008. u.DisablePathNormalizing = disablePathNormalizing
  1009. redirectURL := u.String()
  1010. ReleaseURI(u)
  1011. return redirectURL
  1012. }
  1013. // StatusCodeIsRedirect returns true if the status code indicates a redirect.
  1014. func StatusCodeIsRedirect(statusCode int) bool {
  1015. return statusCode == StatusMovedPermanently ||
  1016. statusCode == StatusFound ||
  1017. statusCode == StatusSeeOther ||
  1018. statusCode == StatusTemporaryRedirect ||
  1019. statusCode == StatusPermanentRedirect
  1020. }
  1021. var (
  1022. requestPool sync.Pool
  1023. responsePool sync.Pool
  1024. )
  1025. // AcquireRequest returns an empty Request instance from request pool.
  1026. //
  1027. // The returned Request instance may be passed to ReleaseRequest when it is
  1028. // no longer needed. This allows Request recycling, reduces GC pressure
  1029. // and usually improves performance.
  1030. func AcquireRequest() *Request {
  1031. v := requestPool.Get()
  1032. if v == nil {
  1033. return &Request{}
  1034. }
  1035. return v.(*Request)
  1036. }
  1037. // ReleaseRequest returns req acquired via AcquireRequest to request pool.
  1038. //
  1039. // It is forbidden accessing req and/or its' members after returning
  1040. // it to request pool.
  1041. func ReleaseRequest(req *Request) {
  1042. req.Reset()
  1043. requestPool.Put(req)
  1044. }
  1045. // AcquireResponse returns an empty Response instance from response pool.
  1046. //
  1047. // The returned Response instance may be passed to ReleaseResponse when it is
  1048. // no longer needed. This allows Response recycling, reduces GC pressure
  1049. // and usually improves performance.
  1050. func AcquireResponse() *Response {
  1051. v := responsePool.Get()
  1052. if v == nil {
  1053. return &Response{}
  1054. }
  1055. return v.(*Response)
  1056. }
  1057. // ReleaseResponse return resp acquired via AcquireResponse to response pool.
  1058. //
  1059. // It is forbidden accessing resp and/or its' members after returning
  1060. // it to response pool.
  1061. func ReleaseResponse(resp *Response) {
  1062. resp.Reset()
  1063. responsePool.Put(resp)
  1064. }
  1065. // DoTimeout performs the given request and waits for response during
  1066. // the given timeout duration.
  1067. //
  1068. // Request must contain at least non-zero RequestURI with full url (including
  1069. // scheme and host) or non-zero Host header + RequestURI.
  1070. //
  1071. // The function doesn't follow redirects. Use Get* for following redirects.
  1072. //
  1073. // Response is ignored if resp is nil.
  1074. //
  1075. // ErrTimeout is returned if the response wasn't returned during
  1076. // the given timeout.
  1077. // Immediately returns ErrTimeout if timeout value is negative.
  1078. //
  1079. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1080. // to the host are busy.
  1081. //
  1082. // It is recommended obtaining req and resp via AcquireRequest
  1083. // and AcquireResponse in performance-critical code.
  1084. func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  1085. req.timeout = timeout
  1086. if req.timeout <= 0 {
  1087. return ErrTimeout
  1088. }
  1089. return c.Do(req, resp)
  1090. }
  1091. // DoDeadline performs the given request and waits for response until
  1092. // the given deadline.
  1093. //
  1094. // Request must contain at least non-zero RequestURI with full url (including
  1095. // scheme and host) or non-zero Host header + RequestURI.
  1096. //
  1097. // The function doesn't follow redirects. Use Get* for following redirects.
  1098. //
  1099. // Response is ignored if resp is nil.
  1100. //
  1101. // ErrTimeout is returned if the response wasn't returned until
  1102. // the given deadline.
  1103. // Immediately returns ErrTimeout if the deadline has already been reached.
  1104. //
  1105. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1106. // to the host are busy.
  1107. //
  1108. // It is recommended obtaining req and resp via AcquireRequest
  1109. // and AcquireResponse in performance-critical code.
  1110. func (c *HostClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  1111. req.timeout = time.Until(deadline)
  1112. if req.timeout <= 0 {
  1113. return ErrTimeout
  1114. }
  1115. return c.Do(req, resp)
  1116. }
  1117. // DoRedirects performs the given http request and fills the given http response,
  1118. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  1119. // maxRedirectsCount, ErrTooManyRedirects is returned.
  1120. //
  1121. // Request must contain at least non-zero RequestURI with full url (including
  1122. // scheme and host) or non-zero Host header + RequestURI.
  1123. //
  1124. // Client determines the server to be requested in the following order:
  1125. //
  1126. // - from RequestURI if it contains full url with scheme and host;
  1127. // - from Host header otherwise.
  1128. //
  1129. // Response is ignored if resp is nil.
  1130. //
  1131. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  1132. // to the requested host are busy.
  1133. //
  1134. // It is recommended obtaining req and resp via AcquireRequest
  1135. // and AcquireResponse in performance-critical code.
  1136. func (c *HostClient) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  1137. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  1138. return err
  1139. }
  1140. // Do performs the given http request and sets the corresponding response.
  1141. //
  1142. // Request must contain at least non-zero RequestURI with full url (including
  1143. // scheme and host) or non-zero Host header + RequestURI.
  1144. //
  1145. // The function doesn't follow redirects. Use Get* for following redirects.
  1146. //
  1147. // Response is ignored if resp is nil.
  1148. //
  1149. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1150. // to the host are busy.
  1151. //
  1152. // It is recommended obtaining req and resp via AcquireRequest
  1153. // and AcquireResponse in performance-critical code.
  1154. func (c *HostClient) Do(req *Request, resp *Response) error {
  1155. var (
  1156. err error
  1157. retry bool
  1158. resetTimeout bool
  1159. )
  1160. maxAttempts := c.MaxIdemponentCallAttempts
  1161. if maxAttempts <= 0 {
  1162. maxAttempts = DefaultMaxIdemponentCallAttempts
  1163. }
  1164. attempts := 0
  1165. hasBodyStream := req.IsBodyStream()
  1166. // If a request has a timeout we store the timeout
  1167. // and calculate a deadline so we can keep updating the
  1168. // timeout on each retry.
  1169. deadline := time.Time{}
  1170. timeout := req.timeout
  1171. if timeout > 0 {
  1172. deadline = time.Now().Add(timeout)
  1173. }
  1174. retryFunc := c.RetryIf
  1175. if retryFunc == nil {
  1176. retryFunc = isIdempotent
  1177. }
  1178. atomic.AddInt32(&c.pendingRequests, 1)
  1179. for {
  1180. // If the original timeout was set, we need to update
  1181. // the one set on the request to reflect the remaining time.
  1182. if timeout > 0 {
  1183. req.timeout = time.Until(deadline)
  1184. if req.timeout <= 0 {
  1185. err = ErrTimeout
  1186. break
  1187. }
  1188. }
  1189. retry, err = c.do(req, resp)
  1190. if err == nil || !retry {
  1191. break
  1192. }
  1193. if hasBodyStream {
  1194. break
  1195. }
  1196. // Path prioritization based on ease of computation
  1197. attempts++
  1198. if attempts >= maxAttempts {
  1199. break
  1200. }
  1201. if c.RetryIfErr != nil {
  1202. resetTimeout, retry = c.RetryIfErr(req, attempts, err)
  1203. } else {
  1204. retry = retryFunc(req)
  1205. }
  1206. if !retry {
  1207. break
  1208. }
  1209. if timeout > 0 && resetTimeout {
  1210. deadline = time.Now().Add(timeout)
  1211. }
  1212. }
  1213. atomic.AddInt32(&c.pendingRequests, -1)
  1214. // Restore the original timeout.
  1215. req.timeout = timeout
  1216. if err == io.EOF {
  1217. err = ErrConnectionClosed
  1218. }
  1219. return err
  1220. }
  1221. // PendingRequests returns the current number of requests the client
  1222. // is executing.
  1223. //
  1224. // This function may be used for balancing load among multiple HostClient
  1225. // instances.
  1226. func (c *HostClient) PendingRequests() int {
  1227. return int(atomic.LoadInt32(&c.pendingRequests))
  1228. }
  1229. func isIdempotent(req *Request) bool {
  1230. return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut()
  1231. }
  1232. func (c *HostClient) do(req *Request, resp *Response) (bool, error) {
  1233. if resp == nil {
  1234. resp = AcquireResponse()
  1235. defer ReleaseResponse(resp)
  1236. }
  1237. return c.doNonNilReqResp(req, resp)
  1238. }
  1239. func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
  1240. if req == nil {
  1241. // for debugging purposes
  1242. panic("BUG: req cannot be nil")
  1243. }
  1244. if resp == nil {
  1245. // for debugging purposes
  1246. panic("BUG: resp cannot be nil")
  1247. }
  1248. // Secure header error logs configuration
  1249. resp.secureErrorLogMessage = c.SecureErrorLogMessage
  1250. resp.Header.secureErrorLogMessage = c.SecureErrorLogMessage
  1251. req.secureErrorLogMessage = c.SecureErrorLogMessage
  1252. req.Header.secureErrorLogMessage = c.SecureErrorLogMessage
  1253. if c.IsTLS != req.URI().isHTTPS() {
  1254. return false, ErrHostClientRedirectToDifferentScheme
  1255. }
  1256. atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix)) // #nosec G115
  1257. // Free up resources occupied by response before sending the request,
  1258. // so the GC may reclaim these resources (e.g. response body).
  1259. // backing up SkipBody in case it was set explicitly
  1260. customSkipBody := resp.SkipBody
  1261. customStreamBody := resp.StreamBody || c.StreamResponseBody
  1262. resp.Reset()
  1263. resp.SkipBody = customSkipBody
  1264. resp.StreamBody = customStreamBody
  1265. req.URI().DisablePathNormalizing = c.DisablePathNormalizing
  1266. userAgentOld := req.Header.UserAgent()
  1267. if len(userAgentOld) == 0 {
  1268. userAgent := c.Name
  1269. if userAgent == "" && !c.NoDefaultUserAgentHeader {
  1270. userAgent = defaultUserAgent
  1271. }
  1272. if userAgent != "" {
  1273. req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
  1274. }
  1275. }
  1276. return c.transport().RoundTrip(c, req, resp)
  1277. }
  1278. func (c *HostClient) transport() RoundTripper {
  1279. if c.Transport == nil {
  1280. return DefaultTransport
  1281. }
  1282. return c.Transport
  1283. }
  1284. var (
  1285. // ErrNoFreeConns is returned when no free connections available
  1286. // to the given host.
  1287. //
  1288. // Increase the allowed number of connections per host if you
  1289. // see this error.
  1290. ErrNoFreeConns = errors.New("no free connections available to host")
  1291. // ErrConnectionClosed may be returned from client methods if the server
  1292. // closes connection before returning the first response byte.
  1293. //
  1294. // If you see this error, then either fix the server by returning
  1295. // 'Connection: close' response header before closing the connection
  1296. // or add 'Connection: close' request header before sending requests
  1297. // to broken server.
  1298. ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. " +
  1299. "Make sure the server returns 'Connection: close' response header before closing the connection")
  1300. // ErrConnPoolStrategyNotImpl is returned when HostClient.ConnPoolStrategy is not implement yet.
  1301. // If you see this error, then you need to check your HostClient configuration.
  1302. ErrConnPoolStrategyNotImpl = errors.New("connection pool strategy is not implement")
  1303. )
  1304. type timeoutError struct{}
  1305. func (e *timeoutError) Error() string {
  1306. return "timeout"
  1307. }
  1308. // Only implement the Timeout() function of the net.Error interface.
  1309. // This allows for checks like:
  1310. //
  1311. // if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  1312. func (e *timeoutError) Timeout() bool {
  1313. return true
  1314. }
  1315. // ErrTimeout is returned from timed out calls.
  1316. var ErrTimeout = &timeoutError{}
  1317. // SetMaxConns sets up the maximum number of connections which may be established to all hosts listed in Addr.
  1318. func (c *HostClient) SetMaxConns(newMaxConns int) {
  1319. c.connsLock.Lock()
  1320. c.MaxConns = newMaxConns
  1321. c.connsLock.Unlock()
  1322. }
// acquireConn returns a connection for the next request.
//
// Under connsLock it first tries to take an idle connection from c.conns
// according to c.ConnPoolStrategy (LIFO or FIFO). When there is no idle
// connection and c.connsCount is below the limit (c.MaxConns, or
// DefaultMaxConnsPerHost when MaxConns <= 0), a new connection is dialed via
// dialHostHard. When the limit is reached the call either fails with
// ErrNoFreeConns (MaxConnWaitTimeout <= 0) or queues a wantConn and waits
// for a connection to be delivered or for the wait timer to fire.
//
// reqTimeout is passed to dialHostHard and, when positive and smaller than
// c.MaxConnWaitTimeout, bounds the wait; in that case a wait timeout is
// reported as ErrTimeout instead of ErrNoFreeConns. connectionClose
// suppresses starting the idle connection cleaner.
func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) (cc *clientConn, err error) {
	createConn := false
	startCleaner := false

	var n int
	c.connsLock.Lock()
	n = len(c.conns)
	if n == 0 {
		// No idle connections: reserve a slot for a new one if allowed.
		maxConns := c.MaxConns
		if maxConns <= 0 {
			maxConns = DefaultMaxConnsPerHost
		}
		if c.connsCount < maxConns {
			c.connsCount++
			createConn = true
			if !c.connsCleanerRun && !connectionClose {
				startCleaner = true
				c.connsCleanerRun = true
			}
		}
	} else {
		switch c.ConnPoolStrategy {
		case LIFO:
			// Take the most recently released connection.
			n--
			cc = c.conns[n]
			c.conns[n] = nil
			c.conns = c.conns[:n]
		case FIFO:
			// Take the oldest connection and shift the rest down.
			cc = c.conns[0]
			copy(c.conns, c.conns[1:])
			c.conns[n-1] = nil
			c.conns = c.conns[:n-1]
		default:
			c.connsLock.Unlock()
			return nil, ErrConnPoolStrategyNotImpl
		}
	}
	c.connsLock.Unlock()

	if cc != nil {
		return cc, nil
	}
	if !createConn {
		if c.MaxConnWaitTimeout <= 0 {
			return nil, ErrNoFreeConns
		}

		//nolint:dupword
		// reqTimeout    c.MaxConnWaitTimeout   wait duration
		//     d1                 d2            min(d1, d2)
		//  0(not set)            d2            d2
		//     d1            0(don't wait)      0(don't wait)
		//  0(not set)            d2            d2
		timeout := c.MaxConnWaitTimeout
		timeoutOverridden := false
		// reqTimeout == 0 means not set
		if reqTimeout > 0 && reqTimeout < timeout {
			timeout = reqTimeout
			timeoutOverridden = true
		}

		// wait for a free connection
		tc := AcquireTimer(timeout)
		defer ReleaseTimer(tc)

		w := &wantConn{
			ready: make(chan struct{}, 1),
		}
		// On any error return, cancel the waiter; cancel hands back a
		// connection that was delivered in the meantime.
		defer func() {
			if err != nil {
				w.cancel(c, err)
			}
		}()

		c.queueForIdle(w)

		select {
		case <-w.ready:
			return w.conn, w.err
		case <-tc.C:
			// The waiter stays in the queue; count it as failed so that
			// wantConnQueue.len does not treat it as a live waiter.
			c.connsWait.failedWaiters.Add(1)
			if timeoutOverridden {
				return nil, ErrTimeout
			}
			return nil, ErrNoFreeConns
		}
	}

	if startCleaner {
		go c.connsCleaner()
	}

	conn, err := c.dialHostHard(reqTimeout)
	if err != nil {
		// Give back the slot reserved above.
		c.decConnsCount()
		return nil, err
	}
	cc = acquireClientConn(conn)

	return cc, nil
}
  1414. func (c *HostClient) queueForIdle(w *wantConn) {
  1415. c.connsLock.Lock()
  1416. defer c.connsLock.Unlock()
  1417. if c.connsWait == nil {
  1418. c.connsWait = &wantConnQueue{}
  1419. }
  1420. c.connsWait.clearFront()
  1421. c.connsWait.pushBack(w)
  1422. }
  1423. func (c *HostClient) dialConnFor(w *wantConn) {
  1424. conn, err := c.dialHostHard(0)
  1425. if err != nil {
  1426. w.tryDeliver(nil, err)
  1427. c.decConnsCount()
  1428. return
  1429. }
  1430. cc := acquireClientConn(conn)
  1431. if !w.tryDeliver(cc, nil) {
  1432. // not delivered, return idle connection
  1433. c.releaseConn(cc)
  1434. }
  1435. }
  1436. // CloseIdleConnections closes any connections which were previously
  1437. // connected from previous requests but are now sitting idle in a
  1438. // "keep-alive" state. It does not interrupt any connections currently
  1439. // in use.
  1440. func (c *HostClient) CloseIdleConnections() {
  1441. c.connsLock.Lock()
  1442. scratch := append([]*clientConn{}, c.conns...)
  1443. for i := range c.conns {
  1444. c.conns[i] = nil
  1445. }
  1446. c.conns = c.conns[:0]
  1447. c.connsLock.Unlock()
  1448. for _, cc := range scratch {
  1449. c.closeConn(cc)
  1450. }
  1451. }
// connsCleaner periodically closes idle connections whose lastUseTime is
// older than the maximum idle duration (c.MaxIdleConnDuration, or
// DefaultMaxIdleConnDuration when that is <= 0).
//
// It is started as a goroutine by acquireConn and loops until
// c.connsCount drops to zero, at which point it clears c.connsCleanerRun
// and returns.
func (c *HostClient) connsCleaner() {
	var (
		scratch             []*clientConn
		maxIdleConnDuration = c.MaxIdleConnDuration
	)
	if maxIdleConnDuration <= 0 {
		maxIdleConnDuration = DefaultMaxIdleConnDuration
	}
	for {
		currentTime := time.Now()

		// Determine idle connections to be closed.
		c.connsLock.Lock()
		conns := c.conns
		n := len(conns)
		i := 0
		// The scan stops at the first connection that has not expired yet.
		for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
			i++
		}
		sleepFor := maxIdleConnDuration
		if i < n {
			// + 1 so we actually sleep past the expiration time and not up to it.
			// Otherwise the > check above would still fail.
			sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1
		}
		scratch = append(scratch[:0], conns[:i]...)
		if i > 0 {
			// Shift the surviving connections to the front and nil out the
			// vacated tail slots.
			m := copy(conns, conns[i:])
			for i = m; i < n; i++ {
				conns[i] = nil
			}
			c.conns = conns[:m]
		}
		c.connsLock.Unlock()

		// Close idle connections.
		for i, cc := range scratch {
			c.closeConn(cc)
			scratch[i] = nil
		}

		// Determine whether to stop the connsCleaner.
		c.connsLock.Lock()
		mustStop := c.connsCount == 0
		if mustStop {
			c.connsCleanerRun = false
		}
		c.connsLock.Unlock()
		if mustStop {
			break
		}

		time.Sleep(sleepFor)
	}
}
  1503. func (c *HostClient) closeConn(cc *clientConn) {
  1504. c.decConnsCount()
  1505. cc.c.Close()
  1506. releaseClientConn(cc)
  1507. }
  1508. func (c *HostClient) decConnsCount() {
  1509. if c.MaxConnWaitTimeout <= 0 {
  1510. c.connsLock.Lock()
  1511. c.connsCount--
  1512. c.connsLock.Unlock()
  1513. return
  1514. }
  1515. c.connsLock.Lock()
  1516. defer c.connsLock.Unlock()
  1517. dialed := false
  1518. if q := c.connsWait; q != nil && q.len() > 0 {
  1519. for q.len() > 0 {
  1520. w := q.popFront()
  1521. if w.waiting() {
  1522. go c.dialConnFor(w)
  1523. dialed = true
  1524. break
  1525. }
  1526. c.connsWait.failedWaiters.Add(-1)
  1527. }
  1528. }
  1529. if !dialed {
  1530. c.connsCount--
  1531. }
  1532. }
  1533. // ConnsCount returns connection count of HostClient.
  1534. func (c *HostClient) ConnsCount() int {
  1535. c.connsLock.Lock()
  1536. defer c.connsLock.Unlock()
  1537. return c.connsCount
  1538. }
  1539. func acquireClientConn(conn net.Conn) *clientConn {
  1540. v := clientConnPool.Get()
  1541. if v == nil {
  1542. v = &clientConn{}
  1543. }
  1544. cc := v.(*clientConn)
  1545. cc.c = conn
  1546. cc.createdTime = time.Now()
  1547. return cc
  1548. }
  1549. func releaseClientConn(cc *clientConn) {
  1550. // Reset all fields.
  1551. *cc = clientConn{}
  1552. clientConnPool.Put(cc)
  1553. }
  1554. var clientConnPool sync.Pool
  1555. func (c *HostClient) releaseConn(cc *clientConn) {
  1556. cc.lastUseTime = time.Now()
  1557. if c.MaxConnWaitTimeout <= 0 {
  1558. c.connsLock.Lock()
  1559. c.conns = append(c.conns, cc)
  1560. c.connsLock.Unlock()
  1561. return
  1562. }
  1563. // try to deliver an idle connection to a *wantConn
  1564. c.connsLock.Lock()
  1565. defer c.connsLock.Unlock()
  1566. delivered := false
  1567. if q := c.connsWait; q != nil && q.len() > 0 {
  1568. for q.len() > 0 {
  1569. w := q.popFront()
  1570. if w.waiting() {
  1571. delivered = w.tryDeliver(cc, nil)
  1572. // This is the last resort to hand over conCount sema.
  1573. // We must ensure that there are no valid waiters in connsWait
  1574. // when we exit this loop.
  1575. //
  1576. // We did not apply the same looping pattern in the decConnsCount
  1577. // method because it needs to create a new time-spent connection,
  1578. // and the decConnsCount call chain will inevitably reach this point.
  1579. // When MaxConnWaitTimeout>0.
  1580. if delivered {
  1581. break
  1582. }
  1583. }
  1584. c.connsWait.failedWaiters.Add(-1)
  1585. }
  1586. }
  1587. if !delivered {
  1588. c.conns = append(c.conns, cc)
  1589. }
  1590. }
  1591. func (c *HostClient) acquireWriter(conn net.Conn) *bufio.Writer {
  1592. var v any
  1593. if c.clientWriterPool != nil {
  1594. v = c.clientWriterPool.Get()
  1595. if v == nil {
  1596. n := c.WriteBufferSize
  1597. if n <= 0 {
  1598. n = defaultWriteBufferSize
  1599. }
  1600. return bufio.NewWriterSize(conn, n)
  1601. }
  1602. } else {
  1603. v = c.writerPool.Get()
  1604. if v == nil {
  1605. n := c.WriteBufferSize
  1606. if n <= 0 {
  1607. n = defaultWriteBufferSize
  1608. }
  1609. return bufio.NewWriterSize(conn, n)
  1610. }
  1611. }
  1612. bw := v.(*bufio.Writer)
  1613. bw.Reset(conn)
  1614. return bw
  1615. }
  1616. func (c *HostClient) releaseWriter(bw *bufio.Writer) {
  1617. if c.clientWriterPool != nil {
  1618. c.clientWriterPool.Put(bw)
  1619. } else {
  1620. c.writerPool.Put(bw)
  1621. }
  1622. }
  1623. func (c *HostClient) acquireReader(conn net.Conn) *bufio.Reader {
  1624. var v any
  1625. if c.clientReaderPool != nil {
  1626. v = c.clientReaderPool.Get()
  1627. if v == nil {
  1628. n := c.ReadBufferSize
  1629. if n <= 0 {
  1630. n = defaultReadBufferSize
  1631. }
  1632. return bufio.NewReaderSize(conn, n)
  1633. }
  1634. } else {
  1635. v = c.readerPool.Get()
  1636. if v == nil {
  1637. n := c.ReadBufferSize
  1638. if n <= 0 {
  1639. n = defaultReadBufferSize
  1640. }
  1641. return bufio.NewReaderSize(conn, n)
  1642. }
  1643. }
  1644. br := v.(*bufio.Reader)
  1645. br.Reset(conn)
  1646. return br
  1647. }
  1648. func (c *HostClient) releaseReader(br *bufio.Reader) {
  1649. if c.clientReaderPool != nil {
  1650. c.clientReaderPool.Put(br)
  1651. } else {
  1652. c.readerPool.Put(br)
  1653. }
  1654. }
  1655. func newClientTLSConfig(c *tls.Config, addr string) *tls.Config {
  1656. if c == nil {
  1657. c = &tls.Config{}
  1658. } else {
  1659. c = c.Clone()
  1660. }
  1661. if c.ServerName == "" {
  1662. serverName := tlsServerName(addr)
  1663. if serverName == "*" {
  1664. c.InsecureSkipVerify = true
  1665. } else {
  1666. c.ServerName = serverName
  1667. }
  1668. }
  1669. return c
  1670. }
  1671. func tlsServerName(addr string) string {
  1672. if !strings.Contains(addr, ":") {
  1673. return addr
  1674. }
  1675. host, _, err := net.SplitHostPort(addr)
  1676. if err != nil {
  1677. return "*"
  1678. }
  1679. return host
  1680. }
  1681. func (c *HostClient) nextAddr() string {
  1682. c.addrsLock.Lock()
  1683. if c.addrs == nil {
  1684. c.addrs = strings.Split(c.Addr, ",")
  1685. }
  1686. addr := c.addrs[0]
  1687. if len(c.addrs) > 1 {
  1688. addr = c.addrs[c.addrIdx%uint32(len(c.addrs))] // #nosec G115
  1689. c.addrIdx++
  1690. }
  1691. c.addrsLock.Unlock()
  1692. return addr
  1693. }
  1694. func (c *HostClient) dialHostHard(dialTimeout time.Duration) (conn net.Conn, err error) {
  1695. // use dialTimeout to control the timeout of each dial. It does not work if dialTimeout is 0 or if
  1696. // c.DialTimeout has not been set and c.Dial has been set.
  1697. // attempt to dial all the available hosts before giving up.
  1698. c.addrsLock.Lock()
  1699. n := len(c.addrs)
  1700. c.addrsLock.Unlock()
  1701. if n == 0 {
  1702. // It looks like c.addrs isn't initialized yet.
  1703. n = 1
  1704. }
  1705. timeout := c.ReadTimeout + c.WriteTimeout
  1706. if timeout <= 0 {
  1707. timeout = DefaultDialTimeout
  1708. }
  1709. deadline := time.Now().Add(timeout)
  1710. for n > 0 {
  1711. addr := c.nextAddr()
  1712. tlsConfig := c.cachedTLSConfig(addr)
  1713. conn, err = dialAddr(addr, c.Dial, c.DialTimeout, c.DialDualStack, c.IsTLS, tlsConfig, dialTimeout, c.WriteTimeout)
  1714. if err == nil {
  1715. return conn, nil
  1716. }
  1717. if time.Since(deadline) >= 0 {
  1718. break
  1719. }
  1720. n--
  1721. }
  1722. return nil, err
  1723. }
  1724. func (c *HostClient) cachedTLSConfig(addr string) *tls.Config {
  1725. if !c.IsTLS {
  1726. return nil
  1727. }
  1728. c.tlsConfigMapLock.Lock()
  1729. if c.tlsConfigMap == nil {
  1730. c.tlsConfigMap = make(map[string]*tls.Config)
  1731. }
  1732. cfg := c.tlsConfigMap[addr]
  1733. if cfg == nil {
  1734. cfg = newClientTLSConfig(c.TLSConfig, addr)
  1735. c.tlsConfigMap[addr] = cfg
  1736. }
  1737. c.tlsConfigMapLock.Unlock()
  1738. return cfg
  1739. }
  1740. // ErrTLSHandshakeTimeout indicates there is a timeout from tls handshake.
  1741. var ErrTLSHandshakeTimeout = errors.New("tls handshake timed out")
  1742. func tlsClientHandshake(rawConn net.Conn, tlsConfig *tls.Config, deadline time.Time) (_ net.Conn, retErr error) {
  1743. defer func() {
  1744. if retErr != nil {
  1745. rawConn.Close()
  1746. }
  1747. }()
  1748. conn := tls.Client(rawConn, tlsConfig)
  1749. err := conn.SetDeadline(deadline)
  1750. if err != nil {
  1751. return nil, err
  1752. }
  1753. err = conn.Handshake()
  1754. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  1755. return nil, ErrTLSHandshakeTimeout
  1756. }
  1757. if err != nil {
  1758. return nil, err
  1759. }
  1760. err = conn.SetDeadline(time.Time{})
  1761. if err != nil {
  1762. return nil, err
  1763. }
  1764. return conn, nil
  1765. }
  1766. func dialAddr(
  1767. addr string, dial DialFunc, dialWithTimeout DialFuncWithTimeout, dialDualStack, isTLS bool,
  1768. tlsConfig *tls.Config, dialTimeout, writeTimeout time.Duration,
  1769. ) (net.Conn, error) {
  1770. deadline := time.Now().Add(writeTimeout)
  1771. conn, err := callDialFunc(addr, dial, dialWithTimeout, dialDualStack, isTLS, dialTimeout)
  1772. if err != nil {
  1773. return nil, err
  1774. }
  1775. if conn == nil {
  1776. return nil, errors.New("dialling unsuccessful. Please report this bug")
  1777. }
  1778. // We assume that any conn that has the Handshake() method is a TLS conn already.
  1779. // This doesn't cover just tls.Conn but also other TLS implementations.
  1780. _, isTLSAlready := conn.(interface{ Handshake() error })
  1781. if isTLS && !isTLSAlready {
  1782. if writeTimeout == 0 {
  1783. return tls.Client(conn, tlsConfig), nil
  1784. }
  1785. return tlsClientHandshake(conn, tlsConfig, deadline)
  1786. }
  1787. return conn, nil
  1788. }
  1789. func callDialFunc(
  1790. addr string, dial DialFunc, dialWithTimeout DialFuncWithTimeout, dialDualStack, isTLS bool, timeout time.Duration,
  1791. ) (net.Conn, error) {
  1792. if dialWithTimeout != nil {
  1793. return dialWithTimeout(addr, timeout)
  1794. }
  1795. if dial != nil {
  1796. return dial(addr)
  1797. }
  1798. addr = AddMissingPort(addr, isTLS)
  1799. if timeout > 0 {
  1800. if dialDualStack {
  1801. return DialDualStackTimeout(addr, timeout)
  1802. }
  1803. return DialTimeout(addr, timeout)
  1804. }
  1805. if dialDualStack {
  1806. return DialDualStack(addr)
  1807. }
  1808. return Dial(addr)
  1809. }
  1810. // AddMissingPort adds a port to a host if it is missing.
  1811. // A literal IPv6 address in hostport must be enclosed in square
  1812. // brackets, as in "[::1]:80", "[::1%lo0]:80".
  1813. func AddMissingPort(addr string, isTLS bool) string {
  1814. addrLen := len(addr)
  1815. if addrLen == 0 {
  1816. return addr
  1817. }
  1818. isIP6 := addr[0] == '['
  1819. if isIP6 {
  1820. // if the IPv6 has opening bracket but closing bracket is the last char then it doesn't have a port
  1821. isIP6WithoutPort := addr[addrLen-1] == ']'
  1822. if !isIP6WithoutPort {
  1823. return addr
  1824. }
  1825. } else { // IPv4
  1826. columnPos := strings.LastIndexByte(addr, ':')
  1827. if columnPos > 0 {
  1828. return addr
  1829. }
  1830. }
  1831. port := ":80"
  1832. if isTLS {
  1833. port = ":443"
  1834. }
  1835. return addr + port
  1836. }
// A wantConn records state about a wanted connection
// (that is, an active call to getConn).
// The conn may be gotten by dialing or by finding an idle connection,
// or a cancellation may make the conn no longer wanted.
// These three options are racing against each other and use
// wantConn to coordinate and agree about the winning outcome.
//
// Inspired by net/http/transport.go.
type wantConn struct {
	// err is the error delivered by tryDeliver or recorded by cancel.
	err error
	// ready is closed once a result has been delivered or the request
	// has been cancelled.
	ready chan struct{}
	// conn is the delivered connection, if any.
	conn *clientConn
	mu   sync.Mutex // protects conn, err, close(ready)
}
  1851. // waiting reports whether w is still waiting for an answer (connection or error).
  1852. func (w *wantConn) waiting() bool {
  1853. select {
  1854. case <-w.ready:
  1855. return false
  1856. default:
  1857. return true
  1858. }
  1859. }
  1860. // tryDeliver attempts to deliver conn, err to w and reports whether it succeeded.
  1861. func (w *wantConn) tryDeliver(conn *clientConn, err error) bool {
  1862. w.mu.Lock()
  1863. defer w.mu.Unlock()
  1864. if w.conn != nil || w.err != nil {
  1865. return false
  1866. }
  1867. w.conn = conn
  1868. w.err = err
  1869. if w.conn == nil && w.err == nil {
  1870. panic("fasthttp: internal error: misuse of tryDeliver")
  1871. }
  1872. close(w.ready)
  1873. return true
  1874. }
  1875. // cancel marks w as no longer wanting a result (for example, due to cancellation).
  1876. // If a connection has been delivered already, cancel returns it with c.releaseConn.
  1877. func (w *wantConn) cancel(c *HostClient, err error) {
  1878. w.mu.Lock()
  1879. if w.conn == nil && w.err == nil {
  1880. close(w.ready) // catch misbehavior in future delivery
  1881. }
  1882. conn := w.conn
  1883. w.conn = nil
  1884. w.err = err
  1885. w.mu.Unlock()
  1886. if conn != nil {
  1887. c.releaseConn(conn)
  1888. }
  1889. }
// A wantConnQueue is a queue of wantConns.
//
// Inspired by net/http/transport.go.
type wantConnQueue struct {
	// This is a queue, not a deque.
	// It is split into two stages - head[headPos:] and tail.
	// popFront is trivial (headPos++) on the first stage, and
	// pushBack is trivial (append) on the second stage.
	// If the first stage is empty, popFront can swap the
	// first and second stages to remedy the situation.
	//
	// This two-stage split is analogous to the use of two lists
	// in Okasaki's purely functional queue but without the
	// overhead of reversing the list when swapping stages.
	head    []*wantConn
	tail    []*wantConn
	headPos int
	// failedWaiters is the number of waiters that are still stored in the
	// head or tail stage but are no longer valid.
	// Such waiters cannot truly be considered waiters; the current
	// implementation does not remove them immediately when they become
	// invalid but only marks them by adjusting this counter. len subtracts
	// it from the number of stored entries.
	failedWaiters atomic.Int64
}
  1914. // len returns the number of items in the queue.
  1915. func (q *wantConnQueue) len() int {
  1916. return len(q.head) - q.headPos + len(q.tail) - int(q.failedWaiters.Load())
  1917. }
  1918. // pushBack adds w to the back of the queue.
  1919. func (q *wantConnQueue) pushBack(w *wantConn) {
  1920. q.tail = append(q.tail, w)
  1921. }
  1922. // popFront removes and returns the wantConn at the front of the queue.
  1923. func (q *wantConnQueue) popFront() *wantConn {
  1924. if q.headPos >= len(q.head) {
  1925. if len(q.tail) == 0 {
  1926. return nil
  1927. }
  1928. // Pick up tail as new head, clear tail.
  1929. q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
  1930. }
  1931. w := q.head[q.headPos]
  1932. q.head[q.headPos] = nil
  1933. q.headPos++
  1934. return w
  1935. }
  1936. // peekFront returns the wantConn at the front of the queue without removing it.
  1937. func (q *wantConnQueue) peekFront() *wantConn {
  1938. if q.headPos < len(q.head) {
  1939. return q.head[q.headPos]
  1940. }
  1941. if len(q.tail) > 0 {
  1942. return q.tail[0]
  1943. }
  1944. return nil
  1945. }
  1946. // clearFront pops any wantConns that are no longer waiting from the head of the
  1947. // queue, reporting whether any were popped.
  1948. func (q *wantConnQueue) clearFront() (cleaned bool) {
  1949. for {
  1950. w := q.peekFront()
  1951. if w == nil || w.waiting() {
  1952. return cleaned
  1953. }
  1954. q.popFront()
  1955. q.failedWaiters.Add(-1)
  1956. cleaned = true
  1957. }
  1958. }
// PipelineClient pipelines requests over a limited set of concurrent
// connections to the given Addr.
//
// This client may be used in highly loaded HTTP-based RPC systems for reducing
// context switches and network level overhead.
// See https://en.wikipedia.org/wiki/HTTP_pipelining for details.
//
// Copying PipelineClient instances is forbidden. Create new instances
// instead.
//
// It is safe to call PipelineClient methods from concurrently running
// goroutines.
type PipelineClient struct {
	noCopy noCopy

	// Logger for logging client errors.
	//
	// By default standard logger from log package is used.
	Logger Logger

	// Callback for connection establishing to the host.
	//
	// Default Dial is used if not set.
	Dial DialFunc

	// Optional TLS config.
	TLSConfig *tls.Config

	// Address of the host to connect to.
	Addr string

	// PipelineClient name. Used in User-Agent request header.
	Name string

	// connClients holds the per-connection clients.
	// NOTE(review): presumably guarded by connClientsLock; the code using
	// it is outside this part of the file.
	connClients []*pipelineConnClient

	// The maximum number of concurrent connections to the Addr.
	//
	// A single connection is used by default.
	MaxConns int

	// The maximum number of pending pipelined requests over
	// a single connection to Addr.
	//
	// DefaultMaxPendingRequests is used by default.
	MaxPendingRequests int

	// The maximum delay before sending pipelined requests as a batch
	// to the server.
	//
	// By default requests are sent immediately to the server.
	MaxBatchDelay time.Duration

	// Idle connection to the host is closed after this duration.
	//
	// By default idle connection is closed after
	// DefaultMaxIdleConnDuration.
	MaxIdleConnDuration time.Duration

	// Buffer size for responses' reading.
	// This also limits the maximum header size.
	//
	// Default buffer size is used if 0.
	ReadBufferSize int

	// Buffer size for requests' writing.
	//
	// Default buffer size is used if 0.
	WriteBufferSize int

	// Maximum duration for full response reading (including body).
	//
	// By default response read timeout is unlimited.
	ReadTimeout time.Duration

	// Maximum duration for full request writing (including body).
	//
	// By default request write timeout is unlimited.
	WriteTimeout time.Duration

	connClientsLock sync.Mutex

	// NoDefaultUserAgentHeader when set to true, causes the default
	// User-Agent header to be excluded from the Request.
	NoDefaultUserAgentHeader bool

	// Attempt to connect to both ipv4 and ipv6 host addresses
	// if set to true.
	//
	// This option is used only if default TCP dialer is used,
	// i.e. if Dial is blank.
	//
	// By default client connects only to ipv4 addresses,
	// since unfortunately ipv6 remains broken in many networks worldwide :)
	DialDualStack bool

	// Response header names are passed as-is without normalization
	// if this option is set.
	//
	// Disabled header names' normalization may be useful only for proxying
	// responses to other clients expecting case-sensitive
	// header names. See https://github.com/valyala/fasthttp/issues/57
	// for details.
	//
	// By default request and response header names are normalized, i.e.
	// The first letter and the first letters following dashes
	// are uppercased, while all the other letters are lowercased.
	// Examples:
	//
	//     * HOST -> Host
	//     * content-type -> Content-Type
	//     * cONTENT-lenGTH -> Content-Length
	DisableHeaderNamesNormalizing bool

	// Path values are sent as-is without normalization.
	//
	// Disabled path normalization may be useful for proxying incoming requests
	// to servers that are expecting paths to be forwarded as-is.
	//
	// By default path values are normalized, i.e.
	// extra slashes are removed, special characters are encoded.
	DisablePathNormalizing bool

	// Whether to use TLS (aka SSL or HTTPS) for host connections.
	IsTLS bool
}
// pipelineConnClient is the client that performs pipelined requests on
// behalf of PipelineClient; PipelineClient.Do and PipelineClient.DoDeadline
// forward to the instance returned by getConnClient.
//
// NOTE(review): the exported-style fields share their names with the
// PipelineClient settings and are presumably copied from them when the
// instance is created; confirm against getConnClient.
type pipelineConnClient struct {
	noCopy noCopy

	// workPool recycles pipelineWork objects (see acquirePipelineWork and
	// releasePipelineWork).
	workPool sync.Pool

	Logger Logger

	Dial      DialFunc
	TLSConfig *tls.Config
	// chW receives the work items queued by Do and DoDeadline.
	chW chan *pipelineWork
	// chR: NOTE(review): presumably carries work items awaiting their
	// responses; its use is outside this part of the file.
	chR chan *pipelineWork

	tlsConfig *tls.Config

	Addr                string
	Name                string
	MaxPendingRequests  int
	MaxBatchDelay       time.Duration
	MaxIdleConnDuration time.Duration
	ReadBufferSize      int
	WriteBufferSize     int
	ReadTimeout         time.Duration
	WriteTimeout        time.Duration

	chLock        sync.Mutex
	tlsConfigLock sync.Mutex

	NoDefaultUserAgentHeader      bool
	DialDualStack                 bool
	DisableHeaderNamesNormalizing bool
	DisablePathNormalizing        bool
	IsTLS                         bool
}
// pipelineWork is a single queued request together with the state needed to
// wait for its response.
type pipelineWork struct {
	// respCopy is the response storage owned by the work item; DoDeadline
	// always uses it, Do only when the caller passed no response.
	respCopy Response
	// deadline is set by acquirePipelineWork to now+timeout, or to zeroTime
	// when no timeout is given.
	deadline time.Time
	// err is the result read by the caller once done has fired.
	err error
	// req and resp point at the objects to use for the exchange: the
	// embedded copies in DoDeadline, the caller's objects in Do.
	req  *Request
	resp *Response
	// t is the timeout timer; it is created or re-armed by
	// acquirePipelineWork when a positive timeout is given.
	t *time.Timer
	// done signals that the work item has been processed.
	done chan struct{}
	// reqCopy is the request copy DoDeadline sends, so the caller's request
	// is not touched after a timeout.
	reqCopy Request
}
  2101. // DoTimeout performs the given request and waits for response during
  2102. // the given timeout duration.
  2103. //
  2104. // Request must contain at least non-zero RequestURI with full url (including
  2105. // scheme and host) or non-zero Host header + RequestURI.
  2106. //
  2107. // The function doesn't follow redirects.
  2108. //
  2109. // Response is ignored if resp is nil.
  2110. //
  2111. // ErrTimeout is returned if the response wasn't returned during
  2112. // the given timeout.
  2113. //
  2114. // It is recommended obtaining req and resp via AcquireRequest
  2115. // and AcquireResponse in performance-critical code.
  2116. func (c *PipelineClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  2117. return c.DoDeadline(req, resp, time.Now().Add(timeout))
  2118. }
  2119. // DoDeadline performs the given request and waits for response until
  2120. // the given deadline.
  2121. //
  2122. // Request must contain at least non-zero RequestURI with full url (including
  2123. // scheme and host) or non-zero Host header + RequestURI.
  2124. //
  2125. // The function doesn't follow redirects.
  2126. //
  2127. // Response is ignored if resp is nil.
  2128. //
  2129. // ErrTimeout is returned if the response wasn't returned until
  2130. // the given deadline.
  2131. //
  2132. // It is recommended obtaining req and resp via AcquireRequest
  2133. // and AcquireResponse in performance-critical code.
  2134. func (c *PipelineClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  2135. return c.getConnClient().DoDeadline(req, resp, deadline)
  2136. }
// DoDeadline queues req for sending and waits for the response until
// deadline.
//
// ErrTimeout is returned when the deadline has already passed, when the
// request cannot be queued in time, or when no response arrives in time.
// The request is copied into a pooled work item before it is queued, so the
// caller's req is not touched after a timeout. The response is copied into
// resp only when resp is non-nil.
func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
	c.init()

	timeout := time.Until(deadline)
	if timeout <= 0 {
		return ErrTimeout
	}

	if c.DisablePathNormalizing {
		req.URI().DisablePathNormalizing = true
	}

	// Fill in a User-Agent only when the request does not carry one.
	userAgentOld := req.Header.UserAgent()
	if len(userAgentOld) == 0 {
		userAgent := c.Name
		if userAgent == "" && !c.NoDefaultUserAgentHeader {
			userAgent = defaultUserAgent
		}
		if userAgent != "" {
			req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
		}
	}

	w := c.acquirePipelineWork(timeout)
	w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
	w.req = &w.reqCopy
	w.resp = &w.respCopy

	// Make a copy of the request in order to avoid data races on timeouts
	req.copyToSkipBody(&w.reqCopy)
	swapRequestBody(req, &w.reqCopy)

	// Put the request to outgoing queue
	select {
	case c.chW <- w:
		// Fast path: len(c.chW) < cap(c.chW)
	default:
		// Slow path: block until the work is queued or the timer fires.
		select {
		case c.chW <- w:
		case <-w.t.C:
			// The work was never queued, so it can be released here.
			c.releasePipelineWork(w)
			return ErrTimeout
		}
	}

	// Wait for the response
	var err error
	select {
	case <-w.done:
		if resp != nil {
			w.respCopy.copyToSkipBody(resp)
			swapResponseBody(resp, &w.respCopy)
		}
		err = w.err

		c.releasePipelineWork(w)
	case <-w.t.C:
		// w has been queued already, so it is not released on this path.
		err = ErrTimeout
	}

	return err
}
  2191. func (c *pipelineConnClient) acquirePipelineWork(timeout time.Duration) (w *pipelineWork) {
  2192. v := c.workPool.Get()
  2193. if v != nil {
  2194. w = v.(*pipelineWork)
  2195. } else {
  2196. w = &pipelineWork{
  2197. done: make(chan struct{}, 1),
  2198. }
  2199. }
  2200. if timeout > 0 {
  2201. if w.t == nil {
  2202. w.t = time.NewTimer(timeout)
  2203. } else {
  2204. w.t.Reset(timeout)
  2205. }
  2206. w.deadline = time.Now().Add(timeout)
  2207. } else {
  2208. w.deadline = zeroTime
  2209. }
  2210. return w
  2211. }
  2212. func (c *pipelineConnClient) releasePipelineWork(w *pipelineWork) {
  2213. if w.t != nil {
  2214. w.t.Stop()
  2215. }
  2216. w.reqCopy.Reset()
  2217. w.respCopy.Reset()
  2218. w.req = nil
  2219. w.resp = nil
  2220. w.err = nil
  2221. c.workPool.Put(w)
  2222. }
  2223. // Do performs the given http request and sets the corresponding response.
  2224. //
  2225. // Request must contain at least non-zero RequestURI with full url (including
  2226. // scheme and host) or non-zero Host header + RequestURI.
  2227. //
  2228. // The function doesn't follow redirects. Use Get* for following redirects.
  2229. //
  2230. // Response is ignored if resp is nil.
  2231. //
  2232. // It is recommended obtaining req and resp via AcquireRequest
  2233. // and AcquireResponse in performance-critical code.
  2234. func (c *PipelineClient) Do(req *Request, resp *Response) error {
  2235. return c.getConnClient().Do(req, resp)
  2236. }
  2237. func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
  2238. c.init()
  2239. if c.DisablePathNormalizing {
  2240. req.URI().DisablePathNormalizing = true
  2241. }
  2242. userAgentOld := req.Header.UserAgent()
  2243. if len(userAgentOld) == 0 {
  2244. userAgent := c.Name
  2245. if userAgent == "" && !c.NoDefaultUserAgentHeader {
  2246. userAgent = defaultUserAgent
  2247. }
  2248. if userAgent != "" {
  2249. req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
  2250. }
  2251. }
  2252. w := c.acquirePipelineWork(0)
  2253. w.req = req
  2254. if resp != nil {
  2255. resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
  2256. w.resp = resp
  2257. } else {
  2258. w.resp = &w.respCopy
  2259. }
  2260. // Put the request to outgoing queue
  2261. select {
  2262. case c.chW <- w:
  2263. default:
  2264. // Try substituting the oldest w with the current one.
  2265. select {
  2266. case wOld := <-c.chW:
  2267. wOld.err = ErrPipelineOverflow
  2268. wOld.done <- struct{}{}
  2269. default:
  2270. }
  2271. select {
  2272. case c.chW <- w:
  2273. default:
  2274. c.releasePipelineWork(w)
  2275. return ErrPipelineOverflow
  2276. }
  2277. }
  2278. // Wait for the response
  2279. <-w.done
  2280. err := w.err
  2281. c.releasePipelineWork(w)
  2282. return err
  2283. }
  2284. func (c *PipelineClient) getConnClient() *pipelineConnClient {
  2285. c.connClientsLock.Lock()
  2286. cc := c.getConnClientUnlocked()
  2287. c.connClientsLock.Unlock()
  2288. return cc
  2289. }
  2290. func (c *PipelineClient) getConnClientUnlocked() *pipelineConnClient {
  2291. if len(c.connClients) == 0 {
  2292. return c.newConnClient()
  2293. }
  2294. // Return the client with the minimum number of pending requests.
  2295. minCC := c.connClients[0]
  2296. minReqs := minCC.PendingRequests()
  2297. if minReqs == 0 {
  2298. return minCC
  2299. }
  2300. for i := 1; i < len(c.connClients); i++ {
  2301. cc := c.connClients[i]
  2302. reqs := cc.PendingRequests()
  2303. if reqs == 0 {
  2304. return cc
  2305. }
  2306. if reqs < minReqs {
  2307. minCC = cc
  2308. minReqs = reqs
  2309. }
  2310. }
  2311. maxConns := c.MaxConns
  2312. if maxConns <= 0 {
  2313. maxConns = 1
  2314. }
  2315. if len(c.connClients) < maxConns {
  2316. return c.newConnClient()
  2317. }
  2318. return minCC
  2319. }
  2320. func (c *PipelineClient) newConnClient() *pipelineConnClient {
  2321. cc := &pipelineConnClient{
  2322. Addr: c.Addr,
  2323. Name: c.Name,
  2324. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  2325. MaxPendingRequests: c.MaxPendingRequests,
  2326. MaxBatchDelay: c.MaxBatchDelay,
  2327. Dial: c.Dial,
  2328. DialDualStack: c.DialDualStack,
  2329. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  2330. DisablePathNormalizing: c.DisablePathNormalizing,
  2331. IsTLS: c.IsTLS,
  2332. TLSConfig: c.TLSConfig,
  2333. MaxIdleConnDuration: c.MaxIdleConnDuration,
  2334. ReadBufferSize: c.ReadBufferSize,
  2335. WriteBufferSize: c.WriteBufferSize,
  2336. ReadTimeout: c.ReadTimeout,
  2337. WriteTimeout: c.WriteTimeout,
  2338. Logger: c.Logger,
  2339. }
  2340. c.connClients = append(c.connClients, cc)
  2341. return cc
  2342. }
  // ErrPipelineOverflow may be returned from PipelineClient.Do*
  // if the requests' queue is overflowed.
  //
  // pipelineConnClient.Do returns it when the outgoing queue is still full
  // after one eviction attempt, and also reports it to the oldest queued
  // request that it evicts to make room.
  var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflowed. Increase MaxConns and/or MaxPendingRequests")
  // DefaultMaxPendingRequests is the default value
  // for PipelineClient.MaxPendingRequests.
  //
  // It is used as the capacity of the per-connection work channels when
  // MaxPendingRequests is zero or negative.
  const DefaultMaxPendingRequests = 1024
  2349. func (c *pipelineConnClient) init() {
  2350. c.chLock.Lock()
  2351. if c.chR == nil {
  2352. maxPendingRequests := c.MaxPendingRequests
  2353. if maxPendingRequests <= 0 {
  2354. maxPendingRequests = DefaultMaxPendingRequests
  2355. }
  2356. c.chR = make(chan *pipelineWork, maxPendingRequests)
  2357. if c.chW == nil {
  2358. c.chW = make(chan *pipelineWork, maxPendingRequests)
  2359. }
  2360. go func() {
  2361. // Keep restarting the worker if it fails (connection errors for example).
  2362. for {
  2363. if err := c.worker(); err != nil {
  2364. c.logger().Printf("error in PipelineClient(%q): %v", c.Addr, err)
  2365. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  2366. // Throttle client reconnections on timeout errors
  2367. time.Sleep(time.Second)
  2368. }
  2369. } else {
  2370. c.chLock.Lock()
  2371. stop := len(c.chR) == 0 && len(c.chW) == 0
  2372. if !stop {
  2373. c.chR = nil
  2374. c.chW = nil
  2375. }
  2376. c.chLock.Unlock()
  2377. if stop {
  2378. break
  2379. }
  2380. }
  2381. }
  2382. }()
  2383. }
  2384. c.chLock.Unlock()
  2385. }
  2386. func (c *pipelineConnClient) worker() error {
  2387. tlsConfig := c.cachedTLSConfig()
  2388. conn, err := dialAddr(c.Addr, c.Dial, nil, c.DialDualStack, c.IsTLS, tlsConfig, 0, c.WriteTimeout)
  2389. if err != nil {
  2390. return err
  2391. }
  2392. // Start reader and writer
  2393. stopW := make(chan struct{})
  2394. doneW := make(chan error)
  2395. go func() {
  2396. doneW <- c.writer(conn, stopW)
  2397. }()
  2398. stopR := make(chan struct{})
  2399. doneR := make(chan error)
  2400. go func() {
  2401. doneR <- c.reader(conn, stopR)
  2402. }()
  2403. // Wait until reader and writer are stopped
  2404. select {
  2405. case err = <-doneW:
  2406. conn.Close()
  2407. close(stopR)
  2408. <-doneR
  2409. case err = <-doneR:
  2410. conn.Close()
  2411. close(stopW)
  2412. <-doneW
  2413. }
  2414. // Notify pending readers
  2415. for len(c.chR) > 0 {
  2416. w := <-c.chR
  2417. w.err = errPipelineConnStopped
  2418. w.done <- struct{}{}
  2419. }
  2420. return err
  2421. }
  2422. func (c *pipelineConnClient) cachedTLSConfig() *tls.Config {
  2423. if !c.IsTLS {
  2424. return nil
  2425. }
  2426. c.tlsConfigLock.Lock()
  2427. cfg := c.tlsConfig
  2428. if cfg == nil {
  2429. cfg = newClientTLSConfig(c.TLSConfig, c.Addr)
  2430. c.tlsConfig = cfg
  2431. }
  2432. c.tlsConfigLock.Unlock()
  2433. return cfg
  2434. }
  2435. func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
  2436. writeBufferSize := c.WriteBufferSize
  2437. if writeBufferSize <= 0 {
  2438. writeBufferSize = defaultWriteBufferSize
  2439. }
  2440. bw := bufio.NewWriterSize(conn, writeBufferSize)
  2441. defer bw.Flush()
  2442. chR := c.chR
  2443. chW := c.chW
  2444. writeTimeout := c.WriteTimeout
  2445. maxIdleConnDuration := c.MaxIdleConnDuration
  2446. if maxIdleConnDuration <= 0 {
  2447. maxIdleConnDuration = DefaultMaxIdleConnDuration
  2448. }
  2449. maxBatchDelay := c.MaxBatchDelay
  2450. var (
  2451. stopTimer = time.NewTimer(time.Hour)
  2452. flushTimer = time.NewTimer(time.Hour)
  2453. flushTimerCh <-chan time.Time
  2454. instantTimerCh = make(chan time.Time)
  2455. w *pipelineWork
  2456. err error
  2457. )
  2458. close(instantTimerCh)
  2459. for {
  2460. againChW:
  2461. select {
  2462. case w = <-chW:
  2463. // Fast path: len(chW) > 0
  2464. default:
  2465. // Slow path
  2466. stopTimer.Reset(maxIdleConnDuration)
  2467. select {
  2468. case w = <-chW:
  2469. case <-stopTimer.C:
  2470. return nil
  2471. case <-stopCh:
  2472. return nil
  2473. case <-flushTimerCh:
  2474. if err = bw.Flush(); err != nil {
  2475. return err
  2476. }
  2477. flushTimerCh = nil
  2478. goto againChW
  2479. }
  2480. }
  2481. if !w.deadline.IsZero() && time.Since(w.deadline) >= 0 {
  2482. w.err = ErrTimeout
  2483. w.done <- struct{}{}
  2484. continue
  2485. }
  2486. w.resp.parseNetConn(conn)
  2487. if writeTimeout > 0 {
  2488. // Set Deadline every time, since golang has fixed the performance issue
  2489. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2490. currentTime := time.Now()
  2491. if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
  2492. w.err = err
  2493. w.done <- struct{}{}
  2494. return err
  2495. }
  2496. }
  2497. if err = w.req.Write(bw); err != nil {
  2498. w.err = err
  2499. w.done <- struct{}{}
  2500. return err
  2501. }
  2502. if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) {
  2503. if maxBatchDelay > 0 {
  2504. flushTimer.Reset(maxBatchDelay)
  2505. flushTimerCh = flushTimer.C
  2506. } else {
  2507. flushTimerCh = instantTimerCh
  2508. }
  2509. }
  2510. againChR:
  2511. select {
  2512. case chR <- w:
  2513. // Fast path: len(chR) < cap(chR)
  2514. default:
  2515. // Slow path
  2516. select {
  2517. case chR <- w:
  2518. case <-stopCh:
  2519. w.err = errPipelineConnStopped
  2520. w.done <- struct{}{}
  2521. return nil
  2522. case <-flushTimerCh:
  2523. if err = bw.Flush(); err != nil {
  2524. w.err = err
  2525. w.done <- struct{}{}
  2526. return err
  2527. }
  2528. flushTimerCh = nil
  2529. goto againChR
  2530. }
  2531. }
  2532. }
  2533. }
  2534. func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error {
  2535. readBufferSize := c.ReadBufferSize
  2536. if readBufferSize <= 0 {
  2537. readBufferSize = defaultReadBufferSize
  2538. }
  2539. br := bufio.NewReaderSize(conn, readBufferSize)
  2540. chR := c.chR
  2541. readTimeout := c.ReadTimeout
  2542. var (
  2543. w *pipelineWork
  2544. err error
  2545. )
  2546. for {
  2547. select {
  2548. case w = <-chR:
  2549. // Fast path: len(chR) > 0
  2550. default:
  2551. // Slow path
  2552. select {
  2553. case w = <-chR:
  2554. case <-stopCh:
  2555. return nil
  2556. }
  2557. }
  2558. if readTimeout > 0 {
  2559. // Set Deadline every time, since golang has fixed the performance issue
  2560. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2561. currentTime := time.Now()
  2562. if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil {
  2563. w.err = err
  2564. w.done <- struct{}{}
  2565. return err
  2566. }
  2567. }
  2568. if err = w.resp.Read(br); err != nil {
  2569. w.err = err
  2570. w.done <- struct{}{}
  2571. return err
  2572. }
  2573. w.done <- struct{}{}
  2574. }
  2575. }
  2576. func (c *pipelineConnClient) logger() Logger {
  2577. if c.Logger != nil {
  2578. return c.Logger
  2579. }
  2580. return defaultLogger
  2581. }
  2582. // PendingRequests returns the current number of pending requests pipelined
  2583. // to the server.
  2584. //
  2585. // This number may exceed MaxPendingRequests*MaxConns by up to two times, since
  2586. // each connection to the server may keep up to MaxPendingRequests requests
  2587. // in the queue before sending them to the server.
  2588. //
  2589. // This function may be used for balancing load among multiple PipelineClient
  2590. // instances.
  2591. func (c *PipelineClient) PendingRequests() int {
  2592. c.connClientsLock.Lock()
  2593. n := 0
  2594. for _, cc := range c.connClients {
  2595. n += cc.PendingRequests()
  2596. }
  2597. c.connClientsLock.Unlock()
  2598. return n
  2599. }
  2600. func (c *pipelineConnClient) PendingRequests() int {
  2601. c.init()
  2602. c.chLock.Lock()
  2603. n := len(c.chR) + len(c.chW)
  2604. c.chLock.Unlock()
  2605. return n
  2606. }
  // errPipelineConnStopped is reported to queued work that can no longer be
  // served because the pipeline connection's worker is shutting down.
  var errPipelineConnStopped = errors.New("pipeline connection has been stopped")
  // DefaultTransport is a package-level RoundTripper backed by the stateless
  // transport type.
  //
  // NOTE(review): presumably the RoundTripper used by HostClient when no custom
  // one is configured; confirm against the HostClient code.
  var DefaultTransport RoundTripper = &transport{}
  // transport is the RoundTripper implementation behind DefaultTransport.
  // It has no fields; everything it needs comes from the RoundTrip arguments.
  type transport struct{}
  2610. func (t *transport) RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error) {
  2611. customSkipBody := resp.SkipBody
  2612. customStreamBody := resp.StreamBody
  2613. var deadline time.Time
  2614. if req.timeout > 0 {
  2615. deadline = time.Now().Add(req.timeout)
  2616. }
  2617. cc, err := hc.acquireConn(req.timeout, req.ConnectionClose())
  2618. if err != nil {
  2619. return false, err
  2620. }
  2621. conn := cc.c
  2622. resp.parseNetConn(conn)
  2623. writeDeadline := deadline
  2624. if hc.WriteTimeout > 0 {
  2625. tmpWriteDeadline := time.Now().Add(hc.WriteTimeout)
  2626. if writeDeadline.IsZero() || tmpWriteDeadline.Before(writeDeadline) {
  2627. writeDeadline = tmpWriteDeadline
  2628. }
  2629. }
  2630. if err = conn.SetWriteDeadline(writeDeadline); err != nil {
  2631. hc.closeConn(cc)
  2632. return true, err
  2633. }
  2634. resetConnection := false
  2635. if hc.MaxConnDuration > 0 && time.Since(cc.createdTime) > hc.MaxConnDuration && !req.ConnectionClose() {
  2636. req.SetConnectionClose()
  2637. resetConnection = true
  2638. }
  2639. bw := hc.acquireWriter(conn)
  2640. err = req.Write(bw)
  2641. if resetConnection {
  2642. req.Header.ResetConnectionClose()
  2643. }
  2644. if err == nil {
  2645. err = bw.Flush()
  2646. }
  2647. hc.releaseWriter(bw)
  2648. // Return ErrTimeout on any timeout.
  2649. if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  2650. err = ErrTimeout
  2651. }
  2652. if err != nil {
  2653. hc.closeConn(cc)
  2654. return true, err
  2655. }
  2656. readDeadline := deadline
  2657. if hc.ReadTimeout > 0 {
  2658. tmpReadDeadline := time.Now().Add(hc.ReadTimeout)
  2659. if readDeadline.IsZero() || tmpReadDeadline.Before(readDeadline) {
  2660. readDeadline = tmpReadDeadline
  2661. }
  2662. }
  2663. if err = conn.SetReadDeadline(readDeadline); err != nil {
  2664. hc.closeConn(cc)
  2665. return true, err
  2666. }
  2667. if customSkipBody || req.Header.IsHead() {
  2668. resp.SkipBody = true
  2669. }
  2670. if hc.DisableHeaderNamesNormalizing {
  2671. resp.Header.DisableNormalizing()
  2672. }
  2673. br := hc.acquireReader(conn)
  2674. err = resp.ReadLimitBody(br, hc.MaxResponseBodySize)
  2675. if err != nil {
  2676. hc.releaseReader(br)
  2677. hc.closeConn(cc)
  2678. // Don't retry in case of ErrBodyTooLarge since we will just get the same again.
  2679. needRetry := err != ErrBodyTooLarge
  2680. return needRetry, err
  2681. }
  2682. closeConn := resetConnection || req.ConnectionClose() || resp.ConnectionClose()
  2683. if customStreamBody && resp.bodyStream != nil {
  2684. rbs := resp.bodyStream
  2685. resp.bodyStream = newCloseReaderWithError(rbs, func(wErr error) error {
  2686. hc.releaseReader(br)
  2687. if r, ok := rbs.(*requestStream); ok {
  2688. releaseRequestStream(r)
  2689. }
  2690. if closeConn || resp.ConnectionClose() || wErr != nil {
  2691. hc.closeConn(cc)
  2692. } else {
  2693. hc.releaseConn(cc)
  2694. }
  2695. return nil
  2696. })
  2697. return false, nil
  2698. }
  2699. hc.releaseReader(br)
  2700. if closeConn {
  2701. hc.closeConn(cc)
  2702. } else {
  2703. hc.releaseConn(cc)
  2704. }
  2705. return false, nil
  2706. }