Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions src/transport/fusedev/bytes_cursor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use crate::transport::Error;
use std::{io, mem};

#[derive(Debug, Default)]
pub(crate) struct BytesCursor<'a> {
slice: &'a mut [u8],
/// position <= slice.len()
position: usize,
}

impl<'a> BytesCursor<'a> {
pub(crate) fn new(slice: &'a mut [u8], position: usize) -> Self {
assert!(position <= slice.len());
BytesCursor { slice, position }
}

pub(crate) fn slice_mut(&mut self) -> &mut [u8] {
self.slice
}

pub(crate) fn position(&self) -> usize {
self.position
}

pub(crate) fn available_bytes(&self) -> usize {
self.slice.len() - self.position
}

pub(crate) fn check_available_space(&self, sz: usize) -> io::Result<()> {
if sz > self.available_bytes() {
Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"data out of range, available {} requested {}",
self.available_bytes(),
sz
),
))
} else {
Ok(())
}
}

pub(crate) fn available_slice(&mut self) -> &mut [u8] {
&mut self.slice[self.position..]
}

pub(crate) fn account_written(&mut self, count: usize) {
assert!(self.available_bytes() >= count);
self.position += count;
}

pub(crate) fn written(&self) -> &[u8] {
&self.slice[..self.position]
}

pub(crate) fn split_at(&mut self, offset: usize) -> Result<BytesCursor<'a>, Error> {
if self.slice.len() < offset {
return Err(Error::SplitOutOfBounds(offset));
}

let (len1, len2) = if self.position > offset {
(offset, self.position - offset)
} else {
(self.position, 0)
};
let (slice1, slice2) = mem::take(&mut self.slice).split_at_mut(offset);
*self = BytesCursor::new(slice1, len1);
Ok(BytesCursor::new(slice2, len2))
}

pub(crate) fn extend_from_slice(&mut self, slice: &[u8]) {
self.slice[self.position..][..slice.len()].copy_from_slice(slice);
self.position += slice.len();
}
}

#[cfg(test)]
mod tests {
use crate::transport::fusedev::bytes_cursor::BytesCursor;

#[test]
fn test_split_at() {
let mut array = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

let mut b = BytesCursor::new(&mut array, 0);
let mut b1 = b.split_at(4).unwrap();

assert_eq!(&[0, 1, 2, 3], b.slice_mut());
assert_eq!(0, b.position());
assert_eq!(&[4, 5, 6, 7, 8, 9], b1.slice_mut());
assert_eq!(0, b1.position());

let mut b = BytesCursor::new(&mut array, 6);
let mut b1 = b.split_at(4).unwrap();
assert_eq!(&[0, 1, 2, 3], b.slice_mut());
assert_eq!(4, b.position());
assert_eq!(&[4, 5, 6, 7, 8, 9], b1.slice_mut());
assert_eq!(2, b1.position());
}

#[test]
fn test_extend_from_slice() {
let mut array = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let mut b = BytesCursor::new(&mut array, 3);
b.extend_from_slice(&[b'a', b'b']);
assert_eq!(&[0, 1, 2, b'a', b'b', 5, 6, 7, 8, 9], b.slice);
assert_eq!(5, b.position());
}
}
113 changes: 40 additions & 73 deletions src/transport/fusedev/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use std::collections::VecDeque;
use std::io::{self, IoSlice, Write};
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::os::fd::AsRawFd;
use std::os::unix::io::RawFd;

Expand All @@ -20,8 +19,11 @@ use vm_memory::{ByteValued, VolatileSlice};

use super::{Error, FileReadWriteVolatile, IoBuffers, Reader, Result, Writer};
use crate::file_buf::FileVolatileSlice;
use crate::transport::fusedev::bytes_cursor::BytesCursor;
use crate::BitmapSlice;

mod bytes_cursor;

#[cfg(target_os = "linux")]
mod linux_session;
#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -83,23 +85,23 @@ impl<'a, S: BitmapSlice + Default> Reader<'a, S> {
/// 2. If the writer is split, a final commit() MUST be called to issue the
/// device write operation.
/// 3. Concurrency, caller should not write to the writer concurrently.
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug)]
pub struct FuseDevWriter<'a, S: BitmapSlice = ()> {
fd: RawFd,
buffered: bool,
buf: ManuallyDrop<Vec<u8>>,
buf: BytesCursor<'a>,
bitmapslice: S,
phantom: PhantomData<&'a mut [S]>,
}

impl<'a, S: BitmapSlice + Default> FuseDevWriter<'a, S> {
/// Construct a new [Writer].
pub fn new(fd: RawFd, data_buf: &'a mut [u8]) -> Result<FuseDevWriter<'a, S>> {
let buf = unsafe { Vec::from_raw_parts(data_buf.as_mut_ptr(), 0, data_buf.len()) };
let buf = BytesCursor::new(data_buf, 0);
Ok(FuseDevWriter {
fd,
buffered: false,
buf: ManuallyDrop::new(buf),
buf,
bitmapslice: S::default(),
phantom: PhantomData,
})
Expand All @@ -113,27 +115,14 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> {
/// `Writer` can write up to `available_bytes() - offset` bytes. Returns an error if
/// `offset > self.available_bytes()`.
pub fn split_at(&mut self, offset: usize) -> Result<FuseDevWriter<'a, S>> {
if self.buf.capacity() < offset {
return Err(Error::SplitOutOfBounds(offset));
}
let buf2 = self.buf.split_at(offset)?;

let (len1, len2) = if self.buf.len() > offset {
(offset, self.buf.len() - offset)
} else {
(self.buf.len(), 0)
};
let cap2 = self.buf.capacity() - offset;
let ptr = self.buf.as_mut_ptr();

// Safe because both buffers refer to different parts of the same underlying `data_buf`.
self.buf = unsafe { ManuallyDrop::new(Vec::from_raw_parts(ptr, len1, offset)) };
self.buffered = true;
let buf = unsafe { ManuallyDrop::new(Vec::from_raw_parts(ptr.add(offset), len2, cap2)) };

Ok(FuseDevWriter {
fd: self.fd,
buffered: true,
buf,
buf: buf2,
bitmapslice: self.bitmapslice.clone(),
phantom: PhantomData,
})
Expand All @@ -146,15 +135,15 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> {
}

let o = match other {
Some(Writer::FuseDev(w)) => w.buf.as_slice(),
Some(Writer::FuseDev(w)) => w.buf.written(),
_ => &[],
};
let res = match (self.buf.len(), o.len()) {
let res = match (self.buf.position(), o.len()) {
(0, 0) => Ok(0),
(0, _) => write(self.fd, o),
(_, 0) => write(self.fd, self.buf.as_slice()),
(_, 0) => write(self.fd, self.buf.written()),
(_, _) => {
let bufs = [IoSlice::new(self.buf.as_slice()), IoSlice::new(o)];
let bufs = [IoSlice::new(self.buf.written()), IoSlice::new(o)];
writev(self.fd, &bufs)
}
};
Expand All @@ -164,18 +153,12 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> {

/// Return number of bytes already written to the internal buffer.
pub fn bytes_written(&self) -> usize {
self.buf.len()
self.buf.position()
}

/// Return number of bytes available for writing.
pub fn available_bytes(&self) -> usize {
self.buf.capacity() - self.buf.len()
}

fn account_written(&mut self, count: usize) {
let new_len = self.buf.len() + count;
// Safe because check_avail_space() ensures that `count` is valid.
unsafe { self.buf.set_len(new_len) };
self.buf.available_bytes()
}

/// Write an object to the writer.
Expand All @@ -193,21 +176,17 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> {
) -> io::Result<usize> {
self.check_available_space(count)?;

let cnt = src.read_vectored_volatile(
// Safe because we have made sure buf has at least count capacity above
unsafe {
&[FileVolatileSlice::from_raw_ptr(
self.buf.as_mut_ptr().add(self.buf.len()),
count,
)]
},
)?;
self.account_written(cnt);
let cnt = src.read_vectored_volatile(unsafe {
&[FileVolatileSlice::from_mut_slice(
&mut self.buf.available_slice()[..count],
)]
})?;
self.buf.account_written(cnt);

if self.buffered {
Ok(cnt)
} else {
Self::do_write(self.fd, &self.buf[..cnt])
Self::do_write(self.fd, &self.buf.written()[..cnt])
}
}

Expand All @@ -222,21 +201,19 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> {
self.check_available_space(count)?;

let cnt = src.read_vectored_at_volatile(
// Safe because we have made sure buf has at least count capacity above
unsafe {
&[FileVolatileSlice::from_raw_ptr(
self.buf.as_mut_ptr().add(self.buf.len()),
count,
&[FileVolatileSlice::from_mut_slice(
&mut self.buf.available_slice()[..count],
)]
},
off,
)?;
self.account_written(cnt);
self.buf.account_written(cnt);

if self.buffered {
Ok(cnt)
} else {
Self::do_write(self.fd, &self.buf[..cnt])
Self::do_write(self.fd, &self.buf.slice_mut()[..cnt])
}
}

Expand Down Expand Up @@ -266,19 +243,8 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> {
}

fn check_available_space(&self, sz: usize) -> io::Result<()> {
assert!(self.buffered || self.buf.is_empty());
if sz > self.available_bytes() {
Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"data out of range, available {} requested {}",
self.available_bytes(),
sz
),
))
} else {
Ok(())
}
assert!(self.buffered || self.buf.position() == 0);
self.buf.check_available_space(sz)
}

fn do_write(fd: RawFd, data: &[u8]) -> io::Result<usize> {
Expand All @@ -298,7 +264,7 @@ impl<S: BitmapSlice> Write for FuseDevWriter<'_, S> {
Ok(data.len())
} else {
Self::do_write(self.fd, data).inspect(|&x| {
self.account_written(x);
self.buf.account_written(x);
})
}
}
Expand All @@ -319,7 +285,7 @@ impl<S: BitmapSlice> Write for FuseDevWriter<'_, S> {
}
writev(self.fd, bufs)
.inspect(|&x| {
self.account_written(x);
self.buf.account_written(x);
})
.map_err(|e| {
error! {"fail to write to fuse device on commit: {}", e};
Expand Down Expand Up @@ -393,7 +359,7 @@ mod async_io {
} else {
nix::sys::uio::pwrite(self.fd, data, 0)
.map(|x| {
self.account_written(x);
self.buf.account_written(x);
x
})
.map_err(|e| {
Expand All @@ -419,7 +385,7 @@ mod async_io {
let bufs = [IoSlice::new(data), IoSlice::new(data2)];
writev(self.fd, &bufs)
.map(|x| {
self.account_written(x);
self.buf.account_written(x);
x
})
.map_err(|e| {
Expand Down Expand Up @@ -451,7 +417,7 @@ mod async_io {
let bufs = [IoSlice::new(data), IoSlice::new(data2), IoSlice::new(data3)];
writev(self.fd, &bufs)
.map(|x| {
self.account_written(x);
self.buf.account_written(x);
x
})
.map_err(|e| {
Expand Down Expand Up @@ -491,11 +457,12 @@ mod async_io {
) -> io::Result<usize> {
self.check_available_space(count)?;

let buf = unsafe { FileVolatileBuf::from_raw_ptr(self.buf.as_mut_ptr(), 0, count) };
let buf =
unsafe { FileVolatileBuf::new_with_data(&mut self.buf.slice_mut()[..count], 0) };
let (res, _) = src.async_read_at_volatile(buf, off).await;
match res {
Ok(cnt) => {
self.account_written(cnt);
self.buf.account_written(cnt);
if self.buffered {
Ok(cnt)
} else {
Expand All @@ -515,22 +482,22 @@ mod async_io {
/// We need this because the lifetime of others is usually shorter than self.
pub async fn async_commit(&mut self, other: Option<&Writer<'a, S>>) -> io::Result<usize> {
let o = match other {
Some(Writer::FuseDev(w)) => w.buf.as_slice(),
Some(Writer::FuseDev(w)) => w.buf.written(),
_ => &[],
};

let res = match (self.buf.len(), o.len()) {
let res = match (self.buf.position(), o.len()) {
(0, 0) => Ok(0),
(0, _) => nix::sys::uio::pwrite(self.fd, o, 0).map_err(|e| {
error! {"fail to write to fuse device fd {}: {}", self.fd, e};
io::Error::other(format!("{}", e))
}),
(_, 0) => nix::sys::uio::pwrite(self.fd, self.buf.as_slice(), 0).map_err(|e| {
(_, 0) => nix::sys::uio::pwrite(self.fd, self.buf.written(), 0).map_err(|e| {
error! {"fail to write to fuse device fd {}: {}", self.fd, e};
io::Error::other(format!("{}", e))
}),
(_, _) => {
let bufs = [IoSlice::new(self.buf.as_slice()), IoSlice::new(o)];
let bufs = [IoSlice::new(self.buf.written()), IoSlice::new(o)];
writev(self.fd, &bufs).map_err(|e| {
error! {"fail to write to fuse device fd {}: {}", self.fd, e};
io::Error::other(format!("{}", e))
Expand Down
Loading