-
Notifications
You must be signed in to change notification settings - Fork 11
Indexes 9 - Adds messaging on package events to kafka #1356
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: index-8-spk-build-fix
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| // Copyright (c) Contributors to the SPK project. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // https://github.com/spkenv/spk | ||
|
|
||
| use std::collections::HashMap; | ||
| use std::time::Duration; | ||
|
|
||
| use once_cell::sync::Lazy; | ||
| use rdkafka::ClientConfig; | ||
| use rdkafka::error::KafkaError; | ||
| use rdkafka::producer::{FutureProducer, FutureRecord}; | ||
| use serde_json::json; | ||
| use spk_config::KafkaChannel; | ||
| use spk_schema::BuildIdent; | ||
| use tokio::sync::Mutex; | ||
|
|
||
| use crate::storage::messaging::PackageEvent; | ||
| use crate::{Error, Result}; | ||
|
|
||
| type ProducersByBrokers = HashMap<Vec<String>, std::result::Result<FutureProducer, KafkaError>>; | ||
|
|
||
| static KAFKA_PRODUCERS: Lazy<Mutex<ProducersByBrokers>> = Lazy::new(|| Mutex::new(HashMap::new())); | ||
|
|
||
| pub(crate) async fn announce_package_event( | ||
| kafka_channel: &KafkaChannel, | ||
| event: PackageEvent, | ||
| to: &url::Url, | ||
| ident: &BuildIdent, | ||
| ) -> Result<()> { | ||
| let Some(topic_name) = kafka_channel.package_updates_topic_name.as_ref() else { | ||
| return Ok(()); | ||
| }; | ||
|
|
||
| let mut kafka_producers = KAFKA_PRODUCERS.lock().await; | ||
|
|
||
| let producer = kafka_producers | ||
| .entry(kafka_channel.brokers.clone()) | ||
| .or_insert_with(|| { | ||
| ClientConfig::new() | ||
| // XXX: probably should expose all configuration parameters in | ||
| // spk config | ||
| .set("bootstrap.servers", kafka_channel.brokers.join(",")) | ||
| .set("message.timeout.ms", kafka_channel.timeout_ms.to_string()) | ||
| .create() | ||
| }) | ||
| .as_ref() | ||
| .map_err(|err| { | ||
| Error::String(format!( | ||
| "failed to create kafka producer from config: {err}" | ||
| )) | ||
| })?; | ||
|
|
||
| producer | ||
| .send( | ||
| FutureRecord::to(topic_name) | ||
| // Using the package name as the key for purposes of sending | ||
| // all messages about the same package to the same topic | ||
| // partition. | ||
| .key(ident.name().as_str()) | ||
| .payload( | ||
| &json!({ | ||
| "event": event.to_string(), | ||
| "repo": to.to_string(), | ||
| "package": ident.to_string(), | ||
| }) | ||
| .to_string(), | ||
| ), | ||
| Duration::from_secs(0), | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This timeout is for waiting for room in the producer queue, it should be willing to wait some small amount of time as a form of back-pressure. Better to wait a little than fail aggressively. |
||
| ) | ||
| .await | ||
| .map_err(|err| Error::String(format!("failed to send kafka message: {err:?}")))?; | ||
|
|
||
| Ok(()) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| // Copyright (c) Contributors to the SPK project. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| // https://github.com/spkenv/spk | ||
|
|
||
| mod kafka; | ||
|
|
||
| use spk_config::MessageChannel; | ||
| use spk_schema::BuildIdent; | ||
|
|
||
| use crate::Result; | ||
|
|
||
| /// Types of events than can be reported about a package | ||
| #[derive(Copy, Clone, Debug, strum::Display)] | ||
| pub(crate) enum PackageEvent { | ||
| #[strum(to_string = "package published")] | ||
| Published, | ||
| #[strum(to_string = "package modified")] | ||
| Modified, | ||
| #[strum(to_string = "package removed")] | ||
| Removed, | ||
| } | ||
|
|
||
| pub(crate) async fn announce_package_event( | ||
| event: PackageEvent, | ||
| to: &url::Url, | ||
| ident: &BuildIdent, | ||
| ) -> Result<()> { | ||
| let config = spk_config::get_config()?; | ||
| for channel in &config.messaging { | ||
| match channel { | ||
| MessageChannel::Kafka(kafka_channel) => { | ||
| kafka::announce_package_event(kafka_channel, event, to, ident).await? | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest mirroring the name of the kafka property as a convention to make it more plain what this setting maps to on the kafka side.