7 changes: 6 additions & 1 deletion pkg/drivers/generic/tx.go
@@ -3,6 +3,7 @@ package generic
 import (
 	"context"
 	"database/sql"
+	"strings"
 	"time"

 	"github.com/k3s-io/kine/pkg/metrics"
@@ -49,7 +50,11 @@ func (t *Tx) Rollback() error {

 func (t *Tx) MustRollback() {
 	if err := t.Rollback(); err != nil {
-		if err != sql.ErrTxDone {
+		if err != sql.ErrTxDone &&
+			// MSSQL auto-rolls back transactions on serialization failures or
+			// deadlocks; a subsequent Rollback() returns this driver-specific
+			// error instead of sql.ErrTxDone.
+			!strings.Contains(err.Error(), "no corresponding BEGIN TRANSACTION") {
 			logrus.Fatalf("Transaction rollback failed: %v", err)
 		}
 	}
9 changes: 7 additions & 2 deletions pkg/drivers/mssql/mssql.go
@@ -154,13 +154,18 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
 		%s
 	) c`, revSQL, mssqlCountInnerSQL))

+	// Use NOLOCK hint to avoid lock contention with concurrent writes.
+	// The poll query runs every ~1s; any missed rows from dirty reads
+	// will be caught in the next iteration.
+	nolockRevSQL := `SELECT MAX(rkv.id) FROM kine AS rkv WITH (NOLOCK)`
+	nolockCompactRevSQL := `SELECT MAX(crkv.prev_revision) FROM kine AS crkv WITH (NOLOCK) WHERE crkv.name = 'compact_rev_key'`
 	dialect.AfterSQL = q(fmt.Sprintf(`
 		SELECT (%s) AS id, (%s) AS compact_revision, %s
-		FROM kine AS kv
+		FROM kine AS kv WITH (NOLOCK)
 		WHERE
 			kv.name LIKE ? AND
 			kv.id > ?
-		ORDER BY kv.id ASC`, revSQL, compactRevSQL, columns))
+		ORDER BY kv.id ASC`, nolockRevSQL, nolockCompactRevSQL, columns))
Comment on lines +157 to +168
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

In SQL Server, for a polling query on an append-only event table, does WITH (NOLOCK) allow reading uncommitted rows that may later roll back? Compare this with WITH (READPAST) regarding lock avoidance and consistency guarantees.

💡 Result:

Yes. In SQL Server, WITH (NOLOCK) allows reading uncommitted rows from an append-only event table that may later roll back. NOLOCK (equivalent to READUNCOMMITTED) enables dirty reads by not acquiring shared locks and ignoring exclusive locks held by other transactions, so it can see modifications that are subsequently rolled back. This maximizes concurrency and avoids blocking, but risks inconsistency such as phantom rows that disappear or data that was never committed. In comparison, WITH (READPAST) avoids locks by skipping locked rows entirely, ensuring only committed data is read (no dirty reads), but rows matching the query criteria may be missed while an insert/update transaction holds their locks. READPAST requires READ COMMITTED or REPEATABLE READ isolation (or SNAPSHOT combined with other hints) and suits queue-like scenarios, though it offers no guarantee against missing data. For a polling query on an append-only event table:
- NOLOCK: may read uncommitted inserts (later rolled back); avoids all blocking. Prioritizes speed over accuracy.
- READPAST: skips locked (uncommitted) rows, so polls see only committed events but ignore in-flight ones; also avoids blocking. Prioritizes avoiding dirty reads at the cost of completeness.

NOLOCK allows reading uncommitted rows that may later roll back.

Lines 160–164 use WITH (NOLOCK) hints on the polling query, which enables dirty reads. For an append-only event table, this risks processing events from transactions that subsequently roll back, causing phantom rows to disappear from the dataset after being published. Consider using WITH (READPAST) instead to skip locked (uncommitted) rows while avoiding blocking, ensuring only committed data is polled.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/drivers/mssql/mssql.go` around lines 157 - 168, The current polling SQL
strings nolockRevSQL, nolockCompactRevSQL and the query assigned to
dialect.AfterSQL use "WITH (NOLOCK)" which permits dirty (uncommitted) reads;
change those hints to "WITH (READPAST)" so the poll skips locked/uncommitted
rows instead of reading them. Locate nolockRevSQL, nolockCompactRevSQL and the
q(fmt.Sprintf(...)) that sets dialect.AfterSQL and replace each "WITH (NOLOCK)"
occurrence with "WITH (READPAST)" ensuring the same SQL structure and
placeholders remain unchanged.
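The substitution the prompt describes is purely mechanical, since the SQL structure and placeholders are unchanged. A hedged sketch; `withReadPast` is an illustrative helper, not part of kine:

```go
package main

import (
	"fmt"
	"strings"
)

// withReadPast swaps every NOLOCK table hint for READPAST, as the review
// suggests: READPAST skips rows locked by in-flight transactions instead
// of dirty-reading them, so the poll only ever sees committed data.
func withReadPast(query string) string {
	return strings.ReplaceAll(query, "WITH (NOLOCK)", "WITH (READPAST)")
}

func main() {
	revSQL := `SELECT MAX(rkv.id) FROM kine AS rkv WITH (NOLOCK)`
	fmt.Println(withReadPast(revSQL))
	// SELECT MAX(rkv.id) FROM kine AS rkv WITH (READPAST)
}
```

The trade-off noted in the analysis still applies: READPAST can skip in-flight rows, but the ~1 s poll interval means they are picked up on the next iteration once committed.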


dialect.GetSizeSQL = `
SELECT SUM(reserved_page_count) * 8 * 1024
18 changes: 17 additions & 1 deletion pkg/drivers/pgsql/pgsql.go
@@ -139,7 +139,23 @@ func setup(db *sql.DB) error {

 	for _, stmt := range schema {
 		logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
-		_, err := db.Exec(stmt)
+		var err error
+		// Retry index creation to work around a known PostgreSQL race
+		// where CREATE INDEX IF NOT EXISTS can fail with a duplicate-key
+		// error on pg_class when concurrent sessions (or background
+		// processes) touch the same catalog entries.
+		for attempt := 0; attempt < 3; attempt++ {
+			_, err = db.Exec(stmt)
+			if err == nil {
+				break
+			}
+			if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation && strings.Contains(pgErr.ConstraintName, "pg_class") {
+				logrus.Warnf("SETUP EXEC attempt %d hit pg_class race, retrying: %v", attempt+1, err)
+				time.Sleep(100 * time.Millisecond)
+				continue
Comment on lines +152 to +155
⚠️ Potential issue | 🟡 Minor

Avoid “retrying” log and sleep on the last attempt.

Lines 153 and 154 execute even on the final attempt (attempt == 2), so the code logs “retrying” and waits 100 ms when no retry will actually happen.

Proposed fix
-			if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation && strings.Contains(pgErr.ConstraintName, "pg_class") {
-				logrus.Warnf("SETUP EXEC attempt %d hit pg_class race, retrying: %v", attempt+1, err)
-				time.Sleep(100 * time.Millisecond)
-				continue
-			}
-			return err
+			if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == pgerrcode.UniqueViolation && strings.Contains(pgErr.ConstraintName, "pg_class") {
+				if attempt < 2 {
+					logrus.Warnf("SETUP EXEC attempt %d hit pg_class race, retrying: %v", attempt+1, err)
+					time.Sleep(100 * time.Millisecond)
+					continue
+				}
+			}
+			return err
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/drivers/pgsql/pgsql.go` around lines 152 - 155, The retry branch logs
"retrying" and sleeps even on the final attempt because it doesn't check the
loop's last iteration; change the if-block that checks pgErr (the condition
using pgerrcode.UniqueViolation and strings.Contains(pgErr.ConstraintName,
"pg_class")) to only call logrus.Warnf("... retrying ...", attempt+1, err) and
time.Sleep(100 * time.Millisecond) when the current attempt is not the final one
(e.g., check attempt < maxAttempts-1 or attempt != lastAttempt); leave the error
handling/continue flow untouched for the final attempt so no misleading log or
unnecessary sleep occurs.

+			}
+			return err
+		}
 		if err != nil {
 			return err
 		}
10 changes: 9 additions & 1 deletion pkg/logstructured/sqllog/sql.go
@@ -17,7 +17,11 @@ const (
 	compactInterval  = 5 * time.Minute
 	compactTimeout   = 60 * time.Second
 	compactMinRetain = 1000
-	compactBatchSize = 1000
+	// Use smaller batches to reduce lock contention on MSSQL, where
+	// SERIALIZABLE DELETE transactions can block other queries for the
+	// entire batch duration. 100 revisions per batch keeps each
+	// transaction under ~1 s even on slow backends.
+	compactBatchSize = 100
Comment on lines +20 to +24
⚠️ Potential issue | 🟠 Major

Compaction is over-throttled globally (batch size of 100 plus a fixed 50 ms sleep on every loop iteration).

Line 24 plus Lines 174-176 can reduce compaction throughput by an order of magnitude under backlog, and it diverges from the stated 1,000-row objective.

Proposed patch
-	compactBatchSize = 100
+	compactBatchSize = 1000
@@
-			time.Sleep(50 * time.Millisecond)
+			select {
+			case <-s.ctx.Done():
+				return
+			case <-time.After(50 * time.Millisecond):
+			}

Also applies to: 174-176

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/logstructured/sqllog/sql.go` around lines 20 - 24, The compaction is
over-throttled: increase compactBatchSize from 100 to 1000 (change the
compactBatchSize constant) and remove the unconditional 50ms sleep inside the
compaction loop (the loop that currently sleeps at the end of each iteration) —
instead make the pause adaptive or configurable (e.g., a new CompactionConfig or
env var controlling throttle/backoff) so under backlog the loop can process
multiple back-to-back batches while still allowing small delays for MSSQL if
needed; update references to compactBatchSize and the compaction loop/sleep call
accordingly.

pollBatchSize = 500
)

@@ -166,6 +170,10 @@ outer:
 			metrics.CompactTotal.WithLabelValues(metrics.ResultError).Inc()
 			continue outer
 		}
+
+		// Yield briefly between batches so that other queries blocked
+		// by SERIALIZABLE locks can make progress.
+		time.Sleep(50 * time.Millisecond)
}

if err := s.postCompact(); err != nil {