2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
@@ -39,7 +39,7 @@ jobs:
- name: Install prerequisites
run: |
apt-get update
-apt-get install --yes build-essential libcap2-bin sudo cmake tcsh rsync protobuf-compiler fuse libfuse-dev curl unzip pkg-config libssl-dev git
+apt-get install --yes build-essential libcap2-bin sudo cmake tcsh rsync protobuf-compiler fuse libfuse-dev curl unzip pkg-config libssl-dev git libcurl4-openssl-dev
# spfs-fuse requires this option enabled
echo user_allow_other >> /etc/fuse.conf
FB_REL=https://github.com/google/flatbuffers/releases/
6 changes: 3 additions & 3 deletions .github/workflows/rust.yml
@@ -122,7 +122,7 @@ jobs:
- name: Prepare Container
run: |
apt-get update
-apt-get install -y libcap2-bin sudo cmake tcsh rsync protobuf-compiler fuse libfuse-dev curl unzip pkg-config libssl-dev git
+apt-get install -y build-essential libcap2-bin sudo cmake tcsh rsync protobuf-compiler fuse libfuse-dev curl unzip pkg-config libssl-dev git libcurl4-openssl-dev
# spfs-fuse requires this option enabled
echo user_allow_other >> /etc/fuse.conf
FB_REL=https://github.com/google/flatbuffers/releases/
@@ -234,7 +234,7 @@ jobs:
- name: Prepare Container
run: |
apt-get update
-apt-get install -y build-essential libcap2-bin sudo cmake tcsh rsync protobuf-compiler fuse libfuse-dev curl unzip pkg-config libssl-dev git
+apt-get install -y build-essential libcap2-bin sudo cmake tcsh rsync protobuf-compiler fuse libfuse-dev curl unzip pkg-config libssl-dev git libcurl4-openssl-dev
# spfs-fuse requires this option enabled
echo user_allow_other >> /etc/fuse.conf
FB_REL=https://github.com/google/flatbuffers/releases/
@@ -244,7 +244,7 @@ jobs:
- name: Setup cross-compilation
if: matrix.platform == 'windows'
run: |
-apt-get install -y gcc-mingw-w64-x86-64
+apt-get install -y gcc-mingw-w64-x86-64 g++-mingw-w64-x86-64-win32
rustup target add ${{ matrix.target }}
- name: Configure cargo
run: |
84 changes: 80 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -95,6 +95,7 @@ progress_bar_derive_macro = { path = "crates/progress_bar_derive_macro" }
proptest = "1.0.0"
prost = "0.13"
rand = "0.8.5"
rdkafka = { version = "0.39.0", features = ["cmake-build"] }
regex = "1.6"
relative-path = "1.3"
resolvo = "0.9.1"
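A note on the build impact, inferred rather than stated anywhere in the diff: rdkafka's cmake-build feature compiles the bundled librdkafka C library from source with CMake at build time, which is presumably why the CI steps above gained build-essential and libcurl4-openssl-dev.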
25 changes: 25 additions & 0 deletions crates/spk-config/src/config.rs
@@ -196,6 +196,30 @@ pub struct HostOptions {
pub distro_rules: HashMap<String, DistroRule>,
}

/// Serde default helper: the Kafka message timeout in milliseconds used
/// when not specified in the config
fn default_kafka_timeout_ms() -> u64 {
5000
}

/// Configuration for using Kafka as a message channel.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct KafkaChannel {
/// List of brokers to connect to, specified as "hostname:port"
pub brokers: Vec<String>,
/// Topic name to push package updates to
pub package_updates_topic_name: Option<String>,
/// Message timeout in milliseconds, defaults to 5000 ms (5 seconds)
#[serde(default = "default_kafka_timeout_ms")]
pub timeout_ms: u64,
Comment on lines +212 to +214 (Collaborator):
Suggested change:
-/// Message timeout in milliseconds, defaults to 5000 ms (5 seconds)
-#[serde(default = "default_kafka_timeout_ms")]
-pub timeout_ms: u64,
+/// Message timeout in milliseconds, defaults to 5000 ms (5 seconds)
+#[serde(default = "default_kafka_message_timeout_ms")]
+pub message_timeout_ms: u64,

I suggest mirroring the name of the Kafka property as a convention, to make it plainer what this setting maps to on the Kafka side.

}

/// Types of message channels.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum MessageChannel {
Kafka(KafkaChannel),
}

/// Configuration values for spk.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(default)]
@@ -210,6 +234,7 @@ pub struct Config {
pub metadata: Metadata,
pub cli: Cli,
pub host_options: HostOptions,
pub messaging: Vec<MessageChannel>,
}

impl Config {
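As context for the new messaging field (not part of the patch): a minimal sketch of what enabling the Kafka channel might look like in a user's spk config, assuming a TOML config file and serde's externally-tagged encoding of MessageChannel. The broker addresses and topic name below are invented placeholders.

# Hypothetical config snippet; all values are illustrative.
[[messaging]]
[messaging.Kafka]
brokers = ["kafka-01.example.com:9092", "kafka-02.example.com:9092"]
package_updates_topic_name = "spk-package-updates"
# Optional; serde falls back to default_kafka_timeout_ms() (5000) when omitted.
timeout_ms = 5000

Because Config derives Default and messaging therefore defaults to an empty Vec, omitting this section leaves messaging disabled entirely.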
2 changes: 2 additions & 0 deletions crates/spk-storage/Cargo.toml
@@ -44,6 +44,7 @@ nom = { workspace = true }
nom-supreme = { workspace = true }
once_cell = { workspace = true }
paste = { workspace = true }
rdkafka = { workspace = true }
regex = { workspace = true }
relative-path = { workspace = true }
ring = { workspace = true }
@@ -57,6 +58,7 @@ spfs = { workspace = true }
spk-config = { workspace = true }
spk-proto = { workspace = true }
spk-schema = { workspace = true }
strum = { workspace = true }
sys-info = "0.9.0"
tar = "0.4.45"
tempfile = { workspace = true }
74 changes: 74 additions & 0 deletions crates/spk-storage/src/storage/messaging/kafka.rs
@@ -0,0 +1,74 @@
// Copyright (c) Contributors to the SPK project.
// SPDX-License-Identifier: Apache-2.0
// https://github.com/spkenv/spk

use std::collections::HashMap;
use std::time::Duration;

use once_cell::sync::Lazy;
use rdkafka::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde_json::json;
use spk_config::KafkaChannel;
use spk_schema::BuildIdent;
use tokio::sync::Mutex;

use crate::storage::messaging::PackageEvent;
use crate::{Error, Result};

type ProducersByBrokers = HashMap<Vec<String>, std::result::Result<FutureProducer, KafkaError>>;

static KAFKA_PRODUCERS: Lazy<Mutex<ProducersByBrokers>> = Lazy::new(|| Mutex::new(HashMap::new()));

pub(crate) async fn announce_package_event(
kafka_channel: &KafkaChannel,
event: PackageEvent,
to: &url::Url,
ident: &BuildIdent,
) -> Result<()> {
let Some(topic_name) = kafka_channel.package_updates_topic_name.as_ref() else {
return Ok(());
};

let mut kafka_producers = KAFKA_PRODUCERS.lock().await;

let producer = kafka_producers
.entry(kafka_channel.brokers.clone())
.or_insert_with(|| {
ClientConfig::new()
// XXX: probably should expose all configuration parameters in
// spk config
.set("bootstrap.servers", kafka_channel.brokers.join(","))
.set("message.timeout.ms", kafka_channel.timeout_ms.to_string())
.create()
})
.as_ref()
.map_err(|err| {
Error::String(format!(
"failed to create kafka producer from config: {err}"
))
})?;

producer
.send(
FutureRecord::to(topic_name)
// Using the package name as the key for purposes of sending
// all messages about the same package to the same topic
// partition.
.key(ident.name().as_str())
.payload(
&json!({
"event": event.to_string(),
"repo": to.to_string(),
"package": ident.to_string(),
})
.to_string(),
),
Duration::from_secs(0),
Comment (Collaborator):
This timeout is for waiting for room in the producer queue; it should be willing to wait some small amount of time as a form of back-pressure. Better to wait a little than fail aggressively.

)
.await
.map_err(|err| Error::String(format!("failed to send kafka message: {err:?}")))?;

Ok(())
}
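Picking up the reviewer's point above: in rdkafka, the second argument to FutureProducer::send bounds how long the call will block waiting for space in the producer queue, so Duration::from_secs(0) fails immediately whenever the queue is full. A sketch of the suggested fix, in the same style as the earlier suggestion block; the 5-second bound is an illustrative choice, not taken from the patch:

Suggested change:
-Duration::from_secs(0),
+// Wait briefly for queue space as back-pressure rather than
+// failing immediately when the producer queue is full.
+Duration::from_secs(5),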
38 changes: 38 additions & 0 deletions crates/spk-storage/src/storage/messaging/mod.rs
@@ -0,0 +1,38 @@
// Copyright (c) Contributors to the SPK project.
// SPDX-License-Identifier: Apache-2.0
// https://github.com/spkenv/spk

mod kafka;

use spk_config::MessageChannel;
use spk_schema::BuildIdent;

use crate::Result;

/// Types of events that can be reported about a package
#[derive(Copy, Clone, Debug, strum::Display)]
pub(crate) enum PackageEvent {
#[strum(to_string = "package published")]
Published,
#[strum(to_string = "package modified")]
Modified,
#[strum(to_string = "package removed")]
Removed,
}

pub(crate) async fn announce_package_event(
event: PackageEvent,
to: &url::Url,
ident: &BuildIdent,
) -> Result<()> {
let config = spk_config::get_config()?;
for channel in &config.messaging {
match channel {
MessageChannel::Kafka(kafka_channel) => {
kafka::announce_package_event(kafka_channel, event, to, ident).await?
}
}
}

Ok(())
}
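For anyone consuming the topic: the payload assembled in kafka.rs is a small JSON object, keyed by package name so that all events for one package land on the same partition. A representative message for a Published event might look like the following, where the repo URL and build ident are invented for illustration:

{
  "event": "package published",
  "repo": "file:///net/spk/packages",
  "package": "my-pkg/1.0.0/BGSHW3CN"
}

The event strings come from the strum to_string attributes on PackageEvent above.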
2 changes: 2 additions & 0 deletions crates/spk-storage/src/storage/mod.rs
@@ -7,6 +7,7 @@ mod flatbuffer_index;
mod handle;
mod indexed;
mod mem;
mod messaging;
mod repository;
mod repository_index;
mod runtime;
@@ -17,6 +18,7 @@ pub use flatbuffer_index::FlatBufferRepoIndex;
pub use handle::RepositoryHandle;
pub use indexed::IndexedRepository;
pub use mem::MemRepository;
pub(crate) use messaging::announce_package_event;
pub use repository::{CachePolicy, Repository, Storage};
pub use repository_index::{RepoIndex, RepositoryIndex, RepositoryIndexMut};
pub use runtime::{RuntimeRepository, find_path_providers, pretty_print_filepath};