Skip to content
50 changes: 50 additions & 0 deletions src/buf/vec_deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,53 @@ impl Buf for VecDeque<u8> {
self.drain(..cnt);
}
}

impl<T: Buf> Buf for VecDeque<T> {
fn remaining(&self) -> usize {
self.iter().map(|b| b.remaining()).sum()
}

fn chunk(&self) -> &[u8] {
Comment thread
b01o marked this conversation as resolved.
self.iter()
.find(|b| b.has_remaining())
.map(|b| b.chunk())
.unwrap_or_default()
}
Comment on lines +47 to +52
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I think of it more, this violates the implementer notes on chunk(). Since we accept any VecDeque, there could be empty elements within it, something that common BufList implementations prevent 1 2

chunk() should return an empty slice if and only if remaining() returns 0

Footnotes

  1. https://docs.rs/crate/hyper/1.8.1/source/src/common/buf.rs#17-21

  2. https://docs.rs/crate/watermelon-proto/0.1.8/source/src/util/buf_list.rs#22-27

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean, let me try to make it follow the contract.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we need a bugfix to existing code, it should be a separate PR.


#[cfg(feature = "std")]
fn chunks_vectored<'a>(&'a self, dst: &mut [io::IoSlice<'a>]) -> usize {
let mut n = 0;
for buf in self {
if n >= dst.len() {
break;
}

let old_n = n;
n += buf.chunks_vectored(&mut dst[n..]);
Comment on lines +57 to +63
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop must exit if the chunks written by buf have a combined length smaller than buf.remaining(), because you must read everything from the first buffer before you start reading from the second buffer. Please add a test that would have caught this.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The access issue should be fixed now. Let me know if I missed anything.


let total_length: usize = dst[old_n..n].iter().map(|s| s.len()).sum();
if total_length < buf.remaining() {
// * we don't gather all the remaining data of the current buffer,
// must stop here to preserve the correct data ordering.
break;
}
}
n
}

fn advance(&mut self, mut cnt: usize) {
while cnt > 0 {
let b = self
.front_mut()
.expect("advance called with cnt > remaining");
let rem = b.remaining();
if cnt < rem {
b.advance(cnt);
return;
} else {
cnt -= rem;
self.pop_front();
}
}
}
}
54 changes: 54 additions & 0 deletions tests/test_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,3 +459,57 @@ fn copy_to_bytes_mut() {
let bytes_mut2 = BytesMut::from(ret);
assert_eq!(bytes_mut2.as_ptr(), ptr);
}

#[test]
#[cfg(feature = "std")]
fn test_vec_deque_buf_sequential_integrity() {
/// A mock Buf that simulates a "fragmented" memory layout.
/// It claims to have 10 bytes remaining, but its `chunks_vectored`
/// implementation only exposes the first 4 bytes.
struct MockBuf(Bytes);
impl Buf for MockBuf {
fn remaining(&self) -> usize {
self.0.remaining()
}
fn chunk(&self) -> &[u8] {
&self.0.chunk()[..1]
}
fn advance(&mut self, cnt: usize) {
self.0.advance(cnt);
}
/// Purposefully return fewer bytes than `remaining()` to test
/// if the caller correctly stops at the first gap.
fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
if dst.is_empty() || self.0.is_empty() {
return 0;
}
let limit = std::cmp::min(self.0.len(), 4);
dst[0] = IoSlice::new(&self.0.chunk()[..limit]);
1
}
}

let buf1 = MockBuf(Bytes::from("0123456789")); // 10 bytes
let buf2 = MockBuf(Bytes::from("ABCDEFGHIJ")); // 10 bytes

let mut deque = VecDeque::new();
deque.push_back(buf1);
deque.push_back(buf2);

let mut slices = [IoSlice::new(&[]); 16];
let n = deque.chunks_vectored(&mut slices);

let total_len: usize = slices[..n].iter().map(|s| s.len()).sum();

// Verification Logic:
// A correct implementation must stop gathering if a buffer cannot
// expose its entire remaining content in a continuous vector of slices.
// If total_len > 4, it means the implementation skipped the tail of
// buf1 ("456789") and jumped straight to buf2, which breaks data ordering.
assert!(
total_len <= 4,
"Error: Implementation gathered data from subsequent buffers before
exhausting the current buffer's remaining data! Total len: {}",
total_len
);
}