diff --git a/core/Cargo.lock b/core/Cargo.lock index 18c2011cd1ae..a8371c739231 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -5941,6 +5941,7 @@ dependencies = [ "opendal-service-dropbox", "opendal-service-etcd", "opendal-service-foundationdb", + "opendal-service-foyer", "opendal-service-fs", "opendal-service-ftp", "opendal-service-gcs", @@ -6572,6 +6573,20 @@ dependencies = [ "tokio", ] +[[package]] +name = "opendal-service-foyer" +version = "0.55.0" +dependencies = [ + "foyer", + "log", + "opendal", + "opendal-core", + "serde", + "size", + "tempfile", + "tokio", +] + [[package]] name = "opendal-service-fs" version = "0.55.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index a8f05cb0f778..7317a6076a80 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -136,6 +136,7 @@ services-dbfs = ["dep:opendal-service-dbfs"] services-dropbox = ["dep:opendal-service-dropbox"] services-etcd = ["dep:opendal-service-etcd"] services-foundationdb = ["dep:opendal-service-foundationdb"] +services-foyer = ["dep:opendal-service-foyer"] services-fs = ["dep:opendal-service-fs"] services-ftp = ["dep:opendal-service-ftp"] services-gcs = ["dep:opendal-service-gcs"] @@ -252,6 +253,7 @@ opendal-service-dbfs = { path = "services/dbfs", version = "0.55.0", optional = opendal-service-dropbox = { path = "services/dropbox", version = "0.55.0", optional = true, default-features = false } opendal-service-etcd = { path = "services/etcd", version = "0.55.0", optional = true, default-features = false } opendal-service-foundationdb = { path = "services/foundationdb", version = "0.55.0", optional = true, default-features = false } +opendal-service-foyer = { path = "services/foyer", version = "0.55.0", optional = true, default-features = false } opendal-service-fs = { path = "services/fs", version = "0.55.0", optional = true, default-features = false } opendal-service-ftp = { path = "services/ftp", version = "0.55.0", optional = true, default-features = false } opendal-service-gcs = { path = "services/gcs", version = "0.55.0", optional = true, default-features = false } diff --git a/core/services/foyer/Cargo.toml b/core/services/foyer/Cargo.toml new file mode 100644 index 000000000000..d769538e510f --- /dev/null +++ b/core/services/foyer/Cargo.toml @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +description = "Apache OpenDAL Foyer service implementation" +name = "opendal-service-foyer" + +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +[package.metadata.docs.rs] +all-features = true + +[dependencies] +foyer = { version = "0.18", features = ["serde"] } +log = { workspace = true } +opendal-core = { path = "../../core", version = "0.55.0", default-features = false } +serde = { workspace = true, features = ["derive"] } + +[dev-dependencies] +opendal = { path = "../..", version = "0.55.0", features = ["services-memory"] } +opendal-core = { path = "../../core", version = "0.55.0", features = [ + "services-memory", +] } +size = "0.5" +tempfile = "3" +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/core/services/foyer/src/backend.rs b/core/services/foyer/src/backend.rs new file mode 100644 index 000000000000..4c5b5ff6c604 --- /dev/null +++ b/core/services/foyer/src/backend.rs @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::sync::Arc; + +use foyer::HybridCache; +use log::debug; + +use super::FOYER_SCHEME; +use super::FoyerKey; +use super::FoyerValue; +use super::config::FoyerConfig; +use super::core::FoyerCore; +use super::deleter::FoyerDeleter; +use super::writer::FoyerWriter; +use opendal_core::raw::*; +use opendal_core::*; + +/// [foyer](https://github.com/foyer-rs/foyer) backend support. +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct FoyerBuilder { + pub(super) config: FoyerConfig, + pub(super) cache: Option>>, +} + +impl Debug for FoyerBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FoyerBuilder") + .field("config", &self.config) + .finish_non_exhaustive() + } +} + +impl FoyerBuilder { + /// Create a new [`FoyerBuilder`] with default settings. + /// + /// The cache will be lazily initialized when first accessed if not provided via [`Self::cache`]. + /// + /// # Example + /// + /// ```no_run + /// use opendal_service_foyer::Foyer; + /// + /// let builder = Foyer::new() + /// .memory(64 * 1024 * 1024) + /// .root("/cache"); + /// ``` + pub fn new() -> Self { + Self { + ..Default::default() + } + } + + /// Set the name of this cache instance. + pub fn name(mut self, name: &str) -> Self { + if !name.is_empty() { + self.config.name = Some(name.to_owned()); + } + self + } + + /// Set a pre-built [`foyer::HybridCache`] instance. + /// + /// If provided, this cache will be used directly. Otherwise, a cache will be + /// lazily initialized using the configured memory size. + /// + /// # Example + /// + /// ```no_run + /// use opendal_service_foyer::Foyer; + /// use foyer::{HybridCacheBuilder, Engine}; + /// + /// # async fn example() -> Result<(), Box> { + /// let cache = HybridCacheBuilder::new() + /// .memory(64 * 1024 * 1024) + /// .storage(Engine::Large(Default::default())) + /// .build() + /// .await?; + /// + /// let builder = Foyer::new().cache(cache); + /// # Ok(()) + /// # } + /// ``` + pub fn cache(mut self, cache: HybridCache) -> Self { + self.cache = Some(Arc::new(cache)); + self + } + + /// Set the root path of this backend. + /// + /// All operations will be relative to this root path. + pub fn root(mut self, path: &str) -> Self { + self.config.root = if path.is_empty() { + None + } else { + Some(path.to_string()) + }; + self + } + + /// Set the memory capacity in bytes for the cache. + /// + /// This is used when the cache is lazily initialized (i.e., when no pre-built cache + /// is provided via [`Self::cache`]). + /// + /// Default is 1 GiB (1024 * 1024 * 1024 bytes). + pub fn memory(mut self, size: usize) -> Self { + self.config.memory = Some(size); + self + } + + /// Set the disk cache directory path. + /// + /// Enables hybrid cache with disk storage. When memory cache is full, data will + /// be persisted to this directory. + pub fn disk_path(mut self, path: &str) -> Self { + self.config.disk_path = if path.is_empty() { + None + } else { + Some(path.to_string()) + }; + self + } + + /// Set the disk cache total capacity in bytes. + /// + /// Only used when `disk_path` is set. + pub fn disk_capacity(mut self, size: usize) -> Self { + self.config.disk_capacity = Some(size); + self + } + + /// Set the individual cache file size in bytes. + /// + /// Default is 1 MiB (1024 * 1024 bytes). + /// Only used when `disk_path` is set. + pub fn disk_file_size(mut self, size: usize) -> Self { + self.config.disk_file_size = Some(size); + self + } + + /// Set the recovery mode when starting the cache. + /// + /// Valid values: "none" (default), "quiet", "strict". + /// - "none": Don't recover from disk + /// - "quiet": Recover and skip errors + /// - "strict": Recover and panic on errors + pub fn recover_mode(mut self, mode: &str) -> Self { + if !mode.is_empty() { + self.config.recover_mode = Some(mode.to_string()); + } + self + } + + /// Set the number of shards for concurrent access. + /// + /// Default is 1. Higher values improve concurrency but increase overhead. + pub fn shards(mut self, count: usize) -> Self { + self.config.shards = Some(count); + self + } +} + +impl Builder for FoyerBuilder { + type Config = FoyerConfig; + + fn build(self) -> Result { + debug!("backend build started: {:?}", &self); + + let root = normalize_root( + self.config + .root + .clone() + .unwrap_or_else(|| "/".to_string()) + .as_str(), + ); + + let mut core = FoyerCore::new(self.config.clone()); + if let Some(cache) = self.cache { + core = core.with_cache(cache.clone()); + } + + debug!("backend build finished: {:?}", self.config); + + Ok(FoyerBackend::new(core).with_normalized_root(root)) + } +} + +#[derive(Debug, Clone)] +pub struct FoyerBackend { + core: Arc, + root: String, + info: Arc, +} + +impl FoyerBackend { + fn new(core: FoyerCore) -> Self { + let info = AccessorInfo::default(); + info.set_scheme(FOYER_SCHEME); + info.set_name(core.name().unwrap_or("foyer")); + info.set_root("/"); + info.set_native_capability(Capability { + read: true, + write: true, + write_can_empty: true, + delete: true, + stat: true, + shared: true, + ..Default::default() + }); + + Self { + core: Arc::new(core), + root: "/".to_string(), + info: Arc::new(info), + } + } + + fn with_normalized_root(mut self, root: String) -> Self { + self.info.set_root(&root); + self.root = root; + self + } +} + +impl Access for FoyerBackend { + type Reader = Buffer; + type Writer = FoyerWriter; + type Lister = (); + type Deleter = oio::OneShotDeleter; + + fn info(&self) -> Arc { + self.info.clone() + } + + async fn stat(&self, path: &str, _: OpStat) -> Result { + let p = build_abs_path(&self.root, path); + + if p == build_abs_path(&self.root, "") { + Ok(RpStat::new(Metadata::new(EntryMode::DIR))) + } else { + match self.core.get(&p).await? { + Some(bs) => Ok(RpStat::new( + Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64), + )), + None => Err(Error::new(ErrorKind::NotFound, "key not found in foyer")), + } + } + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let p = build_abs_path(&self.root, path); + + let buffer = match self.core.get(&p).await? { + Some(bs) => bs, + None => return Err(Error::new(ErrorKind::NotFound, "key not found in foyer")), + }; + + let buffer = if args.range().is_full() { + buffer + } else { + let range = args.range(); + let start = range.offset() as usize; + let end = match range.size() { + Some(size) => (range.offset() + size) as usize, + None => buffer.len(), + }; + buffer.slice(start..end.min(buffer.len())) + }; + + Ok((RpRead::new(), buffer)) + } + + async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let p = build_abs_path(&self.root, path); + Ok((RpWrite::new(), FoyerWriter::new(self.core.clone(), p))) + } + + async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { + Ok(( + RpDelete::default(), + oio::OneShotDeleter::new(FoyerDeleter::new(self.core.clone(), self.root.clone())), + )) + } +} diff --git a/core/services/foyer/src/config.rs b/core/services/foyer/src/config.rs new file mode 100644 index 000000000000..0056e0f913a1 --- /dev/null +++ b/core/services/foyer/src/config.rs @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use serde::Deserialize; +use serde::Serialize; + +use super::backend::FoyerBuilder; +use opendal_core::{Configurator, OperatorUri, Result}; + +/// Config for Foyer services support. +#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(default)] +#[non_exhaustive] +pub struct FoyerConfig { + /// Name for this cache instance. + pub name: Option, + /// Root path of this backend. + pub root: Option, + /// Memory capacity in bytes for the cache. + pub memory: Option, + /// Disk cache directory path. + /// + /// If set, enables hybrid cache with disk storage. Data will be persisted to + /// this directory when memory cache is full. + pub disk_path: Option, + /// Disk cache total capacity in bytes. + /// Only used when `disk_path` is set. + pub disk_capacity: Option, + /// Individual cache file size in bytes. + /// + /// Default is 1 MiB. + /// Only used when `disk_path` is set. + pub disk_file_size: Option, + /// Recovery mode when starting the cache. + /// + /// Valid values: "none" (default), "quiet", "strict". + /// - "none": Don't recover from disk + /// - "quiet": Recover and skip errors + /// - "strict": Recover and panic on errors + pub recover_mode: Option, + /// Number of shards for concurrent access. + /// + /// Default is 1. Higher values improve concurrency but increase overhead. + pub shards: Option, +} + +impl Configurator for FoyerConfig { + type Builder = FoyerBuilder; + + fn from_uri(uri: &OperatorUri) -> Result { + let mut map = uri.options().clone(); + + if let Some(name) = uri.option("name") { + map.insert("name".to_string(), name.to_string()); + } + + if let Some(root) = uri.root() { + if !root.is_empty() { + map.insert("root".to_string(), root.to_string()); + } + } + + Self::from_iter(map) + } + + fn into_builder(self) -> Self::Builder { + FoyerBuilder { + config: self, + ..Default::default() + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_from_uri_sets_memory() { + let uri = OperatorUri::new( + "foyer:///cache?name=test&memory=67108864", + Vec::<(String, String)>::new(), + ) + .unwrap(); + + let cfg = FoyerConfig::from_uri(&uri).unwrap(); + assert_eq!(cfg.name.as_deref(), Some("test")); + assert_eq!(cfg.root.as_deref(), Some("cache")); + assert_eq!(cfg.memory, Some(64 * 1024 * 1024)); + } + + #[test] + fn test_from_uri_sets_name_and_root() { + let uri = + OperatorUri::new("foyer:///data?name=session", Vec::<(String, String)>::new()).unwrap(); + + let cfg = FoyerConfig::from_uri(&uri).unwrap(); + assert_eq!(cfg.name.as_deref(), Some("session")); + assert_eq!(cfg.root.as_deref(), Some("data")); + } + + #[test] + fn test_from_uri_sets_disk_config() { + let uri = OperatorUri::new( + "foyer:///cache?memory=67108864&disk_path=/tmp/foyer&disk_capacity=1073741824&disk_file_size=2097152", + Vec::<(String, String)>::new(), + ) + .unwrap(); + + let cfg = FoyerConfig::from_uri(&uri).unwrap(); + assert_eq!(cfg.memory, Some(64 * 1024 * 1024)); + assert_eq!(cfg.disk_path.as_deref(), Some("/tmp/foyer")); + assert_eq!(cfg.disk_capacity, Some(1024 * 1024 * 1024)); + assert_eq!(cfg.disk_file_size, Some(2 * 1024 * 1024)); + } + + #[test] + fn test_from_uri_sets_recovery_and_shards() { + let uri = OperatorUri::new( + "foyer:///?recover_mode=quiet&shards=4", + Vec::<(String, String)>::new(), + ) + .unwrap(); + + let cfg = FoyerConfig::from_uri(&uri).unwrap(); + assert_eq!(cfg.recover_mode.as_deref(), Some("quiet")); + assert_eq!(cfg.shards, Some(4)); + } +} diff --git a/core/services/foyer/src/core.rs b/core/services/foyer/src/core.rs new file mode 100644 index 000000000000..1b9b627d9b3a --- /dev/null +++ b/core/services/foyer/src/core.rs @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::path::PathBuf; +use std::sync::Arc; + +use foyer::DirectFsDeviceOptions; +use foyer::Engine; +use foyer::HybridCache; +use foyer::HybridCacheBuilder; +use foyer::LargeEngineOptions; +use foyer::RecoverMode; + +use opendal_core::Buffer; +use opendal_core::Error; +use opendal_core::ErrorKind; +use opendal_core::Result; + +use super::FoyerConfig; +use super::FoyerKey; +use super::FoyerValue; + +/// FoyerCore holds the foyer HybridCache instance. +/// +/// It supports lazy initialization when constructed from URI. +#[derive(Clone)] +pub struct FoyerCore { + /// OnceLock for lazy cache initialization. + cache: std::sync::OnceLock>>, + /// Config + config: FoyerConfig, +} + +impl Debug for FoyerCore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FoyerCore") + .field("name", &self.config.name) + .finish_non_exhaustive() + } +} + +const FOYER_DEFAULT_MEMORY_BYTES: usize = 1024 * 1024 * 1024; +const FOYER_DEFAULT_DISK_FILE_SIZE: usize = 1024 * 1024; +const FOYER_DEFAULT_SHARDS: usize = 1; + +fn parse_recover_mode(mode_str: &str) -> Result { + match mode_str.to_lowercase().as_str() { + "none" => Ok(RecoverMode::None), + "quiet" => Ok(RecoverMode::Quiet), + "strict" => Ok(RecoverMode::Strict), + _ => Err(Error::new( + ErrorKind::ConfigInvalid, + format!( + "invalid recover_mode: {}, expected 'none', 'quiet', or 'strict'", + mode_str + ), + )), + } +} + +impl FoyerCore { + /// Create a new FoyerCore with a pre-built cache. + /// + /// This is used when the user provides a `HybridCache` via `Foyer::new(cache)`. + pub fn new(config: FoyerConfig) -> Self { + Self { + cache: std::sync::OnceLock::new(), + config, + } + } + + /// Initialize the cache with the given pre-built instance. + pub(crate) fn with_cache(self, cache: Arc>) -> Self { + let _ = self.cache.set(cache); + self + } + + /// Get the cache, initializing lazily if needed. + async fn get_cache(&self) -> Result>> { + if let Some(cache) = self.cache.get() { + return Ok(cache.clone()); + } + + let memory = self.config.memory.unwrap_or(FOYER_DEFAULT_MEMORY_BYTES); + let shards = self.config.shards.unwrap_or(FOYER_DEFAULT_SHARDS); + let recover_mode = if let Some(ref mode_str) = self.config.recover_mode { + parse_recover_mode(mode_str)? + } else { + RecoverMode::None + }; + + let mut builder = HybridCacheBuilder::new() + .memory(memory) + .with_shards(shards) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_recover_mode(recover_mode); + + // Configure disk storage if disk_path is provided + if let Some(ref disk_path) = self.config.disk_path { + let path = PathBuf::from(disk_path); + + let mut device_options = DirectFsDeviceOptions::new(path); + + if let Some(capacity) = self.config.disk_capacity { + device_options = device_options.with_capacity(capacity); + } + + let file_size = self + .config + .disk_file_size + .unwrap_or(FOYER_DEFAULT_DISK_FILE_SIZE); + device_options = device_options.with_file_size(file_size); + + builder = builder.with_device_options(device_options); + } + + let cache = Arc::new(builder.build().await.map_err(|e| { + Error::new(ErrorKind::Unexpected, "failed to build foyer cache").set_source(e) + })?); + + self.cache + .set(cache.clone()) + .map_err(|_| Error::new(ErrorKind::Unexpected, "failed to initialize foyer cache"))?; + + Ok(cache) + } + + pub fn name(&self) -> Option<&str> { + self.config.name.as_deref() + } + + pub async fn get(&self, key: &str) -> Result> { + let cache = self.get_cache().await?; + let foyer_key = FoyerKey { + path: key.to_string(), + }; + + match cache.get(&foyer_key).await { + Ok(Some(entry)) => Ok(Some(entry.value().0.clone())), + Ok(None) => Ok(None), + Err(_) => Ok(None), + } + } + + pub async fn insert(&self, key: &str, value: Buffer) -> Result<()> { + let cache = self.get_cache().await?; + cache.insert( + FoyerKey { + path: key.to_string(), + }, + FoyerValue(value), + ); + Ok(()) + } + + pub async fn remove(&self, key: &str) -> Result<()> { + let cache = self.get_cache().await?; + cache.remove(&FoyerKey { + path: key.to_string(), + }); + Ok(()) + } +} diff --git a/core/services/foyer/src/deleter.rs b/core/services/foyer/src/deleter.rs new file mode 100644 index 000000000000..6528cefb1723 --- /dev/null +++ b/core/services/foyer/src/deleter.rs @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use opendal_core::raw::*; +use opendal_core::*; + +use super::core::FoyerCore; + +pub struct FoyerDeleter { + core: Arc, + root: String, +} + +impl FoyerDeleter { + pub fn new(core: Arc, root: String) -> Self { + Self { core, root } + } +} + +impl oio::OneShotDelete for FoyerDeleter { + async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { + let p = build_abs_path(&self.root, &path); + self.core.remove(&p).await?; + Ok(()) + } +} diff --git a/core/services/foyer/src/docs.md b/core/services/foyer/src/docs.md new file mode 100644 index 000000000000..90e95177e87c --- /dev/null +++ b/core/services/foyer/src/docs.md @@ -0,0 +1,128 @@ +# Foyer Service + +[foyer](https://github.com/foyer-rs/foyer) is a high-performance hybrid cache library +that supports both in-memory and on-disk caching. + +This service provides foyer as a volatile KV storage backend. Data stored may be evicted +when the cache is full. + +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] delete +- [ ] list (not supported) +- [ ] blocking (not supported) + +## Configuration + +Foyer service can be configured in two ways: + +1. **Pre-configured cache**: Pass a fully configured `HybridCache` instance for maximum control +2. **Auto-configured cache**: Use builder methods to configure memory and disk caching + +### Auto-configured Cache Options + +When using the builder API without providing a pre-built cache, the following options are available: + +- `memory`: Memory cache capacity (default: 1 GiB) +- `disk_path`: Directory path for disk cache (enables hybrid caching) +- `disk_capacity`: Total disk cache capacity +- `disk_file_size`: Individual cache file size (default: 1 MiB) +- `recover_mode`: Recovery mode on startup - "none" (default), "quiet", or "strict" +- `shards`: Number of cache shards for concurrency (default: 1) + +## Example + +### Via Pre-configured Cache + +```rust,no_run +use opendal::Operator; +use opendal_service_foyer::Foyer; +use opendal_service_foyer::FoyerKey; +use opendal_service_foyer::FoyerValue; +use foyer::{HybridCacheBuilder, Engine}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create a foyer HybridCache with full control + let cache = HybridCacheBuilder::new() + .memory(64 * 1024 * 1024) // 64MB memory cache + .storage(Engine::Large(Default::default())) + .build() + .await?; + + // Create operator + let op = Operator::new(Foyer::new().cache(cache))? + .finish(); + + // Use it like any other OpenDAL operator + op.write("key", "value").await?; + let value = op.read("key").await?; + + Ok(()) +} +``` + +### Via Auto-configuration (Hybrid Cache) + +```rust,no_run +use opendal::Operator; +use opendal_service_foyer::Foyer; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create operator with hybrid cache (memory + disk) + let op = Operator::new( + Foyer::new() + .memory(64 * 1024 * 1024) // 64MB memory + .disk_path("/tmp/foyer_cache") // Enable disk cache + .disk_capacity(1024 * 1024 * 1024) // 1GB disk + .disk_file_size(1024 * 1024) // 1MB per file + .recover_mode("quiet") // Recover on restart + .shards(4) // 4 shards for concurrency + )? + .finish(); + + op.write("key", "value").await?; + let value = op.read("key").await?; + + Ok(()) +} +``` + +### Via URI + +```rust,no_run +use opendal::Operator; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Memory-only cache via URI + let op = Operator::from_uri("foyer:///?memory=67108864")?; + + // Hybrid cache via URI + let op = Operator::from_uri( + "foyer:///cache?memory=67108864&disk_path=/tmp/foyer&disk_capacity=1073741824" + )?; + + Ok(()) +} +``` + +## Notes + +- **Data Volatility**: Foyer is a cache, not persistent storage. Data may be + evicted at any time when the cache reaches its capacity limit. +- **Hybrid Caching**: When `disk_path` is configured, cold data automatically + moves from memory to disk as the memory cache fills up. +- **Recovery Modes**: + - `"none"`: Don't restore data from disk on restart (volatile cache) + - `"quiet"`: Restore data and skip any corrupted entries + - `"strict"`: Restore data and fail on any corruption +- **No List Support**: Foyer does not support efficient key iteration, so the + `list` operation is not available. +- **Async Only**: Foyer is async-only, blocking operations are not supported. diff --git a/core/services/foyer/src/lib.rs b/core/services/foyer/src/lib.rs new file mode 100644 index 000000000000..555b36036ce3 --- /dev/null +++ b/core/services/foyer/src/lib.rs @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Foyer service implementation for Apache OpenDAL. +//! +//! Foyer is a high-performance hybrid cache library that supports both +//! in-memory and on-disk caching. This service provides foyer as a +//! volatile KV storage backend, similar to using Redis as a cache. +//! +//! Note: Data stored in foyer may be evicted when the cache is full. +//! Do not use this service for persistent storage. + +#![cfg_attr(docsrs, feature(doc_cfg))] +#![deny(missing_docs)] + +mod backend; +mod config; +mod core; +mod deleter; +mod writer; + +use std::ops::Deref; + +use foyer::Code; +use foyer::CodeError; + +use opendal_core::Buffer; + +pub use backend::FoyerBuilder as Foyer; +pub use config::FoyerConfig; + +/// Default scheme for foyer service. +pub const FOYER_SCHEME: &str = "foyer"; + +/// [`FoyerKey`] is a key for the foyer cache. +/// +/// It uses bincode (via serde) for efficient serialization. +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +pub struct FoyerKey { + /// The path of the key. + pub path: String, +} + +/// [`FoyerValue`] is a wrapper around `Buffer` that implements the `Code` trait. +#[derive(Debug)] +pub struct FoyerValue(pub Buffer); + +impl Deref for FoyerValue { + type Target = Buffer; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Code for FoyerValue { + fn encode(&self, writer: &mut impl std::io::Write) -> std::result::Result<(), CodeError> { + let len = self.0.len() as u64; + writer.write_all(&len.to_le_bytes())?; + std::io::copy(&mut self.0.clone(), writer)?; + Ok(()) + } + + fn decode(reader: &mut impl std::io::Read) -> std::result::Result + where + Self: Sized, + { + let mut len_bytes = [0u8; 8]; + reader.read_exact(&mut len_bytes)?; + let len = u64::from_le_bytes(len_bytes) as usize; + let mut buffer = vec![0u8; len]; + reader.read_exact(&mut buffer[..len])?; + Ok(FoyerValue(buffer.into())) + } + + fn estimated_size(&self) -> usize { + 8 + self.0.len() + } +} + +/// Register this service into the given registry. +pub fn register_foyer_service(registry: &opendal_core::OperatorRegistry) { + registry.register::(FOYER_SCHEME); +} + +#[cfg(test)] +mod tests { + use foyer::{ + DirectFsDeviceOptions, Engine, HybridCacheBuilder, LargeEngineOptions, RecoverMode, + }; + use opendal_core::Operator; + use size::consts::MiB; + + use super::*; + + fn key(i: u8) -> String { + format!("obj-{i}") + } + + fn value(i: u8) -> Vec { + vec![i; 1024] + } + + #[tokio::test] + async fn test_basic_operations() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(10) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Foyer::new().cache(cache)).unwrap().finish(); + + // Write some data + for i in 0..10 { + op.write(&key(i), value(i)).await.unwrap(); + } + + // Read back + for i in 0..10 { + let buf = op.read(&key(i)).await.unwrap(); + assert_eq!(buf.to_vec(), value(i)); + } + + // Stat + for i in 0..10 { + let meta = op.stat(&key(i)).await.unwrap(); + assert_eq!(meta.content_length(), 1024); + } + + // Delete + for i in 0..10 { + op.delete(&key(i)).await.unwrap(); + } + + // Verify deleted + for i in 0..10 { + let res = op.read(&key(i)).await; + assert!(res.is_err(), "should fail to read deleted file"); + } + } + + #[tokio::test] + async fn test_range_read() { + let dir = tempfile::tempdir().unwrap(); + + let cache = HybridCacheBuilder::new() + .memory(1024 * 1024) + .with_shards(1) + .storage(Engine::Large(LargeEngineOptions::new())) + .with_device_options( + DirectFsDeviceOptions::new(dir.path()) + .with_capacity(16 * MiB as usize) + .with_file_size(MiB as usize), + ) + .with_recover_mode(RecoverMode::None) + .build() + .await + .unwrap(); + + let op = Operator::new(Foyer::new().cache(cache)).unwrap().finish(); + + let data: Vec = (0..100).collect(); + op.write("test", data.clone()).await.unwrap(); + + // Range read + let buf = op.read_with("test").range(10..20).await.unwrap(); + assert_eq!(buf.to_vec(), data[10..20]); + } + + #[tokio::test] + async fn test_hybrid_cache_via_config() { + let dir = tempfile::tempdir().unwrap(); + + // Test using the builder API with disk configuration + let op = Operator::new( + Foyer::new() + .memory(1024 * 1024) // 1MB memory + .disk_path(dir.path().to_str().unwrap()) + .disk_capacity(16 * 1024 * 1024) // 16MB disk + .disk_file_size(1024 * 1024) // 1MB per file + .recover_mode("none") + .shards(1), + ) + .unwrap() + .finish(); + + // Write some data + for i in 0..5 { + op.write(&key(i), value(i)).await.unwrap(); + } + + // Read back + for i in 0..5 { + let buf = op.read(&key(i)).await.unwrap(); + assert_eq!(buf.to_vec(), value(i)); + } + + // Delete + for i in 0..5 { + op.delete(&key(i)).await.unwrap(); + } + } +} diff --git a/core/services/foyer/src/writer.rs b/core/services/foyer/src/writer.rs new file mode 100644 index 000000000000..e9faa0230f0a --- /dev/null +++ b/core/services/foyer/src/writer.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use opendal_core::raw::*; +use opendal_core::*; + +use super::core::FoyerCore; + +pub struct FoyerWriter { + core: Arc, + path: String, + buffer: oio::QueueBuf, +} + +impl FoyerWriter { + pub fn new(core: Arc, path: String) -> Self { + Self { + core, + path, + buffer: oio::QueueBuf::new(), + } + } +} + +impl oio::Write for FoyerWriter { + async fn write(&mut self, bs: Buffer) -> Result<()> { + self.buffer.push(bs); + Ok(()) + } + + async fn close(&mut self) -> Result { + let buf = self.buffer.clone().collect(); + let length = buf.len() as u64; + self.core.insert(&self.path, buf).await?; + + let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length); + Ok(meta) + } + + async fn abort(&mut self) -> Result<()> { + self.buffer.clear(); + Ok(()) + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 302fa88dc6d5..4e130ac99dce 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -94,6 +94,9 @@ fn init_default_registry_inner(registry: &OperatorRegistry) { #[cfg(feature = "services-foundationdb")] opendal_service_foundationdb::register_foundationdb_service(registry); + #[cfg(feature = "services-foyer")] + opendal_service_foyer::register_foyer_service(registry); + #[cfg(feature = "services-fs")] opendal_service_fs::register_fs_service(registry); @@ -274,6 +277,8 @@ pub mod services { pub use opendal_service_etcd::*; #[cfg(feature = "services-foundationdb")] pub use opendal_service_foundationdb::*; + #[cfg(feature = "services-foyer")] + pub use opendal_service_foyer::*; #[cfg(feature = "services-fs")] pub use opendal_service_fs::*; #[cfg(feature = "services-ftp")]