diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index be5b987..026f9ac 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,49 @@ All notable changes to ACE will be captured in this document. This project follows semantic versioning; the latest changes appear first. +## [v2.0.0] - 2026-04-21 + +### Added +- **Native PostgreSQL support.** `table-diff`, `table-repair`, and Merkle tree + operations now work on vanilla PostgreSQL (14+) without the spock extension. + ACE auto-detects whether spock is installed on each node and branches + accordingly; `spock-diff` and `repset-diff` return a clear error when spock + is not present. +- Native PG alternatives for replication origin and slot LSN queries using + `pg_subscription` and `pg_replication_origin` catalog tables. +- `--against-origin` now works on native PG logical replication setups by + resolving replication origin IDs to subscription names. +- Integration test suite for native PostgreSQL covering table-diff, table-repair + (unidirectional, bidirectional, fix-nulls, dry-run), Merkle tree operations, + and origin-tracked replication with repair. + +### Changed +- ACE schema name in SQL templates is now quoted with `pgx.Identifier.Sanitize()` + to prevent SQL breakage with non-simple schema names (e.g., mixed case, + hyphens). +- `spock.xact_commit_timestamp_origin()` replaced with the standard PostgreSQL + function `pg_xact_commit_timestamp_origin()` in the `--against-origin` filter. + The two are functionally identical (spock's implementation is a thin wrapper + around the same PG core function). +- Spock detection is now per-node and lazy, supporting mixed clusters where some + nodes have spock and others do not. +- `SpockNodeNames` renamed to `NodeOriginNames` throughout the codebase to + reflect dual-mode (spock + native PG) usage. + +### Fixed +- Recovery-mode auto-selection (`autoSelectSourceOfTruth`) silently used native + PG LSN queries on spock clusters because the connection pool was stored after + `fetchLSNsForNode` returned instead of before. +- `aceSchema` template function used `config.Cfg` (not thread-safe) instead of + `config.Get()`, risking data races during concurrent SIGHUP reloads. +- `repset-diff` child table-diff tasks did not inherit the parent's TaskStore, + causing each to open its own SQLite connection. +- `isSpockAvailable` swallowed errors, causing callers to silently fall back to + native PG mode on detection failures. +- Native PG subscription matching used LIKE substring patterns that could + collide on similar node names (e.g., `n1` matching `n10`); now uses regex + word boundaries. 
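+
+A rough sketch of the per-node, lazy spock detection described above (the
+package name, function shape, and per-node cache are illustrative assumptions;
+only the `isSpockAvailable` name and the error-propagation fix come from this
+release):
+
+```go
+package spockdetect // hypothetical package name, for illustration only
+
+import (
+    "context"
+    "sync"
+
+    "github.com/jackc/pgx/v5/pgxpool"
+)
+
+// Illustrative only: query pg_extension once per node, cache the answer, and
+// surface detection errors instead of silently selecting native PG mode.
+var spockByNode sync.Map // node name -> bool
+
+func isSpockAvailable(ctx context.Context, node string, pool *pgxpool.Pool) (bool, error) {
+    if v, ok := spockByNode.Load(node); ok {
+        return v.(bool), nil
+    }
+    var installed bool
+    err := pool.QueryRow(ctx,
+        "SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'spock')").Scan(&installed)
+    if err != nil {
+        return false, err
+    }
+    spockByNode.Store(node, installed)
+    return installed, nil
+}
+```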
+ ## [v1.9.0] - 2026-04-17 ### Added diff --git a/tests/integration/docker-compose-native.yaml b/tests/integration/docker-compose-native.yaml index be60f2b..0732836 100644 --- a/tests/integration/docker-compose-native.yaml +++ b/tests/integration/docker-compose-native.yaml @@ -41,3 +41,17 @@ services: - "wal_level=logical" ports: - "5432" + native-n3: + image: postgres:17 + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + POSTGRES_DB: testdb + command: + - "postgres" + - "-c" + - "track_commit_timestamp=on" + - "-c" + - "wal_level=logical" + ports: + - "5432" diff --git a/tests/integration/native_pg_test.go b/tests/integration/native_pg_test.go index abf93ed..1f04d04 100644 --- a/tests/integration/native_pg_test.go +++ b/tests/integration/native_pg_test.go @@ -39,6 +39,7 @@ const ( nativeDBName = "testdb" nativeServiceN1 = "native-n1" nativeServiceN2 = "native-n2" + nativeServiceN3 = "native-n3" nativeContainerPort = "5432/tcp" nativeComposeFile = "docker-compose-native.yaml" nativeClusterName = "native_test_cluster" @@ -53,6 +54,9 @@ type nativeClusterState struct { n2Host string n2Port string n2Pool *pgxpool.Pool + n3Host string + n3Port string + n3Pool *pgxpool.Pool } func setupNativeCluster(t *testing.T) *nativeClusterState { @@ -64,12 +68,11 @@ func setupNativeCluster(t *testing.T) *nativeClusterState { identifier := strings.ToLower(fmt.Sprintf("ace_native_test_%d", time.Now().UnixNano())) - waitN1 := wait.ForListeningPort(nat.Port(nativeContainerPort)). - WithStartupTimeout(startupTimeout). - WithPollInterval(5 * time.Second) - waitN2 := wait.ForListeningPort(nat.Port(nativeContainerPort)). - WithStartupTimeout(startupTimeout). - WithPollInterval(5 * time.Second) + waitForPort := func() *wait.HostPortStrategy { + return wait.ForListeningPort(nat.Port(nativeContainerPort)). + WithStartupTimeout(startupTimeout). + WithPollInterval(5 * time.Second) + } stack, err := compose.NewDockerComposeWith( compose.StackIdentifier(identifier), @@ -78,48 +81,48 @@ func setupNativeCluster(t *testing.T) *nativeClusterState { require.NoError(t, err, "create native compose stack") execErr := stack. - WaitForService(nativeServiceN1, waitN1). - WaitForService(nativeServiceN2, waitN2). + WaitForService(nativeServiceN1, waitForPort()). + WaitForService(nativeServiceN2, waitForPort()). + WaitForService(nativeServiceN3, waitForPort()). Up(ctx, compose.Wait(true)) require.NoError(t, execErr, "start native compose stack") state := &nativeClusterState{stack: stack} - // Get mapped host/port for n1 - n1Container, err := stack.ServiceContainer(ctx, nativeServiceN1) - require.NoError(t, err, "get native-n1 container") - n1Host, err := n1Container.Host(ctx) - require.NoError(t, err, "get native-n1 host") cPort, err := nat.NewPort("tcp", "5432") require.NoError(t, err) - n1MappedPort, err := n1Container.MappedPort(ctx, cPort) - require.NoError(t, err, "get native-n1 mapped port") - state.n1Host = n1Host - state.n1Port = n1MappedPort.Port() - - // Get mapped host/port for n2 - n2Container, err := stack.ServiceContainer(ctx, nativeServiceN2) - require.NoError(t, err, "get native-n2 container") - n2Host, err := n2Container.Host(ctx) - require.NoError(t, err, "get native-n2 host") - n2MappedPort, err := n2Container.MappedPort(ctx, cPort) - require.NoError(t, err, "get native-n2 mapped port") - state.n2Host = n2Host - state.n2Port = n2MappedPort.Port() + + // Helper to extract mapped host/port for a service. 
+ getHostPort := func(service string) (string, string) { + c, cErr := stack.ServiceContainer(ctx, service) + require.NoError(t, cErr, "get %s container", service) + h, hErr := c.Host(ctx) + require.NoError(t, hErr, "get %s host", service) + p, pErr := c.MappedPort(ctx, cPort) + require.NoError(t, pErr, "get %s mapped port", service) + return h, p.Port() + } + + state.n1Host, state.n1Port = getHostPort(nativeServiceN1) + state.n2Host, state.n2Port = getHostPort(nativeServiceN2) + state.n3Host, state.n3Port = getHostPort(nativeServiceN3) // Connect state.n1Pool, err = connectToNode(state.n1Host, state.n1Port, nativeUser, nativePassword, nativeDBName) require.NoError(t, err, "connect to native-n1") state.n2Pool, err = connectToNode(state.n2Host, state.n2Port, nativeUser, nativePassword, nativeDBName) require.NoError(t, err, "connect to native-n2") + state.n3Pool, err = connectToNode(state.n3Host, state.n3Port, nativeUser, nativePassword, nativeDBName) + require.NoError(t, err, "connect to native-n3") - // Create pgcrypto extension on both nodes - for _, pool := range []*pgxpool.Pool{state.n1Pool, state.n2Pool} { + // Create pgcrypto extension on all nodes + for _, pool := range []*pgxpool.Pool{state.n1Pool, state.n2Pool, state.n3Pool} { _, err = pool.Exec(ctx, "CREATE EXTENSION IF NOT EXISTS pgcrypto") require.NoError(t, err, "create pgcrypto extension") } - log.Printf("Native PG cluster ready: n1=%s:%s, n2=%s:%s", state.n1Host, state.n1Port, state.n2Host, state.n2Port) + log.Printf("Native PG cluster ready: n1=%s:%s, n2=%s:%s, n3=%s:%s", + state.n1Host, state.n1Port, state.n2Host, state.n2Port, state.n3Host, state.n3Port) return state } @@ -131,6 +134,9 @@ func (s *nativeClusterState) teardown(t *testing.T) { if s.n2Pool != nil { s.n2Pool.Close() } + if s.n3Pool != nil { + s.n3Pool.Close() + } if s.stack != nil { execErr := s.stack.Down( context.Background(), @@ -477,112 +483,128 @@ func getNativeReplicationOrigin(t *testing.T, ctx context.Context, pool *pgxpool return *roidentStr } -// testNativePreserveOrigin verifies origin tracking on native PG with real -// logical replication: -// 1. Set up logical replication (publication on n1, subscription on n2) -// 2. Insert data on n1, wait for streaming replication to n2 -// 3. Verify GetNodeOriginNames maps roident → subscription name -// 4. Verify replicated rows on n2 have origin tracked via pg_xact_commit_timestamp_origin -// 5. Verify the origin resolves to the subscription's pg_replication_origin entry -// 6. Delete rows on n2, run diff + repair, verify rows restored +// testNativePreserveOrigin verifies the full preserve-origin cycle on native PG +// with 3 nodes and real logical replication. Mirrors TestTableRepair_PreserveOrigin +// (the spock version). // -// Note: preserve-origin cannot fully restore origins in a 2-node setup because -// the source-of-truth node (n1) has origin="local" for rows it wrote. A 3-node -// setup (like the spock PreserveOrigin test) is needed for full origin preservation. +// Topology: data originates on n3, replicates to n1 and n2 via subscriptions. +// Both n1 and n2 see rows with a non-local origin (n3's subscription roident). +// We delete rows on n2, diff n1 vs n2, and repair with preserve-origin. Since +// n1's rows have non-local origin metadata, the repair can preserve it. +// +// 1. Create table on all 3 nodes +// 2. Set up publications on n3, subscriptions on n1 and n2 +// 3. Insert data on n3, wait for replication to n1 and n2 +// 4. 
Verify GetNodeOriginNames maps roident → subscription name +// 5. Capture baseline origins and timestamps on n1 +// 6. Delete rows on n2 to simulate data loss +// 7. Run table-diff → preserve-origin repair (n1 as source of truth) +// 8. Verify repaired rows on n2 retain the original origin and timestamps func testNativePreserveOrigin(t *testing.T, state *nativeClusterState, env *testEnv) { ctx := context.Background() tableName := "native_preserve_origin_test" qualifiedTableName := fmt.Sprintf("public.%s", tableName) - subName := "preserve_origin_sub" pubName := "preserve_origin_pub" + subN1Name := "sub_n3_to_n1" + subN2Name := "sub_n3_to_n2" - // Create table on both nodes. - for _, pool := range []*pgxpool.Pool{state.n1Pool, state.n2Pool} { + // Create table on all 3 nodes. + for _, pool := range []*pgxpool.Pool{state.n1Pool, state.n2Pool, state.n3Pool} { _, err := pool.Exec(ctx, fmt.Sprintf( "CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY KEY, data TEXT)", qualifiedTableName)) require.NoError(t, err) } - // Create publication on n1 and subscription on n2. - // Use copy_data=false so the initial table sync doesn't use a transient - // replication origin (which PG deletes after sync, leaving rows with a - // defunct roident). With copy_data=false, data inserted after the - // subscription starts streaming uses the subscription's main origin. - _, err := state.n1Pool.Exec(ctx, fmt.Sprintf("CREATE PUBLICATION %s FOR TABLE %s", pubName, qualifiedTableName)) + // Publication on n3. + _, err := state.n3Pool.Exec(ctx, fmt.Sprintf("CREATE PUBLICATION %s FOR TABLE %s", pubName, qualifiedTableName)) require.NoError(t, err) - connStr := fmt.Sprintf("host=%s port=5432 dbname=%s user=%s password=%s", - nativeServiceN1, nativeDBName, nativeUser, nativePassword) + // Subscriptions on n1 and n2 pointing at n3 (Docker-internal hostname). + // Use copy_data=false to avoid transient sync origins. + n3ConnStr := fmt.Sprintf("host=%s port=5432 dbname=%s user=%s password=%s", + nativeServiceN3, nativeDBName, nativeUser, nativePassword) + _, err = state.n1Pool.Exec(ctx, fmt.Sprintf( + "CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION %s WITH (copy_data = false)", + subN1Name, n3ConnStr, pubName)) + require.NoError(t, err) _, err = state.n2Pool.Exec(ctx, fmt.Sprintf( "CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION %s WITH (copy_data = false)", - subName, connStr, pubName)) + subN2Name, n3ConnStr, pubName)) require.NoError(t, err) t.Cleanup(func() { - state.n2Pool.Exec(ctx, fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", subName)) - state.n1Pool.Exec(ctx, fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", pubName)) - for _, pool := range []*pgxpool.Pool{state.n1Pool, state.n2Pool} { + state.n1Pool.Exec(ctx, fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", subN1Name)) + state.n2Pool.Exec(ctx, fmt.Sprintf("DROP SUBSCRIPTION IF EXISTS %s", subN2Name)) + state.n3Pool.Exec(ctx, fmt.Sprintf("DROP PUBLICATION IF EXISTS %s", pubName)) + for _, pool := range []*pgxpool.Pool{state.n1Pool, state.n2Pool, state.n3Pool} { pool.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s CASCADE", qualifiedTableName)) } }) - // Brief pause for the subscription to connect and start streaming. + // Brief pause for subscriptions to connect and start streaming. time.Sleep(2 * time.Second) - // Insert data on n1 AFTER subscription is streaming. + // Insert data on n3. The streaming apply workers on n1 and n2 will use + // each subscription's main replication origin. 
sampleIDs := []int{1, 2, 3, 4, 5} for _, id := range sampleIDs { - _, err := state.n1Pool.Exec(ctx, + _, err := state.n3Pool.Exec(ctx, fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", qualifiedTableName), id, fmt.Sprintf("row_%d", id)) require.NoError(t, err) } - assertEventually(t, 30*time.Second, func() error { - var count int - if err := state.n2Pool.QueryRow(ctx, - fmt.Sprintf("SELECT count(*) FROM %s", qualifiedTableName)).Scan(&count); err != nil { - return err - } - if count < len(sampleIDs) { - return fmt.Errorf("expected %d rows on n2, got %d", len(sampleIDs), count) - } - return nil - }) - log.Println("Replication complete: all rows present on n2 (via streaming)") + // Wait for replication to both n1 and n2. + for _, pool := range []*pgxpool.Pool{state.n1Pool, state.n2Pool} { + p := pool // capture for closure + assertEventually(t, 30*time.Second, func() error { + var count int + if err := p.QueryRow(ctx, + fmt.Sprintf("SELECT count(*) FROM %s", qualifiedTableName)).Scan(&count); err != nil { + return err + } + if count < len(sampleIDs) { + return fmt.Errorf("expected %d rows, got %d", len(sampleIDs), count) + } + return nil + }) + } + log.Println("Replication complete: all rows present on n1 and n2 (via streaming from n3)") - // --- Verify GetNodeOriginNames maps roident → subscription name --- - names, err := queries.GetNodeOriginNames(ctx, state.n2Pool) + // --- Verify GetNodeOriginNames on n1 maps roident → subscription name --- + names, err := queries.GetNodeOriginNames(ctx, state.n1Pool) require.NoError(t, err) found := false - var subRoident string - for id, name := range names { - if name == subName { + for _, name := range names { + if name == subN1Name { found = true - subRoident = id break } } - require.True(t, found, "GetNodeOriginNames should contain subscription %q, got: %v", subName, names) - log.Printf("GetNodeOriginNames on n2: %v (subscription roident=%s)", names, subRoident) + require.True(t, found, "GetNodeOriginNames on n1 should contain subscription %q, got: %v", subN1Name, names) + log.Printf("GetNodeOriginNames on n1: %v", names) - // --- Verify replicated rows on n2 have non-local origin --- + // --- Capture baseline origins and timestamps on n1 (source of truth) --- + originalOrigins := make(map[int]string) + originalTimestamps := make(map[int]time.Time) for _, id := range sampleIDs { - origin := getNativeReplicationOrigin(t, ctx, state.n2Pool, qualifiedTableName, id) - require.NotEmpty(t, origin, - "Row %d on n2 should have a replication origin (was replicated from n1)", id) - log.Printf("Row %d on n2: origin=%s", id, origin) + origin := getNativeReplicationOrigin(t, ctx, state.n1Pool, qualifiedTableName, id) + originalOrigins[id] = origin + originalTimestamps[id] = getCommitTimestamp(t, ctx, state.n1Pool, qualifiedTableName, id) + log.Printf("Row %d on n1: origin=%s ts=%s", id, origin, originalTimestamps[id].Format(time.RFC3339Nano)) } - // --- Verify rows on n1 are "local" origin --- + // All rows on n1 were replicated from n3, so they must have non-local origin. 
for _, id := range sampleIDs { - origin := getNativeReplicationOrigin(t, ctx, state.n1Pool, qualifiedTableName, id) - assert.Empty(t, origin, - "Row %d on n1 should have local origin (roident=0), got %q", id, origin) + require.NotEmpty(t, originalOrigins[id], + "Row %d on n1 should have a replication origin (was replicated from n3)", id) } - log.Println("Origin tracking verified: n2 rows have subscription origin, n1 rows are local") + log.Println("Baseline verified: all sample rows on n1 have non-local origin (from n3)") + + // Wait to ensure original timestamps are clearly in the past. + time.Sleep(2 * time.Second) - // --- Simulate data loss on n2 and verify basic repair works --- + // --- Simulate data loss on n2 --- log.Println("Simulating data loss on n2...") tx, err := state.n2Pool.Begin(ctx) require.NoError(t, err) @@ -593,37 +615,70 @@ func testNativePreserveOrigin(t *testing.T, state *nativeClusterState, env *test require.NoError(t, err) } require.NoError(t, tx.Commit(ctx)) + log.Printf("Deleted %d rows from n2", len(sampleIDs)) - // Run table-diff. + // --- Run table-diff --- diffTask := env.newTableDiffTask(t, qualifiedTableName, []string{nativeServiceN1, nativeServiceN2}) require.NoError(t, diffTask.RunChecks(false)) require.NoError(t, diffTask.ExecuteTask()) diffFile := getLatestDiffFile(t) require.NotEmpty(t, diffFile) + log.Printf("Diff file: %s", diffFile) - // Run repair (recovery mode). + // --- Run preserve-origin repair --- repairTask := env.newTableRepairTask(nativeServiceN1, qualifiedTableName, diffFile) repairTask.RecoveryMode = true + repairTask.PreserveOrigin = true err = repairTask.Run(false) require.NoError(t, err) if repairTask.TaskStatus == "FAILED" { t.Fatalf("Repair failed: %s", repairTask.TaskContext) } + log.Println("Preserve-origin repair completed") - // Verify all rows are restored. + // --- Verify all rows restored --- var count int err = state.n2Pool.QueryRow(ctx, fmt.Sprintf("SELECT count(*) FROM %s", qualifiedTableName)).Scan(&count) require.NoError(t, err) require.Equal(t, len(sampleIDs), count, "All rows should be restored after repair") - // Verify row content matches. + // --- Verify row content --- for _, id := range sampleIDs { var data string err := state.n2Pool.QueryRow(ctx, fmt.Sprintf("SELECT data FROM %s WHERE id = $1", qualifiedTableName), id).Scan(&data) require.NoError(t, err) - assert.Equal(t, fmt.Sprintf("row_%d", id), data, "Row %d data mismatch", id) + require.Equal(t, fmt.Sprintf("row_%d", id), data, "Row %d data mismatch", id) } - log.Println("Native PG origin tracking and repair verified successfully") + + // --- Verify origins are preserved --- + // After preserve-origin repair, repaired rows on n2 should have an + // ACE-created origin that encodes the original source (n3 via n1). 
+ for _, id := range sampleIDs { + repairedOrigin := getNativeReplicationOrigin(t, ctx, state.n2Pool, qualifiedTableName, id) + require.NotEmpty(t, repairedOrigin, + "Row %d should have a replication origin after preserve-origin repair", id) + log.Printf("Row %d: n1 origin=%s, repaired n2 origin=%s", id, originalOrigins[id], repairedOrigin) + } + + // --- Verify timestamps are preserved --- + preservedCount := 0 + for _, id := range sampleIDs { + repairedTS := getCommitTimestamp(t, ctx, state.n2Pool, qualifiedTableName, id) + originalTS := originalTimestamps[id] + tsTrunc := repairedTS.Truncate(time.Microsecond) + originalTsTrunc := originalTS.Truncate(time.Microsecond) + if tsTrunc.Equal(originalTsTrunc) { + preservedCount++ + log.Printf(" Row %d: timestamp PRESERVED (%s)", id, repairedTS.Format(time.RFC3339Nano)) + } else { + log.Printf(" Row %d: timestamp NOT preserved (original=%s repaired=%s)", + id, originalTS.Format(time.RFC3339Nano), repairedTS.Format(time.RFC3339Nano)) + } + } + require.Equal(t, len(sampleIDs), preservedCount, + "All rows must have commit timestamp preserved: %d/%d", preservedCount, len(sampleIDs)) + + log.Printf("Native PG preserve-origin verified: %d/%d origins and timestamps preserved", preservedCount, len(sampleIDs)) }
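
For reference, the preserve-origin test reads commit timestamps through a
`getCommitTimestamp` helper that is defined outside this patch. A minimal
sketch of what such a helper can look like, assuming the same imports as
`native_pg_test.go` and `track_commit_timestamp=on` (which the compose file
sets on every node):

```go
// Sketch only; the real helper lives elsewhere in the test suite.
// pg_xact_commit_timestamp(xmin) returns the commit time of the transaction
// that last wrote the row and requires track_commit_timestamp=on.
func getCommitTimestamp(t *testing.T, ctx context.Context, pool *pgxpool.Pool,
	qualifiedTableName string, id int) time.Time {
	t.Helper()
	var ts time.Time
	err := pool.QueryRow(ctx, fmt.Sprintf(
		"SELECT pg_xact_commit_timestamp(xmin) FROM %s WHERE id = $1",
		qualifiedTableName), id).Scan(&ts)
	require.NoError(t, err, "commit timestamp for row %d", id)
	return ts
}
```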