diff --git a/backends/s3/s3.go b/backends/s3/s3.go index eecc35b..ea735b7 100644 --- a/backends/s3/s3.go +++ b/backends/s3/s3.go @@ -43,8 +43,14 @@ const ( DefaultSecretsRefreshInterval = 15 * time.Second // DefaultDisableContentMd5 : disable sending the Content-MD5 header DefaultDisableContentMd5 = false + // DefaultClientTimeout is the default value for [(Options).ClientTimeout]. + DefaultClientTimeout = 15 * time.Minute ) +// ErrClientTimeout is returned when a Store, Read, Delete or List operation +// reaches the amount set in the [(Options).ClientTimeout] option (default [DefaultClientTimeout]). +var ErrClientTimeout = errors.New("S3 client timed out") + // Options describes the storage options for the S3 backend type Options struct { // AccessKey and SecretKey are statically defined here. @@ -140,11 +146,8 @@ type Options struct { // wait for a TLS handshake. Default if unset: 10s TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout"` - // ClientTimeout specifies a time limit for requests made by this - // HTTP Client. The timeout includes connection time, any - // redirects, and reading the response body. The timer remains - // running after Get, Head, Post, or Do return and will - // interrupt reading of the Response.Body. + // ClientTimeout specifies a time limit for operations on the S3 Backend. + // If [UseUpdateMarker] is set, saving the marker is considered part of the operation. // Default if unset: 15m ClientTimeout time.Duration `yaml:"client_timeout"` @@ -244,23 +247,21 @@ func recordMinioDurationMetric(method string, start time.Time) { metricCallHistogram.WithLabelValues(method).Observe(elapsed.Seconds()) } -func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobList, error) { - var blobs simpleblob.BlobList - +func (b *Backend) doList(ctx context.Context, prefix string) (blobs simpleblob.BlobList, err error) { + ctx, cancel := b.clientTimeoutContext(ctx) + defer cancel() defer recordMinioDurationMetric("list", time.Now()) // Runes to strip from blob names for GlobalPrefix // This is fine, because we can trust the API to only return with the prefix. - // TODO: trust but verify gpEndIndex := len(b.opt.GlobalPrefix) - objCh := b.client.ListObjects(ctx, b.opt.Bucket, minio.ListObjectsOptions{ + objIter := b.client.ListObjectsIter(ctx, b.opt.Bucket, minio.ListObjectsOptions{ Prefix: prefix, Recursive: !b.opt.PrefixFolders && !b.opt.HideFolders, }) - for obj := range objCh { - // Handle error returned by MinIO client - if err := convertMinioError(obj.Err, true); err != nil { + for obj := range objIter { + if err = convertError(ctx, obj.Err, true); err != nil { metricCallErrors.WithLabelValues("list").Inc() metricCallErrorsType.WithLabelValues("list", errorToMetricsLabel(err)).Inc() return nil, err @@ -296,6 +297,8 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis // configured in b. func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) { name = b.prependGlobalPrefix(name) + ctx, cancel := b.clientTimeoutContext(ctx) + defer cancel() r, err := b.doLoadReader(ctx, name) if err != nil { @@ -304,20 +307,20 @@ func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) { defer r.Close() p, err := io.ReadAll(r) - if err = convertMinioError(err, false); err != nil { + if err = convertError(ctx, err, false); err != nil { return nil, err } return p, nil } -func (b *Backend) doLoadReader(ctx context.Context, name string) (io.ReadCloser, error) { +func (b *Backend) doLoadReader(ctx context.Context, name string) (*minio.Object, error) { metricCalls.WithLabelValues("load").Inc() metricLastCallTimestamp.WithLabelValues("load").SetToCurrentTime() defer recordMinioDurationMetric("load", time.Now()) obj, err := b.client.GetObject(ctx, b.opt.Bucket, name, minio.GetObjectOptions{}) - if err = convertMinioError(err, false); err != nil { + if err = convertError(ctx, err, false); err != nil { if !errors.Is(err, os.ErrNotExist) { metricCallErrors.WithLabelValues("load").Inc() metricCallErrorsType.WithLabelValues("load", errorToMetricsLabel(err)).Inc() @@ -328,7 +331,7 @@ func (b *Backend) doLoadReader(ctx context.Context, name string) (io.ReadCloser, return nil, os.ErrNotExist } info, err := obj.Stat() - if err = convertMinioError(err, false); err != nil { + if err = convertError(ctx, err, false); err != nil { if !errors.Is(err, os.ErrNotExist) { metricCallErrors.WithLabelValues("load").Inc() metricCallErrorsType.WithLabelValues("load", errorToMetricsLabel(err)).Inc() @@ -346,9 +349,9 @@ func (b *Backend) doLoadReader(ctx context.Context, name string) (io.ReadCloser, // Store sets the content of the object identified by name to the content // of data, in the S3 Bucket configured in b. func (b *Backend) Store(ctx context.Context, name string, data []byte) error { - // Prepend global prefix name = b.prependGlobalPrefix(name) - + ctx, cancel := b.clientTimeoutContext(ctx) + defer cancel() info, err := b.doStore(ctx, name, data) if err != nil { return err @@ -376,20 +379,20 @@ func (b *Backend) doStoreReader(ctx context.Context, name string, r io.Reader, s // minio accepts size == -1, meaning the size is unknown. info, err := b.client.PutObject(ctx, b.opt.Bucket, name, r, size, putObjectOptions) - err = convertMinioError(err, false) - if err != nil { + if err = convertError(ctx, err, false); err != nil { metricCallErrors.WithLabelValues("store").Inc() metricCallErrorsType.WithLabelValues("store", errorToMetricsLabel(err)).Inc() + return info, err } - return info, err + return info, nil } // Delete removes the object identified by name from the S3 Bucket // configured in b. func (b *Backend) Delete(ctx context.Context, name string) error { - // Prepend global prefix name = b.prependGlobalPrefix(name) - + ctx, cancel := b.clientTimeoutContext(ctx) + defer cancel() if err := b.doDelete(ctx, name); err != nil { return err } @@ -402,9 +405,42 @@ func (b *Backend) doDelete(ctx context.Context, name string) error { defer recordMinioDurationMetric("delete", time.Now()) err := b.client.RemoveObject(ctx, b.opt.Bucket, name, minio.RemoveObjectOptions{}) - if err = convertMinioError(err, false); err != nil { + if err = convertError(ctx, err, false); err != nil { metricCallErrors.WithLabelValues("delete").Inc() metricCallErrorsType.WithLabelValues("delete", errorToMetricsLabel(err)).Inc() + return err + } + return nil +} + +// clientTimeoutContext wraps [context.WithTimeoutCause] with the values and options that caracterise a client timeout. +func (b *Backend) clientTimeoutContext(ctx context.Context) (context.Context, context.CancelFunc) { + return context.WithTimeoutCause(ctx, getOpt(b.opt.ClientTimeout, DefaultClientTimeout), ErrClientTimeout) +} + +// convertError returns a more informative error from err. +// It may be converted to a [minio.ErrorResponse], +// or an [ErrClientTimeout] when ctx was issued by [(*Backend).contextWithTimeout]. +func convertError(ctx context.Context, err error, isList bool) error { + if err == nil { + return nil + } + // Try to get a more specific error. + if ctx.Err() != nil { + err = context.Cause(ctx) + } else { + errRes := minio.ToErrorResponse(err) + switch errRes.Code { + case "BucketAlreadyOwnedByYou": + // This is the desired outcome if we work on already existing bucket. + return nil + case "NoSuchKey": + // NoSuchKey in a list means the marker is missing. + if !isList { + // This error does not reflect an upstream issue, so no metrics. + return fmt.Errorf("%w: %s", os.ErrNotExist, err.Error()) + } + } } return err } @@ -472,11 +508,6 @@ func New(ctx context.Context, opt Options) (*Backend, error) { TLSClientConfig: tlsConfig, ForceAttemptHTTP2: true, } - hc := &http.Client{ - Transport: transport, - // includes reading response body! - Timeout: getOpt(opt.ClientTimeout, 15*time.Minute), - } // Some of the following calls require a short running context ctx, cancel := context.WithTimeout(ctx, opt.InitTimeout) @@ -512,7 +543,7 @@ func New(ctx context.Context, opt Options) (*Backend, error) { cfg := &minio.Options{ Creds: creds, Secure: useSSL, - Transport: hc.Transport, + Transport: transport, Region: opt.Region, } @@ -537,7 +568,7 @@ func New(ctx context.Context, opt Options) (*Backend, error) { err := client.MakeBucket(ctx, opt.Bucket, minio.MakeBucketOptions{Region: opt.Region}) if err != nil { - if err := convertMinioError(err, false); err != nil { + if err := convertError(ctx, err, false); err != nil { return nil, err } } @@ -561,27 +592,6 @@ func (b *Backend) setGlobalPrefix(prefix string) { b.markerName = b.prependGlobalPrefix(UpdateMarkerFilename) } -// convertMinioError takes an error, possibly a minio.ErrorResponse -// and turns it into a well known error when possible. -// If error is not well known, it is returned as is. -// If error is considered to be ignorable, nil is returned. -func convertMinioError(err error, isList bool) error { - if err == nil { - return nil - } - errRes := minio.ToErrorResponse(err) - // We need to differentiate between a missing bucket and a missing key, - // because a missing bucket is the result of missing rights or a deletion from the outside. - // Thus, we do not use `errRes.StatusCode` that would be == 404 for either case. - if !isList && errRes.Code == "NoSuchKey" { - return fmt.Errorf("%w: %s", os.ErrNotExist, err.Error()) - } - if errRes.Code == "BucketAlreadyOwnedByYou" { - return nil - } - return err -} - // errorToMetricsLabel converts an error into a prometheus label. // If error is a NotExist error, "NotFound" is returned. // If error is a timeout, "Timeout" is returned. @@ -598,6 +608,7 @@ func errorToMetricsLabel(err error) string { } var netError *net.OpError if errors.Is(err, context.DeadlineExceeded) || + errors.Is(err, ErrClientTimeout) || (errors.As(err, &netError) && netError.Timeout()) { return "Timeout" } diff --git a/backends/s3/s3_test.go b/backends/s3/s3_test.go index f3d7b0b..fb1df6f 100644 --- a/backends/s3/s3_test.go +++ b/backends/s3/s3_test.go @@ -2,6 +2,7 @@ package s3 import ( "context" + "io" "testing" "time" @@ -221,3 +222,75 @@ func TestHideFolders(t *testing.T) { assert.Equal(t, []string{"baz"}, ls.Names()) }) } + +// TestClientTimeout makes sure the ClientTimeout option is taken into consideration. +func TestClientTimeout(t *testing.T) { + b := getBackend(t.Context(), t) + t.Run("basic", func(t *testing.T) { + b.opt.ClientTimeout = time.Microsecond + ctx, _ := b.clientTimeoutContext(t.Context()) + time.Sleep(time.Second) + require.ErrorIs(t, context.Cause(ctx), ErrClientTimeout) + }) + t.Run("crud", func(t *testing.T) { + b.opt.ClientTimeout = time.Microsecond + ctx := t.Context() + assert.ErrorIs(t, b.Store(ctx, "crudTest", []byte("123")), ErrClientTimeout) + _, err := b.Load(ctx, "crudTest") + assert.ErrorIs(t, err, ErrClientTimeout) + assert.ErrorIs(t, b.Delete(ctx, "delete"), ErrClientTimeout) + _, err = b.List(ctx, "") + assert.ErrorIs(t, err, ErrClientTimeout) + }) + t.Run("stream write", func(t *testing.T) { + b.opt.ClientTimeout = time.Millisecond + ctx := t.Context() + w, err := b.NewWriter(ctx, "failOnWriteTest") + require.NoError(t, err) + time.Sleep(5 * time.Millisecond) + _, err = w.Write([]byte("123")) + require.ErrorIs(t, err, ErrClientTimeout) + require.ErrorIs(t, w.Close(), ErrClientTimeout) + }) + t.Run("stream read", func(t *testing.T) { + b.opt.ClientTimeout = 0 // Reset + ctx := t.Context() + require.NoError(t, b.Store(ctx, "failOnReadTest", []byte("123"))) // avoid ErrNoExist + + b.opt.ClientTimeout = time.Millisecond + r, err := b.NewReader(ctx, "failOnReadTest") + require.NoError(t, err) + time.Sleep(5 * time.Millisecond) + _, err = r.Read(make([]byte, 1)) + require.ErrorIs(t, err, ErrClientTimeout) + require.ErrorIs(t, r.Close(), ErrClientTimeout) + }) +} + +// Ensure that in Minio Client, context cancellation aborts +// an ongoing PutObject or GetObject operation. +func TestMinioKeepsContext(t *testing.T) { + t.Parallel() + t.Run("putObject", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(t.Context()) + b := getBackend(ctx, t) + _, err := b.client.PutObject(ctx, b.opt.Bucket, "putObject", readerOnce(cancel), -1, minio.PutObjectOptions{}) + require.ErrorIs(t, err, context.Canceled) + }) + t.Run("getObject", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(t.Context()) + b := getBackend(ctx, t) + require.NoError(t, b.Store(ctx, "getObject", []byte("123"))) // avoid ErrNoExist + obj, err := b.client.GetObject(ctx, b.opt.Bucket, "getObject", minio.GetObjectOptions{}) + require.NoError(t, err) + cancel() + _, err = io.Copy(io.Discard, obj) + require.ErrorIs(t, err, context.Canceled) + }) +} + +type readerOnce context.CancelFunc + +func (f readerOnce) Read(p []byte) (int, error) { f(); return len(p), nil } diff --git a/backends/s3/stream.go b/backends/s3/stream.go index 1e61b14..c9ec83b 100644 --- a/backends/s3/stream.go +++ b/backends/s3/stream.go @@ -2,6 +2,7 @@ package s3 import ( "context" + "errors" "io" "github.com/PowerDNS/simpleblob" @@ -11,71 +12,100 @@ import ( // NewReader satisfies StreamReader and provides a read streaming interface to // a blob located on an S3 server. func (b *Backend) NewReader(ctx context.Context, name string) (io.ReadCloser, error) { + ctx, cancel := b.clientTimeoutContext(ctx) name = b.prependGlobalPrefix(name) r, err := b.doLoadReader(ctx, name) if err != nil { return nil, err } - return r, nil + return &readWrapper{r, ctx, cancel}, nil +} + +// A readWrapper implements io.ReadCloser and allows keeping the context around. +type readWrapper struct { + obj *minio.Object + ctx context.Context + cancel context.CancelFunc +} + +func (r *readWrapper) Read(b []byte) (n int, err error) { + n, err = r.obj.Read(b) + if err == context.DeadlineExceeded { + return n, context.Cause(r.ctx) + } + return n, err +} + +func (r *readWrapper) Close() (err error) { + err = r.obj.Close() + if err == nil { + err = context.Cause(r.ctx) + } + r.cancel() + return err } // NewWriter satisfies StreamWriter and provides a write streaming interface to // a blob located on an S3 server. func (b *Backend) NewWriter(ctx context.Context, name string) (io.WriteCloser, error) { + ctx, cancel := b.clientTimeoutContext(ctx) if err := ctx.Err(); err != nil { return nil, err } name = b.prependGlobalPrefix(name) pr, pw := io.Pipe() w := &writerWrapper{ - ctx: ctx, - backend: b, - name: name, - pw: pw, - donePipe: make(chan struct{}), + ctx: ctx, + backend: b, + pw: pw, } - go func() { - var err error - // The following call will return only on error or - // if the writing end of the pipe is closed. + go func(ctx context.Context, b *Backend, name string, pr *io.PipeReader, cancel context.CancelFunc) { + // This call returns when the pipe is closed, or when an error occurs. // It is okay to write to w.info from this goroutine - // because it will only be used after w.donePipe is closed. - w.info, err = w.backend.doStoreReader(w.ctx, w.name, pr, -1) - _ = pr.CloseWithError(err) // Always returns nil. - close(w.donePipe) - }() + // because it will only be used after the WaitGroup is done. + info, err := b.doStoreReader(ctx, name, pr, -1) + if err == nil { + _ = b.setMarker(ctx, name, info.ETag, false) + } + cancel() + }(ctx, b, name, pr, cancel) return w, nil } // A writerWrapper implements io.WriteCloser and is returned by (*Backend).NewWriter. type writerWrapper struct { backend *Backend + closed bool + prevErr error - // We need to keep these around - // to write the marker in Close. - ctx context.Context - info minio.UploadInfo - name string + // We keep track of the context in Write and Close. + ctx context.Context // Writes are sent to this pipe // and then written to S3 in a background goroutine. - pw *io.PipeWriter - donePipe chan struct{} + pw *io.PipeWriter } -func (w *writerWrapper) Write(p []byte) (int, error) { - // Not checking the status of ctx explicitly because it will be propagated - // from the reader goroutine. - return w.pw.Write(p) +func (w *writerWrapper) Write(p []byte) (n int, err error) { + if w.closed { + return 0, errors.Join(w.prevErr, simpleblob.ErrClosed) + } + if err := context.Cause(w.ctx); err != nil { + _ = w.Close() + w.prevErr = err + return 0, err + } + n, w.prevErr = w.pw.Write(p) + return n, w.prevErr } func (w *writerWrapper) Close() error { - select { - case <-w.donePipe: - return simpleblob.ErrClosed - default: + if w.closed { + return errors.Join(w.prevErr, simpleblob.ErrClosed) } - _ = w.pw.Close() // Always returns nil. - <-w.donePipe // Wait for doStoreReader to return and w.info to be set. - return w.backend.setMarker(w.ctx, w.name, w.info.ETag, false) + w.closed = true + w.prevErr = context.Cause(w.ctx) + _ = w.pw.CloseWithError(w.prevErr) + <-w.ctx.Done() + return w.prevErr }