Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@

// [START pubsub_publisher_retry_settings]
use google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder;
use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
use google_cloud_gax::retry_policy::RetryPolicyExt;
use google_cloud_pubsub::client::Publisher;
use google_cloud_pubsub::model::Message;
use google_cloud_pubsub::retry_policy::RetryableErrors;
use std::time::Duration;

pub async fn sample(project_id: &str, topic_id: &str) -> anyhow::Result<()> {
let topic_name = format!("projects/{project_id}/topics/{topic_id}");

// Configure custom retry settings.
// In this example, we retry with a time limit of 10 minutes.
let retry_policy = AlwaysRetry.with_time_limit(Duration::from_secs(600));
// In this example, we retry with a time limit of 5 minutes.
let retry_policy = RetryableErrors.with_time_limit(Duration::from_secs(300));

// Configure custom backoff settings.
let backoff_policy = ExponentialBackoffBuilder::new()
Expand Down
1 change: 1 addition & 0 deletions src/pubsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ pub mod client {
}

pub mod error;
pub mod retry_policy;

/// Traits to mock the clients in this library.
pub mod stub {
Expand Down
5 changes: 3 additions & 2 deletions src/pubsub/src/publisher/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,10 @@ impl PublisherBuilder {
/// ```
/// # use google_cloud_pubsub::client::Publisher;
/// # async fn sample() -> anyhow::Result<()> {
/// use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
/// use google_cloud_gax::retry_policy::RetryPolicyExt;
/// use google_cloud_pubsub::retry_policy::RetryableErrors;
/// let client = Publisher::builder("projects/my-project/topics/my-topic")
/// .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
/// .with_retry_policy(RetryableErrors.with_attempt_limit(3))
/// .build().await?;
/// # Ok(()) };
/// ```
Expand Down
5 changes: 3 additions & 2 deletions src/pubsub/src/publisher/client_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ impl BasePublisherBuilder {
/// ```
/// # use google_cloud_pubsub::client::BasePublisher;
/// # async fn sample() -> anyhow::Result<()> {
/// use google_cloud_gax::retry_policy::{AlwaysRetry, RetryPolicyExt};
/// use google_cloud_gax::retry_policy::RetryPolicyExt;
/// use google_cloud_pubsub::retry_policy::RetryableErrors;
/// let client = BasePublisher::builder()
/// .with_retry_policy(AlwaysRetry.with_attempt_limit(3))
/// .with_retry_policy(RetryableErrors.with_attempt_limit(3))
/// .build()
/// .await?;
/// # Ok(()) };
Expand Down
193 changes: 2 additions & 191 deletions src/pubsub/src/publisher/retry_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! Defines the retry policies for the Cloud Pub/Sub Publisher.
//!
//! The Pub/Sub service [recommends] retrying several transient error codes.
//!
//! - [Unavailable][Code::Unavailable]
//! - [Internal][Code::Internal]
//! - [Resource Exhausted][Code::ResourceExhausted]
//! - [Aborted][Code::Aborted]
//! - [Deadline Exceeded][Code::DeadlineExceeded]
//! - [Cancelled][Code::Cancelled]
//! - [Unknown][Code::Unknown]
//!
//! [recommends]: https://docs.cloud.google.com/pubsub/docs/reference/error-codes
//! [Code::Unavailable]: google_cloud_gax::error::rpc::Code::Unavailable
//! [Code::Internal]: google_cloud_gax::error::rpc::Code::Internal
//! [Code::ResourceExhausted]: google_cloud_gax::error::rpc::Code::ResourceExhausted
//! [Code::Aborted]: google_cloud_gax::error::rpc::Code::Aborted
//! [Code::DeadlineExceeded]: google_cloud_gax::error::rpc::Code::DeadlineExceeded
//! [Code::Cancelled]: google_cloud_gax::error::rpc::Code::Cancelled
//! [Code::Unknown]: google_cloud_gax::error::rpc::Code::Unknown
//! Defines the internal retry policies for the Cloud Pub/Sub Publisher.

use crate::Error;
use crate::retry_policy::RetryableErrors;
use google_cloud_gax::retry_policy::{RetryPolicy, RetryPolicyExt};
use google_cloud_gax::retry_result::RetryResult;
use google_cloud_gax::retry_state::RetryState;
use std::time::Duration;

/// The default retry policy for the Pub/Sub publisher.
Expand All @@ -46,171 +25,3 @@ use std::time::Duration;
pub(crate) fn default_retry_policy() -> impl RetryPolicy {
RetryableErrors.with_time_limit(Duration::from_secs(600))
}

/// Follows the retry strategy recommended by the Cloud Pub/Sub guides on
/// [error codes].
///
/// This policy must be decorated to limit the duration of the retry loop.
///
/// [error codes]: https://docs.cloud.google.com/pubsub/docs/reference/error-codes
#[derive(Clone, Debug)]
pub struct RetryableErrors;

impl RetryPolicy for RetryableErrors {
fn on_error(&self, _state: &RetryState, error: Error) -> RetryResult {
if error.is_transient_and_before_rpc() {
return RetryResult::Continue(error);
}

if error.is_io() || error.is_timeout() {
return RetryResult::Continue(error);
}

if error.is_transport() && error.http_status_code().is_none() {
// Sometimes gRPC returns a transport error without an HTTP status
// code. We treat all of these as I/O errors and therefore
// retryable.
return RetryResult::Continue(error);
}

// Catch raw HTTP errors that may not have been mapped to a gRPC status.
// - 408: Request Timeout
// - 429: Resource Exhausted
// - 499: Cancelled Request
// - 5xx: Internal Server Error, Bad Gateway, etc.
if let Some(408 | 429 | 499 | 500..600) = error.http_status_code() {
return RetryResult::Continue(error);
}

if let Some(status) = error.status() {
use google_cloud_gax::error::rpc::Code;
return match status.code {
Code::Aborted
| Code::Cancelled
| Code::DeadlineExceeded
| Code::Internal
| Code::ResourceExhausted
| Code::Unavailable
| Code::Unknown => RetryResult::Continue(error),
_ => RetryResult::Permanent(error),
};
}

RetryResult::Permanent(error)
}
}

#[cfg(test)]
mod tests {
use super::*;
use google_cloud_gax::error::rpc::{Code, Status};
use google_cloud_gax::retry_state::RetryState;
use http::HeaderMap;
use test_case::test_case;

#[test]
fn transport_reset() {
let p = RetryableErrors;
assert!(
p.on_error(&RetryState::default(), transport_err())
.is_continue()
);
}

#[test_case(408)]
#[test_case(429)]
#[test_case(499)]
#[test_case(500)]
#[test_case(502)]
#[test_case(503)]
#[test_case(504)]
fn retryable_http(code: u16) {
let p = RetryableErrors;
assert!(
p.on_error(&RetryState::default(), http_error(code))
.is_continue()
);
}

#[test_case(409)]
#[test_case(400)]
#[test_case(404)]
fn permanent_http(code: u16) {
let p = RetryableErrors;
assert!(
p.on_error(&RetryState::default(), http_error(code))
.is_permanent()
);
}

#[test_case(Code::Unavailable)]
#[test_case(Code::Internal)]
#[test_case(Code::Aborted)]
#[test_case(Code::ResourceExhausted)]
#[test_case(Code::DeadlineExceeded)]
#[test_case(Code::Cancelled)]
#[test_case(Code::Unknown)]
fn retryable_grpc(code: Code) {
let p = RetryableErrors;
assert!(
p.on_error(&RetryState::default(), grpc_error(code))
.is_continue()
);
}

#[test_case(Code::NotFound)]
#[test_case(Code::PermissionDenied)]
#[test_case(Code::InvalidArgument)]
fn permanent_grpc(code: Code) {
let p = RetryableErrors;
assert!(
p.on_error(&RetryState::default(), grpc_error(code))
.is_permanent()
);
}

#[test]
fn io() {
let p = RetryableErrors;
assert!(p.on_error(&RetryState::default(), io_error()).is_continue());
}

#[test]
fn permanent_auth() {
let p = RetryableErrors;
let auth_error =
google_cloud_gax::error::CredentialsError::from_msg(false, "permanent auth error");
assert!(
p.on_error(&RetryState::default(), Error::authentication(auth_error))
.is_permanent()
);
}

#[test]
fn transient_auth() {
let p = RetryableErrors;
let auth_error =
google_cloud_gax::error::CredentialsError::from_msg(true, "transient auth error");
assert!(
p.on_error(&RetryState::default(), Error::authentication(auth_error))
.is_continue()
);
}

fn transport_err() -> Error {
Error::transport(HeaderMap::new(), "connection closed")
}

fn http_error(code: u16) -> Error {
Error::http(code, HeaderMap::new(), bytes::Bytes::new())
}

fn grpc_error(code: Code) -> Error {
let status = Status::default().set_code(code).set_message("try again");
Error::service(status)
}

fn io_error() -> Error {
Error::io(gaxi::grpc::tonic::Status::unavailable("try again"))
}
}
Loading
Loading