[FLINK-38930][checkpoint] Filtering record before processing without spilling strategy #27783
1996fanrui wants to merge 6 commits into apache:master
Conversation
Force-pushed 2b06750 to 997e3a3
pnowojski left a comment:
Thanks! I've left a couple of comments from the first review pass.
 * Deserializes records from {@code sourceBuffer}, applies the virtual channel's record
 * filter, and re-serializes the surviving records into new buffers.
 */
List<Buffer> filterAndRewrite(
Could you re-order the methods in this class? Public first; private either below all publics, or below the first usage?
/**
 * Filters a recovered buffer from the specified virtual channel, returning new buffers
 * containing only the records that belong to the current subtask.
 *
 * @return filtered buffers, possibly empty if all records were filtered out.
 */
public List<Buffer> filterAndRewrite(
        int gateIndex,
        int oldSubtaskIndex,
        int oldChannelIndex,
        Buffer sourceBuffer,
        BufferSupplier bufferSupplier)
Why does it return a List from one single sourceBuffer? Could you explain this in the javadoc? And how many Buffers can that be? If a lot, shouldn't this be an Iterator?
The code comment is updated.
The List return can contain more than one buffer when a spanning record completes in this buffer: the deserializer caches partial data from previous buffers, so the output may include data that is not present in the current source buffer.
This is uncommon but possible with any spanning record. If the network pool is insufficient in this case, it is covered by the spilling logic.
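To illustrate why a single source buffer can yield multiple output buffers, here is a minimal sketch of the spanning-record situation. All names and types below are simplified stand-ins, not Flink's actual deserializer or Buffer classes: a record that started in earlier buffers completes here, so the rewritten output also carries the prefix bytes cached from those earlier buffers.

```java
import java.util.ArrayList;
import java.util.List;

public class SpanningRecordSketch {
    static final int BUFFER_SIZE = 4; // hypothetical output buffer capacity

    // Returns output "buffers" (fixed-size chunks) for the bytes that survive
    // filtering, including any prefix cached from previous input buffers.
    static List<byte[]> filterAndRewrite(byte[] cachedPrefix, byte[] sourceBuffer) {
        byte[] record = new byte[cachedPrefix.length + sourceBuffer.length];
        System.arraycopy(cachedPrefix, 0, record, 0, cachedPrefix.length);
        System.arraycopy(sourceBuffer, 0, record, cachedPrefix.length, sourceBuffer.length);

        List<byte[]> out = new ArrayList<>();
        for (int pos = 0; pos < record.length; pos += BUFFER_SIZE) {
            int len = Math.min(BUFFER_SIZE, record.length - pos);
            byte[] chunk = new byte[len];
            System.arraycopy(record, pos, chunk, 0, len);
            out.add(chunk);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] prefix = new byte[6]; // spanning-record data cached from earlier buffers
        byte[] source = new byte[3]; // the single source buffer passed in
        List<byte[]> buffers = filterAndRewrite(prefix, source);
        // 9 bytes total at 4 bytes per buffer: three output buffers from one input buffer
        System.out.println(buffers.size());
    }
}
```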
// Extra retain: filterAndRewrite consumes one ref, caller's finally releases another.
buffer.retainBuffer();
Nit: I think it would be slightly cleaner to call buffer.retainBuffer() from the outside; the contract would then be that this method always takes over ownership of this buffer.
Addressed together with the ownership concern in https://github.com/apache/flink/pull/27783/changes#r2996388666. Removed retainBuffer() and the catch block entirely. The buffer now has a single clean owner per path: in the filtering path, the deserializer recycles the buffer when it is consumed, and the finally block uses a defensive isRecycled() check only for the edge case where an exception occurs before the deserializer takes the buffer (e.g., a VirtualChannel lookup failure). Added a buffer lifecycle diagram to the javadoc covering all paths. No extra retain/recycle is needed.
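The single-owner pattern described above can be sketched as follows. This uses a hypothetical minimal Buffer stub rather than Flink's actual Buffer interface: the consumer recycles the buffer once it takes ownership, and the finally block only steps in when ownership was never handed over.

```java
public class BufferOwnershipSketch {
    // Hypothetical stand-in for a reference-counted network buffer.
    static class Buffer {
        private boolean recycled;
        boolean isRecycled() { return recycled; }
        void recycleBuffer() { recycled = true; }
    }

    // Stands in for the deserializer: consumes the buffer on the happy path.
    static void consume(Buffer buffer, boolean failBeforeHandover) {
        if (failBeforeHandover) {
            throw new IllegalStateException("virtual channel lookup failed");
        }
        buffer.recycleBuffer(); // consumer takes ownership and recycles when done
    }

    static boolean process(Buffer buffer, boolean failBeforeHandover) {
        try {
            consume(buffer, failBeforeHandover);
            return true;
        } catch (IllegalStateException e) {
            return false;
        } finally {
            // Defensive check: only recycle if no one else took ownership.
            if (!buffer.isRecycled()) {
                buffer.recycleBuffer();
            }
        }
    }

    public static void main(String[] args) {
        Buffer a = new Buffer();
        process(a, false);
        System.out.println(a.isRecycled()); // recycled exactly once, by the consumer

        Buffer b = new Buffer();
        process(b, true);
        System.out.println(b.isRecycled()); // recycled by the finally block
    }
}
```

Either way the buffer ends up recycled exactly once, which is the "single clean owner per path" contract.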
} catch (Throwable t) {
    // filterAndRewrite didn't consume the buffer, release the extra ref.
    buffer.recycleBuffer();
    throw t;
}
Hmm, that's a bit strange? It sounds like it's not clear who owns this buffer. There should be a clean owner that is always responsible for cleaning up, no matter what.
...untime/src/main/java/org/apache/flink/streaming/runtime/io/recovery/RecordFilterContext.java (resolved)
.../main/java/org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImpl.java (outdated, resolved)
List<StreamElement> filteredElements = new ArrayList<>();

while (true) {
    DeserializationResult result = vc.getNextRecord(deserializationDelegate);
    if (result.isFullRecord()) {
        filteredElements.add(deserializationDelegate.getInstance());
    }
    if (result.isBufferConsumed()) {
        break;
    }
}

return serializeToBuffers(filteredElements, bufferSupplier);
Ditto about the List in List<StreamElement> filteredElements. It would be safer to be iterative. The current implementation risks OOMs if deserialized records use more memory than the serialized records. This is not very common, but it could happen.
    resultBuffers.add(currentBuffer.retainBuffer());
}
currentBuffer.recycleBuffer();
currentBuffer = bufferSupplier.requestBufferBlocking();
Is it safe to block here? 🤔 Can this lead to deadlocks? I think we were discussing this, but AFAIR this code works differently from what we were discussing offline (either using an unpooled buffer, creating two different pools, or filtering records in-place without requesting a new buffer)?
Good catch!
This is addressed in a follow-up commit in https://github.com/apache/flink/pull/27639/commits (FLINK-38544, f031ddf) by falling back to a heap buffer when the buffer pool is insufficient.
I think it would be better to squash that commit here, to avoid merging broken code, given that we already have a working fix for it.
Force-pushed 997e3a3 to db2565f
…ringRecoveryEnabled
…spilling strategy

Core filtering mechanism for recovered channel state buffers:
- ChannelStateFilteringHandler with per-gate GateFilterHandler
- RecordFilterContext with VirtualChannelRecordFilterFactory
- Partial data check in SequentialChannelStateReaderImpl
- Fix RecordFilterContext for Union downscale scenario
Force-pushed db2565f to b12a097
1996fanrui left a comment:
Thanks @pnowojski for the review.
All of the comments make sense to me, and I have addressed all of them.
- Add javadoc for filterAndRewrite explaining spanning record multi-buffer output
- Move retainBuffer call to caller for clearer buffer ownership contract
- Implement Closeable for ChannelStateFilteringHandler
- Use try-with-resources in SequentialChannelStateReaderImpl
…during recovery

When unaligned checkpointing during recovery is enabled, use a heap buffer as a fallback instead of blocking on the buffer pool, to avoid hanging if the buffer pool is not yet available. When the feature is disabled, the original blocking behavior is preserved.
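The fallback in the commit message above can be sketched as follows: try the pooled buffer first, and if the pool cannot serve one, allocate an unpooled heap buffer instead of blocking. The types here are simplified stand-ins, not Flink's actual BufferSupplier or network buffer API.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

public class HeapFallbackSketch {
    // Hypothetical non-blocking view of a pooled buffer supplier.
    interface PooledSupplier {
        Optional<ByteBuffer> tryRequestBuffer(); // empty if the pool is exhausted
    }

    static ByteBuffer requestWithFallback(PooledSupplier pool, int size, boolean fallbackEnabled) {
        Optional<ByteBuffer> pooled = pool.tryRequestBuffer();
        if (pooled.isPresent()) {
            return pooled.get();
        }
        if (fallbackEnabled) {
            // Feature enabled: never block, allocate an unpooled heap buffer instead.
            return ByteBuffer.allocate(size);
        }
        // Feature disabled: the real code would keep the original blocking request here.
        throw new IllegalStateException("would block on the buffer pool");
    }

    public static void main(String[] args) {
        PooledSupplier exhausted = Optional::empty; // simulate an unavailable pool
        ByteBuffer b = requestWithFallback(exhausted, 1024, true);
        System.out.println(b.capacity());
    }
}
```

The trade-off is that heap buffers bypass the network memory accounting, which is why this path is limited to the recovery filtering case where the pool may not yet be available.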
Force-pushed b12a097 to 26602df
This PR depends on #27782
What is the purpose of the change
[FLINK-38930][checkpoint] Filtering record before processing without spilling strategy
Brief change log
Core filtering mechanism for recovered channel state buffers:
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no

Documentation