client.go 78 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934
  1. package fasthttp
  2. import (
  3. "bufio"
  4. "crypto/tls"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  14. // Do performs the given http request and fills the given http response.
  15. //
  16. // Request must contain at least non-zero RequestURI with full url (including
  17. // scheme and host) or non-zero Host header + RequestURI.
  18. //
  19. // Client determines the server to be requested in the following order:
  20. //
  21. // - from RequestURI if it contains full url with scheme and host;
  22. // - from Host header otherwise.
  23. //
  24. // The function doesn't follow redirects. Use Get* for following redirects.
  25. //
  26. // Response is ignored if resp is nil.
  27. //
  28. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  29. // to the requested host are busy.
  30. //
  31. // It is recommended obtaining req and resp via AcquireRequest
  32. // and AcquireResponse in performance-critical code.
  33. func Do(req *Request, resp *Response) error {
  34. return defaultClient.Do(req, resp)
  35. }
  36. // DoTimeout performs the given request and waits for response during
  37. // the given timeout duration.
  38. //
  39. // Request must contain at least non-zero RequestURI with full url (including
  40. // scheme and host) or non-zero Host header + RequestURI.
  41. //
  42. // Client determines the server to be requested in the following order:
  43. //
  44. // - from RequestURI if it contains full url with scheme and host;
  45. // - from Host header otherwise.
  46. //
  47. // The function doesn't follow redirects. Use Get* for following redirects.
  48. //
  49. // Response is ignored if resp is nil.
  50. //
  51. // ErrTimeout is returned if the response wasn't returned during
  52. // the given timeout.
  53. //
  54. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  55. // to the requested host are busy.
  56. //
  57. // It is recommended obtaining req and resp via AcquireRequest
  58. // and AcquireResponse in performance-critical code.
  59. //
  60. // Warning: DoTimeout does not terminate the request itself. The request will
  61. // continue in the background and the response will be discarded.
  62. // If requests take too long and the connection pool gets filled up please
  63. // try using a Client and setting a ReadTimeout.
  64. func DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  65. return defaultClient.DoTimeout(req, resp, timeout)
  66. }
  67. // DoDeadline performs the given request and waits for response until
  68. // the given deadline.
  69. //
  70. // Request must contain at least non-zero RequestURI with full url (including
  71. // scheme and host) or non-zero Host header + RequestURI.
  72. //
  73. // Client determines the server to be requested in the following order:
  74. //
  75. // - from RequestURI if it contains full url with scheme and host;
  76. // - from Host header otherwise.
  77. //
  78. // The function doesn't follow redirects. Use Get* for following redirects.
  79. //
  80. // Response is ignored if resp is nil.
  81. //
  82. // ErrTimeout is returned if the response wasn't returned until
  83. // the given deadline.
  84. //
  85. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  86. // to the requested host are busy.
  87. //
  88. // It is recommended obtaining req and resp via AcquireRequest
  89. // and AcquireResponse in performance-critical code.
  90. func DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  91. return defaultClient.DoDeadline(req, resp, deadline)
  92. }
  93. // DoRedirects performs the given http request and fills the given http response,
  94. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  95. // maxRedirectsCount, ErrTooManyRedirects is returned.
  96. //
  97. // Request must contain at least non-zero RequestURI with full url (including
  98. // scheme and host) or non-zero Host header + RequestURI.
  99. //
  100. // Client determines the server to be requested in the following order:
  101. //
  102. // - from RequestURI if it contains full url with scheme and host;
  103. // - from Host header otherwise.
  104. //
  105. // Response is ignored if resp is nil.
  106. //
  107. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  108. // to the requested host are busy.
  109. //
  110. // It is recommended obtaining req and resp via AcquireRequest
  111. // and AcquireResponse in performance-critical code.
  112. func DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  113. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, &defaultClient)
  114. return err
  115. }
  116. // Get returns the status code and body of url.
  117. //
  118. // The contents of dst will be replaced by the body and returned, if the dst
  119. // is too small a new slice will be allocated.
  120. //
  121. // The function follows redirects. Use Do* for manually handling redirects.
  122. func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  123. return defaultClient.Get(dst, url)
  124. }
  125. // GetTimeout returns the status code and body of url.
  126. //
  127. // The contents of dst will be replaced by the body and returned, if the dst
  128. // is too small a new slice will be allocated.
  129. //
  130. // The function follows redirects. Use Do* for manually handling redirects.
  131. //
  132. // ErrTimeout error is returned if url contents couldn't be fetched
  133. // during the given timeout.
  134. func GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  135. return defaultClient.GetTimeout(dst, url, timeout)
  136. }
  137. // GetDeadline returns the status code and body of url.
  138. //
  139. // The contents of dst will be replaced by the body and returned, if the dst
  140. // is too small a new slice will be allocated.
  141. //
  142. // The function follows redirects. Use Do* for manually handling redirects.
  143. //
  144. // ErrTimeout error is returned if url contents couldn't be fetched
  145. // until the given deadline.
  146. func GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  147. return defaultClient.GetDeadline(dst, url, deadline)
  148. }
  149. // Post sends POST request to the given url with the given POST arguments.
  150. //
  151. // The contents of dst will be replaced by the body and returned, if the dst
  152. // is too small a new slice will be allocated.
  153. //
  154. // The function follows redirects. Use Do* for manually handling redirects.
  155. //
  156. // Empty POST body is sent if postArgs is nil.
  157. func Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  158. return defaultClient.Post(dst, url, postArgs)
  159. }
  160. var defaultClient Client
  161. // Client implements http client.
  162. //
  163. // Copying Client by value is prohibited. Create new instance instead.
  164. //
  165. // It is safe calling Client methods from concurrently running goroutines.
  166. //
  167. // The fields of a Client should not be changed while it is in use.
  168. type Client struct {
  169. noCopy noCopy
  170. // Client name. Used in User-Agent request header.
  171. //
  172. // Default client name is used if not set.
  173. Name string
  174. // NoDefaultUserAgentHeader when set to true, causes the default
  175. // User-Agent header to be excluded from the Request.
  176. NoDefaultUserAgentHeader bool
  177. // Callback for establishing new connections to hosts.
  178. //
  179. // Default Dial is used if not set.
  180. Dial DialFunc
  181. // Attempt to connect to both ipv4 and ipv6 addresses if set to true.
  182. //
  183. // This option is used only if default TCP dialer is used,
  184. // i.e. if Dial is blank.
  185. //
  186. // By default client connects only to ipv4 addresses,
  187. // since unfortunately ipv6 remains broken in many networks worldwide :)
  188. DialDualStack bool
  189. // TLS config for https connections.
  190. //
  191. // Default TLS config is used if not set.
  192. TLSConfig *tls.Config
  193. // Maximum number of connections per each host which may be established.
  194. //
  195. // DefaultMaxConnsPerHost is used if not set.
  196. MaxConnsPerHost int
  197. // Idle keep-alive connections are closed after this duration.
  198. //
  199. // By default idle connections are closed
  200. // after DefaultMaxIdleConnDuration.
  201. MaxIdleConnDuration time.Duration
  202. // Keep-alive connections are closed after this duration.
  203. //
  204. // By default connection duration is unlimited.
  205. MaxConnDuration time.Duration
  206. // Maximum number of attempts for idempotent calls
  207. //
  208. // DefaultMaxIdemponentCallAttempts is used if not set.
  209. MaxIdemponentCallAttempts int
  210. // Per-connection buffer size for responses' reading.
  211. // This also limits the maximum header size.
  212. //
  213. // Default buffer size is used if 0.
  214. ReadBufferSize int
  215. // Per-connection buffer size for requests' writing.
  216. //
  217. // Default buffer size is used if 0.
  218. WriteBufferSize int
  219. // Maximum duration for full response reading (including body).
  220. //
  221. // By default response read timeout is unlimited.
  222. ReadTimeout time.Duration
  223. // Maximum duration for full request writing (including body).
  224. //
  225. // By default request write timeout is unlimited.
  226. WriteTimeout time.Duration
  227. // Maximum response body size.
  228. //
  229. // The client returns ErrBodyTooLarge if this limit is greater than 0
  230. // and response body is greater than the limit.
  231. //
  232. // By default response body size is unlimited.
  233. MaxResponseBodySize int
  234. // Header names are passed as-is without normalization
  235. // if this option is set.
  236. //
  237. // Disabled header names' normalization may be useful only for proxying
  238. // responses to other clients expecting case-sensitive
  239. // header names. See https://github.com/valyala/fasthttp/issues/57
  240. // for details.
  241. //
  242. // By default request and response header names are normalized, i.e.
  243. // The first letter and the first letters following dashes
  244. // are uppercased, while all the other letters are lowercased.
  245. // Examples:
  246. //
  247. // * HOST -> Host
  248. // * content-type -> Content-Type
  249. // * cONTENT-lenGTH -> Content-Length
  250. DisableHeaderNamesNormalizing bool
  251. // Path values are sent as-is without normalization
  252. //
  253. // Disabled path normalization may be useful for proxying incoming requests
  254. // to servers that are expecting paths to be forwarded as-is.
  255. //
  256. // By default path values are normalized, i.e.
  257. // extra slashes are removed, special characters are encoded.
  258. DisablePathNormalizing bool
  259. // Maximum duration for waiting for a free connection.
  260. //
  261. // By default will not waiting, return ErrNoFreeConns immediately
  262. MaxConnWaitTimeout time.Duration
  263. // RetryIf controls whether a retry should be attempted after an error.
  264. //
  265. // By default will use isIdempotent function
  266. RetryIf RetryIfFunc
  267. // Connection pool strategy. Can be either LIFO or FIFO (default).
  268. ConnPoolStrategy ConnPoolStrategyType
  269. // StreamResponseBody enables response body streaming
  270. StreamResponseBody bool
  271. // ConfigureClient configures the fasthttp.HostClient.
  272. ConfigureClient func(hc *HostClient) error
  273. mLock sync.Mutex
  274. m map[string]*HostClient
  275. ms map[string]*HostClient
  276. readerPool sync.Pool
  277. writerPool sync.Pool
  278. }
  279. // Get returns the status code and body of url.
  280. //
  281. // The contents of dst will be replaced by the body and returned, if the dst
  282. // is too small a new slice will be allocated.
  283. //
  284. // The function follows redirects. Use Do* for manually handling redirects.
  285. func (c *Client) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  286. return clientGetURL(dst, url, c)
  287. }
  288. // GetTimeout returns the status code and body of url.
  289. //
  290. // The contents of dst will be replaced by the body and returned, if the dst
  291. // is too small a new slice will be allocated.
  292. //
  293. // The function follows redirects. Use Do* for manually handling redirects.
  294. //
  295. // ErrTimeout error is returned if url contents couldn't be fetched
  296. // during the given timeout.
  297. func (c *Client) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  298. return clientGetURLTimeout(dst, url, timeout, c)
  299. }
  300. // GetDeadline returns the status code and body of url.
  301. //
  302. // The contents of dst will be replaced by the body and returned, if the dst
  303. // is too small a new slice will be allocated.
  304. //
  305. // The function follows redirects. Use Do* for manually handling redirects.
  306. //
  307. // ErrTimeout error is returned if url contents couldn't be fetched
  308. // until the given deadline.
  309. func (c *Client) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  310. return clientGetURLDeadline(dst, url, deadline, c)
  311. }
  312. // Post sends POST request to the given url with the given POST arguments.
  313. //
  314. // The contents of dst will be replaced by the body and returned, if the dst
  315. // is too small a new slice will be allocated.
  316. //
  317. // The function follows redirects. Use Do* for manually handling redirects.
  318. //
  319. // Empty POST body is sent if postArgs is nil.
  320. func (c *Client) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  321. return clientPostURL(dst, url, postArgs, c)
  322. }
  323. // DoTimeout performs the given request and waits for response during
  324. // the given timeout duration.
  325. //
  326. // Request must contain at least non-zero RequestURI with full url (including
  327. // scheme and host) or non-zero Host header + RequestURI.
  328. //
  329. // Client determines the server to be requested in the following order:
  330. //
  331. // - from RequestURI if it contains full url with scheme and host;
  332. // - from Host header otherwise.
  333. //
  334. // The function doesn't follow redirects. Use Get* for following redirects.
  335. //
  336. // Response is ignored if resp is nil.
  337. //
  338. // ErrTimeout is returned if the response wasn't returned during
  339. // the given timeout.
  340. // Immediately returns ErrTimeout if timeout value is negative.
  341. //
  342. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  343. // to the requested host are busy.
  344. //
  345. // It is recommended obtaining req and resp via AcquireRequest
  346. // and AcquireResponse in performance-critical code.
  347. //
  348. // Warning: DoTimeout does not terminate the request itself. The request will
  349. // continue in the background and the response will be discarded.
  350. // If requests take too long and the connection pool gets filled up please
  351. // try setting a ReadTimeout.
  352. func (c *Client) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  353. req.timeout = timeout
  354. if req.timeout < 0 {
  355. return ErrTimeout
  356. }
  357. return c.Do(req, resp)
  358. }
  359. // DoDeadline performs the given request and waits for response until
  360. // the given deadline.
  361. //
  362. // Request must contain at least non-zero RequestURI with full url (including
  363. // scheme and host) or non-zero Host header + RequestURI.
  364. //
  365. // Client determines the server to be requested in the following order:
  366. //
  367. // - from RequestURI if it contains full url with scheme and host;
  368. // - from Host header otherwise.
  369. //
  370. // The function doesn't follow redirects. Use Get* for following redirects.
  371. //
  372. // Response is ignored if resp is nil.
  373. //
  374. // ErrTimeout is returned if the response wasn't returned until
  375. // the given deadline.
  376. // Immediately returns ErrTimeout if the deadline has already been reached.
  377. //
  378. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  379. // to the requested host are busy.
  380. //
  381. // It is recommended obtaining req and resp via AcquireRequest
  382. // and AcquireResponse in performance-critical code.
  383. func (c *Client) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  384. req.timeout = time.Until(deadline)
  385. if req.timeout < 0 {
  386. return ErrTimeout
  387. }
  388. return c.Do(req, resp)
  389. }
  390. // DoRedirects performs the given http request and fills the given http response,
  391. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  392. // maxRedirectsCount, ErrTooManyRedirects is returned.
  393. //
  394. // Request must contain at least non-zero RequestURI with full url (including
  395. // scheme and host) or non-zero Host header + RequestURI.
  396. //
  397. // Client determines the server to be requested in the following order:
  398. //
  399. // - from RequestURI if it contains full url with scheme and host;
  400. // - from Host header otherwise.
  401. //
  402. // Response is ignored if resp is nil.
  403. //
  404. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  405. // to the requested host are busy.
  406. //
  407. // It is recommended obtaining req and resp via AcquireRequest
  408. // and AcquireResponse in performance-critical code.
  409. func (c *Client) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  410. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  411. return err
  412. }
  413. // Do performs the given http request and fills the given http response.
  414. //
  415. // Request must contain at least non-zero RequestURI with full url (including
  416. // scheme and host) or non-zero Host header + RequestURI.
  417. //
  418. // Client determines the server to be requested in the following order:
  419. //
  420. // - from RequestURI if it contains full url with scheme and host;
  421. // - from Host header otherwise.
  422. //
  423. // Response is ignored if resp is nil.
  424. //
  425. // The function doesn't follow redirects. Use Get* for following redirects.
  426. //
  427. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  428. // to the requested host are busy.
  429. //
  430. // It is recommended obtaining req and resp via AcquireRequest
  431. // and AcquireResponse in performance-critical code.
  432. func (c *Client) Do(req *Request, resp *Response) error {
  433. uri := req.URI()
  434. if uri == nil {
  435. return ErrorInvalidURI
  436. }
  437. host := uri.Host()
  438. isTLS := false
  439. if uri.isHTTPS() {
  440. isTLS = true
  441. } else if !uri.isHTTP() {
  442. return fmt.Errorf("unsupported protocol %q. http and https are supported", uri.Scheme())
  443. }
  444. startCleaner := false
  445. c.mLock.Lock()
  446. m := c.m
  447. if isTLS {
  448. m = c.ms
  449. }
  450. if m == nil {
  451. m = make(map[string]*HostClient)
  452. if isTLS {
  453. c.ms = m
  454. } else {
  455. c.m = m
  456. }
  457. }
  458. hc := m[string(host)]
  459. if hc == nil {
  460. hc = &HostClient{
  461. Addr: AddMissingPort(string(host), isTLS),
  462. Name: c.Name,
  463. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  464. Dial: c.Dial,
  465. DialDualStack: c.DialDualStack,
  466. IsTLS: isTLS,
  467. TLSConfig: c.TLSConfig,
  468. MaxConns: c.MaxConnsPerHost,
  469. MaxIdleConnDuration: c.MaxIdleConnDuration,
  470. MaxConnDuration: c.MaxConnDuration,
  471. MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts,
  472. ReadBufferSize: c.ReadBufferSize,
  473. WriteBufferSize: c.WriteBufferSize,
  474. ReadTimeout: c.ReadTimeout,
  475. WriteTimeout: c.WriteTimeout,
  476. MaxResponseBodySize: c.MaxResponseBodySize,
  477. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  478. DisablePathNormalizing: c.DisablePathNormalizing,
  479. MaxConnWaitTimeout: c.MaxConnWaitTimeout,
  480. RetryIf: c.RetryIf,
  481. ConnPoolStrategy: c.ConnPoolStrategy,
  482. StreamResponseBody: c.StreamResponseBody,
  483. clientReaderPool: &c.readerPool,
  484. clientWriterPool: &c.writerPool,
  485. }
  486. if c.ConfigureClient != nil {
  487. if err := c.ConfigureClient(hc); err != nil {
  488. c.mLock.Unlock()
  489. return err
  490. }
  491. }
  492. m[string(host)] = hc
  493. if len(m) == 1 {
  494. startCleaner = true
  495. }
  496. }
  497. atomic.AddInt32(&hc.pendingClientRequests, 1)
  498. defer atomic.AddInt32(&hc.pendingClientRequests, -1)
  499. c.mLock.Unlock()
  500. if startCleaner {
  501. go c.mCleaner(m)
  502. }
  503. return hc.Do(req, resp)
  504. }
  505. // CloseIdleConnections closes any connections which were previously
  506. // connected from previous requests but are now sitting idle in a
  507. // "keep-alive" state. It does not interrupt any connections currently
  508. // in use.
  509. func (c *Client) CloseIdleConnections() {
  510. c.mLock.Lock()
  511. for _, v := range c.m {
  512. v.CloseIdleConnections()
  513. }
  514. for _, v := range c.ms {
  515. v.CloseIdleConnections()
  516. }
  517. c.mLock.Unlock()
  518. }
  519. func (c *Client) mCleaner(m map[string]*HostClient) {
  520. mustStop := false
  521. sleep := c.MaxIdleConnDuration
  522. if sleep < time.Second {
  523. sleep = time.Second
  524. } else if sleep > 10*time.Second {
  525. sleep = 10 * time.Second
  526. }
  527. for {
  528. time.Sleep(sleep)
  529. c.mLock.Lock()
  530. for k, v := range m {
  531. v.connsLock.Lock()
  532. if v.connsCount == 0 && atomic.LoadInt32(&v.pendingClientRequests) == 0 {
  533. delete(m, k)
  534. }
  535. v.connsLock.Unlock()
  536. }
  537. if len(m) == 0 {
  538. mustStop = true
  539. }
  540. c.mLock.Unlock()
  541. if mustStop {
  542. break
  543. }
  544. }
  545. }
  546. // DefaultMaxConnsPerHost is the maximum number of concurrent connections
  547. // http client may establish per host by default (i.e. if
  548. // Client.MaxConnsPerHost isn't set).
  549. const DefaultMaxConnsPerHost = 512
  550. // DefaultMaxIdleConnDuration is the default duration before idle keep-alive
  551. // connection is closed.
  552. const DefaultMaxIdleConnDuration = 10 * time.Second
  553. // DefaultMaxIdemponentCallAttempts is the default idempotent calls attempts count.
  554. const DefaultMaxIdemponentCallAttempts = 5
  555. // DialFunc must establish connection to addr.
  556. //
  557. // There is no need in establishing TLS (SSL) connection for https.
  558. // The client automatically converts connection to TLS
  559. // if HostClient.IsTLS is set.
  560. //
  561. // TCP address passed to DialFunc always contains host and port.
  562. // Example TCP addr values:
  563. //
  564. // - foobar.com:80
  565. // - foobar.com:443
  566. // - foobar.com:8080
  567. type DialFunc func(addr string) (net.Conn, error)
  568. // RetryIfFunc signature of retry if function
  569. //
  570. // Request argument passed to RetryIfFunc, if there are any request errors.
  571. type RetryIfFunc func(request *Request) bool
  572. // TransportFunc wraps every request/response.
  573. type TransportFunc func(*Request, *Response) error
  574. // ConnPoolStrategyType define strategy of connection pool enqueue/dequeue
  575. type ConnPoolStrategyType int
  576. const (
  577. FIFO ConnPoolStrategyType = iota
  578. LIFO
  579. )
// HostClient balances http requests among hosts listed in Addr.
//
// HostClient may be used for balancing load among multiple upstream hosts.
// While multiple addresses passed to HostClient.Addr may be used for balancing
// load among them, it would be better using LBClient instead, since HostClient
// may unevenly balance load among upstream hosts.
//
// It is forbidden to copy HostClient instances. Create new instances instead.
//
// It is safe to call HostClient methods from concurrently running goroutines.
type HostClient struct {
	noCopy noCopy

	// Comma-separated list of upstream HTTP server host addresses,
	// which are passed to Dial in a round-robin manner.
	//
	// Each address may contain a port if the default dialer is used.
	// For example,
	//
	//   - foobar.com:80
	//   - foobar.com:443
	//   - foobar.com:8080
	Addr string

	// Client name. Used in the User-Agent request header.
	Name string

	// NoDefaultUserAgentHeader, when set to true, causes the default
	// User-Agent header to be excluded from the Request.
	NoDefaultUserAgentHeader bool

	// Callback for establishing a new connection to the host.
	//
	// Default Dial is used if not set.
	Dial DialFunc

	// Attempt to connect to both ipv4 and ipv6 host addresses
	// if set to true.
	//
	// This option is used only if the default TCP dialer is used,
	// i.e. if Dial is blank.
	//
	// By default the client connects only to ipv4 addresses,
	// since unfortunately ipv6 remains broken in many networks worldwide :)
	DialDualStack bool

	// Whether to use TLS (aka SSL or HTTPS) for host connections.
	IsTLS bool

	// Optional TLS config.
	TLSConfig *tls.Config

	// Maximum number of connections which may be established to all hosts
	// listed in Addr.
	//
	// You can change this value while the HostClient is being used
	// with HostClient.SetMaxConns(value).
	//
	// DefaultMaxConnsPerHost is used if not set.
	MaxConns int

	// Keep-alive connections are closed after this duration.
	//
	// By default connection duration is unlimited.
	MaxConnDuration time.Duration

	// Idle keep-alive connections are closed after this duration.
	//
	// By default idle connections are closed
	// after DefaultMaxIdleConnDuration.
	MaxIdleConnDuration time.Duration

	// Maximum number of attempts for idempotent calls.
	//
	// DefaultMaxIdemponentCallAttempts is used if not set.
	MaxIdemponentCallAttempts int

	// Per-connection buffer size for reading responses.
	// This also limits the maximum header size.
	//
	// Default buffer size is used if 0.
	ReadBufferSize int

	// Per-connection buffer size for writing requests.
	//
	// Default buffer size is used if 0.
	WriteBufferSize int

	// Maximum duration for reading the full response (including body).
	//
	// By default response read timeout is unlimited.
	ReadTimeout time.Duration

	// Maximum duration for writing the full request (including body).
	//
	// By default request write timeout is unlimited.
	WriteTimeout time.Duration

	// Maximum response body size.
	//
	// The client returns ErrBodyTooLarge if this limit is greater than 0
	// and the response body is greater than the limit.
	//
	// By default response body size is unlimited.
	MaxResponseBodySize int

	// Header names are passed as-is without normalization
	// if this option is set.
	//
	// Disabled header names' normalization may be useful only for proxying
	// responses to other clients expecting case-sensitive
	// header names. See https://github.com/valyala/fasthttp/issues/57
	// for details.
	//
	// By default request and response header names are normalized, i.e.
	// the first letter and the first letters following dashes
	// are uppercased, while all the other letters are lowercased.
	// Examples:
	//
	//   * HOST -> Host
	//   * content-type -> Content-Type
	//   * cONTENT-lenGTH -> Content-Length
	DisableHeaderNamesNormalizing bool

	// Path values are sent as-is without normalization.
	//
	// Disabled path normalization may be useful for proxying incoming requests
	// to servers that are expecting paths to be forwarded as-is.
	//
	// By default path values are normalized, i.e.
	// extra slashes are removed, special characters are encoded.
	DisablePathNormalizing bool

	// Do not log potentially sensitive content in error logs.
	//
	// This option is useful for servers that handle sensitive data
	// in the request/response.
	//
	// Client logs full errors by default.
	SecureErrorLogMessage bool

	// Maximum duration for waiting for a free connection.
	//
	// By default the client does not wait and returns ErrNoFreeConns
	// immediately.
	MaxConnWaitTimeout time.Duration

	// RetryIf controls whether a retry should be attempted after an error.
	//
	// By default the isIdempotent function is used.
	RetryIf RetryIfFunc

	// Transport defines a transport-like mechanism that wraps every
	// request/response. When set, it is called instead of the built-in
	// connection handling.
	Transport TransportFunc

	// Connection pool strategy. Can be either LIFO or FIFO (default).
	ConnPoolStrategy ConnPoolStrategyType

	// StreamResponseBody enables response body streaming.
	StreamResponseBody bool

	// lastUseTime is the last use time in seconds relative to startTimeUnix.
	// It is accessed atomically.
	lastUseTime uint32

	// connsLock is held while reading or changing connsCount, conns
	// and MaxConns.
	connsLock sync.Mutex
	// connsCount is the number of connections counted against MaxConns.
	connsCount int
	// conns holds the idle connections available for reuse.
	conns []*clientConn
	// NOTE(review): presumably the queue of callers waiting for a free
	// connection (see MaxConnWaitTimeout) — confirm against acquireConn.
	connsWait *wantConnQueue

	addrsLock sync.Mutex
	addrs     []string
	addrIdx   uint32

	tlsConfigMap     map[string]*tls.Config
	tlsConfigMapLock sync.Mutex

	readerPool sync.Pool
	writerPool sync.Pool

	// clientReaderPool and clientWriterPool are set by Client to point at
	// its own reader/writer pools when it creates the HostClient.
	clientReaderPool *sync.Pool
	clientWriterPool *sync.Pool

	// pendingRequests counts the requests currently executing in Do.
	// It is accessed atomically.
	pendingRequests int32

	// pendingClientRequests counts the number of requests that a Client is currently running using this HostClient.
	// It will be incremented earlier than pendingRequests and will be used by Client to see if the HostClient is still in use.
	pendingClientRequests int32

	// connsCleanerRun is set once the connection cleaner has been scheduled.
	connsCleanerRun bool
}
  735. type clientConn struct {
  736. c net.Conn
  737. createdTime time.Time
  738. lastUseTime time.Time
  739. }
  740. var startTimeUnix = time.Now().Unix()
  741. // LastUseTime returns time the client was last used
  742. func (c *HostClient) LastUseTime() time.Time {
  743. n := atomic.LoadUint32(&c.lastUseTime)
  744. return time.Unix(startTimeUnix+int64(n), 0)
  745. }
  746. // Get returns the status code and body of url.
  747. //
  748. // The contents of dst will be replaced by the body and returned, if the dst
  749. // is too small a new slice will be allocated.
  750. //
  751. // The function follows redirects. Use Do* for manually handling redirects.
  752. func (c *HostClient) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  753. return clientGetURL(dst, url, c)
  754. }
  755. // GetTimeout returns the status code and body of url.
  756. //
  757. // The contents of dst will be replaced by the body and returned, if the dst
  758. // is too small a new slice will be allocated.
  759. //
  760. // The function follows redirects. Use Do* for manually handling redirects.
  761. //
  762. // ErrTimeout error is returned if url contents couldn't be fetched
  763. // during the given timeout.
  764. func (c *HostClient) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  765. return clientGetURLTimeout(dst, url, timeout, c)
  766. }
  767. // GetDeadline returns the status code and body of url.
  768. //
  769. // The contents of dst will be replaced by the body and returned, if the dst
  770. // is too small a new slice will be allocated.
  771. //
  772. // The function follows redirects. Use Do* for manually handling redirects.
  773. //
  774. // ErrTimeout error is returned if url contents couldn't be fetched
  775. // until the given deadline.
  776. func (c *HostClient) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  777. return clientGetURLDeadline(dst, url, deadline, c)
  778. }
  779. // Post sends POST request to the given url with the given POST arguments.
  780. //
  781. // The contents of dst will be replaced by the body and returned, if the dst
  782. // is too small a new slice will be allocated.
  783. //
  784. // The function follows redirects. Use Do* for manually handling redirects.
  785. //
  786. // Empty POST body is sent if postArgs is nil.
  787. func (c *HostClient) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  788. return clientPostURL(dst, url, postArgs, c)
  789. }
  790. type clientDoer interface {
  791. Do(req *Request, resp *Response) error
  792. }
  793. func clientGetURL(dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  794. req := AcquireRequest()
  795. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  796. ReleaseRequest(req)
  797. return statusCode, body, err
  798. }
  799. func clientGetURLTimeout(dst []byte, url string, timeout time.Duration, c clientDoer) (statusCode int, body []byte, err error) {
  800. deadline := time.Now().Add(timeout)
  801. return clientGetURLDeadline(dst, url, deadline, c)
  802. }
  803. type clientURLResponse struct {
  804. statusCode int
  805. body []byte
  806. err error
  807. }
  808. func clientGetURLDeadline(dst []byte, url string, deadline time.Time, c clientDoer) (statusCode int, body []byte, err error) {
  809. timeout := time.Until(deadline)
  810. if timeout <= 0 {
  811. return 0, dst, ErrTimeout
  812. }
  813. var ch chan clientURLResponse
  814. chv := clientURLResponseChPool.Get()
  815. if chv == nil {
  816. chv = make(chan clientURLResponse, 1)
  817. }
  818. ch = chv.(chan clientURLResponse)
  819. // Note that the request continues execution on ErrTimeout until
  820. // client-specific ReadTimeout exceeds. This helps limiting load
  821. // on slow hosts by MaxConns* concurrent requests.
  822. //
  823. // Without this 'hack' the load on slow host could exceed MaxConns*
  824. // concurrent requests, since timed out requests on client side
  825. // usually continue execution on the host.
  826. var mu sync.Mutex
  827. var timedout, responded bool
  828. go func() {
  829. req := AcquireRequest()
  830. statusCodeCopy, bodyCopy, errCopy := doRequestFollowRedirectsBuffer(req, dst, url, c)
  831. mu.Lock()
  832. {
  833. if !timedout {
  834. ch <- clientURLResponse{
  835. statusCode: statusCodeCopy,
  836. body: bodyCopy,
  837. err: errCopy,
  838. }
  839. responded = true
  840. }
  841. }
  842. mu.Unlock()
  843. ReleaseRequest(req)
  844. }()
  845. tc := AcquireTimer(timeout)
  846. select {
  847. case resp := <-ch:
  848. statusCode = resp.statusCode
  849. body = resp.body
  850. err = resp.err
  851. case <-tc.C:
  852. mu.Lock()
  853. {
  854. if responded {
  855. resp := <-ch
  856. statusCode = resp.statusCode
  857. body = resp.body
  858. err = resp.err
  859. } else {
  860. timedout = true
  861. err = ErrTimeout
  862. body = dst
  863. }
  864. }
  865. mu.Unlock()
  866. }
  867. ReleaseTimer(tc)
  868. clientURLResponseChPool.Put(chv)
  869. return statusCode, body, err
  870. }
  871. var clientURLResponseChPool sync.Pool
  872. func clientPostURL(dst []byte, url string, postArgs *Args, c clientDoer) (statusCode int, body []byte, err error) {
  873. req := AcquireRequest()
  874. defer ReleaseRequest(req)
  875. req.Header.SetMethod(MethodPost)
  876. req.Header.SetContentTypeBytes(strPostArgsContentType)
  877. if postArgs != nil {
  878. if _, err := postArgs.WriteTo(req.BodyWriter()); err != nil {
  879. return 0, nil, err
  880. }
  881. }
  882. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  883. return statusCode, body, err
  884. }
  885. var (
  886. // ErrMissingLocation is returned by clients when the Location header is missing on
  887. // an HTTP response with a redirect status code.
  888. ErrMissingLocation = errors.New("missing Location header for http redirect")
  889. // ErrTooManyRedirects is returned by clients when the number of redirects followed
  890. // exceed the max count.
  891. ErrTooManyRedirects = errors.New("too many redirects detected when doing the request")
  892. // HostClients are only able to follow redirects to the same protocol.
  893. ErrHostClientRedirectToDifferentScheme = errors.New("HostClient can't follow redirects to a different protocol, please use Client instead")
  894. )
  895. const defaultMaxRedirectsCount = 16
  896. func doRequestFollowRedirectsBuffer(req *Request, dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  897. resp := AcquireResponse()
  898. bodyBuf := resp.bodyBuffer()
  899. resp.keepBodyBuffer = true
  900. oldBody := bodyBuf.B
  901. bodyBuf.B = dst
  902. statusCode, _, err = doRequestFollowRedirects(req, resp, url, defaultMaxRedirectsCount, c)
  903. body = bodyBuf.B
  904. bodyBuf.B = oldBody
  905. resp.keepBodyBuffer = false
  906. ReleaseResponse(resp)
  907. return statusCode, body, err
  908. }
  909. func doRequestFollowRedirects(req *Request, resp *Response, url string, maxRedirectsCount int, c clientDoer) (statusCode int, body []byte, err error) {
  910. redirectsCount := 0
  911. for {
  912. req.SetRequestURI(url)
  913. if err := req.parseURI(); err != nil {
  914. return 0, nil, err
  915. }
  916. if err = c.Do(req, resp); err != nil {
  917. break
  918. }
  919. statusCode = resp.Header.StatusCode()
  920. if !StatusCodeIsRedirect(statusCode) {
  921. break
  922. }
  923. redirectsCount++
  924. if redirectsCount > maxRedirectsCount {
  925. err = ErrTooManyRedirects
  926. break
  927. }
  928. location := resp.Header.peek(strLocation)
  929. if len(location) == 0 {
  930. err = ErrMissingLocation
  931. break
  932. }
  933. url = getRedirectURL(url, location)
  934. }
  935. return statusCode, body, err
  936. }
  937. func getRedirectURL(baseURL string, location []byte) string {
  938. u := AcquireURI()
  939. u.Update(baseURL)
  940. u.UpdateBytes(location)
  941. redirectURL := u.String()
  942. ReleaseURI(u)
  943. return redirectURL
  944. }
  945. // StatusCodeIsRedirect returns true if the status code indicates a redirect.
  946. func StatusCodeIsRedirect(statusCode int) bool {
  947. return statusCode == StatusMovedPermanently ||
  948. statusCode == StatusFound ||
  949. statusCode == StatusSeeOther ||
  950. statusCode == StatusTemporaryRedirect ||
  951. statusCode == StatusPermanentRedirect
  952. }
  953. var (
  954. requestPool sync.Pool
  955. responsePool sync.Pool
  956. )
  957. // AcquireRequest returns an empty Request instance from request pool.
  958. //
  959. // The returned Request instance may be passed to ReleaseRequest when it is
  960. // no longer needed. This allows Request recycling, reduces GC pressure
  961. // and usually improves performance.
  962. func AcquireRequest() *Request {
  963. v := requestPool.Get()
  964. if v == nil {
  965. return &Request{}
  966. }
  967. return v.(*Request)
  968. }
  969. // ReleaseRequest returns req acquired via AcquireRequest to request pool.
  970. //
  971. // It is forbidden accessing req and/or its' members after returning
  972. // it to request pool.
  973. func ReleaseRequest(req *Request) {
  974. req.Reset()
  975. requestPool.Put(req)
  976. }
  977. // AcquireResponse returns an empty Response instance from response pool.
  978. //
  979. // The returned Response instance may be passed to ReleaseResponse when it is
  980. // no longer needed. This allows Response recycling, reduces GC pressure
  981. // and usually improves performance.
  982. func AcquireResponse() *Response {
  983. v := responsePool.Get()
  984. if v == nil {
  985. return &Response{}
  986. }
  987. return v.(*Response)
  988. }
  989. // ReleaseResponse return resp acquired via AcquireResponse to response pool.
  990. //
  991. // It is forbidden accessing resp and/or its' members after returning
  992. // it to response pool.
  993. func ReleaseResponse(resp *Response) {
  994. resp.Reset()
  995. responsePool.Put(resp)
  996. }
  997. // DoTimeout performs the given request and waits for response during
  998. // the given timeout duration.
  999. //
  1000. // Request must contain at least non-zero RequestURI with full url (including
  1001. // scheme and host) or non-zero Host header + RequestURI.
  1002. //
  1003. // The function doesn't follow redirects. Use Get* for following redirects.
  1004. //
  1005. // Response is ignored if resp is nil.
  1006. //
  1007. // ErrTimeout is returned if the response wasn't returned during
  1008. // the given timeout.
  1009. // Immediately returns ErrTimeout if timeout value is negative.
  1010. //
  1011. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1012. // to the host are busy.
  1013. //
  1014. // It is recommended obtaining req and resp via AcquireRequest
  1015. // and AcquireResponse in performance-critical code.
  1016. //
  1017. // Warning: DoTimeout does not terminate the request itself. The request will
  1018. // continue in the background and the response will be discarded.
  1019. // If requests take too long and the connection pool gets filled up please
  1020. // try setting a ReadTimeout.
  1021. func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  1022. req.timeout = timeout
  1023. if req.timeout < 0 {
  1024. return ErrTimeout
  1025. }
  1026. return c.Do(req, resp)
  1027. }
  1028. // DoDeadline performs the given request and waits for response until
  1029. // the given deadline.
  1030. //
  1031. // Request must contain at least non-zero RequestURI with full url (including
  1032. // scheme and host) or non-zero Host header + RequestURI.
  1033. //
  1034. // The function doesn't follow redirects. Use Get* for following redirects.
  1035. //
  1036. // Response is ignored if resp is nil.
  1037. //
  1038. // ErrTimeout is returned if the response wasn't returned until
  1039. // the given deadline.
  1040. // Immediately returns ErrTimeout if the deadline has already been reached.
  1041. //
  1042. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1043. // to the host are busy.
  1044. //
  1045. // It is recommended obtaining req and resp via AcquireRequest
  1046. // and AcquireResponse in performance-critical code.
  1047. func (c *HostClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  1048. req.timeout = time.Until(deadline)
  1049. if req.timeout < 0 {
  1050. return ErrTimeout
  1051. }
  1052. return c.Do(req, resp)
  1053. }
  1054. // DoRedirects performs the given http request and fills the given http response,
  1055. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  1056. // maxRedirectsCount, ErrTooManyRedirects is returned.
  1057. //
  1058. // Request must contain at least non-zero RequestURI with full url (including
  1059. // scheme and host) or non-zero Host header + RequestURI.
  1060. //
  1061. // Client determines the server to be requested in the following order:
  1062. //
  1063. // - from RequestURI if it contains full url with scheme and host;
  1064. // - from Host header otherwise.
  1065. //
  1066. // Response is ignored if resp is nil.
  1067. //
  1068. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  1069. // to the requested host are busy.
  1070. //
  1071. // It is recommended obtaining req and resp via AcquireRequest
  1072. // and AcquireResponse in performance-critical code.
  1073. func (c *HostClient) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  1074. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  1075. return err
  1076. }
  1077. // Do performs the given http request and sets the corresponding response.
  1078. //
  1079. // Request must contain at least non-zero RequestURI with full url (including
  1080. // scheme and host) or non-zero Host header + RequestURI.
  1081. //
  1082. // The function doesn't follow redirects. Use Get* for following redirects.
  1083. //
  1084. // Response is ignored if resp is nil.
  1085. //
  1086. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1087. // to the host are busy.
  1088. //
  1089. // It is recommended obtaining req and resp via AcquireRequest
  1090. // and AcquireResponse in performance-critical code.
  1091. func (c *HostClient) Do(req *Request, resp *Response) error {
  1092. var err error
  1093. var retry bool
  1094. maxAttempts := c.MaxIdemponentCallAttempts
  1095. if maxAttempts <= 0 {
  1096. maxAttempts = DefaultMaxIdemponentCallAttempts
  1097. }
  1098. isRequestRetryable := isIdempotent
  1099. if c.RetryIf != nil {
  1100. isRequestRetryable = c.RetryIf
  1101. }
  1102. attempts := 0
  1103. hasBodyStream := req.IsBodyStream()
  1104. atomic.AddInt32(&c.pendingRequests, 1)
  1105. for {
  1106. retry, err = c.do(req, resp)
  1107. if err == nil || !retry {
  1108. break
  1109. }
  1110. if hasBodyStream {
  1111. break
  1112. }
  1113. if !isRequestRetryable(req) {
  1114. // Retry non-idempotent requests if the server closes
  1115. // the connection before sending the response.
  1116. //
  1117. // This case is possible if the server closes the idle
  1118. // keep-alive connection on timeout.
  1119. //
  1120. // Apache and nginx usually do this.
  1121. if err != io.EOF {
  1122. break
  1123. }
  1124. }
  1125. attempts++
  1126. if attempts >= maxAttempts {
  1127. break
  1128. }
  1129. }
  1130. atomic.AddInt32(&c.pendingRequests, -1)
  1131. if err == io.EOF {
  1132. err = ErrConnectionClosed
  1133. }
  1134. return err
  1135. }
  1136. // PendingRequests returns the current number of requests the client
  1137. // is executing.
  1138. //
  1139. // This function may be used for balancing load among multiple HostClient
  1140. // instances.
  1141. func (c *HostClient) PendingRequests() int {
  1142. return int(atomic.LoadInt32(&c.pendingRequests))
  1143. }
  1144. func isIdempotent(req *Request) bool {
  1145. return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut()
  1146. }
  1147. func (c *HostClient) do(req *Request, resp *Response) (bool, error) {
  1148. if resp == nil {
  1149. resp = AcquireResponse()
  1150. defer ReleaseResponse(resp)
  1151. }
  1152. ok, err := c.doNonNilReqResp(req, resp)
  1153. return ok, err
  1154. }
// doNonNilReqResp executes one request attempt: it prepares req and resp,
// acquires a connection, writes the request, reads the response and then
// releases or closes the connection.
//
// The boolean result tells the caller whether the attempt may be retried;
// it is only meaningful together with a non-nil error. Both req and resp
// must be non-nil, otherwise the function panics.
func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
	if req == nil {
		// A nil req is a programming error in the caller.
		panic("BUG: req cannot be nil")
	}
	if resp == nil {
		// A nil resp is a programming error in the caller.
		panic("BUG: resp cannot be nil")
	}

	// Propagate the secure error log setting to the request, the response
	// and their headers.
	resp.secureErrorLogMessage = c.SecureErrorLogMessage
	resp.Header.secureErrorLogMessage = c.SecureErrorLogMessage
	req.secureErrorLogMessage = c.SecureErrorLogMessage
	req.Header.secureErrorLogMessage = c.SecureErrorLogMessage

	// A HostClient serves exactly one scheme; a mismatch is not retryable.
	if c.IsTLS != req.URI().isHTTPS() {
		return false, ErrHostClientRedirectToDifferentScheme
	}

	// Record the use time as seconds relative to startTimeUnix.
	atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix))

	// Free up resources occupied by response before sending the request,
	// so the GC may reclaim these resources (e.g. response body).
	// SkipBody and StreamBody are backed up and restored around Reset in
	// case they were set explicitly.
	customSkipBody := resp.SkipBody
	customStreamBody := resp.StreamBody || c.StreamResponseBody
	resp.Reset()
	resp.SkipBody = customSkipBody
	resp.StreamBody = customStreamBody

	req.URI().DisablePathNormalizing = c.DisablePathNormalizing

	// Fill in the User-Agent only if the request doesn't carry one: the
	// client name wins, then the default agent unless it is disabled.
	userAgentOld := req.Header.UserAgent()
	if len(userAgentOld) == 0 {
		userAgent := c.Name
		if userAgent == "" && !c.NoDefaultUserAgentHeader {
			userAgent = defaultUserAgent
		}
		if userAgent != "" {
			req.Header.userAgent = append(req.Header.userAgent[:], userAgent...)
		}
	}

	// A custom Transport replaces the built-in connection handling. The
	// retry flag is true only when err is nil, so Transport errors are
	// never retried by Do.
	if c.Transport != nil {
		err := c.Transport(req, resp)
		return err == nil, err
	}

	// The zero deadline means "no per-request deadline".
	var deadline time.Time
	if req.timeout > 0 {
		deadline = time.Now().Add(req.timeout)
	}

	cc, err := c.acquireConn(req.timeout, req.ConnectionClose())
	if err != nil {
		return false, err
	}
	conn := cc.c

	resp.parseNetConn(conn)

	// The effective write deadline is the earlier of the request deadline
	// and now+WriteTimeout.
	writeDeadline := deadline
	if c.WriteTimeout > 0 {
		tmpWriteDeadline := time.Now().Add(c.WriteTimeout)
		if writeDeadline.IsZero() || tmpWriteDeadline.Before(writeDeadline) {
			writeDeadline = tmpWriteDeadline
		}
	}

	if !writeDeadline.IsZero() {
		// Set Deadline every time, since golang has fixed the performance issue
		// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
		if err = conn.SetWriteDeadline(writeDeadline); err != nil {
			c.closeConn(cc)
			return true, err
		}
	}

	// A connection older than MaxConnDuration is retired by sending
	// 'Connection: close' with this request; the flag is undone on the
	// request right after it has been written.
	resetConnection := false
	if c.MaxConnDuration > 0 && time.Since(cc.createdTime) > c.MaxConnDuration && !req.ConnectionClose() {
		req.SetConnectionClose()
		resetConnection = true
	}

	bw := c.acquireWriter(conn)
	err = req.Write(bw)

	if resetConnection {
		req.Header.ResetConnectionClose()
	}

	if err == nil {
		err = bw.Flush()
	}
	c.releaseWriter(bw)

	// Return ErrTimeout on any timeout.
	if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
		err = ErrTimeout
	}

	// A connection reset during the write does not abort the attempt: the
	// response is still read, and the connection is closed afterwards.
	isConnRST := isConnectionReset(err)
	if err != nil && !isConnRST {
		c.closeConn(cc)
		return true, err
	}

	// The effective read deadline is the earlier of the request deadline
	// and now+ReadTimeout.
	readDeadline := deadline
	if c.ReadTimeout > 0 {
		tmpReadDeadline := time.Now().Add(c.ReadTimeout)
		if readDeadline.IsZero() || tmpReadDeadline.Before(readDeadline) {
			readDeadline = tmpReadDeadline
		}
	}

	if !readDeadline.IsZero() {
		// Set Deadline every time, since golang has fixed the performance issue
		// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
		if err = conn.SetReadDeadline(readDeadline); err != nil {
			c.closeConn(cc)
			return true, err
		}
	}

	// The body is skipped for HEAD requests and when the caller asked for it.
	if customSkipBody || req.Header.IsHead() {
		resp.SkipBody = true
	}
	if c.DisableHeaderNamesNormalizing {
		resp.Header.DisableNormalizing()
	}

	br := c.acquireReader(conn)
	err = resp.ReadLimitBody(br, c.MaxResponseBodySize)
	c.releaseReader(br)
	if err != nil {
		c.closeConn(cc)
		// Don't retry in case of ErrBodyTooLarge since we will just get the same again.
		retry := err != ErrBodyTooLarge
		return retry, err
	}

	closeConn := resetConnection || req.ConnectionClose() || resp.ConnectionClose() || isConnRST

	// With a streamed body the connection stays in use until the caller
	// closes the body stream, so the release/close is deferred to that point.
	if customStreamBody && resp.bodyStream != nil {
		rbs := resp.bodyStream
		resp.bodyStream = newCloseReader(rbs, func() error {
			if r, ok := rbs.(*requestStream); ok {
				releaseRequestStream(r)
			}
			if closeConn {
				c.closeConn(cc)
			} else {
				c.releaseConn(cc)
			}
			return nil
		})
		return false, nil
	}

	if closeConn {
		c.closeConn(cc)
	} else {
		c.releaseConn(cc)
	}

	return false, nil
}
var (
	// ErrNoFreeConns is returned when no free connections are available
	// to the given host.
	//
	// Increase the allowed number of connections per host if you
	// see this error.
	ErrNoFreeConns = errors.New("no free connections available to host")

	// ErrConnectionClosed may be returned from client methods if the server
	// closes the connection before returning the first response byte.
	//
	// If you see this error, then either fix the server by returning
	// 'Connection: close' response header before closing the connection
	// or add 'Connection: close' request header before sending requests
	// to the broken server.
	ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. " +
		"Make sure the server returns 'Connection: close' response header before closing the connection")

	// ErrConnPoolStrategyNotImpl is returned when HostClient.ConnPoolStrategy
	// is set to a strategy that is not implemented.
	// If you see this error, then you need to check your HostClient configuration.
	ErrConnPoolStrategyNotImpl = errors.New("connection pool strategy is not implement")
)
// timeoutError is the concrete type of ErrTimeout.
type timeoutError struct{}

// Error implements the error interface and always returns "timeout".
func (e *timeoutError) Error() string {
	return "timeout"
}

// Timeout always reports true.
//
// Only the Timeout() function of the net.Error interface is implemented.
// This allows for checks like:
//
//	if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
func (e *timeoutError) Timeout() bool {
	return true
}

// ErrTimeout is returned from timed out calls.
var ErrTimeout = &timeoutError{}
  1330. // SetMaxConns sets up the maximum number of connections which may be established to all hosts listed in Addr.
  1331. func (c *HostClient) SetMaxConns(newMaxConns int) {
  1332. c.connsLock.Lock()
  1333. c.MaxConns = newMaxConns
  1334. c.connsLock.Unlock()
  1335. }
// acquireConn returns a connection to be used for a single request.
//
// The lookup proceeds in three stages:
//
//  1. If the idle pool c.conns is not empty, a connection is taken from it
//     according to c.ConnPoolStrategy (LIFO takes the last entry, FIFO the
//     first). Any other strategy yields ErrConnPoolStrategyNotImpl.
//  2. If the pool is empty and c.connsCount is below MaxConns
//     (DefaultMaxConnsPerHost when MaxConns <= 0), a slot is reserved and a
//     new connection is dialed via dialHostHard(reqTimeout). A failed dial
//     gives the slot back through decConnsCount.
//  3. Otherwise the call fails with ErrNoFreeConns when MaxConnWaitTimeout
//     is not positive, or queues a wantConn and waits until a connection is
//     delivered or the wait times out.
//
// connectionClose == true prevents this call from starting the idle
// connections cleaner goroutine.
func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) (cc *clientConn, err error) {
	createConn := false
	startCleaner := false

	var n int
	c.connsLock.Lock()
	n = len(c.conns)
	if n == 0 {
		maxConns := c.MaxConns
		if maxConns <= 0 {
			maxConns = DefaultMaxConnsPerHost
		}
		if c.connsCount < maxConns {
			// Reserve the slot under the lock; the dial itself happens
			// after the lock is released.
			c.connsCount++
			createConn = true
			if !c.connsCleanerRun && !connectionClose {
				startCleaner = true
				c.connsCleanerRun = true
			}
		}
	} else {
		switch c.ConnPoolStrategy {
		case LIFO:
			// Take the most recently released connection.
			n--
			cc = c.conns[n]
			c.conns[n] = nil
			c.conns = c.conns[:n]
		case FIFO:
			// Take the oldest idle connection and shift the rest down.
			cc = c.conns[0]
			copy(c.conns, c.conns[1:])
			c.conns[n-1] = nil
			c.conns = c.conns[:n-1]
		default:
			c.connsLock.Unlock()
			return nil, ErrConnPoolStrategyNotImpl
		}
	}
	c.connsLock.Unlock()

	if cc != nil {
		return cc, nil
	}
	if !createConn {
		if c.MaxConnWaitTimeout <= 0 {
			return nil, ErrNoFreeConns
		}

		// reqTimeout    c.MaxConnWaitTimeout   wait duration
		//     d1                 d2            min(d1, d2)
		//  0(not set)            d2            d2
		//     d1            0(don't wait)      0(don't wait)
		timeout := c.MaxConnWaitTimeout
		timeoutOverridden := false
		// reqTimeout == 0 means not set
		if reqTimeout > 0 && reqTimeout < timeout {
			timeout = reqTimeout
			timeoutOverridden = true
		}

		// wait for a free connection
		tc := AcquireTimer(timeout)
		defer ReleaseTimer(tc)

		w := &wantConn{
			ready: make(chan struct{}, 1),
		}
		// On any error return, mark w as cancelled; a connection that was
		// delivered in the meantime is handed back by w.cancel.
		defer func() {
			if err != nil {
				w.cancel(c, err)
			}
		}()

		c.queueForIdle(w)

		select {
		case <-w.ready:
			return w.conn, w.err
		case <-tc.C:
			// The request's own timeout expired first only when it
			// overrode MaxConnWaitTimeout above.
			if timeoutOverridden {
				return nil, ErrTimeout
			}
			return nil, ErrNoFreeConns
		}
	}

	if startCleaner {
		go c.connsCleaner()
	}

	conn, err := c.dialHostHard(reqTimeout)
	if err != nil {
		// Give back the slot reserved above.
		c.decConnsCount()
		return nil, err
	}
	cc = acquireClientConn(conn)

	return cc, nil
}
  1425. func (c *HostClient) queueForIdle(w *wantConn) {
  1426. c.connsLock.Lock()
  1427. defer c.connsLock.Unlock()
  1428. if c.connsWait == nil {
  1429. c.connsWait = &wantConnQueue{}
  1430. }
  1431. c.connsWait.clearFront()
  1432. c.connsWait.pushBack(w)
  1433. }
  1434. func (c *HostClient) dialConnFor(w *wantConn) {
  1435. conn, err := c.dialHostHard(0)
  1436. if err != nil {
  1437. w.tryDeliver(nil, err)
  1438. c.decConnsCount()
  1439. return
  1440. }
  1441. cc := acquireClientConn(conn)
  1442. if !w.tryDeliver(cc, nil) {
  1443. // not delivered, return idle connection
  1444. c.releaseConn(cc)
  1445. }
  1446. }
  1447. // CloseIdleConnections closes any connections which were previously
  1448. // connected from previous requests but are now sitting idle in a
  1449. // "keep-alive" state. It does not interrupt any connections currently
  1450. // in use.
  1451. func (c *HostClient) CloseIdleConnections() {
  1452. c.connsLock.Lock()
  1453. scratch := append([]*clientConn{}, c.conns...)
  1454. for i := range c.conns {
  1455. c.conns[i] = nil
  1456. }
  1457. c.conns = c.conns[:0]
  1458. c.connsLock.Unlock()
  1459. for _, cc := range scratch {
  1460. c.closeConn(cc)
  1461. }
  1462. }
// connsCleaner periodically closes connections that have been idle for
// longer than MaxIdleConnDuration (DefaultMaxIdleConnDuration when the
// field is not positive).
//
// It is started as a goroutine by acquireConn and loops until
// c.connsCount drops to zero, at which point it clears c.connsCleanerRun
// and returns.
func (c *HostClient) connsCleaner() {
	var (
		scratch             []*clientConn
		maxIdleConnDuration = c.MaxIdleConnDuration
	)
	if maxIdleConnDuration <= 0 {
		maxIdleConnDuration = DefaultMaxIdleConnDuration
	}
	for {
		currentTime := time.Now()

		// Determine idle connections to be closed.
		// The scan stops at the first connection that has not expired yet.
		c.connsLock.Lock()
		conns := c.conns
		n := len(conns)
		i := 0
		for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
			i++
		}
		sleepFor := maxIdleConnDuration
		if i < n {
			// + 1 so we actually sleep past the expiration time and not up to it.
			// Otherwise the > check above would still fail.
			sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1
		}
		// Copy the expired prefix into scratch so it can be closed without
		// holding the lock.
		scratch = append(scratch[:0], conns[:i]...)
		if i > 0 {
			// Shift the surviving connections to the front and nil out the
			// vacated tail so the closed entries are not retained.
			m := copy(conns, conns[i:])
			for i = m; i < n; i++ {
				conns[i] = nil
			}
			c.conns = conns[:m]
		}
		c.connsLock.Unlock()

		// Close idle connections.
		for i, cc := range scratch {
			c.closeConn(cc)
			scratch[i] = nil
		}

		// Determine whether to stop the connsCleaner.
		c.connsLock.Lock()
		mustStop := c.connsCount == 0
		if mustStop {
			c.connsCleanerRun = false
		}
		c.connsLock.Unlock()
		if mustStop {
			break
		}

		time.Sleep(sleepFor)
	}
}
// closeConn disposes of cc: it updates the connection accounting via
// decConnsCount, closes the underlying connection and returns the clientConn
// wrapper to its pool. The error returned by Close is ignored.
func (c *HostClient) closeConn(cc *clientConn) {
	c.decConnsCount()
	cc.c.Close()
	releaseClientConn(cc)
}
  1519. func (c *HostClient) decConnsCount() {
  1520. if c.MaxConnWaitTimeout <= 0 {
  1521. c.connsLock.Lock()
  1522. c.connsCount--
  1523. c.connsLock.Unlock()
  1524. return
  1525. }
  1526. c.connsLock.Lock()
  1527. defer c.connsLock.Unlock()
  1528. dialed := false
  1529. if q := c.connsWait; q != nil && q.len() > 0 {
  1530. for q.len() > 0 {
  1531. w := q.popFront()
  1532. if w.waiting() {
  1533. go c.dialConnFor(w)
  1534. dialed = true
  1535. break
  1536. }
  1537. }
  1538. }
  1539. if !dialed {
  1540. c.connsCount--
  1541. }
  1542. }
  1543. // ConnsCount returns connection count of HostClient
  1544. func (c *HostClient) ConnsCount() int {
  1545. c.connsLock.Lock()
  1546. defer c.connsLock.Unlock()
  1547. return c.connsCount
  1548. }
  1549. func acquireClientConn(conn net.Conn) *clientConn {
  1550. v := clientConnPool.Get()
  1551. if v == nil {
  1552. v = &clientConn{}
  1553. }
  1554. cc := v.(*clientConn)
  1555. cc.c = conn
  1556. cc.createdTime = time.Now()
  1557. return cc
  1558. }
// releaseClientConn zeroes cc and puts it into clientConnPool, where
// acquireClientConn may pick it up again.
func releaseClientConn(cc *clientConn) {
	// Reset all fields.
	*cc = clientConn{}
	clientConnPool.Put(cc)
}

// clientConnPool holds released *clientConn values for reuse by
// acquireClientConn.
var clientConnPool sync.Pool
  1565. func (c *HostClient) releaseConn(cc *clientConn) {
  1566. cc.lastUseTime = time.Now()
  1567. if c.MaxConnWaitTimeout <= 0 {
  1568. c.connsLock.Lock()
  1569. c.conns = append(c.conns, cc)
  1570. c.connsLock.Unlock()
  1571. return
  1572. }
  1573. // try to deliver an idle connection to a *wantConn
  1574. c.connsLock.Lock()
  1575. defer c.connsLock.Unlock()
  1576. delivered := false
  1577. if q := c.connsWait; q != nil && q.len() > 0 {
  1578. for q.len() > 0 {
  1579. w := q.popFront()
  1580. if w.waiting() {
  1581. delivered = w.tryDeliver(cc, nil)
  1582. break
  1583. }
  1584. }
  1585. }
  1586. if !delivered {
  1587. c.conns = append(c.conns, cc)
  1588. }
  1589. }
  1590. func (c *HostClient) acquireWriter(conn net.Conn) *bufio.Writer {
  1591. var v interface{}
  1592. if c.clientWriterPool != nil {
  1593. v = c.clientWriterPool.Get()
  1594. if v == nil {
  1595. n := c.WriteBufferSize
  1596. if n <= 0 {
  1597. n = defaultWriteBufferSize
  1598. }
  1599. return bufio.NewWriterSize(conn, n)
  1600. }
  1601. } else {
  1602. v = c.writerPool.Get()
  1603. if v == nil {
  1604. n := c.WriteBufferSize
  1605. if n <= 0 {
  1606. n = defaultWriteBufferSize
  1607. }
  1608. return bufio.NewWriterSize(conn, n)
  1609. }
  1610. }
  1611. bw := v.(*bufio.Writer)
  1612. bw.Reset(conn)
  1613. return bw
  1614. }
  1615. func (c *HostClient) releaseWriter(bw *bufio.Writer) {
  1616. if c.clientWriterPool != nil {
  1617. c.clientWriterPool.Put(bw)
  1618. } else {
  1619. c.writerPool.Put(bw)
  1620. }
  1621. }
  1622. func (c *HostClient) acquireReader(conn net.Conn) *bufio.Reader {
  1623. var v interface{}
  1624. if c.clientReaderPool != nil {
  1625. v = c.clientReaderPool.Get()
  1626. if v == nil {
  1627. n := c.ReadBufferSize
  1628. if n <= 0 {
  1629. n = defaultReadBufferSize
  1630. }
  1631. return bufio.NewReaderSize(conn, n)
  1632. }
  1633. } else {
  1634. v = c.readerPool.Get()
  1635. if v == nil {
  1636. n := c.ReadBufferSize
  1637. if n <= 0 {
  1638. n = defaultReadBufferSize
  1639. }
  1640. return bufio.NewReaderSize(conn, n)
  1641. }
  1642. }
  1643. br := v.(*bufio.Reader)
  1644. br.Reset(conn)
  1645. return br
  1646. }
  1647. func (c *HostClient) releaseReader(br *bufio.Reader) {
  1648. if c.clientReaderPool != nil {
  1649. c.clientReaderPool.Put(br)
  1650. } else {
  1651. c.readerPool.Put(br)
  1652. }
  1653. }
  1654. func newClientTLSConfig(c *tls.Config, addr string) *tls.Config {
  1655. if c == nil {
  1656. c = &tls.Config{}
  1657. } else {
  1658. c = c.Clone()
  1659. }
  1660. if len(c.ServerName) == 0 {
  1661. serverName := tlsServerName(addr)
  1662. if serverName == "*" {
  1663. c.InsecureSkipVerify = true
  1664. } else {
  1665. c.ServerName = serverName
  1666. }
  1667. }
  1668. return c
  1669. }
  1670. func tlsServerName(addr string) string {
  1671. if !strings.Contains(addr, ":") {
  1672. return addr
  1673. }
  1674. host, _, err := net.SplitHostPort(addr)
  1675. if err != nil {
  1676. return "*"
  1677. }
  1678. return host
  1679. }
  1680. func (c *HostClient) nextAddr() string {
  1681. c.addrsLock.Lock()
  1682. if c.addrs == nil {
  1683. c.addrs = strings.Split(c.Addr, ",")
  1684. }
  1685. addr := c.addrs[0]
  1686. if len(c.addrs) > 1 {
  1687. addr = c.addrs[c.addrIdx%uint32(len(c.addrs))]
  1688. c.addrIdx++
  1689. }
  1690. c.addrsLock.Unlock()
  1691. return addr
  1692. }
  1693. func (c *HostClient) dialHostHard(dialTimeout time.Duration) (conn net.Conn, err error) {
  1694. // use dialTimeout to control the timeout of each dial. It does not work if dialTimeout is 0 or dial has been set.
  1695. // attempt to dial all the available hosts before giving up.
  1696. c.addrsLock.Lock()
  1697. n := len(c.addrs)
  1698. c.addrsLock.Unlock()
  1699. if n == 0 {
  1700. // It looks like c.addrs isn't initialized yet.
  1701. n = 1
  1702. }
  1703. dial := c.Dial
  1704. if dialTimeout != 0 && dial == nil {
  1705. dial = func(addr string) (net.Conn, error) {
  1706. return DialTimeout(addr, dialTimeout)
  1707. }
  1708. }
  1709. timeout := c.ReadTimeout + c.WriteTimeout
  1710. if timeout <= 0 {
  1711. timeout = DefaultDialTimeout
  1712. }
  1713. deadline := time.Now().Add(timeout)
  1714. for n > 0 {
  1715. addr := c.nextAddr()
  1716. tlsConfig := c.cachedTLSConfig(addr)
  1717. conn, err = dialAddr(addr, dial, c.DialDualStack, c.IsTLS, tlsConfig, c.WriteTimeout)
  1718. if err == nil {
  1719. return conn, nil
  1720. }
  1721. if time.Since(deadline) >= 0 {
  1722. break
  1723. }
  1724. n--
  1725. }
  1726. return nil, err
  1727. }
  1728. func (c *HostClient) cachedTLSConfig(addr string) *tls.Config {
  1729. if !c.IsTLS {
  1730. return nil
  1731. }
  1732. c.tlsConfigMapLock.Lock()
  1733. if c.tlsConfigMap == nil {
  1734. c.tlsConfigMap = make(map[string]*tls.Config)
  1735. }
  1736. cfg := c.tlsConfigMap[addr]
  1737. if cfg == nil {
  1738. cfg = newClientTLSConfig(c.TLSConfig, addr)
  1739. c.tlsConfigMap[addr] = cfg
  1740. }
  1741. c.tlsConfigMapLock.Unlock()
  1742. return cfg
  1743. }
  1744. // ErrTLSHandshakeTimeout indicates there is a timeout from tls handshake.
  1745. var ErrTLSHandshakeTimeout = errors.New("tls handshake timed out")
  1746. func tlsClientHandshake(rawConn net.Conn, tlsConfig *tls.Config, deadline time.Time) (_ net.Conn, retErr error) {
  1747. defer func() {
  1748. if retErr != nil {
  1749. rawConn.Close()
  1750. }
  1751. }()
  1752. conn := tls.Client(rawConn, tlsConfig)
  1753. err := conn.SetDeadline(deadline)
  1754. if err != nil {
  1755. return nil, err
  1756. }
  1757. err = conn.Handshake()
  1758. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  1759. return nil, ErrTLSHandshakeTimeout
  1760. }
  1761. if err != nil {
  1762. return nil, err
  1763. }
  1764. err = conn.SetDeadline(time.Time{})
  1765. if err != nil {
  1766. return nil, err
  1767. }
  1768. return conn, nil
  1769. }
  1770. func dialAddr(addr string, dial DialFunc, dialDualStack, isTLS bool, tlsConfig *tls.Config, timeout time.Duration) (net.Conn, error) {
  1771. deadline := time.Now().Add(timeout)
  1772. if dial == nil {
  1773. if dialDualStack {
  1774. dial = DialDualStack
  1775. } else {
  1776. dial = Dial
  1777. }
  1778. addr = AddMissingPort(addr, isTLS)
  1779. }
  1780. conn, err := dial(addr)
  1781. if err != nil {
  1782. return nil, err
  1783. }
  1784. if conn == nil {
  1785. return nil, errors.New("dialling unsuccessful. Please report this bug!")
  1786. }
  1787. // We assume that any conn that has the Handshake() method is a TLS conn already.
  1788. // This doesn't cover just tls.Conn but also other TLS implementations.
  1789. _, isTLSAlready := conn.(interface{ Handshake() error })
  1790. if isTLS && !isTLSAlready {
  1791. if timeout == 0 {
  1792. return tls.Client(conn, tlsConfig), nil
  1793. }
  1794. return tlsClientHandshake(conn, tlsConfig, deadline)
  1795. }
  1796. return conn, nil
  1797. }
  1798. // AddMissingPort adds a port to a host if it is missing.
  1799. // A literal IPv6 address in hostport must be enclosed in square
  1800. // brackets, as in "[::1]:80", "[::1%lo0]:80".
  1801. func AddMissingPort(addr string, isTLS bool) string {
  1802. addrLen := len(addr)
  1803. if addrLen == 0 {
  1804. return addr
  1805. }
  1806. isIP6 := addr[0] == '['
  1807. if isIP6 {
  1808. // if the IPv6 has opening bracket but closing bracket is the last char then it doesn't have a port
  1809. isIP6WithoutPort := addr[addrLen-1] == ']'
  1810. if !isIP6WithoutPort {
  1811. return addr
  1812. }
  1813. } else { // IPv4
  1814. columnPos := strings.LastIndexByte(addr, ':')
  1815. if columnPos > 0 {
  1816. return addr
  1817. }
  1818. }
  1819. port := ":80"
  1820. if isTLS {
  1821. port = ":443"
  1822. }
  1823. return addr + port
  1824. }
  1825. // A wantConn records state about a wanted connection
  1826. // (that is, an active call to getConn).
  1827. // The conn may be gotten by dialing or by finding an idle connection,
  1828. // or a cancellation may make the conn no longer wanted.
  1829. // These three options are racing against each other and use
  1830. // wantConn to coordinate and agree about the winning outcome.
  1831. //
  1832. // inspired by net/http/transport.go
  1833. type wantConn struct {
  1834. ready chan struct{}
  1835. mu sync.Mutex // protects conn, err, close(ready)
  1836. conn *clientConn
  1837. err error
  1838. }
  1839. // waiting reports whether w is still waiting for an answer (connection or error).
  1840. func (w *wantConn) waiting() bool {
  1841. select {
  1842. case <-w.ready:
  1843. return false
  1844. default:
  1845. return true
  1846. }
  1847. }
  1848. // tryDeliver attempts to deliver conn, err to w and reports whether it succeeded.
  1849. func (w *wantConn) tryDeliver(conn *clientConn, err error) bool {
  1850. w.mu.Lock()
  1851. defer w.mu.Unlock()
  1852. if w.conn != nil || w.err != nil {
  1853. return false
  1854. }
  1855. w.conn = conn
  1856. w.err = err
  1857. if w.conn == nil && w.err == nil {
  1858. panic("fasthttp: internal error: misuse of tryDeliver")
  1859. }
  1860. close(w.ready)
  1861. return true
  1862. }
  1863. // cancel marks w as no longer wanting a result (for example, due to cancellation).
  1864. // If a connection has been delivered already, cancel returns it with c.releaseConn.
  1865. func (w *wantConn) cancel(c *HostClient, err error) {
  1866. w.mu.Lock()
  1867. if w.conn == nil && w.err == nil {
  1868. close(w.ready) // catch misbehavior in future delivery
  1869. }
  1870. conn := w.conn
  1871. w.conn = nil
  1872. w.err = err
  1873. w.mu.Unlock()
  1874. if conn != nil {
  1875. c.releaseConn(conn)
  1876. }
  1877. }
  1878. // A wantConnQueue is a queue of wantConns.
  1879. //
  1880. // inspired by net/http/transport.go
  1881. type wantConnQueue struct {
  1882. // This is a queue, not a deque.
  1883. // It is split into two stages - head[headPos:] and tail.
  1884. // popFront is trivial (headPos++) on the first stage, and
  1885. // pushBack is trivial (append) on the second stage.
  1886. // If the first stage is empty, popFront can swap the
  1887. // first and second stages to remedy the situation.
  1888. //
  1889. // This two-stage split is analogous to the use of two lists
  1890. // in Okasaki's purely functional queue but without the
  1891. // overhead of reversing the list when swapping stages.
  1892. head []*wantConn
  1893. headPos int
  1894. tail []*wantConn
  1895. }
  1896. // len returns the number of items in the queue.
  1897. func (q *wantConnQueue) len() int {
  1898. return len(q.head) - q.headPos + len(q.tail)
  1899. }
  1900. // pushBack adds w to the back of the queue.
  1901. func (q *wantConnQueue) pushBack(w *wantConn) {
  1902. q.tail = append(q.tail, w)
  1903. }
  1904. // popFront removes and returns the wantConn at the front of the queue.
  1905. func (q *wantConnQueue) popFront() *wantConn {
  1906. if q.headPos >= len(q.head) {
  1907. if len(q.tail) == 0 {
  1908. return nil
  1909. }
  1910. // Pick up tail as new head, clear tail.
  1911. q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
  1912. }
  1913. w := q.head[q.headPos]
  1914. q.head[q.headPos] = nil
  1915. q.headPos++
  1916. return w
  1917. }
  1918. // peekFront returns the wantConn at the front of the queue without removing it.
  1919. func (q *wantConnQueue) peekFront() *wantConn {
  1920. if q.headPos < len(q.head) {
  1921. return q.head[q.headPos]
  1922. }
  1923. if len(q.tail) > 0 {
  1924. return q.tail[0]
  1925. }
  1926. return nil
  1927. }
  1928. // clearFront pops any wantConns that are no longer waiting from the head of the
  1929. // queue, reporting whether any were popped.
  1930. func (q *wantConnQueue) clearFront() (cleaned bool) {
  1931. for {
  1932. w := q.peekFront()
  1933. if w == nil || w.waiting() {
  1934. return cleaned
  1935. }
  1936. q.popFront()
  1937. cleaned = true
  1938. }
  1939. }
// PipelineClient pipelines requests over a limited set of concurrent
// connections to the given Addr.
//
// This client may be used in highly loaded HTTP-based RPC systems for reducing
// context switches and network level overhead.
// See https://en.wikipedia.org/wiki/HTTP_pipelining for details.
//
// Copying PipelineClient instances is forbidden. Create new instances
// instead.
//
// It is safe to call PipelineClient methods from concurrently running
// goroutines.
type PipelineClient struct {
	noCopy noCopy

	// Address of the host to connect to.
	Addr string

	// PipelineClient name. Used in User-Agent request header.
	Name string

	// NoDefaultUserAgentHeader when set to true, causes the default
	// User-Agent header to be excluded from the Request.
	NoDefaultUserAgentHeader bool

	// The maximum number of concurrent connections to the Addr.
	//
	// A single connection is used by default.
	MaxConns int

	// The maximum number of pending pipelined requests over
	// a single connection to Addr.
	//
	// DefaultMaxPendingRequests is used by default.
	MaxPendingRequests int

	// The maximum delay before sending pipelined requests as a batch
	// to the server.
	//
	// By default requests are sent immediately to the server.
	MaxBatchDelay time.Duration

	// Callback for connection establishing to the host.
	//
	// Default Dial is used if not set.
	Dial DialFunc

	// Attempt to connect to both ipv4 and ipv6 host addresses
	// if set to true.
	//
	// This option is used only if default TCP dialer is used,
	// i.e. if Dial is blank.
	//
	// By default client connects only to ipv4 addresses,
	// since unfortunately ipv6 remains broken in many networks worldwide :)
	DialDualStack bool

	// Response header names are passed as-is without normalization
	// if this option is set.
	//
	// Disabled header names' normalization may be useful only for proxying
	// responses to other clients expecting case-sensitive
	// header names. See https://github.com/valyala/fasthttp/issues/57
	// for details.
	//
	// By default request and response header names are normalized, i.e.
	// The first letter and the first letters following dashes
	// are uppercased, while all the other letters are lowercased.
	// Examples:
	//
	//   * HOST -> Host
	//   * content-type -> Content-Type
	//   * cONTENT-lenGTH -> Content-Length
	DisableHeaderNamesNormalizing bool

	// Path values are sent as-is without normalization.
	//
	// Disabled path normalization may be useful for proxying incoming requests
	// to servers that are expecting paths to be forwarded as-is.
	//
	// By default path values are normalized, i.e.
	// extra slashes are removed, special characters are encoded.
	DisablePathNormalizing bool

	// Whether to use TLS (aka SSL or HTTPS) for host connections.
	IsTLS bool

	// Optional TLS config.
	TLSConfig *tls.Config

	// Idle connection to the host is closed after this duration.
	//
	// By default idle connection is closed after
	// DefaultMaxIdleConnDuration.
	MaxIdleConnDuration time.Duration

	// Buffer size for responses' reading.
	// This also limits the maximum header size.
	//
	// Default buffer size is used if 0.
	ReadBufferSize int

	// Buffer size for requests' writing.
	//
	// Default buffer size is used if 0.
	WriteBufferSize int

	// Maximum duration for full response reading (including body).
	//
	// By default response read timeout is unlimited.
	ReadTimeout time.Duration

	// Maximum duration for full request writing (including body).
	//
	// By default request write timeout is unlimited.
	WriteTimeout time.Duration

	// Logger for logging client errors.
	//
	// By default standard logger from log package is used.
	Logger Logger

	// connClients holds the per-connection clients created by newConnClient.
	connClients []*pipelineConnClient
	// connClientsLock is held by getConnClient while a connection client
	// is selected or created.
	connClientsLock sync.Mutex
}
// pipelineConnClient is the per-connection worker behind PipelineClient.
//
// Its exported fields are copied from the owning PipelineClient when the
// value is created in PipelineClient.newConnClient; see the PipelineClient
// fields of the same names for their meaning.
type pipelineConnClient struct {
	noCopy noCopy

	Addr                          string
	Name                          string
	NoDefaultUserAgentHeader      bool
	MaxPendingRequests            int
	MaxBatchDelay                 time.Duration
	Dial                          DialFunc
	DialDualStack                 bool
	DisableHeaderNamesNormalizing bool
	DisablePathNormalizing        bool
	IsTLS                         bool
	TLSConfig                     *tls.Config
	MaxIdleConnDuration           time.Duration
	ReadBufferSize                int
	WriteBufferSize               int
	ReadTimeout                   time.Duration
	WriteTimeout                  time.Duration
	Logger                        Logger

	// workPool recycles *pipelineWork values between requests.
	workPool sync.Pool

	// NOTE(review): chLock presumably guards creation of chW/chR — confirm in init.
	chLock sync.Mutex
	// chW is the outgoing queue: Do and DoDeadline put work items here.
	chW chan *pipelineWork
	// NOTE(review): chR presumably carries work awaiting a response — confirm.
	chR chan *pipelineWork

	// NOTE(review): tlsConfig looks like a cached TLS config guarded by
	// tlsConfigLock — confirm against its users.
	tlsConfigLock sync.Mutex
	tlsConfig     *tls.Config
}
// pipelineWork describes a single request travelling through a
// pipelineConnClient.
type pipelineWork struct {
	// reqCopy and respCopy are private copies used by DoDeadline so that
	// the caller's request and response are not touched after a timeout.
	reqCopy  Request
	respCopy Response
	// req and resp point either at the copies above or at the caller's
	// request and response (see Do and DoDeadline).
	req  *Request
	resp *Response
	// t is armed by acquirePipelineWork when a positive timeout is given.
	t *time.Timer
	// deadline is now+timeout for a positive timeout and zeroTime otherwise.
	deadline time.Time
	// err is the result reported to the caller once done is signalled.
	err error
	// done is created with a buffer of one element and signals completion.
	done chan struct{}
}
  2082. // DoTimeout performs the given request and waits for response during
  2083. // the given timeout duration.
  2084. //
  2085. // Request must contain at least non-zero RequestURI with full url (including
  2086. // scheme and host) or non-zero Host header + RequestURI.
  2087. //
  2088. // The function doesn't follow redirects.
  2089. //
  2090. // Response is ignored if resp is nil.
  2091. //
  2092. // ErrTimeout is returned if the response wasn't returned during
  2093. // the given timeout.
  2094. //
  2095. // It is recommended obtaining req and resp via AcquireRequest
  2096. // and AcquireResponse in performance-critical code.
  2097. //
  2098. // Warning: DoTimeout does not terminate the request itself. The request will
  2099. // continue in the background and the response will be discarded.
  2100. // If requests take too long and the connection pool gets filled up please
  2101. // try setting a ReadTimeout.
  2102. func (c *PipelineClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  2103. return c.DoDeadline(req, resp, time.Now().Add(timeout))
  2104. }
  2105. // DoDeadline performs the given request and waits for response until
  2106. // the given deadline.
  2107. //
  2108. // Request must contain at least non-zero RequestURI with full url (including
  2109. // scheme and host) or non-zero Host header + RequestURI.
  2110. //
  2111. // The function doesn't follow redirects.
  2112. //
  2113. // Response is ignored if resp is nil.
  2114. //
  2115. // ErrTimeout is returned if the response wasn't returned until
  2116. // the given deadline.
  2117. //
  2118. // It is recommended obtaining req and resp via AcquireRequest
  2119. // and AcquireResponse in performance-critical code.
  2120. func (c *PipelineClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  2121. return c.getConnClient().DoDeadline(req, resp, deadline)
  2122. }
  2123. func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  2124. c.init()
  2125. timeout := time.Until(deadline)
  2126. if timeout < 0 {
  2127. return ErrTimeout
  2128. }
  2129. if c.DisablePathNormalizing {
  2130. req.URI().DisablePathNormalizing = true
  2131. }
  2132. userAgentOld := req.Header.UserAgent()
  2133. if len(userAgentOld) == 0 {
  2134. userAgent := c.Name
  2135. if userAgent == "" && !c.NoDefaultUserAgentHeader {
  2136. userAgent = defaultUserAgent
  2137. }
  2138. if userAgent != "" {
  2139. req.Header.userAgent = append(req.Header.userAgent[:], userAgent...)
  2140. }
  2141. }
  2142. w := c.acquirePipelineWork(timeout)
  2143. w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
  2144. w.req = &w.reqCopy
  2145. w.resp = &w.respCopy
  2146. // Make a copy of the request in order to avoid data races on timeouts
  2147. req.copyToSkipBody(&w.reqCopy)
  2148. swapRequestBody(req, &w.reqCopy)
  2149. // Put the request to outgoing queue
  2150. select {
  2151. case c.chW <- w:
  2152. // Fast path: len(c.ch) < cap(c.ch)
  2153. default:
  2154. // Slow path
  2155. select {
  2156. case c.chW <- w:
  2157. case <-w.t.C:
  2158. c.releasePipelineWork(w)
  2159. return ErrTimeout
  2160. }
  2161. }
  2162. // Wait for the response
  2163. var err error
  2164. select {
  2165. case <-w.done:
  2166. if resp != nil {
  2167. w.respCopy.copyToSkipBody(resp)
  2168. swapResponseBody(resp, &w.respCopy)
  2169. }
  2170. err = w.err
  2171. c.releasePipelineWork(w)
  2172. case <-w.t.C:
  2173. err = ErrTimeout
  2174. }
  2175. return err
  2176. }
  2177. func (c *pipelineConnClient) acquirePipelineWork(timeout time.Duration) (w *pipelineWork) {
  2178. v := c.workPool.Get()
  2179. if v != nil {
  2180. w = v.(*pipelineWork)
  2181. } else {
  2182. w = &pipelineWork{
  2183. done: make(chan struct{}, 1),
  2184. }
  2185. }
  2186. if timeout > 0 {
  2187. if w.t == nil {
  2188. w.t = time.NewTimer(timeout)
  2189. } else {
  2190. w.t.Reset(timeout)
  2191. }
  2192. w.deadline = time.Now().Add(timeout)
  2193. } else {
  2194. w.deadline = zeroTime
  2195. }
  2196. return w
  2197. }
  2198. func (c *pipelineConnClient) releasePipelineWork(w *pipelineWork) {
  2199. if w.t != nil {
  2200. w.t.Stop()
  2201. }
  2202. w.reqCopy.Reset()
  2203. w.respCopy.Reset()
  2204. w.req = nil
  2205. w.resp = nil
  2206. w.err = nil
  2207. c.workPool.Put(w)
  2208. }
  2209. // Do performs the given http request and sets the corresponding response.
  2210. //
  2211. // Request must contain at least non-zero RequestURI with full url (including
  2212. // scheme and host) or non-zero Host header + RequestURI.
  2213. //
  2214. // The function doesn't follow redirects. Use Get* for following redirects.
  2215. //
  2216. // Response is ignored if resp is nil.
  2217. //
  2218. // It is recommended obtaining req and resp via AcquireRequest
  2219. // and AcquireResponse in performance-critical code.
  2220. func (c *PipelineClient) Do(req *Request, resp *Response) error {
  2221. return c.getConnClient().Do(req, resp)
  2222. }
// Do queues req on this connection client and blocks until the response has
// been written into resp (or into an internal scratch response when resp is
// nil).
//
// Unlike DoDeadline, req and resp are used directly rather than copied.
// When the outgoing queue is full, the oldest queued work item is failed
// with ErrPipelineOverflow to make room; if the queue is still full after
// that, Do itself returns ErrPipelineOverflow.
func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
	c.init()

	if c.DisablePathNormalizing {
		req.URI().DisablePathNormalizing = true
	}

	// Fill in the User-Agent only when the request does not carry one.
	userAgentOld := req.Header.UserAgent()
	if len(userAgentOld) == 0 {
		userAgent := c.Name
		if userAgent == "" && !c.NoDefaultUserAgentHeader {
			userAgent = defaultUserAgent
		}
		if userAgent != "" {
			req.Header.userAgent = append(req.Header.userAgent[:], userAgent...)
		}
	}

	// A zero timeout means no timer is armed for this work item.
	w := c.acquirePipelineWork(0)
	w.req = req
	if resp != nil {
		resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
		w.resp = resp
	} else {
		w.resp = &w.respCopy
	}

	// Put the request to outgoing queue
	select {
	case c.chW <- w:
	default:
		// Try substituting the oldest w with the current one.
		select {
		case wOld := <-c.chW:
			wOld.err = ErrPipelineOverflow
			wOld.done <- struct{}{}
		default:
		}
		select {
		case c.chW <- w:
		default:
			c.releasePipelineWork(w)
			return ErrPipelineOverflow
		}
	}

	// Wait for the response
	<-w.done
	err := w.err

	c.releasePipelineWork(w)

	return err
}
  2270. func (c *PipelineClient) getConnClient() *pipelineConnClient {
  2271. c.connClientsLock.Lock()
  2272. cc := c.getConnClientUnlocked()
  2273. c.connClientsLock.Unlock()
  2274. return cc
  2275. }
  2276. func (c *PipelineClient) getConnClientUnlocked() *pipelineConnClient {
  2277. if len(c.connClients) == 0 {
  2278. return c.newConnClient()
  2279. }
  2280. // Return the client with the minimum number of pending requests.
  2281. minCC := c.connClients[0]
  2282. minReqs := minCC.PendingRequests()
  2283. if minReqs == 0 {
  2284. return minCC
  2285. }
  2286. for i := 1; i < len(c.connClients); i++ {
  2287. cc := c.connClients[i]
  2288. reqs := cc.PendingRequests()
  2289. if reqs == 0 {
  2290. return cc
  2291. }
  2292. if reqs < minReqs {
  2293. minCC = cc
  2294. minReqs = reqs
  2295. }
  2296. }
  2297. maxConns := c.MaxConns
  2298. if maxConns <= 0 {
  2299. maxConns = 1
  2300. }
  2301. if len(c.connClients) < maxConns {
  2302. return c.newConnClient()
  2303. }
  2304. return minCC
  2305. }
  2306. func (c *PipelineClient) newConnClient() *pipelineConnClient {
  2307. cc := &pipelineConnClient{
  2308. Addr: c.Addr,
  2309. Name: c.Name,
  2310. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  2311. MaxPendingRequests: c.MaxPendingRequests,
  2312. MaxBatchDelay: c.MaxBatchDelay,
  2313. Dial: c.Dial,
  2314. DialDualStack: c.DialDualStack,
  2315. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  2316. DisablePathNormalizing: c.DisablePathNormalizing,
  2317. IsTLS: c.IsTLS,
  2318. TLSConfig: c.TLSConfig,
  2319. MaxIdleConnDuration: c.MaxIdleConnDuration,
  2320. ReadBufferSize: c.ReadBufferSize,
  2321. WriteBufferSize: c.WriteBufferSize,
  2322. ReadTimeout: c.ReadTimeout,
  2323. WriteTimeout: c.WriteTimeout,
  2324. Logger: c.Logger,
  2325. }
  2326. c.connClients = append(c.connClients, cc)
  2327. return cc
  2328. }
// ErrPipelineOverflow may be returned from PipelineClient.Do*
// if the requests' queue overflows.
//
// It is reported both for a request that could not be queued and for the
// oldest queued request when it is evicted to make room for a newer one.
var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflown. Increase MaxConns and/or MaxPendingRequests")
// DefaultMaxPendingRequests is the default value
// for PipelineClient.MaxPendingRequests.
//
// It is used as the queue capacity when MaxPendingRequests is zero or negative.
const DefaultMaxPendingRequests = 1024
  2335. func (c *pipelineConnClient) init() {
  2336. c.chLock.Lock()
  2337. if c.chR == nil {
  2338. maxPendingRequests := c.MaxPendingRequests
  2339. if maxPendingRequests <= 0 {
  2340. maxPendingRequests = DefaultMaxPendingRequests
  2341. }
  2342. c.chR = make(chan *pipelineWork, maxPendingRequests)
  2343. if c.chW == nil {
  2344. c.chW = make(chan *pipelineWork, maxPendingRequests)
  2345. }
  2346. go func() {
  2347. // Keep restarting the worker if it fails (connection errors for example).
  2348. for {
  2349. if err := c.worker(); err != nil {
  2350. c.logger().Printf("error in PipelineClient(%q): %v", c.Addr, err)
  2351. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  2352. // Throttle client reconnections on timeout errors
  2353. time.Sleep(time.Second)
  2354. }
  2355. } else {
  2356. c.chLock.Lock()
  2357. stop := len(c.chR) == 0 && len(c.chW) == 0
  2358. if !stop {
  2359. c.chR = nil
  2360. c.chW = nil
  2361. }
  2362. c.chLock.Unlock()
  2363. if stop {
  2364. break
  2365. }
  2366. }
  2367. }
  2368. }()
  2369. }
  2370. c.chLock.Unlock()
  2371. }
  2372. func (c *pipelineConnClient) worker() error {
  2373. tlsConfig := c.cachedTLSConfig()
  2374. conn, err := dialAddr(c.Addr, c.Dial, c.DialDualStack, c.IsTLS, tlsConfig, c.WriteTimeout)
  2375. if err != nil {
  2376. return err
  2377. }
  2378. // Start reader and writer
  2379. stopW := make(chan struct{})
  2380. doneW := make(chan error)
  2381. go func() {
  2382. doneW <- c.writer(conn, stopW)
  2383. }()
  2384. stopR := make(chan struct{})
  2385. doneR := make(chan error)
  2386. go func() {
  2387. doneR <- c.reader(conn, stopR)
  2388. }()
  2389. // Wait until reader and writer are stopped
  2390. select {
  2391. case err = <-doneW:
  2392. conn.Close()
  2393. close(stopR)
  2394. <-doneR
  2395. case err = <-doneR:
  2396. conn.Close()
  2397. close(stopW)
  2398. <-doneW
  2399. }
  2400. // Notify pending readers
  2401. for len(c.chR) > 0 {
  2402. w := <-c.chR
  2403. w.err = errPipelineConnStopped
  2404. w.done <- struct{}{}
  2405. }
  2406. return err
  2407. }
  2408. func (c *pipelineConnClient) cachedTLSConfig() *tls.Config {
  2409. if !c.IsTLS {
  2410. return nil
  2411. }
  2412. c.tlsConfigLock.Lock()
  2413. cfg := c.tlsConfig
  2414. if cfg == nil {
  2415. cfg = newClientTLSConfig(c.TLSConfig, c.Addr)
  2416. c.tlsConfig = cfg
  2417. }
  2418. c.tlsConfigLock.Unlock()
  2419. return cfg
  2420. }
  2421. func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
  2422. writeBufferSize := c.WriteBufferSize
  2423. if writeBufferSize <= 0 {
  2424. writeBufferSize = defaultWriteBufferSize
  2425. }
  2426. bw := bufio.NewWriterSize(conn, writeBufferSize)
  2427. defer bw.Flush()
  2428. chR := c.chR
  2429. chW := c.chW
  2430. writeTimeout := c.WriteTimeout
  2431. maxIdleConnDuration := c.MaxIdleConnDuration
  2432. if maxIdleConnDuration <= 0 {
  2433. maxIdleConnDuration = DefaultMaxIdleConnDuration
  2434. }
  2435. maxBatchDelay := c.MaxBatchDelay
  2436. var (
  2437. stopTimer = time.NewTimer(time.Hour)
  2438. flushTimer = time.NewTimer(time.Hour)
  2439. flushTimerCh <-chan time.Time
  2440. instantTimerCh = make(chan time.Time)
  2441. w *pipelineWork
  2442. err error
  2443. )
  2444. close(instantTimerCh)
  2445. for {
  2446. againChW:
  2447. select {
  2448. case w = <-chW:
  2449. // Fast path: len(chW) > 0
  2450. default:
  2451. // Slow path
  2452. stopTimer.Reset(maxIdleConnDuration)
  2453. select {
  2454. case w = <-chW:
  2455. case <-stopTimer.C:
  2456. return nil
  2457. case <-stopCh:
  2458. return nil
  2459. case <-flushTimerCh:
  2460. if err = bw.Flush(); err != nil {
  2461. return err
  2462. }
  2463. flushTimerCh = nil
  2464. goto againChW
  2465. }
  2466. }
  2467. if !w.deadline.IsZero() && time.Since(w.deadline) >= 0 {
  2468. w.err = ErrTimeout
  2469. w.done <- struct{}{}
  2470. continue
  2471. }
  2472. w.resp.parseNetConn(conn)
  2473. if writeTimeout > 0 {
  2474. // Set Deadline every time, since golang has fixed the performance issue
  2475. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2476. currentTime := time.Now()
  2477. if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
  2478. w.err = err
  2479. w.done <- struct{}{}
  2480. return err
  2481. }
  2482. }
  2483. if err = w.req.Write(bw); err != nil {
  2484. w.err = err
  2485. w.done <- struct{}{}
  2486. return err
  2487. }
  2488. if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) {
  2489. if maxBatchDelay > 0 {
  2490. flushTimer.Reset(maxBatchDelay)
  2491. flushTimerCh = flushTimer.C
  2492. } else {
  2493. flushTimerCh = instantTimerCh
  2494. }
  2495. }
  2496. againChR:
  2497. select {
  2498. case chR <- w:
  2499. // Fast path: len(chR) < cap(chR)
  2500. default:
  2501. // Slow path
  2502. select {
  2503. case chR <- w:
  2504. case <-stopCh:
  2505. w.err = errPipelineConnStopped
  2506. w.done <- struct{}{}
  2507. return nil
  2508. case <-flushTimerCh:
  2509. if err = bw.Flush(); err != nil {
  2510. w.err = err
  2511. w.done <- struct{}{}
  2512. return err
  2513. }
  2514. flushTimerCh = nil
  2515. goto againChR
  2516. }
  2517. }
  2518. }
  2519. }
  2520. func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error {
  2521. readBufferSize := c.ReadBufferSize
  2522. if readBufferSize <= 0 {
  2523. readBufferSize = defaultReadBufferSize
  2524. }
  2525. br := bufio.NewReaderSize(conn, readBufferSize)
  2526. chR := c.chR
  2527. readTimeout := c.ReadTimeout
  2528. var (
  2529. w *pipelineWork
  2530. err error
  2531. )
  2532. for {
  2533. select {
  2534. case w = <-chR:
  2535. // Fast path: len(chR) > 0
  2536. default:
  2537. // Slow path
  2538. select {
  2539. case w = <-chR:
  2540. case <-stopCh:
  2541. return nil
  2542. }
  2543. }
  2544. if readTimeout > 0 {
  2545. // Set Deadline every time, since golang has fixed the performance issue
  2546. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2547. currentTime := time.Now()
  2548. if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil {
  2549. w.err = err
  2550. w.done <- struct{}{}
  2551. return err
  2552. }
  2553. }
  2554. if err = w.resp.Read(br); err != nil {
  2555. w.err = err
  2556. w.done <- struct{}{}
  2557. return err
  2558. }
  2559. w.done <- struct{}{}
  2560. }
  2561. }
  2562. func (c *pipelineConnClient) logger() Logger {
  2563. if c.Logger != nil {
  2564. return c.Logger
  2565. }
  2566. return defaultLogger
  2567. }
  2568. // PendingRequests returns the current number of pending requests pipelined
  2569. // to the server.
  2570. //
  2571. // This number may exceed MaxPendingRequests*MaxConns by up to two times, since
  2572. // each connection to the server may keep up to MaxPendingRequests requests
  2573. // in the queue before sending them to the server.
  2574. //
  2575. // This function may be used for balancing load among multiple PipelineClient
  2576. // instances.
  2577. func (c *PipelineClient) PendingRequests() int {
  2578. c.connClientsLock.Lock()
  2579. n := 0
  2580. for _, cc := range c.connClients {
  2581. n += cc.PendingRequests()
  2582. }
  2583. c.connClientsLock.Unlock()
  2584. return n
  2585. }
  2586. func (c *pipelineConnClient) PendingRequests() int {
  2587. c.init()
  2588. c.chLock.Lock()
  2589. n := len(c.chR) + len(c.chW)
  2590. c.chLock.Unlock()
  2591. return n
  2592. }
// errPipelineConnStopped is set on requests that were written to, or about to
// be handed over to, a pipeline connection that was shut down before their
// response could be read.
var errPipelineConnStopped = errors.New("pipeline connection has been stopped")