From 20568dbf05b48ab51a7882fe7fc332fe3557316e Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Tue, 31 Mar 2026 14:41:03 +1100 Subject: [PATCH] refactor: return errors instead of logging them internally Functions should return errors, not log them internally, so callers retain control over how failures are reported and handled. - cacheBundleSync, refreshExpiration: return error, callers log - handler.go: serveCached returns (bool, error), streamNonOKResponse, streamAndCache, fetchAndCache return error, ServeHTTP logs - host.go: buildTargetURL clone URL by value instead of re-parsing - git.go: serveReadyRepo, serveWithSpool, startClone return error - snapshot.go: serveSnapshotWithSpool, streamSnapshotDirect, writeSnapshotSpool return error - doFetch: remove redundant log (was logging AND returning) Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/cache/s3.go | 18 +++-- internal/strategy/git/git.go | 98 +++++++++++++--------------- internal/strategy/git/snapshot.go | 68 +++++++++---------- internal/strategy/handler/handler.go | 63 +++++++++--------- internal/strategy/host.go | 14 +--- 5 files changed, 121 insertions(+), 140 deletions(-) diff --git a/internal/cache/s3.go b/internal/cache/s3.go index 5b66a6a..6cee936 100644 --- a/internal/cache/s3.go +++ b/internal/cache/s3.go @@ -211,7 +211,12 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err remaining := expiresAt.Sub(now) if remaining < s.config.MaxTTL/2 { newExpiresAt := now.Add(s.config.MaxTTL) - go s.refreshExpiration(context.WithoutCancel(ctx), objectName, objInfo, newExpiresAt) + go func() { + bgCtx := context.WithoutCancel(ctx) + if err := s.refreshExpiration(bgCtx, objectName, objInfo, newExpiresAt); err != nil { + s.logger.WarnContext(bgCtx, "Failed to refresh S3 expiration", "object", objectName, "error", err) + } + }() } } } @@ -227,11 +232,11 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err // refreshExpiration updates the Expires-At metadata on an S3 object using // server-side copy-to-self with metadata replacement. This avoids re-uploading -// the object data. Errors are logged but not returned since this is best-effort. -func (s *S3) refreshExpiration(ctx context.Context, objectName string, objInfo minio.ObjectInfo, newExpiresAt time.Time) { +// the object data. +func (s *S3) refreshExpiration(ctx context.Context, objectName string, objInfo minio.ObjectInfo, newExpiresAt time.Time) error { newExpiresAtBytes, err := newExpiresAt.MarshalText() if err != nil { - return + return errors.Wrap(err, "marshal expiration time") } // Rebuild user metadata with updated expiration @@ -250,10 +255,9 @@ func (s *S3) refreshExpiration(ctx context.Context, objectName string, objInfo m ReplaceMetadata: true, } if _, err := s.client.CopyObject(ctx, dst, src); err != nil { - s.logger.WarnContext(ctx, "Failed to refresh S3 expiration", - "object", objectName, - "error", err.Error()) + return errors.Wrap(err, "copy object") } + return nil } const s3ErrNoSuchKey = "NoSuchKey" diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index 5e9e306..f7f483e 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -127,7 +127,9 @@ func New( } s.config.ServerURL = strings.TrimRight(config.ServerURL, "/") - s.warmExistingRepos(ctx) + if err := s.warmExistingRepos(ctx); err != nil { + logger.WarnContext(ctx, "Failed to warm existing repos", "error", err) + } s.proxy = &httputil.ReverseProxy{ Director: func(req *http.Request) { @@ -168,19 +170,18 @@ func New( var _ strategy.Strategy = (*Strategy)(nil) -func (s *Strategy) warmExistingRepos(ctx context.Context) { +func (s *Strategy) warmExistingRepos(ctx context.Context) error { logger := logging.FromContext(ctx) existing, err := s.cloneManager.DiscoverExisting(ctx) if err != nil { - logger.WarnContext(ctx, "Failed to discover existing clones", "error", err) + return errors.Wrap(err, "discover existing clones") } for _, repo := range existing { logger.InfoContext(ctx, "Running startup fetch for existing repo", "upstream", repo.UpstreamURL()) preRefs, err := repo.GetLocalRefs(ctx) if err != nil { - logger.WarnContext(ctx, "Failed to get pre-fetch refs for existing repo", "upstream", repo.UpstreamURL(), - "error", err) + return errors.Wrapf(err, "get pre-fetch refs for %s", repo.UpstreamURL()) } start := time.Now() @@ -194,12 +195,10 @@ func (s *Strategy) warmExistingRepos(ctx context.Context) { postRefs, err := repo.GetLocalRefs(ctx) if err != nil { - logger.WarnContext(ctx, "Failed to get post-fetch refs for existing repo", "upstream", repo.UpstreamURL(), - "error", err) - } else { - maps.DeleteFunc(postRefs, func(k, v string) bool { return preRefs[k] == v }) - logger.InfoContext(ctx, "Post-fetch changed refs for existing repo", "upstream", repo.UpstreamURL(), "refs", postRefs) + return errors.Wrapf(err, "get post-fetch refs for %s", repo.UpstreamURL()) } + maps.DeleteFunc(postRefs, func(k, v string) bool { return preRefs[k] == v }) + logger.InfoContext(ctx, "Post-fetch changed refs for existing repo", "upstream", repo.UpstreamURL(), "refs", postRefs) if s.config.SnapshotInterval > 0 { s.scheduleSnapshotJobs(repo) @@ -208,6 +207,7 @@ func (s *Strategy) warmExistingRepos(ctx context.Context) { s.scheduleRepackJobs(repo) } } + return nil } // SetHTTPTransport overrides the HTTP transport used for upstream requests. @@ -267,36 +267,37 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) { switch state { case gitclone.StateReady: - s.serveReadyRepo(w, r, repo, host, pathValue, isInfoRefs) + if err := s.serveReadyRepo(w, r, repo, host, pathValue, isInfoRefs); err != nil { + logger.ErrorContext(ctx, "Failed to serve from local mirror", "error", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + } case gitclone.StateCloning, gitclone.StateEmpty: if state == gitclone.StateEmpty { logger.DebugContext(ctx, "Starting background clone, forwarding to upstream") s.scheduler.Submit(repo.UpstreamURL(), "clone", func(ctx context.Context) error { - s.startClone(ctx, repo) - return nil + return s.startClone(ctx, repo) }) } - s.serveWithSpool(w, r, host, pathValue, upstreamURL) + if err := s.serveWithSpool(w, r, host, pathValue, upstreamURL); err != nil { + logger.WarnContext(ctx, "Spool failed, forwarding to upstream", "error", err) + s.forwardToUpstream(w, r, host, pathValue) + } } } -func (s *Strategy) serveReadyRepo(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, host, pathValue string, isInfoRefs bool) { +func (s *Strategy) serveReadyRepo(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, host, pathValue string, isInfoRefs bool) error { ctx := r.Context() - logger := logging.FromContext(ctx) - stale, err := s.checkRefsStale(ctx, repo) - if err != nil { - logger.WarnContext(ctx, "Failed to check upstream refs", "upstream", repo.UpstreamURL(), "error", err) - } + stale, _ := s.checkRefsStale(ctx, repo) //nolint:errcheck // best-effort; treat as non-stale on failure if isInfoRefs && stale { // Mirror is behind upstream. Forward to upstream so the client gets // fresh refs immediately, and kick off a background fetch so the // mirror catches up for subsequent requests. - logger.InfoContext(ctx, "Refs stale, forwarding to upstream and fetching in background", "upstream", repo.UpstreamURL()) + logging.FromContext(ctx).InfoContext(ctx, "Refs stale, forwarding to upstream and fetching in background", "upstream", repo.UpstreamURL()) s.submitFetch(repo) s.forwardToUpstream(w, r, host, pathValue) - return + return nil } s.maybeBackgroundFetch(repo) @@ -307,9 +308,7 @@ func (s *Strategy) serveReadyRepo(w http.ResponseWriter, r *http.Request, repo * var readErr error bodyBytes, readErr = io.ReadAll(r.Body) if readErr != nil { - logger.ErrorContext(ctx, "Failed to read request body", "error", readErr) - http.Error(w, "Internal server error", http.StatusInternalServerError) - return + return errors.Wrap(readErr, "read request body") } r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) r.ContentLength = int64(len(bodyBytes)) @@ -320,7 +319,7 @@ func (s *Strategy) serveReadyRepo(w http.ResponseWriter, r *http.Request, repo * // The mirror is missing the requested object — most likely a commit // that was advertised before a concurrent force-push fetch orphaned // it. Fall back to upstream so the client is not left with an error. - logger.InfoContext(ctx, "Falling back to upstream due to 'not our ref'", "path", pathValue) + logging.FromContext(ctx).InfoContext(ctx, "Falling back to upstream due to 'not our ref'", "path", pathValue) if bodyBytes != nil { r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) r.ContentLength = int64(len(bodyBytes)) @@ -328,6 +327,7 @@ func (s *Strategy) serveReadyRepo(w http.ResponseWriter, r *http.Request, repo * } s.forwardToUpstream(w, r, host, pathValue) } + return nil } // SpoolKeyForRequest returns the spool key for a request, or empty string if the @@ -389,32 +389,26 @@ func (s *Strategy) cleanupSpools(upstreamURL string) error { return nil } -func (s *Strategy) serveWithSpool(w http.ResponseWriter, r *http.Request, host, pathValue, upstreamURL string) { +func (s *Strategy) serveWithSpool(w http.ResponseWriter, r *http.Request, host, pathValue, upstreamURL string) error { ctx := r.Context() logger := logging.FromContext(ctx) key, err := SpoolKeyForRequest(pathValue, r) if err != nil { - logger.WarnContext(ctx, "Failed to compute spool key, forwarding to upstream", "error", err) - s.forwardToUpstream(w, r, host, pathValue) - return + return errors.Wrap(err, "compute spool key") } if key == "" { s.forwardToUpstream(w, r, host, pathValue) - return + return nil } rp, err := s.getOrCreateRepoSpools(upstreamURL) if err != nil { - logger.WarnContext(ctx, "Failed to resolve spool directory, forwarding to upstream", "error", err) - s.forwardToUpstream(w, r, host, pathValue) - return + return errors.Wrap(err, "resolve spool directory") } spool, isWriter, err := rp.GetOrCreate(key) if err != nil { - logger.WarnContext(ctx, "Failed to create spool, forwarding to upstream", "error", err) - s.forwardToUpstream(w, r, host, pathValue) - return + return errors.Wrap(err, "create spool") } if isWriter { @@ -422,13 +416,13 @@ func (s *Strategy) serveWithSpool(w http.ResponseWriter, r *http.Request, host, tw := NewSpoolTeeWriter(w, spool) s.forwardToUpstream(tw, r, host, pathValue) spool.MarkComplete() - return + return nil } if spool.Failed() { logger.DebugContext(ctx, "Spool failed, forwarding to upstream", "key", key) s.forwardToUpstream(w, r, host, pathValue) - return + return nil } logger.DebugContext(ctx, "Serving from spool", "key", key, "upstream", upstreamURL) @@ -436,10 +430,11 @@ func (s *Strategy) serveWithSpool(w http.ResponseWriter, r *http.Request, host, if errors.Is(err, ErrSpoolFailed) { logger.DebugContext(ctx, "Spool failed before response started, forwarding to upstream", "key", key) s.forwardToUpstream(w, r, host, pathValue) - return + return nil } - logger.WarnContext(ctx, "Spool read failed mid-stream", "key", key, "error", err) + return errors.Wrapf(err, "spool read failed mid-stream for key %s", key) } + return nil } func ExtractRepoPath(pathValue string) string { @@ -457,7 +452,9 @@ func ExtractRepoPath(pathValue string) string { // context is cancelled. Returns an error if the clone fails or the context is done. func (s *Strategy) ensureCloneReady(ctx context.Context, repo *gitclone.Repository) error { if repo.State() == gitclone.StateEmpty { - s.startClone(ctx, repo) + if err := s.startClone(ctx, repo); err != nil { + return err + } } for repo.State() == gitclone.StateCloning { t := time.NewTimer(500 * time.Millisecond) @@ -474,13 +471,13 @@ func (s *Strategy) ensureCloneReady(ctx context.Context, repo *gitclone.Reposito return nil } -func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) { +func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) error { // Atomically claim the clone so only one goroutine performs the restore // or clone. Without this gate, concurrent snapshot requests each call // startClone and extract tarballs over the same directory, corrupting // packed-refs and other git metadata. if !repo.TryStartCloning() { - return + return nil } logger := logging.FromContext(ctx) @@ -508,13 +505,13 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) { // fall through to a fresh clone so we don't re-upload bad data. repo.ResetToEmpty() if rmErr := os.RemoveAll(repo.Path()); rmErr != nil { - logger.WarnContext(ctx, "Failed to remove corrupt mirror", "upstream", upstream, "error", rmErr) + return errors.Wrapf(rmErr, "remove corrupt mirror for %s", upstream) } } else { repo.MarkReady() if err := s.cleanupSpools(upstream); err != nil { - logger.WarnContext(ctx, "Failed to clean up spools", "upstream", upstream, "error", err) + return errors.Wrapf(err, "clean up spools for %s", upstream) } logger.InfoContext(ctx, "Post-restore fetch completed, serving", "upstream", upstream) @@ -525,7 +522,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) { if s.config.RepackInterval > 0 { s.scheduleRepackJobs(repo) } - return + return nil } } @@ -537,14 +534,13 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) { // Clean up spools regardless of clone success or failure, so that subsequent // requests either serve from the local backend or go directly to upstream. if cleanupErr := s.cleanupSpools(upstream); cleanupErr != nil { - logger.WarnContext(ctx, "Failed to clean up spools", "upstream", upstream, "error", cleanupErr) + return errors.Wrapf(cleanupErr, "clean up spools for %s", upstream) } if err != nil { s.metrics.recordOperation(ctx, "clone", "error", time.Since(cloneStart)) - logger.ErrorContext(ctx, "Clone failed", "upstream", upstream, "error", err) repo.ResetToEmpty() - return + return errors.Wrapf(err, "clone %s", upstream) } s.metrics.recordOperation(ctx, "clone", "success", time.Since(cloneStart)) @@ -556,6 +552,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) { if s.config.RepackInterval > 0 { s.scheduleRepackJobs(repo) } + return nil } // tryRestoreSnapshot attempts to restore a mirror from an S3 mirror snapshot. @@ -622,7 +619,6 @@ func (s *Strategy) doFetch(ctx context.Context, repo *gitclone.Repository) error start := time.Now() if err := repo.Fetch(ctx); err != nil { s.metrics.recordOperation(ctx, "fetch", "error", time.Since(start)) - logger.ErrorContext(ctx, "Fetch failed", "upstream", repo.UpstreamURL(), "duration", time.Since(start), "error", err) return errors.Errorf("fetch failed: %w", err) } s.metrics.recordOperation(ctx, "fetch", "success", time.Since(start)) diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 63be9c7..e50bf53 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -269,7 +269,9 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, } if reader == nil { - s.serveSnapshotWithSpool(w, r, repo, upstreamURL) + if err := s.serveSnapshotWithSpool(w, r, repo, upstreamURL); err != nil { + logger.ErrorContext(ctx, "Failed to serve snapshot via spool", "upstream", upstreamURL, "error", err) + } return } defer reader.Close() @@ -360,7 +362,9 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW logger.WarnContext(bgCtx, "Failed to pre-generate bundle", "upstream", upstreamURL, "error", err) return } - s.cacheBundleSync(bgCtx, bundleCacheKey(upstreamURL, snapshotCommit), bundleData) + if err := s.cacheBundleSync(bgCtx, bundleCacheKey(upstreamURL, snapshotCommit), bundleData); err != nil { + logger.WarnContext(bgCtx, "Failed to cache bundle", "upstream", upstreamURL, "error", err) + } }() } @@ -373,26 +377,24 @@ const bundleCacheTTL = 2 * time.Hour func (s *Strategy) cacheBundleAsync(ctx context.Context, key cache.Key, data []byte) { go func() { - s.cacheBundleSync(context.WithoutCancel(ctx), key, data) + bgCtx := context.WithoutCancel(ctx) + if err := s.cacheBundleSync(bgCtx, key, data); err != nil { + logging.FromContext(bgCtx).WarnContext(bgCtx, "Failed to cache bundle", "error", err) + } }() } -func (s *Strategy) cacheBundleSync(ctx context.Context, key cache.Key, data []byte) { - logger := logging.FromContext(ctx) +func (s *Strategy) cacheBundleSync(ctx context.Context, key cache.Key, data []byte) error { headers := http.Header{"Content-Type": {"application/x-git-bundle"}} wc, err := s.cache.Create(ctx, key, headers, bundleCacheTTL) if err != nil { - logger.WarnContext(ctx, "Failed to cache bundle", "error", err) - return + return errors.Wrap(err, "create cache entry") } if _, err := wc.Write(data); err != nil { - logger.WarnContext(ctx, "Failed to write bundle to cache", "error", err) _ = wc.Close() - return - } - if err := wc.Close(); err != nil { - logger.WarnContext(ctx, "Failed to close bundle cache writer", "error", err) + return errors.Wrap(err, "write bundle to cache") } + return errors.Wrap(wc.Close(), "close bundle cache writer") } func revParse(ctx context.Context, repoDir, ref string) (string, error) { @@ -445,7 +447,7 @@ func (s *Strategy) createBundle(ctx context.Context, repo *gitclone.Repository, // mirror, streams tar+zstd to both the HTTP client and a spool file, then // triggers a background cache backfill. Concurrent requests for the same URL // become readers that follow the spool, avoiding redundant clone+tar work. -func (s *Strategy) serveSnapshotWithSpool(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, upstreamURL string) { +func (s *Strategy) serveSnapshotWithSpool(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, upstreamURL string) error { ctx := r.Context() logger := logging.FromContext(ctx) @@ -462,49 +464,40 @@ func (s *Strategy) serveSnapshotWithSpool(w http.ResponseWriter, r *http.Request if err := spool.ServeTo(w); err != nil { if errors.Is(err, ErrSpoolFailed) { logger.DebugContext(ctx, "Snapshot spool failed before headers, falling back to direct stream", "upstream", upstreamURL) - s.streamSnapshotDirect(w, r, repo, upstreamURL) - return + return s.streamSnapshotDirect(w, r, repo) } - logger.WarnContext(ctx, "Snapshot spool read error", "upstream", upstreamURL, "error", err) + return errors.Wrap(err, "snapshot spool read") } - return + return nil } // Writer failed; fall through to generate independently. - s.streamSnapshotDirect(w, r, repo, upstreamURL) - return + return s.streamSnapshotDirect(w, r, repo) } - s.writeSnapshotSpool(w, r, repo, upstreamURL, entry) + return s.writeSnapshotSpool(w, r, repo, upstreamURL, entry) } // streamSnapshotDirect streams a snapshot directly to the client without // spooling. Used as a fallback when the spool writer failed. -func (s *Strategy) streamSnapshotDirect(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, upstreamURL string) { +func (s *Strategy) streamSnapshotDirect(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository) error { ctx := r.Context() - logger := logging.FromContext(ctx) mirrorRoot := s.cloneManager.Config().MirrorRoot snapshotDir, err := os.MkdirTemp(mirrorRoot, ".snapshot-stream-*") if err != nil { - logger.ErrorContext(ctx, "Failed to create temp snapshot dir", "upstream", upstreamURL, "error", err) - http.Error(w, "Internal server error", http.StatusInternalServerError) - return + return errors.Wrap(err, "create temp snapshot dir") } defer func() { _ = os.RemoveAll(snapshotDir) }() repoDir := filepath.Join(snapshotDir, "repo") if err := s.cloneForSnapshot(ctx, repo, repoDir); err != nil { - logger.ErrorContext(ctx, "Failed to clone for snapshot streaming", "upstream", upstreamURL, "error", err) - http.Error(w, "Internal server error", http.StatusInternalServerError) - return + return errors.Wrap(err, "clone for snapshot streaming") } w.Header().Set("Content-Type", "application/zstd") w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(repoDir)+".tar.zst")) - if err := snapshot.StreamTo(ctx, w, repoDir, nil, s.config.ZstdThreads); err != nil { - logger.ErrorContext(ctx, "Failed to stream snapshot to client", "upstream", upstreamURL, "error", err) - } + return errors.Wrap(snapshot.StreamTo(ctx, w, repoDir, nil, s.config.ZstdThreads), "stream snapshot to client") } // prepareSnapshotSpool creates the spool and clones the mirror into a temp directory, @@ -551,15 +544,13 @@ func (s *Strategy) prepareSnapshotSpool(ctx context.Context, repo *gitclone.Repo // writeSnapshotSpool is the writer path for snapshot spooling. It creates a // spool, clones the mirror, streams the tar+zstd output through a SpoolTeeWriter, // and triggers a background cache backfill. -func (s *Strategy) writeSnapshotSpool(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, upstreamURL string, entry *snapshotSpoolEntry) { +func (s *Strategy) writeSnapshotSpool(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, upstreamURL string, entry *snapshotSpoolEntry) error { ctx := r.Context() logger := logging.FromContext(ctx) spool, spoolDir, repoDir, err := s.prepareSnapshotSpool(ctx, repo, upstreamURL, entry) if err != nil { - logger.ErrorContext(ctx, "Failed to prepare snapshot spool", "upstream", upstreamURL, "error", err) - http.Error(w, "Internal server error", http.StatusInternalServerError) - return + return errors.Wrap(err, "prepare snapshot spool") } snapshotDir := filepath.Dir(repoDir) @@ -567,9 +558,9 @@ func (s *Strategy) writeSnapshotSpool(w http.ResponseWriter, r *http.Request, re w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(repoDir)+".tar.zst")) tw := NewSpoolTeeWriter(w, spool) - if err := snapshot.StreamTo(ctx, tw, repoDir, nil, s.config.ZstdThreads); err != nil { - logger.ErrorContext(ctx, "Failed to stream snapshot to client", "upstream", upstreamURL, "error", err) - spool.MarkError(err) + streamErr := snapshot.StreamTo(ctx, tw, repoDir, nil, s.config.ZstdThreads) + if streamErr != nil { + spool.MarkError(streamErr) } else { spool.MarkComplete() } @@ -598,6 +589,7 @@ func (s *Strategy) writeSnapshotSpool(w http.ResponseWriter, r *http.Request, re if s.config.SnapshotInterval > 0 { s.scheduleSnapshotJobs(repo) } + return errors.Wrap(streamErr, "stream snapshot to client") } // scheduleDeferredMirrorRestore schedules a one-shot background mirror restore diff --git a/internal/strategy/handler/handler.go b/internal/strategy/handler/handler.go index 365c037..f99201b 100644 --- a/internal/strategy/handler/handler.go +++ b/internal/strategy/handler/handler.go @@ -2,7 +2,6 @@ package handler import ( "io" - "log/slog" "maps" "net/http" "os" @@ -106,40 +105,45 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { logger.DebugContext(ctx, "Processing request", "cache_key", cacheKeyStr) - if h.serveCached(w, r, key, logger) { + served, err := h.serveCached(w, r, key) + if err != nil { + logger.ErrorContext(ctx, "Failed to serve from cache", "error", err) + } + if served { return } - h.fetchAndCache(w, r, key, logger) + if err := h.fetchAndCache(w, r, key); err != nil { + logger.ErrorContext(ctx, "Failed to fetch and cache", "error", err) + } } -func (h *Handler) serveCached(w http.ResponseWriter, r *http.Request, key cache.Key, logger *slog.Logger) bool { +func (h *Handler) serveCached(w http.ResponseWriter, r *http.Request, key cache.Key) (bool, error) { cr, headers, err := h.cache.Open(r.Context(), key) if err != nil { if !errors.Is(err, os.ErrNotExist) { h.errorHandler(httputil.Errorf(http.StatusInternalServerError, "failed to open cache: %w", err), w, r) - return true + return true, nil } - return false + return false, nil } - logger.DebugContext(r.Context(), "Cache hit") + logging.FromContext(r.Context()).DebugContext(r.Context(), "Cache hit") defer cr.Close() maps.Copy(w.Header(), headers) if _, err := io.Copy(w, cr); err != nil { - logger.ErrorContext(r.Context(), "Failed to stream from cache", "error", err) - httputil.ErrorResponse(w, r, http.StatusInternalServerError, "Failed to stream from cache", "error", err) + return true, errors.Wrap(err, "stream from cache") } - return true + return true, nil } -func (h *Handler) fetchAndCache(w http.ResponseWriter, r *http.Request, key cache.Key, logger *slog.Logger) { - logger.DebugContext(r.Context(), "Cache miss, fetching from upstream") +func (h *Handler) fetchAndCache(w http.ResponseWriter, r *http.Request, key cache.Key) error { + logging.FromContext(r.Context()).DebugContext(r.Context(), "Cache miss, fetching from upstream") upstreamReq, err := h.transformFunc(r) if err != nil { h.errorHandler(err, w, r) - return + return nil } // Forward safe headers from the original request, without overwriting headers set by transform. @@ -153,53 +157,46 @@ func (h *Handler) fetchAndCache(w http.ResponseWriter, r *http.Request, key cach resp, err := h.client.Do(upstreamReq) if err != nil { h.errorHandler(httputil.Errorf(http.StatusBadGateway, "failed to fetch: %w", err), w, r) - return + return nil } - defer func() { - if closeErr := resp.Body.Close(); closeErr != nil { - logger.ErrorContext(r.Context(), "Failed to close response body", "error", closeErr) - } - }() + defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - h.streamNonOKResponse(w, resp, logger) - return + return h.streamNonOKResponse(w, resp) } - h.streamAndCache(w, r, key, resp, logger) + return h.streamAndCache(w, r, key, resp) } -func (h *Handler) streamNonOKResponse(w http.ResponseWriter, resp *http.Response, logger *slog.Logger) { +func (h *Handler) streamNonOKResponse(w http.ResponseWriter, resp *http.Response) error { w.WriteHeader(resp.StatusCode) if _, err := io.Copy(w, resp.Body); err != nil { - logger.ErrorContext(resp.Request.Context(), "Failed to stream error response", "error", err) + return errors.Wrap(err, "stream non-OK response") } + return nil } -func (h *Handler) streamAndCache(w http.ResponseWriter, r *http.Request, key cache.Key, resp *http.Response, logger *slog.Logger) { +func (h *Handler) streamAndCache(w http.ResponseWriter, r *http.Request, key cache.Key, resp *http.Response) error { ttl := h.ttlFunc(r) responseHeaders := maps.Clone(resp.Header) cw, err := h.cache.Create(r.Context(), key, responseHeaders, ttl) if err != nil { h.errorHandler(httputil.Errorf(http.StatusInternalServerError, "failed to create cache entry: %w", err), w, r) - return + return nil } pr, pw := io.Pipe() go func() { mw := io.MultiWriter(pw, cw) _, copyErr := io.Copy(mw, resp.Body) - closeErr := errors.Join(cw.Close(), resp.Body.Close()) + closeErr := cw.Close() pw.CloseWithError(errors.Join(copyErr, closeErr)) }() maps.Copy(w.Header(), resp.Header) - if _, err := io.Copy(w, pr); err != nil { - logger.ErrorContext(r.Context(), "Failed to stream response", "error", err) - } - if closeErr := pr.Close(); closeErr != nil { - logger.ErrorContext(r.Context(), "Failed to close pipe", "error", closeErr) - } + _, copyErr := io.Copy(w, pr) + closeErr := pr.Close() + return errors.Wrap(errors.Join(copyErr, closeErr), "stream and cache response") } func defaultErrorHandler(err error, w http.ResponseWriter, r *http.Request) { diff --git a/internal/strategy/host.go b/internal/strategy/host.go index 1887cf5..71ddbf9 100644 --- a/internal/strategy/host.go +++ b/internal/strategy/host.go @@ -2,14 +2,12 @@ package strategy import ( "context" - "log/slog" "net/http" "net/url" "github.com/alecthomas/errors" "github.com/block/cachew/internal/cache" - "github.com/block/cachew/internal/logging" "github.com/block/cachew/internal/strategy/handler" ) @@ -36,14 +34,13 @@ type Host struct { target *url.URL cache cache.Cache client *http.Client - logger *slog.Logger prefix string headers map[string]string } var _ Strategy = (*Host)(nil) -func NewHost(ctx context.Context, config HostConfig, cache cache.Cache, mux Mux) (*Host, error) { +func NewHost(_ context.Context, config HostConfig, cache cache.Cache, mux Mux) (*Host, error) { u, err := url.Parse(config.Target) if err != nil { return nil, errors.Errorf("invalid target URL: %w", err) @@ -53,7 +50,6 @@ func NewHost(ctx context.Context, config HostConfig, cache cache.Cache, mux Mux) target: u, cache: cache, client: &http.Client{}, - logger: logging.FromContext(ctx), prefix: prefix, headers: config.Headers, } @@ -91,12 +87,8 @@ func (d *Host) buildTargetURL(r *http.Request) *url.URL { path = "/" } - targetURL, err := url.Parse(d.target.String()) - if err != nil { - d.logger.Error("Failed to parse target URL", "error", err, "target", d.target) - return &url.URL{} - } + targetURL := *d.target targetURL.Path = path targetURL.RawQuery = r.URL.RawQuery - return targetURL + return &targetURL }