diff --git a/internal/consistency/diff/table_diff.go b/internal/consistency/diff/table_diff.go index 31f72de..100c8af 100644 --- a/internal/consistency/diff/table_diff.go +++ b/internal/consistency/diff/table_diff.go @@ -102,7 +102,7 @@ type TableDiffTask struct { blockHashSQLCache map[hashBoundsKey]string blockHashSQLMu sync.Mutex - NodeOriginNames map[string]string + NodeOriginNames map[string]map[string]string CompareUnitSize int MaxDiffRows int64 @@ -204,44 +204,57 @@ func (t *TableDiffTask) loadNodeOriginNames() error { return nil } - var firstPool *pgxpool.Pool - for _, pool := range t.Pools { - firstPool = pool - break - } + t.NodeOriginNames = make(map[string]map[string]string) - if firstPool == nil { - t.NodeOriginNames = make(map[string]string) - return fmt.Errorf("no connection pool available to load node origin names") + var lastErr error + for name, pool := range t.Pools { + names, err := queries.GetNodeOriginNames(t.Ctx, pool) + if err != nil { + lastErr = err + continue + } + t.NodeOriginNames[name] = names } - names, err := queries.GetNodeOriginNames(t.Ctx, firstPool) - if err != nil { - t.NodeOriginNames = make(map[string]string) - return err + if len(t.NodeOriginNames) == 0 && lastErr != nil { + return lastErr } - - t.NodeOriginNames = names return nil } +// flatNodeOriginNames merges all per-node origin maps into a single map for +// lookup purposes (e.g. resolving --against-origin). If the same roident +// appears on multiple nodes, the last one wins — this is acceptable because +// resolveAgainstOrigin is only used with spock (where roidents are global) +// or for user-facing name resolution where any match suffices. +func (t *TableDiffTask) flatNodeOriginNames() map[string]string { + flat := make(map[string]string) + for _, nodeMap := range t.NodeOriginNames { + for id, name := range nodeMap { + flat[id] = name + } + } + return flat +} + func (t *TableDiffTask) resolveAgainstOrigin() error { if strings.TrimSpace(t.AgainstOrigin) == "" { return nil } - if len(t.NodeOriginNames) == 0 { + flat := t.flatNodeOriginNames() + if len(flat) == 0 { return fmt.Errorf("unable to resolve --against-origin: no node origin names available") } orig := strings.TrimSpace(t.AgainstOrigin) // direct match on origin id - if _, ok := t.NodeOriginNames[orig]; ok { + if _, ok := flat[orig]; ok { t.resolvedAgainstOrigin = orig return nil } // match on origin name - for id, name := range t.NodeOriginNames { + for id, name := range flat { if name == orig { t.resolvedAgainstOrigin = id return nil @@ -249,8 +262,8 @@ func (t *TableDiffTask) resolveAgainstOrigin() error { } // build a list of available origins for the error message - available := make([]string, 0, len(t.NodeOriginNames)) - for id, name := range t.NodeOriginNames { + available := make([]string, 0, len(flat)) + for id, name := range flat { if id != name { available = append(available, fmt.Sprintf("%s (%s)", id, name)) } else { @@ -294,8 +307,8 @@ func (t *TableDiffTask) buildEffectiveFilter() (string, error) { return strings.Join(parts, " AND "), nil } -func (t *TableDiffTask) withSpockMetadata(row map[string]any) map[string]any { - row["node_origin"] = utils.TranslateNodeOrigin(row["node_origin"], t.NodeOriginNames) +func (t *TableDiffTask) withSpockMetadata(row map[string]any, nodeName string) map[string]any { + row["node_origin"] = utils.TranslateNodeOrigin(row["node_origin"], t.NodeOriginNames[nodeName]) return utils.AddSpockMetadata(row) } @@ -1375,8 +1388,9 @@ func (t *TableDiffTask) ExecuteTask() (err error) { DiffRowsCount: make(map[string]int), AgainstOrigin: t.AgainstOrigin, AgainstOriginResolved: func() string { - if t.resolvedAgainstOrigin != "" && t.NodeOriginNames != nil { - if name, ok := t.NodeOriginNames[t.resolvedAgainstOrigin]; ok { + if t.resolvedAgainstOrigin != "" { + flat := t.flatNodeOriginNames() + if name, ok := flat[t.resolvedAgainstOrigin]; ok { return name } } @@ -1995,7 +2009,7 @@ func (t *TableDiffTask) recursiveDiff( break } rowAsMap := utils.OrderedMapToMap(row) - rowWithMeta := t.withSpockMetadata(rowAsMap) + rowWithMeta := t.withSpockMetadata(rowAsMap, node1Name) rowAsOrderedMap := utils.MapToOrderedMap(rowWithMeta, t.Cols) t.DiffResult.NodeDiffs[pairKey].Rows[node1Name] = append(t.DiffResult.NodeDiffs[pairKey].Rows[node1Name], rowAsOrderedMap) currentDiffRowsForPair++ @@ -2012,7 +2026,7 @@ func (t *TableDiffTask) recursiveDiff( break } rowAsMap := utils.OrderedMapToMap(row) - rowWithMeta := t.withSpockMetadata(rowAsMap) + rowWithMeta := t.withSpockMetadata(rowAsMap, node2Name) rowAsOrderedMap := utils.MapToOrderedMap(rowWithMeta, t.Cols) t.DiffResult.NodeDiffs[pairKey].Rows[node2Name] = append(t.DiffResult.NodeDiffs[pairKey].Rows[node2Name], rowAsOrderedMap) currentDiffRowsForPair++ @@ -2030,12 +2044,12 @@ func (t *TableDiffTask) recursiveDiff( break } node1DataAsMap := utils.OrderedMapToMap(modRow.Node1Data) - node1DataWithMeta := t.withSpockMetadata(node1DataAsMap) + node1DataWithMeta := t.withSpockMetadata(node1DataAsMap, node1Name) node1DataAsOrderedMap := utils.MapToOrderedMap(node1DataWithMeta, t.Cols) t.DiffResult.NodeDiffs[pairKey].Rows[node1Name] = append(t.DiffResult.NodeDiffs[pairKey].Rows[node1Name], node1DataAsOrderedMap) node2DataAsMap := utils.OrderedMapToMap(modRow.Node2Data) - node2DataWithMeta := t.withSpockMetadata(node2DataAsMap) + node2DataWithMeta := t.withSpockMetadata(node2DataAsMap, node2Name) node2DataAsOrderedMap := utils.MapToOrderedMap(node2DataWithMeta, t.Cols) t.DiffResult.NodeDiffs[pairKey].Rows[node2Name] = append(t.DiffResult.NodeDiffs[pairKey].Rows[node2Name], node2DataAsOrderedMap) currentDiffRowsForPair++ diff --git a/internal/consistency/diff/table_diff_origin_test.go b/internal/consistency/diff/table_diff_origin_test.go index bafd7aa..5ffc7b7 100644 --- a/internal/consistency/diff/table_diff_origin_test.go +++ b/internal/consistency/diff/table_diff_origin_test.go @@ -40,7 +40,7 @@ func TestResolveAgainstOrigin_NoNodeOriginNames(t *testing.T) { func TestResolveAgainstOrigin_MatchByID(t *testing.T) { task := &TableDiffTask{ AgainstOrigin: "3", - NodeOriginNames: map[string]string{"3": "n1", "4": "n2"}, + NodeOriginNames: map[string]map[string]string{"node1": {"3": "n1", "4": "n2"}}, } if err := task.resolveAgainstOrigin(); err != nil { t.Fatalf("unexpected error: %v", err) @@ -53,7 +53,7 @@ func TestResolveAgainstOrigin_MatchByID(t *testing.T) { func TestResolveAgainstOrigin_MatchByName_SpockNodeName(t *testing.T) { task := &TableDiffTask{ AgainstOrigin: "n1", - NodeOriginNames: map[string]string{"3": "n1", "4": "n2"}, + NodeOriginNames: map[string]map[string]string{"node1": {"3": "n1", "4": "n2"}}, } if err := task.resolveAgainstOrigin(); err != nil { t.Fatalf("unexpected error: %v", err) @@ -64,10 +64,12 @@ func TestResolveAgainstOrigin_MatchByName_SpockNodeName(t *testing.T) { } func TestResolveAgainstOrigin_MatchByName_SubscriptionName(t *testing.T) { - // Native PG: NodeOriginNames maps roident -> subscription name + // Native PG: NodeOriginNames maps roident -> subscription name, per node task := &TableDiffTask{ - AgainstOrigin: "sub_n1_to_n2", - NodeOriginNames: map[string]string{"5": "sub_n1_to_n2", "6": "sub_n3_to_n2"}, + AgainstOrigin: "sub_n1_to_n2", + NodeOriginNames: map[string]map[string]string{ + "node1": {"5": "sub_n1_to_n2", "6": "sub_n3_to_n2"}, + }, } if err := task.resolveAgainstOrigin(); err != nil { t.Fatalf("unexpected error: %v", err) @@ -80,7 +82,7 @@ func TestResolveAgainstOrigin_MatchByName_SubscriptionName(t *testing.T) { func TestResolveAgainstOrigin_NoMatch(t *testing.T) { task := &TableDiffTask{ AgainstOrigin: "nonexistent", - NodeOriginNames: map[string]string{"3": "n1", "4": "n2"}, + NodeOriginNames: map[string]map[string]string{"node1": {"3": "n1", "4": "n2"}}, } err := task.resolveAgainstOrigin() if err == nil { @@ -133,3 +135,45 @@ func TestBuildEffectiveFilter_Empty(t *testing.T) { t.Fatalf("expected empty filter, got: %s", filter) } } + +func TestWithSpockMetadata_PerNodeTranslation(t *testing.T) { + // Simulate native PG: same roident "1" maps to different names on different nodes + task := &TableDiffTask{ + NodeOriginNames: map[string]map[string]string{ + "n1": {"1": "sub_n3_to_n1"}, + "n2": {"1": "sub_n3_to_n2"}, + }, + } + + row1 := map[string]any{"node_origin": "1", "id": 1} + result1 := task.withSpockMetadata(row1, "n1") + meta1 := result1["_spock_metadata_"].(map[string]any) + if meta1["node_origin"] != "sub_n3_to_n1" { + t.Fatalf("n1 row: expected origin sub_n3_to_n1, got %v", meta1["node_origin"]) + } + + row2 := map[string]any{"node_origin": "1", "id": 2} + result2 := task.withSpockMetadata(row2, "n2") + meta2 := result2["_spock_metadata_"].(map[string]any) + if meta2["node_origin"] != "sub_n3_to_n2" { + t.Fatalf("n2 row: expected origin sub_n3_to_n2, got %v", meta2["node_origin"]) + } +} + +func TestFlatNodeOriginNames_MergesAllNodes(t *testing.T) { + task := &TableDiffTask{ + NodeOriginNames: map[string]map[string]string{ + "n1": {"1": "sub_n3_to_n1"}, + "n2": {"1": "sub_n3_to_n2", "2": "sub_n4_to_n2"}, + }, + } + flat := task.flatNodeOriginNames() + // roident "2" should always be present + if flat["2"] != "sub_n4_to_n2" { + t.Fatalf("expected flat[2]=sub_n4_to_n2, got %q", flat["2"]) + } + // roident "1" will be one of the two — either is valid for flattened lookup + if flat["1"] != "sub_n3_to_n1" && flat["1"] != "sub_n3_to_n2" { + t.Fatalf("expected flat[1] to be one of the subscription names, got %q", flat["1"]) + } +} diff --git a/internal/consistency/diff/table_rerun.go b/internal/consistency/diff/table_rerun.go index ca4b1ac..2f177e8 100644 --- a/internal/consistency/diff/table_rerun.go +++ b/internal/consistency/diff/table_rerun.go @@ -374,12 +374,12 @@ func (t *TableDiffTask) reCompareDiffs(fetchedRowsByNode map[string]map[string]t persistentDiffCount++ if nowOnNode1 { rowAsMap := utils.OrderedMapToMap(newRow1) - rowWithMeta := t.withSpockMetadata(rowAsMap) + rowWithMeta := t.withSpockMetadata(rowAsMap, node1) newDiffsForPair.Rows[node1] = append(newDiffsForPair.Rows[node1], utils.MapToOrderedMap(rowWithMeta, t.Cols)) } if nowOnNode2 { rowAsMap := utils.OrderedMapToMap(newRow2) - rowWithMeta := t.withSpockMetadata(rowAsMap) + rowWithMeta := t.withSpockMetadata(rowAsMap, node2) newDiffsForPair.Rows[node2] = append(newDiffsForPair.Rows[node2], utils.MapToOrderedMap(rowWithMeta, t.Cols)) } } diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index 0736ce1..4b6bd03 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -99,7 +99,7 @@ type MerkleTreeTask struct { diffMutex sync.Mutex diffRowKeySets map[string]map[string]map[string]struct{} StartTime time.Time - NodeOriginNames map[string]string + NodeOriginNames map[string]map[string]string Ctx context.Context } @@ -510,8 +510,14 @@ func (m *MerkleTreeTask) loadNodeOriginNames() error { return nil } + m.NodeOriginNames = make(map[string]map[string]string) + var lastErr error for _, nodeInfo := range m.ClusterNodes { + nodeName, _ := nodeInfo["Name"].(string) + if nodeName == "" { + continue + } pool, err := auth.GetClusterNodeConnection(m.Ctx, nodeInfo, m.connOpts()) if err != nil { lastErr = err @@ -523,15 +529,13 @@ func (m *MerkleTreeTask) loadNodeOriginNames() error { lastErr = err continue } - m.NodeOriginNames = names - return nil + m.NodeOriginNames[nodeName] = names } - m.NodeOriginNames = make(map[string]string) - if lastErr != nil { + if len(m.NodeOriginNames) == 0 && lastErr != nil { return lastErr } - return fmt.Errorf("no nodes available to load node origin names") + return nil } func (m *MerkleTreeTask) appendDiffs(nodePairKey string, work CompareRangesWorkItem, pr1, pr2 []types.OrderedMap) error { @@ -603,7 +607,7 @@ func (m *MerkleTreeTask) addRowToDiff(nodePairKey, nodeName string, row types.Or } rowMap := utils.OrderedMapToMap(row) - rowMap["node_origin"] = utils.TranslateNodeOrigin(rowMap["node_origin"], m.NodeOriginNames) + rowMap["node_origin"] = utils.TranslateNodeOrigin(rowMap["node_origin"], m.NodeOriginNames[nodeName]) rowWithMeta := utils.AddSpockMetadata(rowMap) orderedRow := utils.MapToOrderedMap(rowWithMeta, m.Cols)