diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 31f1ea4a1..d6d188603 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -39,7 +39,7 @@ jobs: - name: Install prerequisites run: | apt-get update - apt-get install --yes build-essential libcap2-bin sudo cmake tcsh rsync protobuf-compiler fuse libfuse-dev curl unzip pkg-config libssl-dev git + apt-get install --yes build-essential libcap2-bin sudo cmake tcsh rsync protobuf-compiler fuse libfuse-dev curl unzip pkg-config libssl-dev git libcurl4-openssl-dev # spfs-fuse requires this option enabled echo user_allow_other >> /etc/fuse.conf FB_REL=https://github.com/google/flatbuffers/releases/ diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 59001f34e..1fb8d69d6 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -122,7 +122,7 @@ jobs: - name: Prepare Container run: | apt-get update - apt-get install -y libcap2-bin sudo cmake tcsh rsync protobuf-compiler fuse libfuse-dev curl unzip pkg-config libssl-dev git + apt-get install -y build-essential libcap2-bin sudo cmake tcsh rsync protobuf-compiler fuse libfuse-dev curl unzip pkg-config libssl-dev git libcurl4-openssl-dev # spfs-fuse requires this option enabled echo user_allow_other >> /etc/fuse.conf FB_REL=https://github.com/google/flatbuffers/releases/ @@ -234,7 +234,7 @@ jobs: - name: Prepare Container run: | apt-get update - apt-get install -y build-essential libcap2-bin sudo cmake tcsh rsync protobuf-compiler fuse libfuse-dev curl unzip pkg-config libssl-dev git + apt-get install -y build-essential libcap2-bin sudo cmake tcsh rsync protobuf-compiler fuse libfuse-dev curl unzip pkg-config libssl-dev git libcurl4-openssl-dev # spfs-fuse requires this option enabled echo user_allow_other >> /etc/fuse.conf FB_REL=https://github.com/google/flatbuffers/releases/ @@ -244,7 +244,7 @@ jobs: - name: Setup cross-compilation if: matrix.platform == 'windows' run: | - 
apt-get install -y gcc-mingw-w64-x86-64 + apt-get install -y gcc-mingw-w64-x86-64 g++-mingw-w64-x86-64-win32 rustup target add ${{ matrix.target }} - name: Configure cargo run: | diff --git a/Cargo.lock b/Cargo.lock index 2aca824f4..e032670ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -508,9 +508,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.35" +version = "1.2.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "590f9024a68a8c40351881787f1934dc11afd69090f5edb6831464694d836ea3" +checksum = "43c5703da9466b66a946814e1adf53ea2c90f10063b86290cc9eb67ce3478a20" dependencies = [ "find-msvc-tools", "shlex", @@ -645,6 +645,15 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" +[[package]] +name = "cmake" +version = "0.1.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678" +dependencies = [ + "cc", +] + [[package]] name = "colorchoice" version = "1.0.4" @@ -1299,9 +1308,9 @@ dependencies = [ [[package]] name = "find-msvc-tools" -version = "0.1.0" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e178e4fba8a2726903f6ba98a6d221e76f9c12c650d5dc0e6afdc50677b49650" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" [[package]] name = "fixedbitset" @@ -2160,6 +2169,18 @@ dependencies = [ "redox_syscall", ] +[[package]] +name = "libz-sys" +version = "1.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc3a226e576f50782b3305c5ccf458698f92798987f551c6a02efe8276721e22" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linked-hash-map" version = "0.5.6" @@ -2463,6 +2484,28 @@ dependencies = [ "libc", ] +[[package]] +name = 
"num_enum" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0bca838442ec211fa11de3a8b0e0e8f3a4522575b5c4c06ed722e005036f26" +dependencies = [ + "num_enum_derive", + "rustversion", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "680998035259dcfcafe653688bf2aa6d3e2dc05e98be6ab46afb089dc84f1df8" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "number_prefix" version = "0.4.0" @@ -3161,6 +3204,37 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rdkafka" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7956f9ac12b5712e50372d9749a3102f4810a8d42481c5eae3748d36d585bcf" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.10.0+2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e234cf318915c1059d4921ef7f75616b5219b10b46e9f3a511a15eb4b56a3f77" +dependencies = [ + "cmake", + "libc", + "libz-sys", + "num_enum", + "pkg-config", +] + [[package]] name = "redox_syscall" version = "0.5.17" @@ -4985,6 +5059,7 @@ dependencies = [ "nom-supreme", "once_cell", "paste", + "rdkafka", "regex", "relative-path", "ring", @@ -5000,6 +5075,7 @@ dependencies = [ "spk-proto", "spk-schema", "spk-solve-macros", + "strum", "sys-info", "tar", "tempfile", diff --git a/Cargo.toml b/Cargo.toml index b3bfb5491..875617914 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -95,6 +95,7 @@ progress_bar_derive_macro = { path = "crates/progress_bar_derive_macro" } proptest = "1.0.0" prost = "0.13" rand = "0.8.5" +rdkafka = { version = "0.39.0", features = ["cmake-build"] } regex = "1.6" relative-path = "1.3" resolvo = "0.9.1" diff --git 
a/crates/spk-config/src/config.rs b/crates/spk-config/src/config.rs index 00d0d343a..73099a1b4 100644 --- a/crates/spk-config/src/config.rs +++ b/crates/spk-config/src/config.rs @@ -196,6 +196,30 @@ pub struct HostOptions { pub distro_rules: HashMap, } +/// Default kafka message timeout in milliseconds, used by serde when +/// `timeout_ms` is not specified in the config +fn default_kafka_timeout_ms() -> u64 { + 5000 +} + +/// Configuration for using Kafka as a message channel. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct KafkaChannel { + /// List of brokers to connect to, specified as "hostname:port" + pub brokers: Vec, + /// Topic name to push package updates to; when unset, no messages are sent on this channel + pub package_updates_topic_name: Option, + /// Message timeout in milliseconds, defaults to 5000 ms (5 seconds) + #[serde(default = "default_kafka_timeout_ms")] + pub timeout_ms: u64, +} + +/// Types of message channels. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum MessageChannel { + Kafka(KafkaChannel), +} + /// Configuration values for spk. 
#[derive(Clone, Debug, Default, Deserialize, Serialize)] #[serde(default)] @@ -210,6 +234,7 @@ pub struct Config { pub metadata: Metadata, pub cli: Cli, pub host_options: HostOptions, + pub messaging: Vec, } impl Config { diff --git a/crates/spk-storage/Cargo.toml b/crates/spk-storage/Cargo.toml index 7bca02509..3a14b028d 100644 --- a/crates/spk-storage/Cargo.toml +++ b/crates/spk-storage/Cargo.toml @@ -44,6 +44,7 @@ nom = { workspace = true } nom-supreme = { workspace = true } once_cell = { workspace = true } paste = { workspace = true } +rdkafka = { workspace = true } regex = { workspace = true } relative-path = { workspace = true } ring = { workspace = true } @@ -57,6 +58,7 @@ spfs = { workspace = true } spk-config = { workspace = true } spk-proto = { workspace = true } spk-schema = { workspace = true } +strum = { workspace = true } sys-info = "0.9.0" tar = "0.4.45" tempfile = { workspace = true } diff --git a/crates/spk-storage/src/storage/messaging/kafka.rs b/crates/spk-storage/src/storage/messaging/kafka.rs new file mode 100644 index 000000000..c2eacc1ac --- /dev/null +++ b/crates/spk-storage/src/storage/messaging/kafka.rs @@ -0,0 +1,74 @@ +// Copyright (c) Contributors to the SPK project. 
+// SPDX-License-Identifier: Apache-2.0 +// https://github.com/spkenv/spk + +use std::collections::HashMap; +use std::time::Duration; + +use once_cell::sync::Lazy; +use rdkafka::ClientConfig; +use rdkafka::error::KafkaError; +use rdkafka::producer::{FutureProducer, FutureRecord}; +use serde_json::json; +use spk_config::KafkaChannel; +use spk_schema::BuildIdent; +use tokio::sync::Mutex; + +use crate::storage::messaging::PackageEvent; +use crate::{Error, Result}; + +type ProducersByBrokers = HashMap, std::result::Result>; + +static KAFKA_PRODUCERS: Lazy> = Lazy::new(|| Mutex::new(HashMap::new())); + +pub(crate) async fn announce_package_event( + kafka_channel: &KafkaChannel, + event: PackageEvent, + to: &url::Url, + ident: &BuildIdent, +) -> Result<()> { + let Some(topic_name) = kafka_channel.package_updates_topic_name.as_ref() else { + return Ok(()); + }; + + let mut kafka_producers = KAFKA_PRODUCERS.lock().await; + + let producer = kafka_producers + .entry(kafka_channel.brokers.clone()) + .or_insert_with(|| { + ClientConfig::new() + // XXX: probably should expose all configuration parameters in + // spk config + .set("bootstrap.servers", kafka_channel.brokers.join(",")) + .set("message.timeout.ms", kafka_channel.timeout_ms.to_string()) + .create() + }) + .as_ref() + .map_err(|err| { + Error::String(format!( + "failed to create kafka producer from config: {err}" + )) + })?; + + producer + .send( + FutureRecord::to(topic_name) + // Using the package name as the key for purposes of sending + // all messages about the same package to the same topic + // partition. 
+ .key(ident.name().as_str()) + .payload( + &json!({ + "event": event.to_string(), + "repo": to.to_string(), + "package": ident.to_string(), + }) + .to_string(), + ), + Duration::from_secs(0), + ) + .await + .map_err(|err| Error::String(format!("failed to send kafka message: {err:?}")))?; + + Ok(()) +} diff --git a/crates/spk-storage/src/storage/messaging/mod.rs b/crates/spk-storage/src/storage/messaging/mod.rs new file mode 100644 index 000000000..a7136cb81 --- /dev/null +++ b/crates/spk-storage/src/storage/messaging/mod.rs @@ -0,0 +1,38 @@ +// Copyright (c) Contributors to the SPK project. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/spkenv/spk + +mod kafka; + +use spk_config::MessageChannel; +use spk_schema::BuildIdent; + +use crate::Result; + +/// Types of events that can be reported about a package +#[derive(Copy, Clone, Debug, strum::Display)] +pub(crate) enum PackageEvent { + #[strum(to_string = "package published")] + Published, + #[strum(to_string = "package modified")] + Modified, + #[strum(to_string = "package removed")] + Removed, +} + +pub(crate) async fn announce_package_event( + event: PackageEvent, + to: &url::Url, + ident: &BuildIdent, +) -> Result<()> { + let config = spk_config::get_config()?; + for channel in &config.messaging { + match channel { + MessageChannel::Kafka(kafka_channel) => { + kafka::announce_package_event(kafka_channel, event, to, ident).await? 
+ } + } + } + + Ok(()) +} diff --git a/crates/spk-storage/src/storage/mod.rs b/crates/spk-storage/src/storage/mod.rs index b757f77f9..40b63f129 100644 --- a/crates/spk-storage/src/storage/mod.rs +++ b/crates/spk-storage/src/storage/mod.rs @@ -7,6 +7,7 @@ mod flatbuffer_index; mod handle; mod indexed; mod mem; +mod messaging; mod repository; mod repository_index; mod runtime; @@ -17,6 +18,7 @@ pub use flatbuffer_index::FlatBufferRepoIndex; pub use handle::RepositoryHandle; pub use indexed::IndexedRepository; pub use mem::MemRepository; +pub(crate) use messaging::announce_package_event; pub use repository::{CachePolicy, Repository, Storage}; pub use repository_index::{RepoIndex, RepositoryIndex, RepositoryIndexMut}; pub use runtime::{RuntimeRepository, find_path_providers, pretty_print_filepath}; diff --git a/crates/spk-storage/src/storage/repository.rs b/crates/spk-storage/src/storage/repository.rs index c0a6a07c1..c921e4371 100644 --- a/crates/spk-storage/src/storage/repository.rs +++ b/crates/spk-storage/src/storage/repository.rs @@ -15,6 +15,8 @@ use spk_schema::option_map::get_host_options_filters; use spk_schema::{BuildIdent, Components, Deprecate, Package, PackageMut, VersionIdent}; use self::internal::RepositoryExt; +use super::announce_package_event; +use crate::storage::messaging::PackageEvent; use crate::{Error, Result}; #[cfg(test)] @@ -429,6 +431,8 @@ pub trait Repository: Storage + Sync { } } + announce_package_event(PackageEvent::Published, self.address(), package.ident()).await?; + Ok(()) } @@ -538,6 +542,8 @@ pub trait Repository: Storage + Sync { // else if there was no original spec, assume there is nothing needed // to do. 
+ announce_package_event(PackageEvent::Modified, self.address(), package.ident()).await?; + Ok(()) } @@ -557,7 +563,9 @@ pub trait Repository: Storage + Sync { } } - self.remove_package_from_storage(pkg).await + self.remove_package_from_storage(pkg).await?; + + announce_package_event(PackageEvent::Removed, self.address(), pkg).await } /// Identify the payloads for this identified package's components. diff --git a/cspell.json b/cspell.json index a4aa8c8c6..502767c87 100644 --- a/cspell.json +++ b/cspell.json @@ -63,6 +63,7 @@ "Bottriell", "bracoxide", "bracoxidize", + "brokername", "bufread", "buildable", "builddep", @@ -140,6 +141,7 @@ "depvar", "depver", "Deque", + "derserializer", "deserializing", "DESTDIR", "devcontainer", @@ -526,6 +528,7 @@ "oslbatch", "oslcudatarget", "osloptix", + "otherbrokername", "otherdata", "otherfile", "outerrepo", @@ -579,6 +582,7 @@ "proptest", "prost", "proto", + "protos", "protobuf", "protoc", "protos", @@ -615,6 +619,7 @@ "RANLIB", "rbottriell", "rdev", + "rdkafka", "RDONLY", "RDWR", "readdir", diff --git a/docs/admin/config.md b/docs/admin/config.md index e18b7788e..1ace1495a 100644 --- a/docs/admin/config.md +++ b/docs/admin/config.md @@ -318,4 +318,14 @@ verify_index_before_use = true # The kind of index format to use for this repository. SPK currently supports # a flatbuffer based index. # index_kind = "flatb" + +# SPK can send messages to external messaging systems when packages are: +# published, modified, or removed. Each messaging system must be configured here. +# Currently, only kafka is supported. +# +# To send messages to a kafka system, the brokers and package_updates_topic_name +# must be configured. The timeout_ms is optional and defaults to 5 seconds: +# [[messaging]] +# kafka = { brokers = [ "brokername:port", "otherbrokername:port", ... 
], package_updates_topic_name = "spk-package-updates-or-whatever-your-topic-name-is", timeout_ms = 5000 } + ``` diff --git a/docs/ref/messaging.md b/docs/ref/messaging.md new file mode 100644 index 000000000..02539cc8d --- /dev/null +++ b/docs/ref/messaging.md @@ -0,0 +1,49 @@ +--- +title: Messaging +summary: Messaging from SPK +weight: 120 +--- + +This explains SPK's support for sending messages to external messaging systems. + +## Messaging from SPK + +`SPK` supports sending messages to configured messaging systems after certain events. + +These events are when packages are: +- published +- modified +- removed + + +### Supported messaging systems + +SPK only supports sending messages for events to kafka systems. + + +### SPK messaging configuration + +To send messages, SPK must be configured for each messaging system it +will send to. Multiple backend messaging systems can be specified. + +See [here]({{< ref "../admin/config" >}}) for the kafka configuration. + + +### SPK message format + +SPK sends messages in JSON format. They contain these fields and data: +``` +{ + "event": "generating event", + "repo": "repository address (URL)", + "package": "package/version/build identifier" +} +``` +For example: +``` +{ + "event": "package published", + "repo": "file:///path/to/origin/repo", + "package": "mypkg/1.2.3/ABCDEF" +} +```