Skip to content
Open
32 changes: 24 additions & 8 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,17 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
return nil, err
}

// Compile regex once before loop to avoid performance penalty and handle errors properly
// Escape metacharacters to avoid regex syntax errors with table/index names like "my_table[test]" or "idx.name"
// Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix
escapedTable := regexp.QuoteMeta(this.migrationContext.GetGhostTableName())
escapedKey := regexp.QuoteMeta(this.migrationContext.UniqueKey.NameInGhostTable)
migrationUniqueKeyPattern := fmt.Sprintf(`for key '(%s\.)?%s'`, escapedTable, escapedKey)
migrationKeyRegex, err := regexp.Compile(migrationUniqueKeyPattern)
if err != nil {
return nil, fmt.Errorf("failed to compile migration key pattern: %w", err)
}

var sqlWarnings []string
for rows.Next() {
var level, message string
Expand All @@ -925,10 +936,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
continue
}
// Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix
migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable)
matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message)
if strings.Contains(message, "Duplicate entry") && matched {
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
continue
}
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
Expand Down Expand Up @@ -1559,6 +1567,17 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
return rollback(err)
}

// Compile regex once before loop to avoid performance penalty and handle errors properly
// Escape metacharacters to avoid regex syntax errors with table/index names like "my_table[test]" or "idx.name"
// Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix
escapedTable := regexp.QuoteMeta(this.migrationContext.GetGhostTableName())
escapedKey := regexp.QuoteMeta(this.migrationContext.UniqueKey.NameInGhostTable)
migrationUniqueKeyPattern := fmt.Sprintf(`for key '(%s\.)?%s'`, escapedTable, escapedKey)
migrationKeyRegex, err := regexp.Compile(migrationUniqueKeyPattern)
if err != nil {
return rollback(fmt.Errorf("failed to compile migration key pattern: %w", err))
}

var sqlWarnings []string
for rows.Next() {
var level, message string
Expand All @@ -1567,10 +1586,7 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
continue
}
// Duplicate warnings are formatted differently across mysql versions, hence the optional table name prefix
migrationUniqueKeyExpression := fmt.Sprintf("for key '(%s\\.)?%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.NameInGhostTable)
matched, _ := regexp.MatchString(migrationUniqueKeyExpression, message)
if strings.Contains(message, "Duplicate entry") && matched {
if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) {
// Duplicate entry on migration unique key is expected during binlog replay
// (row was already copied during bulk copy phase)
continue
Expand Down
304 changes: 298 additions & 6 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,23 +782,35 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigration
suite.Require().Error(err)
suite.Require().Contains(err.Error(), "Duplicate entry")

// Verify that the ghost table still has only 3 rows (no data loss)
rows, err := suite.db.Query("SELECT * FROM " + getTestGhostTableName() + " ORDER BY id")
// Verify that the ghost table still has only the original 3 rows with correct data (no data loss)
rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id")
suite.Require().NoError(err)
defer rows.Close()

var count int
var results []struct {
id int
email string
}
for rows.Next() {
var id int
var email string
err = rows.Scan(&id, &email)
suite.Require().NoError(err)
count += 1
results = append(results, struct {
id int
email string
}{id, email})
}
suite.Require().NoError(rows.Err())

// All 3 original rows should still be present
suite.Require().Equal(3, count)
// All 3 original rows should still be present with correct data
suite.Require().Len(results, 3)
suite.Require().Equal(1, results[0].id)
suite.Require().Equal("user1@example.com", results[0].email)
suite.Require().Equal(2, results[1].id)
suite.Require().Equal("user2@example.com", results[1].email)
suite.Require().Equal(3, results[2].id)
suite.Require().Equal("user3@example.com", results[2].email)
}

// TestUpdateModifyingUniqueKeyWithDuplicateOnOtherIndex tests the scenario where:
Expand Down Expand Up @@ -980,6 +992,286 @@ func (suite *ApplierTestSuite) TestNormalUpdateWithPanicOnWarnings() {
suite.Require().NoError(rows.Err())
}

// TestDuplicateOnMigrationKeyAllowedInBinlogReplay tests the positive case where
// a duplicate on the migration unique key during binlog replay is expected and should be allowed
func (suite *ApplierTestSuite) TestDuplicateOnMigrationKeyAllowedInBinlogReplay() {
	ctx := context.Background()
	require := suite.Require()

	// Original table: id is the primary key.
	_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
	require.NoError(err)

	// Ghost table: identical schema plus a new unique index on email.
	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
	require.NoError(err)

	connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
	require.NoError(err)

	migrationContext := newTestMigrationContext()
	migrationContext.ApplierConnectionConfig = connectionConfig
	migrationContext.SetConnectionConfig("innodb")
	migrationContext.PanicOnWarnings = true
	migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
	migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
	migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
	migrationContext.UniqueKey = &sql.UniqueKey{
		Name:             "PRIMARY",
		NameInGhostTable: "PRIMARY",
		Columns:          *sql.NewColumnList([]string{"id"}),
	}

	applier := NewApplier(migrationContext)
	require.NoError(applier.prepareQueries())
	defer applier.Teardown()

	require.NoError(applier.InitDBConnections())

	// Seed the ghost table as if the bulk copy phase had already copied these rows.
	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (2, 'bob@example.com');", getTestGhostTableName()))
	require.NoError(err)

	// Replay a binlog insert for a row that was already copied: this collides on
	// the migration unique key (PRIMARY) and must be tolerated, since rows copied
	// during the bulk phase are expected to reappear during binlog replay.
	events := []*binlog.BinlogDMLEvent{
		{
			DatabaseName:    testMysqlDatabase,
			TableName:       testMysqlTableName,
			DML:             binlog.InsertDML,
			NewColumnValues: sql.ToColumnValues([]interface{}{1, "alice@example.com"}), // duplicate PRIMARY KEY
		},
	}

	// Must succeed: the duplicate-on-migration-key warning is filtered out.
	require.NoError(applier.ApplyDMLEventQueries(events))

	// The ghost table must be unchanged: exactly the two seeded rows, intact.
	type row struct {
		id    int
		email string
	}
	rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id")
	require.NoError(err)
	defer rows.Close()

	var got []row
	for rows.Next() {
		var r row
		require.NoError(rows.Scan(&r.id, &r.email))
		got = append(got, r)
	}
	require.NoError(rows.Err())

	require.Len(got, 2)
	require.Equal(row{1, "alice@example.com"}, got[0])
	require.Equal(row{2, "bob@example.com"}, got[1])
}

// TestRegexMetacharactersInIndexName tests that index names with regex metacharacters
// are properly escaped. We test with a plus sign in the index name, which without
// QuoteMeta would be treated as a regex quantifier (one or more of the preceding character).
// This test verifies the pattern matches ONLY the exact index name, not a regex pattern.
func (suite *ApplierTestSuite) TestRegexMetacharactersInIndexName() {
	ctx := context.Background()

	var err error

	// Create tables with an index name containing a plus sign.
	// Without QuoteMeta, the "+" in "idx+email" would act as a quantifier:
	// the pattern would mean "id" followed by one or more "x", then "email",
	// rather than the literal index name "idx+email".
	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
	suite.Require().NoError(err)

	// MySQL allows + in index names when quoted
	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY `idx+email` (email));", getTestGhostTableName()))
	suite.Require().NoError(err)

	connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
	suite.Require().NoError(err)

	migrationContext := newTestMigrationContext()
	migrationContext.ApplierConnectionConfig = connectionConfig
	migrationContext.SetConnectionConfig("innodb")

	// Warnings must be fatal so that an unexpected duplicate surfaces as an error.
	migrationContext.PanicOnWarnings = true

	migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
	migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
	migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
	// The migration unique key is PRIMARY; "idx+email" is a non-migration index.
	migrationContext.UniqueKey = &sql.UniqueKey{
		Name:             "PRIMARY",
		NameInGhostTable: "PRIMARY",
		Columns:          *sql.NewColumnList([]string{"id"}),
	}

	applier := NewApplier(migrationContext)
	suite.Require().NoError(applier.prepareQueries())
	defer applier.Teardown()

	err = applier.InitDBConnections()
	suite.Require().NoError(err)

	// Insert initial rows
	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (2, 'bob@example.com');", getTestGhostTableName()))
	suite.Require().NoError(err)

	// Test: duplicate on PRIMARY KEY (migration key) should be allowed
	dmlEvents := []*binlog.BinlogDMLEvent{
		{
			DatabaseName:    testMysqlDatabase,
			TableName:       testMysqlTableName,
			DML:             binlog.InsertDML,
			NewColumnValues: sql.ToColumnValues([]interface{}{1, "alice@example.com"}),
		},
	}

	err = applier.ApplyDMLEventQueries(dmlEvents)
	suite.Require().NoError(err, "Duplicate on PRIMARY should be allowed even with + in index name")

	// Test: duplicate on idx+email should fail.
	// This verifies our regex correctly identifies "idx+email" as a literal
	// index name (not as a regex pattern) and does NOT filter this warning.
	dmlEvents = []*binlog.BinlogDMLEvent{
		{
			DatabaseName:    testMysqlDatabase,
			TableName:       testMysqlTableName,
			DML:             binlog.InsertDML,
			NewColumnValues: sql.ToColumnValues([]interface{}{3, "alice@example.com"}),
		},
	}

	err = applier.ApplyDMLEventQueries(dmlEvents)
	suite.Require().Error(err, "Duplicate on idx+email should fail")
	suite.Require().Contains(err.Error(), "Duplicate entry")

	// Verify final state - should still have only the original 2 rows
	rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id")
	suite.Require().NoError(err)
	defer rows.Close()

	var results []struct {
		id    int
		email string
	}
	for rows.Next() {
		var id int
		var email string
		err = rows.Scan(&id, &email)
		suite.Require().NoError(err)
		results = append(results, struct {
			id    int
			email string
		}{id, email})
	}
	suite.Require().NoError(rows.Err())

	suite.Require().Len(results, 2)
	suite.Require().Equal(1, results[0].id)
	suite.Require().Equal("alice@example.com", results[0].email)
	suite.Require().Equal(2, results[1].id)
	suite.Require().Equal("bob@example.com", results[1].email)
}

// TestPanicOnWarningsDisabled tests that when PanicOnWarnings is false,
// warnings are not checked and duplicates are silently ignored
func (suite *ApplierTestSuite) TestPanicOnWarningsDisabled() {
	ctx := context.Background()
	require := suite.Require()

	// Original table: plain primary key on id.
	_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100));", getTestTableName()))
	require.NoError(err)

	// Ghost table adds a unique index on email.
	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, email VARCHAR(100), UNIQUE KEY email_unique (email));", getTestGhostTableName()))
	require.NoError(err)

	connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
	require.NoError(err)

	migrationContext := newTestMigrationContext()
	migrationContext.ApplierConnectionConfig = connectionConfig
	migrationContext.SetConnectionConfig("innodb")
	// PanicOnWarnings stays disabled (the default) — warnings are never inspected.
	migrationContext.PanicOnWarnings = false
	migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "email"})
	migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "email"})
	migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "email"})
	migrationContext.UniqueKey = &sql.UniqueKey{
		Name:             "PRIMARY",
		NameInGhostTable: "PRIMARY",
		Columns:          *sql.NewColumnList([]string{"id"}),
	}

	applier := NewApplier(migrationContext)
	require.NoError(applier.prepareQueries())
	defer applier.Teardown()

	require.NoError(applier.InitDBConnections())

	// Seed the ghost table with the rows the bulk copy would have written.
	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, email) VALUES (1, 'alice@example.com'), (2, 'bob@example.com');", getTestGhostTableName()))
	require.NoError(err)

	// Replay an insert whose email collides on the non-migration unique index.
	// With PanicOnWarnings disabled, the INSERT IGNORE warning is never checked,
	// so the event is silently skipped and no error surfaces.
	events := []*binlog.BinlogDMLEvent{
		{
			DatabaseName:    testMysqlDatabase,
			TableName:       testMysqlTableName,
			DML:             binlog.InsertDML,
			NewColumnValues: sql.ToColumnValues([]interface{}{3, "alice@example.com"}), // duplicate email
		},
	}
	require.NoError(applier.ApplyDMLEventQueries(events))

	// Only the two seeded rows may remain; id=3 was dropped by INSERT IGNORE.
	type row struct {
		id    int
		email string
	}
	rows, err := suite.db.Query("SELECT id, email FROM " + getTestGhostTableName() + " ORDER BY id")
	require.NoError(err)
	defer rows.Close()

	var got []row
	for rows.Next() {
		var r row
		require.NoError(rows.Scan(&r.id, &r.email))
		got = append(got, r)
	}
	require.NoError(rows.Err())

	require.Len(got, 2)
	require.Equal(row{1, "alice@example.com"}, got[0])
	require.Equal(row{2, "bob@example.com"}, got[1])
}

func TestApplier(t *testing.T) {
suite.Run(t, new(ApplierTestSuite))
}
Loading