| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104 |
- package fasthttp
- import (
- "bufio"
- "bytes"
- "crypto/tls"
- "errors"
- "fmt"
- "io"
- "net"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- )
- // Do performs the given http request and fills the given http response.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func Do(req *Request, resp *Response) error {
- return defaultClient.Do(req, resp)
- }
- // DoTimeout performs the given request and waits for response during
- // the given timeout duration.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned during
- // the given timeout.
- //
- // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
- return defaultClient.DoTimeout(req, resp, timeout)
- }
- // DoDeadline performs the given request and waits for response until
- // the given deadline.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned until
- // the given deadline.
- //
- // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func DoDeadline(req *Request, resp *Response, deadline time.Time) error {
- return defaultClient.DoDeadline(req, resp, deadline)
- }
- // DoRedirects performs the given http request and fills the given http response,
- // following up to maxRedirectsCount redirects. When the redirect count exceeds
- // maxRedirectsCount, ErrTooManyRedirects is returned.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // Response is ignored if resp is nil.
- //
- // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
- if defaultClient.DisablePathNormalizing {
- req.URI().DisablePathNormalizing = true
- }
- _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, &defaultClient)
- return err
- }
- // Get returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
- return defaultClient.Get(dst, url)
- }
- // GetTimeout returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // ErrTimeout error is returned if url contents couldn't be fetched
- // during the given timeout.
- func GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
- return defaultClient.GetTimeout(dst, url, timeout)
- }
- // GetDeadline returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // ErrTimeout error is returned if url contents couldn't be fetched
- // until the given deadline.
- func GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
- return defaultClient.GetDeadline(dst, url, deadline)
- }
- // Post sends POST request to the given url with the given POST arguments.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // Empty POST body is sent if postArgs is nil.
- func Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
- return defaultClient.Post(dst, url, postArgs)
- }
- var defaultClient Client
- // Client implements http client.
- //
- // Copying Client by value is prohibited. Create new instance instead.
- //
- // It is safe calling Client methods from concurrently running goroutines.
- //
- // The fields of a Client should not be changed while it is in use.
- type Client struct {
- noCopy noCopy
- readerPool sync.Pool
- writerPool sync.Pool
- // Transport defines a transport-like mechanism that wraps every request/response.
- Transport RoundTripper
- // Callback for establishing new connections to hosts.
- //
- // Default DialTimeout is used if not set.
- DialTimeout DialFuncWithTimeout
- // Callback for establishing new connections to hosts.
- //
- // Note that if Dial is set instead of DialTimeout, Dial will ignore Request timeout.
- // If you want the tcp dial process to account for request timeouts, use DialTimeout instead.
- //
- // If not set, DialTimeout is used.
- Dial DialFunc
- // TLS config for https connections.
- //
- // Default TLS config is used if not set.
- TLSConfig *tls.Config
- // RetryIf controls whether a retry should be attempted after an error.
- //
- // By default will use isIdempotent function.
- //
- // Deprecated: Use RetryIfErr instead.
- // This field is only effective when the `RetryIfErr` field is not set.
- RetryIf RetryIfFunc
- // When the client encounters an error during a request, the behavior—whether to retry
- // and whether to reset the request timeout—should be determined
- // based on the return value of this field.
- // This field is only effective within the range of MaxIdemponentCallAttempts.
- RetryIfErr RetryIfErrFunc
- // ConfigureClient configures the fasthttp.HostClient.
- ConfigureClient func(hc *HostClient) error
- m map[string]*HostClient
- ms map[string]*HostClient
- // Client name. Used in User-Agent request header.
- //
- // Default client name is used if not set.
- Name string
- // Maximum number of connections per each host which may be established.
- //
- // DefaultMaxConnsPerHost is used if not set.
- MaxConnsPerHost int
- // Idle keep-alive connections are closed after this duration.
- //
- // By default idle connections are closed
- // after DefaultMaxIdleConnDuration.
- MaxIdleConnDuration time.Duration
- // Keep-alive connections are closed after this duration.
- //
- // By default connection duration is unlimited.
- MaxConnDuration time.Duration
- // Maximum number of attempts for idempotent calls.
- //
- // DefaultMaxIdemponentCallAttempts is used if not set.
- MaxIdemponentCallAttempts int
- // Per-connection buffer size for responses' reading.
- // This also limits the maximum header size.
- //
- // Default buffer size is used if 0.
- ReadBufferSize int
- // Per-connection buffer size for requests' writing.
- //
- // Default buffer size is used if 0.
- WriteBufferSize int
- // Maximum duration for full response reading (including body).
- //
- // By default response read timeout is unlimited.
- ReadTimeout time.Duration
- // Maximum duration for full request writing (including body).
- //
- // By default request write timeout is unlimited.
- WriteTimeout time.Duration
- // Maximum response body size.
- //
- // The client returns ErrBodyTooLarge if this limit is greater than 0
- // and response body is greater than the limit.
- //
- // By default response body size is unlimited.
- MaxResponseBodySize int
- // Maximum duration for waiting for a free connection.
- //
- // By default will not waiting, return ErrNoFreeConns immediately.
- MaxConnWaitTimeout time.Duration
- // Connection pool strategy. Can be either LIFO or FIFO (default).
- ConnPoolStrategy ConnPoolStrategyType
- mLock sync.RWMutex
- mOnce sync.Once
- // NoDefaultUserAgentHeader when set to true, causes the default
- // User-Agent header to be excluded from the Request.
- NoDefaultUserAgentHeader bool
- // Attempt to connect to both ipv4 and ipv6 addresses if set to true.
- //
- // This option is used only if default TCP dialer is used,
- // i.e. if Dial is blank.
- //
- // By default client connects only to ipv4 addresses,
- // since unfortunately ipv6 remains broken in many networks worldwide :)
- DialDualStack bool
- // Header names are passed as-is without normalization
- // if this option is set.
- //
- // Disabled header names' normalization may be useful only for proxying
- // responses to other clients expecting case-sensitive
- // header names. See https://github.com/valyala/fasthttp/issues/57
- // for details.
- //
- // By default request and response header names are normalized, i.e.
- // The first letter and the first letters following dashes
- // are uppercased, while all the other letters are lowercased.
- // Examples:
- //
- // * HOST -> Host
- // * content-type -> Content-Type
- // * cONTENT-lenGTH -> Content-Length
- DisableHeaderNamesNormalizing bool
- // Path values are sent as-is without normalization.
- //
- // Disabled path normalization may be useful for proxying incoming requests
- // to servers that are expecting paths to be forwarded as-is.
- //
- // By default path values are normalized, i.e.
- // extra slashes are removed, special characters are encoded.
- DisablePathNormalizing bool
- // StreamResponseBody enables response body streaming.
- StreamResponseBody bool
- }
- // Get returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- func (c *Client) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
- return clientGetURL(dst, url, c)
- }
- // GetTimeout returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // ErrTimeout error is returned if url contents couldn't be fetched
- // during the given timeout.
- func (c *Client) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
- return clientGetURLTimeout(dst, url, timeout, c)
- }
- // GetDeadline returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // ErrTimeout error is returned if url contents couldn't be fetched
- // until the given deadline.
- func (c *Client) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
- return clientGetURLDeadline(dst, url, deadline, c)
- }
- // Post sends POST request to the given url with the given POST arguments.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // Empty POST body is sent if postArgs is nil.
- func (c *Client) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
- return clientPostURL(dst, url, postArgs, c)
- }
- // DoTimeout performs the given request and waits for response during
- // the given timeout duration.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned during
- // the given timeout.
- // Immediately returns ErrTimeout if timeout value is negative.
- //
- // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *Client) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
- req.timeout = timeout
- if req.timeout <= 0 {
- return ErrTimeout
- }
- return c.Do(req, resp)
- }
- // DoDeadline performs the given request and waits for response until
- // the given deadline.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned until
- // the given deadline.
- // Immediately returns ErrTimeout if the deadline has already been reached.
- //
- // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *Client) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
- req.timeout = time.Until(deadline)
- if req.timeout <= 0 {
- return ErrTimeout
- }
- return c.Do(req, resp)
- }
- // DoRedirects performs the given http request and fills the given http response,
- // following up to maxRedirectsCount redirects. When the redirect count exceeds
- // maxRedirectsCount, ErrTooManyRedirects is returned.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // Response is ignored if resp is nil.
- //
- // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *Client) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
- if c.DisablePathNormalizing {
- req.URI().DisablePathNormalizing = true
- }
- _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
- return err
- }
- // Do performs the given http request and fills the given http response.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // Response is ignored if resp is nil.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *Client) Do(req *Request, resp *Response) error {
- uri := req.URI()
- if uri == nil {
- return ErrorInvalidURI
- }
- host := uri.Host()
- if bytes.ContainsRune(host, ',') {
- return fmt.Errorf("invalid host %q. Use HostClient for multiple hosts", host)
- }
- isTLS := false
- if uri.isHTTPS() {
- isTLS = true
- } else if !uri.isHTTP() {
- return fmt.Errorf("unsupported protocol %q. http and https are supported", uri.Scheme())
- }
- c.mOnce.Do(func() {
- c.mLock.Lock()
- c.m = make(map[string]*HostClient)
- c.ms = make(map[string]*HostClient)
- c.mLock.Unlock()
- })
- startCleaner := false
- c.mLock.RLock()
- m := c.m
- if isTLS {
- m = c.ms
- }
- hc := m[string(host)]
- if hc != nil {
- atomic.AddInt32(&hc.pendingClientRequests, 1)
- defer atomic.AddInt32(&hc.pendingClientRequests, -1)
- }
- c.mLock.RUnlock()
- if hc == nil {
- c.mLock.Lock()
- hc = m[string(host)]
- if hc == nil {
- hc = &HostClient{
- Addr: AddMissingPort(string(host), isTLS),
- Transport: c.Transport,
- Name: c.Name,
- NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
- Dial: c.Dial,
- DialTimeout: c.DialTimeout,
- DialDualStack: c.DialDualStack,
- IsTLS: isTLS,
- TLSConfig: c.TLSConfig,
- MaxConns: c.MaxConnsPerHost,
- MaxIdleConnDuration: c.MaxIdleConnDuration,
- MaxConnDuration: c.MaxConnDuration,
- MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts,
- ReadBufferSize: c.ReadBufferSize,
- WriteBufferSize: c.WriteBufferSize,
- ReadTimeout: c.ReadTimeout,
- WriteTimeout: c.WriteTimeout,
- MaxResponseBodySize: c.MaxResponseBodySize,
- DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
- DisablePathNormalizing: c.DisablePathNormalizing,
- MaxConnWaitTimeout: c.MaxConnWaitTimeout,
- RetryIf: c.RetryIf,
- RetryIfErr: c.RetryIfErr,
- ConnPoolStrategy: c.ConnPoolStrategy,
- StreamResponseBody: c.StreamResponseBody,
- clientReaderPool: &c.readerPool,
- clientWriterPool: &c.writerPool,
- }
- if c.ConfigureClient != nil {
- if err := c.ConfigureClient(hc); err != nil {
- c.mLock.Unlock()
- return err
- }
- }
- m[string(host)] = hc
- if len(m) == 1 {
- startCleaner = true
- }
- }
- atomic.AddInt32(&hc.pendingClientRequests, 1)
- defer atomic.AddInt32(&hc.pendingClientRequests, -1)
- c.mLock.Unlock()
- }
- if startCleaner {
- go c.mCleaner(m)
- }
- return hc.Do(req, resp)
- }
- // CloseIdleConnections closes any connections which were previously
- // connected from previous requests but are now sitting idle in a
- // "keep-alive" state. It does not interrupt any connections currently
- // in use.
- func (c *Client) CloseIdleConnections() {
- c.mLock.RLock()
- for _, v := range c.m {
- v.CloseIdleConnections()
- }
- for _, v := range c.ms {
- v.CloseIdleConnections()
- }
- c.mLock.RUnlock()
- }
- func (c *Client) mCleaner(m map[string]*HostClient) {
- mustStop := false
- sleep := c.MaxIdleConnDuration
- if sleep < time.Second {
- sleep = time.Second
- } else if sleep > 10*time.Second {
- sleep = 10 * time.Second
- }
- for {
- time.Sleep(sleep)
- c.mLock.Lock()
- for k, v := range m {
- v.connsLock.Lock()
- if v.connsCount == 0 && atomic.LoadInt32(&v.pendingClientRequests) == 0 {
- delete(m, k)
- }
- v.connsLock.Unlock()
- }
- if len(m) == 0 {
- mustStop = true
- }
- c.mLock.Unlock()
- if mustStop {
- break
- }
- }
- }
- // DefaultMaxConnsPerHost is the maximum number of concurrent connections
- // http client may establish per host by default (i.e. if
- // Client.MaxConnsPerHost isn't set).
- const DefaultMaxConnsPerHost = 512
- // DefaultMaxIdleConnDuration is the default duration before idle keep-alive
- // connection is closed.
- const DefaultMaxIdleConnDuration = 10 * time.Second
- // DefaultMaxIdemponentCallAttempts is the default idempotent calls attempts count.
- const DefaultMaxIdemponentCallAttempts = 5
- // DialFunc must establish connection to addr.
- //
- // There is no need in establishing TLS (SSL) connection for https.
- // The client automatically converts connection to TLS
- // if HostClient.IsTLS is set.
- //
- // TCP address passed to DialFunc always contains host and port.
- // Example TCP addr values:
- //
- // - foobar.com:80
- // - foobar.com:443
- // - foobar.com:8080
- type DialFunc func(addr string) (net.Conn, error)
- // DialFuncWithTimeout must establish connection to addr.
- // Unlike DialFunc, it also accepts a timeout.
- //
- // There is no need in establishing TLS (SSL) connection for https.
- // The client automatically converts connection to TLS
- // if HostClient.IsTLS is set.
- //
- // TCP address passed to DialFuncWithTimeout always contains host and port.
- // Example TCP addr values:
- //
- // - foobar.com:80
- // - foobar.com:443
- // - foobar.com:8080
- type DialFuncWithTimeout func(addr string, timeout time.Duration) (net.Conn, error)
- // RetryIfFunc defines the signature of the retry if function.
- // Request argument passed to RetryIfFunc, if there are any request errors.
- type RetryIfFunc func(request *Request) bool
- // RetryIfErrFunc defines an interface used for implementing the following functionality:
- // When the client encounters an error during a request, the behavior—whether to retry
- // and whether to reset the request timeout—should be determined
- // based on the return value of this interface.
- //
- // attempt indicates which attempt the current retry is due to a failure of.
- // The first request counts as the first attempt.
- //
- // err represents the error encountered while attempting the `attempts`-th request.
- //
- // resetTimeout indicates whether to reuse the `Request`'s timeout as the timeout interval,
- // rather than using the timeout after subtracting the time spent on previous failed requests.
- // This return value is meaningful only when you use `Request.SetTimeout`, `DoTimeout`, or `DoDeadline`.
- //
- // retry indicates whether to retry the current request. If it is false,
- // the request function will immediately return with the `err`.
- type RetryIfErrFunc func(request *Request, attempts int, err error) (resetTimeout bool, retry bool)
- // RoundTripper wraps every request/response.
- type RoundTripper interface {
- RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error)
- }
- // ConnPoolStrategyType define strategy of connection pool enqueue/dequeue.
- type ConnPoolStrategyType int
- const (
- FIFO ConnPoolStrategyType = iota
- LIFO
- )
- // HostClient balances http requests among hosts listed in Addr.
- //
- // HostClient may be used for balancing load among multiple upstream hosts.
- // While multiple addresses passed to HostClient.Addr may be used for balancing
- // load among them, it would be better using LBClient instead, since HostClient
- // may unevenly balance load among upstream hosts.
- //
- // It is forbidden copying HostClient instances. Create new instances instead.
- //
- // It is safe calling HostClient methods from concurrently running goroutines.
- type HostClient struct {
- noCopy noCopy
- readerPool sync.Pool
- writerPool sync.Pool
- // Transport defines a transport-like mechanism that wraps every request/response.
- Transport RoundTripper
- // Callback for establishing new connections to hosts.
- //
- // Default DialTimeout is used if not set.
- DialTimeout DialFuncWithTimeout
- // Callback for establishing new connections to hosts.
- //
- // Note that if Dial is set instead of DialTimeout, Dial will ignore Request timeout.
- // If you want the tcp dial process to account for request timeouts, use DialTimeout instead.
- //
- // If not set, DialTimeout is used.
- Dial DialFunc
- // Optional TLS config.
- TLSConfig *tls.Config
- // RetryIf controls whether a retry should be attempted after an error.
- // By default, it uses the isIdempotent function.
- //
- // Deprecated: Use RetryIfErr instead.
- // This field is only effective when the `RetryIfErr` field is not set.
- RetryIf RetryIfFunc
- // When the client encounters an error during a request, the behavior—whether to retry
- // and whether to reset the request timeout—should be determined
- // based on the return value of this field.
- // This field is only effective within the range of MaxIdemponentCallAttempts.
- RetryIfErr RetryIfErrFunc
- connsWait *wantConnQueue
- tlsConfigMap map[string]*tls.Config
- clientReaderPool *sync.Pool
- clientWriterPool *sync.Pool
- // Comma-separated list of upstream HTTP server host addresses,
- // which are passed to Dial or DialTimeout in a round-robin manner.
- //
- // Each address may contain port if default dialer is used.
- // For example,
- //
- // - foobar.com:80
- // - foobar.com:443
- // - foobar.com:8080
- Addr string
- // Client name. Used in User-Agent request header.
- Name string
- conns []*clientConn
- addrs []string
- // Maximum number of connections which may be established to all hosts
- // listed in Addr.
- //
- // You can change this value while the HostClient is being used
- // with HostClient.SetMaxConns(value)
- //
- // DefaultMaxConnsPerHost is used if not set.
- MaxConns int
- // Keep-alive connections are closed after this duration.
- //
- // By default connection duration is unlimited.
- MaxConnDuration time.Duration
- // Idle keep-alive connections are closed after this duration.
- //
- // By default idle connections are closed
- // after DefaultMaxIdleConnDuration.
- MaxIdleConnDuration time.Duration
- // Maximum number of attempts for idempotent calls.
- //
- // A value of 0 or a negative value represents using DefaultMaxIdemponentCallAttempts.
- // For example, a value of 1 means the request will be executed only once,
- // while 2 means the request will be executed at most twice.
- // The RetryIfErr and RetryIf fields can invalidate remaining attempts.
- MaxIdemponentCallAttempts int
- // Per-connection buffer size for responses' reading.
- // This also limits the maximum header size.
- //
- // Default buffer size is used if 0.
- ReadBufferSize int
- // Per-connection buffer size for requests' writing.
- //
- // Default buffer size is used if 0.
- WriteBufferSize int
- // Maximum duration for full response reading (including body).
- //
- // By default response read timeout is unlimited.
- ReadTimeout time.Duration
- // Maximum duration for full request writing (including body).
- //
- // By default request write timeout is unlimited.
- WriteTimeout time.Duration
- // Maximum response body size.
- //
- // The client returns ErrBodyTooLarge if this limit is greater than 0
- // and response body is greater than the limit.
- //
- // By default response body size is unlimited.
- MaxResponseBodySize int
- // Maximum duration for waiting for a free connection.
- //
- // By default will not waiting, return ErrNoFreeConns immediately
- MaxConnWaitTimeout time.Duration
- // Connection pool strategy. Can be either LIFO or FIFO (default).
- ConnPoolStrategy ConnPoolStrategyType
- connsCount int
- connsLock sync.Mutex
- addrsLock sync.Mutex
- tlsConfigMapLock sync.Mutex
- addrIdx uint32
- lastUseTime uint32
- pendingRequests int32
- // pendingClientRequests counts the number of requests that a Client is currently running using this HostClient.
- // It will be incremented earlier than pendingRequests and will be used by Client to see if the HostClient is still in use.
- pendingClientRequests int32
- // NoDefaultUserAgentHeader when set to true, causes the default
- // User-Agent header to be excluded from the Request.
- NoDefaultUserAgentHeader bool
- // Attempt to connect to both ipv4 and ipv6 host addresses
- // if set to true.
- //
- // This option is used only if default TCP dialer is used,
- // i.e. if Dial and DialTimeout are blank.
- //
- // By default client connects only to ipv4 addresses,
- // since unfortunately ipv6 remains broken in many networks worldwide :)
- DialDualStack bool
- // Whether to use TLS (aka SSL or HTTPS) for host connections.
- IsTLS bool
- // Header names are passed as-is without normalization
- // if this option is set.
- //
- // Disabled header names' normalization may be useful only for proxying
- // responses to other clients expecting case-sensitive
- // header names. See https://github.com/valyala/fasthttp/issues/57
- // for details.
- //
- // By default request and response header names are normalized, i.e.
- // The first letter and the first letters following dashes
- // are uppercased, while all the other letters are lowercased.
- // Examples:
- //
- // * HOST -> Host
- // * content-type -> Content-Type
- // * cONTENT-lenGTH -> Content-Length
- DisableHeaderNamesNormalizing bool
- // Path values are sent as-is without normalization.
- //
- // Disabled path normalization may be useful for proxying incoming requests
- // to servers that are expecting paths to be forwarded as-is.
- //
- // By default path values are normalized, i.e.
- // extra slashes are removed, special characters are encoded.
- DisablePathNormalizing bool
- // Will not log potentially sensitive content in error logs.
- //
- // This option is useful for servers that handle sensitive data
- // in the request/response.
- //
- // Client logs full errors by default.
- SecureErrorLogMessage bool
- // StreamResponseBody enables response body streaming.
- StreamResponseBody bool
- connsCleanerRun bool
- }
- type clientConn struct {
- c net.Conn
- createdTime time.Time
- lastUseTime time.Time
- }
- // CreatedTime returns net.Conn the client.
- func (cc *clientConn) Conn() net.Conn {
- return cc.c
- }
- // CreatedTime returns time the client was created.
- func (cc *clientConn) CreatedTime() time.Time {
- return cc.createdTime
- }
- // LastUseTime returns time the client was last used.
- func (cc *clientConn) LastUseTime() time.Time {
- return cc.lastUseTime
- }
- var startTimeUnix = time.Now().Unix()
- // LastUseTime returns time the client was last used.
- func (c *HostClient) LastUseTime() time.Time {
- n := atomic.LoadUint32(&c.lastUseTime)
- return time.Unix(startTimeUnix+int64(n), 0)
- }
- // Get returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- func (c *HostClient) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
- return clientGetURL(dst, url, c)
- }
- // GetTimeout returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // ErrTimeout error is returned if url contents couldn't be fetched
- // during the given timeout.
- func (c *HostClient) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
- return clientGetURLTimeout(dst, url, timeout, c)
- }
- // GetDeadline returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // ErrTimeout error is returned if url contents couldn't be fetched
- // until the given deadline.
- func (c *HostClient) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
- return clientGetURLDeadline(dst, url, deadline, c)
- }
- // Post sends POST request to the given url with the given POST arguments.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // Empty POST body is sent if postArgs is nil.
- func (c *HostClient) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
- return clientPostURL(dst, url, postArgs, c)
- }
- type clientDoer interface {
- Do(req *Request, resp *Response) error
- }
- func clientGetURL(dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
- req := AcquireRequest()
- statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
- ReleaseRequest(req)
- return statusCode, body, err
- }
- func clientGetURLTimeout(dst []byte, url string, timeout time.Duration, c clientDoer) (statusCode int, body []byte, err error) {
- deadline := time.Now().Add(timeout)
- return clientGetURLDeadline(dst, url, deadline, c)
- }
- type clientURLResponse struct {
- err error
- body []byte
- statusCode int
- }
- func clientGetURLDeadline(dst []byte, url string, deadline time.Time, c clientDoer) (statusCode int, body []byte, err error) {
- timeout := time.Until(deadline)
- if timeout <= 0 {
- return 0, dst, ErrTimeout
- }
- var ch chan clientURLResponse
- chv := clientURLResponseChPool.Get()
- if chv == nil {
- chv = make(chan clientURLResponse, 1)
- }
- ch = chv.(chan clientURLResponse)
- // Note that the request continues execution on ErrTimeout until
- // client-specific ReadTimeout exceeds. This helps limiting load
- // on slow hosts by MaxConns* concurrent requests.
- //
- // Without this 'hack' the load on slow host could exceed MaxConns*
- // concurrent requests, since timed out requests on client side
- // usually continue execution on the host.
- var mu sync.Mutex
- var timedout, responded bool
- go func() {
- req := AcquireRequest()
- statusCodeCopy, bodyCopy, errCopy := doRequestFollowRedirectsBuffer(req, dst, url, c)
- mu.Lock()
- if !timedout {
- ch <- clientURLResponse{
- statusCode: statusCodeCopy,
- body: bodyCopy,
- err: errCopy,
- }
- responded = true
- }
- mu.Unlock()
- ReleaseRequest(req)
- }()
- tc := AcquireTimer(timeout)
- select {
- case resp := <-ch:
- statusCode = resp.statusCode
- body = resp.body
- err = resp.err
- case <-tc.C:
- mu.Lock()
- if responded {
- resp := <-ch
- statusCode = resp.statusCode
- body = resp.body
- err = resp.err
- } else {
- timedout = true
- err = ErrTimeout
- body = dst
- }
- mu.Unlock()
- }
- ReleaseTimer(tc)
- clientURLResponseChPool.Put(chv)
- return statusCode, body, err
- }
- var clientURLResponseChPool sync.Pool
- func clientPostURL(dst []byte, url string, postArgs *Args, c clientDoer) (statusCode int, body []byte, err error) {
- req := AcquireRequest()
- defer ReleaseRequest(req)
- req.Header.SetMethod(MethodPost)
- req.Header.SetContentTypeBytes(strPostArgsContentType)
- if postArgs != nil {
- if _, err := postArgs.WriteTo(req.BodyWriter()); err != nil {
- return 0, nil, err
- }
- }
- statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
- return statusCode, body, err
- }
- var (
- // ErrMissingLocation is returned by clients when the Location header is missing on
- // an HTTP response with a redirect status code.
- ErrMissingLocation = errors.New("missing Location header for http redirect")
- // ErrTooManyRedirects is returned by clients when the number of redirects followed
- // exceed the max count.
- ErrTooManyRedirects = errors.New("too many redirects detected when doing the request")
- // HostClients are only able to follow redirects to the same protocol.
- ErrHostClientRedirectToDifferentScheme = errors.New("HostClient can't follow redirects to a different protocol," +
- " please use Client instead")
- )
- const defaultMaxRedirectsCount = 16
- func doRequestFollowRedirectsBuffer(req *Request, dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
- resp := AcquireResponse()
- bodyBuf := resp.bodyBuffer()
- resp.keepBodyBuffer = true
- oldBody := bodyBuf.B
- bodyBuf.B = dst
- statusCode, _, err = doRequestFollowRedirects(req, resp, url, defaultMaxRedirectsCount, c)
- body = bodyBuf.B
- bodyBuf.B = oldBody
- resp.keepBodyBuffer = false
- ReleaseResponse(resp)
- return statusCode, body, err
- }
- func doRequestFollowRedirects(
- req *Request, resp *Response, url string, maxRedirectsCount int, c clientDoer,
- ) (statusCode int, body []byte, err error) {
- redirectsCount := 0
- for {
- req.SetRequestURI(url)
- if err := req.parseURI(); err != nil {
- return 0, nil, err
- }
- if err = c.Do(req, resp); err != nil {
- break
- }
- statusCode = resp.Header.StatusCode()
- if !StatusCodeIsRedirect(statusCode) {
- break
- }
- redirectsCount++
- if redirectsCount > maxRedirectsCount {
- err = ErrTooManyRedirects
- break
- }
- location := resp.Header.peek(strLocation)
- if len(location) == 0 {
- err = ErrMissingLocation
- break
- }
- url = getRedirectURL(url, location, req.DisableRedirectPathNormalizing)
- if string(req.Header.Method()) == "POST" && (statusCode == 301 || statusCode == 302) {
- req.Header.SetMethod(MethodGet)
- }
- }
- return statusCode, body, err
- }
- func getRedirectURL(baseURL string, location []byte, disablePathNormalizing bool) string {
- u := AcquireURI()
- u.Update(baseURL)
- u.UpdateBytes(location)
- u.DisablePathNormalizing = disablePathNormalizing
- redirectURL := u.String()
- ReleaseURI(u)
- return redirectURL
- }
- // StatusCodeIsRedirect returns true if the status code indicates a redirect.
- func StatusCodeIsRedirect(statusCode int) bool {
- return statusCode == StatusMovedPermanently ||
- statusCode == StatusFound ||
- statusCode == StatusSeeOther ||
- statusCode == StatusTemporaryRedirect ||
- statusCode == StatusPermanentRedirect
- }
- var (
- requestPool sync.Pool
- responsePool sync.Pool
- )
- // AcquireRequest returns an empty Request instance from request pool.
- //
- // The returned Request instance may be passed to ReleaseRequest when it is
- // no longer needed. This allows Request recycling, reduces GC pressure
- // and usually improves performance.
- func AcquireRequest() *Request {
- v := requestPool.Get()
- if v == nil {
- return &Request{}
- }
- return v.(*Request)
- }
- // ReleaseRequest returns req acquired via AcquireRequest to request pool.
- //
- // It is forbidden accessing req and/or its' members after returning
- // it to request pool.
- func ReleaseRequest(req *Request) {
- req.Reset()
- requestPool.Put(req)
- }
- // AcquireResponse returns an empty Response instance from response pool.
- //
- // The returned Response instance may be passed to ReleaseResponse when it is
- // no longer needed. This allows Response recycling, reduces GC pressure
- // and usually improves performance.
- func AcquireResponse() *Response {
- v := responsePool.Get()
- if v == nil {
- return &Response{}
- }
- return v.(*Response)
- }
- // ReleaseResponse return resp acquired via AcquireResponse to response pool.
- //
- // It is forbidden accessing resp and/or its' members after returning
- // it to response pool.
- func ReleaseResponse(resp *Response) {
- resp.Reset()
- responsePool.Put(resp)
- }
- // DoTimeout performs the given request and waits for response during
- // the given timeout duration.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned during
- // the given timeout.
- // Immediately returns ErrTimeout if timeout value is negative.
- //
- // ErrNoFreeConns is returned if all HostClient.MaxConns connections
- // to the host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
- req.timeout = timeout
- if req.timeout <= 0 {
- return ErrTimeout
- }
- return c.Do(req, resp)
- }
- // DoDeadline performs the given request and waits for response until
- // the given deadline.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned until
- // the given deadline.
- // Immediately returns ErrTimeout if the deadline has already been reached.
- //
- // ErrNoFreeConns is returned if all HostClient.MaxConns connections
- // to the host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *HostClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
- req.timeout = time.Until(deadline)
- if req.timeout <= 0 {
- return ErrTimeout
- }
- return c.Do(req, resp)
- }
- // DoRedirects performs the given http request and fills the given http response,
- // following up to maxRedirectsCount redirects. When the redirect count exceeds
- // maxRedirectsCount, ErrTooManyRedirects is returned.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // Response is ignored if resp is nil.
- //
- // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *HostClient) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
- if c.DisablePathNormalizing {
- req.URI().DisablePathNormalizing = true
- }
- _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
- return err
- }
- // Do performs the given http request and sets the corresponding response.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrNoFreeConns is returned if all HostClient.MaxConns connections
- // to the host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *HostClient) Do(req *Request, resp *Response) error {
- var (
- err error
- retry bool
- resetTimeout bool
- )
- maxAttempts := c.MaxIdemponentCallAttempts
- if maxAttempts <= 0 {
- maxAttempts = DefaultMaxIdemponentCallAttempts
- }
- attempts := 0
- hasBodyStream := req.IsBodyStream()
- // If a request has a timeout we store the timeout
- // and calculate a deadline so we can keep updating the
- // timeout on each retry.
- deadline := time.Time{}
- timeout := req.timeout
- if timeout > 0 {
- deadline = time.Now().Add(timeout)
- }
- retryFunc := c.RetryIf
- if retryFunc == nil {
- retryFunc = isIdempotent
- }
- atomic.AddInt32(&c.pendingRequests, 1)
- for {
- // If the original timeout was set, we need to update
- // the one set on the request to reflect the remaining time.
- if timeout > 0 {
- req.timeout = time.Until(deadline)
- if req.timeout <= 0 {
- err = ErrTimeout
- break
- }
- }
- retry, err = c.do(req, resp)
- if err == nil || !retry {
- break
- }
- if hasBodyStream {
- break
- }
- // Path prioritization based on ease of computation
- attempts++
- if attempts >= maxAttempts {
- break
- }
- if c.RetryIfErr != nil {
- resetTimeout, retry = c.RetryIfErr(req, attempts, err)
- } else {
- retry = retryFunc(req)
- }
- if !retry {
- break
- }
- if timeout > 0 && resetTimeout {
- deadline = time.Now().Add(timeout)
- }
- }
- atomic.AddInt32(&c.pendingRequests, -1)
- // Restore the original timeout.
- req.timeout = timeout
- if err == io.EOF {
- err = ErrConnectionClosed
- }
- return err
- }
- // PendingRequests returns the current number of requests the client
- // is executing.
- //
- // This function may be used for balancing load among multiple HostClient
- // instances.
- func (c *HostClient) PendingRequests() int {
- return int(atomic.LoadInt32(&c.pendingRequests))
- }
- func isIdempotent(req *Request) bool {
- return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut()
- }
- func (c *HostClient) do(req *Request, resp *Response) (bool, error) {
- if resp == nil {
- resp = AcquireResponse()
- defer ReleaseResponse(resp)
- }
- return c.doNonNilReqResp(req, resp)
- }
- func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
- if req == nil {
- // for debugging purposes
- panic("BUG: req cannot be nil")
- }
- if resp == nil {
- // for debugging purposes
- panic("BUG: resp cannot be nil")
- }
- // Secure header error logs configuration
- resp.secureErrorLogMessage = c.SecureErrorLogMessage
- resp.Header.secureErrorLogMessage = c.SecureErrorLogMessage
- req.secureErrorLogMessage = c.SecureErrorLogMessage
- req.Header.secureErrorLogMessage = c.SecureErrorLogMessage
- if c.IsTLS != req.URI().isHTTPS() {
- return false, ErrHostClientRedirectToDifferentScheme
- }
- atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix)) // #nosec G115
- // Free up resources occupied by response before sending the request,
- // so the GC may reclaim these resources (e.g. response body).
- // backing up SkipBody in case it was set explicitly
- customSkipBody := resp.SkipBody
- customStreamBody := resp.StreamBody || c.StreamResponseBody
- resp.Reset()
- resp.SkipBody = customSkipBody
- resp.StreamBody = customStreamBody
- req.URI().DisablePathNormalizing = c.DisablePathNormalizing
- userAgentOld := req.Header.UserAgent()
- if len(userAgentOld) == 0 {
- userAgent := c.Name
- if userAgent == "" && !c.NoDefaultUserAgentHeader {
- userAgent = defaultUserAgent
- }
- if userAgent != "" {
- req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
- }
- }
- return c.transport().RoundTrip(c, req, resp)
- }
- func (c *HostClient) transport() RoundTripper {
- if c.Transport == nil {
- return DefaultTransport
- }
- return c.Transport
- }
- var (
- // ErrNoFreeConns is returned when no free connections available
- // to the given host.
- //
- // Increase the allowed number of connections per host if you
- // see this error.
- ErrNoFreeConns = errors.New("no free connections available to host")
- // ErrConnectionClosed may be returned from client methods if the server
- // closes connection before returning the first response byte.
- //
- // If you see this error, then either fix the server by returning
- // 'Connection: close' response header before closing the connection
- // or add 'Connection: close' request header before sending requests
- // to broken server.
- ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. " +
- "Make sure the server returns 'Connection: close' response header before closing the connection")
- // ErrConnPoolStrategyNotImpl is returned when HostClient.ConnPoolStrategy is not implement yet.
- // If you see this error, then you need to check your HostClient configuration.
- ErrConnPoolStrategyNotImpl = errors.New("connection pool strategy is not implement")
- )
- type timeoutError struct{}
- func (e *timeoutError) Error() string {
- return "timeout"
- }
- // Only implement the Timeout() function of the net.Error interface.
- // This allows for checks like:
- //
- // if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
- func (e *timeoutError) Timeout() bool {
- return true
- }
- // ErrTimeout is returned from timed out calls.
- var ErrTimeout = &timeoutError{}
- // SetMaxConns sets up the maximum number of connections which may be established to all hosts listed in Addr.
- func (c *HostClient) SetMaxConns(newMaxConns int) {
- c.connsLock.Lock()
- c.MaxConns = newMaxConns
- c.connsLock.Unlock()
- }
- func (c *HostClient) AcquireConn(reqTimeout time.Duration, connectionClose bool) (cc *clientConn, err error) {
- createConn := false
- startCleaner := false
- var n int
- c.connsLock.Lock()
- n = len(c.conns)
- if n == 0 {
- maxConns := c.MaxConns
- if maxConns <= 0 {
- maxConns = DefaultMaxConnsPerHost
- }
- if c.connsCount < maxConns {
- c.connsCount++
- createConn = true
- if !c.connsCleanerRun && !connectionClose {
- startCleaner = true
- c.connsCleanerRun = true
- }
- }
- } else {
- switch c.ConnPoolStrategy {
- case LIFO:
- n--
- cc = c.conns[n]
- c.conns[n] = nil
- c.conns = c.conns[:n]
- case FIFO:
- cc = c.conns[0]
- copy(c.conns, c.conns[1:])
- c.conns[n-1] = nil
- c.conns = c.conns[:n-1]
- default:
- c.connsLock.Unlock()
- return nil, ErrConnPoolStrategyNotImpl
- }
- }
- c.connsLock.Unlock()
- if cc != nil {
- return cc, nil
- }
- if !createConn {
- if c.MaxConnWaitTimeout <= 0 {
- return nil, ErrNoFreeConns
- }
- //nolint:dupword
- // reqTimeout c.MaxConnWaitTimeout wait duration
- // d1 d2 min(d1, d2)
- // 0(not set) d2 d2
- // d1 0(don't wait) 0(don't wait)
- // 0(not set) d2 d2
- timeout := c.MaxConnWaitTimeout
- timeoutOverridden := false
- // reqTimeout == 0 means not set
- if reqTimeout > 0 && reqTimeout < timeout {
- timeout = reqTimeout
- timeoutOverridden = true
- }
- // wait for a free connection
- tc := AcquireTimer(timeout)
- defer ReleaseTimer(tc)
- w := &wantConn{
- ready: make(chan struct{}, 1),
- }
- defer func() {
- if err != nil {
- w.cancel(c, err)
- }
- }()
- c.queueForIdle(w)
- select {
- case <-w.ready:
- return w.conn, w.err
- case <-tc.C:
- c.connsWait.failedWaiters.Add(1)
- if timeoutOverridden {
- return nil, ErrTimeout
- }
- return nil, ErrNoFreeConns
- }
- }
- if startCleaner {
- go c.connsCleaner()
- }
- conn, err := c.dialHostHard(reqTimeout)
- if err != nil {
- c.decConnsCount()
- return nil, err
- }
- cc = acquireClientConn(conn)
- return cc, nil
- }
- func (c *HostClient) queueForIdle(w *wantConn) {
- c.connsLock.Lock()
- defer c.connsLock.Unlock()
- if c.connsWait == nil {
- c.connsWait = &wantConnQueue{}
- }
- c.connsWait.clearFront()
- c.connsWait.pushBack(w)
- }
- func (c *HostClient) dialConnFor(w *wantConn) {
- conn, err := c.dialHostHard(0)
- if err != nil {
- w.tryDeliver(nil, err)
- c.decConnsCount()
- return
- }
- cc := acquireClientConn(conn)
- if !w.tryDeliver(cc, nil) {
- // not delivered, return idle connection
- c.ReleaseConn(cc)
- }
- }
- // CloseIdleConnections closes any connections which were previously
- // connected from previous requests but are now sitting idle in a
- // "keep-alive" state. It does not interrupt any connections currently
- // in use.
- func (c *HostClient) CloseIdleConnections() {
- c.connsLock.Lock()
- scratch := append([]*clientConn{}, c.conns...)
- for i := range c.conns {
- c.conns[i] = nil
- }
- c.conns = c.conns[:0]
- c.connsLock.Unlock()
- for _, cc := range scratch {
- c.CloseConn(cc)
- }
- }
- func (c *HostClient) connsCleaner() {
- var (
- scratch []*clientConn
- maxIdleConnDuration = c.MaxIdleConnDuration
- )
- if maxIdleConnDuration <= 0 {
- maxIdleConnDuration = DefaultMaxIdleConnDuration
- }
- for {
- currentTime := time.Now()
- // Determine idle connections to be closed.
- c.connsLock.Lock()
- conns := c.conns
- n := len(conns)
- i := 0
- for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
- i++
- }
- sleepFor := maxIdleConnDuration
- if i < n {
- // + 1 so we actually sleep past the expiration time and not up to it.
- // Otherwise the > check above would still fail.
- sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1
- }
- scratch = append(scratch[:0], conns[:i]...)
- if i > 0 {
- m := copy(conns, conns[i:])
- for i = m; i < n; i++ {
- conns[i] = nil
- }
- c.conns = conns[:m]
- }
- c.connsLock.Unlock()
- // Close idle connections.
- for i, cc := range scratch {
- c.CloseConn(cc)
- scratch[i] = nil
- }
- // Determine whether to stop the connsCleaner.
- c.connsLock.Lock()
- mustStop := c.connsCount == 0
- if mustStop {
- c.connsCleanerRun = false
- }
- c.connsLock.Unlock()
- if mustStop {
- break
- }
- time.Sleep(sleepFor)
- }
- }
- func (c *HostClient) CloseConn(cc *clientConn) {
- c.decConnsCount()
- cc.c.Close()
- releaseClientConn(cc)
- }
- func (c *HostClient) decConnsCount() {
- if c.MaxConnWaitTimeout <= 0 {
- c.connsLock.Lock()
- c.connsCount--
- c.connsLock.Unlock()
- return
- }
- c.connsLock.Lock()
- defer c.connsLock.Unlock()
- dialed := false
- if q := c.connsWait; q != nil && q.len() > 0 {
- for q.len() > 0 {
- w := q.popFront()
- if w.waiting() {
- go c.dialConnFor(w)
- dialed = true
- break
- }
- c.connsWait.failedWaiters.Add(-1)
- }
- }
- if !dialed {
- c.connsCount--
- }
- }
- // ConnsCount returns connection count of HostClient.
- func (c *HostClient) ConnsCount() int {
- c.connsLock.Lock()
- defer c.connsLock.Unlock()
- return c.connsCount
- }
- func acquireClientConn(conn net.Conn) *clientConn {
- v := clientConnPool.Get()
- if v == nil {
- v = &clientConn{}
- }
- cc := v.(*clientConn)
- cc.c = conn
- cc.createdTime = time.Now()
- return cc
- }
- func releaseClientConn(cc *clientConn) {
- // Reset all fields.
- *cc = clientConn{}
- clientConnPool.Put(cc)
- }
- var clientConnPool sync.Pool
- func (c *HostClient) ReleaseConn(cc *clientConn) {
- cc.lastUseTime = time.Now()
- if c.MaxConnWaitTimeout <= 0 {
- c.connsLock.Lock()
- c.conns = append(c.conns, cc)
- c.connsLock.Unlock()
- return
- }
- // try to deliver an idle connection to a *wantConn
- c.connsLock.Lock()
- defer c.connsLock.Unlock()
- delivered := false
- if q := c.connsWait; q != nil && q.len() > 0 {
- for q.len() > 0 {
- w := q.popFront()
- if w.waiting() {
- delivered = w.tryDeliver(cc, nil)
- // This is the last resort to hand over conCount sema.
- // We must ensure that there are no valid waiters in connsWait
- // when we exit this loop.
- //
- // We did not apply the same looping pattern in the decConnsCount
- // method because it needs to create a new time-spent connection,
- // and the decConnsCount call chain will inevitably reach this point.
- // When MaxConnWaitTimeout>0.
- if delivered {
- break
- }
- }
- c.connsWait.failedWaiters.Add(-1)
- }
- }
- if !delivered {
- c.conns = append(c.conns, cc)
- }
- }
- func (c *HostClient) AcquireWriter(conn net.Conn) *bufio.Writer {
- var v any
- if c.clientWriterPool != nil {
- v = c.clientWriterPool.Get()
- if v == nil {
- n := c.WriteBufferSize
- if n <= 0 {
- n = defaultWriteBufferSize
- }
- return bufio.NewWriterSize(conn, n)
- }
- } else {
- v = c.writerPool.Get()
- if v == nil {
- n := c.WriteBufferSize
- if n <= 0 {
- n = defaultWriteBufferSize
- }
- return bufio.NewWriterSize(conn, n)
- }
- }
- bw := v.(*bufio.Writer)
- bw.Reset(conn)
- return bw
- }
- func (c *HostClient) ReleaseWriter(bw *bufio.Writer) {
- if c.clientWriterPool != nil {
- c.clientWriterPool.Put(bw)
- } else {
- c.writerPool.Put(bw)
- }
- }
- func (c *HostClient) AcquireReader(conn net.Conn) *bufio.Reader {
- var v any
- if c.clientReaderPool != nil {
- v = c.clientReaderPool.Get()
- if v == nil {
- n := c.ReadBufferSize
- if n <= 0 {
- n = defaultReadBufferSize
- }
- return bufio.NewReaderSize(conn, n)
- }
- } else {
- v = c.readerPool.Get()
- if v == nil {
- n := c.ReadBufferSize
- if n <= 0 {
- n = defaultReadBufferSize
- }
- return bufio.NewReaderSize(conn, n)
- }
- }
- br := v.(*bufio.Reader)
- br.Reset(conn)
- return br
- }
- func (c *HostClient) ReleaseReader(br *bufio.Reader) {
- if c.clientReaderPool != nil {
- c.clientReaderPool.Put(br)
- } else {
- c.readerPool.Put(br)
- }
- }
- func newClientTLSConfig(c *tls.Config, addr string) *tls.Config {
- if c == nil {
- c = &tls.Config{}
- } else {
- c = c.Clone()
- }
- if c.ServerName == "" {
- serverName := tlsServerName(addr)
- if serverName == "*" {
- c.InsecureSkipVerify = true
- } else {
- c.ServerName = serverName
- }
- }
- return c
- }
- func tlsServerName(addr string) string {
- if !strings.Contains(addr, ":") {
- return addr
- }
- host, _, err := net.SplitHostPort(addr)
- if err != nil {
- return "*"
- }
- return host
- }
- func (c *HostClient) nextAddr() string {
- c.addrsLock.Lock()
- if c.addrs == nil {
- c.addrs = strings.Split(c.Addr, ",")
- }
- addr := c.addrs[0]
- if len(c.addrs) > 1 {
- addr = c.addrs[c.addrIdx%uint32(len(c.addrs))] // #nosec G115
- c.addrIdx++
- }
- c.addrsLock.Unlock()
- return addr
- }
- func (c *HostClient) dialHostHard(dialTimeout time.Duration) (conn net.Conn, err error) {
- // use dialTimeout to control the timeout of each dial. It does not work if dialTimeout is 0 or if
- // c.DialTimeout has not been set and c.Dial has been set.
- // attempt to dial all the available hosts before giving up.
- c.addrsLock.Lock()
- n := len(c.addrs)
- c.addrsLock.Unlock()
- if n == 0 {
- // It looks like c.addrs isn't initialized yet.
- n = 1
- }
- timeout := c.ReadTimeout + c.WriteTimeout
- if timeout <= 0 {
- timeout = DefaultDialTimeout
- }
- deadline := time.Now().Add(timeout)
- for n > 0 {
- addr := c.nextAddr()
- tlsConfig := c.cachedTLSConfig(addr)
- conn, err = dialAddr(addr, c.Dial, c.DialTimeout, c.DialDualStack, c.IsTLS, tlsConfig, dialTimeout, c.WriteTimeout)
- if err == nil {
- return conn, nil
- }
- if time.Since(deadline) >= 0 {
- break
- }
- n--
- }
- return nil, err
- }
- func (c *HostClient) cachedTLSConfig(addr string) *tls.Config {
- if !c.IsTLS {
- return nil
- }
- c.tlsConfigMapLock.Lock()
- if c.tlsConfigMap == nil {
- c.tlsConfigMap = make(map[string]*tls.Config)
- }
- cfg := c.tlsConfigMap[addr]
- if cfg == nil {
- cfg = newClientTLSConfig(c.TLSConfig, addr)
- c.tlsConfigMap[addr] = cfg
- }
- c.tlsConfigMapLock.Unlock()
- return cfg
- }
- // ErrTLSHandshakeTimeout indicates there is a timeout from tls handshake.
- var ErrTLSHandshakeTimeout = errors.New("tls handshake timed out")
- func tlsClientHandshake(rawConn net.Conn, tlsConfig *tls.Config, deadline time.Time) (_ net.Conn, retErr error) {
- defer func() {
- if retErr != nil {
- rawConn.Close()
- }
- }()
- conn := tls.Client(rawConn, tlsConfig)
- err := conn.SetDeadline(deadline)
- if err != nil {
- return nil, err
- }
- err = conn.Handshake()
- if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
- return nil, ErrTLSHandshakeTimeout
- }
- if err != nil {
- return nil, err
- }
- err = conn.SetDeadline(time.Time{})
- if err != nil {
- return nil, err
- }
- return conn, nil
- }
- func dialAddr(
- addr string, dial DialFunc, dialWithTimeout DialFuncWithTimeout, dialDualStack, isTLS bool,
- tlsConfig *tls.Config, dialTimeout, writeTimeout time.Duration,
- ) (net.Conn, error) {
- deadline := time.Now().Add(writeTimeout)
- conn, err := callDialFunc(addr, dial, dialWithTimeout, dialDualStack, isTLS, dialTimeout)
- if err != nil {
- return nil, err
- }
- if conn == nil {
- return nil, errors.New("dialling unsuccessful. Please report this bug")
- }
- // We assume that any conn that has the Handshake() method is a TLS conn already.
- // This doesn't cover just tls.Conn but also other TLS implementations.
- _, isTLSAlready := conn.(interface{ Handshake() error })
- if isTLS && !isTLSAlready {
- if writeTimeout == 0 {
- return tls.Client(conn, tlsConfig), nil
- }
- return tlsClientHandshake(conn, tlsConfig, deadline)
- }
- return conn, nil
- }
- func callDialFunc(
- addr string, dial DialFunc, dialWithTimeout DialFuncWithTimeout, dialDualStack, isTLS bool, timeout time.Duration,
- ) (net.Conn, error) {
- if dialWithTimeout != nil {
- return dialWithTimeout(addr, timeout)
- }
- if dial != nil {
- return dial(addr)
- }
- addr = AddMissingPort(addr, isTLS)
- if timeout > 0 {
- if dialDualStack {
- return DialDualStackTimeout(addr, timeout)
- }
- return DialTimeout(addr, timeout)
- }
- if dialDualStack {
- return DialDualStack(addr)
- }
- return Dial(addr)
- }
- // AddMissingPort adds a port to a host if it is missing.
- // A literal IPv6 address in hostport must be enclosed in square
- // brackets, as in "[::1]:80", "[::1%lo0]:80".
- func AddMissingPort(addr string, isTLS bool) string {
- addrLen := len(addr)
- if addrLen == 0 {
- return addr
- }
- isIP6 := addr[0] == '['
- if isIP6 {
- // if the IPv6 has opening bracket but closing bracket is the last char then it doesn't have a port
- isIP6WithoutPort := addr[addrLen-1] == ']'
- if !isIP6WithoutPort {
- return addr
- }
- } else { // IPv4
- columnPos := strings.LastIndexByte(addr, ':')
- if columnPos > 0 {
- return addr
- }
- }
- port := ":80"
- if isTLS {
- port = ":443"
- }
- return addr + port
- }
- // A wantConn records state about a wanted connection
- // (that is, an active call to getConn).
- // The conn may be gotten by dialing or by finding an idle connection,
- // or a cancellation may make the conn no longer wanted.
- // These three options are racing against each other and use
- // wantConn to coordinate and agree about the winning outcome.
- //
- // Inspired by net/http/transport.go.
- type wantConn struct {
- err error
- ready chan struct{}
- conn *clientConn
- mu sync.Mutex // protects conn, err, close(ready)
- }
- // waiting reports whether w is still waiting for an answer (connection or error).
- func (w *wantConn) waiting() bool {
- select {
- case <-w.ready:
- return false
- default:
- return true
- }
- }
- // tryDeliver attempts to deliver conn, err to w and reports whether it succeeded.
- func (w *wantConn) tryDeliver(conn *clientConn, err error) bool {
- w.mu.Lock()
- defer w.mu.Unlock()
- if w.conn != nil || w.err != nil {
- return false
- }
- w.conn = conn
- w.err = err
- if w.conn == nil && w.err == nil {
- panic("fasthttp: internal error: misuse of tryDeliver")
- }
- close(w.ready)
- return true
- }
- // cancel marks w as no longer wanting a result (for example, due to cancellation).
- // If a connection has been delivered already, cancel returns it with c.releaseConn.
- func (w *wantConn) cancel(c *HostClient, err error) {
- w.mu.Lock()
- if w.conn == nil && w.err == nil {
- close(w.ready) // catch misbehavior in future delivery
- }
- conn := w.conn
- w.conn = nil
- w.err = err
- w.mu.Unlock()
- if conn != nil {
- c.ReleaseConn(conn)
- }
- }
- // A wantConnQueue is a queue of wantConns.
- //
- // Inspired by net/http/transport.go.
- type wantConnQueue struct {
- // This is a queue, not a dequeue.
- // It is split into two stages - head[headPos:] and tail.
- // popFront is trivial (headPos++) on the first stage, and
- // pushBack is trivial (append) on the second stage.
- // If the first stage is empty, popFront can swap the
- // first and second stages to remedy the situation.
- //
- // This two-stage split is analogous to the use of two lists
- // in Okasaki's purely functional queue but without the
- // overhead of reversing the list when swapping stages.
- head []*wantConn
- tail []*wantConn
- headPos int
- // failedWaiters is the number of waiters in the head or tail queue,
- // but is invalid.
- // These state waiters cannot truly be considered as waiters; the current
- // implementation does not immediately remove them when they become
- // invalid but instead only marks them.
- failedWaiters atomic.Int64
- }
- // len returns the number of items in the queue.
- func (q *wantConnQueue) len() int {
- return len(q.head) - q.headPos + len(q.tail) - int(q.failedWaiters.Load())
- }
- // pushBack adds w to the back of the queue.
- func (q *wantConnQueue) pushBack(w *wantConn) {
- q.tail = append(q.tail, w)
- }
- // popFront removes and returns the wantConn at the front of the queue.
- func (q *wantConnQueue) popFront() *wantConn {
- if q.headPos >= len(q.head) {
- if len(q.tail) == 0 {
- return nil
- }
- // Pick up tail as new head, clear tail.
- q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
- }
- w := q.head[q.headPos]
- q.head[q.headPos] = nil
- q.headPos++
- return w
- }
- // peekFront returns the wantConn at the front of the queue without removing it.
- func (q *wantConnQueue) peekFront() *wantConn {
- if q.headPos < len(q.head) {
- return q.head[q.headPos]
- }
- if len(q.tail) > 0 {
- return q.tail[0]
- }
- return nil
- }
- // clearFront pops any wantConns that are no longer waiting from the head of the
- // queue, reporting whether any were popped.
- func (q *wantConnQueue) clearFront() (cleaned bool) {
- for {
- w := q.peekFront()
- if w == nil || w.waiting() {
- return cleaned
- }
- q.popFront()
- q.failedWaiters.Add(-1)
- cleaned = true
- }
- }
- // PipelineClient pipelines requests over a limited set of concurrent
- // connections to the given Addr.
- //
- // This client may be used in highly loaded HTTP-based RPC systems for reducing
- // context switches and network level overhead.
- // See https://en.wikipedia.org/wiki/HTTP_pipelining for details.
- //
- // It is forbidden copying PipelineClient instances. Create new instances
- // instead.
- //
- // It is safe calling PipelineClient methods from concurrently running
- // goroutines.
- type PipelineClient struct {
- noCopy noCopy
- // Logger for logging client errors.
- //
- // By default standard logger from log package is used.
- Logger Logger
- // Callback for connection establishing to the host.
- //
- // Default Dial is used if not set.
- Dial DialFunc
- // Optional TLS config.
- TLSConfig *tls.Config
- // Address of the host to connect to.
- Addr string
- // PipelineClient name. Used in User-Agent request header.
- Name string
- connClients []*pipelineConnClient
- // The maximum number of concurrent connections to the Addr.
- //
- // A single connection is used by default.
- MaxConns int
- // The maximum number of pending pipelined requests over
- // a single connection to Addr.
- //
- // DefaultMaxPendingRequests is used by default.
- MaxPendingRequests int
- // The maximum delay before sending pipelined requests as a batch
- // to the server.
- //
- // By default requests are sent immediately to the server.
- MaxBatchDelay time.Duration
- // Idle connection to the host is closed after this duration.
- //
- // By default idle connection is closed after
- // DefaultMaxIdleConnDuration.
- MaxIdleConnDuration time.Duration
- // Buffer size for responses' reading.
- // This also limits the maximum header size.
- //
- // Default buffer size is used if 0.
- ReadBufferSize int
- // Buffer size for requests' writing.
- //
- // Default buffer size is used if 0.
- WriteBufferSize int
- // Maximum duration for full response reading (including body).
- //
- // By default response read timeout is unlimited.
- ReadTimeout time.Duration
- // Maximum duration for full request writing (including body).
- //
- // By default request write timeout is unlimited.
- WriteTimeout time.Duration
- connClientsLock sync.Mutex
- // NoDefaultUserAgentHeader when set to true, causes the default
- // User-Agent header to be excluded from the Request.
- NoDefaultUserAgentHeader bool
- // Attempt to connect to both ipv4 and ipv6 host addresses
- // if set to true.
- //
- // This option is used only if default TCP dialer is used,
- // i.e. if Dial is blank.
- //
- // By default client connects only to ipv4 addresses,
- // since unfortunately ipv6 remains broken in many networks worldwide :)
- DialDualStack bool
- // Response header names are passed as-is without normalization
- // if this option is set.
- //
- // Disabled header names' normalization may be useful only for proxying
- // responses to other clients expecting case-sensitive
- // header names. See https://github.com/valyala/fasthttp/issues/57
- // for details.
- //
- // By default request and response header names are normalized, i.e.
- // The first letter and the first letters following dashes
- // are uppercased, while all the other letters are lowercased.
- // Examples:
- //
- // * HOST -> Host
- // * content-type -> Content-Type
- // * cONTENT-lenGTH -> Content-Length
- DisableHeaderNamesNormalizing bool
- // Path values are sent as-is without normalization
- //
- // Disabled path normalization may be useful for proxying incoming requests
- // to servers that are expecting paths to be forwarded as-is.
- //
- // By default path values are normalized, i.e.
- // extra slashes are removed, special characters are encoded.
- DisablePathNormalizing bool
- // Whether to use TLS (aka SSL or HTTPS) for host connections.
- IsTLS bool
- }
- type pipelineConnClient struct {
- noCopy noCopy
- workPool sync.Pool
- Logger Logger
- Dial DialFunc
- TLSConfig *tls.Config
- chW chan *pipelineWork
- chR chan *pipelineWork
- tlsConfig *tls.Config
- Addr string
- Name string
- MaxPendingRequests int
- MaxBatchDelay time.Duration
- MaxIdleConnDuration time.Duration
- ReadBufferSize int
- WriteBufferSize int
- ReadTimeout time.Duration
- WriteTimeout time.Duration
- chLock sync.Mutex
- tlsConfigLock sync.Mutex
- NoDefaultUserAgentHeader bool
- DialDualStack bool
- DisableHeaderNamesNormalizing bool
- DisablePathNormalizing bool
- IsTLS bool
- }
- type pipelineWork struct {
- respCopy Response
- deadline time.Time
- err error
- req *Request
- resp *Response
- t *time.Timer
- done chan struct{}
- reqCopy Request
- }
- // DoTimeout performs the given request and waits for response during
- // the given timeout duration.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // The function doesn't follow redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned during
- // the given timeout.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *PipelineClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
- return c.DoDeadline(req, resp, time.Now().Add(timeout))
- }
- // DoDeadline performs the given request and waits for response until
- // the given deadline.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // The function doesn't follow redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned until
- // the given deadline.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *PipelineClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
- return c.getConnClient().DoDeadline(req, resp, deadline)
- }
- func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
- c.init()
- timeout := time.Until(deadline)
- if timeout <= 0 {
- return ErrTimeout
- }
- if c.DisablePathNormalizing {
- req.URI().DisablePathNormalizing = true
- }
- userAgentOld := req.Header.UserAgent()
- if len(userAgentOld) == 0 {
- userAgent := c.Name
- if userAgent == "" && !c.NoDefaultUserAgentHeader {
- userAgent = defaultUserAgent
- }
- if userAgent != "" {
- req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
- }
- }
- w := c.acquirePipelineWork(timeout)
- w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
- w.req = &w.reqCopy
- w.resp = &w.respCopy
- // Make a copy of the request in order to avoid data races on timeouts
- req.copyToSkipBody(&w.reqCopy)
- swapRequestBody(req, &w.reqCopy)
- // Put the request to outgoing queue
- select {
- case c.chW <- w:
- // Fast path: len(c.ch) < cap(c.ch)
- default:
- // Slow path
- select {
- case c.chW <- w:
- case <-w.t.C:
- c.releasePipelineWork(w)
- return ErrTimeout
- }
- }
- // Wait for the response
- var err error
- select {
- case <-w.done:
- if resp != nil {
- w.respCopy.copyToSkipBody(resp)
- swapResponseBody(resp, &w.respCopy)
- }
- err = w.err
- c.releasePipelineWork(w)
- case <-w.t.C:
- err = ErrTimeout
- }
- return err
- }
- func (c *pipelineConnClient) acquirePipelineWork(timeout time.Duration) (w *pipelineWork) {
- v := c.workPool.Get()
- if v != nil {
- w = v.(*pipelineWork)
- } else {
- w = &pipelineWork{
- done: make(chan struct{}, 1),
- }
- }
- if timeout > 0 {
- if w.t == nil {
- w.t = time.NewTimer(timeout)
- } else {
- w.t.Reset(timeout)
- }
- w.deadline = time.Now().Add(timeout)
- } else {
- w.deadline = zeroTime
- }
- return w
- }
- func (c *pipelineConnClient) releasePipelineWork(w *pipelineWork) {
- if w.t != nil {
- w.t.Stop()
- }
- w.reqCopy.Reset()
- w.respCopy.Reset()
- w.req = nil
- w.resp = nil
- w.err = nil
- c.workPool.Put(w)
- }
- // Do performs the given http request and sets the corresponding response.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *PipelineClient) Do(req *Request, resp *Response) error {
- return c.getConnClient().Do(req, resp)
- }
- func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
- c.init()
- if c.DisablePathNormalizing {
- req.URI().DisablePathNormalizing = true
- }
- userAgentOld := req.Header.UserAgent()
- if len(userAgentOld) == 0 {
- userAgent := c.Name
- if userAgent == "" && !c.NoDefaultUserAgentHeader {
- userAgent = defaultUserAgent
- }
- if userAgent != "" {
- req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
- }
- }
- w := c.acquirePipelineWork(0)
- w.req = req
- if resp != nil {
- resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
- w.resp = resp
- } else {
- w.resp = &w.respCopy
- }
- // Put the request to outgoing queue
- select {
- case c.chW <- w:
- default:
- // Try substituting the oldest w with the current one.
- select {
- case wOld := <-c.chW:
- wOld.err = ErrPipelineOverflow
- wOld.done <- struct{}{}
- default:
- }
- select {
- case c.chW <- w:
- default:
- c.releasePipelineWork(w)
- return ErrPipelineOverflow
- }
- }
- // Wait for the response
- <-w.done
- err := w.err
- c.releasePipelineWork(w)
- return err
- }
- func (c *PipelineClient) getConnClient() *pipelineConnClient {
- c.connClientsLock.Lock()
- cc := c.getConnClientUnlocked()
- c.connClientsLock.Unlock()
- return cc
- }
- func (c *PipelineClient) getConnClientUnlocked() *pipelineConnClient {
- if len(c.connClients) == 0 {
- return c.newConnClient()
- }
- // Return the client with the minimum number of pending requests.
- minCC := c.connClients[0]
- minReqs := minCC.PendingRequests()
- if minReqs == 0 {
- return minCC
- }
- for i := 1; i < len(c.connClients); i++ {
- cc := c.connClients[i]
- reqs := cc.PendingRequests()
- if reqs == 0 {
- return cc
- }
- if reqs < minReqs {
- minCC = cc
- minReqs = reqs
- }
- }
- maxConns := c.MaxConns
- if maxConns <= 0 {
- maxConns = 1
- }
- if len(c.connClients) < maxConns {
- return c.newConnClient()
- }
- return minCC
- }
- func (c *PipelineClient) newConnClient() *pipelineConnClient {
- cc := &pipelineConnClient{
- Addr: c.Addr,
- Name: c.Name,
- NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
- MaxPendingRequests: c.MaxPendingRequests,
- MaxBatchDelay: c.MaxBatchDelay,
- Dial: c.Dial,
- DialDualStack: c.DialDualStack,
- DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
- DisablePathNormalizing: c.DisablePathNormalizing,
- IsTLS: c.IsTLS,
- TLSConfig: c.TLSConfig,
- MaxIdleConnDuration: c.MaxIdleConnDuration,
- ReadBufferSize: c.ReadBufferSize,
- WriteBufferSize: c.WriteBufferSize,
- ReadTimeout: c.ReadTimeout,
- WriteTimeout: c.WriteTimeout,
- Logger: c.Logger,
- }
- c.connClients = append(c.connClients, cc)
- return cc
- }
- // ErrPipelineOverflow may be returned from PipelineClient.Do*
- // if the requests' queue is overflowed.
- var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflowed. Increase MaxConns and/or MaxPendingRequests")
- // DefaultMaxPendingRequests is the default value
- // for PipelineClient.MaxPendingRequests.
- const DefaultMaxPendingRequests = 1024
- func (c *pipelineConnClient) init() {
- c.chLock.Lock()
- if c.chR == nil {
- maxPendingRequests := c.MaxPendingRequests
- if maxPendingRequests <= 0 {
- maxPendingRequests = DefaultMaxPendingRequests
- }
- c.chR = make(chan *pipelineWork, maxPendingRequests)
- if c.chW == nil {
- c.chW = make(chan *pipelineWork, maxPendingRequests)
- }
- go func() {
- // Keep restarting the worker if it fails (connection errors for example).
- for {
- if err := c.worker(); err != nil {
- c.logger().Printf("error in PipelineClient(%q): %v", c.Addr, err)
- if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
- // Throttle client reconnections on timeout errors
- time.Sleep(time.Second)
- }
- } else {
- c.chLock.Lock()
- stop := len(c.chR) == 0 && len(c.chW) == 0
- if !stop {
- c.chR = nil
- c.chW = nil
- }
- c.chLock.Unlock()
- if stop {
- break
- }
- }
- }
- }()
- }
- c.chLock.Unlock()
- }
- func (c *pipelineConnClient) worker() error {
- tlsConfig := c.cachedTLSConfig()
- conn, err := dialAddr(c.Addr, c.Dial, nil, c.DialDualStack, c.IsTLS, tlsConfig, 0, c.WriteTimeout)
- if err != nil {
- return err
- }
- // Start reader and writer
- stopW := make(chan struct{})
- doneW := make(chan error)
- go func() {
- doneW <- c.writer(conn, stopW)
- }()
- stopR := make(chan struct{})
- doneR := make(chan error)
- go func() {
- doneR <- c.reader(conn, stopR)
- }()
- // Wait until reader and writer are stopped
- select {
- case err = <-doneW:
- conn.Close()
- close(stopR)
- <-doneR
- case err = <-doneR:
- conn.Close()
- close(stopW)
- <-doneW
- }
- // Notify pending readers
- for len(c.chR) > 0 {
- w := <-c.chR
- w.err = errPipelineConnStopped
- w.done <- struct{}{}
- }
- return err
- }
- func (c *pipelineConnClient) cachedTLSConfig() *tls.Config {
- if !c.IsTLS {
- return nil
- }
- c.tlsConfigLock.Lock()
- cfg := c.tlsConfig
- if cfg == nil {
- cfg = newClientTLSConfig(c.TLSConfig, c.Addr)
- c.tlsConfig = cfg
- }
- c.tlsConfigLock.Unlock()
- return cfg
- }
- func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
- writeBufferSize := c.WriteBufferSize
- if writeBufferSize <= 0 {
- writeBufferSize = defaultWriteBufferSize
- }
- bw := bufio.NewWriterSize(conn, writeBufferSize)
- defer bw.Flush()
- chR := c.chR
- chW := c.chW
- writeTimeout := c.WriteTimeout
- maxIdleConnDuration := c.MaxIdleConnDuration
- if maxIdleConnDuration <= 0 {
- maxIdleConnDuration = DefaultMaxIdleConnDuration
- }
- maxBatchDelay := c.MaxBatchDelay
- var (
- stopTimer = time.NewTimer(time.Hour)
- flushTimer = time.NewTimer(time.Hour)
- flushTimerCh <-chan time.Time
- instantTimerCh = make(chan time.Time)
- w *pipelineWork
- err error
- )
- close(instantTimerCh)
- for {
- againChW:
- select {
- case w = <-chW:
- // Fast path: len(chW) > 0
- default:
- // Slow path
- stopTimer.Reset(maxIdleConnDuration)
- select {
- case w = <-chW:
- case <-stopTimer.C:
- return nil
- case <-stopCh:
- return nil
- case <-flushTimerCh:
- if err = bw.Flush(); err != nil {
- return err
- }
- flushTimerCh = nil
- goto againChW
- }
- }
- if !w.deadline.IsZero() && time.Since(w.deadline) >= 0 {
- w.err = ErrTimeout
- w.done <- struct{}{}
- continue
- }
- w.resp.ParseNetConn(conn)
- if writeTimeout > 0 {
- // Set Deadline every time, since golang has fixed the performance issue
- // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
- currentTime := time.Now()
- if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
- w.err = err
- w.done <- struct{}{}
- return err
- }
- }
- if err = w.req.Write(bw); err != nil {
- w.err = err
- w.done <- struct{}{}
- return err
- }
- if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) {
- if maxBatchDelay > 0 {
- flushTimer.Reset(maxBatchDelay)
- flushTimerCh = flushTimer.C
- } else {
- flushTimerCh = instantTimerCh
- }
- }
- againChR:
- select {
- case chR <- w:
- // Fast path: len(chR) < cap(chR)
- default:
- // Slow path
- select {
- case chR <- w:
- case <-stopCh:
- w.err = errPipelineConnStopped
- w.done <- struct{}{}
- return nil
- case <-flushTimerCh:
- if err = bw.Flush(); err != nil {
- w.err = err
- w.done <- struct{}{}
- return err
- }
- flushTimerCh = nil
- goto againChR
- }
- }
- }
- }
- func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error {
- readBufferSize := c.ReadBufferSize
- if readBufferSize <= 0 {
- readBufferSize = defaultReadBufferSize
- }
- br := bufio.NewReaderSize(conn, readBufferSize)
- chR := c.chR
- readTimeout := c.ReadTimeout
- var (
- w *pipelineWork
- err error
- )
- for {
- select {
- case w = <-chR:
- // Fast path: len(chR) > 0
- default:
- // Slow path
- select {
- case w = <-chR:
- case <-stopCh:
- return nil
- }
- }
- if readTimeout > 0 {
- // Set Deadline every time, since golang has fixed the performance issue
- // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
- currentTime := time.Now()
- if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil {
- w.err = err
- w.done <- struct{}{}
- return err
- }
- }
- if err = w.resp.Read(br); err != nil {
- w.err = err
- w.done <- struct{}{}
- return err
- }
- w.done <- struct{}{}
- }
- }
- func (c *pipelineConnClient) logger() Logger {
- if c.Logger != nil {
- return c.Logger
- }
- return defaultLogger
- }
- // PendingRequests returns the current number of pending requests pipelined
- // to the server.
- //
- // This number may exceed MaxPendingRequests*MaxConns by up to two times, since
- // each connection to the server may keep up to MaxPendingRequests requests
- // in the queue before sending them to the server.
- //
- // This function may be used for balancing load among multiple PipelineClient
- // instances.
- func (c *PipelineClient) PendingRequests() int {
- c.connClientsLock.Lock()
- n := 0
- for _, cc := range c.connClients {
- n += cc.PendingRequests()
- }
- c.connClientsLock.Unlock()
- return n
- }
- func (c *pipelineConnClient) PendingRequests() int {
- c.init()
- c.chLock.Lock()
- n := len(c.chR) + len(c.chW)
- c.chLock.Unlock()
- return n
- }
- var errPipelineConnStopped = errors.New("pipeline connection has been stopped")
- var DefaultTransport RoundTripper = &transport{}
- type transport struct{}
- func (t *transport) RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error) {
- customSkipBody := resp.SkipBody
- customStreamBody := resp.StreamBody
- var deadline time.Time
- if req.timeout > 0 {
- deadline = time.Now().Add(req.timeout)
- }
- cc, err := hc.AcquireConn(req.timeout, req.ConnectionClose())
- if err != nil {
- return false, err
- }
- conn := cc.c
- resp.ParseNetConn(conn)
- writeDeadline := deadline
- if hc.WriteTimeout > 0 {
- tmpWriteDeadline := time.Now().Add(hc.WriteTimeout)
- if writeDeadline.IsZero() || tmpWriteDeadline.Before(writeDeadline) {
- writeDeadline = tmpWriteDeadline
- }
- }
- if err = conn.SetWriteDeadline(writeDeadline); err != nil {
- hc.CloseConn(cc)
- return true, err
- }
- resetConnection := false
- if hc.MaxConnDuration > 0 && time.Since(cc.createdTime) > hc.MaxConnDuration && !req.ConnectionClose() {
- req.SetConnectionClose()
- resetConnection = true
- }
- bw := hc.AcquireWriter(conn)
- err = req.Write(bw)
- if resetConnection {
- req.Header.ResetConnectionClose()
- }
- if err == nil {
- err = bw.Flush()
- }
- hc.ReleaseWriter(bw)
- // Return ErrTimeout on any timeout.
- if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
- err = ErrTimeout
- }
- if err != nil {
- hc.CloseConn(cc)
- return true, err
- }
- readDeadline := deadline
- if hc.ReadTimeout > 0 {
- tmpReadDeadline := time.Now().Add(hc.ReadTimeout)
- if readDeadline.IsZero() || tmpReadDeadline.Before(readDeadline) {
- readDeadline = tmpReadDeadline
- }
- }
- if err = conn.SetReadDeadline(readDeadline); err != nil {
- hc.CloseConn(cc)
- return true, err
- }
- if customSkipBody || req.Header.IsHead() {
- resp.SkipBody = true
- }
- if hc.DisableHeaderNamesNormalizing {
- resp.Header.DisableNormalizing()
- }
- br := hc.AcquireReader(conn)
- err = resp.ReadLimitBody(br, hc.MaxResponseBodySize)
- if err != nil {
- hc.ReleaseReader(br)
- hc.CloseConn(cc)
- // Don't retry in case of ErrBodyTooLarge since we will just get the same again.
- needRetry := err != ErrBodyTooLarge
- return needRetry, err
- }
- closeConn := resetConnection || req.ConnectionClose() || resp.ConnectionClose()
- if customStreamBody && resp.bodyStream != nil {
- rbs := resp.bodyStream
- resp.bodyStream = newCloseReaderWithError(rbs, func(wErr error) error {
- hc.ReleaseReader(br)
- if r, ok := rbs.(*requestStream); ok {
- releaseRequestStream(r)
- }
- if closeConn || resp.ConnectionClose() || wErr != nil {
- hc.CloseConn(cc)
- } else {
- hc.ReleaseConn(cc)
- }
- return nil
- })
- return false, nil
- }
- hc.ReleaseReader(br)
- if closeConn {
- hc.CloseConn(cc)
- } else {
- hc.ReleaseConn(cc)
- }
- return false, nil
- }
|