diff --git a/src/transport/fusedev/bytes_cursor.rs b/src/transport/fusedev/bytes_cursor.rs new file mode 100644 index 00000000..528d10cb --- /dev/null +++ b/src/transport/fusedev/bytes_cursor.rs @@ -0,0 +1,110 @@ +use crate::transport::Error; +use std::{io, mem}; + +#[derive(Debug, Default)] +pub(crate) struct BytesCursor<'a> { + slice: &'a mut [u8], + /// position <= slice.len() + position: usize, +} + +impl<'a> BytesCursor<'a> { + pub(crate) fn new(slice: &'a mut [u8], position: usize) -> Self { + assert!(position <= slice.len()); + BytesCursor { slice, position } + } + + pub(crate) fn slice_mut(&mut self) -> &mut [u8] { + self.slice + } + + pub(crate) fn position(&self) -> usize { + self.position + } + + pub(crate) fn available_bytes(&self) -> usize { + self.slice.len() - self.position + } + + pub(crate) fn check_available_space(&self, sz: usize) -> io::Result<()> { + if sz > self.available_bytes() { + Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "data out of range, available {} requested {}", + self.available_bytes(), + sz + ), + )) + } else { + Ok(()) + } + } + + pub(crate) fn available_slice(&mut self) -> &mut [u8] { + &mut self.slice[self.position..] + } + + pub(crate) fn account_written(&mut self, count: usize) { + assert!(self.available_bytes() >= count); + self.position += count; + } + + pub(crate) fn written(&self) -> &[u8] { + &self.slice[..self.position] + } + + pub(crate) fn split_at(&mut self, offset: usize) -> Result, Error> { + if self.slice.len() < offset { + return Err(Error::SplitOutOfBounds(offset)); + } + + let (len1, len2) = if self.position > offset { + (offset, self.position - offset) + } else { + (self.position, 0) + }; + let (slice1, slice2) = mem::take(&mut self.slice).split_at_mut(offset); + *self = BytesCursor::new(slice1, len1); + Ok(BytesCursor::new(slice2, len2)) + } + + pub(crate) fn extend_from_slice(&mut self, slice: &[u8]) { + self.slice[self.position..][..slice.len()].copy_from_slice(slice); + self.position += slice.len(); + } +} + +#[cfg(test)] +mod tests { + use crate::transport::fusedev::bytes_cursor::BytesCursor; + + #[test] + fn test_split_at() { + let mut array = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + + let mut b = BytesCursor::new(&mut array, 0); + let mut b1 = b.split_at(4).unwrap(); + + assert_eq!(&[0, 1, 2, 3], b.slice_mut()); + assert_eq!(0, b.position()); + assert_eq!(&[4, 5, 6, 7, 8, 9], b1.slice_mut()); + assert_eq!(0, b1.position()); + + let mut b = BytesCursor::new(&mut array, 6); + let mut b1 = b.split_at(4).unwrap(); + assert_eq!(&[0, 1, 2, 3], b.slice_mut()); + assert_eq!(4, b.position()); + assert_eq!(&[4, 5, 6, 7, 8, 9], b1.slice_mut()); + assert_eq!(2, b1.position()); + } + + #[test] + fn test_extend_from_slice() { + let mut array = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + let mut b = BytesCursor::new(&mut array, 3); + b.extend_from_slice(&[b'a', b'b']); + assert_eq!(&[0, 1, 2, b'a', b'b', 5, 6, 7, 8, 9], b.slice); + assert_eq!(5, b.position()); + } +} diff --git a/src/transport/fusedev/mod.rs b/src/transport/fusedev/mod.rs index 051a4c8f..3fe49fa5 100644 --- a/src/transport/fusedev/mod.rs +++ b/src/transport/fusedev/mod.rs @@ -10,7 +10,6 @@ use std::collections::VecDeque; use std::io::{self, IoSlice, Write}; use std::marker::PhantomData; -use std::mem::ManuallyDrop; use std::os::fd::AsRawFd; use std::os::unix::io::RawFd; @@ -20,8 +19,11 @@ use vm_memory::{ByteValued, VolatileSlice}; use super::{Error, FileReadWriteVolatile, IoBuffers, Reader, Result, Writer}; use crate::file_buf::FileVolatileSlice; +use crate::transport::fusedev::bytes_cursor::BytesCursor; use crate::BitmapSlice; +mod bytes_cursor; + #[cfg(target_os = "linux")] mod linux_session; #[cfg(target_os = "linux")] @@ -83,11 +85,11 @@ impl<'a, S: BitmapSlice + Default> Reader<'a, S> { /// 2. If the writer is split, a final commit() MUST be called to issue the /// device write operation. /// 3. Concurrency, caller should not write to the writer concurrently. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug)] pub struct FuseDevWriter<'a, S: BitmapSlice = ()> { fd: RawFd, buffered: bool, - buf: ManuallyDrop>, + buf: BytesCursor<'a>, bitmapslice: S, phantom: PhantomData<&'a mut [S]>, } @@ -95,11 +97,11 @@ pub struct FuseDevWriter<'a, S: BitmapSlice = ()> { impl<'a, S: BitmapSlice + Default> FuseDevWriter<'a, S> { /// Construct a new [Writer]. pub fn new(fd: RawFd, data_buf: &'a mut [u8]) -> Result> { - let buf = unsafe { Vec::from_raw_parts(data_buf.as_mut_ptr(), 0, data_buf.len()) }; + let buf = BytesCursor::new(data_buf, 0); Ok(FuseDevWriter { fd, buffered: false, - buf: ManuallyDrop::new(buf), + buf, bitmapslice: S::default(), phantom: PhantomData, }) @@ -113,27 +115,14 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> { /// `Writer` can write up to `available_bytes() - offset` bytes. Returns an error if /// `offset > self.available_bytes()`. pub fn split_at(&mut self, offset: usize) -> Result> { - if self.buf.capacity() < offset { - return Err(Error::SplitOutOfBounds(offset)); - } + let buf2 = self.buf.split_at(offset)?; - let (len1, len2) = if self.buf.len() > offset { - (offset, self.buf.len() - offset) - } else { - (self.buf.len(), 0) - }; - let cap2 = self.buf.capacity() - offset; - let ptr = self.buf.as_mut_ptr(); - - // Safe because both buffers refer to different parts of the same underlying `data_buf`. - self.buf = unsafe { ManuallyDrop::new(Vec::from_raw_parts(ptr, len1, offset)) }; self.buffered = true; - let buf = unsafe { ManuallyDrop::new(Vec::from_raw_parts(ptr.add(offset), len2, cap2)) }; Ok(FuseDevWriter { fd: self.fd, buffered: true, - buf, + buf: buf2, bitmapslice: self.bitmapslice.clone(), phantom: PhantomData, }) @@ -146,15 +135,15 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> { } let o = match other { - Some(Writer::FuseDev(w)) => w.buf.as_slice(), + Some(Writer::FuseDev(w)) => w.buf.written(), _ => &[], }; - let res = match (self.buf.len(), o.len()) { + let res = match (self.buf.position(), o.len()) { (0, 0) => Ok(0), (0, _) => write(self.fd, o), - (_, 0) => write(self.fd, self.buf.as_slice()), + (_, 0) => write(self.fd, self.buf.written()), (_, _) => { - let bufs = [IoSlice::new(self.buf.as_slice()), IoSlice::new(o)]; + let bufs = [IoSlice::new(self.buf.written()), IoSlice::new(o)]; writev(self.fd, &bufs) } }; @@ -164,18 +153,12 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> { /// Return number of bytes already written to the internal buffer. pub fn bytes_written(&self) -> usize { - self.buf.len() + self.buf.position() } /// Return number of bytes available for writing. pub fn available_bytes(&self) -> usize { - self.buf.capacity() - self.buf.len() - } - - fn account_written(&mut self, count: usize) { - let new_len = self.buf.len() + count; - // Safe because check_avail_space() ensures that `count` is valid. - unsafe { self.buf.set_len(new_len) }; + self.buf.available_bytes() } /// Write an object to the writer. @@ -193,21 +176,17 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> { ) -> io::Result { self.check_available_space(count)?; - let cnt = src.read_vectored_volatile( - // Safe because we have made sure buf has at least count capacity above - unsafe { - &[FileVolatileSlice::from_raw_ptr( - self.buf.as_mut_ptr().add(self.buf.len()), - count, - )] - }, - )?; - self.account_written(cnt); + let cnt = src.read_vectored_volatile(unsafe { + &[FileVolatileSlice::from_mut_slice( + &mut self.buf.available_slice()[..count], + )] + })?; + self.buf.account_written(cnt); if self.buffered { Ok(cnt) } else { - Self::do_write(self.fd, &self.buf[..cnt]) + Self::do_write(self.fd, &self.buf.written()[..cnt]) } } @@ -222,21 +201,19 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> { self.check_available_space(count)?; let cnt = src.read_vectored_at_volatile( - // Safe because we have made sure buf has at least count capacity above unsafe { - &[FileVolatileSlice::from_raw_ptr( - self.buf.as_mut_ptr().add(self.buf.len()), - count, + &[FileVolatileSlice::from_mut_slice( + &mut self.buf.available_slice()[..count], )] }, off, )?; - self.account_written(cnt); + self.buf.account_written(cnt); if self.buffered { Ok(cnt) } else { - Self::do_write(self.fd, &self.buf[..cnt]) + Self::do_write(self.fd, &self.buf.slice_mut()[..cnt]) } } @@ -266,19 +243,8 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> { } fn check_available_space(&self, sz: usize) -> io::Result<()> { - assert!(self.buffered || self.buf.is_empty()); - if sz > self.available_bytes() { - Err(io::Error::new( - io::ErrorKind::InvalidData, - format!( - "data out of range, available {} requested {}", - self.available_bytes(), - sz - ), - )) - } else { - Ok(()) - } + assert!(self.buffered || self.buf.position() == 0); + self.buf.check_available_space(sz) } fn do_write(fd: RawFd, data: &[u8]) -> io::Result { @@ -298,7 +264,7 @@ impl Write for FuseDevWriter<'_, S> { Ok(data.len()) } else { Self::do_write(self.fd, data).inspect(|&x| { - self.account_written(x); + self.buf.account_written(x); }) } } @@ -319,7 +285,7 @@ impl Write for FuseDevWriter<'_, S> { } writev(self.fd, bufs) .inspect(|&x| { - self.account_written(x); + self.buf.account_written(x); }) .map_err(|e| { error! {"fail to write to fuse device on commit: {}", e}; @@ -393,7 +359,7 @@ mod async_io { } else { nix::sys::uio::pwrite(self.fd, data, 0) .map(|x| { - self.account_written(x); + self.buf.account_written(x); x }) .map_err(|e| { @@ -419,7 +385,7 @@ mod async_io { let bufs = [IoSlice::new(data), IoSlice::new(data2)]; writev(self.fd, &bufs) .map(|x| { - self.account_written(x); + self.buf.account_written(x); x }) .map_err(|e| { @@ -451,7 +417,7 @@ mod async_io { let bufs = [IoSlice::new(data), IoSlice::new(data2), IoSlice::new(data3)]; writev(self.fd, &bufs) .map(|x| { - self.account_written(x); + self.buf.account_written(x); x }) .map_err(|e| { @@ -491,11 +457,12 @@ mod async_io { ) -> io::Result { self.check_available_space(count)?; - let buf = unsafe { FileVolatileBuf::from_raw_ptr(self.buf.as_mut_ptr(), 0, count) }; + let buf = + unsafe { FileVolatileBuf::new_with_data(&mut self.buf.slice_mut()[..count], 0) }; let (res, _) = src.async_read_at_volatile(buf, off).await; match res { Ok(cnt) => { - self.account_written(cnt); + self.buf.account_written(cnt); if self.buffered { Ok(cnt) } else { @@ -515,22 +482,22 @@ mod async_io { /// We need this because the lifetime of others is usually shorter than self. pub async fn async_commit(&mut self, other: Option<&Writer<'a, S>>) -> io::Result { let o = match other { - Some(Writer::FuseDev(w)) => w.buf.as_slice(), + Some(Writer::FuseDev(w)) => w.buf.written(), _ => &[], }; - let res = match (self.buf.len(), o.len()) { + let res = match (self.buf.position(), o.len()) { (0, 0) => Ok(0), (0, _) => nix::sys::uio::pwrite(self.fd, o, 0).map_err(|e| { error! {"fail to write to fuse device fd {}: {}", self.fd, e}; io::Error::other(format!("{}", e)) }), - (_, 0) => nix::sys::uio::pwrite(self.fd, self.buf.as_slice(), 0).map_err(|e| { + (_, 0) => nix::sys::uio::pwrite(self.fd, self.buf.written(), 0).map_err(|e| { error! {"fail to write to fuse device fd {}: {}", self.fd, e}; io::Error::other(format!("{}", e)) }), (_, _) => { - let bufs = [IoSlice::new(self.buf.as_slice()), IoSlice::new(o)]; + let bufs = [IoSlice::new(self.buf.written()), IoSlice::new(o)]; writev(self.fd, &bufs).map_err(|e| { error! {"fail to write to fuse device fd {}: {}", self.fd, e}; io::Error::other(format!("{}", e))