From d20c72d961daa2e0c050511abd45cbbca27b7131 Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Mon, 9 Mar 2026 16:08:27 -0700 Subject: [PATCH 1/3] Handle warnings in middle of DML batch --- go/cmd/gh-ost/main.go | 3 + go/logic/applier.go | 209 ++++++++++++++++++++++++++------------- go/logic/applier_test.go | 110 +++++++++++++++++++++ 3 files changed, 253 insertions(+), 69 deletions(-) diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index fae519680..567137fd5 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -316,6 +316,9 @@ func main() { if migrationContext.CheckpointIntervalSeconds < 10 { migrationContext.Log.Fatalf("--checkpoint-seconds should be >=10") } + if migrationContext.CountTableRows && migrationContext.PanicOnWarnings { + migrationContext.Log.Warning("--exact-rowcount with --panic-on-warnings: row counts cannot be exact due to warning detection") + } switch *cutOver { case "atomic", "default", "": diff --git a/go/logic/applier.go b/go/logic/applier.go index ec2d7be10..bfc1786d3 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -1497,6 +1497,107 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) []*dmlB return []*dmlBuildResult{newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))} } +// executeBatchWithWarningChecking executes a batch of DML statements with SHOW WARNINGS +// interleaved after each statement to detect warnings from any statement in the batch. +// This is used when PanicOnWarnings is enabled to ensure warnings from middle statements +// are not lost (SHOW WARNINGS only shows warnings from the last statement in a multi-statement batch). +func (this *Applier) executeBatchWithWarningChecking(ctx context.Context, tx *gosql.Tx, buildResults []*dmlBuildResult) (int64, error) { + // Build query with interleaved SHOW WARNINGS: stmt1; SHOW WARNINGS; stmt2; SHOW WARNINGS; ... + var queryBuilder strings.Builder + args := make([]interface{}, 0) + + for _, buildResult := range buildResults { + queryBuilder.WriteString(buildResult.query) + queryBuilder.WriteString(";\nSHOW WARNINGS;\n") + args = append(args, buildResult.args...) + } + + query := queryBuilder.String() + + // Execute the multi-statement query + rows, err := tx.QueryContext(ctx, query, args...) + if err != nil { + return 0, fmt.Errorf("%w; query=%s; args=%+v", err, query, args) + } + defer rows.Close() + + var totalDelta int64 + + // QueryContext with multi-statement queries returns rows positioned at the first result set + // that produces rows (i.e., the first SHOW WARNINGS), automatically skipping DML results. + // Verify we're at a SHOW WARNINGS result set (should have 3 columns: Level, Code, Message) + cols, err := rows.Columns() + if err != nil { + return 0, fmt.Errorf("failed to get columns: %w", err) + } + + // If somehow we're not at a result set with columns, try to advance + if len(cols) == 0 { + if !rows.NextResultSet() { + return 0, fmt.Errorf("expected SHOW WARNINGS result set after first statement") + } + } + + // Compile regex once before loop to avoid performance penalty and handle errors properly + migrationKeyRegex, err := this.compileMigrationKeyWarningRegex() + if err != nil { + return 0, err + } + + // Iterate through SHOW WARNINGS result sets. + // DML statements don't create navigable result sets, so we move directly between SHOW WARNINGS. + // Pattern: [at SHOW WARNINGS #1] -> read warnings -> NextResultSet() -> [at SHOW WARNINGS #2] -> ... + for i := 0; i < len(buildResults); i++ { + // We can't get exact rows affected with QueryContext (needed for reading SHOW WARNINGS). + // Use the theoretical delta (+1 for INSERT, -1 for DELETE, 0 for UPDATE) as an approximation. + // This may be inaccurate (e.g., INSERT IGNORE with duplicate affects 0 rows but we count +1). + totalDelta += buildResults[i].rowsDelta + + // Read warnings from this statement's SHOW WARNINGS result set + var sqlWarnings []string + for rows.Next() { + var level, message string + var code int + if err := rows.Scan(&level, &code, &message); err != nil { + // Scan failure means we cannot reliably read warnings. + // Since PanicOnWarnings is a safety feature, we must fail hard rather than silently skip. + return 0, fmt.Errorf("failed to scan SHOW WARNINGS for statement %d: %w", i+1, err) + } + + if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { + // Duplicate entry on migration unique key is expected during binlog replay + // (row was already copied during bulk copy phase) + continue + } + sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) + } + + // Check for errors that occurred while iterating through warnings + if err := rows.Err(); err != nil { + return 0, fmt.Errorf("error reading SHOW WARNINGS result set for statement %d: %w", i+1, err) + } + + if len(sqlWarnings) > 0 { + return 0, fmt.Errorf("warnings detected in statement %d of %d: %v", i+1, len(buildResults), sqlWarnings) + } + + // Move to the next statement's SHOW WARNINGS result set + // For the last statement, there's no next result set + // DML statements don't create result sets, so we only need one NextResultSet call + // to move from SHOW WARNINGS #N to SHOW WARNINGS #(N+1) + if i < len(buildResults)-1 { + if !rows.NextResultSet() { + if err := rows.Err(); err != nil { + return 0, fmt.Errorf("error moving to SHOW WARNINGS for statement %d: %w", i+2, err) + } + return 0, fmt.Errorf("expected SHOW WARNINGS result set for statement %d", i+2) + } + } + } + + return totalDelta, nil +} + // ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error { var totalDelta int64 @@ -1536,82 +1637,52 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) } } - // We batch together the DML queries into multi-statements to minimize network trips. - // We have to use the raw driver connection to access the rows affected - // for each statement in the multi-statement. - execErr := conn.Raw(func(driverConn any) error { - ex := driverConn.(driver.ExecerContext) - nvc := driverConn.(driver.NamedValueChecker) - - multiArgs := make([]driver.NamedValue, 0, nArgs) - multiQueryBuilder := strings.Builder{} - for _, buildResult := range buildResults { - for _, arg := range buildResult.args { - nv := driver.NamedValue{Value: driver.Value(arg)} - nvc.CheckNamedValue(&nv) - multiArgs = append(multiArgs, nv) - } - - multiQueryBuilder.WriteString(buildResult.query) - multiQueryBuilder.WriteString(";\n") - } - - res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs) - if err != nil { - err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs) - return err - } - - mysqlRes := res.(drivermysql.Result) - - // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1). - // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event - for i, rowsAffected := range mysqlRes.AllRowsAffected() { - totalDelta += buildResults[i].rowsDelta * rowsAffected - } - return nil - }) - - if execErr != nil { - return rollback(execErr) - } - - // Check for warnings when PanicOnWarnings is enabled + // When PanicOnWarnings is enabled, we need to check warnings after each statement + // in the batch. SHOW WARNINGS only shows warnings from the last statement in a + // multi-statement query, so we interleave SHOW WARNINGS after each DML statement. if this.migrationContext.PanicOnWarnings { - //nolint:execinquery - rows, err := tx.Query("SHOW WARNINGS") - if err != nil { - return rollback(err) - } - defer rows.Close() - if err = rows.Err(); err != nil { - return rollback(err) - } - - // Compile regex once before loop to avoid performance penalty and handle errors properly - migrationKeyRegex, err := this.compileMigrationKeyWarningRegex() + totalDelta, err = this.executeBatchWithWarningChecking(ctx, tx, buildResults) if err != nil { return rollback(err) } + } else { + // Fast path: batch together DML queries into multi-statements to minimize network trips. + // We use the raw driver connection to access the rows affected for each statement. + execErr := conn.Raw(func(driverConn any) error { + ex := driverConn.(driver.ExecerContext) + nvc := driverConn.(driver.NamedValueChecker) + + multiArgs := make([]driver.NamedValue, 0, nArgs) + multiQueryBuilder := strings.Builder{} + for _, buildResult := range buildResults { + for _, arg := range buildResult.args { + nv := driver.NamedValue{Value: driver.Value(arg)} + nvc.CheckNamedValue(&nv) + multiArgs = append(multiArgs, nv) + } + + multiQueryBuilder.WriteString(buildResult.query) + multiQueryBuilder.WriteString(";\n") + } - var sqlWarnings []string - for rows.Next() { - var level, message string - var code int - if err := rows.Scan(&level, &code, &message); err != nil { - this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row") - continue + res, err := ex.ExecContext(ctx, multiQueryBuilder.String(), multiArgs) + if err != nil { + err = fmt.Errorf("%w; query=%s; args=%+v", err, multiQueryBuilder.String(), multiArgs) + return err } - if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { - // Duplicate entry on migration unique key is expected during binlog replay - // (row was already copied during bulk copy phase) - continue + + mysqlRes := res.(drivermysql.Result) + + // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1). + // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event + for i, rowsAffected := range mysqlRes.AllRowsAffected() { + totalDelta += buildResults[i].rowsDelta * rowsAffected } - sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) - } - if len(sqlWarnings) > 0 { - warningMsg := fmt.Sprintf("Warnings detected during DML event application: %v", sqlWarnings) - return rollback(errors.New(warningMsg)) + return nil + }) + + if execErr != nil { + return rollback(execErr) } } diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 6b02e0c02..1656dd141 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -1367,6 +1367,116 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsDisabled() { suite.Require().Equal("bob@example.com", results[1].email) } +// TestMultipleDMLEventsInBatch tests that multiple DML events are processed in a single transaction +// and that if one fails due to a warning, the entire batch is rolled back - including events that +// come AFTER the failure. This proves true transaction atomicity. +func (suite *ApplierTestSuite) TestMultipleDMLEventsInBatch() { + ctx := context.Background() + + var err error + + // Create table with id and email columns + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName())) + suite.Require().NoError(err) + + // Create ghost table with unique index on email + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + migrationContext.PanicOnWarnings = true + + migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY", + NameInGhostTable: "PRIMARY", + Columns: *sql.NewColumnList([]string{"id"}), + } + + applier := NewApplier(migrationContext) + suite.Require().NoError(applier.prepareQueries()) + defer applier.Teardown() + + err = applier.InitDBConnections() + suite.Require().NoError(err) + + // Insert initial rows into ghost table + _, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (3, 'charlie@example.com');", getTestGhostTableName())) + suite.Require().NoError(err) + + // Simulate multiple binlog events in a batch: + // 1. Duplicate on PRIMARY KEY (allowed - expected during binlog replay) + // 2. Duplicate on email index (should fail) ← FAILURE IN MIDDLE + // 3. Valid insert (would succeed) ← SUCCESS AFTER FAILURE + // + // The critical test: Even though event #3 would succeed on its own, it must be rolled back + // because event #2 failed. This proves the entire batch is truly atomic. + dmlEvents := []*binlog.BinlogDMLEvent{ + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{1, "alice@example.com"}), // duplicate PRIMARY (normally allowed) + }, + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{4, "alice@example.com"}), // duplicate email (FAILS) + }, + { + DatabaseName: testMysqlDatabase, + TableName: testMysqlTableName, + DML: binlog.InsertDML, + NewColumnValues: sql.ToColumnValues([]interface{}{2, "bob@example.com"}), // valid insert (would succeed) + }, + } + + // Should fail due to the second event + err = applier.ApplyDMLEventQueries(dmlEvents) + suite.Require().Error(err) + suite.Require().Contains(err.Error(), "Duplicate entry") + + // Verify that the entire batch was rolled back - still only the original 2 rows + // Critically: id=2 (bob@example.com) from event #3 should NOT be present + rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id") + suite.Require().NoError(err) + defer rows.Close() + + var results []struct { + id int + email string + } + for rows.Next() { + var id int + var email string + err = rows.Scan(&id, &email) + suite.Require().NoError(err) + results = append(results, struct { + id int + email string + }{id, email}) + } + suite.Require().NoError(rows.Err()) + + // Should still have exactly 2 original rows (entire batch was rolled back) + // This proves that even event #3 (which would have succeeded) was rolled back + suite.Require().Len(results, 2) + suite.Require().Equal(1, results[0].id) + suite.Require().Equal("alice@example.com", results[0].email) + suite.Require().Equal(3, results[1].id) + suite.Require().Equal("charlie@example.com", results[1].email) + // Critically: id=2 (bob@example.com) is NOT present, proving event #3 was rolled back +} + func TestApplier(t *testing.T) { suite.Run(t, new(ApplierTestSuite)) } From 5a2b02bfa3c2f66b5bc64621fa0c7c62776f1722 Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Tue, 10 Mar 2026 10:24:00 -0700 Subject: [PATCH 2/3] Add integration test for batch warnings --- .../panic-on-warnings-batch-middle/create.sql | 11 ++++ .../expect_failure | 1 + .../panic-on-warnings-batch-middle/extra_args | 1 + .../panic-on-warnings-batch-middle/test.sh | 57 +++++++++++++++++++ 4 files changed, 70 insertions(+) create mode 100644 localtests/panic-on-warnings-batch-middle/create.sql create mode 100644 localtests/panic-on-warnings-batch-middle/expect_failure create mode 100644 localtests/panic-on-warnings-batch-middle/extra_args create mode 100755 localtests/panic-on-warnings-batch-middle/test.sh diff --git a/localtests/panic-on-warnings-batch-middle/create.sql b/localtests/panic-on-warnings-batch-middle/create.sql new file mode 100644 index 000000000..e4883ca66 --- /dev/null +++ b/localtests/panic-on-warnings-batch-middle/create.sql @@ -0,0 +1,11 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + email varchar(255) not null, + primary key (id) +) auto_increment=1; + +-- Insert initial data - all unique emails +insert into gh_ost_test (email) values ('alice@example.com'); +insert into gh_ost_test (email) values ('bob@example.com'); +insert into gh_ost_test (email) values ('charlie@example.com'); diff --git a/localtests/panic-on-warnings-batch-middle/expect_failure b/localtests/panic-on-warnings-batch-middle/expect_failure new file mode 100644 index 000000000..11800efb0 --- /dev/null +++ b/localtests/panic-on-warnings-batch-middle/expect_failure @@ -0,0 +1 @@ +ERROR warnings detected in statement 1 of 2 diff --git a/localtests/panic-on-warnings-batch-middle/extra_args b/localtests/panic-on-warnings-batch-middle/extra_args new file mode 100644 index 000000000..55cdcd0e8 --- /dev/null +++ b/localtests/panic-on-warnings-batch-middle/extra_args @@ -0,0 +1 @@ +--default-retries=1 --panic-on-warnings --alter "add unique index email_idx(email)" --postpone-cut-over-flag-file=/tmp/gh-ost-test.postpone-cutover diff --git a/localtests/panic-on-warnings-batch-middle/test.sh b/localtests/panic-on-warnings-batch-middle/test.sh new file mode 100755 index 000000000..cbe3829c7 --- /dev/null +++ b/localtests/panic-on-warnings-batch-middle/test.sh @@ -0,0 +1,57 @@ +#!/bin/bash +# Custom test: inject batched DML events AFTER row copy completes +# Tests that warnings in the middle of a DML batch are detected + +# Create postpone flag file (referenced in extra_args) +postpone_flag_file=/tmp/gh-ost-test.postpone-cutover +touch $postpone_flag_file + +# Set table names (required by build_ghost_command) +table_name="gh_ost_test" +ghost_table_name="_gh_ost_test_gho" + +# Build gh-ost command using framework function +build_ghost_command + +# Run in background +echo_dot +echo > $test_logfile +bash -c "$cmd" >>$test_logfile 2>&1 & +ghost_pid=$! + +# Wait for row copy to complete +echo_dot +for i in {1..30}; do + grep -q "Row copy complete" $test_logfile && break + ps -p $ghost_pid > /dev/null || { echo; echo "ERROR gh-ost exited early"; rm -f $postpone_flag_file; return 1; } + sleep 1; echo_dot +done + +# Inject batched DML events that will create warnings +# These must be in a single transaction to be batched during binlog replay +echo_dot +gh-ost-test-mysql-master test << 'EOF' +BEGIN; +-- INSERT with duplicate PRIMARY KEY - warning on migration key (filtered by gh-ost) +INSERT IGNORE INTO gh_ost_test (id, email) VALUES (1, 'duplicate_pk@example.com'); +-- INSERT with duplicate email - warning on unique index (should trigger failure) +INSERT IGNORE INTO gh_ost_test (email) VALUES ('alice@example.com'); +-- INSERT with unique data - would succeed if not for previous warning +INSERT IGNORE INTO gh_ost_test (email) VALUES ('new@example.com'); +COMMIT; +EOF + +# Wait for binlog events to replicate and be applied +sleep 10; echo_dot + +# Complete cutover by removing postpone flag +rm -f $postpone_flag_file + +# Wait for gh-ost to complete +wait $ghost_pid +execution_result=$? +rm -f $postpone_flag_file + +# Validate using framework function +validate_expected_failure +return $? From 1e8f6ce2574914dcda7aa314a3571c024863a78a Mon Sep 17 00:00:00 2001 From: Gabriel Gilder Date: Wed, 11 Mar 2026 16:52:47 -0700 Subject: [PATCH 3/3] Update expected failure message for update-pk test --- .../expect_failure | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure index a54788d01..fb8dc562a 100644 --- a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure +++ b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure @@ -1 +1 @@ -Warnings detected during DML event application +ERROR warnings detected in statement 1 of 1