From 3d2b917965450c8204e2597c2fd01e9daf003a52 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 26 Mar 2026 11:06:27 +0800 Subject: [PATCH 1/5] fix(spark): potential hang on overlapping decompression with skipped segments --- .../client/impl/DecompressionWorker.java | 20 +++++++++ .../client/impl/ShuffleReadClientImpl.java | 11 ++++- .../impl/ShuffleReadClientImplTest.java | 43 +++++++++++++++++++ 3 files changed, 73 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java index fbb81528d3..6c3c7c37fd 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java @@ -154,10 +154,30 @@ public void add(int batchIndex, ShuffleDataResult shuffleDataResult) { } public DecompressedShuffleBlock get(int batchIndex, int segmentIndex) { + // guardedly safe to remove the previous batches if exist since the upstream will fetch the + // segments in order + for (int i = 0; i < batchIndex; i++) { + ConcurrentHashMap prevBlocks = tasks.remove(i); + if (prevBlocks != null) { + for (DecompressedShuffleBlock block : prevBlocks.values()) { + segmentPermits.ifPresent(x -> x.release()); + } + } + } + ConcurrentHashMap blocks = tasks.get(batchIndex); if (blocks == null) { return null; } + + // guardedly safe to remove the previous segments if exist since the upstream will fetch the + // segments in order + for (int i = 0; i < segmentIndex; i++) { + if (blocks.remove(i) != null) { + segmentPermits.ifPresent(x -> x.release()); + } + } + DecompressedShuffleBlock block = blocks.remove(segmentIndex); // simplify the memory statistic logic here, just decrease the memory used when the block is // fetched, this is effective due to the upstream will use single-thread to get and release the diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java index 3bddd3dc8b..a1fbb24799 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java @@ -314,7 +314,16 @@ public ShuffleBlock readShuffleBlockData() { // mark block as processed processedBlockIds.add(bs.getBlockId()); pendingBlockIds.removeLong(bs.getBlockId()); - // update the segment index to skip the unnecessary block in overlapping decompression mode + + // update the segment index to skip the unnecessary block in overlapping decompression mode. + // In overlapping decompression mode, decompression tasks for the whole batch have already + // been submitted. If we skip a segment without removing the corresponding handler, the + // backpressure permits may never be released, which can block subsequent decompression. 
+ // if (decompressionWorker != null) { + // decompressionWorker.get(batchIndex - 1, segmentIndex++); + // } else { + // segmentIndex += 1; + // } segmentIndex += 1; } diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java index 18b73a478e..9b6189b7ad 100644 --- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java +++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java @@ -52,6 +52,7 @@ import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler; import org.apache.uniffle.storage.util.StorageType; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -769,6 +770,48 @@ public void readTest16(Supplier builderS readClient.close(); } + @ParameterizedTest + @MethodSource("clientBuilderProvider") + public void readTestSkipBlocksWithBackpressureDoesNotHang( + Supplier builderSupplier) throws Exception { + // This test is meaningful only when overlapping decompression is enabled. + // For non-overlapping mode, it should still pass and act as a regression guard. + String basePath = uniq(HDFS_URI + "clientReadTestSkipBlocksWithBackpressureDoesNotHang"); + HadoopShuffleWriteHandler writeHandler = + new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf); + + Map expectedData = Maps.newHashMap(); + Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf(); + + // Expect only taskAttemptId=0 blocks; taskAttemptId=2 blocks will be skipped by the reader. 
+ Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0); + + // Write skipped blocks first to increase the chance of exhausting permits if permits are not + // released when skipping. + writeTestData(writeHandler, 20, 30, 1, 2, Maps.newHashMap(), blockIdBitmap); + writeTestData(writeHandler, 5, 30, 1, 0, expectedData, blockIdBitmap); + + RssConf rssConf = new RssConf(); + // Provide required base configs to avoid reader treating this as "prod mode" with empty values. + rssConf.set(RssClientConf.RSS_STORAGE_TYPE, StorageType.HDFS.name()); + rssConf.setInteger(RssClientConf.RSS_READ_OVERLAPPING_DECOMPRESSION_FETCH_SECONDS_THRESHOLD, 1); + rssConf.setInteger(RssClientConf.RSS_READ_OVERLAPPING_DECOMPRESSION_MAX_CONCURRENT_SEGMENTS, 1); + + ShuffleReadClientImpl readClient = + builderSupplier + .get() + .partitionId(1) + .basePath(basePath) + .blockIdBitmap(blockIdBitmap) + .taskIdBitmap(taskIdBitmap) + .rssConf(rssConf) + .build(); + + assertDoesNotThrow(() -> TestUtils.validateResult(readClient, expectedData)); + readClient.checkProcessedBlockIds(); + readClient.close(); + } + private void writeTestData( HadoopShuffleWriteHandler writeHandler, int num, From 3814e672373b72f1d39c8ae1bb133abd5840ebcd Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 26 Mar 2026 11:09:16 +0800 Subject: [PATCH 2/5] fix --- .../uniffle/client/impl/ShuffleReadClientImpl.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java index a1fbb24799..1690a49ec4 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java @@ -319,12 +319,11 @@ public ShuffleBlock readShuffleBlockData() { // In overlapping decompression mode, decompression tasks for the whole batch have already //
been submitted. If we skip a segment without removing the corresponding handler, the // backpressure permits may never be released, which can block subsequent decompression. - // if (decompressionWorker != null) { - // decompressionWorker.get(batchIndex - 1, segmentIndex++); - // } else { - // segmentIndex += 1; - // } - segmentIndex += 1; + if (decompressionWorker != null) { + decompressionWorker.get(batchIndex - 1, segmentIndex++); + } else { + segmentIndex += 1; + } } if (bs != null) { From aa407497f07f1284b6a1524fdf13302f1ee38bcf Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 26 Mar 2026 15:54:48 +0800 Subject: [PATCH 3/5] style fix --- .../uniffle/client/impl/ShuffleReadClientImplTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java index 9b6189b7ad..a37f0193b0 100644 --- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java +++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java @@ -783,9 +783,6 @@ public void readTestSkipBlocksWithBackpressureDoesNotHang( Map expectedData = Maps.newHashMap(); Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf(); - // Expect only taskAttemptId=0 blocks; taskAttemptId=2 blocks will be skipped by the reader. - Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0); - // Write skipped blocks first to increase the chance of exhausting permits if permits are not // released when skipping. 
writeTestData(writeHandler, 20, 30, 1, 2, Maps.newHashMap(), blockIdBitmap); @@ -797,6 +794,8 @@ public void readTestSkipBlocksWithBackpressureDoesNotHang( rssConf.setInteger(RssClientConf.RSS_READ_OVERLAPPING_DECOMPRESSION_FETCH_SECONDS_THRESHOLD, 1); rssConf.setInteger(RssClientConf.RSS_READ_OVERLAPPING_DECOMPRESSION_MAX_CONCURRENT_SEGMENTS, 1); + // Expect only taskAttemptId=0 blocks; taskAttemptId=2 blocks will be skipped by the reader. + Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0); ShuffleReadClientImpl readClient = builderSupplier .get() From 75c54622847f5de9f55dda68810952c852994c08 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Thu, 26 Mar 2026 16:30:26 +0800 Subject: [PATCH 4/5] fix --- .../apache/uniffle/client/impl/DecompressionWorkerTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/client/src/test/java/org/apache/uniffle/client/impl/DecompressionWorkerTest.java b/client/src/test/java/org/apache/uniffle/client/impl/DecompressionWorkerTest.java index 92b33b2181..73036389c3 100644 --- a/client/src/test/java/org/apache/uniffle/client/impl/DecompressionWorkerTest.java +++ b/client/src/test/java/org/apache/uniffle/client/impl/DecompressionWorkerTest.java @@ -35,6 +35,7 @@ import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class DecompressionWorkerTest { @@ -66,7 +67,9 @@ public void testBackpressure() throws Exception { } Thread.sleep(10); worker.get(0, maxSegments).getByteBuffer(); - assertEquals(1024 * maxSegments, worker.getPeekMemoryUsed()); + // Peak memory is a runtime metric and may include one additional segment due to thread timing. 
+ assertTrue(worker.getPeekMemoryUsed() <= 1024L * (maxSegments + 1)); + assertTrue(worker.getPeekMemoryUsed() >= 1024L * maxSegments); assertEquals(maxSegments, worker.getAvailablePermits()); } From 390ea253c5932b940fd7c555993a9b1a24667a17 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 27 Mar 2026 13:58:06 +0800 Subject: [PATCH 5/5] fix --- .../org/apache/uniffle/client/impl/DecompressionWorker.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java index 6c3c7c37fd..a98f9553ec 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java @@ -159,9 +159,7 @@ public DecompressedShuffleBlock get(int batchIndex, int segmentIndex) { for (int i = 0; i < batchIndex; i++) { ConcurrentHashMap prevBlocks = tasks.remove(i); if (prevBlocks != null) { - for (DecompressedShuffleBlock block : prevBlocks.values()) { - segmentPermits.ifPresent(x -> x.release()); - } + segmentPermits.ifPresent(x -> x.release(prevBlocks.values().size())); } }