diff --git a/pkg/drivers/generic/tx.go b/pkg/drivers/generic/tx.go index a67a105a..4da8b57e 100644 --- a/pkg/drivers/generic/tx.go +++ b/pkg/drivers/generic/tx.go @@ -3,6 +3,7 @@ package generic import ( "context" "database/sql" + "strings" "time" "github.com/k3s-io/kine/pkg/metrics" @@ -49,7 +50,11 @@ func (t *Tx) Rollback() error { func (t *Tx) MustRollback() { if err := t.Rollback(); err != nil { - if err != sql.ErrTxDone { + if err != sql.ErrTxDone && + // MSSQL auto-rolls back transactions on serialization failures or + // deadlocks; a subsequent Rollback() returns this driver-specific + // error instead of sql.ErrTxDone. + !strings.Contains(err.Error(), "no corresponding BEGIN TRANSACTION") { logrus.Fatalf("Transaction rollback failed: %v", err) } } diff --git a/pkg/drivers/mssql/mssql.go b/pkg/drivers/mssql/mssql.go index 4f6d5501..2fd76aac 100644 --- a/pkg/drivers/mssql/mssql.go +++ b/pkg/drivers/mssql/mssql.go @@ -154,13 +154,18 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo %s ) c`, revSQL, mssqlCountInnerSQL)) + // Use NOLOCK hint to avoid lock contention with concurrent writes. + // The poll query runs every ~1s; any missed rows from dirty reads + // will be caught in the next iteration. + nolockRevSQL := `SELECT MAX(rkv.id) FROM kine AS rkv WITH (NOLOCK)` + nolockCompactRevSQL := `SELECT MAX(crkv.prev_revision) FROM kine AS crkv WITH (NOLOCK) WHERE crkv.name = 'compact_rev_key'` dialect.AfterSQL = q(fmt.Sprintf(` SELECT (%s) AS id, (%s) AS compact_revision, %s - FROM kine AS kv + FROM kine AS kv WITH (NOLOCK) WHERE kv.name LIKE ? AND kv.id > ? - ORDER BY kv.id ASC`, revSQL, compactRevSQL, columns)) + ORDER BY kv.id ASC`, nolockRevSQL, nolockCompactRevSQL, columns)) dialect.GetSizeSQL = ` SELECT SUM(reserved_page_count) * 8 * 1024 diff --git a/pkg/drivers/pgsql/pgsql.go b/pkg/drivers/pgsql/pgsql.go index 41d469e7..91104a58 100644 --- a/pkg/drivers/pgsql/pgsql.go +++ b/pkg/drivers/pgsql/pgsql.go @@ -139,7 +139,23 @@ func setup(db *sql.DB) error { for _, stmt := range schema { logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt)) - _, err := db.Exec(stmt) + var err error + // Retry index creation to work around a known PostgreSQL race + // where CREATE INDEX IF NOT EXISTS can fail with a duplicate-key + // error on pg_class when concurrent sessions (or background + // processes) touch the same catalog entries. + for attempt := 0; attempt < 3; attempt++ { + _, err = db.Exec(stmt) + if err == nil { + break + } + if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation && strings.Contains(pgErr.ConstraintName, "pg_class") { + logrus.Warnf("SETUP EXEC attempt %d hit pg_class race, retrying: %v", attempt+1, err) + time.Sleep(100 * time.Millisecond) + continue + } + return err + } if err != nil { return err } diff --git a/pkg/logstructured/sqllog/sql.go b/pkg/logstructured/sqllog/sql.go index b2248c2f..1db1a680 100644 --- a/pkg/logstructured/sqllog/sql.go +++ b/pkg/logstructured/sqllog/sql.go @@ -17,7 +17,11 @@ const ( compactInterval = 5 * time.Minute compactTimeout = 60 * time.Second compactMinRetain = 1000 - compactBatchSize = 1000 + // Use smaller batches to reduce lock contention on MSSQL, where + // SERIALIZABLE DELETE transactions can block other queries for the + // entire batch duration. 100 revisions per batch keeps each + // transaction under ~1 s even on slow backends. + compactBatchSize = 100 pollBatchSize = 500 ) @@ -166,6 +170,10 @@ outer: metrics.CompactTotal.WithLabelValues(metrics.ResultError).Inc() continue outer } + + // Yield briefly between batches so that other queries blocked + // by SERIALIZABLE locks can make progress. + time.Sleep(50 * time.Millisecond) } if err := s.postCompact(); err != nil {