// lbclient.go

  1. package fasthttp
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. )
  7. // BalancingClient is the interface for clients, which may be passed
  8. // to LBClient.Clients.
  9. type BalancingClient interface {
  10. DoDeadline(req *Request, resp *Response, deadline time.Time) error
  11. PendingRequests() int
  12. }
  13. // LBClient balances requests among available LBClient.Clients.
  14. //
  15. // It has the following features:
  16. //
  17. // - Balances load among available clients using 'least loaded' + 'least total'
  18. // hybrid technique.
  19. // - Dynamically decreases load on unhealthy clients.
  20. //
  21. // It is forbidden copying LBClient instances. Create new instances instead.
  22. //
  23. // It is safe calling LBClient methods from concurrently running goroutines.
  24. type LBClient struct {
  25. noCopy noCopy //nolint:unused,structcheck
  26. // Clients must contain non-zero clients list.
  27. // Incoming requests are balanced among these clients.
  28. Clients []BalancingClient
  29. // HealthCheck is a callback called after each request.
  30. //
  31. // The request, response and the error returned by the client
  32. // is passed to HealthCheck, so the callback may determine whether
  33. // the client is healthy.
  34. //
  35. // Load on the current client is decreased if HealthCheck returns false.
  36. //
  37. // By default HealthCheck returns false if err != nil.
  38. HealthCheck func(req *Request, resp *Response, err error) bool
  39. // Timeout is the request timeout used when calling LBClient.Do.
  40. //
  41. // DefaultLBClientTimeout is used by default.
  42. Timeout time.Duration
  43. cs []*lbClient
  44. once sync.Once
  45. mu sync.RWMutex
  46. }
  47. // DefaultLBClientTimeout is the default request timeout used by LBClient
  48. // when calling LBClient.Do.
  49. //
  50. // The timeout may be overridden via LBClient.Timeout.
  51. const DefaultLBClientTimeout = time.Second
  52. // DoDeadline calls DoDeadline on the least loaded client
  53. func (cc *LBClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  54. return cc.get().DoDeadline(req, resp, deadline)
  55. }
  56. // DoTimeout calculates deadline and calls DoDeadline on the least loaded client
  57. func (cc *LBClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  58. deadline := time.Now().Add(timeout)
  59. return cc.get().DoDeadline(req, resp, deadline)
  60. }
  61. // Do calls calculates deadline using LBClient.Timeout and calls DoDeadline
  62. // on the least loaded client.
  63. func (cc *LBClient) Do(req *Request, resp *Response) error {
  64. timeout := cc.Timeout
  65. if timeout <= 0 {
  66. timeout = DefaultLBClientTimeout
  67. }
  68. return cc.DoTimeout(req, resp, timeout)
  69. }
  70. func (cc *LBClient) init() {
  71. cc.mu.Lock()
  72. defer cc.mu.Unlock()
  73. if len(cc.Clients) == 0 {
  74. panic("BUG: LBClient.Clients cannot be empty")
  75. }
  76. for _, c := range cc.Clients {
  77. cc.cs = append(cc.cs, &lbClient{
  78. c: c,
  79. healthCheck: cc.HealthCheck,
  80. })
  81. }
  82. }
  83. // AddClient adds a new client to the balanced clients
  84. // returns the new total number of clients
  85. func (cc *LBClient) AddClient(c BalancingClient) int {
  86. cc.mu.Lock()
  87. cc.cs = append(cc.cs, &lbClient{
  88. c: c,
  89. healthCheck: cc.HealthCheck,
  90. })
  91. cc.mu.Unlock()
  92. return len(cc.cs)
  93. }
  94. // RemoveClients removes clients using the provided callback
  95. // if rc returns true, the passed client will be removed
  96. // returns the new total number of clients
  97. func (cc *LBClient) RemoveClients(rc func(BalancingClient) bool) int {
  98. cc.mu.Lock()
  99. n := 0
  100. for _, cs := range cc.cs {
  101. if rc(cs.c) {
  102. continue
  103. }
  104. cc.cs[n] = cs
  105. n++
  106. }
  107. for i := n; i < len(cc.cs); i++ {
  108. cc.cs[i] = nil
  109. }
  110. cc.cs = cc.cs[:n]
  111. cc.mu.Unlock()
  112. return len(cc.cs)
  113. }
  114. func (cc *LBClient) get() *lbClient {
  115. cc.once.Do(cc.init)
  116. cc.mu.RLock()
  117. cs := cc.cs
  118. minC := cs[0]
  119. minN := minC.PendingRequests()
  120. minT := atomic.LoadUint64(&minC.total)
  121. for _, c := range cs[1:] {
  122. n := c.PendingRequests()
  123. t := atomic.LoadUint64(&c.total)
  124. if n < minN || (n == minN && t < minT) {
  125. minC = c
  126. minN = n
  127. minT = t
  128. }
  129. }
  130. cc.mu.RUnlock()
  131. return minC
  132. }
  133. type lbClient struct {
  134. c BalancingClient
  135. healthCheck func(req *Request, resp *Response, err error) bool
  136. penalty uint32
  137. // total amount of requests handled.
  138. total uint64
  139. }
  140. func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  141. err := c.c.DoDeadline(req, resp, deadline)
  142. if !c.isHealthy(req, resp, err) && c.incPenalty() {
  143. // Penalize the client returning error, so the next requests
  144. // are routed to another clients.
  145. time.AfterFunc(penaltyDuration, c.decPenalty)
  146. } else {
  147. atomic.AddUint64(&c.total, 1)
  148. }
  149. return err
  150. }
  151. func (c *lbClient) PendingRequests() int {
  152. n := c.c.PendingRequests()
  153. m := atomic.LoadUint32(&c.penalty)
  154. return n + int(m)
  155. }
  156. func (c *lbClient) isHealthy(req *Request, resp *Response, err error) bool {
  157. if c.healthCheck == nil {
  158. return err == nil
  159. }
  160. return c.healthCheck(req, resp, err)
  161. }
  162. func (c *lbClient) incPenalty() bool {
  163. m := atomic.AddUint32(&c.penalty, 1)
  164. if m > maxPenalty {
  165. c.decPenalty()
  166. return false
  167. }
  168. return true
  169. }
  170. func (c *lbClient) decPenalty() {
  171. atomic.AddUint32(&c.penalty, ^uint32(0))
  172. }
  173. const (
  174. maxPenalty = 300
  175. penaltyDuration = 3 * time.Second
  176. )