client.go 80 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006
  1. package fasthttp
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/tls"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. )
  15. // Do performs the given http request and fills the given http response.
  16. //
  17. // Request must contain at least non-zero RequestURI with full url (including
  18. // scheme and host) or non-zero Host header + RequestURI.
  19. //
  20. // Client determines the server to be requested in the following order:
  21. //
  22. // - from RequestURI if it contains full url with scheme and host;
  23. // - from Host header otherwise.
  24. //
  25. // The function doesn't follow redirects. Use Get* for following redirects.
  26. //
  27. // Response is ignored if resp is nil.
  28. //
  29. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  30. // to the requested host are busy.
  31. //
  32. // It is recommended obtaining req and resp via AcquireRequest
  33. // and AcquireResponse in performance-critical code.
  34. func Do(req *Request, resp *Response) error {
  35. return defaultClient.Do(req, resp)
  36. }
  37. // DoTimeout performs the given request and waits for response during
  38. // the given timeout duration.
  39. //
  40. // Request must contain at least non-zero RequestURI with full url (including
  41. // scheme and host) or non-zero Host header + RequestURI.
  42. //
  43. // Client determines the server to be requested in the following order:
  44. //
  45. // - from RequestURI if it contains full url with scheme and host;
  46. // - from Host header otherwise.
  47. //
  48. // The function doesn't follow redirects. Use Get* for following redirects.
  49. //
  50. // Response is ignored if resp is nil.
  51. //
  52. // ErrTimeout is returned if the response wasn't returned during
  53. // the given timeout.
  54. //
  55. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  56. // to the requested host are busy.
  57. //
  58. // It is recommended obtaining req and resp via AcquireRequest
  59. // and AcquireResponse in performance-critical code.
  60. func DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  61. return defaultClient.DoTimeout(req, resp, timeout)
  62. }
  63. // DoDeadline performs the given request and waits for response until
  64. // the given deadline.
  65. //
  66. // Request must contain at least non-zero RequestURI with full url (including
  67. // scheme and host) or non-zero Host header + RequestURI.
  68. //
  69. // Client determines the server to be requested in the following order:
  70. //
  71. // - from RequestURI if it contains full url with scheme and host;
  72. // - from Host header otherwise.
  73. //
  74. // The function doesn't follow redirects. Use Get* for following redirects.
  75. //
  76. // Response is ignored if resp is nil.
  77. //
  78. // ErrTimeout is returned if the response wasn't returned until
  79. // the given deadline.
  80. //
  81. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  82. // to the requested host are busy.
  83. //
  84. // It is recommended obtaining req and resp via AcquireRequest
  85. // and AcquireResponse in performance-critical code.
  86. func DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  87. return defaultClient.DoDeadline(req, resp, deadline)
  88. }
  89. // DoRedirects performs the given http request and fills the given http response,
  90. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  91. // maxRedirectsCount, ErrTooManyRedirects is returned.
  92. //
  93. // Request must contain at least non-zero RequestURI with full url (including
  94. // scheme and host) or non-zero Host header + RequestURI.
  95. //
  96. // Client determines the server to be requested in the following order:
  97. //
  98. // - from RequestURI if it contains full url with scheme and host;
  99. // - from Host header otherwise.
  100. //
  101. // Response is ignored if resp is nil.
  102. //
  103. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  104. // to the requested host are busy.
  105. //
  106. // It is recommended obtaining req and resp via AcquireRequest
  107. // and AcquireResponse in performance-critical code.
  108. func DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  109. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, &defaultClient)
  110. return err
  111. }
  112. // Get returns the status code and body of url.
  113. //
  114. // The contents of dst will be replaced by the body and returned, if the dst
  115. // is too small a new slice will be allocated.
  116. //
  117. // The function follows redirects. Use Do* for manually handling redirects.
  118. func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  119. return defaultClient.Get(dst, url)
  120. }
  121. // GetTimeout returns the status code and body of url.
  122. //
  123. // The contents of dst will be replaced by the body and returned, if the dst
  124. // is too small a new slice will be allocated.
  125. //
  126. // The function follows redirects. Use Do* for manually handling redirects.
  127. //
  128. // ErrTimeout error is returned if url contents couldn't be fetched
  129. // during the given timeout.
  130. func GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  131. return defaultClient.GetTimeout(dst, url, timeout)
  132. }
  133. // GetDeadline returns the status code and body of url.
  134. //
  135. // The contents of dst will be replaced by the body and returned, if the dst
  136. // is too small a new slice will be allocated.
  137. //
  138. // The function follows redirects. Use Do* for manually handling redirects.
  139. //
  140. // ErrTimeout error is returned if url contents couldn't be fetched
  141. // until the given deadline.
  142. func GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  143. return defaultClient.GetDeadline(dst, url, deadline)
  144. }
  145. // Post sends POST request to the given url with the given POST arguments.
  146. //
  147. // The contents of dst will be replaced by the body and returned, if the dst
  148. // is too small a new slice will be allocated.
  149. //
  150. // The function follows redirects. Use Do* for manually handling redirects.
  151. //
  152. // Empty POST body is sent if postArgs is nil.
  153. func Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  154. return defaultClient.Post(dst, url, postArgs)
  155. }
  156. var defaultClient Client
  157. // Client implements http client.
  158. //
  159. // Copying Client by value is prohibited. Create new instance instead.
  160. //
  161. // It is safe calling Client methods from concurrently running goroutines.
  162. //
  163. // The fields of a Client should not be changed while it is in use.
  164. type Client struct {
  165. noCopy noCopy
  166. // Client name. Used in User-Agent request header.
  167. //
  168. // Default client name is used if not set.
  169. Name string
  170. // NoDefaultUserAgentHeader when set to true, causes the default
  171. // User-Agent header to be excluded from the Request.
  172. NoDefaultUserAgentHeader bool
  173. // Callback for establishing new connections to hosts.
  174. //
  175. // Default DialTimeout is used if not set.
  176. DialTimeout DialFuncWithTimeout
  177. // Callback for establishing new connections to hosts.
  178. //
  179. // Note that if Dial is set instead of DialTimeout, Dial will ignore Request timeout.
  180. // If you want the tcp dial process to account for request timeouts, use DialTimeout instead.
  181. //
  182. // If not set, DialTimeout is used.
  183. Dial DialFunc
  184. // Attempt to connect to both ipv4 and ipv6 addresses if set to true.
  185. //
  186. // This option is used only if default TCP dialer is used,
  187. // i.e. if Dial is blank.
  188. //
  189. // By default client connects only to ipv4 addresses,
  190. // since unfortunately ipv6 remains broken in many networks worldwide :)
  191. DialDualStack bool
  192. // TLS config for https connections.
  193. //
  194. // Default TLS config is used if not set.
  195. TLSConfig *tls.Config
  196. // Maximum number of connections per each host which may be established.
  197. //
  198. // DefaultMaxConnsPerHost is used if not set.
  199. MaxConnsPerHost int
  200. // Idle keep-alive connections are closed after this duration.
  201. //
  202. // By default idle connections are closed
  203. // after DefaultMaxIdleConnDuration.
  204. MaxIdleConnDuration time.Duration
  205. // Keep-alive connections are closed after this duration.
  206. //
  207. // By default connection duration is unlimited.
  208. MaxConnDuration time.Duration
  209. // Maximum number of attempts for idempotent calls.
  210. //
  211. // DefaultMaxIdemponentCallAttempts is used if not set.
  212. MaxIdemponentCallAttempts int
  213. // Per-connection buffer size for responses' reading.
  214. // This also limits the maximum header size.
  215. //
  216. // Default buffer size is used if 0.
  217. ReadBufferSize int
  218. // Per-connection buffer size for requests' writing.
  219. //
  220. // Default buffer size is used if 0.
  221. WriteBufferSize int
  222. // Maximum duration for full response reading (including body).
  223. //
  224. // By default response read timeout is unlimited.
  225. ReadTimeout time.Duration
  226. // Maximum duration for full request writing (including body).
  227. //
  228. // By default request write timeout is unlimited.
  229. WriteTimeout time.Duration
  230. // Maximum response body size.
  231. //
  232. // The client returns ErrBodyTooLarge if this limit is greater than 0
  233. // and response body is greater than the limit.
  234. //
  235. // By default response body size is unlimited.
  236. MaxResponseBodySize int
  237. // Header names are passed as-is without normalization
  238. // if this option is set.
  239. //
  240. // Disabled header names' normalization may be useful only for proxying
  241. // responses to other clients expecting case-sensitive
  242. // header names. See https://github.com/valyala/fasthttp/issues/57
  243. // for details.
  244. //
  245. // By default request and response header names are normalized, i.e.
  246. // The first letter and the first letters following dashes
  247. // are uppercased, while all the other letters are lowercased.
  248. // Examples:
  249. //
  250. // * HOST -> Host
  251. // * content-type -> Content-Type
  252. // * cONTENT-lenGTH -> Content-Length
  253. DisableHeaderNamesNormalizing bool
  254. // Path values are sent as-is without normalization.
  255. //
  256. // Disabled path normalization may be useful for proxying incoming requests
  257. // to servers that are expecting paths to be forwarded as-is.
  258. //
  259. // By default path values are normalized, i.e.
  260. // extra slashes are removed, special characters are encoded.
  261. DisablePathNormalizing bool
  262. // Maximum duration for waiting for a free connection.
  263. //
  264. // By default will not waiting, return ErrNoFreeConns immediately.
  265. MaxConnWaitTimeout time.Duration
  266. // RetryIf controls whether a retry should be attempted after an error.
  267. //
  268. // By default will use isIdempotent function.
  269. RetryIf RetryIfFunc
  270. // Connection pool strategy. Can be either LIFO or FIFO (default).
  271. ConnPoolStrategy ConnPoolStrategyType
  272. // StreamResponseBody enables response body streaming.
  273. StreamResponseBody bool
  274. // ConfigureClient configures the fasthttp.HostClient.
  275. ConfigureClient func(hc *HostClient) error
  276. mLock sync.RWMutex
  277. mOnce sync.Once
  278. m map[string]*HostClient
  279. ms map[string]*HostClient
  280. readerPool sync.Pool
  281. writerPool sync.Pool
  282. }
  283. // Get returns the status code and body of url.
  284. //
  285. // The contents of dst will be replaced by the body and returned, if the dst
  286. // is too small a new slice will be allocated.
  287. //
  288. // The function follows redirects. Use Do* for manually handling redirects.
  289. func (c *Client) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  290. return clientGetURL(dst, url, c)
  291. }
  292. // GetTimeout returns the status code and body of url.
  293. //
  294. // The contents of dst will be replaced by the body and returned, if the dst
  295. // is too small a new slice will be allocated.
  296. //
  297. // The function follows redirects. Use Do* for manually handling redirects.
  298. //
  299. // ErrTimeout error is returned if url contents couldn't be fetched
  300. // during the given timeout.
  301. func (c *Client) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  302. return clientGetURLTimeout(dst, url, timeout, c)
  303. }
  304. // GetDeadline returns the status code and body of url.
  305. //
  306. // The contents of dst will be replaced by the body and returned, if the dst
  307. // is too small a new slice will be allocated.
  308. //
  309. // The function follows redirects. Use Do* for manually handling redirects.
  310. //
  311. // ErrTimeout error is returned if url contents couldn't be fetched
  312. // until the given deadline.
  313. func (c *Client) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  314. return clientGetURLDeadline(dst, url, deadline, c)
  315. }
  316. // Post sends POST request to the given url with the given POST arguments.
  317. //
  318. // The contents of dst will be replaced by the body and returned, if the dst
  319. // is too small a new slice will be allocated.
  320. //
  321. // The function follows redirects. Use Do* for manually handling redirects.
  322. //
  323. // Empty POST body is sent if postArgs is nil.
  324. func (c *Client) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  325. return clientPostURL(dst, url, postArgs, c)
  326. }
  327. // DoTimeout performs the given request and waits for response during
  328. // the given timeout duration.
  329. //
  330. // Request must contain at least non-zero RequestURI with full url (including
  331. // scheme and host) or non-zero Host header + RequestURI.
  332. //
  333. // Client determines the server to be requested in the following order:
  334. //
  335. // - from RequestURI if it contains full url with scheme and host;
  336. // - from Host header otherwise.
  337. //
  338. // The function doesn't follow redirects. Use Get* for following redirects.
  339. //
  340. // Response is ignored if resp is nil.
  341. //
  342. // ErrTimeout is returned if the response wasn't returned during
  343. // the given timeout.
  344. // Immediately returns ErrTimeout if timeout value is negative.
  345. //
  346. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  347. // to the requested host are busy.
  348. //
  349. // It is recommended obtaining req and resp via AcquireRequest
  350. // and AcquireResponse in performance-critical code.
  351. func (c *Client) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  352. req.timeout = timeout
  353. if req.timeout <= 0 {
  354. return ErrTimeout
  355. }
  356. return c.Do(req, resp)
  357. }
  358. // DoDeadline performs the given request and waits for response until
  359. // the given deadline.
  360. //
  361. // Request must contain at least non-zero RequestURI with full url (including
  362. // scheme and host) or non-zero Host header + RequestURI.
  363. //
  364. // Client determines the server to be requested in the following order:
  365. //
  366. // - from RequestURI if it contains full url with scheme and host;
  367. // - from Host header otherwise.
  368. //
  369. // The function doesn't follow redirects. Use Get* for following redirects.
  370. //
  371. // Response is ignored if resp is nil.
  372. //
  373. // ErrTimeout is returned if the response wasn't returned until
  374. // the given deadline.
  375. // Immediately returns ErrTimeout if the deadline has already been reached.
  376. //
  377. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  378. // to the requested host are busy.
  379. //
  380. // It is recommended obtaining req and resp via AcquireRequest
  381. // and AcquireResponse in performance-critical code.
  382. func (c *Client) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  383. req.timeout = time.Until(deadline)
  384. if req.timeout <= 0 {
  385. return ErrTimeout
  386. }
  387. return c.Do(req, resp)
  388. }
  389. // DoRedirects performs the given http request and fills the given http response,
  390. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  391. // maxRedirectsCount, ErrTooManyRedirects is returned.
  392. //
  393. // Request must contain at least non-zero RequestURI with full url (including
  394. // scheme and host) or non-zero Host header + RequestURI.
  395. //
  396. // Client determines the server to be requested in the following order:
  397. //
  398. // - from RequestURI if it contains full url with scheme and host;
  399. // - from Host header otherwise.
  400. //
  401. // Response is ignored if resp is nil.
  402. //
  403. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  404. // to the requested host are busy.
  405. //
  406. // It is recommended obtaining req and resp via AcquireRequest
  407. // and AcquireResponse in performance-critical code.
  408. func (c *Client) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  409. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  410. return err
  411. }
  412. // Do performs the given http request and fills the given http response.
  413. //
  414. // Request must contain at least non-zero RequestURI with full url (including
  415. // scheme and host) or non-zero Host header + RequestURI.
  416. //
  417. // Client determines the server to be requested in the following order:
  418. //
  419. // - from RequestURI if it contains full url with scheme and host;
  420. // - from Host header otherwise.
  421. //
  422. // Response is ignored if resp is nil.
  423. //
  424. // The function doesn't follow redirects. Use Get* for following redirects.
  425. //
  426. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  427. // to the requested host are busy.
  428. //
  429. // It is recommended obtaining req and resp via AcquireRequest
  430. // and AcquireResponse in performance-critical code.
  431. func (c *Client) Do(req *Request, resp *Response) error {
  432. uri := req.URI()
  433. if uri == nil {
  434. return ErrorInvalidURI
  435. }
  436. host := uri.Host()
  437. if bytes.ContainsRune(host, ',') {
  438. return fmt.Errorf("invalid host %q. Use HostClient for multiple hosts", host)
  439. }
  440. isTLS := false
  441. if uri.isHTTPS() {
  442. isTLS = true
  443. } else if !uri.isHTTP() {
  444. return fmt.Errorf("unsupported protocol %q. http and https are supported", uri.Scheme())
  445. }
  446. c.mOnce.Do(func() {
  447. c.mLock.Lock()
  448. c.m = make(map[string]*HostClient)
  449. c.ms = make(map[string]*HostClient)
  450. c.mLock.Unlock()
  451. })
  452. startCleaner := false
  453. c.mLock.RLock()
  454. m := c.m
  455. if isTLS {
  456. m = c.ms
  457. }
  458. hc := m[string(host)]
  459. if hc != nil {
  460. atomic.AddInt32(&hc.pendingClientRequests, 1)
  461. defer atomic.AddInt32(&hc.pendingClientRequests, -1)
  462. }
  463. c.mLock.RUnlock()
  464. if hc == nil {
  465. c.mLock.Lock()
  466. hc = m[string(host)]
  467. if hc == nil {
  468. hc = &HostClient{
  469. Addr: AddMissingPort(string(host), isTLS),
  470. Name: c.Name,
  471. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  472. Dial: c.Dial,
  473. DialTimeout: c.DialTimeout,
  474. DialDualStack: c.DialDualStack,
  475. IsTLS: isTLS,
  476. TLSConfig: c.TLSConfig,
  477. MaxConns: c.MaxConnsPerHost,
  478. MaxIdleConnDuration: c.MaxIdleConnDuration,
  479. MaxConnDuration: c.MaxConnDuration,
  480. MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts,
  481. ReadBufferSize: c.ReadBufferSize,
  482. WriteBufferSize: c.WriteBufferSize,
  483. ReadTimeout: c.ReadTimeout,
  484. WriteTimeout: c.WriteTimeout,
  485. MaxResponseBodySize: c.MaxResponseBodySize,
  486. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  487. DisablePathNormalizing: c.DisablePathNormalizing,
  488. MaxConnWaitTimeout: c.MaxConnWaitTimeout,
  489. RetryIf: c.RetryIf,
  490. ConnPoolStrategy: c.ConnPoolStrategy,
  491. StreamResponseBody: c.StreamResponseBody,
  492. clientReaderPool: &c.readerPool,
  493. clientWriterPool: &c.writerPool,
  494. }
  495. if c.ConfigureClient != nil {
  496. if err := c.ConfigureClient(hc); err != nil {
  497. c.mLock.Unlock()
  498. return err
  499. }
  500. }
  501. m[string(host)] = hc
  502. if len(m) == 1 {
  503. startCleaner = true
  504. }
  505. }
  506. atomic.AddInt32(&hc.pendingClientRequests, 1)
  507. defer atomic.AddInt32(&hc.pendingClientRequests, -1)
  508. c.mLock.Unlock()
  509. }
  510. if startCleaner {
  511. go c.mCleaner(m)
  512. }
  513. return hc.Do(req, resp)
  514. }
  515. // CloseIdleConnections closes any connections which were previously
  516. // connected from previous requests but are now sitting idle in a
  517. // "keep-alive" state. It does not interrupt any connections currently
  518. // in use.
  519. func (c *Client) CloseIdleConnections() {
  520. c.mLock.RLock()
  521. for _, v := range c.m {
  522. v.CloseIdleConnections()
  523. }
  524. for _, v := range c.ms {
  525. v.CloseIdleConnections()
  526. }
  527. c.mLock.RUnlock()
  528. }
  529. func (c *Client) mCleaner(m map[string]*HostClient) {
  530. mustStop := false
  531. sleep := c.MaxIdleConnDuration
  532. if sleep < time.Second {
  533. sleep = time.Second
  534. } else if sleep > 10*time.Second {
  535. sleep = 10 * time.Second
  536. }
  537. for {
  538. time.Sleep(sleep)
  539. c.mLock.Lock()
  540. for k, v := range m {
  541. v.connsLock.Lock()
  542. /* #nosec G601 */
  543. if v.connsCount == 0 && atomic.LoadInt32(&v.pendingClientRequests) == 0 {
  544. delete(m, k)
  545. }
  546. v.connsLock.Unlock()
  547. }
  548. if len(m) == 0 {
  549. mustStop = true
  550. }
  551. c.mLock.Unlock()
  552. if mustStop {
  553. break
  554. }
  555. }
  556. }
  557. // DefaultMaxConnsPerHost is the maximum number of concurrent connections
  558. // http client may establish per host by default (i.e. if
  559. // Client.MaxConnsPerHost isn't set).
  560. const DefaultMaxConnsPerHost = 512
  561. // DefaultMaxIdleConnDuration is the default duration before idle keep-alive
  562. // connection is closed.
  563. const DefaultMaxIdleConnDuration = 10 * time.Second
  564. // DefaultMaxIdemponentCallAttempts is the default idempotent calls attempts count.
  565. const DefaultMaxIdemponentCallAttempts = 5
  566. // DialFunc must establish connection to addr.
  567. //
  568. // There is no need in establishing TLS (SSL) connection for https.
  569. // The client automatically converts connection to TLS
  570. // if HostClient.IsTLS is set.
  571. //
  572. // TCP address passed to DialFunc always contains host and port.
  573. // Example TCP addr values:
  574. //
  575. // - foobar.com:80
  576. // - foobar.com:443
  577. // - foobar.com:8080
  578. type DialFunc func(addr string) (net.Conn, error)
  579. // DialFuncWithTimeout must establish connection to addr.
  580. // Unlike DialFunc, it also accepts a timeout.
  581. //
  582. // There is no need in establishing TLS (SSL) connection for https.
  583. // The client automatically converts connection to TLS
  584. // if HostClient.IsTLS is set.
  585. //
  586. // TCP address passed to DialFuncWithTimeout always contains host and port.
  587. // Example TCP addr values:
  588. //
  589. // - foobar.com:80
  590. // - foobar.com:443
  591. // - foobar.com:8080
  592. type DialFuncWithTimeout func(addr string, timeout time.Duration) (net.Conn, error)
  593. // RetryIfFunc signature of retry if function.
  594. //
  595. // Request argument passed to RetryIfFunc, if there are any request errors.
  596. type RetryIfFunc func(request *Request) bool
  597. // RoundTripper wraps every request/response.
  598. type RoundTripper interface {
  599. RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error)
  600. }
  601. // ConnPoolStrategyType define strategy of connection pool enqueue/dequeue.
  602. type ConnPoolStrategyType int
  603. const (
  604. FIFO ConnPoolStrategyType = iota
  605. LIFO
  606. )
  607. // HostClient balances http requests among hosts listed in Addr.
  608. //
  609. // HostClient may be used for balancing load among multiple upstream hosts.
  610. // While multiple addresses passed to HostClient.Addr may be used for balancing
  611. // load among them, it would be better using LBClient instead, since HostClient
  612. // may unevenly balance load among upstream hosts.
  613. //
  614. // It is forbidden copying HostClient instances. Create new instances instead.
  615. //
  616. // It is safe calling HostClient methods from concurrently running goroutines.
  617. type HostClient struct {
  618. noCopy noCopy
  619. // Comma-separated list of upstream HTTP server host addresses,
  620. // which are passed to Dial or DialTimeout in a round-robin manner.
  621. //
  622. // Each address may contain port if default dialer is used.
  623. // For example,
  624. //
  625. // - foobar.com:80
  626. // - foobar.com:443
  627. // - foobar.com:8080
  628. Addr string
  629. // Client name. Used in User-Agent request header.
  630. Name string
  631. // NoDefaultUserAgentHeader when set to true, causes the default
  632. // User-Agent header to be excluded from the Request.
  633. NoDefaultUserAgentHeader bool
  634. // Callback for establishing new connections to hosts.
  635. //
  636. // Default DialTimeout is used if not set.
  637. DialTimeout DialFuncWithTimeout
  638. // Callback for establishing new connections to hosts.
  639. //
  640. // Note that if Dial is set instead of DialTimeout, Dial will ignore Request timeout.
  641. // If you want the tcp dial process to account for request timeouts, use DialTimeout instead.
  642. //
  643. // If not set, DialTimeout is used.
  644. Dial DialFunc
  645. // Attempt to connect to both ipv4 and ipv6 host addresses
  646. // if set to true.
  647. //
  648. // This option is used only if default TCP dialer is used,
  649. // i.e. if Dial and DialTimeout are blank.
  650. //
  651. // By default client connects only to ipv4 addresses,
  652. // since unfortunately ipv6 remains broken in many networks worldwide :)
  653. DialDualStack bool
  654. // Whether to use TLS (aka SSL or HTTPS) for host connections.
  655. IsTLS bool
  656. // Optional TLS config.
  657. TLSConfig *tls.Config
  658. // Maximum number of connections which may be established to all hosts
  659. // listed in Addr.
  660. //
  661. // You can change this value while the HostClient is being used
  662. // with HostClient.SetMaxConns(value)
  663. //
  664. // DefaultMaxConnsPerHost is used if not set.
  665. MaxConns int
  666. // Keep-alive connections are closed after this duration.
  667. //
  668. // By default connection duration is unlimited.
  669. MaxConnDuration time.Duration
  670. // Idle keep-alive connections are closed after this duration.
  671. //
  672. // By default idle connections are closed
  673. // after DefaultMaxIdleConnDuration.
  674. MaxIdleConnDuration time.Duration
  675. // Maximum number of attempts for idempotent calls.
  676. //
  677. // DefaultMaxIdemponentCallAttempts is used if not set.
  678. MaxIdemponentCallAttempts int
  679. // Per-connection buffer size for responses' reading.
  680. // This also limits the maximum header size.
  681. //
  682. // Default buffer size is used if 0.
  683. ReadBufferSize int
  684. // Per-connection buffer size for requests' writing.
  685. //
  686. // Default buffer size is used if 0.
  687. WriteBufferSize int
  688. // Maximum duration for full response reading (including body).
  689. //
  690. // By default response read timeout is unlimited.
  691. ReadTimeout time.Duration
  692. // Maximum duration for full request writing (including body).
  693. //
  694. // By default request write timeout is unlimited.
  695. WriteTimeout time.Duration
  696. // Maximum response body size.
  697. //
  698. // The client returns ErrBodyTooLarge if this limit is greater than 0
  699. // and response body is greater than the limit.
  700. //
  701. // By default response body size is unlimited.
  702. MaxResponseBodySize int
  703. // Header names are passed as-is without normalization
  704. // if this option is set.
  705. //
  706. // Disabled header names' normalization may be useful only for proxying
  707. // responses to other clients expecting case-sensitive
  708. // header names. See https://github.com/valyala/fasthttp/issues/57
  709. // for details.
  710. //
  711. // By default request and response header names are normalized, i.e.
  712. // The first letter and the first letters following dashes
  713. // are uppercased, while all the other letters are lowercased.
  714. // Examples:
  715. //
  716. // * HOST -> Host
  717. // * content-type -> Content-Type
  718. // * cONTENT-lenGTH -> Content-Length
  719. DisableHeaderNamesNormalizing bool
  720. // Path values are sent as-is without normalization.
  721. //
  722. // Disabled path normalization may be useful for proxying incoming requests
  723. // to servers that are expecting paths to be forwarded as-is.
  724. //
  725. // By default path values are normalized, i.e.
  726. // extra slashes are removed, special characters are encoded.
  727. DisablePathNormalizing bool
  728. // Will not log potentially sensitive content in error logs.
  729. //
  730. // This option is useful for servers that handle sensitive data
  731. // in the request/response.
  732. //
  733. // Client logs full errors by default.
  734. SecureErrorLogMessage bool
  735. // Maximum duration for waiting for a free connection.
  736. //
  737. // By default will not waiting, return ErrNoFreeConns immediately
  738. MaxConnWaitTimeout time.Duration
  739. // RetryIf controls whether a retry should be attempted after an error.
  740. //
  741. // By default will use isIdempotent function
  742. RetryIf RetryIfFunc
  743. // Transport defines a transport-like mechanism that wraps every request/response.
  744. Transport RoundTripper
  745. // Connection pool strategy. Can be either LIFO or FIFO (default).
  746. ConnPoolStrategy ConnPoolStrategyType
  747. // StreamResponseBody enables response body streaming.
  748. StreamResponseBody bool
  749. lastUseTime uint32
  750. connsLock sync.Mutex
  751. connsCount int
  752. conns []*clientConn
  753. connsWait *wantConnQueue
  754. addrsLock sync.Mutex
  755. addrs []string
  756. addrIdx uint32
  757. tlsConfigMap map[string]*tls.Config
  758. tlsConfigMapLock sync.Mutex
  759. readerPool sync.Pool
  760. writerPool sync.Pool
  761. clientReaderPool *sync.Pool
  762. clientWriterPool *sync.Pool
  763. pendingRequests int32
  764. // pendingClientRequests counts the number of requests that a Client is currently running using this HostClient.
  765. // It will be incremented earlier than pendingRequests and will be used by Client to see if the HostClient is still in use.
  766. pendingClientRequests int32
  767. connsCleanerRun bool
  768. }
  769. type clientConn struct {
  770. c net.Conn
  771. createdTime time.Time
  772. lastUseTime time.Time
  773. }
  774. var startTimeUnix = time.Now().Unix()
  775. // LastUseTime returns time the client was last used.
  776. func (c *HostClient) LastUseTime() time.Time {
  777. n := atomic.LoadUint32(&c.lastUseTime)
  778. return time.Unix(startTimeUnix+int64(n), 0)
  779. }
  780. // Get returns the status code and body of url.
  781. //
  782. // The contents of dst will be replaced by the body and returned, if the dst
  783. // is too small a new slice will be allocated.
  784. //
  785. // The function follows redirects. Use Do* for manually handling redirects.
  786. func (c *HostClient) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  787. return clientGetURL(dst, url, c)
  788. }
  789. // GetTimeout returns the status code and body of url.
  790. //
  791. // The contents of dst will be replaced by the body and returned, if the dst
  792. // is too small a new slice will be allocated.
  793. //
  794. // The function follows redirects. Use Do* for manually handling redirects.
  795. //
  796. // ErrTimeout error is returned if url contents couldn't be fetched
  797. // during the given timeout.
  798. func (c *HostClient) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  799. return clientGetURLTimeout(dst, url, timeout, c)
  800. }
  801. // GetDeadline returns the status code and body of url.
  802. //
  803. // The contents of dst will be replaced by the body and returned, if the dst
  804. // is too small a new slice will be allocated.
  805. //
  806. // The function follows redirects. Use Do* for manually handling redirects.
  807. //
  808. // ErrTimeout error is returned if url contents couldn't be fetched
  809. // until the given deadline.
  810. func (c *HostClient) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  811. return clientGetURLDeadline(dst, url, deadline, c)
  812. }
  813. // Post sends POST request to the given url with the given POST arguments.
  814. //
  815. // The contents of dst will be replaced by the body and returned, if the dst
  816. // is too small a new slice will be allocated.
  817. //
  818. // The function follows redirects. Use Do* for manually handling redirects.
  819. //
  820. // Empty POST body is sent if postArgs is nil.
  821. func (c *HostClient) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  822. return clientPostURL(dst, url, postArgs, c)
  823. }
  824. type clientDoer interface {
  825. Do(req *Request, resp *Response) error
  826. }
  827. func clientGetURL(dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  828. req := AcquireRequest()
  829. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  830. ReleaseRequest(req)
  831. return statusCode, body, err
  832. }
  833. func clientGetURLTimeout(dst []byte, url string, timeout time.Duration, c clientDoer) (statusCode int, body []byte, err error) {
  834. deadline := time.Now().Add(timeout)
  835. return clientGetURLDeadline(dst, url, deadline, c)
  836. }
  837. type clientURLResponse struct {
  838. statusCode int
  839. body []byte
  840. err error
  841. }
  842. func clientGetURLDeadline(dst []byte, url string, deadline time.Time, c clientDoer) (statusCode int, body []byte, err error) {
  843. timeout := time.Until(deadline)
  844. if timeout <= 0 {
  845. return 0, dst, ErrTimeout
  846. }
  847. var ch chan clientURLResponse
  848. chv := clientURLResponseChPool.Get()
  849. if chv == nil {
  850. chv = make(chan clientURLResponse, 1)
  851. }
  852. ch = chv.(chan clientURLResponse)
  853. // Note that the request continues execution on ErrTimeout until
  854. // client-specific ReadTimeout exceeds. This helps limiting load
  855. // on slow hosts by MaxConns* concurrent requests.
  856. //
  857. // Without this 'hack' the load on slow host could exceed MaxConns*
  858. // concurrent requests, since timed out requests on client side
  859. // usually continue execution on the host.
  860. var mu sync.Mutex
  861. var timedout, responded bool
  862. go func() {
  863. req := AcquireRequest()
  864. statusCodeCopy, bodyCopy, errCopy := doRequestFollowRedirectsBuffer(req, dst, url, c)
  865. mu.Lock()
  866. if !timedout {
  867. ch <- clientURLResponse{
  868. statusCode: statusCodeCopy,
  869. body: bodyCopy,
  870. err: errCopy,
  871. }
  872. responded = true
  873. }
  874. mu.Unlock()
  875. ReleaseRequest(req)
  876. }()
  877. tc := AcquireTimer(timeout)
  878. select {
  879. case resp := <-ch:
  880. statusCode = resp.statusCode
  881. body = resp.body
  882. err = resp.err
  883. case <-tc.C:
  884. mu.Lock()
  885. if responded {
  886. resp := <-ch
  887. statusCode = resp.statusCode
  888. body = resp.body
  889. err = resp.err
  890. } else {
  891. timedout = true
  892. err = ErrTimeout
  893. body = dst
  894. }
  895. mu.Unlock()
  896. }
  897. ReleaseTimer(tc)
  898. clientURLResponseChPool.Put(chv)
  899. return statusCode, body, err
  900. }
  901. var clientURLResponseChPool sync.Pool
  902. func clientPostURL(dst []byte, url string, postArgs *Args, c clientDoer) (statusCode int, body []byte, err error) {
  903. req := AcquireRequest()
  904. defer ReleaseRequest(req)
  905. req.Header.SetMethod(MethodPost)
  906. req.Header.SetContentTypeBytes(strPostArgsContentType)
  907. if postArgs != nil {
  908. if _, err := postArgs.WriteTo(req.BodyWriter()); err != nil {
  909. return 0, nil, err
  910. }
  911. }
  912. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  913. return statusCode, body, err
  914. }
  915. var (
  916. // ErrMissingLocation is returned by clients when the Location header is missing on
  917. // an HTTP response with a redirect status code.
  918. ErrMissingLocation = errors.New("missing Location header for http redirect")
  919. // ErrTooManyRedirects is returned by clients when the number of redirects followed
  920. // exceed the max count.
  921. ErrTooManyRedirects = errors.New("too many redirects detected when doing the request")
  922. // HostClients are only able to follow redirects to the same protocol.
  923. ErrHostClientRedirectToDifferentScheme = errors.New("HostClient can't follow redirects to a different protocol," +
  924. " please use Client instead")
  925. )
  926. const defaultMaxRedirectsCount = 16
  927. func doRequestFollowRedirectsBuffer(req *Request, dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  928. resp := AcquireResponse()
  929. bodyBuf := resp.bodyBuffer()
  930. resp.keepBodyBuffer = true
  931. oldBody := bodyBuf.B
  932. bodyBuf.B = dst
  933. statusCode, _, err = doRequestFollowRedirects(req, resp, url, defaultMaxRedirectsCount, c)
  934. body = bodyBuf.B
  935. bodyBuf.B = oldBody
  936. resp.keepBodyBuffer = false
  937. ReleaseResponse(resp)
  938. return statusCode, body, err
  939. }
  940. func doRequestFollowRedirects(
  941. req *Request, resp *Response, url string, maxRedirectsCount int, c clientDoer,
  942. ) (statusCode int, body []byte, err error) {
  943. redirectsCount := 0
  944. for {
  945. req.SetRequestURI(url)
  946. if err := req.parseURI(); err != nil {
  947. return 0, nil, err
  948. }
  949. if err = c.Do(req, resp); err != nil {
  950. break
  951. }
  952. statusCode = resp.Header.StatusCode()
  953. if !StatusCodeIsRedirect(statusCode) {
  954. break
  955. }
  956. redirectsCount++
  957. if redirectsCount > maxRedirectsCount {
  958. err = ErrTooManyRedirects
  959. break
  960. }
  961. location := resp.Header.peek(strLocation)
  962. if len(location) == 0 {
  963. err = ErrMissingLocation
  964. break
  965. }
  966. url = getRedirectURL(url, location, req.DisableRedirectPathNormalizing)
  967. }
  968. return statusCode, body, err
  969. }
  970. func getRedirectURL(baseURL string, location []byte, disablePathNormalizing bool) string {
  971. u := AcquireURI()
  972. u.Update(baseURL)
  973. u.UpdateBytes(location)
  974. u.DisablePathNormalizing = disablePathNormalizing
  975. redirectURL := u.String()
  976. ReleaseURI(u)
  977. return redirectURL
  978. }
  979. // StatusCodeIsRedirect returns true if the status code indicates a redirect.
  980. func StatusCodeIsRedirect(statusCode int) bool {
  981. return statusCode == StatusMovedPermanently ||
  982. statusCode == StatusFound ||
  983. statusCode == StatusSeeOther ||
  984. statusCode == StatusTemporaryRedirect ||
  985. statusCode == StatusPermanentRedirect
  986. }
  987. var (
  988. requestPool sync.Pool
  989. responsePool sync.Pool
  990. )
  991. // AcquireRequest returns an empty Request instance from request pool.
  992. //
  993. // The returned Request instance may be passed to ReleaseRequest when it is
  994. // no longer needed. This allows Request recycling, reduces GC pressure
  995. // and usually improves performance.
  996. func AcquireRequest() *Request {
  997. v := requestPool.Get()
  998. if v == nil {
  999. return &Request{}
  1000. }
  1001. return v.(*Request)
  1002. }
  1003. // ReleaseRequest returns req acquired via AcquireRequest to request pool.
  1004. //
  1005. // It is forbidden accessing req and/or its' members after returning
  1006. // it to request pool.
  1007. func ReleaseRequest(req *Request) {
  1008. req.Reset()
  1009. requestPool.Put(req)
  1010. }
  1011. // AcquireResponse returns an empty Response instance from response pool.
  1012. //
  1013. // The returned Response instance may be passed to ReleaseResponse when it is
  1014. // no longer needed. This allows Response recycling, reduces GC pressure
  1015. // and usually improves performance.
  1016. func AcquireResponse() *Response {
  1017. v := responsePool.Get()
  1018. if v == nil {
  1019. return &Response{}
  1020. }
  1021. return v.(*Response)
  1022. }
  1023. // ReleaseResponse return resp acquired via AcquireResponse to response pool.
  1024. //
  1025. // It is forbidden accessing resp and/or its' members after returning
  1026. // it to response pool.
  1027. func ReleaseResponse(resp *Response) {
  1028. resp.Reset()
  1029. responsePool.Put(resp)
  1030. }
  1031. // DoTimeout performs the given request and waits for response during
  1032. // the given timeout duration.
  1033. //
  1034. // Request must contain at least non-zero RequestURI with full url (including
  1035. // scheme and host) or non-zero Host header + RequestURI.
  1036. //
  1037. // The function doesn't follow redirects. Use Get* for following redirects.
  1038. //
  1039. // Response is ignored if resp is nil.
  1040. //
  1041. // ErrTimeout is returned if the response wasn't returned during
  1042. // the given timeout.
  1043. // Immediately returns ErrTimeout if timeout value is negative.
  1044. //
  1045. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1046. // to the host are busy.
  1047. //
  1048. // It is recommended obtaining req and resp via AcquireRequest
  1049. // and AcquireResponse in performance-critical code.
  1050. func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  1051. req.timeout = timeout
  1052. if req.timeout <= 0 {
  1053. return ErrTimeout
  1054. }
  1055. return c.Do(req, resp)
  1056. }
  1057. // DoDeadline performs the given request and waits for response until
  1058. // the given deadline.
  1059. //
  1060. // Request must contain at least non-zero RequestURI with full url (including
  1061. // scheme and host) or non-zero Host header + RequestURI.
  1062. //
  1063. // The function doesn't follow redirects. Use Get* for following redirects.
  1064. //
  1065. // Response is ignored if resp is nil.
  1066. //
  1067. // ErrTimeout is returned if the response wasn't returned until
  1068. // the given deadline.
  1069. // Immediately returns ErrTimeout if the deadline has already been reached.
  1070. //
  1071. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1072. // to the host are busy.
  1073. //
  1074. // It is recommended obtaining req and resp via AcquireRequest
  1075. // and AcquireResponse in performance-critical code.
  1076. func (c *HostClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  1077. req.timeout = time.Until(deadline)
  1078. if req.timeout <= 0 {
  1079. return ErrTimeout
  1080. }
  1081. return c.Do(req, resp)
  1082. }
  1083. // DoRedirects performs the given http request and fills the given http response,
  1084. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  1085. // maxRedirectsCount, ErrTooManyRedirects is returned.
  1086. //
  1087. // Request must contain at least non-zero RequestURI with full url (including
  1088. // scheme and host) or non-zero Host header + RequestURI.
  1089. //
  1090. // Client determines the server to be requested in the following order:
  1091. //
  1092. // - from RequestURI if it contains full url with scheme and host;
  1093. // - from Host header otherwise.
  1094. //
  1095. // Response is ignored if resp is nil.
  1096. //
  1097. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  1098. // to the requested host are busy.
  1099. //
  1100. // It is recommended obtaining req and resp via AcquireRequest
  1101. // and AcquireResponse in performance-critical code.
  1102. func (c *HostClient) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  1103. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  1104. return err
  1105. }
  1106. // Do performs the given http request and sets the corresponding response.
  1107. //
  1108. // Request must contain at least non-zero RequestURI with full url (including
  1109. // scheme and host) or non-zero Host header + RequestURI.
  1110. //
  1111. // The function doesn't follow redirects. Use Get* for following redirects.
  1112. //
  1113. // Response is ignored if resp is nil.
  1114. //
  1115. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1116. // to the host are busy.
  1117. //
  1118. // It is recommended obtaining req and resp via AcquireRequest
  1119. // and AcquireResponse in performance-critical code.
  1120. func (c *HostClient) Do(req *Request, resp *Response) error {
  1121. var err error
  1122. var retry bool
  1123. maxAttempts := c.MaxIdemponentCallAttempts
  1124. if maxAttempts <= 0 {
  1125. maxAttempts = DefaultMaxIdemponentCallAttempts
  1126. }
  1127. isRequestRetryable := isIdempotent
  1128. if c.RetryIf != nil {
  1129. isRequestRetryable = c.RetryIf
  1130. }
  1131. attempts := 0
  1132. hasBodyStream := req.IsBodyStream()
  1133. // If a request has a timeout we store the timeout
  1134. // and calculate a deadline so we can keep updating the
  1135. // timeout on each retry.
  1136. deadline := time.Time{}
  1137. timeout := req.timeout
  1138. if timeout > 0 {
  1139. deadline = time.Now().Add(timeout)
  1140. }
  1141. atomic.AddInt32(&c.pendingRequests, 1)
  1142. for {
  1143. // If the original timeout was set, we need to update
  1144. // the one set on the request to reflect the remaining time.
  1145. if timeout > 0 {
  1146. req.timeout = time.Until(deadline)
  1147. if req.timeout <= 0 {
  1148. err = ErrTimeout
  1149. break
  1150. }
  1151. }
  1152. retry, err = c.do(req, resp)
  1153. if err == nil || !retry {
  1154. break
  1155. }
  1156. if hasBodyStream {
  1157. break
  1158. }
  1159. if !isRequestRetryable(req) {
  1160. // Retry non-idempotent requests if the server closes
  1161. // the connection before sending the response.
  1162. //
  1163. // This case is possible if the server closes the idle
  1164. // keep-alive connection on timeout.
  1165. //
  1166. // Apache and nginx usually do this.
  1167. if err != io.EOF {
  1168. break
  1169. }
  1170. }
  1171. attempts++
  1172. if attempts >= maxAttempts {
  1173. break
  1174. }
  1175. }
  1176. atomic.AddInt32(&c.pendingRequests, -1)
  1177. // Restore the original timeout.
  1178. req.timeout = timeout
  1179. if err == io.EOF {
  1180. err = ErrConnectionClosed
  1181. }
  1182. return err
  1183. }
  1184. // PendingRequests returns the current number of requests the client
  1185. // is executing.
  1186. //
  1187. // This function may be used for balancing load among multiple HostClient
  1188. // instances.
  1189. func (c *HostClient) PendingRequests() int {
  1190. return int(atomic.LoadInt32(&c.pendingRequests))
  1191. }
  1192. func isIdempotent(req *Request) bool {
  1193. return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut()
  1194. }
  1195. func (c *HostClient) do(req *Request, resp *Response) (bool, error) {
  1196. if resp == nil {
  1197. resp = AcquireResponse()
  1198. defer ReleaseResponse(resp)
  1199. }
  1200. ok, err := c.doNonNilReqResp(req, resp)
  1201. return ok, err
  1202. }
  1203. func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
  1204. if req == nil {
  1205. // for debugging purposes
  1206. panic("BUG: req cannot be nil")
  1207. }
  1208. if resp == nil {
  1209. // for debugging purposes
  1210. panic("BUG: resp cannot be nil")
  1211. }
  1212. // Secure header error logs configuration
  1213. resp.secureErrorLogMessage = c.SecureErrorLogMessage
  1214. resp.Header.secureErrorLogMessage = c.SecureErrorLogMessage
  1215. req.secureErrorLogMessage = c.SecureErrorLogMessage
  1216. req.Header.secureErrorLogMessage = c.SecureErrorLogMessage
  1217. if c.IsTLS != req.URI().isHTTPS() {
  1218. return false, ErrHostClientRedirectToDifferentScheme
  1219. }
  1220. atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix))
  1221. // Free up resources occupied by response before sending the request,
  1222. // so the GC may reclaim these resources (e.g. response body).
  1223. // backing up SkipBody in case it was set explicitly
  1224. customSkipBody := resp.SkipBody
  1225. customStreamBody := resp.StreamBody || c.StreamResponseBody
  1226. resp.Reset()
  1227. resp.SkipBody = customSkipBody
  1228. resp.StreamBody = customStreamBody
  1229. req.URI().DisablePathNormalizing = c.DisablePathNormalizing
  1230. userAgentOld := req.Header.UserAgent()
  1231. if len(userAgentOld) == 0 {
  1232. userAgent := c.Name
  1233. if userAgent == "" && !c.NoDefaultUserAgentHeader {
  1234. userAgent = defaultUserAgent
  1235. }
  1236. if userAgent != "" {
  1237. req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
  1238. }
  1239. }
  1240. return c.transport().RoundTrip(c, req, resp)
  1241. }
  1242. func (c *HostClient) transport() RoundTripper {
  1243. if c.Transport == nil {
  1244. return DefaultTransport
  1245. }
  1246. return c.Transport
  1247. }
  1248. var (
  1249. // ErrNoFreeConns is returned when no free connections available
  1250. // to the given host.
  1251. //
  1252. // Increase the allowed number of connections per host if you
  1253. // see this error.
  1254. ErrNoFreeConns = errors.New("no free connections available to host")
  1255. // ErrConnectionClosed may be returned from client methods if the server
  1256. // closes connection before returning the first response byte.
  1257. //
  1258. // If you see this error, then either fix the server by returning
  1259. // 'Connection: close' response header before closing the connection
  1260. // or add 'Connection: close' request header before sending requests
  1261. // to broken server.
  1262. ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. " +
  1263. "Make sure the server returns 'Connection: close' response header before closing the connection")
  1264. // ErrConnPoolStrategyNotImpl is returned when HostClient.ConnPoolStrategy is not implement yet.
  1265. // If you see this error, then you need to check your HostClient configuration.
  1266. ErrConnPoolStrategyNotImpl = errors.New("connection pool strategy is not implement")
  1267. )
  1268. type timeoutError struct{}
  1269. func (e *timeoutError) Error() string {
  1270. return "timeout"
  1271. }
  1272. // Only implement the Timeout() function of the net.Error interface.
  1273. // This allows for checks like:
  1274. //
  1275. // if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  1276. func (e *timeoutError) Timeout() bool {
  1277. return true
  1278. }
  1279. // ErrTimeout is returned from timed out calls.
  1280. var ErrTimeout = &timeoutError{}
  1281. // SetMaxConns sets up the maximum number of connections which may be established to all hosts listed in Addr.
  1282. func (c *HostClient) SetMaxConns(newMaxConns int) {
  1283. c.connsLock.Lock()
  1284. c.MaxConns = newMaxConns
  1285. c.connsLock.Unlock()
  1286. }
  1287. func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) (cc *clientConn, err error) {
  1288. createConn := false
  1289. startCleaner := false
  1290. var n int
  1291. c.connsLock.Lock()
  1292. n = len(c.conns)
  1293. if n == 0 {
  1294. maxConns := c.MaxConns
  1295. if maxConns <= 0 {
  1296. maxConns = DefaultMaxConnsPerHost
  1297. }
  1298. if c.connsCount < maxConns {
  1299. c.connsCount++
  1300. createConn = true
  1301. if !c.connsCleanerRun && !connectionClose {
  1302. startCleaner = true
  1303. c.connsCleanerRun = true
  1304. }
  1305. }
  1306. } else {
  1307. switch c.ConnPoolStrategy {
  1308. case LIFO:
  1309. n--
  1310. cc = c.conns[n]
  1311. c.conns[n] = nil
  1312. c.conns = c.conns[:n]
  1313. case FIFO:
  1314. cc = c.conns[0]
  1315. copy(c.conns, c.conns[1:])
  1316. c.conns[n-1] = nil
  1317. c.conns = c.conns[:n-1]
  1318. default:
  1319. c.connsLock.Unlock()
  1320. return nil, ErrConnPoolStrategyNotImpl
  1321. }
  1322. }
  1323. c.connsLock.Unlock()
  1324. if cc != nil {
  1325. return cc, nil
  1326. }
  1327. if !createConn {
  1328. if c.MaxConnWaitTimeout <= 0 {
  1329. return nil, ErrNoFreeConns
  1330. }
  1331. //nolint:dupword
  1332. // reqTimeout c.MaxConnWaitTimeout wait duration
  1333. // d1 d2 min(d1, d2)
  1334. // 0(not set) d2 d2
  1335. // d1 0(don't wait) 0(don't wait)
  1336. // 0(not set) d2 d2
  1337. timeout := c.MaxConnWaitTimeout
  1338. timeoutOverridden := false
  1339. // reqTimeout == 0 means not set
  1340. if reqTimeout > 0 && reqTimeout < timeout {
  1341. timeout = reqTimeout
  1342. timeoutOverridden = true
  1343. }
  1344. // wait for a free connection
  1345. tc := AcquireTimer(timeout)
  1346. defer ReleaseTimer(tc)
  1347. w := &wantConn{
  1348. ready: make(chan struct{}, 1),
  1349. }
  1350. defer func() {
  1351. if err != nil {
  1352. w.cancel(c, err)
  1353. }
  1354. }()
  1355. c.queueForIdle(w)
  1356. select {
  1357. case <-w.ready:
  1358. return w.conn, w.err
  1359. case <-tc.C:
  1360. if timeoutOverridden {
  1361. return nil, ErrTimeout
  1362. }
  1363. return nil, ErrNoFreeConns
  1364. }
  1365. }
  1366. if startCleaner {
  1367. go c.connsCleaner()
  1368. }
  1369. conn, err := c.dialHostHard(reqTimeout)
  1370. if err != nil {
  1371. c.decConnsCount()
  1372. return nil, err
  1373. }
  1374. cc = acquireClientConn(conn)
  1375. return cc, nil
  1376. }
  1377. func (c *HostClient) queueForIdle(w *wantConn) {
  1378. c.connsLock.Lock()
  1379. defer c.connsLock.Unlock()
  1380. if c.connsWait == nil {
  1381. c.connsWait = &wantConnQueue{}
  1382. }
  1383. c.connsWait.clearFront()
  1384. c.connsWait.pushBack(w)
  1385. }
  1386. func (c *HostClient) dialConnFor(w *wantConn) {
  1387. conn, err := c.dialHostHard(0)
  1388. if err != nil {
  1389. w.tryDeliver(nil, err)
  1390. c.decConnsCount()
  1391. return
  1392. }
  1393. cc := acquireClientConn(conn)
  1394. if !w.tryDeliver(cc, nil) {
  1395. // not delivered, return idle connection
  1396. c.releaseConn(cc)
  1397. }
  1398. }
  1399. // CloseIdleConnections closes any connections which were previously
  1400. // connected from previous requests but are now sitting idle in a
  1401. // "keep-alive" state. It does not interrupt any connections currently
  1402. // in use.
  1403. func (c *HostClient) CloseIdleConnections() {
  1404. c.connsLock.Lock()
  1405. scratch := append([]*clientConn{}, c.conns...)
  1406. for i := range c.conns {
  1407. c.conns[i] = nil
  1408. }
  1409. c.conns = c.conns[:0]
  1410. c.connsLock.Unlock()
  1411. for _, cc := range scratch {
  1412. c.closeConn(cc)
  1413. }
  1414. }
  1415. func (c *HostClient) connsCleaner() {
  1416. var (
  1417. scratch []*clientConn
  1418. maxIdleConnDuration = c.MaxIdleConnDuration
  1419. )
  1420. if maxIdleConnDuration <= 0 {
  1421. maxIdleConnDuration = DefaultMaxIdleConnDuration
  1422. }
  1423. for {
  1424. currentTime := time.Now()
  1425. // Determine idle connections to be closed.
  1426. c.connsLock.Lock()
  1427. conns := c.conns
  1428. n := len(conns)
  1429. i := 0
  1430. for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
  1431. i++
  1432. }
  1433. sleepFor := maxIdleConnDuration
  1434. if i < n {
  1435. // + 1 so we actually sleep past the expiration time and not up to it.
  1436. // Otherwise the > check above would still fail.
  1437. sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1
  1438. }
  1439. scratch = append(scratch[:0], conns[:i]...)
  1440. if i > 0 {
  1441. m := copy(conns, conns[i:])
  1442. for i = m; i < n; i++ {
  1443. conns[i] = nil
  1444. }
  1445. c.conns = conns[:m]
  1446. }
  1447. c.connsLock.Unlock()
  1448. // Close idle connections.
  1449. for i, cc := range scratch {
  1450. c.closeConn(cc)
  1451. scratch[i] = nil
  1452. }
  1453. // Determine whether to stop the connsCleaner.
  1454. c.connsLock.Lock()
  1455. mustStop := c.connsCount == 0
  1456. if mustStop {
  1457. c.connsCleanerRun = false
  1458. }
  1459. c.connsLock.Unlock()
  1460. if mustStop {
  1461. break
  1462. }
  1463. time.Sleep(sleepFor)
  1464. }
  1465. }
  1466. func (c *HostClient) closeConn(cc *clientConn) {
  1467. c.decConnsCount()
  1468. cc.c.Close()
  1469. releaseClientConn(cc)
  1470. }
  1471. func (c *HostClient) decConnsCount() {
  1472. if c.MaxConnWaitTimeout <= 0 {
  1473. c.connsLock.Lock()
  1474. c.connsCount--
  1475. c.connsLock.Unlock()
  1476. return
  1477. }
  1478. c.connsLock.Lock()
  1479. defer c.connsLock.Unlock()
  1480. dialed := false
  1481. if q := c.connsWait; q != nil && q.len() > 0 {
  1482. for q.len() > 0 {
  1483. w := q.popFront()
  1484. if w.waiting() {
  1485. go c.dialConnFor(w)
  1486. dialed = true
  1487. break
  1488. }
  1489. }
  1490. }
  1491. if !dialed {
  1492. c.connsCount--
  1493. }
  1494. }
  1495. // ConnsCount returns connection count of HostClient.
  1496. func (c *HostClient) ConnsCount() int {
  1497. c.connsLock.Lock()
  1498. defer c.connsLock.Unlock()
  1499. return c.connsCount
  1500. }
  1501. func acquireClientConn(conn net.Conn) *clientConn {
  1502. v := clientConnPool.Get()
  1503. if v == nil {
  1504. v = &clientConn{}
  1505. }
  1506. cc := v.(*clientConn)
  1507. cc.c = conn
  1508. cc.createdTime = time.Now()
  1509. return cc
  1510. }
  1511. func releaseClientConn(cc *clientConn) {
  1512. // Reset all fields.
  1513. *cc = clientConn{}
  1514. clientConnPool.Put(cc)
  1515. }
  1516. var clientConnPool sync.Pool
  1517. func (c *HostClient) releaseConn(cc *clientConn) {
  1518. cc.lastUseTime = time.Now()
  1519. if c.MaxConnWaitTimeout <= 0 {
  1520. c.connsLock.Lock()
  1521. c.conns = append(c.conns, cc)
  1522. c.connsLock.Unlock()
  1523. return
  1524. }
  1525. // try to deliver an idle connection to a *wantConn
  1526. c.connsLock.Lock()
  1527. defer c.connsLock.Unlock()
  1528. delivered := false
  1529. if q := c.connsWait; q != nil && q.len() > 0 {
  1530. for q.len() > 0 {
  1531. w := q.popFront()
  1532. if w.waiting() {
  1533. delivered = w.tryDeliver(cc, nil)
  1534. break
  1535. }
  1536. }
  1537. }
  1538. if !delivered {
  1539. c.conns = append(c.conns, cc)
  1540. }
  1541. }
  1542. func (c *HostClient) acquireWriter(conn net.Conn) *bufio.Writer {
  1543. var v any
  1544. if c.clientWriterPool != nil {
  1545. v = c.clientWriterPool.Get()
  1546. if v == nil {
  1547. n := c.WriteBufferSize
  1548. if n <= 0 {
  1549. n = defaultWriteBufferSize
  1550. }
  1551. return bufio.NewWriterSize(conn, n)
  1552. }
  1553. } else {
  1554. v = c.writerPool.Get()
  1555. if v == nil {
  1556. n := c.WriteBufferSize
  1557. if n <= 0 {
  1558. n = defaultWriteBufferSize
  1559. }
  1560. return bufio.NewWriterSize(conn, n)
  1561. }
  1562. }
  1563. bw := v.(*bufio.Writer)
  1564. bw.Reset(conn)
  1565. return bw
  1566. }
  1567. func (c *HostClient) releaseWriter(bw *bufio.Writer) {
  1568. if c.clientWriterPool != nil {
  1569. c.clientWriterPool.Put(bw)
  1570. } else {
  1571. c.writerPool.Put(bw)
  1572. }
  1573. }
  1574. func (c *HostClient) acquireReader(conn net.Conn) *bufio.Reader {
  1575. var v any
  1576. if c.clientReaderPool != nil {
  1577. v = c.clientReaderPool.Get()
  1578. if v == nil {
  1579. n := c.ReadBufferSize
  1580. if n <= 0 {
  1581. n = defaultReadBufferSize
  1582. }
  1583. return bufio.NewReaderSize(conn, n)
  1584. }
  1585. } else {
  1586. v = c.readerPool.Get()
  1587. if v == nil {
  1588. n := c.ReadBufferSize
  1589. if n <= 0 {
  1590. n = defaultReadBufferSize
  1591. }
  1592. return bufio.NewReaderSize(conn, n)
  1593. }
  1594. }
  1595. br := v.(*bufio.Reader)
  1596. br.Reset(conn)
  1597. return br
  1598. }
  1599. func (c *HostClient) releaseReader(br *bufio.Reader) {
  1600. if c.clientReaderPool != nil {
  1601. c.clientReaderPool.Put(br)
  1602. } else {
  1603. c.readerPool.Put(br)
  1604. }
  1605. }
  1606. func newClientTLSConfig(c *tls.Config, addr string) *tls.Config {
  1607. if c == nil {
  1608. c = &tls.Config{}
  1609. } else {
  1610. c = c.Clone()
  1611. }
  1612. if c.ServerName == "" {
  1613. serverName := tlsServerName(addr)
  1614. if serverName == "*" {
  1615. c.InsecureSkipVerify = true
  1616. } else {
  1617. c.ServerName = serverName
  1618. }
  1619. }
  1620. return c
  1621. }
  1622. func tlsServerName(addr string) string {
  1623. if !strings.Contains(addr, ":") {
  1624. return addr
  1625. }
  1626. host, _, err := net.SplitHostPort(addr)
  1627. if err != nil {
  1628. return "*"
  1629. }
  1630. return host
  1631. }
  1632. func (c *HostClient) nextAddr() string {
  1633. c.addrsLock.Lock()
  1634. if c.addrs == nil {
  1635. c.addrs = strings.Split(c.Addr, ",")
  1636. }
  1637. addr := c.addrs[0]
  1638. if len(c.addrs) > 1 {
  1639. addr = c.addrs[c.addrIdx%uint32(len(c.addrs))]
  1640. c.addrIdx++
  1641. }
  1642. c.addrsLock.Unlock()
  1643. return addr
  1644. }
  1645. func (c *HostClient) dialHostHard(dialTimeout time.Duration) (conn net.Conn, err error) {
  1646. // use dialTimeout to control the timeout of each dial. It does not work if dialTimeout is 0 or if
  1647. // c.DialTimeout has not been set and c.Dial has been set.
  1648. // attempt to dial all the available hosts before giving up.
  1649. c.addrsLock.Lock()
  1650. n := len(c.addrs)
  1651. c.addrsLock.Unlock()
  1652. if n == 0 {
  1653. // It looks like c.addrs isn't initialized yet.
  1654. n = 1
  1655. }
  1656. timeout := c.ReadTimeout + c.WriteTimeout
  1657. if timeout <= 0 {
  1658. timeout = DefaultDialTimeout
  1659. }
  1660. deadline := time.Now().Add(timeout)
  1661. for n > 0 {
  1662. addr := c.nextAddr()
  1663. tlsConfig := c.cachedTLSConfig(addr)
  1664. conn, err = dialAddr(addr, c.Dial, c.DialTimeout, c.DialDualStack, c.IsTLS, tlsConfig, dialTimeout, c.WriteTimeout)
  1665. if err == nil {
  1666. return conn, nil
  1667. }
  1668. if time.Since(deadline) >= 0 {
  1669. break
  1670. }
  1671. n--
  1672. }
  1673. return nil, err
  1674. }
  1675. func (c *HostClient) cachedTLSConfig(addr string) *tls.Config {
  1676. if !c.IsTLS {
  1677. return nil
  1678. }
  1679. c.tlsConfigMapLock.Lock()
  1680. if c.tlsConfigMap == nil {
  1681. c.tlsConfigMap = make(map[string]*tls.Config)
  1682. }
  1683. cfg := c.tlsConfigMap[addr]
  1684. if cfg == nil {
  1685. cfg = newClientTLSConfig(c.TLSConfig, addr)
  1686. c.tlsConfigMap[addr] = cfg
  1687. }
  1688. c.tlsConfigMapLock.Unlock()
  1689. return cfg
  1690. }
  1691. // ErrTLSHandshakeTimeout indicates there is a timeout from tls handshake.
  1692. var ErrTLSHandshakeTimeout = errors.New("tls handshake timed out")
  1693. func tlsClientHandshake(rawConn net.Conn, tlsConfig *tls.Config, deadline time.Time) (_ net.Conn, retErr error) {
  1694. defer func() {
  1695. if retErr != nil {
  1696. rawConn.Close()
  1697. }
  1698. }()
  1699. conn := tls.Client(rawConn, tlsConfig)
  1700. err := conn.SetDeadline(deadline)
  1701. if err != nil {
  1702. return nil, err
  1703. }
  1704. err = conn.Handshake()
  1705. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  1706. return nil, ErrTLSHandshakeTimeout
  1707. }
  1708. if err != nil {
  1709. return nil, err
  1710. }
  1711. err = conn.SetDeadline(time.Time{})
  1712. if err != nil {
  1713. return nil, err
  1714. }
  1715. return conn, nil
  1716. }
  1717. func dialAddr(
  1718. addr string, dial DialFunc, dialWithTimeout DialFuncWithTimeout, dialDualStack, isTLS bool,
  1719. tlsConfig *tls.Config, dialTimeout, writeTimeout time.Duration,
  1720. ) (net.Conn, error) {
  1721. deadline := time.Now().Add(writeTimeout)
  1722. conn, err := callDialFunc(addr, dial, dialWithTimeout, dialDualStack, isTLS, dialTimeout)
  1723. if err != nil {
  1724. return nil, err
  1725. }
  1726. if conn == nil {
  1727. return nil, errors.New("dialling unsuccessful. Please report this bug")
  1728. }
  1729. // We assume that any conn that has the Handshake() method is a TLS conn already.
  1730. // This doesn't cover just tls.Conn but also other TLS implementations.
  1731. _, isTLSAlready := conn.(interface{ Handshake() error })
  1732. if isTLS && !isTLSAlready {
  1733. if writeTimeout == 0 {
  1734. return tls.Client(conn, tlsConfig), nil
  1735. }
  1736. return tlsClientHandshake(conn, tlsConfig, deadline)
  1737. }
  1738. return conn, nil
  1739. }
  1740. func callDialFunc(
  1741. addr string, dial DialFunc, dialWithTimeout DialFuncWithTimeout, dialDualStack, isTLS bool, timeout time.Duration,
  1742. ) (net.Conn, error) {
  1743. if dialWithTimeout != nil {
  1744. return dialWithTimeout(addr, timeout)
  1745. }
  1746. if dial != nil {
  1747. return dial(addr)
  1748. }
  1749. addr = AddMissingPort(addr, isTLS)
  1750. if timeout > 0 {
  1751. if dialDualStack {
  1752. return DialDualStackTimeout(addr, timeout)
  1753. }
  1754. return DialTimeout(addr, timeout)
  1755. }
  1756. if dialDualStack {
  1757. return DialDualStack(addr)
  1758. }
  1759. return Dial(addr)
  1760. }
  1761. // AddMissingPort adds a port to a host if it is missing.
  1762. // A literal IPv6 address in hostport must be enclosed in square
  1763. // brackets, as in "[::1]:80", "[::1%lo0]:80".
  1764. func AddMissingPort(addr string, isTLS bool) string {
  1765. addrLen := len(addr)
  1766. if addrLen == 0 {
  1767. return addr
  1768. }
  1769. isIP6 := addr[0] == '['
  1770. if isIP6 {
  1771. // if the IPv6 has opening bracket but closing bracket is the last char then it doesn't have a port
  1772. isIP6WithoutPort := addr[addrLen-1] == ']'
  1773. if !isIP6WithoutPort {
  1774. return addr
  1775. }
  1776. } else { // IPv4
  1777. columnPos := strings.LastIndexByte(addr, ':')
  1778. if columnPos > 0 {
  1779. return addr
  1780. }
  1781. }
  1782. port := ":80"
  1783. if isTLS {
  1784. port = ":443"
  1785. }
  1786. return addr + port
  1787. }
  1788. // A wantConn records state about a wanted connection
  1789. // (that is, an active call to getConn).
  1790. // The conn may be gotten by dialing or by finding an idle connection,
  1791. // or a cancellation may make the conn no longer wanted.
  1792. // These three options are racing against each other and use
  1793. // wantConn to coordinate and agree about the winning outcome.
  1794. //
  1795. // Inspired by net/http/transport.go.
  1796. type wantConn struct {
  1797. ready chan struct{}
  1798. mu sync.Mutex // protects conn, err, close(ready)
  1799. conn *clientConn
  1800. err error
  1801. }
  1802. // waiting reports whether w is still waiting for an answer (connection or error).
  1803. func (w *wantConn) waiting() bool {
  1804. select {
  1805. case <-w.ready:
  1806. return false
  1807. default:
  1808. return true
  1809. }
  1810. }
  1811. // tryDeliver attempts to deliver conn, err to w and reports whether it succeeded.
  1812. func (w *wantConn) tryDeliver(conn *clientConn, err error) bool {
  1813. w.mu.Lock()
  1814. defer w.mu.Unlock()
  1815. if w.conn != nil || w.err != nil {
  1816. return false
  1817. }
  1818. w.conn = conn
  1819. w.err = err
  1820. if w.conn == nil && w.err == nil {
  1821. panic("fasthttp: internal error: misuse of tryDeliver")
  1822. }
  1823. close(w.ready)
  1824. return true
  1825. }
  1826. // cancel marks w as no longer wanting a result (for example, due to cancellation).
  1827. // If a connection has been delivered already, cancel returns it with c.releaseConn.
  1828. func (w *wantConn) cancel(c *HostClient, err error) {
  1829. w.mu.Lock()
  1830. if w.conn == nil && w.err == nil {
  1831. close(w.ready) // catch misbehavior in future delivery
  1832. }
  1833. conn := w.conn
  1834. w.conn = nil
  1835. w.err = err
  1836. w.mu.Unlock()
  1837. if conn != nil {
  1838. c.releaseConn(conn)
  1839. }
  1840. }
  1841. // A wantConnQueue is a queue of wantConns.
  1842. //
  1843. // Inspired by net/http/transport.go.
  1844. type wantConnQueue struct {
  1845. // This is a queue, not a dequeue.
  1846. // It is split into two stages - head[headPos:] and tail.
  1847. // popFront is trivial (headPos++) on the first stage, and
  1848. // pushBack is trivial (append) on the second stage.
  1849. // If the first stage is empty, popFront can swap the
  1850. // first and second stages to remedy the situation.
  1851. //
  1852. // This two-stage split is analogous to the use of two lists
  1853. // in Okasaki's purely functional queue but without the
  1854. // overhead of reversing the list when swapping stages.
  1855. head []*wantConn
  1856. headPos int
  1857. tail []*wantConn
  1858. }
  1859. // len returns the number of items in the queue.
  1860. func (q *wantConnQueue) len() int {
  1861. return len(q.head) - q.headPos + len(q.tail)
  1862. }
  1863. // pushBack adds w to the back of the queue.
  1864. func (q *wantConnQueue) pushBack(w *wantConn) {
  1865. q.tail = append(q.tail, w)
  1866. }
  1867. // popFront removes and returns the wantConn at the front of the queue.
  1868. func (q *wantConnQueue) popFront() *wantConn {
  1869. if q.headPos >= len(q.head) {
  1870. if len(q.tail) == 0 {
  1871. return nil
  1872. }
  1873. // Pick up tail as new head, clear tail.
  1874. q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
  1875. }
  1876. w := q.head[q.headPos]
  1877. q.head[q.headPos] = nil
  1878. q.headPos++
  1879. return w
  1880. }
  1881. // peekFront returns the wantConn at the front of the queue without removing it.
  1882. func (q *wantConnQueue) peekFront() *wantConn {
  1883. if q.headPos < len(q.head) {
  1884. return q.head[q.headPos]
  1885. }
  1886. if len(q.tail) > 0 {
  1887. return q.tail[0]
  1888. }
  1889. return nil
  1890. }
  1891. // clearFront pops any wantConns that are no longer waiting from the head of the
  1892. // queue, reporting whether any were popped.
  1893. func (q *wantConnQueue) clearFront() (cleaned bool) {
  1894. for {
  1895. w := q.peekFront()
  1896. if w == nil || w.waiting() {
  1897. return cleaned
  1898. }
  1899. q.popFront()
  1900. cleaned = true
  1901. }
  1902. }
  1903. // PipelineClient pipelines requests over a limited set of concurrent
  1904. // connections to the given Addr.
  1905. //
  1906. // This client may be used in highly loaded HTTP-based RPC systems for reducing
  1907. // context switches and network level overhead.
  1908. // See https://en.wikipedia.org/wiki/HTTP_pipelining for details.
  1909. //
  1910. // It is forbidden copying PipelineClient instances. Create new instances
  1911. // instead.
  1912. //
  1913. // It is safe calling PipelineClient methods from concurrently running
  1914. // goroutines.
  1915. type PipelineClient struct {
  1916. noCopy noCopy
  1917. // Address of the host to connect to.
  1918. Addr string
  1919. // PipelineClient name. Used in User-Agent request header.
  1920. Name string
  1921. // NoDefaultUserAgentHeader when set to true, causes the default
  1922. // User-Agent header to be excluded from the Request.
  1923. NoDefaultUserAgentHeader bool
  1924. // The maximum number of concurrent connections to the Addr.
  1925. //
  1926. // A single connection is used by default.
  1927. MaxConns int
  1928. // The maximum number of pending pipelined requests over
  1929. // a single connection to Addr.
  1930. //
  1931. // DefaultMaxPendingRequests is used by default.
  1932. MaxPendingRequests int
  1933. // The maximum delay before sending pipelined requests as a batch
  1934. // to the server.
  1935. //
  1936. // By default requests are sent immediately to the server.
  1937. MaxBatchDelay time.Duration
  1938. // Callback for connection establishing to the host.
  1939. //
  1940. // Default Dial is used if not set.
  1941. Dial DialFunc
  1942. // Attempt to connect to both ipv4 and ipv6 host addresses
  1943. // if set to true.
  1944. //
  1945. // This option is used only if default TCP dialer is used,
  1946. // i.e. if Dial is blank.
  1947. //
  1948. // By default client connects only to ipv4 addresses,
  1949. // since unfortunately ipv6 remains broken in many networks worldwide :)
  1950. DialDualStack bool
  1951. // Response header names are passed as-is without normalization
  1952. // if this option is set.
  1953. //
  1954. // Disabled header names' normalization may be useful only for proxying
  1955. // responses to other clients expecting case-sensitive
  1956. // header names. See https://github.com/valyala/fasthttp/issues/57
  1957. // for details.
  1958. //
  1959. // By default request and response header names are normalized, i.e.
  1960. // The first letter and the first letters following dashes
  1961. // are uppercased, while all the other letters are lowercased.
  1962. // Examples:
  1963. //
  1964. // * HOST -> Host
  1965. // * content-type -> Content-Type
  1966. // * cONTENT-lenGTH -> Content-Length
  1967. DisableHeaderNamesNormalizing bool
  1968. // Path values are sent as-is without normalization
  1969. //
  1970. // Disabled path normalization may be useful for proxying incoming requests
  1971. // to servers that are expecting paths to be forwarded as-is.
  1972. //
  1973. // By default path values are normalized, i.e.
  1974. // extra slashes are removed, special characters are encoded.
  1975. DisablePathNormalizing bool
  1976. // Whether to use TLS (aka SSL or HTTPS) for host connections.
  1977. IsTLS bool
  1978. // Optional TLS config.
  1979. TLSConfig *tls.Config
  1980. // Idle connection to the host is closed after this duration.
  1981. //
  1982. // By default idle connection is closed after
  1983. // DefaultMaxIdleConnDuration.
  1984. MaxIdleConnDuration time.Duration
  1985. // Buffer size for responses' reading.
  1986. // This also limits the maximum header size.
  1987. //
  1988. // Default buffer size is used if 0.
  1989. ReadBufferSize int
  1990. // Buffer size for requests' writing.
  1991. //
  1992. // Default buffer size is used if 0.
  1993. WriteBufferSize int
  1994. // Maximum duration for full response reading (including body).
  1995. //
  1996. // By default response read timeout is unlimited.
  1997. ReadTimeout time.Duration
  1998. // Maximum duration for full request writing (including body).
  1999. //
  2000. // By default request write timeout is unlimited.
  2001. WriteTimeout time.Duration
  2002. // Logger for logging client errors.
  2003. //
  2004. // By default standard logger from log package is used.
  2005. Logger Logger
  2006. connClients []*pipelineConnClient
  2007. connClientsLock sync.Mutex
  2008. }
  2009. type pipelineConnClient struct {
  2010. noCopy noCopy
  2011. Addr string
  2012. Name string
  2013. NoDefaultUserAgentHeader bool
  2014. MaxPendingRequests int
  2015. MaxBatchDelay time.Duration
  2016. Dial DialFunc
  2017. DialDualStack bool
  2018. DisableHeaderNamesNormalizing bool
  2019. DisablePathNormalizing bool
  2020. IsTLS bool
  2021. TLSConfig *tls.Config
  2022. MaxIdleConnDuration time.Duration
  2023. ReadBufferSize int
  2024. WriteBufferSize int
  2025. ReadTimeout time.Duration
  2026. WriteTimeout time.Duration
  2027. Logger Logger
  2028. workPool sync.Pool
  2029. chLock sync.Mutex
  2030. chW chan *pipelineWork
  2031. chR chan *pipelineWork
  2032. tlsConfigLock sync.Mutex
  2033. tlsConfig *tls.Config
  2034. }
  2035. type pipelineWork struct {
  2036. reqCopy Request
  2037. respCopy Response
  2038. req *Request
  2039. resp *Response
  2040. t *time.Timer
  2041. deadline time.Time
  2042. err error
  2043. done chan struct{}
  2044. }
  2045. // DoTimeout performs the given request and waits for response during
  2046. // the given timeout duration.
  2047. //
  2048. // Request must contain at least non-zero RequestURI with full url (including
  2049. // scheme and host) or non-zero Host header + RequestURI.
  2050. //
  2051. // The function doesn't follow redirects.
  2052. //
  2053. // Response is ignored if resp is nil.
  2054. //
  2055. // ErrTimeout is returned if the response wasn't returned during
  2056. // the given timeout.
  2057. //
  2058. // It is recommended obtaining req and resp via AcquireRequest
  2059. // and AcquireResponse in performance-critical code.
  2060. func (c *PipelineClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  2061. return c.DoDeadline(req, resp, time.Now().Add(timeout))
  2062. }
  2063. // DoDeadline performs the given request and waits for response until
  2064. // the given deadline.
  2065. //
  2066. // Request must contain at least non-zero RequestURI with full url (including
  2067. // scheme and host) or non-zero Host header + RequestURI.
  2068. //
  2069. // The function doesn't follow redirects.
  2070. //
  2071. // Response is ignored if resp is nil.
  2072. //
  2073. // ErrTimeout is returned if the response wasn't returned until
  2074. // the given deadline.
  2075. //
  2076. // It is recommended obtaining req and resp via AcquireRequest
  2077. // and AcquireResponse in performance-critical code.
  2078. func (c *PipelineClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  2079. return c.getConnClient().DoDeadline(req, resp, deadline)
  2080. }
  2081. func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  2082. c.init()
  2083. timeout := time.Until(deadline)
  2084. if timeout <= 0 {
  2085. return ErrTimeout
  2086. }
  2087. if c.DisablePathNormalizing {
  2088. req.URI().DisablePathNormalizing = true
  2089. }
  2090. userAgentOld := req.Header.UserAgent()
  2091. if len(userAgentOld) == 0 {
  2092. userAgent := c.Name
  2093. if userAgent == "" && !c.NoDefaultUserAgentHeader {
  2094. userAgent = defaultUserAgent
  2095. }
  2096. if userAgent != "" {
  2097. req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
  2098. }
  2099. }
  2100. w := c.acquirePipelineWork(timeout)
  2101. w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
  2102. w.req = &w.reqCopy
  2103. w.resp = &w.respCopy
  2104. // Make a copy of the request in order to avoid data races on timeouts
  2105. req.copyToSkipBody(&w.reqCopy)
  2106. swapRequestBody(req, &w.reqCopy)
  2107. // Put the request to outgoing queue
  2108. select {
  2109. case c.chW <- w:
  2110. // Fast path: len(c.ch) < cap(c.ch)
  2111. default:
  2112. // Slow path
  2113. select {
  2114. case c.chW <- w:
  2115. case <-w.t.C:
  2116. c.releasePipelineWork(w)
  2117. return ErrTimeout
  2118. }
  2119. }
  2120. // Wait for the response
  2121. var err error
  2122. select {
  2123. case <-w.done:
  2124. if resp != nil {
  2125. w.respCopy.copyToSkipBody(resp)
  2126. swapResponseBody(resp, &w.respCopy)
  2127. }
  2128. err = w.err
  2129. c.releasePipelineWork(w)
  2130. case <-w.t.C:
  2131. err = ErrTimeout
  2132. }
  2133. return err
  2134. }
  2135. func (c *pipelineConnClient) acquirePipelineWork(timeout time.Duration) (w *pipelineWork) {
  2136. v := c.workPool.Get()
  2137. if v != nil {
  2138. w = v.(*pipelineWork)
  2139. } else {
  2140. w = &pipelineWork{
  2141. done: make(chan struct{}, 1),
  2142. }
  2143. }
  2144. if timeout > 0 {
  2145. if w.t == nil {
  2146. w.t = time.NewTimer(timeout)
  2147. } else {
  2148. w.t.Reset(timeout)
  2149. }
  2150. w.deadline = time.Now().Add(timeout)
  2151. } else {
  2152. w.deadline = zeroTime
  2153. }
  2154. return w
  2155. }
  2156. func (c *pipelineConnClient) releasePipelineWork(w *pipelineWork) {
  2157. if w.t != nil {
  2158. w.t.Stop()
  2159. }
  2160. w.reqCopy.Reset()
  2161. w.respCopy.Reset()
  2162. w.req = nil
  2163. w.resp = nil
  2164. w.err = nil
  2165. c.workPool.Put(w)
  2166. }
  2167. // Do performs the given http request and sets the corresponding response.
  2168. //
  2169. // Request must contain at least non-zero RequestURI with full url (including
  2170. // scheme and host) or non-zero Host header + RequestURI.
  2171. //
  2172. // The function doesn't follow redirects. Use Get* for following redirects.
  2173. //
  2174. // Response is ignored if resp is nil.
  2175. //
  2176. // It is recommended obtaining req and resp via AcquireRequest
  2177. // and AcquireResponse in performance-critical code.
  2178. func (c *PipelineClient) Do(req *Request, resp *Response) error {
  2179. return c.getConnClient().Do(req, resp)
  2180. }
  2181. func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
  2182. c.init()
  2183. if c.DisablePathNormalizing {
  2184. req.URI().DisablePathNormalizing = true
  2185. }
  2186. userAgentOld := req.Header.UserAgent()
  2187. if len(userAgentOld) == 0 {
  2188. userAgent := c.Name
  2189. if userAgent == "" && !c.NoDefaultUserAgentHeader {
  2190. userAgent = defaultUserAgent
  2191. }
  2192. if userAgent != "" {
  2193. req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
  2194. }
  2195. }
  2196. w := c.acquirePipelineWork(0)
  2197. w.req = req
  2198. if resp != nil {
  2199. resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
  2200. w.resp = resp
  2201. } else {
  2202. w.resp = &w.respCopy
  2203. }
  2204. // Put the request to outgoing queue
  2205. select {
  2206. case c.chW <- w:
  2207. default:
  2208. // Try substituting the oldest w with the current one.
  2209. select {
  2210. case wOld := <-c.chW:
  2211. wOld.err = ErrPipelineOverflow
  2212. wOld.done <- struct{}{}
  2213. default:
  2214. }
  2215. select {
  2216. case c.chW <- w:
  2217. default:
  2218. c.releasePipelineWork(w)
  2219. return ErrPipelineOverflow
  2220. }
  2221. }
  2222. // Wait for the response
  2223. <-w.done
  2224. err := w.err
  2225. c.releasePipelineWork(w)
  2226. return err
  2227. }
  2228. func (c *PipelineClient) getConnClient() *pipelineConnClient {
  2229. c.connClientsLock.Lock()
  2230. cc := c.getConnClientUnlocked()
  2231. c.connClientsLock.Unlock()
  2232. return cc
  2233. }
  2234. func (c *PipelineClient) getConnClientUnlocked() *pipelineConnClient {
  2235. if len(c.connClients) == 0 {
  2236. return c.newConnClient()
  2237. }
  2238. // Return the client with the minimum number of pending requests.
  2239. minCC := c.connClients[0]
  2240. minReqs := minCC.PendingRequests()
  2241. if minReqs == 0 {
  2242. return minCC
  2243. }
  2244. for i := 1; i < len(c.connClients); i++ {
  2245. cc := c.connClients[i]
  2246. reqs := cc.PendingRequests()
  2247. if reqs == 0 {
  2248. return cc
  2249. }
  2250. if reqs < minReqs {
  2251. minCC = cc
  2252. minReqs = reqs
  2253. }
  2254. }
  2255. maxConns := c.MaxConns
  2256. if maxConns <= 0 {
  2257. maxConns = 1
  2258. }
  2259. if len(c.connClients) < maxConns {
  2260. return c.newConnClient()
  2261. }
  2262. return minCC
  2263. }
  2264. func (c *PipelineClient) newConnClient() *pipelineConnClient {
  2265. cc := &pipelineConnClient{
  2266. Addr: c.Addr,
  2267. Name: c.Name,
  2268. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  2269. MaxPendingRequests: c.MaxPendingRequests,
  2270. MaxBatchDelay: c.MaxBatchDelay,
  2271. Dial: c.Dial,
  2272. DialDualStack: c.DialDualStack,
  2273. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  2274. DisablePathNormalizing: c.DisablePathNormalizing,
  2275. IsTLS: c.IsTLS,
  2276. TLSConfig: c.TLSConfig,
  2277. MaxIdleConnDuration: c.MaxIdleConnDuration,
  2278. ReadBufferSize: c.ReadBufferSize,
  2279. WriteBufferSize: c.WriteBufferSize,
  2280. ReadTimeout: c.ReadTimeout,
  2281. WriteTimeout: c.WriteTimeout,
  2282. Logger: c.Logger,
  2283. }
  2284. c.connClients = append(c.connClients, cc)
  2285. return cc
  2286. }
  2287. // ErrPipelineOverflow may be returned from PipelineClient.Do*
  2288. // if the requests' queue is overflowed.
  2289. var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflowed. Increase MaxConns and/or MaxPendingRequests")
  2290. // DefaultMaxPendingRequests is the default value
  2291. // for PipelineClient.MaxPendingRequests.
  2292. const DefaultMaxPendingRequests = 1024
  2293. func (c *pipelineConnClient) init() {
  2294. c.chLock.Lock()
  2295. if c.chR == nil {
  2296. maxPendingRequests := c.MaxPendingRequests
  2297. if maxPendingRequests <= 0 {
  2298. maxPendingRequests = DefaultMaxPendingRequests
  2299. }
  2300. c.chR = make(chan *pipelineWork, maxPendingRequests)
  2301. if c.chW == nil {
  2302. c.chW = make(chan *pipelineWork, maxPendingRequests)
  2303. }
  2304. go func() {
  2305. // Keep restarting the worker if it fails (connection errors for example).
  2306. for {
  2307. if err := c.worker(); err != nil {
  2308. c.logger().Printf("error in PipelineClient(%q): %v", c.Addr, err)
  2309. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  2310. // Throttle client reconnections on timeout errors
  2311. time.Sleep(time.Second)
  2312. }
  2313. } else {
  2314. c.chLock.Lock()
  2315. stop := len(c.chR) == 0 && len(c.chW) == 0
  2316. if !stop {
  2317. c.chR = nil
  2318. c.chW = nil
  2319. }
  2320. c.chLock.Unlock()
  2321. if stop {
  2322. break
  2323. }
  2324. }
  2325. }
  2326. }()
  2327. }
  2328. c.chLock.Unlock()
  2329. }
  2330. func (c *pipelineConnClient) worker() error {
  2331. tlsConfig := c.cachedTLSConfig()
  2332. conn, err := dialAddr(c.Addr, c.Dial, nil, c.DialDualStack, c.IsTLS, tlsConfig, 0, c.WriteTimeout)
  2333. if err != nil {
  2334. return err
  2335. }
  2336. // Start reader and writer
  2337. stopW := make(chan struct{})
  2338. doneW := make(chan error)
  2339. go func() {
  2340. doneW <- c.writer(conn, stopW)
  2341. }()
  2342. stopR := make(chan struct{})
  2343. doneR := make(chan error)
  2344. go func() {
  2345. doneR <- c.reader(conn, stopR)
  2346. }()
  2347. // Wait until reader and writer are stopped
  2348. select {
  2349. case err = <-doneW:
  2350. conn.Close()
  2351. close(stopR)
  2352. <-doneR
  2353. case err = <-doneR:
  2354. conn.Close()
  2355. close(stopW)
  2356. <-doneW
  2357. }
  2358. // Notify pending readers
  2359. for len(c.chR) > 0 {
  2360. w := <-c.chR
  2361. w.err = errPipelineConnStopped
  2362. w.done <- struct{}{}
  2363. }
  2364. return err
  2365. }
  2366. func (c *pipelineConnClient) cachedTLSConfig() *tls.Config {
  2367. if !c.IsTLS {
  2368. return nil
  2369. }
  2370. c.tlsConfigLock.Lock()
  2371. cfg := c.tlsConfig
  2372. if cfg == nil {
  2373. cfg = newClientTLSConfig(c.TLSConfig, c.Addr)
  2374. c.tlsConfig = cfg
  2375. }
  2376. c.tlsConfigLock.Unlock()
  2377. return cfg
  2378. }
  2379. func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
  2380. writeBufferSize := c.WriteBufferSize
  2381. if writeBufferSize <= 0 {
  2382. writeBufferSize = defaultWriteBufferSize
  2383. }
  2384. bw := bufio.NewWriterSize(conn, writeBufferSize)
  2385. defer bw.Flush()
  2386. chR := c.chR
  2387. chW := c.chW
  2388. writeTimeout := c.WriteTimeout
  2389. maxIdleConnDuration := c.MaxIdleConnDuration
  2390. if maxIdleConnDuration <= 0 {
  2391. maxIdleConnDuration = DefaultMaxIdleConnDuration
  2392. }
  2393. maxBatchDelay := c.MaxBatchDelay
  2394. var (
  2395. stopTimer = time.NewTimer(time.Hour)
  2396. flushTimer = time.NewTimer(time.Hour)
  2397. flushTimerCh <-chan time.Time
  2398. instantTimerCh = make(chan time.Time)
  2399. w *pipelineWork
  2400. err error
  2401. )
  2402. close(instantTimerCh)
  2403. for {
  2404. againChW:
  2405. select {
  2406. case w = <-chW:
  2407. // Fast path: len(chW) > 0
  2408. default:
  2409. // Slow path
  2410. stopTimer.Reset(maxIdleConnDuration)
  2411. select {
  2412. case w = <-chW:
  2413. case <-stopTimer.C:
  2414. return nil
  2415. case <-stopCh:
  2416. return nil
  2417. case <-flushTimerCh:
  2418. if err = bw.Flush(); err != nil {
  2419. return err
  2420. }
  2421. flushTimerCh = nil
  2422. goto againChW
  2423. }
  2424. }
  2425. if !w.deadline.IsZero() && time.Since(w.deadline) >= 0 {
  2426. w.err = ErrTimeout
  2427. w.done <- struct{}{}
  2428. continue
  2429. }
  2430. w.resp.parseNetConn(conn)
  2431. if writeTimeout > 0 {
  2432. // Set Deadline every time, since golang has fixed the performance issue
  2433. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2434. currentTime := time.Now()
  2435. if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
  2436. w.err = err
  2437. w.done <- struct{}{}
  2438. return err
  2439. }
  2440. }
  2441. if err = w.req.Write(bw); err != nil {
  2442. w.err = err
  2443. w.done <- struct{}{}
  2444. return err
  2445. }
  2446. if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) {
  2447. if maxBatchDelay > 0 {
  2448. flushTimer.Reset(maxBatchDelay)
  2449. flushTimerCh = flushTimer.C
  2450. } else {
  2451. flushTimerCh = instantTimerCh
  2452. }
  2453. }
  2454. againChR:
  2455. select {
  2456. case chR <- w:
  2457. // Fast path: len(chR) < cap(chR)
  2458. default:
  2459. // Slow path
  2460. select {
  2461. case chR <- w:
  2462. case <-stopCh:
  2463. w.err = errPipelineConnStopped
  2464. w.done <- struct{}{}
  2465. return nil
  2466. case <-flushTimerCh:
  2467. if err = bw.Flush(); err != nil {
  2468. w.err = err
  2469. w.done <- struct{}{}
  2470. return err
  2471. }
  2472. flushTimerCh = nil
  2473. goto againChR
  2474. }
  2475. }
  2476. }
  2477. }
  2478. func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error {
  2479. readBufferSize := c.ReadBufferSize
  2480. if readBufferSize <= 0 {
  2481. readBufferSize = defaultReadBufferSize
  2482. }
  2483. br := bufio.NewReaderSize(conn, readBufferSize)
  2484. chR := c.chR
  2485. readTimeout := c.ReadTimeout
  2486. var (
  2487. w *pipelineWork
  2488. err error
  2489. )
  2490. for {
  2491. select {
  2492. case w = <-chR:
  2493. // Fast path: len(chR) > 0
  2494. default:
  2495. // Slow path
  2496. select {
  2497. case w = <-chR:
  2498. case <-stopCh:
  2499. return nil
  2500. }
  2501. }
  2502. if readTimeout > 0 {
  2503. // Set Deadline every time, since golang has fixed the performance issue
  2504. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2505. currentTime := time.Now()
  2506. if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil {
  2507. w.err = err
  2508. w.done <- struct{}{}
  2509. return err
  2510. }
  2511. }
  2512. if err = w.resp.Read(br); err != nil {
  2513. w.err = err
  2514. w.done <- struct{}{}
  2515. return err
  2516. }
  2517. w.done <- struct{}{}
  2518. }
  2519. }
  2520. func (c *pipelineConnClient) logger() Logger {
  2521. if c.Logger != nil {
  2522. return c.Logger
  2523. }
  2524. return defaultLogger
  2525. }
  2526. // PendingRequests returns the current number of pending requests pipelined
  2527. // to the server.
  2528. //
  2529. // This number may exceed MaxPendingRequests*MaxConns by up to two times, since
  2530. // each connection to the server may keep up to MaxPendingRequests requests
  2531. // in the queue before sending them to the server.
  2532. //
  2533. // This function may be used for balancing load among multiple PipelineClient
  2534. // instances.
  2535. func (c *PipelineClient) PendingRequests() int {
  2536. c.connClientsLock.Lock()
  2537. n := 0
  2538. for _, cc := range c.connClients {
  2539. n += cc.PendingRequests()
  2540. }
  2541. c.connClientsLock.Unlock()
  2542. return n
  2543. }
  2544. func (c *pipelineConnClient) PendingRequests() int {
  2545. c.init()
  2546. c.chLock.Lock()
  2547. n := len(c.chR) + len(c.chW)
  2548. c.chLock.Unlock()
  2549. return n
  2550. }
  2551. var errPipelineConnStopped = errors.New("pipeline connection has been stopped")
  2552. var DefaultTransport RoundTripper = &transport{}
  2553. type transport struct{}
  2554. func (t *transport) RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error) {
  2555. customSkipBody := resp.SkipBody
  2556. customStreamBody := resp.StreamBody
  2557. var deadline time.Time
  2558. if req.timeout > 0 {
  2559. deadline = time.Now().Add(req.timeout)
  2560. }
  2561. cc, err := hc.acquireConn(req.timeout, req.ConnectionClose())
  2562. if err != nil {
  2563. return false, err
  2564. }
  2565. conn := cc.c
  2566. resp.parseNetConn(conn)
  2567. writeDeadline := deadline
  2568. if hc.WriteTimeout > 0 {
  2569. tmpWriteDeadline := time.Now().Add(hc.WriteTimeout)
  2570. if writeDeadline.IsZero() || tmpWriteDeadline.Before(writeDeadline) {
  2571. writeDeadline = tmpWriteDeadline
  2572. }
  2573. }
  2574. if err = conn.SetWriteDeadline(writeDeadline); err != nil {
  2575. hc.closeConn(cc)
  2576. return true, err
  2577. }
  2578. resetConnection := false
  2579. if hc.MaxConnDuration > 0 && time.Since(cc.createdTime) > hc.MaxConnDuration && !req.ConnectionClose() {
  2580. req.SetConnectionClose()
  2581. resetConnection = true
  2582. }
  2583. bw := hc.acquireWriter(conn)
  2584. err = req.Write(bw)
  2585. if resetConnection {
  2586. req.Header.ResetConnectionClose()
  2587. }
  2588. if err == nil {
  2589. err = bw.Flush()
  2590. }
  2591. hc.releaseWriter(bw)
  2592. // Return ErrTimeout on any timeout.
  2593. if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  2594. err = ErrTimeout
  2595. }
  2596. isConnRST := isConnectionReset(err)
  2597. if err != nil && !isConnRST {
  2598. hc.closeConn(cc)
  2599. return true, err
  2600. }
  2601. readDeadline := deadline
  2602. if hc.ReadTimeout > 0 {
  2603. tmpReadDeadline := time.Now().Add(hc.ReadTimeout)
  2604. if readDeadline.IsZero() || tmpReadDeadline.Before(readDeadline) {
  2605. readDeadline = tmpReadDeadline
  2606. }
  2607. }
  2608. if err = conn.SetReadDeadline(readDeadline); err != nil {
  2609. hc.closeConn(cc)
  2610. return true, err
  2611. }
  2612. if customSkipBody || req.Header.IsHead() {
  2613. resp.SkipBody = true
  2614. }
  2615. if hc.DisableHeaderNamesNormalizing {
  2616. resp.Header.DisableNormalizing()
  2617. }
  2618. br := hc.acquireReader(conn)
  2619. err = resp.ReadLimitBody(br, hc.MaxResponseBodySize)
  2620. if err != nil {
  2621. hc.releaseReader(br)
  2622. hc.closeConn(cc)
  2623. // Don't retry in case of ErrBodyTooLarge since we will just get the same again.
  2624. needRetry := err != ErrBodyTooLarge
  2625. return needRetry, err
  2626. }
  2627. closeConn := resetConnection || req.ConnectionClose() || resp.ConnectionClose() || isConnRST
  2628. if customStreamBody && resp.bodyStream != nil {
  2629. rbs := resp.bodyStream
  2630. resp.bodyStream = newCloseReaderWithError(rbs, func(wErr error) error {
  2631. hc.releaseReader(br)
  2632. if r, ok := rbs.(*requestStream); ok {
  2633. releaseRequestStream(r)
  2634. }
  2635. if closeConn || resp.ConnectionClose() || wErr != nil {
  2636. hc.closeConn(cc)
  2637. } else {
  2638. hc.releaseConn(cc)
  2639. }
  2640. return nil
  2641. })
  2642. return false, nil
  2643. } else {
  2644. hc.releaseReader(br)
  2645. }
  2646. if closeConn {
  2647. hc.closeConn(cc)
  2648. } else {
  2649. hc.releaseConn(cc)
  2650. }
  2651. return false, nil
  2652. }