Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 124 additions & 4 deletions api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"iter"
"log"
"math"
"math/rand"
"net"
"net/http"
"net/textproto"
"net/url"
Expand Down Expand Up @@ -71,7 +74,7 @@ func sendStreamRequest[T responseStream[R], R any](ctx context.Context, ac *apiC
}
req = req.WithContext(requestContext)

resp, err := doRequest(ac, req)
resp, err := doRequestWithRetry(ac, req, httpOptions.RetryOptions)
if err != nil {
return err
}
Expand Down Expand Up @@ -102,7 +105,7 @@ func sendRequest(ctx context.Context, ac *apiClient, path string, method string,
}
req = req.WithContext(requestContext)

resp, err := doRequest(ac, req)
resp, err := doRequestWithRetry(ac, req, httpOptions.RetryOptions)
if err != nil {
return nil, err
}
Expand All @@ -115,13 +118,13 @@ func sendRequest(ctx context.Context, ac *apiClient, path string, method string,
func downloadFile(ctx context.Context, ac *apiClient, path string, httpOptions *HTTPOptions) ([]byte, error) {
// The client and request timeout are not used for downloadFile.
// TODO(b/427540996): implement timeout.
req, _, err := buildRequest(ctx, ac, path, nil, http.MethodGet, httpOptions)
req, patchedOptions, err := buildRequest(ctx, ac, path, nil, http.MethodGet, httpOptions)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)

resp, err := doRequest(ac, req)
resp, err := doRequestWithRetry(ac, req, patchedOptions.RetryOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -216,6 +219,9 @@ func patchHTTPOptions(options, patchOptions HTTPOptions) (*HTTPOptions, error) {
if patchOptions.ExtraBody != nil {
copyOption.ExtraBody = patchOptions.ExtraBody
}
if patchOptions.RetryOptions != nil {
copyOption.RetryOptions = patchOptions.RetryOptions
}
// Request timeout config overrides client timeout config.
// So we need a pointer type so that we know the request timeout
// is explicitly set or not.
Expand Down Expand Up @@ -412,6 +418,120 @@ func doRequest(ac *apiClient, req *http.Request) (*http.Response, error) {
return resp, nil
}

// Default retry settings.
// See https://cloud.google.com/storage/docs/retry-strategy.
const (
defaultRetryAttempts = 5
defaultRetryInitialDelay = time.Second
defaultRetryMaxDelay = 60 * time.Second
defaultRetryExpBase = 2.0
defaultRetryJitter = time.Second
)

var defaultRetryHTTPStatusCodes = []int{
http.StatusRequestTimeout,
http.StatusTooManyRequests,
http.StatusInternalServerError,
http.StatusBadGateway,
http.StatusServiceUnavailable,
http.StatusGatewayTimeout,
}

func resolvedRetryOptions(opts *HTTPRetryOptions) *HTTPRetryOptions {
if opts == nil {
return nil
}
resolved := *opts
if resolved.Attempts == 0 {
resolved.Attempts = defaultRetryAttempts
}
if resolved.Attempts <= 1 {
return nil
}
if resolved.InitialDelay <= 0 {
resolved.InitialDelay = defaultRetryInitialDelay
}
if resolved.MaxDelay <= 0 {
resolved.MaxDelay = defaultRetryMaxDelay
}
if resolved.ExpBase <= 0 {
resolved.ExpBase = defaultRetryExpBase
}
if resolved.Jitter < 0 {
resolved.Jitter = 0
} else if resolved.Jitter == 0 {
resolved.Jitter = defaultRetryJitter
}
if len(resolved.HTTPStatusCodes) == 0 {
resolved.HTTPStatusCodes = defaultRetryHTTPStatusCodes
}
return &resolved
}

func backoffDelay(opts *HTTPRetryOptions, retryNum int) time.Duration {
delay := float64(opts.InitialDelay) * math.Pow(opts.ExpBase, float64(retryNum-1))
delay += rand.Float64() * float64(opts.Jitter)
if maxD := float64(opts.MaxDelay); delay > maxD {
delay = maxD
}
return time.Duration(delay)
}

func isRetriableTransportErr(err error) bool {
var netErr net.Error
return errors.As(err, &netErr)
}

func doRequestWithRetry(ac *apiClient, req *http.Request, retryOpts *HTTPRetryOptions) (*http.Response, error) {
resolved := resolvedRetryOptions(retryOpts)
if resolved == nil {
return doRequest(ac, req)
}
// http.NewRequest sets GetBody for bytes.Buffer/Reader bodies so the body
// can be rewound on retry. Skip retry if the body is non-empty and not
// rewindable to avoid re-sending an empty payload.
canRewindBody := req.Body == nil || req.Body == http.NoBody || req.GetBody != nil

var resp *http.Response
var lastErr error
for attempt := 1; attempt <= resolved.Attempts; attempt++ {
if attempt > 1 {
if !canRewindBody {
return resp, lastErr
}
if req.GetBody != nil {
body, gerr := req.GetBody()
if gerr != nil {
return nil, fmt.Errorf("doRequestWithRetry: rewinding body: %w", gerr)
}
req.Body = body
}
select {
case <-req.Context().Done():
return nil, req.Context().Err()
case <-time.After(backoffDelay(resolved, attempt-1)):
}
}
resp, lastErr = doRequest(ac, req)
if lastErr != nil {
if attempt == resolved.Attempts || !isRetriableTransportErr(lastErr) {
return resp, lastErr
}
continue
}
if httpStatusOk(resp) {
return resp, nil
}
if !slices.Contains(resolved.HTTPStatusCodes, resp.StatusCode) || attempt == resolved.Attempts {
return resp, nil
}
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
lastErr = fmt.Errorf("doRequestWithRetry: retriable status %d", resp.StatusCode)
}
return resp, lastErr
}

func deserializeUnaryResponse(resp *http.Response) (map[string]any, error) {
if !httpStatusOk(resp) {
return nil, newAPIError(resp)
Expand Down
Loading