client.go 84 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104
  1. package fasthttp
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/tls"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. )
  15. // Do performs the given http request and fills the given http response.
  16. //
  17. // Request must contain at least non-zero RequestURI with full url (including
  18. // scheme and host) or non-zero Host header + RequestURI.
  19. //
  20. // Client determines the server to be requested in the following order:
  21. //
  22. // - from RequestURI if it contains full url with scheme and host;
  23. // - from Host header otherwise.
  24. //
  25. // The function doesn't follow redirects. Use Get* for following redirects.
  26. //
  27. // Response is ignored if resp is nil.
  28. //
  29. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  30. // to the requested host are busy.
  31. //
  32. // It is recommended obtaining req and resp via AcquireRequest
  33. // and AcquireResponse in performance-critical code.
  34. func Do(req *Request, resp *Response) error {
  35. return defaultClient.Do(req, resp)
  36. }
  37. // DoTimeout performs the given request and waits for response during
  38. // the given timeout duration.
  39. //
  40. // Request must contain at least non-zero RequestURI with full url (including
  41. // scheme and host) or non-zero Host header + RequestURI.
  42. //
  43. // Client determines the server to be requested in the following order:
  44. //
  45. // - from RequestURI if it contains full url with scheme and host;
  46. // - from Host header otherwise.
  47. //
  48. // The function doesn't follow redirects. Use Get* for following redirects.
  49. //
  50. // Response is ignored if resp is nil.
  51. //
  52. // ErrTimeout is returned if the response wasn't returned during
  53. // the given timeout.
  54. //
  55. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  56. // to the requested host are busy.
  57. //
  58. // It is recommended obtaining req and resp via AcquireRequest
  59. // and AcquireResponse in performance-critical code.
  60. func DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  61. return defaultClient.DoTimeout(req, resp, timeout)
  62. }
  63. // DoDeadline performs the given request and waits for response until
  64. // the given deadline.
  65. //
  66. // Request must contain at least non-zero RequestURI with full url (including
  67. // scheme and host) or non-zero Host header + RequestURI.
  68. //
  69. // Client determines the server to be requested in the following order:
  70. //
  71. // - from RequestURI if it contains full url with scheme and host;
  72. // - from Host header otherwise.
  73. //
  74. // The function doesn't follow redirects. Use Get* for following redirects.
  75. //
  76. // Response is ignored if resp is nil.
  77. //
  78. // ErrTimeout is returned if the response wasn't returned until
  79. // the given deadline.
  80. //
  81. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  82. // to the requested host are busy.
  83. //
  84. // It is recommended obtaining req and resp via AcquireRequest
  85. // and AcquireResponse in performance-critical code.
  86. func DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  87. return defaultClient.DoDeadline(req, resp, deadline)
  88. }
  89. // DoRedirects performs the given http request and fills the given http response,
  90. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  91. // maxRedirectsCount, ErrTooManyRedirects is returned.
  92. //
  93. // Request must contain at least non-zero RequestURI with full url (including
  94. // scheme and host) or non-zero Host header + RequestURI.
  95. //
  96. // Client determines the server to be requested in the following order:
  97. //
  98. // - from RequestURI if it contains full url with scheme and host;
  99. // - from Host header otherwise.
  100. //
  101. // Response is ignored if resp is nil.
  102. //
  103. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  104. // to the requested host are busy.
  105. //
  106. // It is recommended obtaining req and resp via AcquireRequest
  107. // and AcquireResponse in performance-critical code.
  108. func DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  109. if defaultClient.DisablePathNormalizing {
  110. req.URI().DisablePathNormalizing = true
  111. }
  112. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, &defaultClient)
  113. return err
  114. }
  115. // Get returns the status code and body of url.
  116. //
  117. // The contents of dst will be replaced by the body and returned, if the dst
  118. // is too small a new slice will be allocated.
  119. //
  120. // The function follows redirects. Use Do* for manually handling redirects.
  121. func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  122. return defaultClient.Get(dst, url)
  123. }
  124. // GetTimeout returns the status code and body of url.
  125. //
  126. // The contents of dst will be replaced by the body and returned, if the dst
  127. // is too small a new slice will be allocated.
  128. //
  129. // The function follows redirects. Use Do* for manually handling redirects.
  130. //
  131. // ErrTimeout error is returned if url contents couldn't be fetched
  132. // during the given timeout.
  133. func GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  134. return defaultClient.GetTimeout(dst, url, timeout)
  135. }
  136. // GetDeadline returns the status code and body of url.
  137. //
  138. // The contents of dst will be replaced by the body and returned, if the dst
  139. // is too small a new slice will be allocated.
  140. //
  141. // The function follows redirects. Use Do* for manually handling redirects.
  142. //
  143. // ErrTimeout error is returned if url contents couldn't be fetched
  144. // until the given deadline.
  145. func GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  146. return defaultClient.GetDeadline(dst, url, deadline)
  147. }
  148. // Post sends POST request to the given url with the given POST arguments.
  149. //
  150. // The contents of dst will be replaced by the body and returned, if the dst
  151. // is too small a new slice will be allocated.
  152. //
  153. // The function follows redirects. Use Do* for manually handling redirects.
  154. //
  155. // Empty POST body is sent if postArgs is nil.
  156. func Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  157. return defaultClient.Post(dst, url, postArgs)
  158. }
  159. var defaultClient Client
  160. // Client implements http client.
  161. //
  162. // Copying Client by value is prohibited. Create new instance instead.
  163. //
  164. // It is safe calling Client methods from concurrently running goroutines.
  165. //
  166. // The fields of a Client should not be changed while it is in use.
  167. type Client struct {
  168. noCopy noCopy
  169. readerPool sync.Pool
  170. writerPool sync.Pool
  171. // Transport defines a transport-like mechanism that wraps every request/response.
  172. Transport RoundTripper
  173. // Callback for establishing new connections to hosts.
  174. //
  175. // Default DialTimeout is used if not set.
  176. DialTimeout DialFuncWithTimeout
  177. // Callback for establishing new connections to hosts.
  178. //
  179. // Note that if Dial is set instead of DialTimeout, Dial will ignore Request timeout.
  180. // If you want the tcp dial process to account for request timeouts, use DialTimeout instead.
  181. //
  182. // If not set, DialTimeout is used.
  183. Dial DialFunc
  184. // TLS config for https connections.
  185. //
  186. // Default TLS config is used if not set.
  187. TLSConfig *tls.Config
  188. // RetryIf controls whether a retry should be attempted after an error.
  189. //
  190. // By default will use isIdempotent function.
  191. //
  192. // Deprecated: Use RetryIfErr instead.
  193. // This field is only effective when the `RetryIfErr` field is not set.
  194. RetryIf RetryIfFunc
  195. // When the client encounters an error during a request, the behavior—whether to retry
  196. // and whether to reset the request timeout—should be determined
  197. // based on the return value of this field.
  198. // This field is only effective within the range of MaxIdemponentCallAttempts.
  199. RetryIfErr RetryIfErrFunc
  200. // ConfigureClient configures the fasthttp.HostClient.
  201. ConfigureClient func(hc *HostClient) error
  202. m map[string]*HostClient
  203. ms map[string]*HostClient
  204. // Client name. Used in User-Agent request header.
  205. //
  206. // Default client name is used if not set.
  207. Name string
  208. // Maximum number of connections per each host which may be established.
  209. //
  210. // DefaultMaxConnsPerHost is used if not set.
  211. MaxConnsPerHost int
  212. // Idle keep-alive connections are closed after this duration.
  213. //
  214. // By default idle connections are closed
  215. // after DefaultMaxIdleConnDuration.
  216. MaxIdleConnDuration time.Duration
  217. // Keep-alive connections are closed after this duration.
  218. //
  219. // By default connection duration is unlimited.
  220. MaxConnDuration time.Duration
  221. // Maximum number of attempts for idempotent calls.
  222. //
  223. // DefaultMaxIdemponentCallAttempts is used if not set.
  224. MaxIdemponentCallAttempts int
  225. // Per-connection buffer size for responses' reading.
  226. // This also limits the maximum header size.
  227. //
  228. // Default buffer size is used if 0.
  229. ReadBufferSize int
  230. // Per-connection buffer size for requests' writing.
  231. //
  232. // Default buffer size is used if 0.
  233. WriteBufferSize int
  234. // Maximum duration for full response reading (including body).
  235. //
  236. // By default response read timeout is unlimited.
  237. ReadTimeout time.Duration
  238. // Maximum duration for full request writing (including body).
  239. //
  240. // By default request write timeout is unlimited.
  241. WriteTimeout time.Duration
  242. // Maximum response body size.
  243. //
  244. // The client returns ErrBodyTooLarge if this limit is greater than 0
  245. // and response body is greater than the limit.
  246. //
  247. // By default response body size is unlimited.
  248. MaxResponseBodySize int
  249. // Maximum duration for waiting for a free connection.
  250. //
  251. // By default will not waiting, return ErrNoFreeConns immediately.
  252. MaxConnWaitTimeout time.Duration
  253. // Connection pool strategy. Can be either LIFO or FIFO (default).
  254. ConnPoolStrategy ConnPoolStrategyType
  255. mLock sync.RWMutex
  256. mOnce sync.Once
  257. // NoDefaultUserAgentHeader when set to true, causes the default
  258. // User-Agent header to be excluded from the Request.
  259. NoDefaultUserAgentHeader bool
  260. // Attempt to connect to both ipv4 and ipv6 addresses if set to true.
  261. //
  262. // This option is used only if default TCP dialer is used,
  263. // i.e. if Dial is blank.
  264. //
  265. // By default client connects only to ipv4 addresses,
  266. // since unfortunately ipv6 remains broken in many networks worldwide :)
  267. DialDualStack bool
  268. // Header names are passed as-is without normalization
  269. // if this option is set.
  270. //
  271. // Disabled header names' normalization may be useful only for proxying
  272. // responses to other clients expecting case-sensitive
  273. // header names. See https://github.com/valyala/fasthttp/issues/57
  274. // for details.
  275. //
  276. // By default request and response header names are normalized, i.e.
  277. // The first letter and the first letters following dashes
  278. // are uppercased, while all the other letters are lowercased.
  279. // Examples:
  280. //
  281. // * HOST -> Host
  282. // * content-type -> Content-Type
  283. // * cONTENT-lenGTH -> Content-Length
  284. DisableHeaderNamesNormalizing bool
  285. // Path values are sent as-is without normalization.
  286. //
  287. // Disabled path normalization may be useful for proxying incoming requests
  288. // to servers that are expecting paths to be forwarded as-is.
  289. //
  290. // By default path values are normalized, i.e.
  291. // extra slashes are removed, special characters are encoded.
  292. DisablePathNormalizing bool
  293. // StreamResponseBody enables response body streaming.
  294. StreamResponseBody bool
  295. }
  296. // Get returns the status code and body of url.
  297. //
  298. // The contents of dst will be replaced by the body and returned, if the dst
  299. // is too small a new slice will be allocated.
  300. //
  301. // The function follows redirects. Use Do* for manually handling redirects.
  302. func (c *Client) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  303. return clientGetURL(dst, url, c)
  304. }
  305. // GetTimeout returns the status code and body of url.
  306. //
  307. // The contents of dst will be replaced by the body and returned, if the dst
  308. // is too small a new slice will be allocated.
  309. //
  310. // The function follows redirects. Use Do* for manually handling redirects.
  311. //
  312. // ErrTimeout error is returned if url contents couldn't be fetched
  313. // during the given timeout.
  314. func (c *Client) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  315. return clientGetURLTimeout(dst, url, timeout, c)
  316. }
  317. // GetDeadline returns the status code and body of url.
  318. //
  319. // The contents of dst will be replaced by the body and returned, if the dst
  320. // is too small a new slice will be allocated.
  321. //
  322. // The function follows redirects. Use Do* for manually handling redirects.
  323. //
  324. // ErrTimeout error is returned if url contents couldn't be fetched
  325. // until the given deadline.
  326. func (c *Client) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  327. return clientGetURLDeadline(dst, url, deadline, c)
  328. }
  329. // Post sends POST request to the given url with the given POST arguments.
  330. //
  331. // The contents of dst will be replaced by the body and returned, if the dst
  332. // is too small a new slice will be allocated.
  333. //
  334. // The function follows redirects. Use Do* for manually handling redirects.
  335. //
  336. // Empty POST body is sent if postArgs is nil.
  337. func (c *Client) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  338. return clientPostURL(dst, url, postArgs, c)
  339. }
  340. // DoTimeout performs the given request and waits for response during
  341. // the given timeout duration.
  342. //
  343. // Request must contain at least non-zero RequestURI with full url (including
  344. // scheme and host) or non-zero Host header + RequestURI.
  345. //
  346. // Client determines the server to be requested in the following order:
  347. //
  348. // - from RequestURI if it contains full url with scheme and host;
  349. // - from Host header otherwise.
  350. //
  351. // The function doesn't follow redirects. Use Get* for following redirects.
  352. //
  353. // Response is ignored if resp is nil.
  354. //
  355. // ErrTimeout is returned if the response wasn't returned during
  356. // the given timeout.
  357. // Immediately returns ErrTimeout if timeout value is negative.
  358. //
  359. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  360. // to the requested host are busy.
  361. //
  362. // It is recommended obtaining req and resp via AcquireRequest
  363. // and AcquireResponse in performance-critical code.
  364. func (c *Client) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  365. req.timeout = timeout
  366. if req.timeout <= 0 {
  367. return ErrTimeout
  368. }
  369. return c.Do(req, resp)
  370. }
  371. // DoDeadline performs the given request and waits for response until
  372. // the given deadline.
  373. //
  374. // Request must contain at least non-zero RequestURI with full url (including
  375. // scheme and host) or non-zero Host header + RequestURI.
  376. //
  377. // Client determines the server to be requested in the following order:
  378. //
  379. // - from RequestURI if it contains full url with scheme and host;
  380. // - from Host header otherwise.
  381. //
  382. // The function doesn't follow redirects. Use Get* for following redirects.
  383. //
  384. // Response is ignored if resp is nil.
  385. //
  386. // ErrTimeout is returned if the response wasn't returned until
  387. // the given deadline.
  388. // Immediately returns ErrTimeout if the deadline has already been reached.
  389. //
  390. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  391. // to the requested host are busy.
  392. //
  393. // It is recommended obtaining req and resp via AcquireRequest
  394. // and AcquireResponse in performance-critical code.
  395. func (c *Client) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  396. req.timeout = time.Until(deadline)
  397. if req.timeout <= 0 {
  398. return ErrTimeout
  399. }
  400. return c.Do(req, resp)
  401. }
  402. // DoRedirects performs the given http request and fills the given http response,
  403. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  404. // maxRedirectsCount, ErrTooManyRedirects is returned.
  405. //
  406. // Request must contain at least non-zero RequestURI with full url (including
  407. // scheme and host) or non-zero Host header + RequestURI.
  408. //
  409. // Client determines the server to be requested in the following order:
  410. //
  411. // - from RequestURI if it contains full url with scheme and host;
  412. // - from Host header otherwise.
  413. //
  414. // Response is ignored if resp is nil.
  415. //
  416. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  417. // to the requested host are busy.
  418. //
  419. // It is recommended obtaining req and resp via AcquireRequest
  420. // and AcquireResponse in performance-critical code.
  421. func (c *Client) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  422. if c.DisablePathNormalizing {
  423. req.URI().DisablePathNormalizing = true
  424. }
  425. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  426. return err
  427. }
  428. // Do performs the given http request and fills the given http response.
  429. //
  430. // Request must contain at least non-zero RequestURI with full url (including
  431. // scheme and host) or non-zero Host header + RequestURI.
  432. //
  433. // Client determines the server to be requested in the following order:
  434. //
  435. // - from RequestURI if it contains full url with scheme and host;
  436. // - from Host header otherwise.
  437. //
  438. // Response is ignored if resp is nil.
  439. //
  440. // The function doesn't follow redirects. Use Get* for following redirects.
  441. //
  442. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  443. // to the requested host are busy.
  444. //
  445. // It is recommended obtaining req and resp via AcquireRequest
  446. // and AcquireResponse in performance-critical code.
  447. func (c *Client) Do(req *Request, resp *Response) error {
  448. uri := req.URI()
  449. if uri == nil {
  450. return ErrorInvalidURI
  451. }
  452. host := uri.Host()
  453. if bytes.ContainsRune(host, ',') {
  454. return fmt.Errorf("invalid host %q. Use HostClient for multiple hosts", host)
  455. }
  456. isTLS := false
  457. if uri.isHTTPS() {
  458. isTLS = true
  459. } else if !uri.isHTTP() {
  460. return fmt.Errorf("unsupported protocol %q. http and https are supported", uri.Scheme())
  461. }
  462. c.mOnce.Do(func() {
  463. c.mLock.Lock()
  464. c.m = make(map[string]*HostClient)
  465. c.ms = make(map[string]*HostClient)
  466. c.mLock.Unlock()
  467. })
  468. startCleaner := false
  469. c.mLock.RLock()
  470. m := c.m
  471. if isTLS {
  472. m = c.ms
  473. }
  474. hc := m[string(host)]
  475. if hc != nil {
  476. atomic.AddInt32(&hc.pendingClientRequests, 1)
  477. defer atomic.AddInt32(&hc.pendingClientRequests, -1)
  478. }
  479. c.mLock.RUnlock()
  480. if hc == nil {
  481. c.mLock.Lock()
  482. hc = m[string(host)]
  483. if hc == nil {
  484. hc = &HostClient{
  485. Addr: AddMissingPort(string(host), isTLS),
  486. Transport: c.Transport,
  487. Name: c.Name,
  488. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  489. Dial: c.Dial,
  490. DialTimeout: c.DialTimeout,
  491. DialDualStack: c.DialDualStack,
  492. IsTLS: isTLS,
  493. TLSConfig: c.TLSConfig,
  494. MaxConns: c.MaxConnsPerHost,
  495. MaxIdleConnDuration: c.MaxIdleConnDuration,
  496. MaxConnDuration: c.MaxConnDuration,
  497. MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts,
  498. ReadBufferSize: c.ReadBufferSize,
  499. WriteBufferSize: c.WriteBufferSize,
  500. ReadTimeout: c.ReadTimeout,
  501. WriteTimeout: c.WriteTimeout,
  502. MaxResponseBodySize: c.MaxResponseBodySize,
  503. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  504. DisablePathNormalizing: c.DisablePathNormalizing,
  505. MaxConnWaitTimeout: c.MaxConnWaitTimeout,
  506. RetryIf: c.RetryIf,
  507. RetryIfErr: c.RetryIfErr,
  508. ConnPoolStrategy: c.ConnPoolStrategy,
  509. StreamResponseBody: c.StreamResponseBody,
  510. clientReaderPool: &c.readerPool,
  511. clientWriterPool: &c.writerPool,
  512. }
  513. if c.ConfigureClient != nil {
  514. if err := c.ConfigureClient(hc); err != nil {
  515. c.mLock.Unlock()
  516. return err
  517. }
  518. }
  519. m[string(host)] = hc
  520. if len(m) == 1 {
  521. startCleaner = true
  522. }
  523. }
  524. atomic.AddInt32(&hc.pendingClientRequests, 1)
  525. defer atomic.AddInt32(&hc.pendingClientRequests, -1)
  526. c.mLock.Unlock()
  527. }
  528. if startCleaner {
  529. go c.mCleaner(m)
  530. }
  531. return hc.Do(req, resp)
  532. }
  533. // CloseIdleConnections closes any connections which were previously
  534. // connected from previous requests but are now sitting idle in a
  535. // "keep-alive" state. It does not interrupt any connections currently
  536. // in use.
  537. func (c *Client) CloseIdleConnections() {
  538. c.mLock.RLock()
  539. for _, v := range c.m {
  540. v.CloseIdleConnections()
  541. }
  542. for _, v := range c.ms {
  543. v.CloseIdleConnections()
  544. }
  545. c.mLock.RUnlock()
  546. }
  547. func (c *Client) mCleaner(m map[string]*HostClient) {
  548. mustStop := false
  549. sleep := c.MaxIdleConnDuration
  550. if sleep < time.Second {
  551. sleep = time.Second
  552. } else if sleep > 10*time.Second {
  553. sleep = 10 * time.Second
  554. }
  555. for {
  556. time.Sleep(sleep)
  557. c.mLock.Lock()
  558. for k, v := range m {
  559. v.connsLock.Lock()
  560. if v.connsCount == 0 && atomic.LoadInt32(&v.pendingClientRequests) == 0 {
  561. delete(m, k)
  562. }
  563. v.connsLock.Unlock()
  564. }
  565. if len(m) == 0 {
  566. mustStop = true
  567. }
  568. c.mLock.Unlock()
  569. if mustStop {
  570. break
  571. }
  572. }
  573. }
  574. // DefaultMaxConnsPerHost is the maximum number of concurrent connections
  575. // http client may establish per host by default (i.e. if
  576. // Client.MaxConnsPerHost isn't set).
  577. const DefaultMaxConnsPerHost = 512
  578. // DefaultMaxIdleConnDuration is the default duration before idle keep-alive
  579. // connection is closed.
  580. const DefaultMaxIdleConnDuration = 10 * time.Second
  581. // DefaultMaxIdemponentCallAttempts is the default idempotent calls attempts count.
  582. const DefaultMaxIdemponentCallAttempts = 5
  583. // DialFunc must establish connection to addr.
  584. //
  585. // There is no need in establishing TLS (SSL) connection for https.
  586. // The client automatically converts connection to TLS
  587. // if HostClient.IsTLS is set.
  588. //
  589. // TCP address passed to DialFunc always contains host and port.
  590. // Example TCP addr values:
  591. //
  592. // - foobar.com:80
  593. // - foobar.com:443
  594. // - foobar.com:8080
  595. type DialFunc func(addr string) (net.Conn, error)
  596. // DialFuncWithTimeout must establish connection to addr.
  597. // Unlike DialFunc, it also accepts a timeout.
  598. //
  599. // There is no need in establishing TLS (SSL) connection for https.
  600. // The client automatically converts connection to TLS
  601. // if HostClient.IsTLS is set.
  602. //
  603. // TCP address passed to DialFuncWithTimeout always contains host and port.
  604. // Example TCP addr values:
  605. //
  606. // - foobar.com:80
  607. // - foobar.com:443
  608. // - foobar.com:8080
  609. type DialFuncWithTimeout func(addr string, timeout time.Duration) (net.Conn, error)
  610. // RetryIfFunc defines the signature of the retry if function.
  611. // Request argument passed to RetryIfFunc, if there are any request errors.
  612. type RetryIfFunc func(request *Request) bool
  613. // RetryIfErrFunc defines an interface used for implementing the following functionality:
  614. // When the client encounters an error during a request, the behavior—whether to retry
  615. // and whether to reset the request timeout—should be determined
  616. // based on the return value of this interface.
  617. //
  618. // attempt indicates which attempt the current retry is due to a failure of.
  619. // The first request counts as the first attempt.
  620. //
  621. // err represents the error encountered while attempting the `attempts`-th request.
  622. //
  623. // resetTimeout indicates whether to reuse the `Request`'s timeout as the timeout interval,
  624. // rather than using the timeout after subtracting the time spent on previous failed requests.
  625. // This return value is meaningful only when you use `Request.SetTimeout`, `DoTimeout`, or `DoDeadline`.
  626. //
  627. // retry indicates whether to retry the current request. If it is false,
  628. // the request function will immediately return with the `err`.
  629. type RetryIfErrFunc func(request *Request, attempts int, err error) (resetTimeout bool, retry bool)
  630. // RoundTripper wraps every request/response.
  631. type RoundTripper interface {
  632. RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error)
  633. }
  634. // ConnPoolStrategyType define strategy of connection pool enqueue/dequeue.
  635. type ConnPoolStrategyType int
  636. const (
  637. FIFO ConnPoolStrategyType = iota
  638. LIFO
  639. )
  640. // HostClient balances http requests among hosts listed in Addr.
  641. //
  642. // HostClient may be used for balancing load among multiple upstream hosts.
  643. // While multiple addresses passed to HostClient.Addr may be used for balancing
  644. // load among them, it would be better using LBClient instead, since HostClient
  645. // may unevenly balance load among upstream hosts.
  646. //
  647. // It is forbidden copying HostClient instances. Create new instances instead.
  648. //
  649. // It is safe calling HostClient methods from concurrently running goroutines.
  650. type HostClient struct {
  651. noCopy noCopy
  652. readerPool sync.Pool
  653. writerPool sync.Pool
  654. // Transport defines a transport-like mechanism that wraps every request/response.
  655. Transport RoundTripper
  656. // Callback for establishing new connections to hosts.
  657. //
  658. // Default DialTimeout is used if not set.
  659. DialTimeout DialFuncWithTimeout
  660. // Callback for establishing new connections to hosts.
  661. //
  662. // Note that if Dial is set instead of DialTimeout, Dial will ignore Request timeout.
  663. // If you want the tcp dial process to account for request timeouts, use DialTimeout instead.
  664. //
  665. // If not set, DialTimeout is used.
  666. Dial DialFunc
  667. // Optional TLS config.
  668. TLSConfig *tls.Config
  669. // RetryIf controls whether a retry should be attempted after an error.
  670. // By default, it uses the isIdempotent function.
  671. //
  672. // Deprecated: Use RetryIfErr instead.
  673. // This field is only effective when the `RetryIfErr` field is not set.
  674. RetryIf RetryIfFunc
  675. // When the client encounters an error during a request, the behavior—whether to retry
  676. // and whether to reset the request timeout—should be determined
  677. // based on the return value of this field.
  678. // This field is only effective within the range of MaxIdemponentCallAttempts.
  679. RetryIfErr RetryIfErrFunc
  680. connsWait *wantConnQueue
  681. tlsConfigMap map[string]*tls.Config
  682. clientReaderPool *sync.Pool
  683. clientWriterPool *sync.Pool
  684. // Comma-separated list of upstream HTTP server host addresses,
  685. // which are passed to Dial or DialTimeout in a round-robin manner.
  686. //
  687. // Each address may contain port if default dialer is used.
  688. // For example,
  689. //
  690. // - foobar.com:80
  691. // - foobar.com:443
  692. // - foobar.com:8080
  693. Addr string
  694. // Client name. Used in User-Agent request header.
  695. Name string
  696. conns []*clientConn
  697. addrs []string
  698. // Maximum number of connections which may be established to all hosts
  699. // listed in Addr.
  700. //
  701. // You can change this value while the HostClient is being used
  702. // with HostClient.SetMaxConns(value)
  703. //
  704. // DefaultMaxConnsPerHost is used if not set.
  705. MaxConns int
  706. // Keep-alive connections are closed after this duration.
  707. //
  708. // By default connection duration is unlimited.
  709. MaxConnDuration time.Duration
  710. // Idle keep-alive connections are closed after this duration.
  711. //
  712. // By default idle connections are closed
  713. // after DefaultMaxIdleConnDuration.
  714. MaxIdleConnDuration time.Duration
  715. // Maximum number of attempts for idempotent calls.
  716. //
  717. // A value of 0 or a negative value represents using DefaultMaxIdemponentCallAttempts.
  718. // For example, a value of 1 means the request will be executed only once,
  719. // while 2 means the request will be executed at most twice.
  720. // The RetryIfErr and RetryIf fields can invalidate remaining attempts.
  721. MaxIdemponentCallAttempts int
  722. // Per-connection buffer size for responses' reading.
  723. // This also limits the maximum header size.
  724. //
  725. // Default buffer size is used if 0.
  726. ReadBufferSize int
  727. // Per-connection buffer size for requests' writing.
  728. //
  729. // Default buffer size is used if 0.
  730. WriteBufferSize int
  731. // Maximum duration for full response reading (including body).
  732. //
  733. // By default response read timeout is unlimited.
  734. ReadTimeout time.Duration
  735. // Maximum duration for full request writing (including body).
  736. //
  737. // By default request write timeout is unlimited.
  738. WriteTimeout time.Duration
  739. // Maximum response body size.
  740. //
  741. // The client returns ErrBodyTooLarge if this limit is greater than 0
  742. // and response body is greater than the limit.
  743. //
  744. // By default response body size is unlimited.
  745. MaxResponseBodySize int
  746. // Maximum duration for waiting for a free connection.
  747. //
  748. // By default will not waiting, return ErrNoFreeConns immediately
  749. MaxConnWaitTimeout time.Duration
  750. // Connection pool strategy. Can be either LIFO or FIFO (default).
  751. ConnPoolStrategy ConnPoolStrategyType
  752. connsCount int
  753. connsLock sync.Mutex
  754. addrsLock sync.Mutex
  755. tlsConfigMapLock sync.Mutex
  756. addrIdx uint32
  757. lastUseTime uint32
  758. pendingRequests int32
  759. // pendingClientRequests counts the number of requests that a Client is currently running using this HostClient.
  760. // It will be incremented earlier than pendingRequests and will be used by Client to see if the HostClient is still in use.
  761. pendingClientRequests int32
  762. // NoDefaultUserAgentHeader when set to true, causes the default
  763. // User-Agent header to be excluded from the Request.
  764. NoDefaultUserAgentHeader bool
  765. // Attempt to connect to both ipv4 and ipv6 host addresses
  766. // if set to true.
  767. //
  768. // This option is used only if default TCP dialer is used,
  769. // i.e. if Dial and DialTimeout are blank.
  770. //
  771. // By default client connects only to ipv4 addresses,
  772. // since unfortunately ipv6 remains broken in many networks worldwide :)
  773. DialDualStack bool
  774. // Whether to use TLS (aka SSL or HTTPS) for host connections.
  775. IsTLS bool
  776. // Header names are passed as-is without normalization
  777. // if this option is set.
  778. //
  779. // Disabled header names' normalization may be useful only for proxying
  780. // responses to other clients expecting case-sensitive
  781. // header names. See https://github.com/valyala/fasthttp/issues/57
  782. // for details.
  783. //
  784. // By default request and response header names are normalized, i.e.
  785. // The first letter and the first letters following dashes
  786. // are uppercased, while all the other letters are lowercased.
  787. // Examples:
  788. //
  789. // * HOST -> Host
  790. // * content-type -> Content-Type
  791. // * cONTENT-lenGTH -> Content-Length
  792. DisableHeaderNamesNormalizing bool
  793. // Path values are sent as-is without normalization.
  794. //
  795. // Disabled path normalization may be useful for proxying incoming requests
  796. // to servers that are expecting paths to be forwarded as-is.
  797. //
  798. // By default path values are normalized, i.e.
  799. // extra slashes are removed, special characters are encoded.
  800. DisablePathNormalizing bool
  801. // Will not log potentially sensitive content in error logs.
  802. //
  803. // This option is useful for servers that handle sensitive data
  804. // in the request/response.
  805. //
  806. // Client logs full errors by default.
  807. SecureErrorLogMessage bool
  808. // StreamResponseBody enables response body streaming.
  809. StreamResponseBody bool
  810. connsCleanerRun bool
  811. }
  812. type clientConn struct {
  813. c net.Conn
  814. createdTime time.Time
  815. lastUseTime time.Time
  816. }
  817. // CreatedTime returns net.Conn the client.
  818. func (cc *clientConn) Conn() net.Conn {
  819. return cc.c
  820. }
  821. // CreatedTime returns time the client was created.
  822. func (cc *clientConn) CreatedTime() time.Time {
  823. return cc.createdTime
  824. }
  825. // LastUseTime returns time the client was last used.
  826. func (cc *clientConn) LastUseTime() time.Time {
  827. return cc.lastUseTime
  828. }
  829. var startTimeUnix = time.Now().Unix()
  830. // LastUseTime returns time the client was last used.
  831. func (c *HostClient) LastUseTime() time.Time {
  832. n := atomic.LoadUint32(&c.lastUseTime)
  833. return time.Unix(startTimeUnix+int64(n), 0)
  834. }
  835. // Get returns the status code and body of url.
  836. //
  837. // The contents of dst will be replaced by the body and returned, if the dst
  838. // is too small a new slice will be allocated.
  839. //
  840. // The function follows redirects. Use Do* for manually handling redirects.
  841. func (c *HostClient) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  842. return clientGetURL(dst, url, c)
  843. }
  844. // GetTimeout returns the status code and body of url.
  845. //
  846. // The contents of dst will be replaced by the body and returned, if the dst
  847. // is too small a new slice will be allocated.
  848. //
  849. // The function follows redirects. Use Do* for manually handling redirects.
  850. //
  851. // ErrTimeout error is returned if url contents couldn't be fetched
  852. // during the given timeout.
  853. func (c *HostClient) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  854. return clientGetURLTimeout(dst, url, timeout, c)
  855. }
  856. // GetDeadline returns the status code and body of url.
  857. //
  858. // The contents of dst will be replaced by the body and returned, if the dst
  859. // is too small a new slice will be allocated.
  860. //
  861. // The function follows redirects. Use Do* for manually handling redirects.
  862. //
  863. // ErrTimeout error is returned if url contents couldn't be fetched
  864. // until the given deadline.
  865. func (c *HostClient) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  866. return clientGetURLDeadline(dst, url, deadline, c)
  867. }
  868. // Post sends POST request to the given url with the given POST arguments.
  869. //
  870. // The contents of dst will be replaced by the body and returned, if the dst
  871. // is too small a new slice will be allocated.
  872. //
  873. // The function follows redirects. Use Do* for manually handling redirects.
  874. //
  875. // Empty POST body is sent if postArgs is nil.
  876. func (c *HostClient) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  877. return clientPostURL(dst, url, postArgs, c)
  878. }
  879. type clientDoer interface {
  880. Do(req *Request, resp *Response) error
  881. }
  882. func clientGetURL(dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  883. req := AcquireRequest()
  884. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  885. ReleaseRequest(req)
  886. return statusCode, body, err
  887. }
  888. func clientGetURLTimeout(dst []byte, url string, timeout time.Duration, c clientDoer) (statusCode int, body []byte, err error) {
  889. deadline := time.Now().Add(timeout)
  890. return clientGetURLDeadline(dst, url, deadline, c)
  891. }
  892. type clientURLResponse struct {
  893. err error
  894. body []byte
  895. statusCode int
  896. }
  897. func clientGetURLDeadline(dst []byte, url string, deadline time.Time, c clientDoer) (statusCode int, body []byte, err error) {
  898. timeout := time.Until(deadline)
  899. if timeout <= 0 {
  900. return 0, dst, ErrTimeout
  901. }
  902. var ch chan clientURLResponse
  903. chv := clientURLResponseChPool.Get()
  904. if chv == nil {
  905. chv = make(chan clientURLResponse, 1)
  906. }
  907. ch = chv.(chan clientURLResponse)
  908. // Note that the request continues execution on ErrTimeout until
  909. // client-specific ReadTimeout exceeds. This helps limiting load
  910. // on slow hosts by MaxConns* concurrent requests.
  911. //
  912. // Without this 'hack' the load on slow host could exceed MaxConns*
  913. // concurrent requests, since timed out requests on client side
  914. // usually continue execution on the host.
  915. var mu sync.Mutex
  916. var timedout, responded bool
  917. go func() {
  918. req := AcquireRequest()
  919. statusCodeCopy, bodyCopy, errCopy := doRequestFollowRedirectsBuffer(req, dst, url, c)
  920. mu.Lock()
  921. if !timedout {
  922. ch <- clientURLResponse{
  923. statusCode: statusCodeCopy,
  924. body: bodyCopy,
  925. err: errCopy,
  926. }
  927. responded = true
  928. }
  929. mu.Unlock()
  930. ReleaseRequest(req)
  931. }()
  932. tc := AcquireTimer(timeout)
  933. select {
  934. case resp := <-ch:
  935. statusCode = resp.statusCode
  936. body = resp.body
  937. err = resp.err
  938. case <-tc.C:
  939. mu.Lock()
  940. if responded {
  941. resp := <-ch
  942. statusCode = resp.statusCode
  943. body = resp.body
  944. err = resp.err
  945. } else {
  946. timedout = true
  947. err = ErrTimeout
  948. body = dst
  949. }
  950. mu.Unlock()
  951. }
  952. ReleaseTimer(tc)
  953. clientURLResponseChPool.Put(chv)
  954. return statusCode, body, err
  955. }
  956. var clientURLResponseChPool sync.Pool
  957. func clientPostURL(dst []byte, url string, postArgs *Args, c clientDoer) (statusCode int, body []byte, err error) {
  958. req := AcquireRequest()
  959. defer ReleaseRequest(req)
  960. req.Header.SetMethod(MethodPost)
  961. req.Header.SetContentTypeBytes(strPostArgsContentType)
  962. if postArgs != nil {
  963. if _, err := postArgs.WriteTo(req.BodyWriter()); err != nil {
  964. return 0, nil, err
  965. }
  966. }
  967. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  968. return statusCode, body, err
  969. }
  970. var (
  971. // ErrMissingLocation is returned by clients when the Location header is missing on
  972. // an HTTP response with a redirect status code.
  973. ErrMissingLocation = errors.New("missing Location header for http redirect")
  974. // ErrTooManyRedirects is returned by clients when the number of redirects followed
  975. // exceed the max count.
  976. ErrTooManyRedirects = errors.New("too many redirects detected when doing the request")
  977. // HostClients are only able to follow redirects to the same protocol.
  978. ErrHostClientRedirectToDifferentScheme = errors.New("HostClient can't follow redirects to a different protocol," +
  979. " please use Client instead")
  980. )
  981. const defaultMaxRedirectsCount = 16
  982. func doRequestFollowRedirectsBuffer(req *Request, dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  983. resp := AcquireResponse()
  984. bodyBuf := resp.bodyBuffer()
  985. resp.keepBodyBuffer = true
  986. oldBody := bodyBuf.B
  987. bodyBuf.B = dst
  988. statusCode, _, err = doRequestFollowRedirects(req, resp, url, defaultMaxRedirectsCount, c)
  989. body = bodyBuf.B
  990. bodyBuf.B = oldBody
  991. resp.keepBodyBuffer = false
  992. ReleaseResponse(resp)
  993. return statusCode, body, err
  994. }
  995. func doRequestFollowRedirects(
  996. req *Request, resp *Response, url string, maxRedirectsCount int, c clientDoer,
  997. ) (statusCode int, body []byte, err error) {
  998. redirectsCount := 0
  999. for {
  1000. req.SetRequestURI(url)
  1001. if err := req.parseURI(); err != nil {
  1002. return 0, nil, err
  1003. }
  1004. if err = c.Do(req, resp); err != nil {
  1005. break
  1006. }
  1007. statusCode = resp.Header.StatusCode()
  1008. if !StatusCodeIsRedirect(statusCode) {
  1009. break
  1010. }
  1011. redirectsCount++
  1012. if redirectsCount > maxRedirectsCount {
  1013. err = ErrTooManyRedirects
  1014. break
  1015. }
  1016. location := resp.Header.peek(strLocation)
  1017. if len(location) == 0 {
  1018. err = ErrMissingLocation
  1019. break
  1020. }
  1021. url = getRedirectURL(url, location, req.DisableRedirectPathNormalizing)
  1022. if string(req.Header.Method()) == "POST" && (statusCode == 301 || statusCode == 302) {
  1023. req.Header.SetMethod(MethodGet)
  1024. }
  1025. }
  1026. return statusCode, body, err
  1027. }
  1028. func getRedirectURL(baseURL string, location []byte, disablePathNormalizing bool) string {
  1029. u := AcquireURI()
  1030. u.Update(baseURL)
  1031. u.UpdateBytes(location)
  1032. u.DisablePathNormalizing = disablePathNormalizing
  1033. redirectURL := u.String()
  1034. ReleaseURI(u)
  1035. return redirectURL
  1036. }
  1037. // StatusCodeIsRedirect returns true if the status code indicates a redirect.
  1038. func StatusCodeIsRedirect(statusCode int) bool {
  1039. return statusCode == StatusMovedPermanently ||
  1040. statusCode == StatusFound ||
  1041. statusCode == StatusSeeOther ||
  1042. statusCode == StatusTemporaryRedirect ||
  1043. statusCode == StatusPermanentRedirect
  1044. }
  1045. var (
  1046. requestPool sync.Pool
  1047. responsePool sync.Pool
  1048. )
  1049. // AcquireRequest returns an empty Request instance from request pool.
  1050. //
  1051. // The returned Request instance may be passed to ReleaseRequest when it is
  1052. // no longer needed. This allows Request recycling, reduces GC pressure
  1053. // and usually improves performance.
  1054. func AcquireRequest() *Request {
  1055. v := requestPool.Get()
  1056. if v == nil {
  1057. return &Request{}
  1058. }
  1059. return v.(*Request)
  1060. }
  1061. // ReleaseRequest returns req acquired via AcquireRequest to request pool.
  1062. //
  1063. // It is forbidden accessing req and/or its' members after returning
  1064. // it to request pool.
  1065. func ReleaseRequest(req *Request) {
  1066. req.Reset()
  1067. requestPool.Put(req)
  1068. }
  1069. // AcquireResponse returns an empty Response instance from response pool.
  1070. //
  1071. // The returned Response instance may be passed to ReleaseResponse when it is
  1072. // no longer needed. This allows Response recycling, reduces GC pressure
  1073. // and usually improves performance.
  1074. func AcquireResponse() *Response {
  1075. v := responsePool.Get()
  1076. if v == nil {
  1077. return &Response{}
  1078. }
  1079. return v.(*Response)
  1080. }
  1081. // ReleaseResponse return resp acquired via AcquireResponse to response pool.
  1082. //
  1083. // It is forbidden accessing resp and/or its' members after returning
  1084. // it to response pool.
  1085. func ReleaseResponse(resp *Response) {
  1086. resp.Reset()
  1087. responsePool.Put(resp)
  1088. }
  1089. // DoTimeout performs the given request and waits for response during
  1090. // the given timeout duration.
  1091. //
  1092. // Request must contain at least non-zero RequestURI with full url (including
  1093. // scheme and host) or non-zero Host header + RequestURI.
  1094. //
  1095. // The function doesn't follow redirects. Use Get* for following redirects.
  1096. //
  1097. // Response is ignored if resp is nil.
  1098. //
  1099. // ErrTimeout is returned if the response wasn't returned during
  1100. // the given timeout.
  1101. // Immediately returns ErrTimeout if timeout value is negative.
  1102. //
  1103. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1104. // to the host are busy.
  1105. //
  1106. // It is recommended obtaining req and resp via AcquireRequest
  1107. // and AcquireResponse in performance-critical code.
  1108. func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  1109. req.timeout = timeout
  1110. if req.timeout <= 0 {
  1111. return ErrTimeout
  1112. }
  1113. return c.Do(req, resp)
  1114. }
  1115. // DoDeadline performs the given request and waits for response until
  1116. // the given deadline.
  1117. //
  1118. // Request must contain at least non-zero RequestURI with full url (including
  1119. // scheme and host) or non-zero Host header + RequestURI.
  1120. //
  1121. // The function doesn't follow redirects. Use Get* for following redirects.
  1122. //
  1123. // Response is ignored if resp is nil.
  1124. //
  1125. // ErrTimeout is returned if the response wasn't returned until
  1126. // the given deadline.
  1127. // Immediately returns ErrTimeout if the deadline has already been reached.
  1128. //
  1129. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1130. // to the host are busy.
  1131. //
  1132. // It is recommended obtaining req and resp via AcquireRequest
  1133. // and AcquireResponse in performance-critical code.
  1134. func (c *HostClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  1135. req.timeout = time.Until(deadline)
  1136. if req.timeout <= 0 {
  1137. return ErrTimeout
  1138. }
  1139. return c.Do(req, resp)
  1140. }
  1141. // DoRedirects performs the given http request and fills the given http response,
  1142. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  1143. // maxRedirectsCount, ErrTooManyRedirects is returned.
  1144. //
  1145. // Request must contain at least non-zero RequestURI with full url (including
  1146. // scheme and host) or non-zero Host header + RequestURI.
  1147. //
  1148. // Client determines the server to be requested in the following order:
  1149. //
  1150. // - from RequestURI if it contains full url with scheme and host;
  1151. // - from Host header otherwise.
  1152. //
  1153. // Response is ignored if resp is nil.
  1154. //
  1155. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  1156. // to the requested host are busy.
  1157. //
  1158. // It is recommended obtaining req and resp via AcquireRequest
  1159. // and AcquireResponse in performance-critical code.
  1160. func (c *HostClient) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  1161. if c.DisablePathNormalizing {
  1162. req.URI().DisablePathNormalizing = true
  1163. }
  1164. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  1165. return err
  1166. }
  1167. // Do performs the given http request and sets the corresponding response.
  1168. //
  1169. // Request must contain at least non-zero RequestURI with full url (including
  1170. // scheme and host) or non-zero Host header + RequestURI.
  1171. //
  1172. // The function doesn't follow redirects. Use Get* for following redirects.
  1173. //
  1174. // Response is ignored if resp is nil.
  1175. //
  1176. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1177. // to the host are busy.
  1178. //
  1179. // It is recommended obtaining req and resp via AcquireRequest
  1180. // and AcquireResponse in performance-critical code.
  1181. func (c *HostClient) Do(req *Request, resp *Response) error {
  1182. var (
  1183. err error
  1184. retry bool
  1185. resetTimeout bool
  1186. )
  1187. maxAttempts := c.MaxIdemponentCallAttempts
  1188. if maxAttempts <= 0 {
  1189. maxAttempts = DefaultMaxIdemponentCallAttempts
  1190. }
  1191. attempts := 0
  1192. hasBodyStream := req.IsBodyStream()
  1193. // If a request has a timeout we store the timeout
  1194. // and calculate a deadline so we can keep updating the
  1195. // timeout on each retry.
  1196. deadline := time.Time{}
  1197. timeout := req.timeout
  1198. if timeout > 0 {
  1199. deadline = time.Now().Add(timeout)
  1200. }
  1201. retryFunc := c.RetryIf
  1202. if retryFunc == nil {
  1203. retryFunc = isIdempotent
  1204. }
  1205. atomic.AddInt32(&c.pendingRequests, 1)
  1206. for {
  1207. // If the original timeout was set, we need to update
  1208. // the one set on the request to reflect the remaining time.
  1209. if timeout > 0 {
  1210. req.timeout = time.Until(deadline)
  1211. if req.timeout <= 0 {
  1212. err = ErrTimeout
  1213. break
  1214. }
  1215. }
  1216. retry, err = c.do(req, resp)
  1217. if err == nil || !retry {
  1218. break
  1219. }
  1220. if hasBodyStream {
  1221. break
  1222. }
  1223. // Path prioritization based on ease of computation
  1224. attempts++
  1225. if attempts >= maxAttempts {
  1226. break
  1227. }
  1228. if c.RetryIfErr != nil {
  1229. resetTimeout, retry = c.RetryIfErr(req, attempts, err)
  1230. } else {
  1231. retry = retryFunc(req)
  1232. }
  1233. if !retry {
  1234. break
  1235. }
  1236. if timeout > 0 && resetTimeout {
  1237. deadline = time.Now().Add(timeout)
  1238. }
  1239. }
  1240. atomic.AddInt32(&c.pendingRequests, -1)
  1241. // Restore the original timeout.
  1242. req.timeout = timeout
  1243. if err == io.EOF {
  1244. err = ErrConnectionClosed
  1245. }
  1246. return err
  1247. }
  1248. // PendingRequests returns the current number of requests the client
  1249. // is executing.
  1250. //
  1251. // This function may be used for balancing load among multiple HostClient
  1252. // instances.
  1253. func (c *HostClient) PendingRequests() int {
  1254. return int(atomic.LoadInt32(&c.pendingRequests))
  1255. }
  1256. func isIdempotent(req *Request) bool {
  1257. return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut()
  1258. }
  1259. func (c *HostClient) do(req *Request, resp *Response) (bool, error) {
  1260. if resp == nil {
  1261. resp = AcquireResponse()
  1262. defer ReleaseResponse(resp)
  1263. }
  1264. return c.doNonNilReqResp(req, resp)
  1265. }
  1266. func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
  1267. if req == nil {
  1268. // for debugging purposes
  1269. panic("BUG: req cannot be nil")
  1270. }
  1271. if resp == nil {
  1272. // for debugging purposes
  1273. panic("BUG: resp cannot be nil")
  1274. }
  1275. // Secure header error logs configuration
  1276. resp.secureErrorLogMessage = c.SecureErrorLogMessage
  1277. resp.Header.secureErrorLogMessage = c.SecureErrorLogMessage
  1278. req.secureErrorLogMessage = c.SecureErrorLogMessage
  1279. req.Header.secureErrorLogMessage = c.SecureErrorLogMessage
  1280. if c.IsTLS != req.URI().isHTTPS() {
  1281. return false, ErrHostClientRedirectToDifferentScheme
  1282. }
  1283. atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix)) // #nosec G115
  1284. // Free up resources occupied by response before sending the request,
  1285. // so the GC may reclaim these resources (e.g. response body).
  1286. // backing up SkipBody in case it was set explicitly
  1287. customSkipBody := resp.SkipBody
  1288. customStreamBody := resp.StreamBody || c.StreamResponseBody
  1289. resp.Reset()
  1290. resp.SkipBody = customSkipBody
  1291. resp.StreamBody = customStreamBody
  1292. req.URI().DisablePathNormalizing = c.DisablePathNormalizing
  1293. userAgentOld := req.Header.UserAgent()
  1294. if len(userAgentOld) == 0 {
  1295. userAgent := c.Name
  1296. if userAgent == "" && !c.NoDefaultUserAgentHeader {
  1297. userAgent = defaultUserAgent
  1298. }
  1299. if userAgent != "" {
  1300. req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
  1301. }
  1302. }
  1303. return c.transport().RoundTrip(c, req, resp)
  1304. }
  1305. func (c *HostClient) transport() RoundTripper {
  1306. if c.Transport == nil {
  1307. return DefaultTransport
  1308. }
  1309. return c.Transport
  1310. }
  1311. var (
  1312. // ErrNoFreeConns is returned when no free connections available
  1313. // to the given host.
  1314. //
  1315. // Increase the allowed number of connections per host if you
  1316. // see this error.
  1317. ErrNoFreeConns = errors.New("no free connections available to host")
  1318. // ErrConnectionClosed may be returned from client methods if the server
  1319. // closes connection before returning the first response byte.
  1320. //
  1321. // If you see this error, then either fix the server by returning
  1322. // 'Connection: close' response header before closing the connection
  1323. // or add 'Connection: close' request header before sending requests
  1324. // to broken server.
  1325. ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. " +
  1326. "Make sure the server returns 'Connection: close' response header before closing the connection")
  1327. // ErrConnPoolStrategyNotImpl is returned when HostClient.ConnPoolStrategy is not implement yet.
  1328. // If you see this error, then you need to check your HostClient configuration.
  1329. ErrConnPoolStrategyNotImpl = errors.New("connection pool strategy is not implement")
  1330. )
  1331. type timeoutError struct{}
  1332. func (e *timeoutError) Error() string {
  1333. return "timeout"
  1334. }
  1335. // Only implement the Timeout() function of the net.Error interface.
  1336. // This allows for checks like:
  1337. //
  1338. // if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  1339. func (e *timeoutError) Timeout() bool {
  1340. return true
  1341. }
  1342. // ErrTimeout is returned from timed out calls.
  1343. var ErrTimeout = &timeoutError{}
  1344. // SetMaxConns sets up the maximum number of connections which may be established to all hosts listed in Addr.
  1345. func (c *HostClient) SetMaxConns(newMaxConns int) {
  1346. c.connsLock.Lock()
  1347. c.MaxConns = newMaxConns
  1348. c.connsLock.Unlock()
  1349. }
  1350. func (c *HostClient) AcquireConn(reqTimeout time.Duration, connectionClose bool) (cc *clientConn, err error) {
  1351. createConn := false
  1352. startCleaner := false
  1353. var n int
  1354. c.connsLock.Lock()
  1355. n = len(c.conns)
  1356. if n == 0 {
  1357. maxConns := c.MaxConns
  1358. if maxConns <= 0 {
  1359. maxConns = DefaultMaxConnsPerHost
  1360. }
  1361. if c.connsCount < maxConns {
  1362. c.connsCount++
  1363. createConn = true
  1364. if !c.connsCleanerRun && !connectionClose {
  1365. startCleaner = true
  1366. c.connsCleanerRun = true
  1367. }
  1368. }
  1369. } else {
  1370. switch c.ConnPoolStrategy {
  1371. case LIFO:
  1372. n--
  1373. cc = c.conns[n]
  1374. c.conns[n] = nil
  1375. c.conns = c.conns[:n]
  1376. case FIFO:
  1377. cc = c.conns[0]
  1378. copy(c.conns, c.conns[1:])
  1379. c.conns[n-1] = nil
  1380. c.conns = c.conns[:n-1]
  1381. default:
  1382. c.connsLock.Unlock()
  1383. return nil, ErrConnPoolStrategyNotImpl
  1384. }
  1385. }
  1386. c.connsLock.Unlock()
  1387. if cc != nil {
  1388. return cc, nil
  1389. }
  1390. if !createConn {
  1391. if c.MaxConnWaitTimeout <= 0 {
  1392. return nil, ErrNoFreeConns
  1393. }
  1394. //nolint:dupword
  1395. // reqTimeout c.MaxConnWaitTimeout wait duration
  1396. // d1 d2 min(d1, d2)
  1397. // 0(not set) d2 d2
  1398. // d1 0(don't wait) 0(don't wait)
  1399. // 0(not set) d2 d2
  1400. timeout := c.MaxConnWaitTimeout
  1401. timeoutOverridden := false
  1402. // reqTimeout == 0 means not set
  1403. if reqTimeout > 0 && reqTimeout < timeout {
  1404. timeout = reqTimeout
  1405. timeoutOverridden = true
  1406. }
  1407. // wait for a free connection
  1408. tc := AcquireTimer(timeout)
  1409. defer ReleaseTimer(tc)
  1410. w := &wantConn{
  1411. ready: make(chan struct{}, 1),
  1412. }
  1413. defer func() {
  1414. if err != nil {
  1415. w.cancel(c, err)
  1416. }
  1417. }()
  1418. c.queueForIdle(w)
  1419. select {
  1420. case <-w.ready:
  1421. return w.conn, w.err
  1422. case <-tc.C:
  1423. c.connsWait.failedWaiters.Add(1)
  1424. if timeoutOverridden {
  1425. return nil, ErrTimeout
  1426. }
  1427. return nil, ErrNoFreeConns
  1428. }
  1429. }
  1430. if startCleaner {
  1431. go c.connsCleaner()
  1432. }
  1433. conn, err := c.dialHostHard(reqTimeout)
  1434. if err != nil {
  1435. c.decConnsCount()
  1436. return nil, err
  1437. }
  1438. cc = acquireClientConn(conn)
  1439. return cc, nil
  1440. }
  1441. func (c *HostClient) queueForIdle(w *wantConn) {
  1442. c.connsLock.Lock()
  1443. defer c.connsLock.Unlock()
  1444. if c.connsWait == nil {
  1445. c.connsWait = &wantConnQueue{}
  1446. }
  1447. c.connsWait.clearFront()
  1448. c.connsWait.pushBack(w)
  1449. }
  1450. func (c *HostClient) dialConnFor(w *wantConn) {
  1451. conn, err := c.dialHostHard(0)
  1452. if err != nil {
  1453. w.tryDeliver(nil, err)
  1454. c.decConnsCount()
  1455. return
  1456. }
  1457. cc := acquireClientConn(conn)
  1458. if !w.tryDeliver(cc, nil) {
  1459. // not delivered, return idle connection
  1460. c.ReleaseConn(cc)
  1461. }
  1462. }
  1463. // CloseIdleConnections closes any connections which were previously
  1464. // connected from previous requests but are now sitting idle in a
  1465. // "keep-alive" state. It does not interrupt any connections currently
  1466. // in use.
  1467. func (c *HostClient) CloseIdleConnections() {
  1468. c.connsLock.Lock()
  1469. scratch := append([]*clientConn{}, c.conns...)
  1470. for i := range c.conns {
  1471. c.conns[i] = nil
  1472. }
  1473. c.conns = c.conns[:0]
  1474. c.connsLock.Unlock()
  1475. for _, cc := range scratch {
  1476. c.CloseConn(cc)
  1477. }
  1478. }
  1479. func (c *HostClient) connsCleaner() {
  1480. var (
  1481. scratch []*clientConn
  1482. maxIdleConnDuration = c.MaxIdleConnDuration
  1483. )
  1484. if maxIdleConnDuration <= 0 {
  1485. maxIdleConnDuration = DefaultMaxIdleConnDuration
  1486. }
  1487. for {
  1488. currentTime := time.Now()
  1489. // Determine idle connections to be closed.
  1490. c.connsLock.Lock()
  1491. conns := c.conns
  1492. n := len(conns)
  1493. i := 0
  1494. for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
  1495. i++
  1496. }
  1497. sleepFor := maxIdleConnDuration
  1498. if i < n {
  1499. // + 1 so we actually sleep past the expiration time and not up to it.
  1500. // Otherwise the > check above would still fail.
  1501. sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1
  1502. }
  1503. scratch = append(scratch[:0], conns[:i]...)
  1504. if i > 0 {
  1505. m := copy(conns, conns[i:])
  1506. for i = m; i < n; i++ {
  1507. conns[i] = nil
  1508. }
  1509. c.conns = conns[:m]
  1510. }
  1511. c.connsLock.Unlock()
  1512. // Close idle connections.
  1513. for i, cc := range scratch {
  1514. c.CloseConn(cc)
  1515. scratch[i] = nil
  1516. }
  1517. // Determine whether to stop the connsCleaner.
  1518. c.connsLock.Lock()
  1519. mustStop := c.connsCount == 0
  1520. if mustStop {
  1521. c.connsCleanerRun = false
  1522. }
  1523. c.connsLock.Unlock()
  1524. if mustStop {
  1525. break
  1526. }
  1527. time.Sleep(sleepFor)
  1528. }
  1529. }
  1530. func (c *HostClient) CloseConn(cc *clientConn) {
  1531. c.decConnsCount()
  1532. cc.c.Close()
  1533. releaseClientConn(cc)
  1534. }
  1535. func (c *HostClient) decConnsCount() {
  1536. if c.MaxConnWaitTimeout <= 0 {
  1537. c.connsLock.Lock()
  1538. c.connsCount--
  1539. c.connsLock.Unlock()
  1540. return
  1541. }
  1542. c.connsLock.Lock()
  1543. defer c.connsLock.Unlock()
  1544. dialed := false
  1545. if q := c.connsWait; q != nil && q.len() > 0 {
  1546. for q.len() > 0 {
  1547. w := q.popFront()
  1548. if w.waiting() {
  1549. go c.dialConnFor(w)
  1550. dialed = true
  1551. break
  1552. }
  1553. c.connsWait.failedWaiters.Add(-1)
  1554. }
  1555. }
  1556. if !dialed {
  1557. c.connsCount--
  1558. }
  1559. }
  1560. // ConnsCount returns connection count of HostClient.
  1561. func (c *HostClient) ConnsCount() int {
  1562. c.connsLock.Lock()
  1563. defer c.connsLock.Unlock()
  1564. return c.connsCount
  1565. }
  1566. func acquireClientConn(conn net.Conn) *clientConn {
  1567. v := clientConnPool.Get()
  1568. if v == nil {
  1569. v = &clientConn{}
  1570. }
  1571. cc := v.(*clientConn)
  1572. cc.c = conn
  1573. cc.createdTime = time.Now()
  1574. return cc
  1575. }
  1576. func releaseClientConn(cc *clientConn) {
  1577. // Reset all fields.
  1578. *cc = clientConn{}
  1579. clientConnPool.Put(cc)
  1580. }
  1581. var clientConnPool sync.Pool
  1582. func (c *HostClient) ReleaseConn(cc *clientConn) {
  1583. cc.lastUseTime = time.Now()
  1584. if c.MaxConnWaitTimeout <= 0 {
  1585. c.connsLock.Lock()
  1586. c.conns = append(c.conns, cc)
  1587. c.connsLock.Unlock()
  1588. return
  1589. }
  1590. // try to deliver an idle connection to a *wantConn
  1591. c.connsLock.Lock()
  1592. defer c.connsLock.Unlock()
  1593. delivered := false
  1594. if q := c.connsWait; q != nil && q.len() > 0 {
  1595. for q.len() > 0 {
  1596. w := q.popFront()
  1597. if w.waiting() {
  1598. delivered = w.tryDeliver(cc, nil)
  1599. // This is the last resort to hand over conCount sema.
  1600. // We must ensure that there are no valid waiters in connsWait
  1601. // when we exit this loop.
  1602. //
  1603. // We did not apply the same looping pattern in the decConnsCount
  1604. // method because it needs to create a new time-spent connection,
  1605. // and the decConnsCount call chain will inevitably reach this point.
  1606. // When MaxConnWaitTimeout>0.
  1607. if delivered {
  1608. break
  1609. }
  1610. }
  1611. c.connsWait.failedWaiters.Add(-1)
  1612. }
  1613. }
  1614. if !delivered {
  1615. c.conns = append(c.conns, cc)
  1616. }
  1617. }
  1618. func (c *HostClient) AcquireWriter(conn net.Conn) *bufio.Writer {
  1619. var v any
  1620. if c.clientWriterPool != nil {
  1621. v = c.clientWriterPool.Get()
  1622. if v == nil {
  1623. n := c.WriteBufferSize
  1624. if n <= 0 {
  1625. n = defaultWriteBufferSize
  1626. }
  1627. return bufio.NewWriterSize(conn, n)
  1628. }
  1629. } else {
  1630. v = c.writerPool.Get()
  1631. if v == nil {
  1632. n := c.WriteBufferSize
  1633. if n <= 0 {
  1634. n = defaultWriteBufferSize
  1635. }
  1636. return bufio.NewWriterSize(conn, n)
  1637. }
  1638. }
  1639. bw := v.(*bufio.Writer)
  1640. bw.Reset(conn)
  1641. return bw
  1642. }
  1643. func (c *HostClient) ReleaseWriter(bw *bufio.Writer) {
  1644. if c.clientWriterPool != nil {
  1645. c.clientWriterPool.Put(bw)
  1646. } else {
  1647. c.writerPool.Put(bw)
  1648. }
  1649. }
  1650. func (c *HostClient) AcquireReader(conn net.Conn) *bufio.Reader {
  1651. var v any
  1652. if c.clientReaderPool != nil {
  1653. v = c.clientReaderPool.Get()
  1654. if v == nil {
  1655. n := c.ReadBufferSize
  1656. if n <= 0 {
  1657. n = defaultReadBufferSize
  1658. }
  1659. return bufio.NewReaderSize(conn, n)
  1660. }
  1661. } else {
  1662. v = c.readerPool.Get()
  1663. if v == nil {
  1664. n := c.ReadBufferSize
  1665. if n <= 0 {
  1666. n = defaultReadBufferSize
  1667. }
  1668. return bufio.NewReaderSize(conn, n)
  1669. }
  1670. }
  1671. br := v.(*bufio.Reader)
  1672. br.Reset(conn)
  1673. return br
  1674. }
  1675. func (c *HostClient) ReleaseReader(br *bufio.Reader) {
  1676. if c.clientReaderPool != nil {
  1677. c.clientReaderPool.Put(br)
  1678. } else {
  1679. c.readerPool.Put(br)
  1680. }
  1681. }
  1682. func newClientTLSConfig(c *tls.Config, addr string) *tls.Config {
  1683. if c == nil {
  1684. c = &tls.Config{}
  1685. } else {
  1686. c = c.Clone()
  1687. }
  1688. if c.ServerName == "" {
  1689. serverName := tlsServerName(addr)
  1690. if serverName == "*" {
  1691. c.InsecureSkipVerify = true
  1692. } else {
  1693. c.ServerName = serverName
  1694. }
  1695. }
  1696. return c
  1697. }
  1698. func tlsServerName(addr string) string {
  1699. if !strings.Contains(addr, ":") {
  1700. return addr
  1701. }
  1702. host, _, err := net.SplitHostPort(addr)
  1703. if err != nil {
  1704. return "*"
  1705. }
  1706. return host
  1707. }
  1708. func (c *HostClient) nextAddr() string {
  1709. c.addrsLock.Lock()
  1710. if c.addrs == nil {
  1711. c.addrs = strings.Split(c.Addr, ",")
  1712. }
  1713. addr := c.addrs[0]
  1714. if len(c.addrs) > 1 {
  1715. addr = c.addrs[c.addrIdx%uint32(len(c.addrs))] // #nosec G115
  1716. c.addrIdx++
  1717. }
  1718. c.addrsLock.Unlock()
  1719. return addr
  1720. }
  1721. func (c *HostClient) dialHostHard(dialTimeout time.Duration) (conn net.Conn, err error) {
  1722. // use dialTimeout to control the timeout of each dial. It does not work if dialTimeout is 0 or if
  1723. // c.DialTimeout has not been set and c.Dial has been set.
  1724. // attempt to dial all the available hosts before giving up.
  1725. c.addrsLock.Lock()
  1726. n := len(c.addrs)
  1727. c.addrsLock.Unlock()
  1728. if n == 0 {
  1729. // It looks like c.addrs isn't initialized yet.
  1730. n = 1
  1731. }
  1732. timeout := c.ReadTimeout + c.WriteTimeout
  1733. if timeout <= 0 {
  1734. timeout = DefaultDialTimeout
  1735. }
  1736. deadline := time.Now().Add(timeout)
  1737. for n > 0 {
  1738. addr := c.nextAddr()
  1739. tlsConfig := c.cachedTLSConfig(addr)
  1740. conn, err = dialAddr(addr, c.Dial, c.DialTimeout, c.DialDualStack, c.IsTLS, tlsConfig, dialTimeout, c.WriteTimeout)
  1741. if err == nil {
  1742. return conn, nil
  1743. }
  1744. if time.Since(deadline) >= 0 {
  1745. break
  1746. }
  1747. n--
  1748. }
  1749. return nil, err
  1750. }
  1751. func (c *HostClient) cachedTLSConfig(addr string) *tls.Config {
  1752. if !c.IsTLS {
  1753. return nil
  1754. }
  1755. c.tlsConfigMapLock.Lock()
  1756. if c.tlsConfigMap == nil {
  1757. c.tlsConfigMap = make(map[string]*tls.Config)
  1758. }
  1759. cfg := c.tlsConfigMap[addr]
  1760. if cfg == nil {
  1761. cfg = newClientTLSConfig(c.TLSConfig, addr)
  1762. c.tlsConfigMap[addr] = cfg
  1763. }
  1764. c.tlsConfigMapLock.Unlock()
  1765. return cfg
  1766. }
  1767. // ErrTLSHandshakeTimeout indicates there is a timeout from tls handshake.
  1768. var ErrTLSHandshakeTimeout = errors.New("tls handshake timed out")
  1769. func tlsClientHandshake(rawConn net.Conn, tlsConfig *tls.Config, deadline time.Time) (_ net.Conn, retErr error) {
  1770. defer func() {
  1771. if retErr != nil {
  1772. rawConn.Close()
  1773. }
  1774. }()
  1775. conn := tls.Client(rawConn, tlsConfig)
  1776. err := conn.SetDeadline(deadline)
  1777. if err != nil {
  1778. return nil, err
  1779. }
  1780. err = conn.Handshake()
  1781. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  1782. return nil, ErrTLSHandshakeTimeout
  1783. }
  1784. if err != nil {
  1785. return nil, err
  1786. }
  1787. err = conn.SetDeadline(time.Time{})
  1788. if err != nil {
  1789. return nil, err
  1790. }
  1791. return conn, nil
  1792. }
  1793. func dialAddr(
  1794. addr string, dial DialFunc, dialWithTimeout DialFuncWithTimeout, dialDualStack, isTLS bool,
  1795. tlsConfig *tls.Config, dialTimeout, writeTimeout time.Duration,
  1796. ) (net.Conn, error) {
  1797. deadline := time.Now().Add(writeTimeout)
  1798. conn, err := callDialFunc(addr, dial, dialWithTimeout, dialDualStack, isTLS, dialTimeout)
  1799. if err != nil {
  1800. return nil, err
  1801. }
  1802. if conn == nil {
  1803. return nil, errors.New("dialling unsuccessful. Please report this bug")
  1804. }
  1805. // We assume that any conn that has the Handshake() method is a TLS conn already.
  1806. // This doesn't cover just tls.Conn but also other TLS implementations.
  1807. _, isTLSAlready := conn.(interface{ Handshake() error })
  1808. if isTLS && !isTLSAlready {
  1809. if writeTimeout == 0 {
  1810. return tls.Client(conn, tlsConfig), nil
  1811. }
  1812. return tlsClientHandshake(conn, tlsConfig, deadline)
  1813. }
  1814. return conn, nil
  1815. }
  1816. func callDialFunc(
  1817. addr string, dial DialFunc, dialWithTimeout DialFuncWithTimeout, dialDualStack, isTLS bool, timeout time.Duration,
  1818. ) (net.Conn, error) {
  1819. if dialWithTimeout != nil {
  1820. return dialWithTimeout(addr, timeout)
  1821. }
  1822. if dial != nil {
  1823. return dial(addr)
  1824. }
  1825. addr = AddMissingPort(addr, isTLS)
  1826. if timeout > 0 {
  1827. if dialDualStack {
  1828. return DialDualStackTimeout(addr, timeout)
  1829. }
  1830. return DialTimeout(addr, timeout)
  1831. }
  1832. if dialDualStack {
  1833. return DialDualStack(addr)
  1834. }
  1835. return Dial(addr)
  1836. }
  1837. // AddMissingPort adds a port to a host if it is missing.
  1838. // A literal IPv6 address in hostport must be enclosed in square
  1839. // brackets, as in "[::1]:80", "[::1%lo0]:80".
  1840. func AddMissingPort(addr string, isTLS bool) string {
  1841. addrLen := len(addr)
  1842. if addrLen == 0 {
  1843. return addr
  1844. }
  1845. isIP6 := addr[0] == '['
  1846. if isIP6 {
  1847. // if the IPv6 has opening bracket but closing bracket is the last char then it doesn't have a port
  1848. isIP6WithoutPort := addr[addrLen-1] == ']'
  1849. if !isIP6WithoutPort {
  1850. return addr
  1851. }
  1852. } else { // IPv4
  1853. columnPos := strings.LastIndexByte(addr, ':')
  1854. if columnPos > 0 {
  1855. return addr
  1856. }
  1857. }
  1858. port := ":80"
  1859. if isTLS {
  1860. port = ":443"
  1861. }
  1862. return addr + port
  1863. }
  1864. // A wantConn records state about a wanted connection
  1865. // (that is, an active call to getConn).
  1866. // The conn may be gotten by dialing or by finding an idle connection,
  1867. // or a cancellation may make the conn no longer wanted.
  1868. // These three options are racing against each other and use
  1869. // wantConn to coordinate and agree about the winning outcome.
  1870. //
  1871. // Inspired by net/http/transport.go.
  1872. type wantConn struct {
  1873. err error
  1874. ready chan struct{}
  1875. conn *clientConn
  1876. mu sync.Mutex // protects conn, err, close(ready)
  1877. }
  1878. // waiting reports whether w is still waiting for an answer (connection or error).
  1879. func (w *wantConn) waiting() bool {
  1880. select {
  1881. case <-w.ready:
  1882. return false
  1883. default:
  1884. return true
  1885. }
  1886. }
  1887. // tryDeliver attempts to deliver conn, err to w and reports whether it succeeded.
  1888. func (w *wantConn) tryDeliver(conn *clientConn, err error) bool {
  1889. w.mu.Lock()
  1890. defer w.mu.Unlock()
  1891. if w.conn != nil || w.err != nil {
  1892. return false
  1893. }
  1894. w.conn = conn
  1895. w.err = err
  1896. if w.conn == nil && w.err == nil {
  1897. panic("fasthttp: internal error: misuse of tryDeliver")
  1898. }
  1899. close(w.ready)
  1900. return true
  1901. }
  1902. // cancel marks w as no longer wanting a result (for example, due to cancellation).
  1903. // If a connection has been delivered already, cancel returns it with c.releaseConn.
  1904. func (w *wantConn) cancel(c *HostClient, err error) {
  1905. w.mu.Lock()
  1906. if w.conn == nil && w.err == nil {
  1907. close(w.ready) // catch misbehavior in future delivery
  1908. }
  1909. conn := w.conn
  1910. w.conn = nil
  1911. w.err = err
  1912. w.mu.Unlock()
  1913. if conn != nil {
  1914. c.ReleaseConn(conn)
  1915. }
  1916. }
  1917. // A wantConnQueue is a queue of wantConns.
  1918. //
  1919. // Inspired by net/http/transport.go.
  1920. type wantConnQueue struct {
  1921. // This is a queue, not a dequeue.
  1922. // It is split into two stages - head[headPos:] and tail.
  1923. // popFront is trivial (headPos++) on the first stage, and
  1924. // pushBack is trivial (append) on the second stage.
  1925. // If the first stage is empty, popFront can swap the
  1926. // first and second stages to remedy the situation.
  1927. //
  1928. // This two-stage split is analogous to the use of two lists
  1929. // in Okasaki's purely functional queue but without the
  1930. // overhead of reversing the list when swapping stages.
  1931. head []*wantConn
  1932. tail []*wantConn
  1933. headPos int
  1934. // failedWaiters is the number of waiters in the head or tail queue,
  1935. // but is invalid.
  1936. // These state waiters cannot truly be considered as waiters; the current
  1937. // implementation does not immediately remove them when they become
  1938. // invalid but instead only marks them.
  1939. failedWaiters atomic.Int64
  1940. }
  1941. // len returns the number of items in the queue.
  1942. func (q *wantConnQueue) len() int {
  1943. return len(q.head) - q.headPos + len(q.tail) - int(q.failedWaiters.Load())
  1944. }
  1945. // pushBack adds w to the back of the queue.
  1946. func (q *wantConnQueue) pushBack(w *wantConn) {
  1947. q.tail = append(q.tail, w)
  1948. }
  1949. // popFront removes and returns the wantConn at the front of the queue.
  1950. func (q *wantConnQueue) popFront() *wantConn {
  1951. if q.headPos >= len(q.head) {
  1952. if len(q.tail) == 0 {
  1953. return nil
  1954. }
  1955. // Pick up tail as new head, clear tail.
  1956. q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
  1957. }
  1958. w := q.head[q.headPos]
  1959. q.head[q.headPos] = nil
  1960. q.headPos++
  1961. return w
  1962. }
  1963. // peekFront returns the wantConn at the front of the queue without removing it.
  1964. func (q *wantConnQueue) peekFront() *wantConn {
  1965. if q.headPos < len(q.head) {
  1966. return q.head[q.headPos]
  1967. }
  1968. if len(q.tail) > 0 {
  1969. return q.tail[0]
  1970. }
  1971. return nil
  1972. }
  1973. // clearFront pops any wantConns that are no longer waiting from the head of the
  1974. // queue, reporting whether any were popped.
  1975. func (q *wantConnQueue) clearFront() (cleaned bool) {
  1976. for {
  1977. w := q.peekFront()
  1978. if w == nil || w.waiting() {
  1979. return cleaned
  1980. }
  1981. q.popFront()
  1982. q.failedWaiters.Add(-1)
  1983. cleaned = true
  1984. }
  1985. }
  1986. // PipelineClient pipelines requests over a limited set of concurrent
  1987. // connections to the given Addr.
  1988. //
  1989. // This client may be used in highly loaded HTTP-based RPC systems for reducing
  1990. // context switches and network level overhead.
  1991. // See https://en.wikipedia.org/wiki/HTTP_pipelining for details.
  1992. //
  1993. // It is forbidden copying PipelineClient instances. Create new instances
  1994. // instead.
  1995. //
  1996. // It is safe calling PipelineClient methods from concurrently running
  1997. // goroutines.
  1998. type PipelineClient struct {
  1999. noCopy noCopy
  2000. // Logger for logging client errors.
  2001. //
  2002. // By default standard logger from log package is used.
  2003. Logger Logger
  2004. // Callback for connection establishing to the host.
  2005. //
  2006. // Default Dial is used if not set.
  2007. Dial DialFunc
  2008. // Optional TLS config.
  2009. TLSConfig *tls.Config
  2010. // Address of the host to connect to.
  2011. Addr string
  2012. // PipelineClient name. Used in User-Agent request header.
  2013. Name string
  2014. connClients []*pipelineConnClient
  2015. // The maximum number of concurrent connections to the Addr.
  2016. //
  2017. // A single connection is used by default.
  2018. MaxConns int
  2019. // The maximum number of pending pipelined requests over
  2020. // a single connection to Addr.
  2021. //
  2022. // DefaultMaxPendingRequests is used by default.
  2023. MaxPendingRequests int
  2024. // The maximum delay before sending pipelined requests as a batch
  2025. // to the server.
  2026. //
  2027. // By default requests are sent immediately to the server.
  2028. MaxBatchDelay time.Duration
  2029. // Idle connection to the host is closed after this duration.
  2030. //
  2031. // By default idle connection is closed after
  2032. // DefaultMaxIdleConnDuration.
  2033. MaxIdleConnDuration time.Duration
  2034. // Buffer size for responses' reading.
  2035. // This also limits the maximum header size.
  2036. //
  2037. // Default buffer size is used if 0.
  2038. ReadBufferSize int
  2039. // Buffer size for requests' writing.
  2040. //
  2041. // Default buffer size is used if 0.
  2042. WriteBufferSize int
  2043. // Maximum duration for full response reading (including body).
  2044. //
  2045. // By default response read timeout is unlimited.
  2046. ReadTimeout time.Duration
  2047. // Maximum duration for full request writing (including body).
  2048. //
  2049. // By default request write timeout is unlimited.
  2050. WriteTimeout time.Duration
  2051. connClientsLock sync.Mutex
  2052. // NoDefaultUserAgentHeader when set to true, causes the default
  2053. // User-Agent header to be excluded from the Request.
  2054. NoDefaultUserAgentHeader bool
  2055. // Attempt to connect to both ipv4 and ipv6 host addresses
  2056. // if set to true.
  2057. //
  2058. // This option is used only if default TCP dialer is used,
  2059. // i.e. if Dial is blank.
  2060. //
  2061. // By default client connects only to ipv4 addresses,
  2062. // since unfortunately ipv6 remains broken in many networks worldwide :)
  2063. DialDualStack bool
  2064. // Response header names are passed as-is without normalization
  2065. // if this option is set.
  2066. //
  2067. // Disabled header names' normalization may be useful only for proxying
  2068. // responses to other clients expecting case-sensitive
  2069. // header names. See https://github.com/valyala/fasthttp/issues/57
  2070. // for details.
  2071. //
  2072. // By default request and response header names are normalized, i.e.
  2073. // The first letter and the first letters following dashes
  2074. // are uppercased, while all the other letters are lowercased.
  2075. // Examples:
  2076. //
  2077. // * HOST -> Host
  2078. // * content-type -> Content-Type
  2079. // * cONTENT-lenGTH -> Content-Length
  2080. DisableHeaderNamesNormalizing bool
  2081. // Path values are sent as-is without normalization
  2082. //
  2083. // Disabled path normalization may be useful for proxying incoming requests
  2084. // to servers that are expecting paths to be forwarded as-is.
  2085. //
  2086. // By default path values are normalized, i.e.
  2087. // extra slashes are removed, special characters are encoded.
  2088. DisablePathNormalizing bool
  2089. // Whether to use TLS (aka SSL or HTTPS) for host connections.
  2090. IsTLS bool
  2091. }
  2092. type pipelineConnClient struct {
  2093. noCopy noCopy
  2094. workPool sync.Pool
  2095. Logger Logger
  2096. Dial DialFunc
  2097. TLSConfig *tls.Config
  2098. chW chan *pipelineWork
  2099. chR chan *pipelineWork
  2100. tlsConfig *tls.Config
  2101. Addr string
  2102. Name string
  2103. MaxPendingRequests int
  2104. MaxBatchDelay time.Duration
  2105. MaxIdleConnDuration time.Duration
  2106. ReadBufferSize int
  2107. WriteBufferSize int
  2108. ReadTimeout time.Duration
  2109. WriteTimeout time.Duration
  2110. chLock sync.Mutex
  2111. tlsConfigLock sync.Mutex
  2112. NoDefaultUserAgentHeader bool
  2113. DialDualStack bool
  2114. DisableHeaderNamesNormalizing bool
  2115. DisablePathNormalizing bool
  2116. IsTLS bool
  2117. }
  2118. type pipelineWork struct {
  2119. respCopy Response
  2120. deadline time.Time
  2121. err error
  2122. req *Request
  2123. resp *Response
  2124. t *time.Timer
  2125. done chan struct{}
  2126. reqCopy Request
  2127. }
  2128. // DoTimeout performs the given request and waits for response during
  2129. // the given timeout duration.
  2130. //
  2131. // Request must contain at least non-zero RequestURI with full url (including
  2132. // scheme and host) or non-zero Host header + RequestURI.
  2133. //
  2134. // The function doesn't follow redirects.
  2135. //
  2136. // Response is ignored if resp is nil.
  2137. //
  2138. // ErrTimeout is returned if the response wasn't returned during
  2139. // the given timeout.
  2140. //
  2141. // It is recommended obtaining req and resp via AcquireRequest
  2142. // and AcquireResponse in performance-critical code.
  2143. func (c *PipelineClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  2144. return c.DoDeadline(req, resp, time.Now().Add(timeout))
  2145. }
  2146. // DoDeadline performs the given request and waits for response until
  2147. // the given deadline.
  2148. //
  2149. // Request must contain at least non-zero RequestURI with full url (including
  2150. // scheme and host) or non-zero Host header + RequestURI.
  2151. //
  2152. // The function doesn't follow redirects.
  2153. //
  2154. // Response is ignored if resp is nil.
  2155. //
  2156. // ErrTimeout is returned if the response wasn't returned until
  2157. // the given deadline.
  2158. //
  2159. // It is recommended obtaining req and resp via AcquireRequest
  2160. // and AcquireResponse in performance-critical code.
  2161. func (c *PipelineClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  2162. return c.getConnClient().DoDeadline(req, resp, deadline)
  2163. }
  2164. func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  2165. c.init()
  2166. timeout := time.Until(deadline)
  2167. if timeout <= 0 {
  2168. return ErrTimeout
  2169. }
  2170. if c.DisablePathNormalizing {
  2171. req.URI().DisablePathNormalizing = true
  2172. }
  2173. userAgentOld := req.Header.UserAgent()
  2174. if len(userAgentOld) == 0 {
  2175. userAgent := c.Name
  2176. if userAgent == "" && !c.NoDefaultUserAgentHeader {
  2177. userAgent = defaultUserAgent
  2178. }
  2179. if userAgent != "" {
  2180. req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
  2181. }
  2182. }
  2183. w := c.acquirePipelineWork(timeout)
  2184. w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
  2185. w.req = &w.reqCopy
  2186. w.resp = &w.respCopy
  2187. // Make a copy of the request in order to avoid data races on timeouts
  2188. req.copyToSkipBody(&w.reqCopy)
  2189. swapRequestBody(req, &w.reqCopy)
  2190. // Put the request to outgoing queue
  2191. select {
  2192. case c.chW <- w:
  2193. // Fast path: len(c.ch) < cap(c.ch)
  2194. default:
  2195. // Slow path
  2196. select {
  2197. case c.chW <- w:
  2198. case <-w.t.C:
  2199. c.releasePipelineWork(w)
  2200. return ErrTimeout
  2201. }
  2202. }
  2203. // Wait for the response
  2204. var err error
  2205. select {
  2206. case <-w.done:
  2207. if resp != nil {
  2208. w.respCopy.copyToSkipBody(resp)
  2209. swapResponseBody(resp, &w.respCopy)
  2210. }
  2211. err = w.err
  2212. c.releasePipelineWork(w)
  2213. case <-w.t.C:
  2214. err = ErrTimeout
  2215. }
  2216. return err
  2217. }
  2218. func (c *pipelineConnClient) acquirePipelineWork(timeout time.Duration) (w *pipelineWork) {
  2219. v := c.workPool.Get()
  2220. if v != nil {
  2221. w = v.(*pipelineWork)
  2222. } else {
  2223. w = &pipelineWork{
  2224. done: make(chan struct{}, 1),
  2225. }
  2226. }
  2227. if timeout > 0 {
  2228. if w.t == nil {
  2229. w.t = time.NewTimer(timeout)
  2230. } else {
  2231. w.t.Reset(timeout)
  2232. }
  2233. w.deadline = time.Now().Add(timeout)
  2234. } else {
  2235. w.deadline = zeroTime
  2236. }
  2237. return w
  2238. }
  2239. func (c *pipelineConnClient) releasePipelineWork(w *pipelineWork) {
  2240. if w.t != nil {
  2241. w.t.Stop()
  2242. }
  2243. w.reqCopy.Reset()
  2244. w.respCopy.Reset()
  2245. w.req = nil
  2246. w.resp = nil
  2247. w.err = nil
  2248. c.workPool.Put(w)
  2249. }
  2250. // Do performs the given http request and sets the corresponding response.
  2251. //
  2252. // Request must contain at least non-zero RequestURI with full url (including
  2253. // scheme and host) or non-zero Host header + RequestURI.
  2254. //
  2255. // The function doesn't follow redirects. Use Get* for following redirects.
  2256. //
  2257. // Response is ignored if resp is nil.
  2258. //
  2259. // It is recommended obtaining req and resp via AcquireRequest
  2260. // and AcquireResponse in performance-critical code.
  2261. func (c *PipelineClient) Do(req *Request, resp *Response) error {
  2262. return c.getConnClient().Do(req, resp)
  2263. }
  2264. func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
  2265. c.init()
  2266. if c.DisablePathNormalizing {
  2267. req.URI().DisablePathNormalizing = true
  2268. }
  2269. userAgentOld := req.Header.UserAgent()
  2270. if len(userAgentOld) == 0 {
  2271. userAgent := c.Name
  2272. if userAgent == "" && !c.NoDefaultUserAgentHeader {
  2273. userAgent = defaultUserAgent
  2274. }
  2275. if userAgent != "" {
  2276. req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
  2277. }
  2278. }
  2279. w := c.acquirePipelineWork(0)
  2280. w.req = req
  2281. if resp != nil {
  2282. resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
  2283. w.resp = resp
  2284. } else {
  2285. w.resp = &w.respCopy
  2286. }
  2287. // Put the request to outgoing queue
  2288. select {
  2289. case c.chW <- w:
  2290. default:
  2291. // Try substituting the oldest w with the current one.
  2292. select {
  2293. case wOld := <-c.chW:
  2294. wOld.err = ErrPipelineOverflow
  2295. wOld.done <- struct{}{}
  2296. default:
  2297. }
  2298. select {
  2299. case c.chW <- w:
  2300. default:
  2301. c.releasePipelineWork(w)
  2302. return ErrPipelineOverflow
  2303. }
  2304. }
  2305. // Wait for the response
  2306. <-w.done
  2307. err := w.err
  2308. c.releasePipelineWork(w)
  2309. return err
  2310. }
  2311. func (c *PipelineClient) getConnClient() *pipelineConnClient {
  2312. c.connClientsLock.Lock()
  2313. cc := c.getConnClientUnlocked()
  2314. c.connClientsLock.Unlock()
  2315. return cc
  2316. }
  2317. func (c *PipelineClient) getConnClientUnlocked() *pipelineConnClient {
  2318. if len(c.connClients) == 0 {
  2319. return c.newConnClient()
  2320. }
  2321. // Return the client with the minimum number of pending requests.
  2322. minCC := c.connClients[0]
  2323. minReqs := minCC.PendingRequests()
  2324. if minReqs == 0 {
  2325. return minCC
  2326. }
  2327. for i := 1; i < len(c.connClients); i++ {
  2328. cc := c.connClients[i]
  2329. reqs := cc.PendingRequests()
  2330. if reqs == 0 {
  2331. return cc
  2332. }
  2333. if reqs < minReqs {
  2334. minCC = cc
  2335. minReqs = reqs
  2336. }
  2337. }
  2338. maxConns := c.MaxConns
  2339. if maxConns <= 0 {
  2340. maxConns = 1
  2341. }
  2342. if len(c.connClients) < maxConns {
  2343. return c.newConnClient()
  2344. }
  2345. return minCC
  2346. }
  2347. func (c *PipelineClient) newConnClient() *pipelineConnClient {
  2348. cc := &pipelineConnClient{
  2349. Addr: c.Addr,
  2350. Name: c.Name,
  2351. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  2352. MaxPendingRequests: c.MaxPendingRequests,
  2353. MaxBatchDelay: c.MaxBatchDelay,
  2354. Dial: c.Dial,
  2355. DialDualStack: c.DialDualStack,
  2356. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  2357. DisablePathNormalizing: c.DisablePathNormalizing,
  2358. IsTLS: c.IsTLS,
  2359. TLSConfig: c.TLSConfig,
  2360. MaxIdleConnDuration: c.MaxIdleConnDuration,
  2361. ReadBufferSize: c.ReadBufferSize,
  2362. WriteBufferSize: c.WriteBufferSize,
  2363. ReadTimeout: c.ReadTimeout,
  2364. WriteTimeout: c.WriteTimeout,
  2365. Logger: c.Logger,
  2366. }
  2367. c.connClients = append(c.connClients, cc)
  2368. return cc
  2369. }
  2370. // ErrPipelineOverflow may be returned from PipelineClient.Do*
  2371. // if the requests' queue is overflowed.
  2372. var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflowed. Increase MaxConns and/or MaxPendingRequests")
  2373. // DefaultMaxPendingRequests is the default value
  2374. // for PipelineClient.MaxPendingRequests.
  2375. const DefaultMaxPendingRequests = 1024
  2376. func (c *pipelineConnClient) init() {
  2377. c.chLock.Lock()
  2378. if c.chR == nil {
  2379. maxPendingRequests := c.MaxPendingRequests
  2380. if maxPendingRequests <= 0 {
  2381. maxPendingRequests = DefaultMaxPendingRequests
  2382. }
  2383. c.chR = make(chan *pipelineWork, maxPendingRequests)
  2384. if c.chW == nil {
  2385. c.chW = make(chan *pipelineWork, maxPendingRequests)
  2386. }
  2387. go func() {
  2388. // Keep restarting the worker if it fails (connection errors for example).
  2389. for {
  2390. if err := c.worker(); err != nil {
  2391. c.logger().Printf("error in PipelineClient(%q): %v", c.Addr, err)
  2392. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  2393. // Throttle client reconnections on timeout errors
  2394. time.Sleep(time.Second)
  2395. }
  2396. } else {
  2397. c.chLock.Lock()
  2398. stop := len(c.chR) == 0 && len(c.chW) == 0
  2399. if !stop {
  2400. c.chR = nil
  2401. c.chW = nil
  2402. }
  2403. c.chLock.Unlock()
  2404. if stop {
  2405. break
  2406. }
  2407. }
  2408. }
  2409. }()
  2410. }
  2411. c.chLock.Unlock()
  2412. }
  2413. func (c *pipelineConnClient) worker() error {
  2414. tlsConfig := c.cachedTLSConfig()
  2415. conn, err := dialAddr(c.Addr, c.Dial, nil, c.DialDualStack, c.IsTLS, tlsConfig, 0, c.WriteTimeout)
  2416. if err != nil {
  2417. return err
  2418. }
  2419. // Start reader and writer
  2420. stopW := make(chan struct{})
  2421. doneW := make(chan error)
  2422. go func() {
  2423. doneW <- c.writer(conn, stopW)
  2424. }()
  2425. stopR := make(chan struct{})
  2426. doneR := make(chan error)
  2427. go func() {
  2428. doneR <- c.reader(conn, stopR)
  2429. }()
  2430. // Wait until reader and writer are stopped
  2431. select {
  2432. case err = <-doneW:
  2433. conn.Close()
  2434. close(stopR)
  2435. <-doneR
  2436. case err = <-doneR:
  2437. conn.Close()
  2438. close(stopW)
  2439. <-doneW
  2440. }
  2441. // Notify pending readers
  2442. for len(c.chR) > 0 {
  2443. w := <-c.chR
  2444. w.err = errPipelineConnStopped
  2445. w.done <- struct{}{}
  2446. }
  2447. return err
  2448. }
  2449. func (c *pipelineConnClient) cachedTLSConfig() *tls.Config {
  2450. if !c.IsTLS {
  2451. return nil
  2452. }
  2453. c.tlsConfigLock.Lock()
  2454. cfg := c.tlsConfig
  2455. if cfg == nil {
  2456. cfg = newClientTLSConfig(c.TLSConfig, c.Addr)
  2457. c.tlsConfig = cfg
  2458. }
  2459. c.tlsConfigLock.Unlock()
  2460. return cfg
  2461. }
  2462. func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
  2463. writeBufferSize := c.WriteBufferSize
  2464. if writeBufferSize <= 0 {
  2465. writeBufferSize = defaultWriteBufferSize
  2466. }
  2467. bw := bufio.NewWriterSize(conn, writeBufferSize)
  2468. defer bw.Flush()
  2469. chR := c.chR
  2470. chW := c.chW
  2471. writeTimeout := c.WriteTimeout
  2472. maxIdleConnDuration := c.MaxIdleConnDuration
  2473. if maxIdleConnDuration <= 0 {
  2474. maxIdleConnDuration = DefaultMaxIdleConnDuration
  2475. }
  2476. maxBatchDelay := c.MaxBatchDelay
  2477. var (
  2478. stopTimer = time.NewTimer(time.Hour)
  2479. flushTimer = time.NewTimer(time.Hour)
  2480. flushTimerCh <-chan time.Time
  2481. instantTimerCh = make(chan time.Time)
  2482. w *pipelineWork
  2483. err error
  2484. )
  2485. close(instantTimerCh)
  2486. for {
  2487. againChW:
  2488. select {
  2489. case w = <-chW:
  2490. // Fast path: len(chW) > 0
  2491. default:
  2492. // Slow path
  2493. stopTimer.Reset(maxIdleConnDuration)
  2494. select {
  2495. case w = <-chW:
  2496. case <-stopTimer.C:
  2497. return nil
  2498. case <-stopCh:
  2499. return nil
  2500. case <-flushTimerCh:
  2501. if err = bw.Flush(); err != nil {
  2502. return err
  2503. }
  2504. flushTimerCh = nil
  2505. goto againChW
  2506. }
  2507. }
  2508. if !w.deadline.IsZero() && time.Since(w.deadline) >= 0 {
  2509. w.err = ErrTimeout
  2510. w.done <- struct{}{}
  2511. continue
  2512. }
  2513. w.resp.ParseNetConn(conn)
  2514. if writeTimeout > 0 {
  2515. // Set Deadline every time, since golang has fixed the performance issue
  2516. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2517. currentTime := time.Now()
  2518. if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
  2519. w.err = err
  2520. w.done <- struct{}{}
  2521. return err
  2522. }
  2523. }
  2524. if err = w.req.Write(bw); err != nil {
  2525. w.err = err
  2526. w.done <- struct{}{}
  2527. return err
  2528. }
  2529. if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) {
  2530. if maxBatchDelay > 0 {
  2531. flushTimer.Reset(maxBatchDelay)
  2532. flushTimerCh = flushTimer.C
  2533. } else {
  2534. flushTimerCh = instantTimerCh
  2535. }
  2536. }
  2537. againChR:
  2538. select {
  2539. case chR <- w:
  2540. // Fast path: len(chR) < cap(chR)
  2541. default:
  2542. // Slow path
  2543. select {
  2544. case chR <- w:
  2545. case <-stopCh:
  2546. w.err = errPipelineConnStopped
  2547. w.done <- struct{}{}
  2548. return nil
  2549. case <-flushTimerCh:
  2550. if err = bw.Flush(); err != nil {
  2551. w.err = err
  2552. w.done <- struct{}{}
  2553. return err
  2554. }
  2555. flushTimerCh = nil
  2556. goto againChR
  2557. }
  2558. }
  2559. }
  2560. }
  2561. func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error {
  2562. readBufferSize := c.ReadBufferSize
  2563. if readBufferSize <= 0 {
  2564. readBufferSize = defaultReadBufferSize
  2565. }
  2566. br := bufio.NewReaderSize(conn, readBufferSize)
  2567. chR := c.chR
  2568. readTimeout := c.ReadTimeout
  2569. var (
  2570. w *pipelineWork
  2571. err error
  2572. )
  2573. for {
  2574. select {
  2575. case w = <-chR:
  2576. // Fast path: len(chR) > 0
  2577. default:
  2578. // Slow path
  2579. select {
  2580. case w = <-chR:
  2581. case <-stopCh:
  2582. return nil
  2583. }
  2584. }
  2585. if readTimeout > 0 {
  2586. // Set Deadline every time, since golang has fixed the performance issue
  2587. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2588. currentTime := time.Now()
  2589. if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil {
  2590. w.err = err
  2591. w.done <- struct{}{}
  2592. return err
  2593. }
  2594. }
  2595. if err = w.resp.Read(br); err != nil {
  2596. w.err = err
  2597. w.done <- struct{}{}
  2598. return err
  2599. }
  2600. w.done <- struct{}{}
  2601. }
  2602. }
  2603. func (c *pipelineConnClient) logger() Logger {
  2604. if c.Logger != nil {
  2605. return c.Logger
  2606. }
  2607. return defaultLogger
  2608. }
  2609. // PendingRequests returns the current number of pending requests pipelined
  2610. // to the server.
  2611. //
  2612. // This number may exceed MaxPendingRequests*MaxConns by up to two times, since
  2613. // each connection to the server may keep up to MaxPendingRequests requests
  2614. // in the queue before sending them to the server.
  2615. //
  2616. // This function may be used for balancing load among multiple PipelineClient
  2617. // instances.
  2618. func (c *PipelineClient) PendingRequests() int {
  2619. c.connClientsLock.Lock()
  2620. n := 0
  2621. for _, cc := range c.connClients {
  2622. n += cc.PendingRequests()
  2623. }
  2624. c.connClientsLock.Unlock()
  2625. return n
  2626. }
  2627. func (c *pipelineConnClient) PendingRequests() int {
  2628. c.init()
  2629. c.chLock.Lock()
  2630. n := len(c.chR) + len(c.chW)
  2631. c.chLock.Unlock()
  2632. return n
  2633. }
  2634. var errPipelineConnStopped = errors.New("pipeline connection has been stopped")
  2635. var DefaultTransport RoundTripper = &transport{}
  2636. type transport struct{}
  2637. func (t *transport) RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error) {
  2638. customSkipBody := resp.SkipBody
  2639. customStreamBody := resp.StreamBody
  2640. var deadline time.Time
  2641. if req.timeout > 0 {
  2642. deadline = time.Now().Add(req.timeout)
  2643. }
  2644. cc, err := hc.AcquireConn(req.timeout, req.ConnectionClose())
  2645. if err != nil {
  2646. return false, err
  2647. }
  2648. conn := cc.c
  2649. resp.ParseNetConn(conn)
  2650. writeDeadline := deadline
  2651. if hc.WriteTimeout > 0 {
  2652. tmpWriteDeadline := time.Now().Add(hc.WriteTimeout)
  2653. if writeDeadline.IsZero() || tmpWriteDeadline.Before(writeDeadline) {
  2654. writeDeadline = tmpWriteDeadline
  2655. }
  2656. }
  2657. if err = conn.SetWriteDeadline(writeDeadline); err != nil {
  2658. hc.CloseConn(cc)
  2659. return true, err
  2660. }
  2661. resetConnection := false
  2662. if hc.MaxConnDuration > 0 && time.Since(cc.createdTime) > hc.MaxConnDuration && !req.ConnectionClose() {
  2663. req.SetConnectionClose()
  2664. resetConnection = true
  2665. }
  2666. bw := hc.AcquireWriter(conn)
  2667. err = req.Write(bw)
  2668. if resetConnection {
  2669. req.Header.ResetConnectionClose()
  2670. }
  2671. if err == nil {
  2672. err = bw.Flush()
  2673. }
  2674. hc.ReleaseWriter(bw)
  2675. // Return ErrTimeout on any timeout.
  2676. if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  2677. err = ErrTimeout
  2678. }
  2679. if err != nil {
  2680. hc.CloseConn(cc)
  2681. return true, err
  2682. }
  2683. readDeadline := deadline
  2684. if hc.ReadTimeout > 0 {
  2685. tmpReadDeadline := time.Now().Add(hc.ReadTimeout)
  2686. if readDeadline.IsZero() || tmpReadDeadline.Before(readDeadline) {
  2687. readDeadline = tmpReadDeadline
  2688. }
  2689. }
  2690. if err = conn.SetReadDeadline(readDeadline); err != nil {
  2691. hc.CloseConn(cc)
  2692. return true, err
  2693. }
  2694. if customSkipBody || req.Header.IsHead() {
  2695. resp.SkipBody = true
  2696. }
  2697. if hc.DisableHeaderNamesNormalizing {
  2698. resp.Header.DisableNormalizing()
  2699. }
  2700. br := hc.AcquireReader(conn)
  2701. err = resp.ReadLimitBody(br, hc.MaxResponseBodySize)
  2702. if err != nil {
  2703. hc.ReleaseReader(br)
  2704. hc.CloseConn(cc)
  2705. // Don't retry in case of ErrBodyTooLarge since we will just get the same again.
  2706. needRetry := err != ErrBodyTooLarge
  2707. return needRetry, err
  2708. }
  2709. closeConn := resetConnection || req.ConnectionClose() || resp.ConnectionClose()
  2710. if customStreamBody && resp.bodyStream != nil {
  2711. rbs := resp.bodyStream
  2712. resp.bodyStream = newCloseReaderWithError(rbs, func(wErr error) error {
  2713. hc.ReleaseReader(br)
  2714. if r, ok := rbs.(*requestStream); ok {
  2715. releaseRequestStream(r)
  2716. }
  2717. if closeConn || resp.ConnectionClose() || wErr != nil {
  2718. hc.CloseConn(cc)
  2719. } else {
  2720. hc.ReleaseConn(cc)
  2721. }
  2722. return nil
  2723. })
  2724. return false, nil
  2725. }
  2726. hc.ReleaseReader(br)
  2727. if closeConn {
  2728. hc.CloseConn(cc)
  2729. } else {
  2730. hc.ReleaseConn(cc)
  2731. }
  2732. return false, nil
  2733. }