Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(-1L)
.withDescription("Max memory of buffer manager for shuffle server");

// Upper bound on the number of blocks the buffer manager may hold in memory.
// The default of 0 disables block-count-based flushing: the derived high/low
// watermarks are computed as a percentage of this capacity, and a watermark of
// 0 makes the too-many-blocks check always false.
// Validated non-negative for consistency with
// rss.server.shuffle.flush.blockCountThreshold below.
public static final ConfigOption<Long> SERVER_BUFFER_BLOCK_COUNT_CAPACITY =
    ConfigOptions.key("rss.server.buffer.blockCount.capacity")
        .longType()
        .checkValue(
            ConfigUtils.NON_NEGATIVE_LONG_VALIDATOR,
            "Buffer block count capacity must be non-negative")
        .defaultValue(0L)
        .withDescription("Max block count of buffer manager for shuffle server");

public static final ConfigOption<Double> SERVER_BUFFER_CAPACITY_RATIO =
ConfigOptions.key("rss.server.buffer.capacity.ratio")
.doubleType()
Expand Down Expand Up @@ -511,6 +517,15 @@ public class ShuffleServerConf extends RssBaseConf {
"Threshold when flushing shuffle data to persistent storage, recommend value would be 256K, "
+ "512K, or even 1M");

// Per-shuffle block-count threshold used when picking shuffles to flush by
// block count: shuffles whose in-memory block count exceeds this value are
// preferred flush candidates. The default of 0 enforces no per-shuffle minimum.
// NOTE(review): this option is currently read via conf.getSizeAsBytes(...),
// which applies byte-size parsing to a plain count — confirm that is intended.
public static final ConfigOption<Long> SERVER_SHUFFLE_FLUSH_BLOCKCOUNT_THRESHOLD =
    ConfigOptions.key("rss.server.shuffle.flush.blockCountThreshold")
        .longType()
        .checkValue(
            ConfigUtils.NON_NEGATIVE_LONG_VALIDATOR, "Block count threshold must be non-negative")
        .defaultValue(0L)
        .withDescription(
            "Threshold for flushing shuffle data to persistent storage based on block count");

public static final ConfigOption<Boolean> SERVER_SHUFFLE_BUFFER_LAB_ENABLE =
ConfigOptions.key("rss.server.buffer.lab.enabled")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,9 @@ private void triggerFlush() {
if (this.shuffleBufferManager.needToFlush()) {
this.shuffleBufferManager.flushIfNecessary();
}
if (this.shuffleBufferManager.ifTooManyBlock()) {
this.shuffleBufferManager.flushIfTooManyBlock();
}
}

public Map<String, ShuffleTaskInfo> getShuffleTaskInfos() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ public class ShuffleBufferManager {
private ShuffleTaskManager shuffleTaskManager;
private final ShuffleFlushManager shuffleFlushManager;
private long capacity;
private long blockCountCapacity;
private long readCapacity;
private long highWaterMark;
private long lowWaterMark;
private long blockCountHighWaterMark;
private long blockCountLowWaterMark;
private final boolean bufferFlushWhenCachingData;
private boolean bufferFlushEnabled;
private long bufferFlushThreshold;
Expand All @@ -88,6 +91,7 @@ public class ShuffleBufferManager {
// kept in memory to
// reduce small I/Os to persistent storage, especially for local HDDs.
private long shuffleFlushThreshold;
private long shuffleFlushBlockCountThreshold;
// Huge partition vars
private ReconfigurableConfManager.Reconfigurable<Long> hugePartitionSizeThresholdRef;
private ReconfigurableConfManager.Reconfigurable<Long> hugePartitionSizeHardLimitRef;
Expand All @@ -97,12 +101,15 @@ public class ShuffleBufferManager {
protected AtomicLong inFlushSize = new AtomicLong(0L);
protected AtomicLong usedMemory = new AtomicLong(0L);
private AtomicLong readDataMemory = new AtomicLong(0L);
private final AtomicLong inMemoryBlockCount = new AtomicLong(0);
protected final AtomicLong inMemoryBlockCount = new AtomicLong(0);
private final AtomicLong inFlushBlockCount = new AtomicLong(0);
// appId -> shuffleId -> partitionId -> ShuffleBuffer to avoid too many appId
protected Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> bufferPool;
// appId -> shuffleId -> shuffle size in buffer
protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = JavaUtils.newConcurrentMap();
// appId -> shuffleId -> shuffle block count in buffer
protected Map<String, Map<Integer, AtomicLong>> shuffleBlockCountMap =
JavaUtils.newConcurrentMap();
private final boolean appBlockSizeMetricEnabled;
private final ReentrantLock globalFlushLock = new ReentrantLock();

Expand Down Expand Up @@ -144,6 +151,18 @@ public ShuffleBufferManager(
(capacity
/ 100.0
* conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE));
this.blockCountCapacity =
conf.getSizeAsBytes(ShuffleServerConf.SERVER_BUFFER_BLOCK_COUNT_CAPACITY);
this.blockCountHighWaterMark =
(long)
(blockCountCapacity
/ 100.0
* conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE));
this.blockCountLowWaterMark =
(long)
(blockCountCapacity
/ 100.0
* conf.get(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE));
this.bufferFlushWhenCachingData =
conf.getBoolean(ShuffleServerConf.BUFFER_FLUSH_TRIGGERED_WHEN_CACHEING_DATA);
this.bufferFlushEnabled = conf.getBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED);
Expand All @@ -153,6 +172,8 @@ public ShuffleBufferManager(
conf.getInteger(ShuffleServerConf.SINGLE_BUFFER_FLUSH_BLOCKS_NUM_THRESHOLD);
this.shuffleFlushThreshold =
conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
this.shuffleFlushBlockCountThreshold =
conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_BLOCKCOUNT_THRESHOLD);
this.hugePartitionSizeThresholdRef =
conf.getReconfigurableConf(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
this.hugePartitionSizeHardLimitRef =
Expand Down Expand Up @@ -318,6 +339,8 @@ public StatusCode cacheShuffleData(
shuffleId,
spd.getPartitionId());
updateShuffleSize(appId, shuffleId, size);
updateShuffleBlockCount(
appId, shuffleId, spd.getBlockList().length - spd.getDuplicateBlockCount());
flushSingleBufferIfNecessary(
buffer,
appId,
Expand All @@ -328,14 +351,24 @@ public StatusCode cacheShuffleData(
if (bufferFlushWhenCachingData && needToFlush()) {
flushIfNecessary();
}
if (bufferFlushWhenCachingData && ifTooManyBlock()) {
flushIfTooManyBlock();
}
return StatusCode.SUCCESS;
}

/**
 * Accumulates {@code size} bytes into the per-(appId, shuffleId) counter that drives
 * flush-candidate selection. Callers pass a negative size when a buffer is handed to the
 * flush pipeline, so the counter tracks only data still resident in the buffer.
 *
 * <p>Note: the pasted diff contained both the old and the new body of this method
 * (a redeclared {@code shuffleIdToSize} local); this keeps the single
 * computeIfAbsent-based version.
 */
private void updateShuffleSize(String appId, int shuffleId, long size) {
  Map<Integer, AtomicLong> shuffleIdToSize =
      shuffleSizeMap.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
  shuffleIdToSize.computeIfAbsent(shuffleId, key -> new AtomicLong(0)).addAndGet(size);
}

/**
 * Adds {@code blockCount} to the per-(appId, shuffleId) in-buffer block counter.
 * A negative delta is passed when a buffer is submitted for flushing.
 */
private void updateShuffleBlockCount(String appId, int shuffleId, long blockCount) {
  Map<Integer, AtomicLong> perShuffleCounts =
      shuffleBlockCountMap.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
  AtomicLong counter = perShuffleCounts.computeIfAbsent(shuffleId, key -> new AtomicLong(0));
  counter.addAndGet(blockCount);
}

public Entry<Range<Integer>, ShuffleBuffer> getShuffleBufferEntry(
Expand Down Expand Up @@ -435,8 +468,41 @@ public void flushIfNecessary() {
usedMemory.get(),
preAllocatedSize.get(),
inFlushSize.get());
Map<String, Set<Integer>> pickedShuffle = pickFlushedShuffle();
flush(pickedShuffle);
Map<String, Set<Integer>> pickedShuffle =
pickFlushedShuffle(highWaterMark, lowWaterMark, false);
flush(pickedShuffle, highWaterMark, lowWaterMark, false);
}
} catch (InterruptedException e) {
LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
} finally {
if (lockAcquired) {
globalFlushLock.unlock();
}
}
}

/**
 * Reports whether the blocks cached in memory — excluding those already handed to the
 * flush pipeline — exceed the block-count high watermark. Always {@code false} when the
 * watermark is non-positive, i.e. when block-count-based flushing is disabled.
 *
 * <p>The two counters are read independently, so the result is a best-effort snapshot.
 */
public boolean ifTooManyBlock() {
  if (blockCountHighWaterMark <= 0) {
    return false;
  }
  long residentBlockCount = inMemoryBlockCount.get() - inFlushBlockCount.get();
  return residentBlockCount > blockCountHighWaterMark;
}

public void flushIfTooManyBlock() {
boolean lockAcquired = false;
try {
lockAcquired = globalFlushLock.tryLock(flushTryLockTimeout, TimeUnit.MILLISECONDS);
if (!lockAcquired) {
return;
}
if (ifTooManyBlock()) {
// todo: add a metric here to track how many times flush occurs.
LOG.info(
"Start to flush with inMemoryBlockCount[{}], blockCountHighWaterMark[{}], inFlushBlockCount[{}]",
inMemoryBlockCount.get(),
blockCountHighWaterMark,
inFlushBlockCount.get());
Map<String, Set<Integer>> pickedShuffle =
pickFlushedShuffle(blockCountHighWaterMark, blockCountLowWaterMark, true);
flush(pickedShuffle, blockCountHighWaterMark, blockCountLowWaterMark, true);
}
} catch (InterruptedException e) {
LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
Expand Down Expand Up @@ -510,6 +576,7 @@ protected boolean flushBuffer(
addInFlushBlockCount(-blockCount);
});
updateShuffleSize(appId, shuffleId, -event.getEncodedLength());
updateShuffleBlockCount(appId, shuffleId, -event.getBlockCount());
inFlushSize.addAndGet(event.getEncodedLength());
if (isHugePartition) {
event.markOwnedByHugePartition();
Expand All @@ -532,6 +599,7 @@ public void removeBuffer(String appId) {
}
removeBufferByShuffleId(appId, shuffleIdToBuffers.keySet());
shuffleSizeMap.remove(appId);
shuffleBlockCountMap.remove(appId);
bufferPool.remove(appId);
if (appBlockSizeMetricEnabled) {
ShuffleServerMetrics.appHistogramWriteBlockSize.remove(appId);
Expand Down Expand Up @@ -686,9 +754,14 @@ public void releaseReadMemory(long size) {
// Flush the buffer with required map which is <appId -> shuffleId>.
// If the total size of the shuffles picked is bigger than the expected flush size,
// it will just flush a part of partitions.
private synchronized void flush(Map<String, Set<Integer>> requiredFlush) {
long pickedFlushSize = 0L;
long expectedFlushSize = highWaterMark - lowWaterMark;
private synchronized void flush(
Map<String, Set<Integer>> requiredFlush,
long highWaterMark,
long lowWaterMark,
boolean basedOnBlockCount) {
long pickedFlushValue = 0L;
long expectedFlushValue = highWaterMark - lowWaterMark;
String metricType = basedOnBlockCount ? "blocks" : "bytes";
for (Map.Entry<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> appIdToBuffers :
bufferPool.entrySet()) {
String appId = appIdToBuffers.getKey();
Expand All @@ -712,7 +785,10 @@ private synchronized void flush(Map<String, Set<Integer>> requiredFlush) {
shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) {
Range<Integer> range = rangeEntry.getKey();
ShuffleBuffer shuffleBuffer = rangeEntry.getValue();
long bufferEncodedLength = shuffleBuffer.getEncodedLength();
long bufferValue =
basedOnBlockCount
? shuffleBuffer.getBlockCount()
: shuffleBuffer.getEncodedLength();
boolean success =
flushBuffer(
shuffleBuffer,
Expand All @@ -723,10 +799,11 @@ private synchronized void flush(Map<String, Set<Integer>> requiredFlush) {
HugePartitionUtils.isHugePartition(
shuffleTaskManager, appId, shuffleId, range.lowerEndpoint()));
if (success) {
pickedFlushSize += bufferEncodedLength;
pickedFlushValue += bufferValue;
}
if (pickedFlushSize > expectedFlushSize) {
LOG.info("Already picked enough buffers to flush {} bytes", pickedFlushSize);
if (pickedFlushValue > expectedFlushValue) {
LOG.info(
"Already picked enough buffers to flush {} {}", pickedFlushValue, metricType);
return;
}
}
Expand Down Expand Up @@ -816,10 +893,17 @@ public long getPreAllocatedSize() {
return preAllocatedSize.get();
}

// sort for shuffle according to data size, then pick properly data which will be flushed
private Map<String, Set<Integer>> pickFlushedShuffle() {
// sort for shuffle according to data size or block count, then pick properly data which will be
// flushed
private Map<String, Set<Integer>> pickFlushedShuffle(
long highWaterMark, long lowWaterMark, boolean basedOnBlockCount) {
Map<String, Map<Integer, AtomicLong>> shuffleMap =
basedOnBlockCount ? shuffleBlockCountMap : shuffleSizeMap;
long threshold = basedOnBlockCount ? shuffleFlushBlockCountThreshold : shuffleFlushThreshold;
String metricType = basedOnBlockCount ? "blocks" : "bytes";

// create list for sort
List<Entry<String, AtomicLong>> sizeList = generateSizeList();
List<Entry<String, AtomicLong>> sizeList = generateShuffleList(shuffleMap);
sizeList.sort(
(entry1, entry2) -> {
if (entry1 == null && entry2 == null) {
Expand All @@ -840,36 +924,37 @@ private Map<String, Set<Integer>> pickFlushedShuffle() {
});

Map<String, Set<Integer>> pickedShuffle = Maps.newHashMap();
// The algorithm here is to flush data size > highWaterMark - lowWaterMark
// The algorithm here is to flush data size/blockCount > highWaterMark - lowWaterMark
// the remaining data in buffer maybe more than lowWaterMark
// because shuffle server is still receiving data, but it should be ok
long expectedFlushSize = highWaterMark - lowWaterMark;
long atLeastFlushSizeIgnoreThreshold = expectedFlushSize >>> 1;
long pickedFlushSize = 0L;
long expectedFlushValue = highWaterMark - lowWaterMark;
long atLeastFlushSizeIgnoreThreshold = expectedFlushValue >>> 1;
long pickedValue = 0L;
int printIndex = 0;
int printIgnoreIndex = 0;
int printMax = 10;
for (Map.Entry<String, AtomicLong> entry : sizeList) {
long size = entry.getValue().get();
long value = entry.getValue().get();
String appIdShuffleIdKey = entry.getKey();
if (size > this.shuffleFlushThreshold || pickedFlushSize <= atLeastFlushSizeIgnoreThreshold) {
pickedFlushSize += size;
if (value > threshold || pickedValue <= atLeastFlushSizeIgnoreThreshold) {
pickedValue += value;
addPickedShuffle(appIdShuffleIdKey, pickedShuffle);
// print detail picked info
if (printIndex < printMax) {
LOG.info("Pick application_shuffleId[{}] with {} bytes", appIdShuffleIdKey, size);
LOG.info(
"Pick application_shuffleId[{}] with {} {}", appIdShuffleIdKey, value, metricType);
printIndex++;
}
if (pickedFlushSize > expectedFlushSize) {
LOG.info("Finish flush pick with {} bytes", pickedFlushSize);
if (pickedValue > expectedFlushValue) {
LOG.info("Finish flush pick with {} {}", pickedValue, metricType);
break;
}
} else {
// since shuffle size is ordered by size desc, we can skip process more shuffle data once
// some shuffle's size
// is less than threshold
// since shuffle value is ordered by value desc, we can skip process more shuffle data once
// some shuffle's value is less than threshold
if (printIgnoreIndex < printMax) {
LOG.info("Ignore application_shuffleId[{}] with {} bytes", appIdShuffleIdKey, size);
LOG.info(
"Ignore application_shuffleId[{}] with {} {}", appIdShuffleIdKey, value, metricType);
printIgnoreIndex++;
} else {
break;
Expand All @@ -879,16 +964,17 @@ private Map<String, Set<Integer>> pickFlushedShuffle() {
return pickedShuffle;
}

private List<Map.Entry<String, AtomicLong>> generateSizeList() {
Map<String, AtomicLong> sizeMap = Maps.newHashMap();
for (Map.Entry<String, Map<Integer, AtomicLong>> appEntry : shuffleSizeMap.entrySet()) {
private List<Map.Entry<String, AtomicLong>> generateShuffleList(
Map<String, Map<Integer, AtomicLong>> shuffleMap) {
Map<String, AtomicLong> result = Maps.newHashMap();
for (Map.Entry<String, Map<Integer, AtomicLong>> appEntry : shuffleMap.entrySet()) {
String appId = appEntry.getKey();
for (Map.Entry<Integer, AtomicLong> shuffleEntry : appEntry.getValue().entrySet()) {
Integer shuffleId = shuffleEntry.getKey();
sizeMap.put(RssUtils.generateShuffleKey(appId, shuffleId), shuffleEntry.getValue());
result.put(RssUtils.generateShuffleKey(appId, shuffleId), shuffleEntry.getValue());
}
}
return Lists.newArrayList(sizeMap.entrySet());
return Lists.newArrayList(result.entrySet());
}

private void addPickedShuffle(String shuffleIdKey, Map<String, Set<Integer>> pickedShuffle) {
Expand All @@ -907,7 +993,11 @@ public void removeBufferByShuffleId(String appId, Collection<Integer> shuffleIds
}

Map<Integer, AtomicLong> shuffleIdToSizeMap = shuffleSizeMap.get(appId);
Map<Integer, AtomicLong> shuffleIdToBlockCountMap = shuffleBlockCountMap.get(appId);
for (int shuffleId : shuffleIds) {
if (shuffleIdToBlockCountMap != null) {
shuffleIdToBlockCountMap.remove(shuffleId);
}
RangeMap<Integer, ShuffleBuffer> bufferRangeMap = shuffleIdToBuffers.remove(shuffleId);
if (bufferRangeMap == null) {
continue;
Expand Down
Loading
Loading