Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions internal/cache/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,12 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err
remaining := expiresAt.Sub(now)
if remaining < s.config.MaxTTL/2 {
newExpiresAt := now.Add(s.config.MaxTTL)
go s.refreshExpiration(context.WithoutCancel(ctx), objectName, objInfo, newExpiresAt)
go func() {
bgCtx := context.WithoutCancel(ctx)
if err := s.refreshExpiration(bgCtx, objectName, objInfo, newExpiresAt); err != nil {
s.logger.WarnContext(bgCtx, "Failed to refresh S3 expiration", "object", objectName, "error", err)
}
}()
}
}
}
Expand All @@ -227,11 +232,11 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err

// refreshExpiration updates the Expires-At metadata on an S3 object using
// server-side copy-to-self with metadata replacement. This avoids re-uploading
// the object data. Errors are logged but not returned since this is best-effort.
func (s *S3) refreshExpiration(ctx context.Context, objectName string, objInfo minio.ObjectInfo, newExpiresAt time.Time) {
// the object data.
func (s *S3) refreshExpiration(ctx context.Context, objectName string, objInfo minio.ObjectInfo, newExpiresAt time.Time) error {
newExpiresAtBytes, err := newExpiresAt.MarshalText()
if err != nil {
return
return errors.Wrap(err, "marshal expiration time")
}

// Rebuild user metadata with updated expiration
Expand All @@ -250,10 +255,9 @@ func (s *S3) refreshExpiration(ctx context.Context, objectName string, objInfo m
ReplaceMetadata: true,
}
if _, err := s.client.CopyObject(ctx, dst, src); err != nil {
s.logger.WarnContext(ctx, "Failed to refresh S3 expiration",
"object", objectName,
"error", err.Error())
return errors.Wrap(err, "copy object")
}
return nil
}

const s3ErrNoSuchKey = "NoSuchKey"
Expand Down
98 changes: 47 additions & 51 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ func New(
}
s.config.ServerURL = strings.TrimRight(config.ServerURL, "/")

s.warmExistingRepos(ctx)
if err := s.warmExistingRepos(ctx); err != nil {
logger.WarnContext(ctx, "Failed to warm existing repos", "error", err)
}

s.proxy = &httputil.ReverseProxy{
Director: func(req *http.Request) {
Expand Down Expand Up @@ -168,19 +170,18 @@ func New(

var _ strategy.Strategy = (*Strategy)(nil)

func (s *Strategy) warmExistingRepos(ctx context.Context) {
func (s *Strategy) warmExistingRepos(ctx context.Context) error {
logger := logging.FromContext(ctx)
existing, err := s.cloneManager.DiscoverExisting(ctx)
if err != nil {
logger.WarnContext(ctx, "Failed to discover existing clones", "error", err)
return errors.Wrap(err, "discover existing clones")
}
for _, repo := range existing {
logger.InfoContext(ctx, "Running startup fetch for existing repo", "upstream", repo.UpstreamURL())

preRefs, err := repo.GetLocalRefs(ctx)
if err != nil {
logger.WarnContext(ctx, "Failed to get pre-fetch refs for existing repo", "upstream", repo.UpstreamURL(),
"error", err)
return errors.Wrapf(err, "get pre-fetch refs for %s", repo.UpstreamURL())
}

start := time.Now()
Expand All @@ -194,12 +195,10 @@ func (s *Strategy) warmExistingRepos(ctx context.Context) {

postRefs, err := repo.GetLocalRefs(ctx)
if err != nil {
logger.WarnContext(ctx, "Failed to get post-fetch refs for existing repo", "upstream", repo.UpstreamURL(),
"error", err)
} else {
maps.DeleteFunc(postRefs, func(k, v string) bool { return preRefs[k] == v })
logger.InfoContext(ctx, "Post-fetch changed refs for existing repo", "upstream", repo.UpstreamURL(), "refs", postRefs)
return errors.Wrapf(err, "get post-fetch refs for %s", repo.UpstreamURL())
}
maps.DeleteFunc(postRefs, func(k, v string) bool { return preRefs[k] == v })
logger.InfoContext(ctx, "Post-fetch changed refs for existing repo", "upstream", repo.UpstreamURL(), "refs", postRefs)

if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
Expand All @@ -208,6 +207,7 @@ func (s *Strategy) warmExistingRepos(ctx context.Context) {
s.scheduleRepackJobs(repo)
}
}
return nil
}

// SetHTTPTransport overrides the HTTP transport used for upstream requests.
Expand Down Expand Up @@ -267,36 +267,37 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {

switch state {
case gitclone.StateReady:
s.serveReadyRepo(w, r, repo, host, pathValue, isInfoRefs)
if err := s.serveReadyRepo(w, r, repo, host, pathValue, isInfoRefs); err != nil {
logger.ErrorContext(ctx, "Failed to serve from local mirror", "error", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
}

case gitclone.StateCloning, gitclone.StateEmpty:
if state == gitclone.StateEmpty {
logger.DebugContext(ctx, "Starting background clone, forwarding to upstream")
s.scheduler.Submit(repo.UpstreamURL(), "clone", func(ctx context.Context) error {
s.startClone(ctx, repo)
return nil
return s.startClone(ctx, repo)
})
}
s.serveWithSpool(w, r, host, pathValue, upstreamURL)
if err := s.serveWithSpool(w, r, host, pathValue, upstreamURL); err != nil {
logger.WarnContext(ctx, "Spool failed, forwarding to upstream", "error", err)
s.forwardToUpstream(w, r, host, pathValue)
}
}
}

func (s *Strategy) serveReadyRepo(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, host, pathValue string, isInfoRefs bool) {
func (s *Strategy) serveReadyRepo(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, host, pathValue string, isInfoRefs bool) error {
ctx := r.Context()
logger := logging.FromContext(ctx)

stale, err := s.checkRefsStale(ctx, repo)
if err != nil {
logger.WarnContext(ctx, "Failed to check upstream refs", "upstream", repo.UpstreamURL(), "error", err)
}
stale, _ := s.checkRefsStale(ctx, repo) //nolint:errcheck // best-effort; treat as non-stale on failure
if isInfoRefs && stale {
// Mirror is behind upstream. Forward to upstream so the client gets
// fresh refs immediately, and kick off a background fetch so the
// mirror catches up for subsequent requests.
logger.InfoContext(ctx, "Refs stale, forwarding to upstream and fetching in background", "upstream", repo.UpstreamURL())
logging.FromContext(ctx).InfoContext(ctx, "Refs stale, forwarding to upstream and fetching in background", "upstream", repo.UpstreamURL())
s.submitFetch(repo)
s.forwardToUpstream(w, r, host, pathValue)
return
return nil
}
s.maybeBackgroundFetch(repo)

Expand All @@ -307,9 +308,7 @@ func (s *Strategy) serveReadyRepo(w http.ResponseWriter, r *http.Request, repo *
var readErr error
bodyBytes, readErr = io.ReadAll(r.Body)
if readErr != nil {
logger.ErrorContext(ctx, "Failed to read request body", "error", readErr)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
return errors.Wrap(readErr, "read request body")
}
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
r.ContentLength = int64(len(bodyBytes))
Expand All @@ -320,14 +319,15 @@ func (s *Strategy) serveReadyRepo(w http.ResponseWriter, r *http.Request, repo *
// The mirror is missing the requested object — most likely a commit
// that was advertised before a concurrent force-push fetch orphaned
// it. Fall back to upstream so the client is not left with an error.
logger.InfoContext(ctx, "Falling back to upstream due to 'not our ref'", "path", pathValue)
logging.FromContext(ctx).InfoContext(ctx, "Falling back to upstream due to 'not our ref'", "path", pathValue)
if bodyBytes != nil {
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
r.ContentLength = int64(len(bodyBytes))
r.TransferEncoding = nil
}
s.forwardToUpstream(w, r, host, pathValue)
}
return nil
}

// SpoolKeyForRequest returns the spool key for a request, or empty string if the
Expand Down Expand Up @@ -389,57 +389,52 @@ func (s *Strategy) cleanupSpools(upstreamURL string) error {
return nil
}

func (s *Strategy) serveWithSpool(w http.ResponseWriter, r *http.Request, host, pathValue, upstreamURL string) {
func (s *Strategy) serveWithSpool(w http.ResponseWriter, r *http.Request, host, pathValue, upstreamURL string) error {
ctx := r.Context()
logger := logging.FromContext(ctx)

key, err := SpoolKeyForRequest(pathValue, r)
if err != nil {
logger.WarnContext(ctx, "Failed to compute spool key, forwarding to upstream", "error", err)
s.forwardToUpstream(w, r, host, pathValue)
return
return errors.Wrap(err, "compute spool key")
}
if key == "" {
s.forwardToUpstream(w, r, host, pathValue)
return
return nil
}

rp, err := s.getOrCreateRepoSpools(upstreamURL)
if err != nil {
logger.WarnContext(ctx, "Failed to resolve spool directory, forwarding to upstream", "error", err)
s.forwardToUpstream(w, r, host, pathValue)
return
return errors.Wrap(err, "resolve spool directory")
}
spool, isWriter, err := rp.GetOrCreate(key)
if err != nil {
logger.WarnContext(ctx, "Failed to create spool, forwarding to upstream", "error", err)
s.forwardToUpstream(w, r, host, pathValue)
return
return errors.Wrap(err, "create spool")
}

if isWriter {
logger.DebugContext(ctx, "Spooling upstream response", "key", key, "upstream", upstreamURL)
tw := NewSpoolTeeWriter(w, spool)
s.forwardToUpstream(tw, r, host, pathValue)
spool.MarkComplete()
return
return nil
}

if spool.Failed() {
logger.DebugContext(ctx, "Spool failed, forwarding to upstream", "key", key)
s.forwardToUpstream(w, r, host, pathValue)
return
return nil
}

logger.DebugContext(ctx, "Serving from spool", "key", key, "upstream", upstreamURL)
if err := spool.ServeTo(w); err != nil {
if errors.Is(err, ErrSpoolFailed) {
logger.DebugContext(ctx, "Spool failed before response started, forwarding to upstream", "key", key)
s.forwardToUpstream(w, r, host, pathValue)
return
return nil
}
logger.WarnContext(ctx, "Spool read failed mid-stream", "key", key, "error", err)
return errors.Wrapf(err, "spool read failed mid-stream for key %s", key)
}
return nil
}

func ExtractRepoPath(pathValue string) string {
Expand All @@ -457,7 +452,9 @@ func ExtractRepoPath(pathValue string) string {
// context is cancelled. Returns an error if the clone fails or the context is done.
func (s *Strategy) ensureCloneReady(ctx context.Context, repo *gitclone.Repository) error {
if repo.State() == gitclone.StateEmpty {
s.startClone(ctx, repo)
if err := s.startClone(ctx, repo); err != nil {
return err
}
}
for repo.State() == gitclone.StateCloning {
t := time.NewTimer(500 * time.Millisecond)
Expand All @@ -474,13 +471,13 @@ func (s *Strategy) ensureCloneReady(ctx context.Context, repo *gitclone.Reposito
return nil
}

func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) error {
// Atomically claim the clone so only one goroutine performs the restore
// or clone. Without this gate, concurrent snapshot requests each call
// startClone and extract tarballs over the same directory, corrupting
// packed-refs and other git metadata.
if !repo.TryStartCloning() {
return
return nil
}

logger := logging.FromContext(ctx)
Expand Down Expand Up @@ -508,13 +505,13 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
// fall through to a fresh clone so we don't re-upload bad data.
repo.ResetToEmpty()
if rmErr := os.RemoveAll(repo.Path()); rmErr != nil {
logger.WarnContext(ctx, "Failed to remove corrupt mirror", "upstream", upstream, "error", rmErr)
return errors.Wrapf(rmErr, "remove corrupt mirror for %s", upstream)
}
} else {
repo.MarkReady()

if err := s.cleanupSpools(upstream); err != nil {
logger.WarnContext(ctx, "Failed to clean up spools", "upstream", upstream, "error", err)
return errors.Wrapf(err, "clean up spools for %s", upstream)
}

logger.InfoContext(ctx, "Post-restore fetch completed, serving", "upstream", upstream)
Expand All @@ -525,7 +522,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
if s.config.RepackInterval > 0 {
s.scheduleRepackJobs(repo)
}
return
return nil
}
}

Expand All @@ -537,14 +534,13 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
// Clean up spools regardless of clone success or failure, so that subsequent
// requests either serve from the local backend or go directly to upstream.
if cleanupErr := s.cleanupSpools(upstream); cleanupErr != nil {
logger.WarnContext(ctx, "Failed to clean up spools", "upstream", upstream, "error", cleanupErr)
return errors.Wrapf(cleanupErr, "clean up spools for %s", upstream)
}

if err != nil {
s.metrics.recordOperation(ctx, "clone", "error", time.Since(cloneStart))
logger.ErrorContext(ctx, "Clone failed", "upstream", upstream, "error", err)
repo.ResetToEmpty()
return
return errors.Wrapf(err, "clone %s", upstream)
}

s.metrics.recordOperation(ctx, "clone", "success", time.Since(cloneStart))
Expand All @@ -556,6 +552,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) {
if s.config.RepackInterval > 0 {
s.scheduleRepackJobs(repo)
}
return nil
}

// tryRestoreSnapshot attempts to restore a mirror from an S3 mirror snapshot.
Expand Down Expand Up @@ -622,7 +619,6 @@ func (s *Strategy) doFetch(ctx context.Context, repo *gitclone.Repository) error
start := time.Now()
if err := repo.Fetch(ctx); err != nil {
s.metrics.recordOperation(ctx, "fetch", "error", time.Since(start))
logger.ErrorContext(ctx, "Fetch failed", "upstream", repo.UpstreamURL(), "duration", time.Since(start), "error", err)
return errors.Errorf("fetch failed: %w", err)
}
s.metrics.recordOperation(ctx, "fetch", "success", time.Since(start))
Expand Down
Loading