diff --git a/Cargo.lock b/Cargo.lock index 93482fd13..2aca824f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4978,6 +4978,7 @@ dependencies = [ "ignore", "indexmap 2.11.0", "itertools 0.14.0", + "libc", "memmap2", "miette", "nom", @@ -4989,6 +4990,7 @@ dependencies = [ "ring", "rstest", "sentry", + "sentry-miette", "serde", "serde_json", "serde_yaml 0.9.34+deprecated", @@ -5008,6 +5010,7 @@ dependencies = [ "ulid", "url", "variantly", + "whoami", ] [[package]] diff --git a/crates/spfs/src/storage/fs/tag.rs b/crates/spfs/src/storage/fs/tag.rs index 81097d70e..f031735b1 100644 --- a/crates/spfs/src/storage/fs/tag.rs +++ b/crates/spfs/src/storage/fs/tag.rs @@ -953,7 +953,7 @@ impl TagLock { /// # Safety: /// Tag locks are used to ensure that only one process is writing to a tag file at a time. /// Removing the lock file without ensuring that the tag file is not being written to may - /// cause the data within the file to become corrupt. + /// cause the data within the tag file to become corrupt. pub unsafe fn remove>(tag_file: P) -> Result<()> { let mut lock_file = tag_file.as_ref().to_path_buf(); lock_file.set_extension(Self::LOCK_EXT); diff --git a/crates/spk-cli/cmd-repo/src/cmd_repo.rs b/crates/spk-cli/cmd-repo/src/cmd_repo.rs index 29bcedbdd..ea2a6f771 100644 --- a/crates/spk-cli/cmd-repo/src/cmd_repo.rs +++ b/crates/spk-cli/cmd-repo/src/cmd_repo.rs @@ -135,23 +135,37 @@ impl RepoCommand { // Load the current index for this repo now let mut was_full_index = String::from(""); - match FlatBufferRepoIndex::from_repo_file(&repo_to_index).await { + let result = match FlatBufferRepoIndex::from_repo_file(&repo_to_index).await { Ok(current_index) => { - current_index - .update_packages(&repo_to_index, &idents) - .await? + current_index.update_packages(&repo_to_index, &idents).await } Err(err) => { // There isn't an existing index, so generate one from scratch that // will also include the update package version. tracing::warn!("Failed to load flatbuffer index: {err}"); tracing::warn!("No current index to update. Creating a full index ..."); - FlatBufferRepoIndex::index_repo(&repos).await?; was_full_index = - " [no previous index, so a full index was created]".to_string() + " [no previous index, so a full index was created]".to_string(); + FlatBufferRepoIndex::index_repo(&repos).await } }; + if result.is_err() { + // Need to keep these if-statements separate + // to allow for two different error handling cases. + #[allow(clippy::collapsible_if)] + if let Some(err) = result.err() { + if let Some(spk_storage::Error::UnableToGetWriteLockError(..)) = + err.root_cause().downcast_ref::() + { + tracing::error!("{err}"); + // A distinct exit code when unable to lock the index file. + return Ok(3); + } + return Err(err); + } + } + tracing::info!( "Index update for '{}' in '{}' repo completed in: {} secs{was_full_index}", idents.iter().map(ToString::to_string).join(", "), diff --git a/crates/spk-config/src/config.rs b/crates/spk-config/src/config.rs index aa8a146b2..00d0d343a 100644 --- a/crates/spk-config/src/config.rs +++ b/crates/spk-config/src/config.rs @@ -104,6 +104,14 @@ pub struct Index { /// one kind of index available for the repository. The default is /// 'flatb', a flatbuffers file based index. pub kind: String, + + /// Time to sleep between getting a write lock on the index data, + /// for index generation and updates. + pub lock_sleep_seconds: u64, + + /// Maximum number of times to try to get a write lock on the + /// index data, for index generation and updates. + pub lock_max_tries: u64, } impl Default for Index { @@ -113,6 +121,12 @@ impl Default for Index { // safer but can add some overhead. verify_before_use: true, kind: String::from(FLATBUFFER_INDEX_TOKEN), + // Sleeping for 6 seconds between write lock attempts and + // allowing 5 tries before bailing out, gives a total of + // about 30 seconds before timing out of getting a lock + // for index writing. + lock_sleep_seconds: 6, + lock_max_tries: 5, } } } diff --git a/crates/spk-storage/Cargo.toml b/crates/spk-storage/Cargo.toml index 51cff3b01..7bca02509 100644 --- a/crates/spk-storage/Cargo.toml +++ b/crates/spk-storage/Cargo.toml @@ -13,6 +13,10 @@ description = { workspace = true } workspace = true [features] +sentry = [ + "dep:sentry", + "dep:sentry-miette", +] [dependencies] arc-swap = { workspace = true } @@ -33,6 +37,7 @@ glob = { workspace = true } ignore = "0.4.18" indexmap = { workspace = true } itertools = { workspace = true } +libc = { workspace = true } memmap2 = { workspace = true } miette = { workspace = true } nom = { workspace = true } @@ -44,6 +49,7 @@ relative-path = { workspace = true } ring = { workspace = true } rstest = { workspace = true } sentry = { workspace = true, optional = true } +sentry-miette = { workspace = true, optional = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } serde_yaml = { workspace = true } @@ -61,6 +67,7 @@ tracing-subscriber = { workspace = true } ulid = { workspace = true } url = "2.2" variantly = { workspace = true } +whoami = { workspace = true } [dev-dependencies] spfstest = { workspace = true } diff --git a/crates/spk-storage/src/error.rs b/crates/spk-storage/src/error.rs index e2f464b42..6914fb10c 100644 --- a/crates/spk-storage/src/error.rs +++ b/crates/spk-storage/src/error.rs @@ -91,6 +91,14 @@ pub enum Error { IndexFailedToGenerate(String), #[error("Unknown index kind: '{0}', unable to {1}load that kind of index")] IndexUnknownKind(String, String), + #[error("Unable to open the {0} lock file at all: {1}: {2}")] + UnableToOpenLockFileError(String, String, String), + #[error( + "Unable to lock the {0} file exclusively. {1} tries with {2} seconds between each. Lock is held by {3} for {4}. Giving up. Try again later, or investigate the process that made the lock file." + )] + UnableToGetWriteLockError(String, u64, u64, String, String), + #[error("Failed to remove the {0} lock file: {1}: {2}")] + UnableToRemoveWriteLockError(String, String, String), #[error("{0}")] String(String), diff --git a/crates/spk-storage/src/storage/flatbuffer_index.rs b/crates/spk-storage/src/storage/flatbuffer_index.rs index 4842839fe..e45ef255b 100644 --- a/crates/spk-storage/src/storage/flatbuffer_index.rs +++ b/crates/spk-storage/src/storage/flatbuffer_index.rs @@ -7,9 +7,9 @@ use std::collections::{HashMap, HashSet}; use std::fs::Permissions; #[cfg(unix)] use std::os::unix::fs::PermissionsExt; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use futures::TryStreamExt; use itertools::Itertools; @@ -44,6 +44,7 @@ use spk_schema::{ version_to_fb_version, }; +use super::RepositoryHandle; use crate::storage::{RepositoryIndex, RepositoryIndexMut}; use crate::{Error, RepoWalkerBuilder, RepoWalkerItem, Result}; @@ -82,6 +83,288 @@ async fn remove_index_file(filepath: &PathBuf) -> Result<()> { } } +/// Helper to make a lock file for the index +async fn lock_index_file(repo: &RepositoryHandle) -> Result { + // Create a lock for the index file + let filepath = FlatBufferRepoIndex::repo_index_location(repo).await?; + + let hostname = whoami::fallible::hostname().unwrap_or_else(|_| "unknown host".to_string()); + let pid = std::process::id(); + let lock_contents = format!("host: {hostname}\npid: {pid}"); + + let index_config = match spk_config::get_config() { + Ok(config) => { + if let Some(repo_config) = config.repositories.get(&repo.name().to_string()) { + repo_config.index.clone() + } else { + spk_config::Index::default() + } + } + Err(err) => { + tracing::warn!("Unable to read spk config file, using defaults, due to: {err}"); + spk_config::Index::default() + } + }; + + FileLock::new( + filepath, + format!("{} index", repo.name()), + Some(lock_contents), + index_config.lock_sleep_seconds, + index_config.lock_max_tries, + ) + .await +} + +#[cfg(feature = "sentry")] +fn add_filelock_sentry_breadcrumb( + lock_file: &Path, + contents: Option, + held_for: Option, + sleep_seconds: Option, + max_tries: Option, +) { + let mut data = std::collections::BTreeMap::new(); + data.insert( + String::from("lock_file"), + serde_json::json!(lock_file.display().to_string()), + ); + if let Some(c) = contents { + data.insert(String::from("contents"), serde_json::json!(c)); + } + if let Some(s) = held_for { + data.insert(String::from("held_for"), serde_json::json!(s)); + } + if let Some(s) = sleep_seconds { + data.insert(String::from("sleep_seconds"), serde_json::json!(s)); + } + if let Some(t) = max_tries { + data.insert(String::from("max_tries"), serde_json::json!(t)); + } + + sentry::add_breadcrumb(sentry::Breadcrumb { + category: Some("filelock".into()), + message: Some(String::from("FileLock data")), + data, + level: sentry::Level::Info, + ..Default::default() + }); +} + +// TODO: probably should be in its own file/place +/// Helper for file locking used when saving data to a file +struct FileLock { + // Path to the lock file + lock_file: PathBuf, + // A label about the kind of file being locked for log messages + label: String, + // Whether the lock file has already been deleted (unlocked) + has_been_deleted: bool, +} + +impl FileLock { + const LOCK_EXT: &'static str = "write_lock"; + + /// Create a lock file for the given data file + pub async fn new>( + file_path: P, + label: String, + contents: Option, + sleep_seconds: u64, + max_tries: u64, + ) -> Result { + use spfs::OsError; + + let mut lock_file = file_path.as_ref().to_path_buf(); + lock_file.set_extension(Self::LOCK_EXT); + + let mut num_tries = 0; + while num_tries < max_tries { + num_tries += 1; + + match tokio::fs::OpenOptions::new() + .create_new(true) + .write(true) + .open(&lock_file) + .await + { + Ok(_file) => { + // Write any given contents to the lock file as well + if let Some(ref data) = contents + && let Err(err) = tokio::fs::write(&lock_file, data).await + { + tracing::warn!("Unable to write contents to {label} lock file: {err}"); + } + + // The write lock has been set up successfully + return Ok(FileLock { + lock_file, + label, + has_been_deleted: false, + }); + } + Err(err) => { + match err.os_error() { + Some(libc::EEXIST) => { + // Wait until the timeout before trying to acquire the + // lock again, but fail immediately for other [non-temporary] + // problems, e.g. directory not existing. + tracing::info!( + "Lock file for {label} exists, unable to write exclusively. Sleeping for {} seconds before trying again ... ({num_tries} of {} tries)", + sleep_seconds, + max_tries + ); + tokio::time::sleep(Duration::from_secs(sleep_seconds)).await; + continue; + } + _ => { + return Err(Error::UnableToOpenLockFileError( + label, + lock_file.display().to_string(), + err.to_string(), + )); + } + }; + } + } + } + + // Unable to get a lock at this point. Read the existing lock + // file for the lock data and add it to the returned error. + use std::io::Read; + let mut lock_data = String::new(); + let held_for = match std::fs::File::open(&lock_file) { + Ok(mut f) => { + f.read_to_string(&mut lock_data).unwrap_or_else(|err| { + lock_data = format!("Unknown due to error ({err})"); + 0 + }); + + match std::fs::File::metadata(&f) { + Ok(metadata) => match metadata.modified() { + Ok(time) => match time.elapsed() { + Ok(duration) => format!("{} seconds", duration.as_secs()), + Err(err) => format!("Unknown due to error ({err})"), + }, + Err(mod_time_err) => format!("Unknown due to error ({mod_time_err})"), + }, + Err(metadata_err) => format!("Unknown due to error ({metadata_err})"), + } + } + Err(open_err) => { + lock_data = format!("Unknown due to error ({open_err})"); + String::from("Unknown seconds") + } + }; + lock_data = lock_data.replace("\n", " "); + + // Extra sentry data is added here because this does not send + // directly to sentry. That happens in the top level cli error + // handler, and that handle can also adjust the scope. + #[cfg(feature = "sentry")] + add_filelock_sentry_breadcrumb( + &lock_file, + contents, + Some(held_for.clone()), + Some(sleep_seconds), + Some(max_tries), + ); + + Err(Error::UnableToGetWriteLockError( + label, + max_tries, + sleep_seconds, + lock_data, + held_for, + )) + } + + /// Unlock this file lock by removing the lock file + /// + /// # Safety: + /// File locks are used to ensure that only one process is writing + /// to a data file at a time. Removing the lock file without + /// ensuring that the data file is not being written to may cause + /// the data within the file to become corrupt. + pub unsafe fn unlock(&mut self) -> Result<()> { + // Need to keep these if-statements separate to allow + // remove_file() to be called, and set the has_been_deleted + // flag only if it succeeds in removing the file. + #[allow(clippy::collapsible_if)] + if !self.has_been_deleted { + if let Err(err) = std::fs::remove_file(&self.lock_file) { + // File not found errors are ignored. + if err.kind() != std::io::ErrorKind::NotFound { + // Log this issue directly with sentry to ensure + // it does not get lost if unlock() is called from + // the drop handler. This can adjust the scope and + // add breadcrumbs because it sends directly to sentry. + #[cfg(feature = "sentry")] + { + // Unfortunately, most of the breadcrumb data + // is not available here. But still want to + // attach the path to the lock file in the breadcrumb. + add_filelock_sentry_breadcrumb(&self.lock_file, None, None, None, None); + sentry::with_scope( + |scope| { + // Make a new context section for the sentry message + let mut data = std::collections::BTreeMap::new(); + data.insert(String::from("label"), serde_json::json!(self.label)); + data.insert( + String::from("lockfile"), + serde_json::json!(self.lock_file), + ); + scope.set_context( + "LockFile", + sentry::protocol::Context::Other(data), + ); + + let fingerprints: Vec<&str> = vec!["{{ error.value }}"]; + scope.set_fingerprint(Some(&fingerprints)); + }, + || { + // The Error enums are not clonable so a duplicate is + // made here to send to sentry directly. + let unlock_error = Error::UnableToRemoveWriteLockError( + self.label.clone(), + self.lock_file.display().to_string(), + err.to_string(), + ); + sentry_miette::capture_miette(&(unlock_error.into())); + }, + ); + } + + let unlock_error = Error::UnableToRemoveWriteLockError( + self.label.clone(), + self.lock_file.display().to_string(), + err.to_string(), + ); + + return Err(unlock_error); + } + } else { + // The remove_file() call above has succeeded and the + // lock file has been deleted. + self.has_been_deleted = true; + } + } + Ok(()) + } +} + +impl Drop for FileLock { + fn drop(&mut self) { + if !self.has_been_deleted + && let Err(err) = unsafe { self.unlock() } + { + // Can only the problem here. The unlock() method will + // have sent it to sentry, if that feature is enabled. + tracing::warn!("{err}"); + } + } +} + #[derive(Debug, Default)] pub struct FlatBufferRepoIndex { // The bytes of the index, usually read from a file @@ -1030,11 +1313,14 @@ impl RepositoryIndexMut for FlatBufferRepoIndex { ) .into()); } + let repo = &repos[0].1; + + // Lock the index file until the data has been updated + let mut _lock = lock_index_file(repo).await?; let (packages, global_vars) = FlatBufferRepoIndex::gather_all_data_from_repo(repos).await?; // Assemble the data into a flatbuffer index and save it - let repo = &repos[0].1; let builder = FlatBufferRepoIndex::generate_index_builder(repo, packages, global_vars).await?; @@ -1057,6 +1343,10 @@ impl RepositoryIndexMut for FlatBufferRepoIndex { repo: &crate::RepositoryHandle, package_versions: &[OptVersionIdent], ) -> miette::Result<()> { + // Lock the index file until the data has been updated + let mut _lock = lock_index_file(repo).await?; + + // Get the updated package data from the index and the repo let (packages, global_vars) = self .gather_updates_from_repo(repo, package_versions) .await?; diff --git a/crates/spk/src/cli.rs b/crates/spk/src/cli.rs index eb951d37d..26fda3cea 100644 --- a/crates/spk/src/cli.rs +++ b/crates/spk/src/cli.rs @@ -101,6 +101,18 @@ impl Opt { // from io::Decision::Formatter before it returns // these errors. } + Some(Error::SpkStorageError(storage_error)) + if matches!( + &**storage_error, + spk_storage::Error::UnableToRemoveWriteLockError(..) + ) => + { + // UnableToRemoveWriteLock errors are not sent to + // sentry here. A message has already been sent to + // sentry from the FileLock's unlock() method before + // it returns the error (even when it has been + // hidden by the FileLock's drop handler). + } _ => { // Send all other errors that reach this level to sentry sentry::with_scope( diff --git a/cspell.json b/cspell.json index cbc565a5c..a4aa8c8c6 100644 --- a/cspell.json +++ b/cspell.json @@ -234,6 +234,7 @@ "FETCHCONTENT", "fexceptions", "filehandle", + "filelock", "filesequence", "filesystems", "filesytem",