diff --git a/Cargo.lock b/Cargo.lock index f8215976c0..ec800336a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4154,6 +4154,7 @@ dependencies = [ "prost", "protobuf-src", "spfs", + "tempfile", "thiserror 1.0.69", "tokio", "tonic", diff --git a/crates/spfs-cli/cmd-fuse/src/cmd_fuse.rs b/crates/spfs-cli/cmd-fuse/src/cmd_fuse.rs index c510bb42a7..a069e9a251 100644 --- a/crates/spfs-cli/cmd-fuse/src/cmd_fuse.rs +++ b/crates/spfs-cli/cmd-fuse/src/cmd_fuse.rs @@ -141,6 +141,8 @@ impl CmdFuse { remotes: Vec::new(), mount_options: required_opts.into_iter().collect(), include_secondary_tags: config.fuse.include_secondary_tags, + blob_cache_max_bytes: config.fuse.blob_cache_max_bytes, + blob_cache_max_single_bytes: config.fuse.blob_cache_max_single_bytes, }; let parsed_opts = parse_options_from_args(&self.options); diff --git a/crates/spfs-vfs/Cargo.toml b/crates/spfs-vfs/Cargo.toml index 04cc355365..b62627b232 100644 --- a/crates/spfs-vfs/Cargo.toml +++ b/crates/spfs-vfs/Cargo.toml @@ -83,6 +83,10 @@ windows = { workspace = true, optional = true, features = [ "Win32_System_Diagnostics_ToolHelp", ] } +[dev-dependencies] +tempfile = { workspace = true } +tokio = { workspace = true, features = ["macros"] } + [build-dependencies] protobuf-src = { version = "1.0.5", optional = true } # protoc @ 3.19.3 tonic-build = { workspace = true } diff --git a/crates/spfs-vfs/src/fuse.rs b/crates/spfs-vfs/src/fuse.rs index d8e61d3583..cac2d6c1de 100644 --- a/crates/spfs-vfs/src/fuse.rs +++ b/crates/spfs-vfs/src/fuse.rs @@ -3,6 +3,8 @@ // https://github.com/spkenv/spk use std::collections::HashSet; +#[cfg(feature = "fuse-backend-abi-7-31")] +use std::collections::{HashMap, VecDeque}; use std::ffi::{OsStr, OsString}; use std::io::{Seek, SeekFrom}; use std::mem::ManuallyDrop; @@ -12,6 +14,8 @@ use std::os::unix::prelude::FileExt; #[cfg(feature = "fuse-backend-abi-7-31")] use std::pin::Pin; use std::sync::Arc; +#[cfg(feature = "fuse-backend-abi-7-31")] +use std::sync::Mutex; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, SystemTime}; @@ -62,6 +66,69 @@ pub struct Config { /// Whether to have the tags in the secondary repos included in /// the lookup methods. pub include_secondary_tags: bool, + /// Maximum total bytes held in the in-memory remote blob cache. + /// + /// Remote blobs up to `blob_cache_max_single_bytes` in size are buffered + /// in memory so they can be read and seeked at arbitrary offsets. + pub blob_cache_max_bytes: usize, + /// Maximum size of a single remote blob that will be buffered in memory. + /// + /// Blobs larger than this are instead downloaded once into the local + /// repository so that future opens find them locally without a network + /// round-trip, and so they can be managed by `spfs clean`. + pub blob_cache_max_single_bytes: usize, +} + +/// Byte-bounded in-memory cache for remote blob payloads. +/// +/// Entries are evicted in insertion order (oldest first) once the total +/// byte count would exceed `max_bytes`. Multiple open handles to the same +/// blob share the same `Arc` without copying. +#[cfg(feature = "fuse-backend-abi-7-31")] +struct BlobCache { + /// Digest insertion order for eviction (front = oldest). + order: VecDeque, + data: HashMap>, + current_bytes: usize, + max_bytes: usize, +} + +#[cfg(feature = "fuse-backend-abi-7-31")] +impl BlobCache { + fn new(max_bytes: usize) -> Self { + Self { + order: VecDeque::new(), + data: HashMap::new(), + current_bytes: 0, + max_bytes, + } + } + + fn get(&self, digest: &spfs::encoding::Digest) -> Option> { + self.data.get(digest).cloned() + } + + /// Insert `bytes` under `digest`, evicting oldest entries to stay within + /// `max_bytes`. Returns a shared `Arc` to the stored slice. + fn insert(&mut self, digest: spfs::encoding::Digest, bytes: bytes::Bytes) -> Arc { + // A concurrent open may have already inserted this digest. + if let Some(existing) = self.data.get(&digest) { + return Arc::clone(existing); + } + let len = bytes.len(); + while !self.order.is_empty() && self.current_bytes + len > self.max_bytes { + if let Some(evicted_digest) = self.order.pop_front() + && let Some(evicted) = self.data.remove(&evicted_digest) + { + self.current_bytes = self.current_bytes.saturating_sub(evicted.len()); + } + } + let arc = Arc::new(bytes); + self.data.insert(digest, Arc::clone(&arc)); + self.order.push_back(digest); + self.current_bytes += len; + arc + } } /// Handles the allocation of inodes, and async responses to all FUSE requests @@ -75,6 +142,8 @@ struct Filesystem { inodes: DashMap>>, handles: DashMap, fs_creation_time: SystemTime, + #[cfg(feature = "fuse-backend-abi-7-31")] + blob_cache: Mutex, } impl Filesystem { @@ -89,6 +158,8 @@ impl Filesystem { manifest: Manifest, opts: Config, ) -> Self { + #[cfg(feature = "fuse-backend-abi-7-31")] + let blob_cache = Mutex::new(BlobCache::new(opts.blob_cache_max_bytes)); let fs = Self { repos, opts, @@ -100,6 +171,8 @@ impl Filesystem { inodes: Default::default(), handles: Default::default(), fs_creation_time: SystemTime::now(), + #[cfg(feature = "fuse-backend-abi-7-31")] + blob_cache, }; // pre-allocate inodes for all entries in the manifest let mut root = manifest.take_root(); @@ -380,6 +453,29 @@ impl Filesystem { let mut handle = None; #[allow(unused_mut)] let mut flags = FOPEN_KEEP_CACHE; + + // Fast path: serve from the in-memory blob cache without touching any + // repository. This is checked before the repo loop so that even the + // FS repo open() syscall is avoided on a cache hit. + #[cfg(feature = "fuse-backend-abi-7-31")] + { + let cached = self + .blob_cache + .lock() + .expect("blob cache lock poisoned") + .get(digest); + if let Some(data) = cached { + let fh = self.allocate_handle(Handle::BlobCached { + entry, + data, + current_offset: AtomicU64::new(0), + }); + tracing::trace!("open {ino} = {fh} [CACHE HIT]"); + reply.opened(fh, FOPEN_KEEP_CACHE); + return; + } + } + for repo in self.repos.iter() { match &**repo { spfs::storage::RepositoryHandle::FS(fs_repo) => { @@ -401,13 +497,39 @@ impl Filesystem { } #[cfg(feature = "fuse-backend-abi-7-31")] repo => match repo.open_payload(*digest).await { - Ok((stream, _)) => { - // TODO: try to leverage the returned file path? - handle = Some(Handle::BlobStream { - entry, - stream: tokio::sync::Mutex::new(stream), - }); - flags |= FOPEN_NONSEEKABLE | FOPEN_STREAM; + Ok((mut stream, _)) => { + let file_size = entry.size() as usize; + if file_size <= self.opts.blob_cache_max_single_bytes { + // Small enough to buffer in memory. Download the + // full payload and add it to the shared LRU cache. + let mut buf = Vec::with_capacity(file_size); + unwrap!(reply, stream.read_to_end(&mut buf).await); + let data = self + .blob_cache + .lock() + .expect("blob cache lock poisoned") + .insert(*digest, bytes::Bytes::from(buf)); + handle = Some(Handle::BlobCached { + entry, + data, + current_offset: AtomicU64::new(0), + }); + } else { + // Too large for the in-memory cache. Stream + // directly from the remote; the first + // non-sequential read or seek triggers a one-time + // download into the local FS repository. + let owned_digest = *digest; + handle = Some(Handle::BlobRemote { + entry, + digest: owned_digest, + inner: tokio::sync::Mutex::new(BlobRemoteInner::Streaming { + stream, + stream_pos: 0, + }), + }); + flags = FOPEN_STREAM; + } break; } Err(err) if err.try_next_repo() => continue, @@ -485,20 +607,79 @@ impl Filesystem { reply.data(&buf[..consumed]); } #[cfg(feature = "fuse-backend-abi-7-31")] - Handle::BlobStream { entry: _, stream } => { - let mut stream = stream.lock().await; - let mut buf = vec![0; size as usize]; - let mut consumed = 0; - while consumed < size as usize { - let count = unwrap!(reply, stream.read(&mut buf[consumed..]).await); - consumed += count; - if count == 0 { - // the end of the file has been reached - break; + Handle::BlobCached { data, .. } => { + let start = offset as usize; + let end = (start + size as usize).min(data.len()); + let slice = if start < data.len() { + &data[start..end] + } else { + &[] + }; + tracing::trace!("read {fh} = {}/{size} [CACHED]", slice.len()); + reply.data(slice); + } + #[cfg(feature = "fuse-backend-abi-7-31")] + Handle::BlobRemote { + entry: _, + digest, + inner, + } => { + let mut guard = inner.lock().await; + let read_offset = offset as u64; + + // Promote to a local file on the first non-sequential access. + let needs_promotion = if let BlobRemoteInner::Streaming { stream_pos, .. } = &*guard + { + *stream_pos != read_offset + } else { + false + }; + if needs_promotion { + tracing::debug!( + fh, + read_offset, + "non-sequential read on remote stream, promoting to local file" + ); + unwrap!( + reply, + self.promote_remote_to_local(&mut guard, digest).await + ); + } + + match &mut *guard { + BlobRemoteInner::Streaming { stream, stream_pos } => { + let mut buf = vec![0; size as usize]; + let mut consumed = 0; + while consumed < size as usize { + let count = unwrap!(reply, stream.read(&mut buf[consumed..]).await); + consumed += count; + if count == 0 { + break; + } + } + *stream_pos += consumed as u64; + tracing::trace!("read {fh} = {consumed}/{size} [REMOTE STREAM]"); + reply.data(&buf[..consumed]); + } + BlobRemoteInner::Local(file) => { + let f = unsafe { std::fs::File::from_raw_fd(file.as_raw_fd()) }; + let f = ManuallyDrop::new(f); + let mut buf = vec![0; size as usize]; + let mut consumed = 0; + while consumed < size as usize { + let count = unwrap!( + reply, + f.read_at(&mut buf[consumed..], consumed as u64 + read_offset) + ); + consumed += count; + if count == 0 { + break; + } + } + tracing::trace!("read {fh} = {consumed}/{size} [REMOTE LOCAL]"); + reply.data(&buf[..consumed]); } } - tracing::trace!("read {fh} = {consumed}/{size} [STREAM]"); - reply.data(&buf[..consumed]); } }; } @@ -633,58 +814,244 @@ impl Filesystem { } async fn lseek(&self, _ino: u64, fh: u64, offset: i64, whence: i32, reply: fuser::ReplyLseek) { - let Some(handle) = self.handles.get_mut(&fh) else { + let Some(handle) = self.handles.get(&fh) else { tracing::debug!("lseek {fh} = EBADF"); reply.error(libc::EBADF); return; }; - let file = match handle.value() { + match handle.value() { Handle::Tree { .. } => { tracing::debug!("lseek {fh} = EISDIR"); reply.error(libc::EISDIR); - return; } - Handle::BlobFile { entry: _, file } => file, #[cfg(feature = "fuse-backend-abi-7-31")] - Handle::BlobStream { .. } => { - tracing::warn!("FUSE should not allow seek calls on streams"); - tracing::debug!("lseek {fh} = EINVAL"); - reply.error(libc::EINVAL); - return; + Handle::BlobCached { + data, + current_offset, + .. + } => { + let cur = current_offset.load(Ordering::Relaxed); + let file_len = data.len() as u64; + let new_offset = match whence { + libc::SEEK_SET => offset as u64, + libc::SEEK_CUR => (cur as i64 + offset) as u64, + libc::SEEK_END => (file_len as i64 + offset) as u64, + // Simplest valid implementation per the linux man page: + // treat the entire file as data with no holes. + libc::SEEK_HOLE => file_len, + libc::SEEK_DATA => offset as u64, + _ => { + tracing::debug!("lseek {fh} = EINVAL"); + reply.error(libc::EINVAL); + return; + } + }; + current_offset.store(new_offset, Ordering::Relaxed); + tracing::trace!("lseek {fh} = {new_offset} [CACHED]"); + reply.offset(new_offset as i64); } - }; + #[cfg(feature = "fuse-backend-abi-7-31")] + Handle::BlobRemote { + entry, + digest, + inner, + } => { + let file_len = entry.size(); + let mut guard = inner.lock().await; + + // For the Streaming state, handle trivial (non-position- + // changing) seeks without touching the stream, and compute + // a target position for all position-changing seeks so we can + // promote before the borrow is held. + let needs_promotion_to = + if let BlobRemoteInner::Streaming { stream_pos, .. } = &*guard { + // SEEK_HOLE / SEEK_DATA: answer without moving the stream. + if whence == libc::SEEK_HOLE { + tracing::trace!("lseek {fh} = {file_len} [REMOTE STREAM SEEK_HOLE]"); + reply.offset(file_len as i64); + return; + } + if whence == libc::SEEK_DATA { + tracing::trace!("lseek {fh} = {offset} [REMOTE STREAM SEEK_DATA]"); + reply.offset(offset); + return; + } + let new_pos = match whence { + libc::SEEK_SET => offset as u64, + libc::SEEK_CUR => (*stream_pos as i64 + offset) as u64, + libc::SEEK_END => (file_len as i64 + offset) as u64, + _ => { + tracing::debug!("lseek {fh} = EINVAL"); + reply.error(libc::EINVAL); + return; + } + }; + if new_pos == *stream_pos { + // No-op: position unchanged, stream is still valid. + tracing::trace!("lseek {fh} = {new_pos} [REMOTE STREAM noop]"); + reply.offset(new_pos as i64); + return; + } + // Position is changing: we can't rewind the stream, + // so promote to a local file first. + Some(new_pos) + } else { + None + }; - let pos = match whence { - libc::SEEK_CUR => SeekFrom::Current(offset), - libc::SEEK_END => SeekFrom::End(offset), - libc::SEEK_SET => SeekFrom::Start(offset as u64), - - // From linux man pages: In the - // simplest implementation, a filesystem can support the operations - // by making SEEK_HOLE always return the offset of the end of the - // file, and making SEEK_DATA always return offset (i.e., even if - // the location referred to by offset is a hole, it can be - // considered to consist of data that is a sequence of zeros). - libc::SEEK_HOLE => SeekFrom::End(0), - libc::SEEK_DATA => SeekFrom::Start(offset as u64), - - _ => { - reply.error(libc::EINVAL); - return; + if let Some(new_pos) = needs_promotion_to { + tracing::debug!( + fh, + new_pos, + "seek requires rewind on remote stream, promoting to local file" + ); + unwrap!( + reply, + self.promote_remote_to_local(&mut guard, digest).await + ); + // Promotion succeeded; guard is now Local — seek to target. + match &mut *guard { + BlobRemoteInner::Local(file) => { + let f = unsafe { std::fs::File::from_raw_fd(file.as_raw_fd()) }; + let mut f = ManuallyDrop::new(f); + let new_offset = unwrap!(reply, f.seek(SeekFrom::Start(new_pos))); + tracing::trace!("lseek {fh} = {new_offset} [REMOTE PROMOTED]"); + reply.offset(new_offset as i64); + } + BlobRemoteInner::Streaming { .. } => { + unreachable!("promote_remote_to_local must transition to Local state"); + } + } + return; + } + + // Guard is already Local (the Streaming branch above either + // returned early or set needs_promotion_to, so reaching here + // means guard is Local). + match &mut *guard { + BlobRemoteInner::Local(file) => { + let pos = match whence { + libc::SEEK_CUR => SeekFrom::Current(offset), + libc::SEEK_END => SeekFrom::End(offset), + libc::SEEK_SET => SeekFrom::Start(offset as u64), + libc::SEEK_HOLE => SeekFrom::End(0), + libc::SEEK_DATA => SeekFrom::Start(offset as u64), + _ => { + tracing::debug!("lseek {fh} = EINVAL"); + reply.error(libc::EINVAL); + return; + } + }; + let f = unsafe { std::fs::File::from_raw_fd(file.as_raw_fd()) }; + let mut f = ManuallyDrop::new(f); + let new_offset = unwrap!(reply, f.seek(pos)); + tracing::trace!("lseek {fh} = {new_offset} [REMOTE LOCAL]"); + reply.offset(new_offset as i64); + } + BlobRemoteInner::Streaming { .. } => { + unreachable!("Streaming state must have been handled above"); + } + } } - }; + Handle::BlobFile { entry: _, file } => { + let pos = match whence { + libc::SEEK_CUR => SeekFrom::Current(offset), + libc::SEEK_END => SeekFrom::End(offset), + libc::SEEK_SET => SeekFrom::Start(offset as u64), + + // From linux man pages: In the + // simplest implementation, a filesystem can support the operations + // by making SEEK_HOLE always return the offset of the end of the + // file, and making SEEK_DATA always return offset (i.e., even if + // the location referred to by offset is a hole, it can be + // considered to consist of data that is a sequence of zeros). + libc::SEEK_HOLE => SeekFrom::End(0), + libc::SEEK_DATA => SeekFrom::Start(offset as u64), + + _ => { + reply.error(libc::EINVAL); + return; + } + }; + + // Safety: the fd must be valid and open, which we know. We also + // know that the file will live for the lifetime of this function + // and so can create a copy of it safely for use before that rather + // than duplicating it or using some kind of lock. + let f = unsafe { std::fs::File::from_raw_fd(file.as_raw_fd()) }; + // file takes ownership of the handle, but we need to make sure + // it is not closed since it's a copy of the File that remains alive + let mut f = ManuallyDrop::new(f); + let new_offset = unwrap!(reply, f.seek(pos)); + tracing::trace!("lseek {fh} = {new_offset} [FILE]"); + reply.offset(new_offset as i64); + } + } + } + + /// Download a large remote blob to the local FS repository and replace + /// the streaming handle state with a `Local` file handle. + /// + /// Idempotent: if the blob is already on disk (either from a previous + /// promotion or because the FS repo already had it), only the file open + /// is performed. + #[cfg(feature = "fuse-backend-abi-7-31")] + async fn promote_remote_to_local( + &self, + guard: &mut tokio::sync::MutexGuard<'_, BlobRemoteInner>, + digest: &spfs::encoding::Digest, + ) -> spfs::Result<()> { + if matches!(**guard, BlobRemoteInner::Local(_)) { + return Ok(()); + } + + let local_fs = self.repos.iter().find_map(|r| { + if let spfs::storage::RepositoryHandle::FS(fs) = &**r { + Some(fs) + } else { + None + } + }); + let local_fs = local_fs.ok_or_else(|| { + spfs::Error::String("no local FS repository available to cache large blob".into()) + })?; + + let opened = local_fs.opened().await?; + let payload_path = opened.payloads().build_digest_path(digest); + + // Only download if the payload is not already present on disk. + if std::fs::metadata(&payload_path).is_err() { + tracing::debug!(%digest, "promoting remote blob to local FS repository"); + let mut fresh_stream = None; + for repo in self.repos.iter() { + if matches!(&**repo, spfs::storage::RepositoryHandle::FS(_)) { + continue; + } + match repo.open_payload(*digest).await { + Ok((s, _)) => { + fresh_stream = Some(s); + break; + } + Err(err) if err.try_next_repo() => continue, + Err(err) => return Err(err), + } + } + let stream = fresh_stream.ok_or(spfs::Error::UnknownObject(*digest))?; + let stored = opened.commit_blob(stream).await?; + if stored != *digest { + return Err(spfs::Error::String(format!( + "payload digest mismatch when caching blob: expected {digest}, got {stored}" + ))); + } + } - // Safety: the fd must be valid and open, which we know. We also - // know that the file will live for the livetime of this function - // and so can create a copy of it safely for use before that rather - // than duplicating it or using some kind of lock - let f = unsafe { std::fs::File::from_raw_fd(file.as_raw_fd()) }; - // file takes ownership of the handle, but we need to make sure - // it is not closed since it's a copy of the File that remains alive - let mut f = ManuallyDrop::new(f); - let new_offset = unwrap!(reply, f.seek(pos)); - reply.offset(new_offset as i64); + let file = std::fs::OpenOptions::new() + .read(true) + .open(&payload_path) + .map_err(|e| spfs::Error::String(format!("failed to open cached payload: {e}")))?; + **guard = BlobRemoteInner::Local(file); + Ok(()) } } @@ -1019,6 +1386,22 @@ impl fuser::Filesystem for Session { } } +/// State for a lazily-promoted large remote blob. +/// +/// Starts as `Streaming` (sequential reads from the remote) and transitions to +/// `Local` on the first non-sequential read or meaningful lseek. +#[cfg(feature = "fuse-backend-abi-7-31")] +enum BlobRemoteInner { + Streaming { + stream: Pin>, + /// Byte offset of the next byte the stream will yield. + stream_pos: u64, + }, + /// The blob has been downloaded to the local FS repository and is + /// accessible as a regular file. + Local(std::fs::File), +} + enum Handle { /// A handle to real file on disk that can be seek'd, etc. BlobFile { @@ -1026,13 +1409,24 @@ enum Handle { file: std::fs::File, }, #[cfg(feature = "fuse-backend-abi-7-31")] - // A handle to an opaque file stream that can only be read once - BlobStream { + /// A remote blob fully buffered in memory, allowing arbitrary seeks and + /// random-access reads without a "current position" in the FUSE sense + /// (FUSE read() always supplies an explicit absolute offset). The + /// `current_offset` field is only maintained for SEEK_CUR lseek calls. + BlobCached { + entry: Arc>, + data: Arc, + current_offset: AtomicU64, + }, + #[cfg(feature = "fuse-backend-abi-7-31")] + /// A large remote blob served lazily: sequential reads stream directly + /// from the remote; the first non-sequential read or meaningful seek + /// promotes the blob to the local FS repository so all subsequent + /// accesses are from disk. + BlobRemote { entry: Arc>, - // TODO: we should avoid the tokio mutex at all costs, - // but we need a mutable reference to this BlobRead and - // need to hold it across an await (for reading from the stream) - stream: tokio::sync::Mutex>>, + digest: spfs::encoding::Digest, + inner: tokio::sync::Mutex, }, Tree { entry: Arc>, @@ -1044,8 +1438,184 @@ impl Handle { match self { Self::BlobFile { entry, .. } => Arc::clone(entry), #[cfg(feature = "fuse-backend-abi-7-31")] - Self::BlobStream { entry, .. } => Arc::clone(entry), + Self::BlobCached { entry, .. } => Arc::clone(entry), + #[cfg(feature = "fuse-backend-abi-7-31")] + Self::BlobRemote { entry, .. } => Arc::clone(entry), Self::Tree { entry } => Arc::clone(entry), } } } + +#[cfg(test)] +#[cfg(feature = "fuse-backend-abi-7-31")] +mod tests { + use std::sync::Arc; + + use spfs::tracking::Manifest; + + use super::*; + + // ── helpers ────────────────────────────────────────────────────────────── + + fn make_config() -> Config { + Config { + root_mode: 0o755 | libc::S_IFDIR as u32, + uid: nix::unistd::Uid::current(), + gid: nix::unistd::Gid::current(), + mount_options: Default::default(), + remotes: vec![], + include_secondary_tags: false, + blob_cache_max_bytes: 64 * 1024 * 1024, + blob_cache_max_single_bytes: 8 * 1024 * 1024, + } + } + + fn make_filesystem(repos: Vec>) -> Filesystem { + Filesystem::new(repos, Manifest::default(), make_config()) + } + + /// Convenience: create a `Digest` from a single repeated byte (for distinct + /// test digests without computing real hashes). + fn fake_digest(byte: u8) -> spfs::encoding::Digest { + spfs::encoding::Digest::from_bytes(&[byte; spfs::encoding::DIGEST_SIZE]).unwrap() + } + + // ── BlobCache ───────────────────────────────────────────────────────────── + + #[test] + fn blob_cache_insert_and_get() { + let mut cache = BlobCache::new(1024); + let digest = fake_digest(1); + let bytes = bytes::Bytes::from_static(b"hello cache"); + let stored = cache.insert(digest, bytes.clone()); + assert_eq!(*stored, bytes); + let hit = cache.get(&digest).expect("digest should be cached"); + assert_eq!(*hit, bytes); + } + + #[test] + fn blob_cache_miss_on_empty() { + let cache = BlobCache::new(1024); + assert!(cache.get(&fake_digest(42)).is_none()); + } + + #[test] + fn blob_cache_evicts_oldest_when_full() { + // Cap at 20 bytes; each entry is 10 bytes, so the third insert must + // evict the first. + let mut cache = BlobCache::new(20); + let (d1, d2, d3) = (fake_digest(1), fake_digest(2), fake_digest(3)); + cache.insert(d1, bytes::Bytes::from(vec![0u8; 10])); + cache.insert(d2, bytes::Bytes::from(vec![0u8; 10])); + cache.insert(d3, bytes::Bytes::from(vec![0u8; 10])); // evicts d1 + + assert!(cache.get(&d1).is_none(), "oldest entry should be evicted"); + assert!(cache.get(&d2).is_some()); + assert!(cache.get(&d3).is_some()); + } + + #[test] + fn blob_cache_duplicate_insert_returns_same_arc() { + let mut cache = BlobCache::new(1024); + let digest = fake_digest(5); + let a1 = cache.insert(digest, bytes::Bytes::from_static(b"data")); + let a2 = cache.insert(digest, bytes::Bytes::from_static(b"data")); + assert!( + Arc::ptr_eq(&a1, &a2), + "duplicate insert should reuse the existing Arc" + ); + } + + // ── promote_remote_to_local ─────────────────────────────────────────────── + + /// Helper: commit `content` to a freshly created FS repo and return the + /// repo together with the blob's digest. + async fn create_repo_with_blob( + path: &std::path::Path, + content: &[u8], + ) -> ( + spfs::storage::fs::MaybeOpenFsRepository, + spfs::encoding::Digest, + ) { + let repo = spfs::storage::fs::MaybeOpenFsRepository::create(path) + .await + .unwrap(); + let stream: Pin> = Box::pin(std::io::Cursor::new(content.to_vec())); + let opened = repo.opened().await.unwrap(); + let digest = opened.commit_blob(stream).await.unwrap(); + drop(opened); + (repo, digest) + } + + #[tokio::test] + async fn promote_uses_existing_on_disk_payload() { + // If the blob is already present in the local FS repository (e.g. from + // a prior mount), promote_remote_to_local must succeed without + // attempting a remote download and must transition the guard to Local. + let tmpdir = tempfile::tempdir().unwrap(); + let content = b"payload for promotion test"; + let (repo, digest) = create_repo_with_blob(tmpdir.path(), content).await; + + let repos = vec![Arc::new(spfs::storage::RepositoryHandle::FS(repo))]; + let fs = make_filesystem(repos); + + let cursor: Pin> = Box::pin(std::io::Cursor::new(content.to_vec())); + let mutex = tokio::sync::Mutex::new(BlobRemoteInner::Streaming { + stream: cursor, + stream_pos: 0, + }); + let mut guard = mutex.lock().await; + + fs.promote_remote_to_local(&mut guard, &digest) + .await + .expect("promotion should succeed when payload is already on disk"); + + assert!( + matches!(*guard, BlobRemoteInner::Local(_)), + "guard should be Local after promotion" + ); + } + + #[tokio::test] + async fn promote_idempotent_when_already_local() { + // A guard that is already in the Local state must be a no-op. + let tmpdir = tempfile::tempdir().unwrap(); + let path = tmpdir.path().join("dummy"); + std::fs::write(&path, b"x").unwrap(); + let file = std::fs::File::open(&path).unwrap(); + + let mutex = tokio::sync::Mutex::new(BlobRemoteInner::Local(file)); + let mut guard = mutex.lock().await; + + // Repos list is empty — if promote tried to do real work it would error. + let fs = make_filesystem(vec![]); + let digest = fake_digest(7); + + fs.promote_remote_to_local(&mut guard, &digest) + .await + .expect("promote on already-Local handle should succeed"); + assert!(matches!(*guard, BlobRemoteInner::Local(_))); + } + + #[tokio::test] + async fn promote_fails_without_local_fs_repo() { + // Without any FS repository in the stack, promotion cannot write the + // blob and must return an error rather than panic. + let fs = make_filesystem(vec![]); + let digest = fake_digest(8); + + let cursor: Pin> = Box::pin(std::io::Cursor::new(vec![])); + let mutex = tokio::sync::Mutex::new(BlobRemoteInner::Streaming { + stream: cursor, + stream_pos: 0, + }); + let mut guard = mutex.lock().await; + + assert!( + fs.promote_remote_to_local(&mut guard, &digest) + .await + .is_err(), + "promotion should fail when no local FS repository is configured" + ); + } +} diff --git a/crates/spfs/src/config.rs b/crates/spfs/src/config.rs index 2d4e82d585..42ebb6de1e 100644 --- a/crates/spfs/src/config.rs +++ b/crates/spfs/src/config.rs @@ -71,6 +71,14 @@ const fn default_fuse_heartbeat_grace_period_seconds() -> NonZeroU64 { unsafe { NonZeroU64::new_unchecked(300) } } +const fn default_fuse_blob_cache_max_bytes() -> usize { + 256 * 1024 * 1024 // 256 MiB +} + +const fn default_fuse_blob_cache_max_single_bytes() -> usize { + 64 * 1024 * 1024 // 64 MiB +} + pub fn default_proxy_repo_include_secondary_tags() -> bool { true } @@ -542,6 +550,19 @@ pub struct Fuse { /// Whether to include tags from secondary repos in lookup methods #[serde(default = "default_proxy_repo_include_secondary_tags")] pub include_secondary_tags: bool, + /// Maximum total bytes for the FUSE in-memory remote blob cache. + /// + /// Remote blobs smaller than `blob_cache_max_single_bytes` are buffered + /// here so that they can be sought and read at arbitrary offsets. + /// Default: 256 MiB. + #[serde(default = "default_fuse_blob_cache_max_bytes")] + pub blob_cache_max_bytes: usize, + /// Maximum bytes for a single remote blob to be held in the in-memory + /// cache. Blobs larger than this threshold are instead downloaded once + /// to the local repository so that future opens skip the network entirely. + /// Default: 64 MiB. + #[serde(default = "default_fuse_blob_cache_max_single_bytes")] + pub blob_cache_max_single_bytes: usize, } impl Fuse { @@ -562,6 +583,8 @@ impl Default for Fuse { heartbeat_interval_seconds: default_fuse_heartbeat_interval_seconds(), heartbeat_grace_period_seconds: default_fuse_heartbeat_grace_period_seconds(), include_secondary_tags: default_proxy_repo_include_secondary_tags(), + blob_cache_max_bytes: default_fuse_blob_cache_max_bytes(), + blob_cache_max_single_bytes: default_fuse_blob_cache_max_single_bytes(), } } } diff --git a/cspell.json b/cspell.json index b70acdbb0e..87d65f2624 100644 --- a/cspell.json +++ b/cspell.json @@ -650,6 +650,7 @@ "sddl", "seccomp", "seekable", + "seeked", "serde", "setattr", "setcap",