Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ func main() {
if migrationContext.CheckpointIntervalSeconds < 10 {
migrationContext.Log.Fatalf("--checkpoint-seconds should be >=10")
}
if migrationContext.CountTableRows && migrationContext.PanicOnWarnings {
migrationContext.Log.Warning("--exact-rowcount with --panic-on-warnings: row counts cannot be exact due to warning detection")
}

switch *cutOver {
case "atomic", "default", "":
Expand Down
209 changes: 140 additions & 69 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1497,6 +1497,107 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB
return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))}
}

// executeBatchWithWarningChecking executes a batch of DML statements with SHOW WARNINGS
// interleaved after each statement to detect warnings from any statement in the batch.
// This is used when PanicOnWarnings is enabled to ensure warnings from middle statements
// are not lost (SHOW WARNINGS only shows warnings from the last statement in a multi-statement batch).
// executeBatchWithWarningChecking executes a batch of DML statements with SHOW WARNINGS
// interleaved after each statement to detect warnings from any statement in the batch.
// This is used when PanicOnWarnings is enabled to ensure warnings from middle statements
// are not lost (SHOW WARNINGS only reports warnings from the last statement in a
// multi-statement batch).
//
// Returns the theoretical row-count delta for the batch (see note inside the loop) and a
// non-nil error if any statement produced an unexpected warning, in which case the caller
// is expected to roll back the transaction.
func (this *Applier) executeBatchWithWarningChecking(ctx context.Context, tx *gosql.Tx, buildResults []*dmlBuildResult) (int64, error) {
	// Build query with interleaved SHOW WARNINGS: stmt1; SHOW WARNINGS; stmt2; SHOW WARNINGS; ...
	var queryBuilder strings.Builder
	args := make([]interface{}, 0)

	for _, buildResult := range buildResults {
		queryBuilder.WriteString(buildResult.query)
		queryBuilder.WriteString(";\nSHOW WARNINGS;\n")
		args = append(args, buildResult.args...)
	}

	query := queryBuilder.String()

	// Execute the multi-statement query. NOTE(review): this relies on the connection
	// being configured with multiStatements enabled — confirm against the applier's DSN setup.
	rows, err := tx.QueryContext(ctx, query, args...)
	if err != nil {
		return 0, fmt.Errorf("%w; query=%s; args=%+v", err, query, args)
	}
	defer rows.Close()

	var totalDelta int64

	// QueryContext with multi-statement queries is assumed to return rows positioned at the
	// first result set that produces rows (i.e., the first SHOW WARNINGS), skipping DML
	// results — this is driver-dependent behavior (go-sql-driver/mysql); verify on driver upgrades.
	// A SHOW WARNINGS result set has 3 columns: Level, Code, Message.
	cols, err := rows.Columns()
	if err != nil {
		return 0, fmt.Errorf("failed to get columns: %w", err)
	}

	// If somehow we're not at a result set with columns, try to advance once.
	if len(cols) == 0 {
		if !rows.NextResultSet() {
			return 0, fmt.Errorf("expected SHOW WARNINGS result set after first statement")
		}
	}

	// Compile regex once before loop to avoid performance penalty and handle errors properly.
	migrationKeyRegex, err := this.compileMigrationKeyWarningRegex()
	if err != nil {
		return 0, err
	}

	// Iterate through SHOW WARNINGS result sets, one per DML statement.
	// DML statements don't create navigable result sets, so we move directly between SHOW WARNINGS.
	// Pattern: [at SHOW WARNINGS #1] -> read warnings -> NextResultSet() -> [at SHOW WARNINGS #2] -> ...
	for i := 0; i < len(buildResults); i++ {
		// We can't get exact rows affected with QueryContext (needed for reading SHOW WARNINGS).
		// Use the theoretical delta (+1 for INSERT, -1 for DELETE, 0 for UPDATE) as an approximation.
		// This may be inaccurate (e.g., INSERT IGNORE with duplicate affects 0 rows but we count +1).
		totalDelta += buildResults[i].rowsDelta

		// Read warnings from this statement's SHOW WARNINGS result set.
		var sqlWarnings []string
		for rows.Next() {
			var level, message string
			var code int
			if err := rows.Scan(&level, &code, &message); err != nil {
				// Scan failure means we cannot reliably read warnings.
				// Since PanicOnWarnings is a safety feature, we must fail hard rather than silently skip.
				return 0, fmt.Errorf("failed to scan SHOW WARNINGS for statement %d: %w", i+1, err)
			}

			if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
				// Duplicate entry on the migration unique key is expected during binlog replay
				// (row was already copied during the bulk copy phase), so it is not treated
				// as a fatal warning.
				continue
			}
			sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
		}

		// Check for errors that occurred while iterating through warnings.
		if err := rows.Err(); err != nil {
			return 0, fmt.Errorf("error reading SHOW WARNINGS result set for statement %d: %w", i+1, err)
		}

		// Any surviving warning aborts the whole batch; the caller rolls back the transaction.
		if err := rows.Err(); err == nil && len(sqlWarnings) > 0 {
			return 0, fmt.Errorf("warnings detected in statement %d of %d: %v", i+1, len(buildResults), sqlWarnings)
		}

		// Move to the next statement's SHOW WARNINGS result set.
		// For the last statement, there's no next result set.
		// DML statements don't create result sets, so we only need one NextResultSet call
		// to move from SHOW WARNINGS #N to SHOW WARNINGS #(N+1).
		if i < len(buildResults)-1 {
			if !rows.NextResultSet() {
				if err := rows.Err(); err != nil {
					return 0, fmt.Errorf("error moving to SHOW WARNINGS for statement %d: %w", i+2, err)
				}
				return 0, fmt.Errorf("expected SHOW WARNINGS result set for statement %d", i+2)
			}
		}
	}

	return totalDelta, nil
}

// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
var totalDelta int64
Expand Down Expand Up @@ -1536,82 +1637,52 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
}
}

// We batch together the DML queries into multi-statements to minimize network trips.
// We have to use the raw driver connection to access the rows affected
// for each statement in the multi-statement.
execErr := conn.Raw(func(driverConn any) error {
ex := driverConn.(driver.ExecerContext)
nvc := driverConn.(driver.NamedValueChecker)

multiArgs := make([]driver.NamedValue, 0, nArgs)
multiQueryBuilder := strings.Builder{}
for _, buildResult := range buildResults {
for _, arg := range buildResult.args {
nv := driver.NamedValue{Value: driver.Value(arg)}
nvc.CheckNamedValue(&nv)
multiArgs = append(multiArgs, nv)
}

multiQueryBuilder.WriteString(buildResult.query)
multiQueryBuilder.WriteString(";\n")
}

res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs)
if err != nil {
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
return err
}

mysqlRes := res.(drivermysql.Result)

// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
for i, rowsAffected := range mysqlRes.AllRowsAffected() {
totalDelta += buildResults[i].rowsDelta * rowsAffected
}
return nil
})

if execErr != nil {
return rollback(execErr)
}

// Check for warnings when PanicOnWarnings is enabled
// When PanicOnWarnings is enabled, we need to check warnings after each statement
// in the batch. SHOW WARNINGS only shows warnings from the last statement in a
// multi-statement query, so we interleave SHOW WARNINGS after each DML statement.
if this.migrationContext.PanicOnWarnings {
//nolint:execinquery
rows, err := tx.Query("SHOW WARNINGS")
if err != nil {
return rollback(err)
}
defer rows.Close()
if err = rows.Err(); err != nil {
return rollback(err)
}

// Compile regex once before loop to avoid performance penalty and handle errors properly
migrationKeyRegex, err := this.compileMigrationKeyWarningRegex()
totalDelta, err = this.executeBatchWithWarningChecking(ctx, tx, buildResults)
if err != nil {
return rollback(err)
}
} else {
// Fast path: batch together DML queries into multi-statements to minimize network trips.
// We use the raw driver connection to access the rows affected for each statement.
execErr := conn.Raw(func(driverConn any) error {
ex := driverConn.(driver.ExecerContext)
nvc := driverConn.(driver.NamedValueChecker)

multiArgs := make([]driver.NamedValue, 0, nArgs)
multiQueryBuilder := strings.Builder{}
for _, buildResult := range buildResults {
for _, arg := range buildResult.args {
nv := driver.NamedValue{Value: driver.Value(arg)}
nvc.CheckNamedValue(&nv)
multiArgs = append(multiArgs, nv)
}

multiQueryBuilder.WriteString(buildResult.query)
multiQueryBuilder.WriteString(";\n")
}

var sqlWarnings []string
for rows.Next() {
var level, message string
var code int
if err := rows.Scan(&level, &code, &message); err != nil {
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
continue
res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs)
if err != nil {
err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs)
return err
}
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
// Duplicate entry on migration unique key is expected during binlog replay
// (row was already copied during bulk copy phase)
continue

mysqlRes := res.(drivermysql.Result)

// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
for i, rowsAffected := range mysqlRes.AllRowsAffected() {
totalDelta += buildResults[i].rowsDelta * rowsAffected
}
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
}
if len(sqlWarnings) > 0 {
warningMsg := fmt.Sprintf("Warnings detected during DML event application: %v", sqlWarnings)
return rollback(errors.New(warningMsg))
return nil
})

if execErr != nil {
return rollback(execErr)
}
}

Expand Down
110 changes: 110 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,116 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsDisabled() {
suite.Require().Equal("bob@example.com", results[1].email)
}

// TestMultipleDMLEventsInBatch tests that multiple DML events are processed in a single transaction
// and that if one fails due to a warning, the entire batch is rolled back - including events that
// come AFTER the failure. This proves true transaction atomicity.
// TestMultipleDMLEventsInBatch verifies that a batch of DML events is applied inside a
// single transaction: when an event in the middle of the batch fails on a warning, every
// event in the batch — including later ones that would have succeeded on their own — is
// rolled back, proving true batch atomicity.
func (suite *ApplierTestSuite) TestMultipleDMLEventsInBatch() {
	ctx := context.Background()

	// Source table: id and email, no unique constraint on email.
	_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
	suite.Require().NoError(err)

	// Ghost table: same shape, plus a unique index on email to provoke duplicate-key warnings.
	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
	suite.Require().NoError(err)

	connConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
	suite.Require().NoError(err)

	migCtx := newTestMigrationContext()
	migCtx.ApplierConnectionConfig = connConfig
	migCtx.SetConnectionConfig("innodb")
	migCtx.PanicOnWarnings = true
	migCtx.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
	migCtx.SharedColumns = sql.NewColumnList([]string{"id", "email"})
	migCtx.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
	migCtx.UniqueKey = &sql.UniqueKey{
		Name:             "PRIMARY",
		NameInGhostTable: "PRIMARY",
		Columns:          *sql.NewColumnList([]string{"id"}),
	}

	applier := NewApplier(migCtx)
	suite.Require().NoError(applier.prepareQueries())
	defer applier.Teardown()

	suite.Require().NoError(applier.InitDBConnections())

	// Pre-populate the ghost table as if the bulk copy phase had already copied two rows.
	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (3, 'charlie@example.com');", getTestGhostTableName()))
	suite.Require().NoError(err)

	makeInsert := func(id int, email string) *binlog.BinlogDMLEvent {
		return &binlog.BinlogDMLEvent{
			DatabaseName:    testMysqlDatabase,
			TableName:       testMysqlTableName,
			DML:             binlog.InsertDML,
			NewColumnValues: sql.ToColumnValues([]interface{}{id, email}),
		}
	}

	// Simulated binlog events:
	//   #1 duplicates the PRIMARY key            — tolerated during binlog replay
	//   #2 duplicates the email unique index     — must fail (failure in the middle)
	//   #3 is valid on its own                   — success AFTER the failure
	// The critical property: #3 must be rolled back because #2 failed.
	events := []*binlog.BinlogDMLEvent{
		makeInsert(1, "alice@example.com"),
		makeInsert(4, "alice@example.com"),
		makeInsert(2, "bob@example.com"),
	}

	// Applying the batch should surface the duplicate-email failure from event #2.
	err = applier.ApplyDMLEventQueries(events)
	suite.Require().Error(err)
	suite.Require().Contains(err.Error(), "Duplicate entry")

	// Read back the ghost table; only the original two rows should remain.
	type ghostRow struct {
		id    int
		email string
	}
	rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id")
	suite.Require().NoError(err)
	defer rows.Close()

	var got []ghostRow
	for rows.Next() {
		var r ghostRow
		suite.Require().NoError(rows.Scan(&r.id, &r.email))
		got = append(got, r)
	}
	suite.Require().NoError(rows.Err())

	// The entire batch was rolled back: id=2 (bob@example.com) from event #3 is absent,
	// even though that insert would have succeeded in isolation.
	suite.Require().Len(got, 2)
	suite.Require().Equal(ghostRow{1, "alice@example.com"}, got[0])
	suite.Require().Equal(ghostRow{3, "charlie@example.com"}, got[1])
}

// TestApplier is the standard `go test` entry point that runs every test in ApplierTestSuite.
func TestApplier(t *testing.T) {
	suite.Run(t, new(ApplierTestSuite))
}
11 changes: 11 additions & 0 deletions localtests/panic-on-warnings-batch-middle/create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
  id int auto_increment,
  email varchar(255) not null,
  primary key (id)
) auto_increment=1;

-- Seed data: every email is distinct, so the pre-migration table holds no duplicates.
-- A single multi-row insert assigns the same auto_increment ids (1, 2, 3) as three
-- separate single-row inserts would.
insert into gh_ost_test (email) values
  ('alice@example.com'),
  ('bob@example.com'),
  ('charlie@example.com');
1 change: 1 addition & 0 deletions localtests/panic-on-warnings-batch-middle/expect_failure
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ERROR warnings detected in statement 1 of 2
1 change: 1 addition & 0 deletions localtests/panic-on-warnings-batch-middle/extra_args
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--default-retries=1 --panic-on-warnings --alter "add unique index email_idx(email)" --postpone-cut-over-flag-file=/tmp/gh-ost-test.postpone-cutover
Loading
Loading