Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 71 additions & 34 deletions src/pubsub/src/subscriber/lease_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@ use super::handler::Action;
use super::lease_state::{LeaseEvent, LeaseOptions, LeaseState, NewMessage};
use super::leaser::{ConfirmedAcks, Leaser};
use super::shutdown_behavior::ShutdownBehavior;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
#[cfg(test)]
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{UnboundedReceiver, WeakUnboundedSender, unbounded_channel};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

/// A convenience struct that groups the components of the lease loop.
pub(super) struct LeaseLoop {
/// A handle to the task running the lease loop.
pub(super) handle: JoinHandle<()>,
/// For sending messages from the stream to the lease loop.
pub(super) message_tx: UnboundedSender<NewMessage>,
pub(super) message_tx: WeakUnboundedSender<NewMessage>,
/// For sending acks/nacks from the application to the lease loop.
pub(super) ack_tx: UnboundedSender<Action>,
pub(super) ack_tx: WeakUnboundedSender<Action>,
/// A token that can signal shutdown of the lease loop.
pub(super) cancel: CancellationToken,
}

impl LeaseLoop {
Expand All @@ -40,6 +45,10 @@ impl LeaseLoop {
{
let (message_tx, mut message_rx) = unbounded_channel::<NewMessage>();
let (ack_tx, mut ack_rx) = unbounded_channel();

let weak_message_tx = message_tx.downgrade();
let weak_ack_tx = ack_tx.downgrade();

let shutdown_guard = match options.shutdown_behavior {
// If the subscriber is configured to wait for processing, we do not
// want to break out of the lease loop when the stream drops its
Expand All @@ -48,6 +57,19 @@ impl LeaseLoop {
ShutdownBehavior::WaitForProcessing => Some(message_tx.clone()),
ShutdownBehavior::NackImmediately => None,
};

let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
tokio::spawn(async move {
// Hold the strong senders for the channels, dropping them when an
// application signals a shutdown. This lets us begin the shutdown
// procedure without requiring the application to `drop(stream)` or
// call `stream.next()`.
cancel_clone.cancelled().await;
drop(message_tx);
drop(ack_tx);
});

let mut state = LeaseState::new(leaser, options);

let handle = tokio::spawn(async move {
Expand Down Expand Up @@ -91,10 +113,23 @@ impl LeaseLoop {
});
LeaseLoop {
handle,
message_tx,
ack_tx,
message_tx: weak_message_tx,
ack_tx: weak_ack_tx,
cancel,
}
}

#[cfg(test)]
#[track_caller]
fn strong_ack_tx(&self) -> UnboundedSender<Action> {
self.ack_tx.upgrade().expect("shutdown has not begun")
}

#[cfg(test)]
#[track_caller]
fn strong_message_tx(&self) -> UnboundedSender<NewMessage> {
self.message_tx.upgrade().expect("shutdown has not begun")
}
}

// Shuts down lease management.
Expand Down Expand Up @@ -155,21 +190,23 @@ mod tests {

// Seed the lease loop with some messages
for i in 0..30 {
lease_loop.message_tx.send(NewMessage {
lease_loop.strong_message_tx().send(NewMessage {
ack_id: test_id(i),
lease_info: exactly_once_info(),
})?;
}

// Ack 10 messages
for i in 0..10 {
lease_loop.ack_tx.send(Action::ExactlyOnceAck(test_id(i)))?;
lease_loop
.strong_ack_tx()
.send(Action::ExactlyOnceAck(test_id(i)))?;
}

// Nack 10 messages
for i in 10..20 {
lease_loop
.ack_tx
.strong_ack_tx()
.send(Action::ExactlyOnceNack(test_id(i)))?;
}

Expand Down Expand Up @@ -212,11 +249,13 @@ mod tests {

// Add a message and confirm ack.
let (result_tx, mut result_rx) = channel();
lease_loop.message_tx.send(NewMessage {
lease_loop.strong_message_tx().send(NewMessage {
ack_id: test_id(0),
lease_info: LeaseInfo::ExactlyOnce(ExactlyOnceInfo::new(result_tx)),
})?;
lease_loop.ack_tx.send(Action::ExactlyOnceAck(test_id(0)))?;
lease_loop
.strong_ack_tx()
.send(Action::ExactlyOnceAck(test_id(0)))?;
let mut ack_results = HashMap::new();
ack_results.insert(test_id(0), Ok(()));
confirmed_tx.send(ack_results)?;
Expand Down Expand Up @@ -259,12 +298,12 @@ mod tests {

// Seed the lease loop with some messages
for i in 0..30 {
lease_loop.message_tx.send(test_message(i))?;
lease_loop.strong_message_tx().send(test_message(i))?;
}

// Ack 10 messages
for i in 0..10 {
lease_loop.ack_tx.send(Action::Ack(test_id(i)))?;
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
}

// Confirm initial state
Expand All @@ -287,7 +326,7 @@ mod tests {

// Nack 10 messages
for i in 10..20 {
lease_loop.ack_tx.send(Action::Nack(test_id(i)))?;
lease_loop.strong_ack_tx().send(Action::Nack(test_id(i)))?;
}

// Advance to and validate the second flush
Expand All @@ -307,11 +346,11 @@ mod tests {

// Ack 5 messages
for i in 20..25 {
lease_loop.ack_tx.send(Action::Ack(test_id(i)))?;
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
}
// Nack 5 messages
for i in 25..30 {
lease_loop.ack_tx.send(Action::Nack(test_id(i)))?;
lease_loop.strong_ack_tx().send(Action::Nack(test_id(i)))?;
}

// Advance to the third flush
Expand Down Expand Up @@ -361,7 +400,7 @@ mod tests {

// Seed the lease loop with some messages
for i in 0..30 {
lease_loop.message_tx.send(test_message(i))?;
lease_loop.strong_message_tx().send(test_message(i))?;
}

// Confirm initial state
Expand All @@ -384,7 +423,7 @@ mod tests {

// Ack 10 messages
for i in 0..10 {
lease_loop.ack_tx.send(Action::Ack(test_id(i)))?;
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
}

// Advance to and validate the second extension
Expand Down Expand Up @@ -416,12 +455,12 @@ mod tests {

// Seed the lease loop with some messages
for i in 0..30 {
lease_loop.message_tx.send(test_message(i))?;
lease_loop.strong_message_tx().send(test_message(i))?;
}

// Ack 10 messages
for i in 0..10 {
lease_loop.ack_tx.send(Action::Ack(test_id(i)))?;
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
}

// Drop the lease_loop.
Expand Down Expand Up @@ -458,22 +497,22 @@ mod tests {

// Seed the lease loop with some messages
for i in 0..30 {
lease_loop.message_tx.send(test_message(i))?;
lease_loop.strong_message_tx().send(test_message(i))?;
}

// Ack 10 messages
for i in 0..10 {
lease_loop.ack_tx.send(Action::Ack(test_id(i)))?;
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
}

// Nack 10 messages
for i in 10..20 {
lease_loop.ack_tx.send(Action::Nack(test_id(i)))?;
lease_loop.strong_ack_tx().send(Action::Nack(test_id(i)))?;
}

// Shutdown the lease_loop.
drop(lease_loop.message_tx);
drop(lease_loop.ack_tx);
lease_loop.cancel.cancel();
tokio::task::yield_now().await;
lease_loop.handle.await?;

Ok(())
Expand All @@ -495,11 +534,11 @@ mod tests {
..Default::default()
};
let lease_loop = LeaseLoop::new(mock, confirmed_rx, options);
let ack_tx = lease_loop.ack_tx.clone();
let ack_tx = lease_loop.strong_ack_tx();

// Seed the lease loop with some messages
for i in 0..20 {
lease_loop.message_tx.send(test_message(i))?;
lease_loop.strong_message_tx().send(test_message(i))?;
}

// Ack 10 messages
Expand All @@ -508,8 +547,7 @@ mod tests {
}

// Signal and await a shutdown of the lease_loop.
drop(lease_loop.message_tx);
drop(lease_loop.ack_tx);
lease_loop.cancel.cancel();
// Yield execution to the lease loop. If it shuts down now while
// `ack_tx` is still in scope, the test will fail.
tokio::task::yield_now().await;
Expand Down Expand Up @@ -559,17 +597,16 @@ mod tests {

// Seed the lease loop with some messages
for i in 0..30 {
lease_loop.message_tx.send(test_message(i))?;
lease_loop.strong_message_tx().send(test_message(i))?;
}

// Ack 10 messages
for i in 0..10 {
lease_loop.ack_tx.send(Action::Ack(test_id(i)))?;
lease_loop.strong_ack_tx().send(Action::Ack(test_id(i)))?;
}

// Shutdown the lease_loop.
drop(lease_loop.message_tx);
drop(lease_loop.ack_tx);
lease_loop.cancel.cancel();
lease_loop.handle.await?;

// Verify that we flushed the acks immediately, and waited for them to
Expand Down Expand Up @@ -605,9 +642,9 @@ mod tests {
tokio::task::yield_now().await;

// Seed the lease loop with a message
lease_loop.message_tx.send(test_message(1))?;
lease_loop.strong_message_tx().send(test_message(1))?;
// Immediately ack the message
lease_loop.ack_tx.send(Action::Ack(test_id(1)))?;
lease_loop.strong_ack_tx().send(Action::Ack(test_id(1)))?;

// Advance to and validate the first flush
{
Expand Down
20 changes: 3 additions & 17 deletions src/pubsub/src/subscriber/message_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,24 +151,10 @@ impl MessageStream {
handle,
message_tx,
ack_tx,
cancel: shutdown,
} = LeaseLoop::new(leaser, confirmed_rx, options);
let lease_loop = handle.map(|_| ()).boxed().shared();

let weak_message_tx = message_tx.downgrade();
let weak_ack_tx = ack_tx.downgrade();

let shutdown = CancellationToken::new();
let shutdown_clone = shutdown.clone();
let _shutdown_guard = shutdown.clone().drop_guard();
tokio::spawn(async move {
// Hold the strong senders for the channels, dropping them when an
// application signals a shutdown. This lets us begin the shutdown
// procedure without requiring the application to `drop(stream)` or
// call `stream.next()`.
shutdown_clone.cancelled().await;
drop(message_tx);
drop(ack_tx);
});

let initial_req = StreamingPullRequest {
subscription,
Expand All @@ -187,8 +173,8 @@ impl MessageStream {
initial_req,
stream: None,
pool: VecDeque::new(),
message_tx: weak_message_tx,
ack_tx: weak_ack_tx,
message_tx,
ack_tx,
shutdown: shutdown.clone(),
};
Self {
Expand Down
Loading