diff --git a/Cargo.lock b/Cargo.lock index 06e670c1c..67aa0ff35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1344,6 +1344,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -1619,6 +1625,9 @@ name = "hashbrown" version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] [[package]] name = "hashlink" @@ -2715,6 +2724,18 @@ dependencies = [ "indexmap 2.11.0", ] +[[package]] +name = "petgraph" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8701b58ea97060d5e5b155d383a69952a60943f0e6dfe30b04c287beb0b27455" +dependencies = [ + "fixedbitset", + "hashbrown 0.15.5", + "indexmap 2.11.0", + "serde", +] + [[package]] name = "phf" version = "0.11.3" @@ -2999,7 +3020,7 @@ dependencies = [ "log", "multimap", "once_cell", - "petgraph", + "petgraph 0.7.1", "prettyplease", "prost", "prost-types", @@ -3317,7 +3338,7 @@ dependencies = [ "futures", "indexmap 2.11.0", "itertools 0.14.0", - "petgraph", + "petgraph 0.7.1", "tokio", "tracing", ] @@ -3931,6 +3952,7 @@ dependencies = [ "num_cpus", "once_cell", "parsedbuf", + "petgraph 0.8.3", "pin-project-lite", "procfs", "progress_bar_derive_macro", diff --git a/Cargo.toml b/Cargo.toml index 18f520c5a..c933479fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,6 +83,7 @@ nonempty = "0.12.0" once_cell = "1.8" parsedbuf = { path = "crates/parsedbuf" } paste = "1.0" +petgraph = "0.8.3" pin-project-lite = "0.2.0" procfs = "0.16.0" progress_bar_derive_macro = { path = "crates/progress_bar_derive_macro" } diff --git a/crates/spfs/Cargo.toml b/crates/spfs/Cargo.toml index 4585bbfb3..ba8cdc3b0 100644 --- a/crates/spfs/Cargo.toml +++ b/crates/spfs/Cargo.toml @@ -63,6 +63,7 @@ nonempty = "0.8.1" num_cpus = "1.13.1" once_cell = { workspace = true } parsedbuf = { path = "../parsedbuf" } +petgraph = { workspace = true } pin-project-lite = { workspace = true } progress_bar_derive_macro = { workspace = true } prost = { workspace = true } diff --git a/crates/spfs/src/check.rs b/crates/spfs/src/check.rs index 47f8214ec..f62e1beef 100644 --- a/crates/spfs/src/check.rs +++ b/crates/spfs/src/check.rs @@ -13,7 +13,7 @@ use once_cell::sync::OnceCell; use progress_bar_derive_macro::ProgressBar; use tokio::sync::Semaphore; -use crate::graph::{AnnotationValue, FoundDigest}; +use crate::graph::{AnnotationValue, RichDigest}; use crate::prelude::*; use crate::sync::SyncPolicy; use crate::sync::reporter::{Summary, SyncObjectResult, SyncPayloadResult}; @@ -124,8 +124,8 @@ where .find_digests(&graph::DigestSearchCriteria::All) .filter_map(|res| async move { match res { - Ok(FoundDigest::Object(digest)) => Some(Ok(digest)), - Ok(FoundDigest::Payload(_digest)) => { + Ok(RichDigest::Object(digest)) => Some(Ok(digest)), + Ok(RichDigest::Payload(_digest)) => { // Payloads are skipped on the basis that they have no // child objects. None @@ -231,11 +231,11 @@ where partial: encoding::PartialDigest, ) -> Result { match self.repo.resolve_full_digest(&partial).await? { - FoundDigest::Object(digest) => self + RichDigest::Object(digest) => self .check_object_digest(digest) .await .map(CheckItemResult::Object), - FoundDigest::Payload(digest) => self + RichDigest::Payload(digest) => self .check_payload(digest) .await .map(CheckItemResult::Payload), diff --git a/crates/spfs/src/clean.rs b/crates/spfs/src/clean.rs index 766d2b356..a80fe270d 100644 --- a/crates/spfs/src/clean.rs +++ b/crates/spfs/src/clean.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 // https://github.com/spkenv/spk -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, hash_map}; use std::fmt::Write; use std::future::ready; use std::num::NonZero; @@ -14,9 +14,13 @@ use colored::Colorize; use dashmap::DashSet; use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use once_cell::sync::OnceCell; +use petgraph::algo::toposort; +use petgraph::graph::DiGraph; +use petgraph::visit::Dfs; use progress_bar_derive_macro::ProgressBar; use super::prune::PruneParameters; +use crate::graph::{DatabaseItem, RichDigest}; use crate::io::Pluralize; use crate::prelude::*; use crate::runtime::makedirs_with_perms; @@ -408,11 +412,14 @@ where } #[async_recursion::async_recursion] - async fn walk_attached_objects(&self, digests: &[encoding::Digest]) -> Result { + async fn walk_attached_objects(&self, digests: &[O]) -> Result + where + O: AsRef + Sync, + { let mut result = CleanResult::default(); let mut walk_stream = futures::stream::iter(digests.iter()) - .then(|digest| ready(self.visit_attached_objects(*digest).boxed())) + .then(|digest| ready(self.visit_attached_objects(*digest.as_ref()).boxed())) .buffer_unordered(self.discover_concurrency) .boxed(); while let Some(res) = walk_stream.try_next().await? { @@ -534,7 +541,20 @@ where // This recursively calls visit_attached_objects() (this // method) on any child objects. - result += self.walk_attached_objects(&obj.child_objects()).await?; + let child_objects = obj + .child_items() + .into_iter() + .filter_map(|child| match child { + RichDigest::Object(digest) => Some(digest), + RichDigest::Payload(digest) => { + result.visited_payloads += 1; + self.reporter.visit_payload(&digest); + self.attached.insert(digest); + None + } + }) + .collect::>(); + result += self.walk_attached_objects(&child_objects).await?; Ok(result) } @@ -544,15 +564,117 @@ where /// objects has completed successfully and with no errors. Otherwise, it may /// remove data that is still being used async unsafe fn remove_unvisited_objects_and_payloads(&self) -> Result { + #[derive(Debug)] + enum Node { + DatabaseItem(DatabaseItem), + ObjectDigest(encoding::Digest), + } + + impl Node { + #[inline] + fn digest(&self) -> &encoding::Digest { + match self { + Node::DatabaseItem(item) => item.digest(), + Node::ObjectDigest(digest) => digest, + } + } + } + + // Construct a graph of all items to be able to delete in topological + // order, top down. This is how we avoid creating dangling references. + let mut g = DiGraph::new(); + let mut idx_to_node = HashMap::new(); + let mut digest_to_idx = HashMap::new(); + + let root_attached_node = g.add_node(()); + let root_unattached_node = g.add_node(()); + + let mut stream = self.repo.iter_items(); + + while let Some(item) = stream.try_next().await? { + let item_digest = *item.item.digest(); + + // "Upgrade" the Node from ObjectDigest to DatabaseItem when + // possible. + let node = if let DatabaseItem::Object(digest, _obj) = &item.item + && let Some(node) = digest_to_idx.get(digest) + { + if let Some(Node::ObjectDigest(_)) = idx_to_node.get(node) { + idx_to_node.insert(*node, Node::DatabaseItem(item.item)); + } + *node + } else { + let node = g.add_node(()); + idx_to_node.insert(node, Node::DatabaseItem(item.item)); + digest_to_idx.insert(item_digest, node); + node + }; + + if let Some(parent) = item.parent { + let parent_node = match digest_to_idx.entry(parent) { + hash_map::Entry::Occupied(occupied_entry) => *occupied_entry.get(), + hash_map::Entry::Vacant(vacant_entry) => { + let parent_node = g.add_node(()); + idx_to_node.insert(parent_node, Node::ObjectDigest(parent)); + vacant_entry.insert(parent_node); + + parent_node + } + }; + + if self.attached.contains(&parent) { + g.add_edge(root_attached_node, parent_node, ()); + } else { + g.add_edge(root_unattached_node, parent_node, ()); + } + + g.add_edge(parent_node, node, ()); + } else if !self.attached.contains(&item_digest) { + g.add_edge(root_unattached_node, node, ()); + } else { + g.add_edge(root_attached_node, node, ()); + } + } + + let mut dfs = Dfs::new(&g, root_attached_node); + let mut attached_nodes = HashSet::new(); + while let Some(node) = dfs.next(&g) { + idx_to_node + .get(&node) + .map(|node| attached_nodes.insert(node.digest())); + } + + let topo = toposort(&g, None).map_err(|_| { + Error::String("Cycle detected in object graph during clean operation".to_string()) + })?; + let mut result = CleanResult::default(); - let mut stream = self - .repo - .iter_objects() - // we have no interest in removing attached items - .try_filter(|item| ready(!self.attached.contains(item.digest()))) - // we have already visited all attached objects - // but also want to report these ones - .inspect_ok(|item| match item { + for node_idx in topo { + let Some(node) = idx_to_node.get(&node_idx) else { + continue; + }; + let digest = node.digest(); + if attached_nodes.contains(digest) { + continue; + } + + let Node::DatabaseItem(item) = node else { + // Can this happen? This item is considered garbage but it was + // only ever reported as a parent and the stream never visited + // the object itself. + debug_assert!(false, "Dangling object digest found during clean"); + + // We're not going to clean this item so we also can't clean + // anything downstream of it. + let mut dfs = Dfs::new(&g, node_idx); + while let Some(n) = dfs.next(&g) { + let node = idx_to_node.get(&n).unwrap(); + attached_nodes.insert(node.digest()); + } + continue; + }; + + match item { graph::DatabaseItem::Object(_digest, flat_object) => { self.reporter.visit_object(flat_object); result.visited_objects += 1; @@ -561,62 +683,56 @@ where self.reporter.visit_payload(digest); result.visited_payloads += 1; } - }) - .and_then(|item| { - if self.dry_run { - // XXX: this ignores must_be_older_than - return ready(Ok(ready(Ok((item, true))).boxed())); + } + + if self.dry_run { + // XXX: this ignores must_be_older_than + continue; + } + + let future = match &item { + graph::DatabaseItem::Object(digest, _flat_object) => futures::future::Either::Left( + self.repo + .remove_object_if_older_than(self.must_be_older_than, *digest), + ), + graph::DatabaseItem::Payload(digest) => futures::future::Either::Right( + self.repo + .remove_payload_if_older_than(self.must_be_older_than, *digest), + ), + }; + + let was_removed = match future.await { + Ok(removed) => removed, + Err(Error::UnknownObject(_)) => true, + Err(err) => { + self.reporter.error_encountered(&err); + result.errors.push(err); + false } - let future = (match &item { - graph::DatabaseItem::Object(digest, _flat_object) => self - .repo - .remove_object_if_older_than(self.must_be_older_than, *digest) - .boxed(), - graph::DatabaseItem::Payload(digest) => self - .repo - .remove_payload_if_older_than(self.must_be_older_than, *digest) - .boxed(), - }) - .map(|res| { - if let Err(Error::UnknownObject(_)) = res { - return Ok(true); - } - res - }) - .map_ok(move |removed| (item, removed)) - .boxed(); - ready(Ok(future)) - }) - .try_buffer_unordered(self.removal_concurrency) - .try_filter_map(|(item, removed)| { - if !removed { - // objects that are too new to be removed become - // implicitly attached - self.attached.insert(*item.digest()); + }; + + // If the item wasn't removed, anything downstream of it can't be + // removed either. + if !was_removed { + let mut dfs = Dfs::new(&g, node_idx); + while let Some(n) = dfs.next(&g) { + let node = idx_to_node.get(&n).unwrap(); + attached_nodes.insert(node.digest()); } - ready(Ok(removed.then_some(item))) - }) - .inspect_ok(|item| match item { - graph::DatabaseItem::Object(_digest, flat_object) => { + continue; + } + + match item { + graph::DatabaseItem::Object(digest, flat_object) => { self.reporter.object_removed(flat_object); + result.removed_objects.insert(*digest); } graph::DatabaseItem::Payload(digest) => { self.reporter.payload_removed(digest); + result.removed_payloads.insert(*digest); } - }) - .boxed(); - let mut result = CleanResult::default(); - while let Some(item) = stream.try_next().await? { - match item { - graph::DatabaseItem::Object(digest, _flat_object) => { - result.removed_objects.insert(digest); - } - graph::DatabaseItem::Payload(digest) => { - result.removed_payloads.insert(digest); - } - }; + } } - drop(stream); Ok(result) } diff --git a/crates/spfs/src/clean_test.rs b/crates/spfs/src/clean_test.rs index 151a5e5f6..0c31f2b7a 100644 --- a/crates/spfs/src/clean_test.rs +++ b/crates/spfs/src/clean_test.rs @@ -185,6 +185,53 @@ async fn test_clean_untagged_objects(#[future] tmprepo: TempRepo, tmpdir: tempfi } } +#[rstest] +#[tokio::test] +async fn test_clean_unattached_payloads(#[future] tmprepo: TempRepo, tmpdir: tempfile::TempDir) { + init_logging(); + let tmprepo = tmprepo.await; + + let data_dir_1 = tmpdir.path().join("data"); + ensure(data_dir_1.join("dir/dir/test.file"), "1 hello"); + ensure(data_dir_1.join("dir/dir/test.file2"), "1 hello, world"); + ensure(data_dir_1.join("dir/dir/test.file4"), "1 hello, world"); + + let manifest = crate::Committer::new(&tmprepo) + .commit_dir(data_dir_1.as_path()) + .await + .unwrap(); + + // Detach all the payloads by deleting the manifest object. + tmprepo + .remove_object(manifest.to_graph_manifest().digest().unwrap()) + .await + .unwrap(); + + let cleaner = Cleaner::new(&tmprepo).with_reporter(TracingCleanReporter); + let _result = cleaner + .prune_all_tags_and_clean() + .await + .expect("failed to clean objects"); + + // All the payloads should be gone. + for node in manifest.walk() { + if !node.entry.kind.is_blob() { + continue; + } + let res = tmprepo.open_payload(node.entry.object).await; + if let Err(Error::UnknownObject(_)) = res { + continue; + } + if let Err(err) = res { + println!("{err:?}"); + } + panic!( + "expected payload to be cleaned but it was not: {:?}", + node.entry.object + ); + } +} + #[rstest] #[tokio::test] async fn test_clean_on_repo_with_tag_namespace( @@ -419,6 +466,10 @@ async fn test_clean_untagged_objects_layers_platforms(#[future] tmprepo: TempRep init_logging(); let tmprepo = tmprepo.await; let manifest = tracking::Manifest::<()>::default(); + tmprepo + .write_object(&manifest.to_graph_manifest()) + .await + .unwrap(); let layer = tmprepo .create_layer(&manifest.to_graph_manifest()) .await @@ -517,3 +568,48 @@ fn list_files>(dirname: P) -> Vec { } all_files } + +/// Clean must not delete items that referenced by other items. +#[rstest] +#[tokio::test] +async fn clean_must_not_violate_invariants(#[future] tmprepo: TempRepo) { + init_logging(); + let tmprepo = tmprepo.await; + let manifest = tracking::Manifest::<()>::default(); + tmprepo + .write_object(&manifest.to_graph_manifest()) + .await + .unwrap(); + let layer = tmprepo + .create_layer(&manifest.to_graph_manifest()) + .await + .unwrap(); + + // Note current time now. + let time_before_platform = Utc::now(); + + // Time passes... + sleep(Duration::from_millis(250)).await; + + let platform = tmprepo + .create_platform(layer.digest().unwrap().into()) + .await + .unwrap(); + + let cleaner = Cleaner::new(&tmprepo) + .with_required_age_cutoff(time_before_platform) + .with_reporter(TracingCleanReporter); + let result = cleaner + .prune_all_tags_and_clean() + .await + .expect("failed to clean objects"); + println!("{result:#?}"); + + if let Err(Error::UnknownObject(_)) = tmprepo.read_platform(platform.digest().unwrap()).await { + panic!("expected platform to not be cleaned, because it is newer than the cutoff") + } + + if let Err(Error::UnknownObject(_)) = tmprepo.read_layer(layer.digest().unwrap()).await { + panic!("expected layer to not be cleaned, because it is referenced by the platform") + } +} diff --git a/crates/spfs/src/graph/annotation.rs b/crates/spfs/src/graph/annotation.rs index 9ceb7e7bb..c85c1296b 100644 --- a/crates/spfs/src/graph/annotation.rs +++ b/crates/spfs/src/graph/annotation.rs @@ -5,6 +5,7 @@ use std::borrow::Cow; use std::fmt::Display; +use crate::graph::RichDigest; use crate::{Result, encoding}; #[cfg(test)] @@ -166,13 +167,13 @@ impl<'buf> Annotation<'buf> { } } - /// Return the child objects of this object, if any. - pub fn child_objects(&self) -> Vec { + /// Return the child items of this object, if any. + pub fn child_items(&self) -> Vec { let mut result = Vec::new(); if let Some(data) = self.0.data_as_annotation_digest() && let Some(d) = data.digest() { - result.push(*d) + result.push(RichDigest::Payload(*d)) } result } diff --git a/crates/spfs/src/graph/database.rs b/crates/spfs/src/graph/database.rs index d9b2228a6..4fdaa8ffd 100644 --- a/crates/spfs/src/graph/database.rs +++ b/crates/spfs/src/graph/database.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 // https://github.com/spkenv/spk -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::pin::Pin; use std::task::Poll; @@ -10,6 +10,7 @@ use chrono::{DateTime, Utc}; use futures::{Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use super::{FlatObject, Object, ObjectProto}; +use crate::graph::object::ChildItem; use crate::{Error, Result, encoding}; /// Walks an object tree depth-first starting at some root digest @@ -17,10 +18,10 @@ use crate::{Error, Result, encoding}; pub struct DatabaseWalker<'db> { db: &'db dyn DatabaseView, next: Option<( - encoding::Digest, - Pin> + Send + 'db>>, + ChildItem, + Pin> + Send + 'db>>, )>, - queue: VecDeque, + queue: VecDeque, } impl<'db> DatabaseWalker<'db> { @@ -29,9 +30,12 @@ impl<'db> DatabaseWalker<'db> { /// /// # Errors /// The same as [`DatabaseView::read_object`] - pub fn new(db: &'db dyn DatabaseView, root: encoding::Digest) -> Self { + pub fn new(db: &'db dyn DatabaseView, root: RichDigest) -> Self { let mut queue = VecDeque::new(); - queue.push_back(root); + queue.push_back(ChildItem { + parent: *root.digest(), + child: root, + }); DatabaseWalker { db, queue, @@ -40,45 +44,109 @@ impl<'db> DatabaseWalker<'db> { } } +/// An item returned by [`DatabaseWalker`]. +pub struct DatabaseWalkerItem { + /// The parent object digest of the currently walked item. + pub parent: encoding::Digest, + /// The currently walked item. + pub child: DatabaseItem, +} + impl Stream for DatabaseWalker<'_> { - type Item = Result<(encoding::Digest, Object)>; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - let (digest, mut current_future) = match self.next.take() { + let (child_object, mut current_future) = match self.next.take() { Some(f) => f, None => match self.queue.pop_front() { None => return Poll::Ready(None), - Some(digest) => (digest, self.db.read_object(digest)), + Some(ChildItem { + parent, + child: RichDigest::Object(digest), + }) => ( + ChildItem { + parent, + child: RichDigest::Object(digest), + }, + self.db + .read_object(digest) + .map_ok(move |obj| DatabaseItem::Object(digest, obj)) + .boxed(), + ), + Some(ChildItem { + parent, + child: RichDigest::Payload(digest), + }) => ( + ChildItem { + parent, + child: RichDigest::Payload(digest), + }, + async move { Ok(DatabaseItem::Payload(digest)) }.boxed(), + ), }, }; match Pin::new(&mut current_future).poll(cx) { Poll::Pending => { - self.next = Some((digest, current_future)); + self.next = Some((child_object, current_future)); Poll::Pending } Poll::Ready(obj) => Poll::Ready(match obj { - Ok(obj) => { - for digest in obj.child_objects() { - self.queue.push_back(digest); + Ok(DatabaseItem::Object(_, obj)) => { + for digest in obj.child_items() { + self.queue.push_back(ChildItem { + parent: *child_object.child.digest(), + child: digest, + }); } - Some(Ok((digest, obj))) + Some(Ok(DatabaseWalkerItem { + parent: child_object.parent, + child: DatabaseItem::Object(*child_object.child.digest(), obj), + })) } + Ok(DatabaseItem::Payload(digest)) => Some(Ok(DatabaseWalkerItem { + parent: child_object.parent, + child: DatabaseItem::Payload(digest), + })), Err(err) => Some(Err(err)), }), } } } -/// Iterates all objects in a database, in no particular order +type FindDigestStream = Pin> + Send>>; +type WalkObjectsStream<'db> = Pin> + Send + 'db>>; + +#[derive(Default)] +enum DatabaseIterState<'db> { + /// Ready to work on the next digest from find_digests + NextDigest(FindDigestStream), + /// Walking the latest object returned from read_object + WalkingObject { + digest: encoding::Digest, + stream: WalkObjectsStream<'db>, + find_digest_stream: FindDigestStream, + }, + #[default] + Unit, +} + +/// Iterates all items in a database, in no particular order. +/// +/// Items will be repeated if they are reachable by multiple paths, allowing +/// the caller to build a graph if desired. This iterator may not return objects +/// that were added concurrently after the iterator was created. #[allow(clippy::type_complexity)] pub struct DatabaseIterator<'db> { db: &'db dyn DatabaseView, - next: Option> + Send + 'db>>>, - inner: Pin> + Send>>, + state: DatabaseIterState<'db>, + /// Digests are "walked" as they are found, which can visit digests that + /// will also be later found by `find_digests`. Track what digests have been + /// walked to avoid walking them redundantly. + walked_digests: HashSet, } impl<'db> DatabaseIterator<'db> { @@ -91,13 +159,31 @@ impl<'db> DatabaseIterator<'db> { let iter = db.find_digests(&crate::graph::DigestSearchCriteria::All); DatabaseIterator { db, - inner: iter, - next: None, + state: DatabaseIterState::NextDigest(iter), + walked_digests: HashSet::new(), } } } /// An item returned by [`DatabaseIterator`]. +#[derive(Debug)] +pub struct DatabaseIterItem { + /// The parent object of this item, if any. + /// + /// A child object may have multiple parents; this is just one of them. + /// [`DatabaseIterator`] will yield the same item at least as many times as + /// it has different parents, but may also yield it with `None` if no parent + /// has been determined yet. + /// + /// It is possible for objects to be written concurrently while iterating, + /// so this may be `None` even for objects that do have a parent. + pub parent: Option, + /// The item itself. + pub item: DatabaseItem, +} + +/// Discriminate digests between objects and payloads. +#[derive(Debug)] pub enum DatabaseItem { Object(encoding::Digest, Object), Payload(encoding::Digest), @@ -114,41 +200,112 @@ impl DatabaseItem { } } +impl From<&DatabaseItem> for RichDigest { + fn from(value: &DatabaseItem) -> Self { + match value { + DatabaseItem::Object(digest, _) => RichDigest::Object(*digest), + DatabaseItem::Payload(digest) => RichDigest::Payload(*digest), + } + } +} + impl Stream for DatabaseIterator<'_> { - type Item = Result; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - let mut current_future = match self.next.take() { - Some(f) => f, - None => match Pin::new(&mut self.inner).poll_next(cx) { - Poll::Pending => return Poll::Pending, + match std::mem::take(&mut self.state) { + DatabaseIterState::NextDigest(mut find_digest_stream) => { + match Pin::new(&mut find_digest_stream).poll_next(cx) { + Poll::Pending => { + self.state = DatabaseIterState::NextDigest(find_digest_stream); + Poll::Pending + } + Poll::Ready(inner_next) => match inner_next { + None => Poll::Ready(None), + Some(Err(err)) => { + self.state = DatabaseIterState::NextDigest(find_digest_stream); + Poll::Ready(Some(Err(err))) + } + Some(Ok(RichDigest::Object(digest))) => { + if self.walked_digests.contains(&digest) { + // Already walked this object, skip it. + self.state = DatabaseIterState::NextDigest(find_digest_stream); + return self.poll_next(cx); + } + let stream = self.db.walk_items(RichDigest::Object(digest)); + self.state = DatabaseIterState::WalkingObject { + digest, + stream: stream.boxed(), + find_digest_stream, + }; + self.poll_next(cx) + } + Some(Ok(RichDigest::Payload(digest))) => { + self.state = DatabaseIterState::NextDigest(find_digest_stream); + if self.walked_digests.contains(&digest) { + // Already walked this payload, skip it. + return self.poll_next(cx); + } + Poll::Ready(Some(Ok(DatabaseIterItem { + parent: None, + item: DatabaseItem::Payload(digest), + }))) + } + }, + } + } + DatabaseIterState::WalkingObject { + digest, + mut stream, + find_digest_stream, + } => match Stream::poll_next(Pin::new(&mut stream), cx) { + Poll::Pending => { + self.state = DatabaseIterState::WalkingObject { + digest, + stream, + find_digest_stream, + }; + Poll::Pending + } Poll::Ready(inner_next) => match inner_next { - None => return Poll::Ready(None), - Some(Err(err)) => return Poll::Ready(Some(Err(err))), - Some(Ok(FoundDigest::Object(digest))) => self - .db - .read_object(digest) - .map_ok(move |x| DatabaseItem::Object(digest, x)) - .map_err(move |err| format!("Error reading object {digest}: {err}").into()) - .boxed(), - Some(Ok(FoundDigest::Payload(digest))) => { - futures::future::ready(Ok(DatabaseItem::Payload(digest))).boxed() + None => { + self.state = DatabaseIterState::NextDigest(find_digest_stream); + self.poll_next(cx) + } + Some(Err(err)) => { + self.state = DatabaseIterState::WalkingObject { + digest, + stream, + find_digest_stream, + }; + Poll::Ready(Some(Err(err))) + } + Some(Ok(walked_item)) => { + self.state = DatabaseIterState::WalkingObject { + digest, + stream, + find_digest_stream, + }; + // Any digests seen here can be considered walked. + self.walked_digests.insert(*walked_item.child.digest()); + Poll::Ready(Some(Ok(DatabaseIterItem { + parent: { + // walk_objects yields the root object with + // itself as parent; + (walked_item.parent != *walked_item.child.digest()) + .then_some(walked_item.parent) + }, + item: walked_item.child, + }))) } }, }, - }; - match Pin::new(&mut current_future).poll(cx) { - Poll::Pending => { - self.next = Some(current_future); - Poll::Pending + DatabaseIterState::Unit => { + unreachable!() } - Poll::Ready(res) => Poll::Ready(match res { - Ok(obj) => Some(Ok(obj)), - Err(err) => Some(Err(err)), - }), } } } @@ -160,19 +317,19 @@ pub enum DigestSearchCriteria { } /// The types of digests that can exist in a database. -#[derive(PartialEq, Eq)] -pub enum FoundDigest { +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub enum RichDigest { Object(encoding::Digest), Payload(encoding::Digest), } -impl FoundDigest { +impl RichDigest { /// Borrow the inner digest. #[inline] pub fn digest(&self) -> &encoding::Digest { match self { - FoundDigest::Object(d) => d, - FoundDigest::Payload(d) => d, + RichDigest::Object(d) => d, + RichDigest::Payload(d) => d, } } @@ -180,8 +337,8 @@ impl FoundDigest { #[inline] pub fn into_digest(self) -> encoding::Digest { match self { - FoundDigest::Object(d) => d, - FoundDigest::Payload(d) => d, + RichDigest::Object(d) => d, + RichDigest::Payload(d) => d, } } @@ -189,8 +346,8 @@ impl FoundDigest { #[inline] pub fn as_bytes(&self) -> &[u8] { match self { - FoundDigest::Object(d) => d.as_bytes(), - FoundDigest::Payload(d) => d.as_bytes(), + RichDigest::Object(d) => d.as_bytes(), + RichDigest::Payload(d) => d.as_bytes(), } } } @@ -210,7 +367,7 @@ pub trait DatabaseView: Sync + Send { fn find_digests<'a>( &self, search_criteria: &'a DigestSearchCriteria, - ) -> Pin> + Send + 'a>>; + ) -> Pin> + Send + 'a>>; /// Return true if this database contains the identified object. /// @@ -218,10 +375,12 @@ pub trait DatabaseView: Sync + Send { async fn has_object(&self, digest: encoding::Digest) -> bool; /// Iterate all the objects and payloads in this database. - fn iter_objects(&self) -> DatabaseIterator<'_>; + fn iter_items(&self) -> DatabaseIterator<'_>; - /// Walk all objects and payloads connected to the given root object. - fn walk_objects<'db>(&'db self, root: &encoding::Digest) -> DatabaseWalker<'db>; + /// Walk all objects and payloads connected to the given root item. + /// + /// The given item is included in the walk. + fn walk_items<'db>(&'db self, root: RichDigest) -> DatabaseWalker<'db>; /// Return the shortened version of the given digest. /// @@ -262,7 +421,7 @@ pub trait DatabaseView: Sync + Send { /// # Errors /// - UnknownReferenceError: if the digest cannot be resolved /// - AmbiguousReferenceError: if the digest could point to multiple items - async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { + async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { let options: Vec<_> = self .find_digests(&crate::graph::DigestSearchCriteria::StartsWith( partial.clone(), @@ -291,16 +450,16 @@ impl DatabaseView for &T { fn find_digests<'a>( &self, search_criteria: &'a DigestSearchCriteria, - ) -> Pin> + Send + 'a>> { + ) -> Pin> + Send + 'a>> { DatabaseView::find_digests(&**self, search_criteria) } - fn iter_objects(&self) -> DatabaseIterator<'_> { - DatabaseView::iter_objects(&**self) + fn iter_items(&self) -> DatabaseIterator<'_> { + DatabaseView::iter_items(&**self) } - fn walk_objects<'db>(&'db self, root: &encoding::Digest) -> DatabaseWalker<'db> { - DatabaseView::walk_objects(&**self, root) + fn walk_items<'db>(&'db self, root: RichDigest) -> DatabaseWalker<'db> { + DatabaseView::walk_items(&**self, root) } } diff --git a/crates/spfs/src/graph/layer.rs b/crates/spfs/src/graph/layer.rs index d4f74a1ec..e20d88104 100644 --- a/crates/spfs/src/graph/layer.rs +++ b/crates/spfs/src/graph/layer.rs @@ -6,6 +6,7 @@ use spfs_proto::LayerArgs; use super::object::HeaderBuilder; use super::{Annotation, AnnotationValue, ObjectKind}; +use crate::graph::RichDigest; use crate::{Error, Result, encoding}; #[cfg(test)] @@ -105,16 +106,15 @@ impl Layer { .collect::>() } - /// Return the child object of this one in the object DG. - #[inline] - pub fn child_objects(&self) -> Vec { + /// Return the child items of this one in the DG. + pub fn child_items(&self) -> Vec { let mut children = Vec::new(); if let Some(manifest_digest) = self.manifest() { - children.push(*manifest_digest) + children.push(RichDigest::Object(*manifest_digest)) } for entry in self.annotations() { let annotation: Annotation = entry.into(); - children.extend(annotation.child_objects()); + children.extend(annotation.child_items()); } children } diff --git a/crates/spfs/src/graph/manifest.rs b/crates/spfs/src/graph/manifest.rs index 2ac6e71d0..ae056788c 100644 --- a/crates/spfs/src/graph/manifest.rs +++ b/crates/spfs/src/graph/manifest.rs @@ -9,6 +9,7 @@ use spfs_proto::ManifestArgs; use super::object::HeaderBuilder; use super::{Entry, ObjectKind, Tree}; +use crate::graph::RichDigest; use crate::prelude::*; use crate::{Result, encoding, tracking}; @@ -63,13 +64,13 @@ impl Manifest { std::iter::once(self.root()).chain(self.trees()) } - /// Return the digests of objects that this manifest refers to. - pub fn child_objects(&self) -> Vec { + /// Return the digests of items that this manifest refers to. + pub fn child_items(&self) -> Vec { let mut children = BTreeSet::new(); for tree in self.iter_trees() { for entry in tree.entries() { if entry.kind().is_blob() { - children.insert(*entry.object()); + children.insert(RichDigest::Payload(*entry.object())); } } } diff --git a/crates/spfs/src/graph/mod.rs b/crates/spfs/src/graph/mod.rs index b5637594b..3b91ffb7d 100644 --- a/crates/spfs/src/graph/mod.rs +++ b/crates/spfs/src/graph/mod.rs @@ -29,11 +29,12 @@ pub use database::{ Database, DatabaseExt, DatabaseItem, + DatabaseIterItem, DatabaseIterator, DatabaseView, DatabaseWalker, DigestSearchCriteria, - FoundDigest, + RichDigest, }; pub use entry::Entry; pub use kind::{HasKind, Kind, ObjectKind}; diff --git a/crates/spfs/src/graph/object.rs b/crates/spfs/src/graph/object.rs index d0f8f44bb..9abacb97d 100644 --- a/crates/spfs/src/graph/object.rs +++ b/crates/spfs/src/graph/object.rs @@ -12,12 +12,28 @@ use serde::{Deserialize, Serialize}; use super::error::{ObjectError, ObjectResult}; use super::{Annotation, Blob, DatabaseView, HasKind, Kind, Layer, Manifest, ObjectKind, Platform}; use crate::encoding; +use crate::graph::RichDigest; use crate::storage::RepositoryHandle; #[cfg(test)] #[path = "./object_test.rs"] mod object_test; +/// A child item digest along with its parent +pub struct ChildItem { + /// The parent object digest of this child. + pub parent: encoding::Digest, + /// The child item digest. + pub child: RichDigest, +} + +impl AsRef for ChildItem { + #[inline] + fn as_ref(&self) -> &encoding::Digest { + self.child.digest() + } +} + /// An node in the spfs object graph pub type Object = FlatObject>; @@ -159,11 +175,16 @@ impl Object { } } - pub fn child_objects(&self) -> Vec { + /// Return digests of all the immediate child items of this object. + pub fn child_items(&self) -> Vec { match self.to_enum() { - Enum::Platform(platform) => platform.child_objects(), - Enum::Layer(layer) => layer.child_objects(), - Enum::Manifest(manifest) => manifest.child_objects(), + Enum::Platform(platform) => platform + .child_objects() + .into_iter() + .map(RichDigest::Object) + .collect(), + Enum::Layer(layer) => layer.child_items(), + Enum::Manifest(manifest) => manifest.child_items(), Enum::Blob(_blob) => Vec::new(), } } diff --git a/crates/spfs/src/proto/conversions.rs b/crates/spfs/src/proto/conversions.rs index 44da10009..14a68ce32 100644 --- a/crates/spfs/src/proto/conversions.rs +++ b/crates/spfs/src/proto/conversions.rs @@ -49,30 +49,30 @@ impl<'a> TryFrom<&'a super::Digest> for &'a encoding::Digest { } } -impl From for super::FoundDigest { - fn from(source: graph::FoundDigest) -> Self { +impl From for super::FoundDigest { + fn from(source: graph::RichDigest) -> Self { use super::found_digest::Kind; match source { - graph::FoundDigest::Object(digest) => Self { + graph::RichDigest::Object(digest) => Self { kind: Some(Kind::Object(digest.into())), }, - graph::FoundDigest::Payload(digest) => Self { + graph::RichDigest::Payload(digest) => Self { kind: Some(Kind::Payload(digest.into())), }, } } } -impl TryFrom for graph::FoundDigest { +impl TryFrom for graph::RichDigest { type Error = Error; fn try_from(source: super::FoundDigest) -> Result { match source.kind { Some(kind) => match kind { super::found_digest::Kind::Object(digest) => { - Ok(graph::FoundDigest::Object(digest.try_into()?)) + Ok(graph::RichDigest::Object(digest.try_into()?)) } super::found_digest::Kind::Payload(digest) => { - Ok(graph::FoundDigest::Payload(digest.try_into()?)) + Ok(graph::RichDigest::Payload(digest.try_into()?)) } }, None => Err("Unknown found digest kind".into()), diff --git a/crates/spfs/src/resolve.rs b/crates/spfs/src/resolve.rs index f79b0af0c..7143eefad 100644 --- a/crates/spfs/src/resolve.rs +++ b/crates/spfs/src/resolve.rs @@ -152,8 +152,8 @@ pub async fn compute_environment_manifest( repo.resolve_full_digest(p) .and_then(|found_digest| async move { match found_digest { - graph::FoundDigest::Object(digest) => Ok(digest), - graph::FoundDigest::Payload(_digest) => { + graph::RichDigest::Object(digest) => Ok(digest), + graph::RichDigest::Payload(_digest) => { Err("unexpected payload digest in environment spec".into()) } } diff --git a/crates/spfs/src/storage/fallback/repository.rs b/crates/spfs/src/storage/fallback/repository.rs index c1bd8a217..90df346e6 100644 --- a/crates/spfs/src/storage/fallback/repository.rs +++ b/crates/spfs/src/storage/fallback/repository.rs @@ -11,7 +11,7 @@ use futures::Stream; use relative_path::RelativePath; use crate::config::{ToAddress, default_fallback_repo_include_secondary_tags}; -use crate::graph::{FoundDigest, ObjectProto}; +use crate::graph::{ObjectProto, RichDigest}; use crate::prelude::*; use crate::storage::fs::{FsHashStore, ManifestRenderPath, OpenFsRepository, RenderStore}; use crate::storage::proxy::ProxyRepositoryExt; @@ -226,16 +226,16 @@ impl graph::DatabaseView for FallbackProxy { fn find_digests<'a>( &self, search_criteria: &'a graph::DigestSearchCriteria, - ) -> Pin> + Send + 'a>> { + ) -> Pin> + Send + 'a>> { self.primary.find_digests(search_criteria) } - fn iter_objects(&self) -> graph::DatabaseIterator<'_> { - self.primary.iter_objects() + fn iter_items(&self) -> graph::DatabaseIterator<'_> { + self.primary.iter_items() } - fn walk_objects<'db>(&'db self, root: &encoding::Digest) -> graph::DatabaseWalker<'db> { - self.primary.walk_objects(root) + fn walk_items<'db>(&'db self, root: RichDigest) -> graph::DatabaseWalker<'db> { + self.primary.walk_items(root) } } diff --git a/crates/spfs/src/storage/fs/database.rs b/crates/spfs/src/storage/fs/database.rs index 8f1e7d376..a1c603206 100644 --- a/crates/spfs/src/storage/fs/database.rs +++ b/crates/spfs/src/storage/fs/database.rs @@ -13,7 +13,7 @@ use encoding::prelude::*; use futures::{Stream, StreamExt, TryFutureExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use crate::graph::{DatabaseView, FoundDigest, Object, ObjectProto}; +use crate::graph::{DatabaseView, Object, ObjectProto, RichDigest}; use crate::{Error, Result, encoding, graph}; #[async_trait::async_trait] @@ -32,22 +32,22 @@ impl DatabaseView for super::MaybeOpenFsRepository { fn find_digests<'a>( &self, search_criteria: &'a graph::DigestSearchCriteria, - ) -> Pin> + Send + 'a>> { + ) -> Pin> + Send + 'a>> { self.opened() .map_ok(|opened| opened.find_digests(search_criteria)) .try_flatten_stream() .boxed() } - fn iter_objects(&self) -> graph::DatabaseIterator<'_> { + fn iter_items(&self) -> graph::DatabaseIterator<'_> { graph::DatabaseIterator::new(self) } - fn walk_objects<'db>(&'db self, root: &encoding::Digest) -> graph::DatabaseWalker<'db> { - graph::DatabaseWalker::new(self, *root) + fn walk_items<'db>(&'db self, root: RichDigest) -> graph::DatabaseWalker<'db> { + graph::DatabaseWalker::new(self, root) } - async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { + async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { self.opened().await?.resolve_full_digest(partial).await } } @@ -106,33 +106,33 @@ impl DatabaseView for super::OpenFsRepository { fn find_digests<'a>( &self, search_criteria: &'a graph::DigestSearchCriteria, - ) -> Pin> + Send + 'a>> { + ) -> Pin> + Send + 'a>> { Box::pin( self.objects .find(search_criteria) - .map(|d| d.map(FoundDigest::Object)) + .map(|d| d.map(RichDigest::Object)) .chain( self.payloads .find(search_criteria) - .map(|d| d.map(FoundDigest::Payload)), + .map(|d| d.map(RichDigest::Payload)), ), ) } - fn iter_objects(&self) -> graph::DatabaseIterator<'_> { + fn iter_items(&self) -> graph::DatabaseIterator<'_> { graph::DatabaseIterator::new(self) } - fn walk_objects<'db>(&'db self, root: &encoding::Digest) -> graph::DatabaseWalker<'db> { - graph::DatabaseWalker::new(self, *root) + fn walk_items<'db>(&'db self, root: RichDigest) -> graph::DatabaseWalker<'db> { + graph::DatabaseWalker::new(self, root) } - async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { + async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { match self.objects.resolve_full_digest(partial).await { - Ok(digest) => Ok(FoundDigest::Object(digest)), + Ok(digest) => Ok(RichDigest::Object(digest)), Err(_) => { let digest = self.payloads.resolve_full_digest(partial).await?; - Ok(FoundDigest::Payload(digest)) + Ok(RichDigest::Payload(digest)) } } } diff --git a/crates/spfs/src/storage/handle.rs b/crates/spfs/src/storage/handle.rs index c0b3278e1..fbc491ab8 100644 --- a/crates/spfs/src/storage/handle.rs +++ b/crates/spfs/src/storage/handle.rs @@ -14,7 +14,7 @@ use spfs_encoding as encoding; use super::prelude::*; use super::tag::TagSpecAndTagStream; use super::{TagNamespace, TagNamespaceBuf, TagStorageMut}; -use crate::graph::{FoundDigest, ObjectProto}; +use crate::graph::{ObjectProto, RichDigest}; use crate::tracking::{self, BlobRead}; use crate::{Error, Result, graph}; @@ -304,16 +304,16 @@ impl DatabaseView for RepositoryHandle { fn find_digests<'a>( &self, search_criteria: &'a graph::DigestSearchCriteria, - ) -> Pin> + Send + 'a>> { + ) -> Pin> + Send + 'a>> { each_variant!(self, repo, { repo.find_digests(search_criteria) }) } - fn iter_objects(&self) -> graph::DatabaseIterator<'_> { - each_variant!(self, repo, { repo.iter_objects() }) + fn iter_items(&self) -> graph::DatabaseIterator<'_> { + each_variant!(self, repo, { repo.iter_items() }) } - fn walk_objects<'db>(&'db self, root: &encoding::Digest) -> graph::DatabaseWalker<'db> { - each_variant!(self, repo, { repo.walk_objects(root) }) + fn walk_items<'db>(&'db self, root: RichDigest) -> graph::DatabaseWalker<'db> { + each_variant!(self, repo, { repo.walk_items(root) }) } } @@ -488,16 +488,16 @@ impl DatabaseView for Arc { fn find_digests<'a>( &self, search_criteria: &'a graph::DigestSearchCriteria, - ) -> Pin> + Send + 'a>> { + ) -> Pin> + Send + 'a>> { each_variant!(&**self, repo, { repo.find_digests(search_criteria) }) } - fn iter_objects(&self) -> graph::DatabaseIterator<'_> { - each_variant!(&**self, repo, { repo.iter_objects() }) + fn iter_items(&self) -> graph::DatabaseIterator<'_> { + each_variant!(&**self, repo, { repo.iter_items() }) } - fn walk_objects<'db>(&'db self, root: &encoding::Digest) -> graph::DatabaseWalker<'db> { - each_variant!(&**self, repo, { repo.walk_objects(root) }) + fn walk_items<'db>(&'db self, root: RichDigest) -> graph::DatabaseWalker<'db> { + each_variant!(&**self, repo, { repo.walk_items(root) }) } } diff --git a/crates/spfs/src/storage/layer.rs b/crates/spfs/src/storage/layer.rs index bddff3a43..27b11e664 100644 --- a/crates/spfs/src/storage/layer.rs +++ b/crates/spfs/src/storage/layer.rs @@ -8,7 +8,8 @@ use encoding::prelude::*; use futures::Stream; use tokio_stream::StreamExt; -use crate::{Result, encoding, graph, tracking}; +use crate::graph::{self, DatabaseIterItem}; +use crate::{Result, encoding, tracking}; pub type LayerStreamItem = Result<(encoding::Digest, graph::Layer)>; @@ -16,11 +17,15 @@ pub type LayerStreamItem = Result<(encoding::Digest, graph::Layer)>; pub trait LayerStorage: graph::Database + Sync + Send { /// Iterate the objects in this storage which are layers. fn iter_layers<'db>(&'db self) -> Pin + 'db>> { - let stream = self.iter_objects().filter_map(|res| match res { - Ok(graph::DatabaseItem::Object(digest, obj)) => { - obj.into_layer().map(|b| Ok((digest, b))) - } - Ok(graph::DatabaseItem::Payload(_digest)) => None, + let stream = self.iter_items().filter_map(|res| match res { + Ok(DatabaseIterItem { + item: graph::DatabaseItem::Object(digest, obj), + .. + }) => obj.into_layer().map(|b| Ok((digest, b))), + Ok(DatabaseIterItem { + item: graph::DatabaseItem::Payload(_digest), + .. + }) => None, Err(err) => Some(Err(err)), }); Box::pin(stream) diff --git a/crates/spfs/src/storage/manifest.rs b/crates/spfs/src/storage/manifest.rs index fa719d81f..8f759c076 100644 --- a/crates/spfs/src/storage/manifest.rs +++ b/crates/spfs/src/storage/manifest.rs @@ -7,7 +7,8 @@ use std::pin::Pin; use futures::stream::Stream; use tokio_stream::StreamExt; -use crate::{Result, encoding, graph}; +use crate::graph::{self, DatabaseIterItem}; +use crate::{Result, encoding}; #[cfg(test)] #[path = "./manifest_test.rs"] @@ -19,11 +20,15 @@ pub type ManifestStreamItem = Result<(encoding::Digest, graph::Manifest)>; pub trait ManifestStorage: graph::Database + Sync + Send { /// Iterate the objects in this storage which are manifests. fn iter_manifests<'db>(&'db self) -> Pin + 'db>> { - let stream = self.iter_objects().filter_map(|res| match res { - Ok(graph::DatabaseItem::Object(digest, obj)) => { - obj.into_manifest().map(|b| Ok((digest, b))) - } - Ok(graph::DatabaseItem::Payload(_digest)) => None, + let stream = self.iter_items().filter_map(|res| match res { + Ok(DatabaseIterItem { + item: graph::DatabaseItem::Object(digest, obj), + .. + }) => obj.into_manifest().map(|b| Ok((digest, b))), + Ok(DatabaseIterItem { + item: graph::DatabaseItem::Payload(_digest), + .. + }) => None, Err(err) => Some(Err(err)), }); Box::pin(stream) diff --git a/crates/spfs/src/storage/manifest_test.rs b/crates/spfs/src/storage/manifest_test.rs index 17403ac77..4d4223a69 100644 --- a/crates/spfs/src/storage/manifest_test.rs +++ b/crates/spfs/src/storage/manifest_test.rs @@ -20,7 +20,7 @@ async fn test_read_write_manifest( repo: TempRepo, tmpdir: tempfile::TempDir, ) { - use crate::graph::FoundDigest; + use crate::graph::RichDigest; let dir = tmpdir.path(); let repo = repo.await; @@ -46,7 +46,7 @@ async fn test_read_write_manifest( .collect() .await; let digests = digests.unwrap(); - assert!(digests.contains(&FoundDigest::Object(expected))); + assert!(digests.contains(&RichDigest::Object(expected))); } #[rstest] diff --git a/crates/spfs/src/storage/pinned/repository.rs b/crates/spfs/src/storage/pinned/repository.rs index 9f2958d84..1a715693a 100644 --- a/crates/spfs/src/storage/pinned/repository.rs +++ b/crates/spfs/src/storage/pinned/repository.rs @@ -10,7 +10,7 @@ use chrono::{DateTime, Utc}; use futures::Stream; use spfs_encoding as encoding; -use crate::graph::{FoundDigest, ObjectProto}; +use crate::graph::{ObjectProto, RichDigest}; use crate::storage::prelude::*; use crate::tracking::BlobRead; use crate::{Error, Result, graph}; @@ -66,19 +66,19 @@ where fn find_digests<'a>( &self, search_criteria: &'a graph::DigestSearchCriteria, - ) -> Pin> + Send + 'a>> { + ) -> Pin> + Send + 'a>> { self.inner.find_digests(search_criteria) } - fn iter_objects(&self) -> graph::DatabaseIterator<'_> { - self.inner.iter_objects() + fn iter_items(&self) -> graph::DatabaseIterator<'_> { + self.inner.iter_items() } - fn walk_objects<'db>(&'db self, root: &encoding::Digest) -> graph::DatabaseWalker<'db> { - self.inner.walk_objects(root) + fn walk_items<'db>(&'db self, root: RichDigest) -> graph::DatabaseWalker<'db> { + self.inner.walk_items(root) } - async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { + async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { self.inner.resolve_full_digest(partial).await } } diff --git a/crates/spfs/src/storage/platform.rs b/crates/spfs/src/storage/platform.rs index 567beed18..bf947afb7 100644 --- a/crates/spfs/src/storage/platform.rs +++ b/crates/spfs/src/storage/platform.rs @@ -7,7 +7,8 @@ use std::pin::Pin; use futures::stream::Stream; use tokio_stream::StreamExt; -use crate::{Result, encoding, graph}; +use crate::graph::{self, DatabaseIterItem}; +use crate::{Result, encoding}; pub type PlatformStreamItem = Result<(encoding::Digest, graph::Platform)>; @@ -15,11 +16,15 @@ pub type PlatformStreamItem = Result<(encoding::Digest, graph::Platform)>; pub trait PlatformStorage: graph::Database + Sync + Send { /// Iterate the objects in this storage which are platforms. fn iter_platforms<'db>(&'db self) -> Pin + 'db>> { - let stream = self.iter_objects().filter_map(|res| match res { - Ok(graph::DatabaseItem::Object(digest, obj)) => { - obj.into_platform().map(|b| Ok((digest, b))) - } - Ok(graph::DatabaseItem::Payload(_digest)) => None, + let stream = self.iter_items().filter_map(|res| match res { + Ok(DatabaseIterItem { + item: graph::DatabaseItem::Object(digest, obj), + .. + }) => obj.into_platform().map(|b| Ok((digest, b))), + Ok(DatabaseIterItem { + item: graph::DatabaseItem::Payload(_digest), + .. + }) => None, Err(err) => Some(Err(err)), }); Box::pin(stream) diff --git a/crates/spfs/src/storage/proxy/repository.rs b/crates/spfs/src/storage/proxy/repository.rs index eb2d1a92c..a65006b05 100644 --- a/crates/spfs/src/storage/proxy/repository.rs +++ b/crates/spfs/src/storage/proxy/repository.rs @@ -13,7 +13,7 @@ use futures::{Stream, StreamExt, future}; use relative_path::RelativePath; use crate::config::{ToAddress, default_proxy_repo_include_secondary_tags}; -use crate::graph::{FoundDigest, ObjectProto}; +use crate::graph::{ObjectProto, RichDigest}; use crate::prelude::*; use crate::storage::proxy::ProxyRepositoryExt; use crate::storage::tag::TagSpecAndTagStream; @@ -162,16 +162,16 @@ impl graph::DatabaseView for ProxyRepository { fn find_digests<'a>( &self, search_criteria: &'a graph::DigestSearchCriteria, - ) -> Pin> + Send + 'a>> { + ) -> Pin> + Send + 'a>> { self.primary.find_digests(search_criteria) } - fn iter_objects(&self) -> graph::DatabaseIterator<'_> { - self.primary.iter_objects() + fn iter_items(&self) -> graph::DatabaseIterator<'_> { + self.primary.iter_items() } - fn walk_objects<'db>(&'db self, root: &encoding::Digest) -> graph::DatabaseWalker<'db> { - self.primary.walk_objects(root) + fn walk_items<'db>(&'db self, root: RichDigest) -> graph::DatabaseWalker<'db> { + self.primary.walk_items(root) } } diff --git a/crates/spfs/src/storage/rpc/database.rs b/crates/spfs/src/storage/rpc/database.rs index 4aebc795b..3a69ddd41 100644 --- a/crates/spfs/src/storage/rpc/database.rs +++ b/crates/spfs/src/storage/rpc/database.rs @@ -8,7 +8,7 @@ use std::pin::Pin; use futures::{Stream, TryStreamExt}; use proto::RpcResult; -use crate::graph::{self, FoundDigest, ObjectProto}; +use crate::graph::{self, ObjectProto, RichDigest}; use crate::{Result, encoding, proto}; #[async_trait::async_trait] @@ -43,7 +43,7 @@ impl graph::DatabaseView for super::RpcRepository { fn find_digests<'a>( &self, search_criteria: &'a graph::DigestSearchCriteria, - ) -> Pin> + Send + 'a>> { + ) -> Pin> + Send + 'a>> { let request = proto::FindDigestsRequest { search_criteria: Some(search_criteria.clone().into()), }; @@ -57,12 +57,12 @@ impl graph::DatabaseView for super::RpcRepository { Box::pin(stream) } - fn iter_objects(&self) -> graph::DatabaseIterator<'_> { + fn iter_items(&self) -> graph::DatabaseIterator<'_> { graph::DatabaseIterator::new(self) } - fn walk_objects<'db>(&'db self, root: &encoding::Digest) -> graph::DatabaseWalker<'db> { - graph::DatabaseWalker::new(self, *root) + fn walk_items<'db>(&'db self, root: RichDigest) -> graph::DatabaseWalker<'db> { + graph::DatabaseWalker::new(self, root) } } diff --git a/crates/spfs/src/storage/tar/repository.rs b/crates/spfs/src/storage/tar/repository.rs index 38e448e86..b549c7b11 100644 --- a/crates/spfs/src/storage/tar/repository.rs +++ b/crates/spfs/src/storage/tar/repository.rs @@ -14,7 +14,7 @@ use relative_path::RelativePath; use tar::{Archive, Builder}; use crate::config::{ToAddress, pathbuf_deserialize_with_tilde_expansion}; -use crate::graph::{FoundDigest, ObjectProto}; +use crate::graph::{ObjectProto, RichDigest}; use crate::prelude::*; use crate::storage::fs::DURABLE_EDITS_DIR; use crate::storage::tag::TagSpecAndTagStream; @@ -228,19 +228,19 @@ impl graph::DatabaseView for TarRepository { fn find_digests<'a>( &self, search_criteria: &'a graph::DigestSearchCriteria, - ) -> Pin> + Send + 'a>> { + ) -> Pin> + Send + 'a>> { self.repo.find_digests(search_criteria) } - fn iter_objects(&self) -> graph::DatabaseIterator<'_> { - self.repo.iter_objects() + fn iter_items(&self) -> graph::DatabaseIterator<'_> { + self.repo.iter_items() } - fn walk_objects<'db>(&'db self, root: &encoding::Digest) -> graph::DatabaseWalker<'db> { - self.repo.walk_objects(root) + fn walk_items<'db>(&'db self, root: RichDigest) -> graph::DatabaseWalker<'db> { + self.repo.walk_items(root) } - async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { + async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { self.repo.resolve_full_digest(partial).await } } diff --git a/crates/spfs/src/sync.rs b/crates/spfs/src/sync.rs index 3ac389a9e..a07b7b934 100644 --- a/crates/spfs/src/sync.rs +++ b/crates/spfs/src/sync.rs @@ -270,11 +270,11 @@ impl<'src, 'dst> Syncer<'src, 'dst> { res => res, }?; match found_digest { - graph::FoundDigest::Object(digest) => { + graph::RichDigest::Object(digest) => { let obj_result = self.sync_object_digest(digest).await?; Ok(SyncItemResult::Object(obj_result)) } - graph::FoundDigest::Payload(digest) => { + graph::RichDigest::Payload(digest) => { let payload_result = self.sync_payload(digest).await?; Ok(SyncItemResult::Payload(payload_result)) } diff --git a/cspell.json b/cspell.json index 127758232..365ef6887 100644 --- a/cspell.json +++ b/cspell.json @@ -532,6 +532,7 @@ "peekable", "perror", "persistable", + "petgraph", "pids", "Pipenv", "Pinnable", @@ -760,6 +761,8 @@ "tmprepo", "tmpspfs", "Toolhelp", + "topo", + "toposort", "topvalue", "topvar", "TPFWIZI",