Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 42 additions & 28 deletions internal/consistency/diff/table_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type TableDiffTask struct {
blockHashSQLCache map[hashBoundsKey]string
blockHashSQLMu sync.Mutex

NodeOriginNames map[string]string
NodeOriginNames map[string]map[string]string

CompareUnitSize int
MaxDiffRows int64
Expand Down Expand Up @@ -204,53 +204,66 @@ func (t *TableDiffTask) loadNodeOriginNames() error {
return nil
}

var firstPool *pgxpool.Pool
for _, pool := range t.Pools {
firstPool = pool
break
}
t.NodeOriginNames = make(map[string]map[string]string)

if firstPool == nil {
t.NodeOriginNames = make(map[string]string)
return fmt.Errorf("no connection pool available to load node origin names")
var lastErr error
for name, pool := range t.Pools {
names, err := queries.GetNodeOriginNames(t.Ctx, pool)
if err != nil {
lastErr = err
continue
}
t.NodeOriginNames[name] = names
}

names, err := queries.GetNodeOriginNames(t.Ctx, firstPool)
if err != nil {
t.NodeOriginNames = make(map[string]string)
return err
if len(t.NodeOriginNames) == 0 && lastErr != nil {
return lastErr
}

t.NodeOriginNames = names
return nil
}

// flatNodeOriginNames merges all per-node origin maps into a single map for
// lookup purposes (e.g. resolving --against-origin). If the same roident
// appears on multiple nodes, the last one wins — this is acceptable because
// resolveAgainstOrigin is only used with spock (where roidents are global)
// or for user-facing name resolution where any match suffices.
func (t *TableDiffTask) flatNodeOriginNames() map[string]string {
flat := make(map[string]string)
for _, nodeMap := range t.NodeOriginNames {
for id, name := range nodeMap {
flat[id] = name
}
}
return flat
}

func (t *TableDiffTask) resolveAgainstOrigin() error {
if strings.TrimSpace(t.AgainstOrigin) == "" {
return nil
}
if len(t.NodeOriginNames) == 0 {
flat := t.flatNodeOriginNames()
if len(flat) == 0 {
return fmt.Errorf("unable to resolve --against-origin: no node origin names available")
}

orig := strings.TrimSpace(t.AgainstOrigin)
// direct match on origin id
if _, ok := t.NodeOriginNames[orig]; ok {
if _, ok := flat[orig]; ok {
t.resolvedAgainstOrigin = orig
return nil
}

// match on origin name
for id, name := range t.NodeOriginNames {
for id, name := range flat {
if name == orig {
t.resolvedAgainstOrigin = id
return nil
}
}

// build a list of available origins for the error message
available := make([]string, 0, len(t.NodeOriginNames))
for id, name := range t.NodeOriginNames {
available := make([]string, 0, len(flat))
for id, name := range flat {
if id != name {
available = append(available, fmt.Sprintf("%s (%s)", id, name))
} else {
Expand Down Expand Up @@ -294,8 +307,8 @@ func (t *TableDiffTask) buildEffectiveFilter() (string, error) {
return strings.Join(parts, " AND "), nil
}

func (t *TableDiffTask) withSpockMetadata(row map[string]any) map[string]any {
row["node_origin"] = utils.TranslateNodeOrigin(row["node_origin"], t.NodeOriginNames)
func (t *TableDiffTask) withSpockMetadata(row map[string]any, nodeName string) map[string]any {
row["node_origin"] = utils.TranslateNodeOrigin(row["node_origin"], t.NodeOriginNames[nodeName])
return utils.AddSpockMetadata(row)
}

Expand Down Expand Up @@ -1375,8 +1388,9 @@ func (t *TableDiffTask) ExecuteTask() (err error) {
DiffRowsCount: make(map[string]int),
AgainstOrigin: t.AgainstOrigin,
AgainstOriginResolved: func() string {
if t.resolvedAgainstOrigin != "" && t.NodeOriginNames != nil {
if name, ok := t.NodeOriginNames[t.resolvedAgainstOrigin]; ok {
if t.resolvedAgainstOrigin != "" {
flat := t.flatNodeOriginNames()
if name, ok := flat[t.resolvedAgainstOrigin]; ok {
return name
}
}
Expand Down Expand Up @@ -1995,7 +2009,7 @@ func (t *TableDiffTask) recursiveDiff(
break
}
rowAsMap := utils.OrderedMapToMap(row)
rowWithMeta := t.withSpockMetadata(rowAsMap)
rowWithMeta := t.withSpockMetadata(rowAsMap, node1Name)
rowAsOrderedMap := utils.MapToOrderedMap(rowWithMeta, t.Cols)
t.DiffResult.NodeDiffs[pairKey].Rows[node1Name] = append(t.DiffResult.NodeDiffs[pairKey].Rows[node1Name], rowAsOrderedMap)
currentDiffRowsForPair++
Expand All @@ -2012,7 +2026,7 @@ func (t *TableDiffTask) recursiveDiff(
break
}
rowAsMap := utils.OrderedMapToMap(row)
rowWithMeta := t.withSpockMetadata(rowAsMap)
rowWithMeta := t.withSpockMetadata(rowAsMap, node2Name)
rowAsOrderedMap := utils.MapToOrderedMap(rowWithMeta, t.Cols)
t.DiffResult.NodeDiffs[pairKey].Rows[node2Name] = append(t.DiffResult.NodeDiffs[pairKey].Rows[node2Name], rowAsOrderedMap)
currentDiffRowsForPair++
Expand All @@ -2030,12 +2044,12 @@ func (t *TableDiffTask) recursiveDiff(
break
}
node1DataAsMap := utils.OrderedMapToMap(modRow.Node1Data)
node1DataWithMeta := t.withSpockMetadata(node1DataAsMap)
node1DataWithMeta := t.withSpockMetadata(node1DataAsMap, node1Name)
node1DataAsOrderedMap := utils.MapToOrderedMap(node1DataWithMeta, t.Cols)
t.DiffResult.NodeDiffs[pairKey].Rows[node1Name] = append(t.DiffResult.NodeDiffs[pairKey].Rows[node1Name], node1DataAsOrderedMap)

node2DataAsMap := utils.OrderedMapToMap(modRow.Node2Data)
node2DataWithMeta := t.withSpockMetadata(node2DataAsMap)
node2DataWithMeta := t.withSpockMetadata(node2DataAsMap, node2Name)
node2DataAsOrderedMap := utils.MapToOrderedMap(node2DataWithMeta, t.Cols)
t.DiffResult.NodeDiffs[pairKey].Rows[node2Name] = append(t.DiffResult.NodeDiffs[pairKey].Rows[node2Name], node2DataAsOrderedMap)
currentDiffRowsForPair++
Expand Down
56 changes: 50 additions & 6 deletions internal/consistency/diff/table_diff_origin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestResolveAgainstOrigin_NoNodeOriginNames(t *testing.T) {
func TestResolveAgainstOrigin_MatchByID(t *testing.T) {
task := &TableDiffTask{
AgainstOrigin: "3",
NodeOriginNames: map[string]string{"3": "n1", "4": "n2"},
NodeOriginNames: map[string]map[string]string{"node1": {"3": "n1", "4": "n2"}},
}
if err := task.resolveAgainstOrigin(); err != nil {
t.Fatalf("unexpected error: %v", err)
Expand All @@ -53,7 +53,7 @@ func TestResolveAgainstOrigin_MatchByID(t *testing.T) {
func TestResolveAgainstOrigin_MatchByName_SpockNodeName(t *testing.T) {
task := &TableDiffTask{
AgainstOrigin: "n1",
NodeOriginNames: map[string]string{"3": "n1", "4": "n2"},
NodeOriginNames: map[string]map[string]string{"node1": {"3": "n1", "4": "n2"}},
}
if err := task.resolveAgainstOrigin(); err != nil {
t.Fatalf("unexpected error: %v", err)
Expand All @@ -64,10 +64,12 @@ func TestResolveAgainstOrigin_MatchByName_SpockNodeName(t *testing.T) {
}

func TestResolveAgainstOrigin_MatchByName_SubscriptionName(t *testing.T) {
// Native PG: NodeOriginNames maps roident -> subscription name
// Native PG: NodeOriginNames maps roident -> subscription name, per node
task := &TableDiffTask{
AgainstOrigin: "sub_n1_to_n2",
NodeOriginNames: map[string]string{"5": "sub_n1_to_n2", "6": "sub_n3_to_n2"},
AgainstOrigin: "sub_n1_to_n2",
NodeOriginNames: map[string]map[string]string{
"node1": {"5": "sub_n1_to_n2", "6": "sub_n3_to_n2"},
},
}
if err := task.resolveAgainstOrigin(); err != nil {
t.Fatalf("unexpected error: %v", err)
Expand All @@ -80,7 +82,7 @@ func TestResolveAgainstOrigin_MatchByName_SubscriptionName(t *testing.T) {
func TestResolveAgainstOrigin_NoMatch(t *testing.T) {
task := &TableDiffTask{
AgainstOrigin: "nonexistent",
NodeOriginNames: map[string]string{"3": "n1", "4": "n2"},
NodeOriginNames: map[string]map[string]string{"node1": {"3": "n1", "4": "n2"}},
}
err := task.resolveAgainstOrigin()
if err == nil {
Expand Down Expand Up @@ -133,3 +135,45 @@ func TestBuildEffectiveFilter_Empty(t *testing.T) {
t.Fatalf("expected empty filter, got: %s", filter)
}
}

func TestWithSpockMetadata_PerNodeTranslation(t *testing.T) {
// Simulate native PG: same roident "1" maps to different names on different nodes
task := &TableDiffTask{
NodeOriginNames: map[string]map[string]string{
"n1": {"1": "sub_n3_to_n1"},
"n2": {"1": "sub_n3_to_n2"},
},
}

row1 := map[string]any{"node_origin": "1", "id": 1}
result1 := task.withSpockMetadata(row1, "n1")
meta1 := result1["_spock_metadata_"].(map[string]any)
if meta1["node_origin"] != "sub_n3_to_n1" {
t.Fatalf("n1 row: expected origin sub_n3_to_n1, got %v", meta1["node_origin"])
}

row2 := map[string]any{"node_origin": "1", "id": 2}
result2 := task.withSpockMetadata(row2, "n2")
meta2 := result2["_spock_metadata_"].(map[string]any)
if meta2["node_origin"] != "sub_n3_to_n2" {
t.Fatalf("n2 row: expected origin sub_n3_to_n2, got %v", meta2["node_origin"])
}
}

func TestFlatNodeOriginNames_MergesAllNodes(t *testing.T) {
task := &TableDiffTask{
NodeOriginNames: map[string]map[string]string{
"n1": {"1": "sub_n3_to_n1"},
"n2": {"1": "sub_n3_to_n2", "2": "sub_n4_to_n2"},
},
}
flat := task.flatNodeOriginNames()
// roident "2" should always be present
if flat["2"] != "sub_n4_to_n2" {
t.Fatalf("expected flat[2]=sub_n4_to_n2, got %q", flat["2"])
}
// roident "1" will be one of the two — either is valid for flattened lookup
if flat["1"] != "sub_n3_to_n1" && flat["1"] != "sub_n3_to_n2" {
t.Fatalf("expected flat[1] to be one of the subscription names, got %q", flat["1"])
}
}
4 changes: 2 additions & 2 deletions internal/consistency/diff/table_rerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,12 @@ func (t *TableDiffTask) reCompareDiffs(fetchedRowsByNode map[string]map[string]t
persistentDiffCount++
if nowOnNode1 {
rowAsMap := utils.OrderedMapToMap(newRow1)
rowWithMeta := t.withSpockMetadata(rowAsMap)
rowWithMeta := t.withSpockMetadata(rowAsMap, node1)
newDiffsForPair.Rows[node1] = append(newDiffsForPair.Rows[node1], utils.MapToOrderedMap(rowWithMeta, t.Cols))
}
if nowOnNode2 {
rowAsMap := utils.OrderedMapToMap(newRow2)
rowWithMeta := t.withSpockMetadata(rowAsMap)
rowWithMeta := t.withSpockMetadata(rowAsMap, node2)
newDiffsForPair.Rows[node2] = append(newDiffsForPair.Rows[node2], utils.MapToOrderedMap(rowWithMeta, t.Cols))
}
}
Expand Down
18 changes: 11 additions & 7 deletions internal/consistency/mtree/merkle.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type MerkleTreeTask struct {
diffMutex sync.Mutex
diffRowKeySets map[string]map[string]map[string]struct{}
StartTime time.Time
NodeOriginNames map[string]string
NodeOriginNames map[string]map[string]string

Ctx context.Context
}
Expand Down Expand Up @@ -510,8 +510,14 @@ func (m *MerkleTreeTask) loadNodeOriginNames() error {
return nil
}

m.NodeOriginNames = make(map[string]map[string]string)

var lastErr error
for _, nodeInfo := range m.ClusterNodes {
nodeName, _ := nodeInfo["Name"].(string)
if nodeName == "" {
continue
}
pool, err := auth.GetClusterNodeConnection(m.Ctx, nodeInfo, m.connOpts())
if err != nil {
lastErr = err
Expand All @@ -523,15 +529,13 @@ func (m *MerkleTreeTask) loadNodeOriginNames() error {
lastErr = err
continue
}
m.NodeOriginNames = names
return nil
m.NodeOriginNames[nodeName] = names
}

m.NodeOriginNames = make(map[string]string)
if lastErr != nil {
if len(m.NodeOriginNames) == 0 && lastErr != nil {
return lastErr
}
return fmt.Errorf("no nodes available to load node origin names")
return nil
}

func (m *MerkleTreeTask) appendDiffs(nodePairKey string, work CompareRangesWorkItem, pr1, pr2 []types.OrderedMap) error {
Expand Down Expand Up @@ -603,7 +607,7 @@ func (m *MerkleTreeTask) addRowToDiff(nodePairKey, nodeName string, row types.Or
}

rowMap := utils.OrderedMapToMap(row)
rowMap["node_origin"] = utils.TranslateNodeOrigin(rowMap["node_origin"], m.NodeOriginNames)
rowMap["node_origin"] = utils.TranslateNodeOrigin(rowMap["node_origin"], m.NodeOriginNames[nodeName])
rowWithMeta := utils.AddSpockMetadata(rowMap)
orderedRow := utils.MapToOrderedMap(rowWithMeta, m.Cols)

Expand Down
Loading