From 85fe84d2f817f1836c4b13fe0a890e5f93a40b92 Mon Sep 17 00:00:00 2001 From: xianjingfeng Date: Wed, 25 Mar 2026 15:19:46 +0800 Subject: [PATCH 1/3] Trigger flush when there are too many blocks in shuffle buffer --- .../uniffle/server/ShuffleServerConf.java | 15 ++ .../uniffle/server/ShuffleTaskManager.java | 3 + .../server/buffer/ShuffleBufferManager.java | 162 ++++++++++++++---- .../buffer/ShuffleBufferManagerTest.java | 59 +++++++ 4 files changed, 203 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index 114160c7a6..3780c406b8 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -40,6 +40,12 @@ public class ShuffleServerConf extends RssBaseConf { .defaultValue(-1L) .withDescription("Max memory of buffer manager for shuffle server"); + public static final ConfigOption SERVER_BUFFER_BLOCK_COUNT_CAPACITY = + ConfigOptions.key("rss.server.buffer.blockCount.capacity") + .longType() + .defaultValue(0L) + .withDescription("Max block count of buffer manager for shuffle server"); + public static final ConfigOption SERVER_BUFFER_CAPACITY_RATIO = ConfigOptions.key("rss.server.buffer.capacity.ratio") .doubleType() @@ -511,6 +517,15 @@ public class ShuffleServerConf extends RssBaseConf { "Threshold when flushing shuffle data to persistent storage, recommend value would be 256K, " + "512K, or even 1M"); + public static final ConfigOption SERVER_SHUFFLE_FLUSH_BLOCKCOUNT_THRESHOLD = + ConfigOptions.key("rss.server.shuffle.flush.blockCountThreshold") + .longType() + .checkValue( + ConfigUtils.NON_NEGATIVE_LONG_VALIDATOR, "Block count threshold must be non-negative") + .defaultValue(0L) + .withDescription( + "Threshold for flushing shuffle data to persistent storage based on block count"); + public static final ConfigOption SERVER_SHUFFLE_BUFFER_LAB_ENABLE = ConfigOptions.key("rss.server.buffer.lab.enabled") .booleanType() diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 5b1ed01dee..73ea943052 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -1048,6 +1048,9 @@ private void triggerFlush() { if (this.shuffleBufferManager.needToFlush()) { this.shuffleBufferManager.flushIfNecessary(); } + if (this.shuffleBufferManager.ifTooManyBlock()) { + this.shuffleBufferManager.flushIfTooManyBlock(); + } } public Map getShuffleTaskInfos() { diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java index dee8418984..41135f2fea 100644 --- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java +++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java @@ -77,9 +77,12 @@ public class ShuffleBufferManager { private ShuffleTaskManager shuffleTaskManager; private final ShuffleFlushManager shuffleFlushManager; private long capacity; + private long blockCountCapacity; private long readCapacity; private long highWaterMark; private long lowWaterMark; + private long blockCountHighWaterMark; + private long blockCountLowWaterMark; private final boolean bufferFlushWhenCachingData; private boolean bufferFlushEnabled; private long bufferFlushThreshold; @@ -88,6 +91,7 @@ public class ShuffleBufferManager { // kept in memory to // reduce small I/Os to persistent storage, especially for local HDDs. private long shuffleFlushThreshold; + private long shuffleFlushBlockCountThreshold; // Huge partition vars private ReconfigurableConfManager.Reconfigurable hugePartitionSizeThresholdRef; private ReconfigurableConfManager.Reconfigurable hugePartitionSizeHardLimitRef; @@ -97,12 +101,15 @@ public class ShuffleBufferManager { protected AtomicLong inFlushSize = new AtomicLong(0L); protected AtomicLong usedMemory = new AtomicLong(0L); private AtomicLong readDataMemory = new AtomicLong(0L); - private final AtomicLong inMemoryBlockCount = new AtomicLong(0); + protected final AtomicLong inMemoryBlockCount = new AtomicLong(0); private final AtomicLong inFlushBlockCount = new AtomicLong(0); // appId -> shuffleId -> partitionId -> ShuffleBuffer to avoid too many appId protected Map>> bufferPool; // appId -> shuffleId -> shuffle size in buffer protected Map> shuffleSizeMap = JavaUtils.newConcurrentMap(); + // appId -> shuffleId -> shuffle block count in buffer + protected Map> shuffleBlockCountMap = + JavaUtils.newConcurrentMap(); private final boolean appBlockSizeMetricEnabled; private final ReentrantLock globalFlushLock = new ReentrantLock(); @@ -144,6 +151,18 @@ public ShuffleBufferManager( (capacity / 100.0 * conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE)); + this.blockCountCapacity = + conf.getSizeAsBytes(ShuffleServerConf.SERVER_BUFFER_BLOCK_COUNT_CAPACITY); + this.blockCountHighWaterMark = + (long) + (blockCountCapacity + / 100.0 + * conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE)); + this.blockCountLowWaterMark = + (long) + (blockCountCapacity + / 100.0 + * conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE)); this.bufferFlushWhenCachingData = conf.getBoolean(ShuffleServerConf.BUFFER_FLUSH_TRIGGERED_WHEN_CACHEING_DATA); this.bufferFlushEnabled = conf.getBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED); @@ -153,6 +172,8 @@ public ShuffleBufferManager( conf.getInteger(ShuffleServerConf.SINGLE_BUFFER_FLUSH_BLOCKS_NUM_THRESHOLD); this.shuffleFlushThreshold = conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD); + this.shuffleFlushBlockCountThreshold = + conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_BLOCKCOUNT_THRESHOLD); this.hugePartitionSizeThresholdRef = conf.getReconfigurableConf(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD); this.hugePartitionSizeHardLimitRef = @@ -318,6 +339,8 @@ public StatusCode cacheShuffleData( shuffleId, spd.getPartitionId()); updateShuffleSize(appId, shuffleId, size); + updateShuffleBlockCount( + appId, shuffleId, spd.getBlockList().length - spd.getDuplicateBlockCount()); flushSingleBufferIfNecessary( buffer, appId, @@ -328,14 +351,24 @@ public StatusCode cacheShuffleData( if (bufferFlushWhenCachingData && needToFlush()) { flushIfNecessary(); } + if (bufferFlushWhenCachingData && ifTooManyBlock()) { + flushIfTooManyBlock(); + } return StatusCode.SUCCESS; } private void updateShuffleSize(String appId, int shuffleId, long size) { - shuffleSizeMap.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap()); - Map shuffleIdToSize = shuffleSizeMap.get(appId); - shuffleIdToSize.computeIfAbsent(shuffleId, key -> new AtomicLong(0)); - shuffleIdToSize.get(shuffleId).addAndGet(size); + Map shuffleIdToSize = + shuffleSizeMap.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap()); + shuffleIdToSize.computeIfAbsent(shuffleId, key -> new AtomicLong(0)).addAndGet(size); + } + + private void updateShuffleBlockCount(String appId, int shuffleId, long blockCount) { + Map shuffleIdToBlockCount = + shuffleBlockCountMap.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap()); + shuffleIdToBlockCount + .computeIfAbsent(shuffleId, key -> new AtomicLong(0)) + .addAndGet(blockCount); } public Entry, ShuffleBuffer> getShuffleBufferEntry( @@ -435,8 +468,41 @@ public void flushIfNecessary() { usedMemory.get(), preAllocatedSize.get(), inFlushSize.get()); - Map> pickedShuffle = pickFlushedShuffle(); - flush(pickedShuffle); + Map> pickedShuffle = + pickFlushedShuffle(highWaterMark, lowWaterMark, false); + flush(pickedShuffle, highWaterMark, lowWaterMark, false); + } + } catch (InterruptedException e) { + LOG.warn("Ignore the InterruptedException which should be caused by internal killed"); + } finally { + if (lockAcquired) { + globalFlushLock.unlock(); + } + } + } + + public boolean ifTooManyBlock() { + return blockCountHighWaterMark > 0 + && inMemoryBlockCount.get() - inFlushBlockCount.get() > blockCountHighWaterMark; + } + + public void flushIfTooManyBlock() { + boolean lockAcquired = false; + try { + lockAcquired = globalFlushLock.tryLock(flushTryLockTimeout, TimeUnit.MILLISECONDS); + if (!lockAcquired) { + return; + } + if (ifTooManyBlock()) { + // todo: add a metric here to track how many times flush occurs. + LOG.info( + "Start to flush with inMemoryBlockCount[{}], blockCountHighWaterMark[{}], inFlushBlockCount[{}]", + inMemoryBlockCount.get(), + blockCountHighWaterMark, + inFlushBlockCount.get()); + Map> pickedShuffle = + pickFlushedShuffle(blockCountHighWaterMark, blockCountLowWaterMark, true); + flush(pickedShuffle, blockCountHighWaterMark, blockCountLowWaterMark, true); } } catch (InterruptedException e) { LOG.warn("Ignore the InterruptedException which should be caused by internal killed"); @@ -510,6 +576,7 @@ protected boolean flushBuffer( addInFlushBlockCount(-blockCount); }); updateShuffleSize(appId, shuffleId, -event.getEncodedLength()); + updateShuffleBlockCount(appId, shuffleId, -event.getBlockCount()); inFlushSize.addAndGet(event.getEncodedLength()); if (isHugePartition) { event.markOwnedByHugePartition(); @@ -532,6 +599,7 @@ public void removeBuffer(String appId) { } removeBufferByShuffleId(appId, shuffleIdToBuffers.keySet()); shuffleSizeMap.remove(appId); + shuffleBlockCountMap.remove(appId); bufferPool.remove(appId); if (appBlockSizeMetricEnabled) { ShuffleServerMetrics.appHistogramWriteBlockSize.remove(appId); @@ -686,9 +754,14 @@ public void releaseReadMemory(long size) { // Flush the buffer with required map which is shuffleId>. // If the total size of the shuffles picked is bigger than the expected flush size, // it will just flush a part of partitions. - private synchronized void flush(Map> requiredFlush) { - long pickedFlushSize = 0L; - long expectedFlushSize = highWaterMark - lowWaterMark; + private synchronized void flush( + Map> requiredFlush, + long highWaterMark, + long lowWaterMark, + boolean basedOnBlockCount) { + long pickedFlushValue = 0L; + long expectedFlushValue = highWaterMark - lowWaterMark; + String metricType = basedOnBlockCount ? "blocks" : "bytes"; for (Map.Entry>> appIdToBuffers : bufferPool.entrySet()) { String appId = appIdToBuffers.getKey(); @@ -712,7 +785,10 @@ private synchronized void flush(Map> requiredFlush) { shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) { Range range = rangeEntry.getKey(); ShuffleBuffer shuffleBuffer = rangeEntry.getValue(); - long bufferEncodedLength = shuffleBuffer.getEncodedLength(); + long bufferValue = + basedOnBlockCount + ? shuffleBuffer.getBlockCount() + : shuffleBuffer.getEncodedLength(); boolean success = flushBuffer( shuffleBuffer, @@ -723,10 +799,11 @@ private synchronized void flush(Map> requiredFlush) { HugePartitionUtils.isHugePartition( shuffleTaskManager, appId, shuffleId, range.lowerEndpoint())); if (success) { - pickedFlushSize += bufferEncodedLength; + pickedFlushValue += bufferValue; } - if (pickedFlushSize > expectedFlushSize) { - LOG.info("Already picked enough buffers to flush {} bytes", pickedFlushSize); + if (pickedFlushValue > expectedFlushValue) { + LOG.info( + "Already picked enough buffers to flush {} {}", pickedFlushValue, metricType); return; } } @@ -816,10 +893,17 @@ public long getPreAllocatedSize() { return preAllocatedSize.get(); } - // sort for shuffle according to data size, then pick properly data which will be flushed - private Map> pickFlushedShuffle() { + // sort for shuffle according to data size or block count, then pick properly data which will be + // flushed + private Map> pickFlushedShuffle( + long highWaterMark, long lowWaterMark, boolean basedOnBlockCount) { + Map> shuffleMap = + basedOnBlockCount ? shuffleBlockCountMap : shuffleSizeMap; + long threshold = basedOnBlockCount ? shuffleFlushBlockCountThreshold : shuffleFlushThreshold; + String metricType = basedOnBlockCount ? "blocks" : "bytes"; + // create list for sort - List> sizeList = generateSizeList(); + List> sizeList = generateShuffleList(shuffleMap); sizeList.sort( (entry1, entry2) -> { if (entry1 == null && entry2 == null) { @@ -840,36 +924,37 @@ private Map> pickFlushedShuffle() { }); Map> pickedShuffle = Maps.newHashMap(); - // The algorithm here is to flush data size > highWaterMark - lowWaterMark + // The algorithm here is to flush data size/blockCount > highWaterMark - lowWaterMark // the remaining data in buffer maybe more than lowWaterMark // because shuffle server is still receiving data, but it should be ok - long expectedFlushSize = highWaterMark - lowWaterMark; - long atLeastFlushSizeIgnoreThreshold = expectedFlushSize >>> 1; - long pickedFlushSize = 0L; + long expectedFlushValue = highWaterMark - lowWaterMark; + long atLeastFlushSizeIgnoreThreshold = expectedFlushValue >>> 1; + long pickedValue = 0L; int printIndex = 0; int printIgnoreIndex = 0; int printMax = 10; for (Map.Entry entry : sizeList) { - long size = entry.getValue().get(); + long value = entry.getValue().get(); String appIdShuffleIdKey = entry.getKey(); - if (size > this.shuffleFlushThreshold || pickedFlushSize <= atLeastFlushSizeIgnoreThreshold) { - pickedFlushSize += size; + if (value > threshold || pickedValue <= atLeastFlushSizeIgnoreThreshold) { + pickedValue += value; addPickedShuffle(appIdShuffleIdKey, pickedShuffle); // print detail picked info if (printIndex < printMax) { - LOG.info("Pick application_shuffleId[{}] with {} bytes", appIdShuffleIdKey, size); + LOG.info( + "Pick application_shuffleId[{}] with {} {}", appIdShuffleIdKey, value, metricType); printIndex++; } - if (pickedFlushSize > expectedFlushSize) { - LOG.info("Finish flush pick with {} bytes", pickedFlushSize); + if (pickedValue > expectedFlushValue) { + LOG.info("Finish flush pick with {} {}", pickedValue, metricType); break; } } else { - // since shuffle size is ordered by size desc, we can skip process more shuffle data once - // some shuffle's size - // is less than threshold + // since shuffle value is ordered by value desc, we can skip process more shuffle data once + // some shuffle's value is less than threshold if (printIgnoreIndex < printMax) { - LOG.info("Ignore application_shuffleId[{}] with {} bytes", appIdShuffleIdKey, size); + LOG.info( + "Ignore application_shuffleId[{}] with {} {}", appIdShuffleIdKey, value, metricType); printIgnoreIndex++; } else { break; @@ -879,16 +964,17 @@ private Map> pickFlushedShuffle() { return pickedShuffle; } - private List> generateSizeList() { - Map sizeMap = Maps.newHashMap(); - for (Map.Entry> appEntry : shuffleSizeMap.entrySet()) { + private List> generateShuffleList( + Map> shuffleMap) { + Map result = Maps.newHashMap(); + for (Map.Entry> appEntry : shuffleMap.entrySet()) { String appId = appEntry.getKey(); for (Map.Entry shuffleEntry : appEntry.getValue().entrySet()) { Integer shuffleId = shuffleEntry.getKey(); - sizeMap.put(RssUtils.generateShuffleKey(appId, shuffleId), shuffleEntry.getValue()); + result.put(RssUtils.generateShuffleKey(appId, shuffleId), shuffleEntry.getValue()); } } - return Lists.newArrayList(sizeMap.entrySet()); + return Lists.newArrayList(result.entrySet()); } private void addPickedShuffle(String shuffleIdKey, Map> pickedShuffle) { @@ -907,7 +993,11 @@ public void removeBufferByShuffleId(String appId, Collection shuffleIds } Map shuffleIdToSizeMap = shuffleSizeMap.get(appId); + Map shuffleIdToBlockCountMap = shuffleBlockCountMap.get(appId); for (int shuffleId : shuffleIds) { + if (shuffleIdToBlockCountMap != null) { + shuffleIdToBlockCountMap.remove(shuffleId); + } RangeMap bufferRangeMap = shuffleIdToBuffers.remove(shuffleId); if (bufferRangeMap == null) { continue; diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java index 12b901a655..0c05c26367 100644 --- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java @@ -894,4 +894,63 @@ public void cacheDuplicateBlockTest() { assertEquals(StatusCode.SUCCESS, sc); assertEquals(48, shuffleBufferManager.getUsedMemory()); } + + @Test + public void flushIfTooManyBlockTest(@TempDir File tmpDir) throws Exception { + ShuffleServerConf shuffleConf = new ShuffleServerConf(); + File dataDir = new File(tmpDir, "data"); + shuffleConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name()); + shuffleConf.set( + ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(dataDir.getAbsolutePath())); + shuffleConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 5000L); + shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 40.0); + shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 80.0); + shuffleConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L); + shuffleConf.setLong(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL, 500L); + // Configure block count capacity and threshold + // blockCountHighWaterMark = 10 * 80% = 8, so 9 blocks will trigger flush + shuffleConf.setLong(ShuffleServerConf.SERVER_BUFFER_BLOCK_COUNT_CAPACITY, 10L); + shuffleConf.set(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_BLOCKCOUNT_THRESHOLD, 2L); + + ShuffleServer mockShuffleServer = mock(ShuffleServer.class); + StorageManager storageManager = + StorageManagerFactory.getInstance().createStorageManager(shuffleConf); + ShuffleFlushManager shuffleFlushManager = + new ShuffleFlushManager(shuffleConf, mockShuffleServer, storageManager); + shuffleBufferManager = new ShuffleBufferManager(shuffleConf, shuffleFlushManager, false); + ShuffleTaskManager shuffleTaskManager = + new ShuffleTaskManager( + shuffleConf, shuffleFlushManager, shuffleBufferManager, storageManager); + + when(mockShuffleServer.getShuffleFlushManager()).thenReturn(shuffleFlushManager); + when(mockShuffleServer.getShuffleBufferManager()).thenReturn(shuffleBufferManager); + when(mockShuffleServer.getShuffleTaskManager()).thenReturn(shuffleTaskManager); + + String appId = "flushIfTooManyBlockTest"; + int shuffleId = 1; + + shuffleTaskManager.registerShuffle( + appId, + shuffleId, + Arrays.asList(new PartitionRange(0, 0), new PartitionRange(1, 1)), + new RemoteStorageInfo(""), + ""); + + // Cache 9 blocks total to exceed blockCountHighWaterMark (8) + // Each cacheShuffleData call caches 1 block + for (int i = 0; i < 9; i++) { + int partitionId = i % 2; + ShufflePartitionedData data = createData(partitionId, 16); + shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, data); + shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionId, data); + } + + // Wait for flush to complete + Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .until(() -> !shuffleBufferManager.ifTooManyBlock()); + Awaitility.await() + .atMost(Duration.ofSeconds(100)) + .until(() -> shuffleBufferManager.inMemoryBlockCount.get() == 4); + } } From b832ccfab752fd55d3eedfa0510ccc9e31ae364d Mon Sep 17 00:00:00 2001 From: xianjingfeng Date: Wed, 25 Mar 2026 15:33:10 +0800 Subject: [PATCH 2/3] nit --- .../apache/uniffle/server/buffer/ShuffleBufferManagerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java index 0c05c26367..6cdccb0003 100644 --- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java @@ -950,7 +950,8 @@ public void flushIfTooManyBlockTest(@TempDir File tmpDir) throws Exception { .atMost(Duration.ofSeconds(10)) .until(() -> !shuffleBufferManager.ifTooManyBlock()); Awaitility.await() - .atMost(Duration.ofSeconds(100)) + .atMost(Duration.ofSeconds(10)) .until(() -> shuffleBufferManager.inMemoryBlockCount.get() == 4); + } } From f3f4b422e1405b12a30d7bc5314601d3e4999e1b Mon Sep 17 00:00:00 2001 From: xianjingfeng Date: Wed, 25 Mar 2026 15:40:54 +0800 Subject: [PATCH 3/3] nit --- .../apache/uniffle/server/buffer/ShuffleBufferManagerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java index 6cdccb0003..55d07524ce 100644 --- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java @@ -952,6 +952,5 @@ public void flushIfTooManyBlockTest(@TempDir File tmpDir) throws Exception { Awaitility.await() .atMost(Duration.ofSeconds(10)) .until(() -> shuffleBufferManager.inMemoryBlockCount.get() == 4); - } }