package pester // pester provides additional resiliency over the standard http client methods by // allowing you to control concurrency, retries, and a backoff strategy. import ( "bytes" "errors" "fmt" "io" "io/ioutil" "math" "math/rand" "net/http" "net/url" "sync" "time" ) // Client wraps the http client and exposes all the functionality of the http.Client. // Additionally, Client provides pester specific values for handling resiliency. type Client struct { // wrap it to provide access to http built ins hc *http.Client Transport http.RoundTripper CheckRedirect func(req *http.Request, via []*http.Request) error Jar http.CookieJar Timeout time.Duration // pester specific Concurrency int MaxRetries int Backoff BackoffStrategy KeepLog bool SuccessReqNum int SuccessRetryNum int wg *sync.WaitGroup sync.Mutex ErrLog []ErrEntry } // ErrEntry is used to provide the LogString() data and is populated // each time an error happens if KeepLog is set. // ErrEntry.Retry is deprecated in favor of ErrEntry.Attempt type ErrEntry struct { Time time.Time Method string URL string Verb string Request int Retry int Attempt int Err error } // result simplifies the channel communication for concurrent request handling type result struct { resp *http.Response err error req int retry int } // params represents all the params needed to run http client calls and pester errors type params struct { method string verb string req *http.Request url string bodyType string body io.Reader data url.Values } // New constructs a new DefaultClient with sensible default values func New() *Client { return &Client{ Concurrency: DefaultClient.Concurrency, MaxRetries: DefaultClient.MaxRetries, Backoff: DefaultClient.Backoff, ErrLog: DefaultClient.ErrLog, wg: &sync.WaitGroup{}, } } // NewExtendedClient allows you to pass in an http.Client that is previously set up // and extends it to have Pester's features of concurrency and retries. func NewExtendedClient(hc *http.Client) *Client { c := New() c.hc = hc return c } // BackoffStrategy is used to determine how long a retry request should wait until attempted type BackoffStrategy func(retry int) time.Duration // DefaultClient provides sensible defaults var DefaultClient = &Client{Concurrency: 1, MaxRetries: 3, Backoff: DefaultBackoff, ErrLog: []ErrEntry{}} // DefaultBackoff always returns 1 second func DefaultBackoff(_ int) time.Duration { return 1 * time.Second } // ExponentialBackoff returns ever increasing backoffs by a power of 2 func ExponentialBackoff(i int) time.Duration { return time.Duration(math.Pow(2, float64(i))) * time.Second } // ExponentialJitterBackoff returns ever increasing backoffs by a power of 2 // with +/- 0-33% to prevent sychronized reuqests. func ExponentialJitterBackoff(i int) time.Duration { return jitter(int(math.Pow(2, float64(i)))) } // LinearBackoff returns increasing durations, each a second longer than the last func LinearBackoff(i int) time.Duration { return time.Duration(i) * time.Second } // LinearJitterBackoff returns increasing durations, each a second longer than the last // with +/- 0-33% to prevent sychronized reuqests. func LinearJitterBackoff(i int) time.Duration { return jitter(i) } // jitter keeps the +/- 0-33% logic in one place func jitter(i int) time.Duration { ms := i * 1000 maxJitter := ms / 3 rand.Seed(time.Now().Unix()) jitter := rand.Intn(maxJitter + 1) if rand.Intn(2) == 1 { ms = ms + jitter } else { ms = ms - jitter } // a jitter of 0 messes up the time.Tick chan if ms <= 0 { ms = 1 } return time.Duration(ms) * time.Millisecond } // Wait blocks until all pester requests have returned // Probably not that useful outside of testing. func (c *Client) Wait() { c.wg.Wait() } // pester provides all the logic of retries, concurrency, backoff, and logging func (c *Client) pester(p params) (*http.Response, error) { resultCh := make(chan result) multiplexCh := make(chan result) finishCh := make(chan struct{}) // track all requests that go out so we can close the late listener routine that closes late incoming response bodies totalSentRequests := &sync.WaitGroup{} totalSentRequests.Add(1) defer totalSentRequests.Done() allRequestsBackCh := make(chan struct{}) go func() { totalSentRequests.Wait() close(allRequestsBackCh) }() // GET calls should be idempotent and can make use // of concurrency. Other verbs can mutate and should not // make use of the concurrency feature concurrency := c.Concurrency if p.verb != "GET" { concurrency = 1 } c.Lock() if c.hc == nil { c.hc = &http.Client{} c.hc.Transport = c.Transport c.hc.CheckRedirect = c.CheckRedirect c.hc.Jar = c.Jar c.hc.Timeout = c.Timeout } c.Unlock() // re-create the http client so we can leverage the std lib httpClient := http.Client{ Transport: c.hc.Transport, CheckRedirect: c.hc.CheckRedirect, Jar: c.hc.Jar, Timeout: c.hc.Timeout, } // if we have a request body, we need to save it for later var originalRequestBody []byte var originalBody []byte var err error if p.req != nil && p.req.Body != nil { originalRequestBody, err = ioutil.ReadAll(p.req.Body) if err != nil { return &http.Response{}, errors.New("error reading request body") } p.req.Body.Close() } if p.body != nil { originalBody, err = ioutil.ReadAll(p.body) if err != nil { return &http.Response{}, errors.New("error reading body") } } AttemptLimit := c.MaxRetries if AttemptLimit <= 0 { AttemptLimit = 1 } for req := 0; req < concurrency; req++ { c.wg.Add(1) totalSentRequests.Add(1) go func(n int, p params) { defer c.wg.Done() defer totalSentRequests.Done() var err error for i := 1; i <= AttemptLimit; i++ { c.wg.Add(1) defer c.wg.Done() select { case <-finishCh: return default: } resp := &http.Response{} // rehydrate the body (it is drained each read) if len(originalRequestBody) > 0 { p.req.Body = ioutil.NopCloser(bytes.NewBuffer(originalRequestBody)) } if len(originalBody) > 0 { p.body = bytes.NewBuffer(originalBody) } // route the calls switch p.method { case "Do": resp, err = httpClient.Do(p.req) case "Get": resp, err = httpClient.Get(p.url) case "Head": resp, err = httpClient.Head(p.url) case "Post": resp, err = httpClient.Post(p.url, p.bodyType, p.body) case "PostForm": resp, err = httpClient.PostForm(p.url, p.data) } // Early return if we have a valid result // Only retry (ie, continue the loop) on 5xx status codes if err == nil && resp.StatusCode < 500 { multiplexCh <- result{resp: resp, err: err, req: n, retry: i} return } c.log(ErrEntry{ Time: time.Now(), Method: p.method, Verb: p.verb, URL: p.url, Request: n, Retry: i + 1, // would remove, but would break backward compatibility Attempt: i, Err: err, }) // if it is the last iteration, grab the result (which is an error at this point) if i == AttemptLimit { multiplexCh <- result{resp: resp, err: err} return } // if we are retrying, we should close this response body to free the fd if resp != nil { resp.Body.Close() } // prevent a 0 from causing the tick to block, pass additional microsecond <-time.Tick(c.Backoff(i) + 1*time.Microsecond) } }(req, p) } // spin off the go routine so it can continually listen in on late results and close the response bodies go func() { gotFirstResult := false for { select { case res := <-multiplexCh: if !gotFirstResult { gotFirstResult = true close(finishCh) resultCh <- res } else if res.resp != nil { // we only return one result to the caller; close all other response bodies that come back // drain the body before close as to not prevent keepalive. see https://gist.github.com/mholt/eba0f2cc96658be0f717 io.Copy(ioutil.Discard, res.resp.Body) res.resp.Body.Close() } case <-allRequestsBackCh: // don't leave this goroutine running return } } }() select { case res := <-resultCh: c.Lock() defer c.Unlock() c.SuccessReqNum = res.req c.SuccessRetryNum = res.retry return res.resp, res.err } } // LogString provides a string representation of the errors the client has seen func (c *Client) LogString() string { c.Lock() defer c.Unlock() var res string for _, e := range c.ErrLog { res += fmt.Sprintf("%d %s [%s] %s request-%d retry-%d error: %s\n", e.Time.Unix(), e.Method, e.Verb, e.URL, e.Request, e.Retry, e.Err) } return res } // LogErrCount is a helper method used primarily for test validation func (c *Client) LogErrCount() int { c.Lock() defer c.Unlock() return len(c.ErrLog) } // EmbedHTTPClient allows you to extend an existing Pester client with an // underlying http.Client, such as https://godoc.org/golang.org/x/oauth2/google#DefaultClient func (c *Client) EmbedHTTPClient(hc *http.Client) { c.hc = hc } func (c *Client) log(e ErrEntry) { if c.KeepLog { c.Lock() c.ErrLog = append(c.ErrLog, e) c.Unlock() } } // Do provides the same functionality as http.Client.Do func (c *Client) Do(req *http.Request) (resp *http.Response, err error) { return c.pester(params{method: "Do", req: req, verb: req.Method, url: req.URL.String()}) } // Get provides the same functionality as http.Client.Get func (c *Client) Get(url string) (resp *http.Response, err error) { return c.pester(params{method: "Get", url: url, verb: "GET"}) } // Head provides the same functionality as http.Client.Head func (c *Client) Head(url string) (resp *http.Response, err error) { return c.pester(params{method: "Head", url: url, verb: "HEAD"}) } // Post provides the same functionality as http.Client.Post func (c *Client) Post(url string, bodyType string, body io.Reader) (resp *http.Response, err error) { return c.pester(params{method: "Post", url: url, bodyType: bodyType, body: body, verb: "POST"}) } // PostForm provides the same functionality as http.Client.PostForm func (c *Client) PostForm(url string, data url.Values) (resp *http.Response, err error) { return c.pester(params{method: "PostForm", url: url, data: data, verb: "POST"}) } //////////////////////////////////////// // Provide self-constructing variants // //////////////////////////////////////// // Do provides the same functionality as http.Client.Do and creates its own constructor func Do(req *http.Request) (resp *http.Response, err error) { c := New() return c.Do(req) } // Get provides the same functionality as http.Client.Get and creates its own constructor func Get(url string) (resp *http.Response, err error) { c := New() return c.Get(url) } // Head provides the same functionality as http.Client.Head and creates its own constructor func Head(url string) (resp *http.Response, err error) { c := New() return c.Head(url) } // Post provides the same functionality as http.Client.Post and creates its own constructor func Post(url string, bodyType string, body io.Reader) (resp *http.Response, err error) { c := New() return c.Post(url, bodyType, body) } // PostForm provides the same functionality as http.Client.PostForm and creates its own constructor func PostForm(url string, data url.Values) (resp *http.Response, err error) { c := New() return c.PostForm(url, data) }