Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 64 additions & 53 deletions backends/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@ const (
DefaultSecretsRefreshInterval = 15 * time.Second
// DefaultDisableContentMd5 : disable sending the Content-MD5 header
DefaultDisableContentMd5 = false
// DefaultClientTimeout is the default value for [(Options).ClientTimeout].
DefaultClientTimeout = 15 * time.Minute
)

// ErrClientTimeout is returned when a Store, Read, Delete or List operation
// reaches the amount set in the [(Options).ClientTimeout] option (default [DefaultClientTimeout]).
var ErrClientTimeout = errors.New("S3 client timed out")

// Options describes the storage options for the S3 backend
type Options struct {
// AccessKey and SecretKey are statically defined here.
Expand Down Expand Up @@ -140,11 +146,8 @@ type Options struct {
// wait for a TLS handshake. Default if unset: 10s
TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout"`

// ClientTimeout specifies a time limit for requests made by this
// HTTP Client. The timeout includes connection time, any
// redirects, and reading the response body. The timer remains
// running after Get, Head, Post, or Do return and will
// interrupt reading of the Response.Body.
// ClientTimeout specifies a time limit for operations on the S3 Backend.
// If [UseUpdateMarker] is set, saving the marker is considered part of the operation.
// Default if unset: 15m
ClientTimeout time.Duration `yaml:"client_timeout"`

Expand Down Expand Up @@ -244,23 +247,21 @@ func recordMinioDurationMetric(method string, start time.Time) {
metricCallHistogram.WithLabelValues(method).Observe(elapsed.Seconds())
}

func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobList, error) {
var blobs simpleblob.BlobList

func (b *Backend) doList(ctx context.Context, prefix string) (blobs simpleblob.BlobList, err error) {
ctx, cancel := b.clientTimeoutContext(ctx)
defer cancel()
defer recordMinioDurationMetric("list", time.Now())

// Runes to strip from blob names for GlobalPrefix
// This is fine, because we can trust the API to only return with the prefix.
// TODO: trust but verify
gpEndIndex := len(b.opt.GlobalPrefix)

objCh := b.client.ListObjects(ctx, b.opt.Bucket, minio.ListObjectsOptions{
objIter := b.client.ListObjectsIter(ctx, b.opt.Bucket, minio.ListObjectsOptions{
Prefix: prefix,
Recursive: !b.opt.PrefixFolders && !b.opt.HideFolders,
})
for obj := range objCh {
// Handle error returned by MinIO client
if err := convertMinioError(obj.Err, true); err != nil {
for obj := range objIter {
if err = convertError(ctx, obj.Err, true); err != nil {
metricCallErrors.WithLabelValues("list").Inc()
metricCallErrorsType.WithLabelValues("list", errorToMetricsLabel(err)).Inc()
return nil, err
Expand Down Expand Up @@ -296,6 +297,8 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
// configured in b.
func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
name = b.prependGlobalPrefix(name)
ctx, cancel := b.clientTimeoutContext(ctx)
defer cancel()

r, err := b.doLoadReader(ctx, name)
if err != nil {
Expand All @@ -304,20 +307,20 @@ func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
defer r.Close()

p, err := io.ReadAll(r)
if err = convertMinioError(err, false); err != nil {
if err = convertError(ctx, err, false); err != nil {
return nil, err
}
return p, nil
}

func (b *Backend) doLoadReader(ctx context.Context, name string) (io.ReadCloser, error) {
func (b *Backend) doLoadReader(ctx context.Context, name string) (*minio.Object, error) {
metricCalls.WithLabelValues("load").Inc()
metricLastCallTimestamp.WithLabelValues("load").SetToCurrentTime()

defer recordMinioDurationMetric("load", time.Now())

obj, err := b.client.GetObject(ctx, b.opt.Bucket, name, minio.GetObjectOptions{})
if err = convertMinioError(err, false); err != nil {
if err = convertError(ctx, err, false); err != nil {
if !errors.Is(err, os.ErrNotExist) {
metricCallErrors.WithLabelValues("load").Inc()
metricCallErrorsType.WithLabelValues("load", errorToMetricsLabel(err)).Inc()
Expand All @@ -328,7 +331,7 @@ func (b *Backend) doLoadReader(ctx context.Context, name string) (io.ReadCloser,
return nil, os.ErrNotExist
}
info, err := obj.Stat()
if err = convertMinioError(err, false); err != nil {
if err = convertError(ctx, err, false); err != nil {
if !errors.Is(err, os.ErrNotExist) {
metricCallErrors.WithLabelValues("load").Inc()
metricCallErrorsType.WithLabelValues("load", errorToMetricsLabel(err)).Inc()
Expand All @@ -346,9 +349,9 @@ func (b *Backend) doLoadReader(ctx context.Context, name string) (io.ReadCloser,
// Store sets the content of the object identified by name to the content
// of data, in the S3 Bucket configured in b.
func (b *Backend) Store(ctx context.Context, name string, data []byte) error {
// Prepend global prefix
name = b.prependGlobalPrefix(name)

ctx, cancel := b.clientTimeoutContext(ctx)
defer cancel()
info, err := b.doStore(ctx, name, data)
if err != nil {
return err
Expand Down Expand Up @@ -376,20 +379,20 @@ func (b *Backend) doStoreReader(ctx context.Context, name string, r io.Reader, s

// minio accepts size == -1, meaning the size is unknown.
info, err := b.client.PutObject(ctx, b.opt.Bucket, name, r, size, putObjectOptions)
err = convertMinioError(err, false)
if err != nil {
if err = convertError(ctx, err, false); err != nil {
metricCallErrors.WithLabelValues("store").Inc()
metricCallErrorsType.WithLabelValues("store", errorToMetricsLabel(err)).Inc()
return info, err
}
return info, err
return info, nil
}

// Delete removes the object identified by name from the S3 Bucket
// configured in b.
func (b *Backend) Delete(ctx context.Context, name string) error {
// Prepend global prefix
name = b.prependGlobalPrefix(name)

ctx, cancel := b.clientTimeoutContext(ctx)
defer cancel()
if err := b.doDelete(ctx, name); err != nil {
return err
}
Expand All @@ -402,9 +405,42 @@ func (b *Backend) doDelete(ctx context.Context, name string) error {
defer recordMinioDurationMetric("delete", time.Now())

err := b.client.RemoveObject(ctx, b.opt.Bucket, name, minio.RemoveObjectOptions{})
if err = convertMinioError(err, false); err != nil {
if err = convertError(ctx, err, false); err != nil {
metricCallErrors.WithLabelValues("delete").Inc()
metricCallErrorsType.WithLabelValues("delete", errorToMetricsLabel(err)).Inc()
return err
}
return nil
}

// clientTimeoutContext wraps [context.WithTimeoutCause] with the values and options that caracterise a client timeout.
func (b *Backend) clientTimeoutContext(ctx context.Context) (context.Context, context.CancelFunc) {
return context.WithTimeoutCause(ctx, getOpt(b.opt.ClientTimeout, DefaultClientTimeout), ErrClientTimeout)
}

// convertError returns a more informative error from err.
// It may be converted to a [minio.ErrorResponse],
// or an [ErrClientTimeout] when ctx was issued by [(*Backend).contextWithTimeout].
func convertError(ctx context.Context, err error, isList bool) error {
if err == nil {
return nil
}
// Try to get a more specific error.
if ctx.Err() != nil {
err = context.Cause(ctx)
} else {
errRes := minio.ToErrorResponse(err)
switch errRes.Code {
case "BucketAlreadyOwnedByYou":
// This is the desired outcome if we work on already existing bucket.
return nil
case "NoSuchKey":
// NoSuchKey in a list means the marker is missing.
if !isList {
// This error does not reflect an upstream issue, so no metrics.
return fmt.Errorf("%w: %s", os.ErrNotExist, err.Error())
}
}
}
return err
}
Expand Down Expand Up @@ -472,11 +508,6 @@ func New(ctx context.Context, opt Options) (*Backend, error) {
TLSClientConfig: tlsConfig,
ForceAttemptHTTP2: true,
}
hc := &http.Client{
Transport: transport,
// includes reading response body!
Timeout: getOpt(opt.ClientTimeout, 15*time.Minute),
}

// Some of the following calls require a short running context
ctx, cancel := context.WithTimeout(ctx, opt.InitTimeout)
Expand Down Expand Up @@ -512,7 +543,7 @@ func New(ctx context.Context, opt Options) (*Backend, error) {
cfg := &minio.Options{
Creds: creds,
Secure: useSSL,
Transport: hc.Transport,
Transport: transport,
Region: opt.Region,
}

Expand All @@ -537,7 +568,7 @@ func New(ctx context.Context, opt Options) (*Backend, error) {

err := client.MakeBucket(ctx, opt.Bucket, minio.MakeBucketOptions{Region: opt.Region})
if err != nil {
if err := convertMinioError(err, false); err != nil {
if err := convertError(ctx, err, false); err != nil {
return nil, err
}
}
Expand All @@ -561,27 +592,6 @@ func (b *Backend) setGlobalPrefix(prefix string) {
b.markerName = b.prependGlobalPrefix(UpdateMarkerFilename)
}

// convertMinioError takes an error, possibly a minio.ErrorResponse
// and turns it into a well known error when possible.
// If error is not well known, it is returned as is.
// If error is considered to be ignorable, nil is returned.
func convertMinioError(err error, isList bool) error {
if err == nil {
return nil
}
errRes := minio.ToErrorResponse(err)
// We need to differentiate between a missing bucket and a missing key,
// because a missing bucket is the result of missing rights or a deletion from the outside.
// Thus, we do not use `errRes.StatusCode` that would be == 404 for either case.
if !isList && errRes.Code == "NoSuchKey" {
return fmt.Errorf("%w: %s", os.ErrNotExist, err.Error())
}
if errRes.Code == "BucketAlreadyOwnedByYou" {
return nil
}
return err
}

// errorToMetricsLabel converts an error into a prometheus label.
// If error is a NotExist error, "NotFound" is returned.
// If error is a timeout, "Timeout" is returned.
Expand All @@ -598,6 +608,7 @@ func errorToMetricsLabel(err error) string {
}
var netError *net.OpError
if errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, ErrClientTimeout) ||
(errors.As(err, &netError) && netError.Timeout()) {
return "Timeout"
}
Expand Down
73 changes: 73 additions & 0 deletions backends/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package s3

import (
"context"
"io"
"testing"
"time"

Expand Down Expand Up @@ -221,3 +222,75 @@ func TestHideFolders(t *testing.T) {
assert.Equal(t, []string{"baz"}, ls.Names())
})
}

// TestClientTimeout makes sure the ClientTimeout option is taken into consideration.
func TestClientTimeout(t *testing.T) {
b := getBackend(t.Context(), t)
t.Run("basic", func(t *testing.T) {
b.opt.ClientTimeout = time.Microsecond
ctx, _ := b.clientTimeoutContext(t.Context())
time.Sleep(time.Second)
require.ErrorIs(t, context.Cause(ctx), ErrClientTimeout)
})
t.Run("crud", func(t *testing.T) {
b.opt.ClientTimeout = time.Microsecond
ctx := t.Context()
assert.ErrorIs(t, b.Store(ctx, "crudTest", []byte("123")), ErrClientTimeout)
_, err := b.Load(ctx, "crudTest")
assert.ErrorIs(t, err, ErrClientTimeout)
assert.ErrorIs(t, b.Delete(ctx, "delete"), ErrClientTimeout)
_, err = b.List(ctx, "")
assert.ErrorIs(t, err, ErrClientTimeout)
})
t.Run("stream write", func(t *testing.T) {
b.opt.ClientTimeout = time.Millisecond
ctx := t.Context()
w, err := b.NewWriter(ctx, "failOnWriteTest")
require.NoError(t, err)
time.Sleep(5 * time.Millisecond)
_, err = w.Write([]byte("123"))
require.ErrorIs(t, err, ErrClientTimeout)
require.ErrorIs(t, w.Close(), ErrClientTimeout)
})
t.Run("stream read", func(t *testing.T) {
b.opt.ClientTimeout = 0 // Reset
ctx := t.Context()
require.NoError(t, b.Store(ctx, "failOnReadTest", []byte("123"))) // avoid ErrNoExist

b.opt.ClientTimeout = time.Millisecond
r, err := b.NewReader(ctx, "failOnReadTest")
require.NoError(t, err)
time.Sleep(5 * time.Millisecond)
_, err = r.Read(make([]byte, 1))
require.ErrorIs(t, err, ErrClientTimeout)
require.ErrorIs(t, r.Close(), ErrClientTimeout)
})
}

// Ensure that in Minio Client, context cancellation aborts
// an ongoing PutObject or GetObject operation.
func TestMinioKeepsContext(t *testing.T) {
t.Parallel()
t.Run("putObject", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(t.Context())
b := getBackend(ctx, t)
_, err := b.client.PutObject(ctx, b.opt.Bucket, "putObject", readerOnce(cancel), -1, minio.PutObjectOptions{})
require.ErrorIs(t, err, context.Canceled)
})
t.Run("getObject", func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(t.Context())
b := getBackend(ctx, t)
require.NoError(t, b.Store(ctx, "getObject", []byte("123"))) // avoid ErrNoExist
obj, err := b.client.GetObject(ctx, b.opt.Bucket, "getObject", minio.GetObjectOptions{})
require.NoError(t, err)
cancel()
_, err = io.Copy(io.Discard, obj)
require.ErrorIs(t, err, context.Canceled)
})
}

type readerOnce context.CancelFunc

func (f readerOnce) Read(p []byte) (int, error) { f(); return len(p), nil }
Loading