Skip to content

[spark] Fix concurrent V2 MERGE INTO data inconsistency#7432

Merged
Zouxxyy merged 1 commit intoapache:masterfrom
kerwin-zk:concurrent-merge
Mar 17, 2026
Merged

[spark] Fix concurrent V2 MERGE INTO data inconsistency#7432
Zouxxyy merged 1 commit intoapache:masterfrom
kerwin-zk:concurrent-merge

Conversation

@kerwin-zk
Copy link
Contributor

Purpose

2026-03-16T07:50:48.3572834Z - Paimon MergeInto: concurrent two merge *** FAILED ***
2026-03-16T07:50:48.3581608Z java.util.concurrent.ExecutionException: org.scalatest.exceptions.TestFailedException:
2026-03-16T07:50:48.3582624Z Results do not match for query:
2026-03-16T07:50:48.3585622Z Timezone: sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]]
2026-03-16T07:50:48.3589114Z Timezone Env:
2026-03-16T07:50:48.3589738Z
2026-03-16T07:50:48.3589947Z == Parsed Logical Plan ==
2026-03-16T07:50:48.3590322Z 'Project [unresolvedalias('count(1), None)]
2026-03-16T07:50:48.3591158Z +- 'UnresolvedRelation [tm_t_false_1656877428_V2MergeIntoAppendNonBucketedTableTest], [], false
2026-03-16T07:50:48.3591709Z
2026-03-16T07:50:48.3591830Z == Analyzed Logical Plan ==
2026-03-16T07:50:48.3592144Z count(1): bigint
2026-03-16T07:50:48.3592419Z Aggregate [count(1) AS count(1)#157846L]
2026-03-16T07:50:48.3593011Z +- SubqueryAlias paimon.test.tm_t_false_1656877428_V2MergeIntoAppendNonBucketedTableTest
2026-03-16T07:50:48.3594371Z +- RelationV2[id#157842, b#157843, c#157844] paimon.test.tm_t_false_1656877428_V2MergeIntoAppendNonBucketedTableTest test.tm_t_false_1656877428_V2MergeIntoAppendNonBucketedTableTest
2026-03-16T07:50:48.3595330Z
2026-03-16T07:50:48.3595454Z == Optimized Logical Plan ==
2026-03-16T07:50:48.3595903Z Project [count_star_0#157849L AS count(1)#157845L AS count(1)#157846L]
2026-03-16T07:50:48.3596673Z +- RelationV2[count_star_0#157849L] test.tm_t_false_1656877428_V2MergeIntoAppendNonBucketedTableTest
2026-03-16T07:50:48.3597207Z
2026-03-16T07:50:48.3597317Z == Physical Plan ==
2026-03-16T07:50:48.3597727Z *(1) Project [count_star_0#157849L AS count(1)#157845L AS count(1)#157846L]
2026-03-16T07:50:48.3598238Z +- *(1) LocalTableScan [count_star_0#157849L]
2026-03-16T07:50:48.3598512Z
2026-03-16T07:50:48.3598613Z == Results ==
2026-03-16T07:50:48.3598752Z
2026-03-16T07:50:48.3598847Z == Results ==
2026-03-16T07:50:48.3599149Z !== Correct Answer - 1 == == Spark Answer - 1 ==
2026-03-16T07:50:48.3599584Z !struct<> struct<count(1):bigint>
2026-03-16T07:50:48.3599961Z ![9] [14]
2026-03-16T07:50:48.3600413Z at java.util.concurrent.FutureTask.report(FutureTask.java:122)
2026-03-16T07:50:48.3601165Z at java.util.concurrent.FutureTask.get(FutureTask.java:192)
2026-03-16T07:50:48.3602031Z at org.apache.paimon.spark.sql.MergeIntoAppendTableTest.$anonfun$$init$$19(MergeIntoTableTestBase.scala:889)
2026-03-16T07:50:48.3602957Z at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
2026-03-16T07:50:48.3603783Z at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
2026-03-16T07:50:48.3604682Z at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
2026-03-16T07:50:48.3605452Z at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
2026-03-16T07:50:48.3606165Z at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:306)
2026-03-16T07:50:48.3606929Z at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:304)
2026-03-16T07:50:48.3607760Z at org.apache.paimon.spark.PaimonSparkTestBase.withTable(PaimonSparkTestBase.scala:44)

Tests

CI

* Override to use the cached dataSplits instead of calling inputSplits (which triggers
* loadSplits()). This prevents a race condition during concurrent MERGE INTO operations.
*/
override def reportDriverMetrics(): Array[CustomTaskMetric] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can modify getInputSplits to use caled splits to prevent cal agiant

Copy link
Contributor

@Zouxxyy Zouxxyy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@Zouxxyy Zouxxyy merged commit 2807a22 into apache:master Mar 17, 2026
11 of 12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants