From f5e2e8861554d4dc9aef0471793ebf15701e4da0 Mon Sep 17 00:00:00 2001 From: Ryan Bottriell Date: Fri, 20 Mar 2026 10:50:12 -0700 Subject: [PATCH 1/5] Add blob cache config to FUSE config structs Introduce blob_cache_max_bytes (256 MiB) and blob_cache_max_single_bytes (64 MiB) in spfs::config::Fuse, spfs_vfs::fuse::Config, and wire them together in cmd_fuse so operators can tune cache behaviour from the spfs config file. Co-Authored-By: Claude Sonnet 4.6 --- crates/spfs-cli/cmd-fuse/src/cmd_fuse.rs | 2 ++ crates/spfs-vfs/src/fuse.rs | 11 +++++++++++ crates/spfs/src/config.rs | 23 +++++++++++++++++++++++ cspell.json | 1 + 4 files changed, 37 insertions(+) diff --git a/crates/spfs-cli/cmd-fuse/src/cmd_fuse.rs b/crates/spfs-cli/cmd-fuse/src/cmd_fuse.rs index c510bb42a7..a069e9a251 100644 --- a/crates/spfs-cli/cmd-fuse/src/cmd_fuse.rs +++ b/crates/spfs-cli/cmd-fuse/src/cmd_fuse.rs @@ -141,6 +141,8 @@ impl CmdFuse { remotes: Vec::new(), mount_options: required_opts.into_iter().collect(), include_secondary_tags: config.fuse.include_secondary_tags, + blob_cache_max_bytes: config.fuse.blob_cache_max_bytes, + blob_cache_max_single_bytes: config.fuse.blob_cache_max_single_bytes, }; let parsed_opts = parse_options_from_args(&self.options); diff --git a/crates/spfs-vfs/src/fuse.rs b/crates/spfs-vfs/src/fuse.rs index d8e61d3583..a100b5fffa 100644 --- a/crates/spfs-vfs/src/fuse.rs +++ b/crates/spfs-vfs/src/fuse.rs @@ -62,6 +62,17 @@ pub struct Config { /// Whether to have the tags in the secondary repos included in /// the lookup methods. pub include_secondary_tags: bool, + /// Maximum total bytes held in the in-memory remote blob cache. + /// + /// Remote blobs up to `blob_cache_max_single_bytes` in size are buffered + /// in memory so they can be read and seeked at arbitrary offsets. + pub blob_cache_max_bytes: usize, + /// Maximum size of a single remote blob that will be buffered in memory. 
+ /// + /// Blobs larger than this are instead downloaded once into the local + /// repository so that future opens find them locally without a network + /// round-trip, and so they can be managed by `spfs clean`. + pub blob_cache_max_single_bytes: usize, } /// Handles the allocation of inodes, and async responses to all FUSE requests diff --git a/crates/spfs/src/config.rs b/crates/spfs/src/config.rs index 2d4e82d585..42ebb6de1e 100644 --- a/crates/spfs/src/config.rs +++ b/crates/spfs/src/config.rs @@ -71,6 +71,14 @@ const fn default_fuse_heartbeat_grace_period_seconds() -> NonZeroU64 { unsafe { NonZeroU64::new_unchecked(300) } } +const fn default_fuse_blob_cache_max_bytes() -> usize { + 256 * 1024 * 1024 // 256 MiB +} + +const fn default_fuse_blob_cache_max_single_bytes() -> usize { + 64 * 1024 * 1024 // 64 MiB +} + pub fn default_proxy_repo_include_secondary_tags() -> bool { true } @@ -542,6 +550,19 @@ pub struct Fuse { /// Whether to include tags from secondary repos in lookup methods #[serde(default = "default_proxy_repo_include_secondary_tags")] pub include_secondary_tags: bool, + /// Maximum total bytes for the FUSE in-memory remote blob cache. + /// + /// Remote blobs smaller than `blob_cache_max_single_bytes` are buffered + /// here so that they can be sought and read at arbitrary offsets. + /// Default: 256 MiB. + #[serde(default = "default_fuse_blob_cache_max_bytes")] + pub blob_cache_max_bytes: usize, + /// Maximum bytes for a single remote blob to be held in the in-memory + /// cache. Blobs larger than this threshold are instead downloaded once + /// to the local repository so that future opens skip the network entirely. + /// Default: 64 MiB. 
+ #[serde(default = "default_fuse_blob_cache_max_single_bytes")] + pub blob_cache_max_single_bytes: usize, } impl Fuse { @@ -562,6 +583,8 @@ impl Default for Fuse { heartbeat_interval_seconds: default_fuse_heartbeat_interval_seconds(), heartbeat_grace_period_seconds: default_fuse_heartbeat_grace_period_seconds(), include_secondary_tags: default_proxy_repo_include_secondary_tags(), + blob_cache_max_bytes: default_fuse_blob_cache_max_bytes(), + blob_cache_max_single_bytes: default_fuse_blob_cache_max_single_bytes(), } } } diff --git a/cspell.json b/cspell.json index b70acdbb0e..87d65f2624 100644 --- a/cspell.json +++ b/cspell.json @@ -650,6 +650,7 @@ "sddl", "seccomp", "seekable", + "seeked", "serde", "setattr", "setcap", From 207650d336d9a6a35e8ccfcbb32980f2bdc6cd6d Mon Sep 17 00:00:00 2001 From: Ryan Bottriell Date: Fri, 20 Mar 2026 10:58:37 -0700 Subject: [PATCH 2/5] Fix remote-file seeking by buffering blobs in a local cache Remote blobs served through the FUSE BlobStream handle were read with positional offsets ignored, causing data corruption and segfaults for any application that used mmap or pread on a remote-backed file. Introduce a byte-bounded in-memory BlobCache (HashMap + VecDeque, insertion-order eviction) on the Filesystem. When a remote blob is opened: * If its size is within blob_cache_max_single_bytes (default 64 MiB) the payload is fully downloaded into Arc and inserted into the cache keyed by digest. Subsequent opens of the same blob share the Arc without re-downloading. The new BlobCached handle variant services read() with a direct slice at the requested offset and lseek() via arithmetic on the data length, making both operations correct and O(1). * If the blob is larger than the threshold it is committed to the local FS repository via commit_blob() so that it is tracked by the object graph and eligible for cleanup by spfs-clean. The local file is then opened as an ordinary seekable BlobFile. 
Future opens find the blob in the local FS repo before reaching the remote repo arm. The BlobStream fallback is retained only when no local FS repository is present in the stack, and the FOPEN_NONSEEKABLE | FOPEN_STREAM flags continue to mark those handles as non-seekable. Co-Authored-By: Claude Sonnet 4.6 --- crates/spfs-vfs/src/fuse.rs | 279 ++++++++++++++++++++++++++++++------ 1 file changed, 236 insertions(+), 43 deletions(-) diff --git a/crates/spfs-vfs/src/fuse.rs b/crates/spfs-vfs/src/fuse.rs index a100b5fffa..f17db9fa92 100644 --- a/crates/spfs-vfs/src/fuse.rs +++ b/crates/spfs-vfs/src/fuse.rs @@ -3,6 +3,8 @@ // https://github.com/spkenv/spk use std::collections::HashSet; +#[cfg(feature = "fuse-backend-abi-7-31")] +use std::collections::{HashMap, VecDeque}; use std::ffi::{OsStr, OsString}; use std::io::{Seek, SeekFrom}; use std::mem::ManuallyDrop; @@ -12,6 +14,8 @@ use std::os::unix::prelude::FileExt; #[cfg(feature = "fuse-backend-abi-7-31")] use std::pin::Pin; use std::sync::Arc; +#[cfg(feature = "fuse-backend-abi-7-31")] +use std::sync::Mutex; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, SystemTime}; @@ -75,6 +79,58 @@ pub struct Config { pub blob_cache_max_single_bytes: usize, } +/// Byte-bounded in-memory cache for remote blob payloads. +/// +/// Entries are evicted in insertion order (oldest first) once the total +/// byte count would exceed `max_bytes`. Multiple open handles to the same +/// blob share the same `Arc` without copying. +#[cfg(feature = "fuse-backend-abi-7-31")] +struct BlobCache { + /// Digest insertion order for eviction (front = oldest). 
+ order: VecDeque, + data: HashMap>, + current_bytes: usize, + max_bytes: usize, +} + +#[cfg(feature = "fuse-backend-abi-7-31")] +impl BlobCache { + fn new(max_bytes: usize) -> Self { + Self { + order: VecDeque::new(), + data: HashMap::new(), + current_bytes: 0, + max_bytes, + } + } + + fn get(&self, digest: &spfs::encoding::Digest) -> Option> { + self.data.get(digest).cloned() + } + + /// Insert `bytes` under `digest`, evicting oldest entries to stay within + /// `max_bytes`. Returns a shared `Arc` to the stored slice. + fn insert(&mut self, digest: spfs::encoding::Digest, bytes: bytes::Bytes) -> Arc { + // A concurrent open may have already inserted this digest. + if let Some(existing) = self.data.get(&digest) { + return Arc::clone(existing); + } + let len = bytes.len(); + while !self.order.is_empty() && self.current_bytes + len > self.max_bytes { + if let Some(evicted_digest) = self.order.pop_front() + && let Some(evicted) = self.data.remove(&evicted_digest) + { + self.current_bytes = self.current_bytes.saturating_sub(evicted.len()); + } + } + let arc = Arc::new(bytes); + self.data.insert(digest, Arc::clone(&arc)); + self.order.push_back(digest); + self.current_bytes += len; + arc + } +} + /// Handles the allocation of inodes, and async responses to all FUSE requests struct Filesystem { repos: Vec>, @@ -86,6 +142,8 @@ struct Filesystem { inodes: DashMap>>, handles: DashMap, fs_creation_time: SystemTime, + #[cfg(feature = "fuse-backend-abi-7-31")] + blob_cache: Mutex, } impl Filesystem { @@ -100,6 +158,8 @@ impl Filesystem { manifest: Manifest, opts: Config, ) -> Self { + #[cfg(feature = "fuse-backend-abi-7-31")] + let blob_cache = Mutex::new(BlobCache::new(opts.blob_cache_max_bytes)); let fs = Self { repos, opts, @@ -111,6 +171,8 @@ impl Filesystem { inodes: Default::default(), handles: Default::default(), fs_creation_time: SystemTime::now(), + #[cfg(feature = "fuse-backend-abi-7-31")] + blob_cache, }; // pre-allocate inodes for all entries in the manifest let 
mut root = manifest.take_root(); @@ -391,6 +453,29 @@ impl Filesystem { let mut handle = None; #[allow(unused_mut)] let mut flags = FOPEN_KEEP_CACHE; + + // Fast path: serve from the in-memory blob cache without touching any + // repository. This is checked before the repo loop so that even the + // FS repo open() syscall is avoided on a cache hit. + #[cfg(feature = "fuse-backend-abi-7-31")] + { + let cached = self + .blob_cache + .lock() + .expect("blob cache lock poisoned") + .get(digest); + if let Some(data) = cached { + let fh = self.allocate_handle(Handle::BlobCached { + entry, + data, + current_offset: AtomicU64::new(0), + }); + tracing::trace!("open {ino} = {fh} [CACHE HIT]"); + reply.opened(fh, FOPEN_KEEP_CACHE); + return; + } + } + for repo in self.repos.iter() { match &**repo { spfs::storage::RepositoryHandle::FS(fs_repo) => { @@ -412,13 +497,70 @@ impl Filesystem { } #[cfg(feature = "fuse-backend-abi-7-31")] repo => match repo.open_payload(*digest).await { - Ok((stream, _)) => { - // TODO: try to leverage the returned file path? - handle = Some(Handle::BlobStream { - entry, - stream: tokio::sync::Mutex::new(stream), - }); - flags |= FOPEN_NONSEEKABLE | FOPEN_STREAM; + Ok((mut stream, _)) => { + let file_size = entry.size() as usize; + if file_size <= self.opts.blob_cache_max_single_bytes { + // Small enough to buffer in memory. Download the + // full payload and add it to the shared blob cache + // (evicted in insertion order, oldest first). + let mut buf = Vec::with_capacity(file_size); + unwrap!(reply, stream.read_to_end(&mut buf).await); + let data = self + .blob_cache + .lock() + .expect("blob cache lock poisoned") + .insert(*digest, bytes::Bytes::from(buf)); + handle = Some(Handle::BlobCached { + entry, + data, + current_offset: AtomicU64::new(0), + }); + } else { + // Too large for the in-memory cache. Persist to + // the local FS repo so that future opens are served + // directly from disk, and spfs-clean can reclaim + // the space through the normal object lifecycle. 
+ let local_fs = self.repos.iter().find_map(|r| { + if let spfs::storage::RepositoryHandle::FS(fs) = &**r { + Some(fs) + } else { + None + } + }); + if let Some(local_fs) = local_fs { + let opened = unwrap!(reply, local_fs.opened().await); + let stored = unwrap!(reply, opened.commit_blob(stream).await); + if stored != *digest { + err!( + reply, + spfs::Error::String(format!( + "Payload digest mismatch when caching large \ + blob: expected {digest}, got {stored}" + )) + ); + } + let payload_path = opened.payloads().build_digest_path(digest); + match std::fs::OpenOptions::new().read(true).open(&payload_path) { + Ok(file) => { + handle = Some(Handle::BlobFile { entry, file }); + } + Err(err) => err!(reply, err), + } + } else { + // No local FS repo in the stack — fall back to a + // non-seekable stream. This should be rare in + // practice. + tracing::warn!( + %digest, + "no local FS repository available to cache large \ + blob; falling back to non-seekable stream" + ); + handle = Some(Handle::BlobStream { + entry, + stream: tokio::sync::Mutex::new(stream), + }); + flags |= FOPEN_NONSEEKABLE | FOPEN_STREAM; + } + } break; } Err(err) if err.try_next_repo() => continue, @@ -496,6 +638,18 @@ impl Filesystem { reply.data(&buf[..consumed]); } #[cfg(feature = "fuse-backend-abi-7-31")] + Handle::BlobCached { data, .. 
} => { + let start = offset as usize; + let end = (start + size as usize).min(data.len()); + let slice = if start < data.len() { + &data[start..end] + } else { + &[] + }; + tracing::trace!("read {fh} = {}/{size} [CACHED]", slice.len()); + reply.data(slice); + } + #[cfg(feature = "fuse-backend-abi-7-31")] Handle::BlobStream { entry: _, stream } => { let mut stream = stream.lock().await; let mut buf = vec![0; size as usize]; @@ -644,58 +798,83 @@ impl Filesystem { } async fn lseek(&self, _ino: u64, fh: u64, offset: i64, whence: i32, reply: fuser::ReplyLseek) { - let Some(handle) = self.handles.get_mut(&fh) else { + let Some(handle) = self.handles.get(&fh) else { tracing::debug!("lseek {fh} = EBADF"); reply.error(libc::EBADF); return; }; - let file = match handle.value() { + match handle.value() { Handle::Tree { .. } => { tracing::debug!("lseek {fh} = EISDIR"); reply.error(libc::EISDIR); - return; } - Handle::BlobFile { entry: _, file } => file, + #[cfg(feature = "fuse-backend-abi-7-31")] + Handle::BlobCached { + data, + current_offset, + .. + } => { + let cur = current_offset.load(Ordering::Relaxed); + let file_len = data.len() as u64; + let new_offset = match whence { + libc::SEEK_SET => offset as u64, + libc::SEEK_CUR => (cur as i64 + offset) as u64, + libc::SEEK_END => (file_len as i64 + offset) as u64, + // Simplest valid implementation per the linux man page: + // treat the entire file as data with no holes. + libc::SEEK_HOLE => file_len, + libc::SEEK_DATA => offset as u64, + _ => { + tracing::debug!("lseek {fh} = EINVAL"); + reply.error(libc::EINVAL); + return; + } + }; + current_offset.store(new_offset, Ordering::Relaxed); + tracing::trace!("lseek {fh} = {new_offset} [CACHED]"); + reply.offset(new_offset as i64); + } #[cfg(feature = "fuse-backend-abi-7-31")] Handle::BlobStream { .. 
} => { tracing::warn!("FUSE should not allow seek calls on streams"); tracing::debug!("lseek {fh} = EINVAL"); reply.error(libc::EINVAL); - return; } - }; + Handle::BlobFile { entry: _, file } => { + let pos = match whence { + libc::SEEK_CUR => SeekFrom::Current(offset), + libc::SEEK_END => SeekFrom::End(offset), + libc::SEEK_SET => SeekFrom::Start(offset as u64), + + // From linux man pages: In the + // simplest implementation, a filesystem can support the operations + // by making SEEK_HOLE always return the offset of the end of the + // file, and making SEEK_DATA always return offset (i.e., even if + // the location referred to by offset is a hole, it can be + // considered to consist of data that is a sequence of zeros). + libc::SEEK_HOLE => SeekFrom::End(0), + libc::SEEK_DATA => SeekFrom::Start(offset as u64), + + _ => { + reply.error(libc::EINVAL); + return; + } + }; - let pos = match whence { - libc::SEEK_CUR => SeekFrom::Current(offset), - libc::SEEK_END => SeekFrom::End(offset), - libc::SEEK_SET => SeekFrom::Start(offset as u64), - - // From linux man pages: In the - // simplest implementation, a filesystem can support the operations - // by making SEEK_HOLE always return the offset of the end of the - // file, and making SEEK_DATA always return offset (i.e., even if - // the location referred to by offset is a hole, it can be - // considered to consist of data that is a sequence of zeros). - libc::SEEK_HOLE => SeekFrom::End(0), - libc::SEEK_DATA => SeekFrom::Start(offset as u64), - - _ => { - reply.error(libc::EINVAL); - return; + // Safety: the fd must be valid and open, which we know. We also + // know that the file will live for the lifetime of this function + // and so can create a copy of it safely for use before that rather + // than duplicating it or using some kind of lock. 
+ let f = unsafe { std::fs::File::from_raw_fd(file.as_raw_fd()) }; + // file takes ownership of the handle, but we need to make sure + // it is not closed since it's a copy of the File that remains alive + let mut f = ManuallyDrop::new(f); + let new_offset = unwrap!(reply, f.seek(pos)); + tracing::trace!("lseek {fh} = {new_offset} [FILE]"); + reply.offset(new_offset as i64); } - }; - - // Safety: the fd must be valid and open, which we know. We also - // know that the file will live for the livetime of this function - // and so can create a copy of it safely for use before that rather - // than duplicating it or using some kind of lock - let f = unsafe { std::fs::File::from_raw_fd(file.as_raw_fd()) }; - // file takes ownership of the handle, but we need to make sure - // it is not closed since it's a copy of the File that remains alive - let mut f = ManuallyDrop::new(f); - let new_offset = unwrap!(reply, f.seek(pos)); - reply.offset(new_offset as i64); + } } } @@ -1037,7 +1216,19 @@ enum Handle { file: std::fs::File, }, #[cfg(feature = "fuse-backend-abi-7-31")] - // A handle to an opaque file stream that can only be read once + /// A remote blob fully buffered in memory, allowing arbitrary seeks and + /// random-access reads without a "current position" in the FUSE sense + /// (FUSE read() always supplies an explicit absolute offset). The + /// `current_offset` field is only maintained for SEEK_CUR lseek calls. + BlobCached { + entry: Arc>, + data: Arc, + current_offset: AtomicU64, + }, + #[cfg(feature = "fuse-backend-abi-7-31")] + /// A handle to an opaque file stream that can only be read once. + /// Only used as a fallback when no local FS repo is available and the + /// file is too large for the in-memory cache. BlobStream { entry: Arc>, // TODO: we should avoid the tokio mutex at all costs, @@ -1055,6 +1246,8 @@ impl Handle { match self { Self::BlobFile { entry, .. } => Arc::clone(entry), #[cfg(feature = "fuse-backend-abi-7-31")] + Self::BlobCached { entry, .. 
} => Arc::clone(entry), + #[cfg(feature = "fuse-backend-abi-7-31")] Self::BlobStream { entry, .. } => Arc::clone(entry), Self::Tree { entry } => Arc::clone(entry), } From 473b00fe48a8a3bf202bd866f54a41dff3c7ed28 Mon Sep 17 00:00:00 2001 From: Ryan Bottriell Date: Fri, 20 Mar 2026 12:14:53 -0700 Subject: [PATCH 3/5] Lazily promote large remote blobs instead of eagerly downloading on open Replace the eager commit_blob() call in open() for large remote files with a new BlobRemote handle variant that streams sequentially from the remote and only downloads to the local FS repository the first time a non-sequential read or meaningful seek is detected. Sequential reads never touch disk; only files that are actually seeked incur the one-time promotion cost. If no local FS repo is present the existing BlobStream (non-seekable) fallback is retained. Co-Authored-By: Claude Sonnet 4.6 --- crates/spfs-vfs/src/fuse.rs | 255 +++++++++++++++++++++++++++++++----- 1 file changed, 222 insertions(+), 33 deletions(-) diff --git a/crates/spfs-vfs/src/fuse.rs b/crates/spfs-vfs/src/fuse.rs index f17db9fa92..91cdb7d95e 100644 --- a/crates/spfs-vfs/src/fuse.rs +++ b/crates/spfs-vfs/src/fuse.rs @@ -515,40 +515,28 @@ impl Filesystem { current_offset: AtomicU64::new(0), }); } else { - // Too large for the in-memory cache. Persist to - // the local FS repo so that future opens are served - // directly from disk, and spfs-clean can reclaim - // the space through the normal object lifecycle. 
- let local_fs = self.repos.iter().find_map(|r| { - if let spfs::storage::RepositoryHandle::FS(fs) = &**r { - Some(fs) - } else { - None - } - }); - if let Some(local_fs) = local_fs { - let opened = unwrap!(reply, local_fs.opened().await); - let stored = unwrap!(reply, opened.commit_blob(stream).await); - if stored != *digest { - err!( - reply, - spfs::Error::String(format!( - "Payload digest mismatch when caching large \ - blob: expected {digest}, got {stored}" - )) - ); - } - let payload_path = opened.payloads().build_digest_path(digest); - match std::fs::OpenOptions::new().read(true).open(&payload_path) { - Ok(file) => { - handle = Some(Handle::BlobFile { entry, file }); - } - Err(err) => err!(reply, err), - } + // Too large for the in-memory cache. Create a lazy + // handle: sequential reads stream directly from the + // remote; the first non-sequential read or seek + // triggers a one-time download into the local FS + // repository. If no local FS repo is available we + // fall back to a non-seekable stream. + let has_local_fs = self + .repos + .iter() + .any(|r| matches!(&**r, spfs::storage::RepositoryHandle::FS(_))); + if has_local_fs { + let owned_digest = *digest; + handle = Some(Handle::BlobRemote { + entry, + digest: owned_digest, + inner: tokio::sync::Mutex::new(BlobRemoteInner::Streaming { + stream, + position: 0, + }), + }); + flags = FOPEN_STREAM; } else { - // No local FS repo in the stack — fall back to a - // non-seekable stream. This should be rare in - // practice. tracing::warn!( %digest, "no local FS repository available to cache large \ @@ -650,6 +638,68 @@ impl Filesystem { reply.data(slice); } #[cfg(feature = "fuse-backend-abi-7-31")] + Handle::BlobRemote { + entry: _, + digest, + inner, + } => { + let mut guard = inner.lock().await; + let read_offset = offset as u64; + + // Promote to a local file on the first non-sequential access. + let needs_promotion = if let BlobRemoteInner::Streaming { position, .. 
} = &*guard { + *position != read_offset + } else { + false + }; + if needs_promotion { + tracing::debug!( + fh, + read_offset, + "non-sequential read on remote stream, promoting to local file" + ); + unwrap!( + reply, + self.promote_remote_to_local(&mut guard, digest).await + ); + } + + match &mut *guard { + BlobRemoteInner::Streaming { stream, position } => { + let mut buf = vec![0; size as usize]; + let mut consumed = 0; + while consumed < size as usize { + let count = unwrap!(reply, stream.read(&mut buf[consumed..]).await); + consumed += count; + if count == 0 { + break; + } + } + *position += consumed as u64; + tracing::trace!("read {fh} = {consumed}/{size} [REMOTE STREAM]"); + reply.data(&buf[..consumed]); + } + BlobRemoteInner::Local(file) => { + let f = unsafe { std::fs::File::from_raw_fd(file.as_raw_fd()) }; + let f = ManuallyDrop::new(f); + let mut buf = vec![0; size as usize]; + let mut consumed = 0; + while consumed < size as usize { + let count = unwrap!( + reply, + f.read_at(&mut buf[consumed..], consumed as u64 + read_offset) + ); + consumed += count; + if count == 0 { + break; + } + } + tracing::trace!("read {fh} = {consumed}/{size} [REMOTE LOCAL]"); + reply.data(&buf[..consumed]); + } + } + } + #[cfg(feature = "fuse-backend-abi-7-31")] Handle::BlobStream { entry: _, stream } => { let mut stream = stream.lock().await; let mut buf = vec![0; size as usize]; @@ -836,6 +886,53 @@ impl Filesystem { reply.offset(new_offset as i64); } #[cfg(feature = "fuse-backend-abi-7-31")] + Handle::BlobRemote { + entry, + digest: _, + inner, + } => { + let file_len = entry.size(); + let mut guard = inner.lock().await; + let new_offset = match &mut *guard { + BlobRemoteInner::Streaming { position, .. 
} => { + let cur = *position; + let new = match whence { + libc::SEEK_SET => offset as u64, + libc::SEEK_CUR => (cur as i64 + offset) as u64, + libc::SEEK_END => (file_len as i64 + offset) as u64, + libc::SEEK_HOLE => file_len, + libc::SEEK_DATA => offset as u64, + _ => { + tracing::debug!("lseek {fh} = EINVAL"); + reply.error(libc::EINVAL); + return; + } + }; + *position = new; + new + } + BlobRemoteInner::Local(file) => { + let pos = match whence { + libc::SEEK_CUR => SeekFrom::Current(offset), + libc::SEEK_END => SeekFrom::End(offset), + libc::SEEK_SET => SeekFrom::Start(offset as u64), + libc::SEEK_HOLE => SeekFrom::End(0), + libc::SEEK_DATA => SeekFrom::Start(offset as u64), + _ => { + tracing::debug!("lseek {fh} = EINVAL"); + reply.error(libc::EINVAL); + return; + } + }; + let f = unsafe { std::fs::File::from_raw_fd(file.as_raw_fd()) }; + let mut f = ManuallyDrop::new(f); + unwrap!(reply, f.seek(pos)) + } + }; + tracing::trace!("lseek {fh} = {new_offset} [REMOTE]"); + reply.offset(new_offset as i64); + } + #[cfg(feature = "fuse-backend-abi-7-31")] Handle::BlobStream { .. } => { tracing::warn!("FUSE should not allow seek calls on streams"); tracing::debug!("lseek {fh} = EINVAL"); @@ -876,6 +973,70 @@ impl Filesystem { } } } + + /// Download a large remote blob to the local FS repository and replace + /// the streaming handle state with a `Local` file handle. + /// + /// Idempotent: if the blob is already on disk (either from a previous + /// promotion or because the FS repo already had it), only the file open + /// is performed. 
+ #[cfg(feature = "fuse-backend-abi-7-31")] + async fn promote_remote_to_local( + &self, + guard: &mut tokio::sync::MutexGuard<'_, BlobRemoteInner>, + digest: &spfs::encoding::Digest, + ) -> spfs::Result<()> { + if matches!(**guard, BlobRemoteInner::Local(_)) { + return Ok(()); + } + + let local_fs = self.repos.iter().find_map(|r| { + if let spfs::storage::RepositoryHandle::FS(fs) = &**r { + Some(fs) + } else { + None + } + }); + let local_fs = local_fs.ok_or_else(|| { + spfs::Error::String("no local FS repository available to cache large blob".into()) + })?; + + let opened = local_fs.opened().await?; + let payload_path = opened.payloads().build_digest_path(digest); + + // Only download if the payload is not already present on disk. + if std::fs::metadata(&payload_path).is_err() { + tracing::debug!(%digest, "promoting remote blob to local FS repository"); + let mut fresh_stream = None; + for repo in self.repos.iter() { + if matches!(&**repo, spfs::storage::RepositoryHandle::FS(_)) { + continue; + } + match repo.open_payload(*digest).await { + Ok((s, _)) => { + fresh_stream = Some(s); + break; + } + Err(err) if err.try_next_repo() => continue, + Err(err) => return Err(err), + } + } + let stream = fresh_stream.ok_or(spfs::Error::UnknownObject(*digest))?; + let stored = opened.commit_blob(stream).await?; + if stored != *digest { + return Err(spfs::Error::String(format!( + "payload digest mismatch when caching blob: expected {digest}, got {stored}" + ))); + } + } + + let file = std::fs::OpenOptions::new() + .read(true) + .open(&payload_path) + .map_err(|e| spfs::Error::String(format!("failed to open cached payload: {e}")))?; + **guard = BlobRemoteInner::Local(file); + Ok(()) + } } /// Represents a connected FUSE session. @@ -1209,6 +1370,22 @@ impl fuser::Filesystem for Session { } } +/// State for a lazily-promoted large remote blob. 
+/// +/// Starts as `Streaming` (sequential reads from the remote) and transitions to +/// `Local` on the first non-sequential read or meaningful lseek. +#[cfg(feature = "fuse-backend-abi-7-31")] +enum BlobRemoteInner { + Streaming { + stream: Pin>, + /// Byte offset of the next byte the stream will yield. + position: u64, + }, + /// The blob has been downloaded to the local FS repository and is + /// accessible as a regular file. + Local(std::fs::File), +} + enum Handle { /// A handle to real file on disk that can be seek'd, etc. BlobFile { @@ -1226,6 +1403,16 @@ enum Handle { current_offset: AtomicU64, }, #[cfg(feature = "fuse-backend-abi-7-31")] + /// A large remote blob served lazily: sequential reads stream directly + /// from the remote; the first non-sequential read or meaningful seek + /// promotes the blob to the local FS repository so all subsequent + /// accesses are from disk. + BlobRemote { + entry: Arc>, + digest: spfs::encoding::Digest, + inner: tokio::sync::Mutex, + }, + #[cfg(feature = "fuse-backend-abi-7-31")] /// A handle to an opaque file stream that can only be read once. /// Only used as a fallback when no local FS repo is available and the /// file is too large for the in-memory cache. @@ -1248,6 +1435,8 @@ impl Handle { #[cfg(feature = "fuse-backend-abi-7-31")] Self::BlobCached { entry, .. } => Arc::clone(entry), #[cfg(feature = "fuse-backend-abi-7-31")] + Self::BlobRemote { entry, .. } => Arc::clone(entry), + #[cfg(feature = "fuse-backend-abi-7-31")] Self::BlobStream { entry, .. 
} => Arc::clone(entry), Self::Tree { entry } => Arc::clone(entry), } From 3d613e992cd3b3a64c6e97f22d4f0908451aeb34 Mon Sep 17 00:00:00 2001 From: Ryan Bottriell Date: Fri, 20 Mar 2026 13:03:46 -0700 Subject: [PATCH 4/5] Add unit tests for BlobCache eviction and lazy-promotion state machine Tests cover: - BlobCache insert/get, insertion-order (oldest-first) eviction, and Arc sharing for duplicate inserts - promote_remote_to_local succeeds when the payload already exists on disk - promote_remote_to_local is idempotent when the handle is already Local - promote_remote_to_local returns an error when no local FS repo is present Also adds tempfile and tokio/macros to dev-dependencies to support async tests. Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 1 + crates/spfs-vfs/Cargo.toml | 4 + crates/spfs-vfs/src/fuse.rs | 174 ++++++++++++++++++++++++++++++++++++ 3 files changed, 179 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index f8215976c0..ec800336a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4154,6 +4154,7 @@ dependencies = [ "prost", "protobuf-src", "spfs", + "tempfile", "thiserror 1.0.69", "tokio", "tonic", diff --git a/crates/spfs-vfs/Cargo.toml b/crates/spfs-vfs/Cargo.toml index 04cc355365..b62627b232 100644 --- a/crates/spfs-vfs/Cargo.toml +++ b/crates/spfs-vfs/Cargo.toml @@ -83,6 +83,10 @@ windows = { workspace = true, optional = true, features = [ "Win32_System_Diagnostics_ToolHelp", ] } +[dev-dependencies] +tempfile = { workspace = true } +tokio = { workspace = true, features = ["macros"] } + [build-dependencies] protobuf-src = { version = "1.0.5", optional = true } # protoc @ 3.19.3 tonic-build = { workspace = true } diff --git a/crates/spfs-vfs/src/fuse.rs b/crates/spfs-vfs/src/fuse.rs index 91cdb7d95e..ce4340e111 100644 --- a/crates/spfs-vfs/src/fuse.rs +++ b/crates/spfs-vfs/src/fuse.rs @@ -1442,3 +1442,177 @@ impl Handle { } } } + +#[cfg(test)] +#[cfg(feature = "fuse-backend-abi-7-31")] +mod tests { + use std::sync::Arc; + + use spfs::tracking::Manifest; + + use 
super::*; + + // ── helpers ────────────────────────────────────────────────────────────── + + fn make_config() -> Config { + Config { + root_mode: 0o755 | libc::S_IFDIR as u32, + uid: nix::unistd::Uid::current(), + gid: nix::unistd::Gid::current(), + mount_options: Default::default(), + remotes: vec![], + include_secondary_tags: false, + blob_cache_max_bytes: 64 * 1024 * 1024, + blob_cache_max_single_bytes: 8 * 1024 * 1024, + } + } + + fn make_filesystem(repos: Vec>) -> Filesystem { + Filesystem::new(repos, Manifest::default(), make_config()) + } + + /// Convenience: create a `Digest` from a single repeated byte (for distinct + /// test digests without computing real hashes). + fn fake_digest(byte: u8) -> spfs::encoding::Digest { + spfs::encoding::Digest::from_bytes(&[byte; spfs::encoding::DIGEST_SIZE]).unwrap() + } + + // ── BlobCache ───────────────────────────────────────────────────────────── + + #[test] + fn blob_cache_insert_and_get() { + let mut cache = BlobCache::new(1024); + let digest = fake_digest(1); + let bytes = bytes::Bytes::from_static(b"hello cache"); + let stored = cache.insert(digest, bytes.clone()); + assert_eq!(*stored, bytes); + let hit = cache.get(&digest).expect("digest should be cached"); + assert_eq!(*hit, bytes); + } + + #[test] + fn blob_cache_miss_on_empty() { + let cache = BlobCache::new(1024); + assert!(cache.get(&fake_digest(42)).is_none()); + } + + #[test] + fn blob_cache_evicts_oldest_when_full() { + // Cap at 20 bytes; each entry is 10 bytes, so the third insert must + // evict the first. 
+ let mut cache = BlobCache::new(20); + let (d1, d2, d3) = (fake_digest(1), fake_digest(2), fake_digest(3)); + cache.insert(d1, bytes::Bytes::from(vec![0u8; 10])); + cache.insert(d2, bytes::Bytes::from(vec![0u8; 10])); + cache.insert(d3, bytes::Bytes::from(vec![0u8; 10])); // evicts d1 + + assert!(cache.get(&d1).is_none(), "oldest entry should be evicted"); + assert!(cache.get(&d2).is_some()); + assert!(cache.get(&d3).is_some()); + } + + #[test] + fn blob_cache_duplicate_insert_returns_same_arc() { + let mut cache = BlobCache::new(1024); + let digest = fake_digest(5); + let a1 = cache.insert(digest, bytes::Bytes::from_static(b"data")); + let a2 = cache.insert(digest, bytes::Bytes::from_static(b"data")); + assert!( + Arc::ptr_eq(&a1, &a2), + "duplicate insert should reuse the existing Arc" + ); + } + + // ── promote_remote_to_local ─────────────────────────────────────────────── + + /// Helper: commit `content` to a freshly created FS repo and return the + /// repo together with the blob's digest. + async fn create_repo_with_blob( + path: &std::path::Path, + content: &[u8], + ) -> ( + spfs::storage::fs::MaybeOpenFsRepository, + spfs::encoding::Digest, + ) { + let repo = spfs::storage::fs::MaybeOpenFsRepository::create(path) + .await + .unwrap(); + let stream: Pin> = Box::pin(std::io::Cursor::new(content.to_vec())); + let opened = repo.opened().await.unwrap(); + let digest = opened.commit_blob(stream).await.unwrap(); + drop(opened); + (repo, digest) + } + + #[tokio::test] + async fn promote_uses_existing_on_disk_payload() { + // If the blob is already present in the local FS repository (e.g. from + // a prior mount), promote_remote_to_local must succeed without + // attempting a remote download and must transition the guard to Local. 
+ let tmpdir = tempfile::tempdir().unwrap(); + let content = b"payload for promotion test"; + let (repo, digest) = create_repo_with_blob(tmpdir.path(), content).await; + + let repos = vec![Arc::new(spfs::storage::RepositoryHandle::FS(repo))]; + let fs = make_filesystem(repos); + + let cursor: Pin> = Box::pin(std::io::Cursor::new(content.to_vec())); + let mutex = tokio::sync::Mutex::new(BlobRemoteInner::Streaming { + stream: cursor, + position: 0, + }); + let mut guard = mutex.lock().await; + + fs.promote_remote_to_local(&mut guard, &digest) + .await + .expect("promotion should succeed when payload is already on disk"); + + assert!( + matches!(*guard, BlobRemoteInner::Local(_)), + "guard should be Local after promotion" + ); + } + + #[tokio::test] + async fn promote_idempotent_when_already_local() { + // A guard that is already in the Local state must be a no-op. + let tmpdir = tempfile::tempdir().unwrap(); + let path = tmpdir.path().join("dummy"); + std::fs::write(&path, b"x").unwrap(); + let file = std::fs::File::open(&path).unwrap(); + + let mutex = tokio::sync::Mutex::new(BlobRemoteInner::Local(file)); + let mut guard = mutex.lock().await; + + // Repos list is empty — if promote tried to do real work it would error. + let fs = make_filesystem(vec![]); + let digest = fake_digest(7); + + fs.promote_remote_to_local(&mut guard, &digest) + .await + .expect("promote on already-Local handle should succeed"); + assert!(matches!(*guard, BlobRemoteInner::Local(_))); + } + + #[tokio::test] + async fn promote_fails_without_local_fs_repo() { + // Without any FS repository in the stack, promotion cannot write the + // blob and must return an error rather than panic. 
+ let fs = make_filesystem(vec![]); + let digest = fake_digest(8); + + let cursor: Pin> = Box::pin(std::io::Cursor::new(vec![])); + let mutex = tokio::sync::Mutex::new(BlobRemoteInner::Streaming { + stream: cursor, + position: 0, + }); + let mut guard = mutex.lock().await; + + assert!( + fs.promote_remote_to_local(&mut guard, &digest) + .await + .is_err(), + "promotion should fail when no local FS repository is configured" + ); + } +} From 6e5ac0e10dd25f10aa5cd13d8df2fc111a664c47 Mon Sep 17 00:00:00 2001 From: Ryan Bottriell Date: Fri, 20 Mar 2026 15:22:23 -0700 Subject: [PATCH 5/5] Merge BlobStream into BlobRemote and fix lseek on streaming handles Remove the BlobStream fallback variant (used when no local FS repo was available) and always use BlobRemote with FOPEN_STREAM. If promotion fails (e.g. no local FS repo), the handler returns EIO rather than silently serving ESPIPE via FOPEN_NONSEEKABLE. Fix a bug in the lseek handler for BlobRemote in Streaming state: the old code updated the tracked position arithmetically without moving the stream cursor, so a subsequent read at the new offset would incorrectly appear sequential and return bytes from the wrong position. The new logic answers SEEK_HOLE/SEEK_DATA trivially, is a no-op when the position is unchanged, and promotes to a local file before performing any position-changing seek so the stream cursor is always consistent. Rename BlobRemoteInner::Streaming.position to stream_pos for clarity. Co-Authored-By: Claude Opus 4.6 --- crates/spfs-vfs/src/fuse.rs | 181 ++++++++++++++++++------------------ 1 file changed, 92 insertions(+), 89 deletions(-) diff --git a/crates/spfs-vfs/src/fuse.rs b/crates/spfs-vfs/src/fuse.rs index ce4340e111..cac2d6c1de 100644 --- a/crates/spfs-vfs/src/fuse.rs +++ b/crates/spfs-vfs/src/fuse.rs @@ -515,39 +515,20 @@ impl Filesystem { current_offset: AtomicU64::new(0), }); } else { - // Too large for the in-memory cache. 
Create a lazy - // handle: sequential reads stream directly from the - // remote; the first non-sequential read or seek - // triggers a one-time download into the local FS - // repository. If no local FS repo is available we - // fall back to a non-seekable stream. - let has_local_fs = self - .repos - .iter() - .any(|r| matches!(&**r, spfs::storage::RepositoryHandle::FS(_))); - if has_local_fs { - let owned_digest = *digest; - handle = Some(Handle::BlobRemote { - entry, - digest: owned_digest, - inner: tokio::sync::Mutex::new(BlobRemoteInner::Streaming { - stream, - position: 0, - }), - }); - flags = FOPEN_STREAM; - } else { - tracing::warn!( - %digest, - "no local FS repository available to cache large \ - blob; falling back to non-seekable stream" - ); - handle = Some(Handle::BlobStream { - entry, - stream: tokio::sync::Mutex::new(stream), - }); - flags |= FOPEN_NONSEEKABLE | FOPEN_STREAM; - } + // Too large for the in-memory cache. Stream + // directly from the remote; the first + // non-sequential read or seek triggers a one-time + // download into the local FS repository. + let owned_digest = *digest; + handle = Some(Handle::BlobRemote { + entry, + digest: owned_digest, + inner: tokio::sync::Mutex::new(BlobRemoteInner::Streaming { + stream, + stream_pos: 0, + }), + }); + flags = FOPEN_STREAM; } break; } @@ -647,8 +628,9 @@ impl Filesystem { let read_offset = offset as u64; // Promote to a local file on the first non-sequential access. - let needs_promotion = if let BlobRemoteInner::Streaming { position, .. } = &*guard { - *position != read_offset + let needs_promotion = if let BlobRemoteInner::Streaming { stream_pos, .. 
} = &*guard + { + *stream_pos != read_offset } else { false }; @@ -665,7 +647,7 @@ impl Filesystem { } match &mut *guard { - BlobRemoteInner::Streaming { stream, position } => { + BlobRemoteInner::Streaming { stream, stream_pos } => { let mut buf = vec![0; size as usize]; let mut consumed = 0; while consumed < size as usize { @@ -675,7 +657,7 @@ impl Filesystem { break; } } - *position += consumed as u64; + *stream_pos += consumed as u64; tracing::trace!("read {fh} = {consumed}/{size} [REMOTE STREAM]"); reply.data(&buf[..consumed]); } @@ -699,22 +681,6 @@ impl Filesystem { } } } - #[cfg(feature = "fuse-backend-abi-7-31")] - Handle::BlobStream { entry: _, stream } => { - let mut stream = stream.lock().await; - let mut buf = vec![0; size as usize]; - let mut consumed = 0; - while consumed < size as usize { - let count = unwrap!(reply, stream.read(&mut buf[consumed..]).await); - consumed += count; - if count == 0 { - // the end of the file has been reached - break; - } - } - tracing::trace!("read {fh} = {consumed}/{size} [STREAM]"); - reply.data(&buf[..consumed]); - } }; } @@ -888,29 +854,82 @@ impl Filesystem { #[cfg(feature = "fuse-backend-abi-7-31")] Handle::BlobRemote { entry, - digest: _, + digest, inner, } => { let file_len = entry.size(); let mut guard = inner.lock().await; - let new_offset = match &mut *guard { - BlobRemoteInner::Streaming { position, .. } => { - let cur = *position; - let new = match whence { + + // For the Streaming state, handle trivial (non-position- + // changing) seeks without touching the stream, and compute + // a target position for all position-changing seeks so we can + // promote before the borrow is held. + let needs_promotion_to = + if let BlobRemoteInner::Streaming { stream_pos, .. } = &*guard { + // SEEK_HOLE / SEEK_DATA: answer without moving the stream. 
+ if whence == libc::SEEK_HOLE { + tracing::trace!("lseek {fh} = {file_len} [REMOTE STREAM SEEK_HOLE]"); + reply.offset(file_len as i64); + return; + } + if whence == libc::SEEK_DATA { + tracing::trace!("lseek {fh} = {offset} [REMOTE STREAM SEEK_DATA]"); + reply.offset(offset); + return; + } + let new_pos = match whence { libc::SEEK_SET => offset as u64, - libc::SEEK_CUR => (cur as i64 + offset) as u64, + libc::SEEK_CUR => (*stream_pos as i64 + offset) as u64, libc::SEEK_END => (file_len as i64 + offset) as u64, - libc::SEEK_HOLE => file_len, - libc::SEEK_DATA => offset as u64, _ => { tracing::debug!("lseek {fh} = EINVAL"); reply.error(libc::EINVAL); return; } }; - *position = new; - new + if new_pos == *stream_pos { + // No-op: position unchanged, stream is still valid. + tracing::trace!("lseek {fh} = {new_pos} [REMOTE STREAM noop]"); + reply.offset(new_pos as i64); + return; + } + // Position is changing: we can't rewind the stream, + // so promote to a local file first. + Some(new_pos) + } else { + None + }; + + if let Some(new_pos) = needs_promotion_to { + tracing::debug!( + fh, + new_pos, + "seek requires rewind on remote stream, promoting to local file" + ); + unwrap!( + reply, + self.promote_remote_to_local(&mut guard, digest).await + ); + // Promotion succeeded; guard is now Local — seek to target. + match &mut *guard { + BlobRemoteInner::Local(file) => { + let f = unsafe { std::fs::File::from_raw_fd(file.as_raw_fd()) }; + let mut f = ManuallyDrop::new(f); + let new_offset = unwrap!(reply, f.seek(SeekFrom::Start(new_pos))); + tracing::trace!("lseek {fh} = {new_offset} [REMOTE PROMOTED]"); + reply.offset(new_offset as i64); + } + BlobRemoteInner::Streaming { .. } => { + unreachable!("promote_remote_to_local must transition to Local state"); + } } + return; + } + + // Guard is already Local (the Streaming branch above either + // returned early or set needs_promotion_to, so reaching here + // means guard is Local). 
+ match &mut *guard { BlobRemoteInner::Local(file) => { let pos = match whence { libc::SEEK_CUR => SeekFrom::Current(offset), @@ -926,17 +945,14 @@ impl Filesystem { }; let f = unsafe { std::fs::File::from_raw_fd(file.as_raw_fd()) }; let mut f = ManuallyDrop::new(f); - unwrap!(reply, f.seek(pos)) + let new_offset = unwrap!(reply, f.seek(pos)); + tracing::trace!("lseek {fh} = {new_offset} [REMOTE LOCAL]"); + reply.offset(new_offset as i64); } - }; - tracing::trace!("lseek {fh} = {new_offset} [REMOTE]"); - reply.offset(new_offset as i64); - } - #[cfg(feature = "fuse-backend-abi-7-31")] - Handle::BlobStream { .. } => { - tracing::warn!("FUSE should not allow seek calls on streams"); - tracing::debug!("lseek {fh} = EINVAL"); - reply.error(libc::EINVAL); + BlobRemoteInner::Streaming { .. } => { + unreachable!("Streaming state must have been handled above"); + } + } } Handle::BlobFile { entry: _, file } => { let pos = match whence { @@ -1379,7 +1395,7 @@ enum BlobRemoteInner { Streaming { stream: Pin>, /// Byte offset of the next byte the stream will yield. - position: u64, + stream_pos: u64, }, /// The blob has been downloaded to the local FS repository and is /// accessible as a regular file. @@ -1412,17 +1428,6 @@ enum Handle { digest: spfs::encoding::Digest, inner: tokio::sync::Mutex, }, - #[cfg(feature = "fuse-backend-abi-7-31")] - /// A handle to an opaque file stream that can only be read once. - /// Only used as a fallback when no local FS repo is available and the - /// file is too large for the in-memory cache. - BlobStream { - entry: Arc>, - // TODO: we should avoid the tokio mutex at all costs, - // but we need a mutable reference to this BlobRead and - // need to hold it across an await (for reading from the stream) - stream: tokio::sync::Mutex>>, - }, Tree { entry: Arc>, }, @@ -1436,8 +1441,6 @@ impl Handle { Self::BlobCached { entry, .. } => Arc::clone(entry), #[cfg(feature = "fuse-backend-abi-7-31")] Self::BlobRemote { entry, .. 
} => Arc::clone(entry), - #[cfg(feature = "fuse-backend-abi-7-31")] - Self::BlobStream { entry, .. } => Arc::clone(entry), Self::Tree { entry } => Arc::clone(entry), } } @@ -1559,7 +1562,7 @@ mod tests { let cursor: Pin> = Box::pin(std::io::Cursor::new(content.to_vec())); let mutex = tokio::sync::Mutex::new(BlobRemoteInner::Streaming { stream: cursor, - position: 0, + stream_pos: 0, }); let mut guard = mutex.lock().await; @@ -1604,7 +1607,7 @@ mod tests { let cursor: Pin> = Box::pin(std::io::Cursor::new(vec![])); let mutex = tokio::sync::Mutex::new(BlobRemoteInner::Streaming { stream: cursor, - position: 0, + stream_pos: 0, }); let mut guard = mutex.lock().await;