Skip to content

Commit af9f4fa

Browse files
committed
Add automatic retries
1 parent f0f67da commit af9f4fa

File tree

6 files changed

+109
-12
lines changed

6 files changed

+109
-12
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<!-- [![Join the chat at https://gitter.im/durch/rust-s3](https://badges.gitter.im/durch/rust-s3.svg)](https://gitter.im/durch/rust-s3?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) -->
77
## rust-s3 [[docs](https://docs.rs/rust-s3/)]
88

9-
Rust library for working with Amazon S3 or arbitrary S3 compatible APIs, fully compatible with **async/await** and `futures ^0.3`. All `async` features can be turned off and sync only implementations can be used.
9+
Rust library for working with Amazon S3 or arbitrary S3 compatible APIs, fully compatible with **async/await** and `futures ^0.3`. All `async` features can be turned off and sync only implementations can be used. All requests are automatically retried once, can be further modified with a `set_retries` call.
1010

1111
### :raised_hands: Support further maintenance and development
1212

s3/src/lib.rs

+43
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
extern crate serde_derive;
66

77
use std::sync::atomic::AtomicBool;
8+
use std::sync::atomic::AtomicU8;
89

910
pub use awscreds as creds;
1011
pub use awsregion as region;
@@ -35,6 +36,48 @@ const EMPTY_PAYLOAD_SHA: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934
3536
#[cfg(not(feature = "disable-call-for-funding"))]
3637
static INITIALIZED: AtomicBool = AtomicBool::new(false);
3738

39+
static RETRIES: AtomicU8 = AtomicU8::new(1);
40+
41+
/// Sets the number of retries for operations that may fail and need to be retried.
42+
///
43+
/// This function stores the specified number of retries in an atomic variable,
44+
/// which can be safely shared across threads. This is used by the retry! macro to automatically retry all requests.
45+
///
46+
/// # Arguments
47+
///
48+
/// * `retries` - The number of retries to set.
49+
///
50+
/// # Example
51+
///
52+
/// ```rust
53+
/// use s3::set_retries;
54+
///
55+
/// set_retries(3);
56+
/// ```
57+
pub fn set_retries(retries: u8) {
58+
RETRIES.store(retries, std::sync::atomic::Ordering::SeqCst);
59+
}
60+
61+
/// Retrieves the current number of retries set for operations.
62+
///
63+
/// This function loads the value of the atomic variable storing the number of retries,
64+
/// which can be safely accessed across threads.
65+
///
66+
/// # Returns
67+
///
68+
/// The number of retries currently set, as a `u64`.
69+
///
70+
/// # Example
71+
///
72+
/// ```rust
73+
/// use s3::get_retries;
74+
///
75+
/// let retries = get_retries();
76+
/// ```
77+
pub fn get_retries() -> u64 {
78+
RETRIES.load(std::sync::atomic::Ordering::Relaxed) as u64
79+
}
80+
3881
#[cfg(not(feature = "disable-call-for-funding"))]
3982
#[inline(always)]
4083
pub(crate) fn init_once() {

s3/src/request/async_std_backend.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ impl<'a> Request for SurfRequest<'a> {
8585
}
8686

8787
async fn response_data(&self, etag: bool) -> Result<ResponseData, S3Error> {
88-
let mut response = self.response().await?;
88+
let mut response = crate::retry! {self.response().await}?;
8989
let status_code = response.status();
9090

9191
let response_headers = response
@@ -120,7 +120,7 @@ impl<'a> Request for SurfRequest<'a> {
120120
) -> Result<u16, S3Error> {
121121
let mut buffer = Vec::new();
122122

123-
let response = self.response().await?;
123+
let response = crate::retry! {self.response().await}?;
124124

125125
let status_code = response.status();
126126

@@ -135,7 +135,7 @@ impl<'a> Request for SurfRequest<'a> {
135135

136136
async fn response_header(&self) -> Result<(HeaderMap, u16), S3Error> {
137137
let mut header_map = HeaderMap::new();
138-
let response = self.response().await?;
138+
let response = crate::retry! {self.response().await}?;
139139
let status_code = response.status();
140140

141141
for (name, value) in response.iter() {
@@ -150,7 +150,7 @@ impl<'a> Request for SurfRequest<'a> {
150150
}
151151

152152
async fn response_data_to_stream(&self) -> Result<ResponseDataStream, S3Error> {
153-
let mut response = self.response().await?;
153+
let mut response = crate::retry! {self.response().await}?;
154154
let status_code = response.status();
155155

156156
let body = response

s3/src/request/blocking.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl<'a> Request for AttoRequest<'a> {
8383
}
8484

8585
fn response_data(&self, etag: bool) -> Result<ResponseData, S3Error> {
86-
let response = self.response()?;
86+
let response = crate::retry! {self.response()}?;
8787
let status_code = response.status().as_u16();
8888

8989
let response_headers = response
@@ -112,7 +112,7 @@ impl<'a> Request for AttoRequest<'a> {
112112
}
113113

114114
fn response_data_to_writer<T: Write + ?Sized>(&self, writer: &mut T) -> Result<u16, S3Error> {
115-
let mut response = self.response()?;
115+
let mut response = crate::retry! {self.response()}?;
116116

117117
let status_code = response.status();
118118
io::copy(&mut response, writer)?;
@@ -121,7 +121,7 @@ impl<'a> Request for AttoRequest<'a> {
121121
}
122122

123123
fn response_header(&self) -> Result<(Self::HeaderMap, u16), S3Error> {
124-
let response = self.response()?;
124+
let response = crate::retry! {self.response()}?;
125125
let status_code = response.status().as_u16();
126126
let headers = response.headers().clone();
127127
Ok((headers, status_code))

s3/src/request/tokio_backend.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::bucket::Bucket;
1313
use crate::command::Command;
1414
use crate::command::HttpMethod;
1515
use crate::error::S3Error;
16+
use crate::retry;
1617
use crate::utils::now_utc;
1718

1819
use tokio_stream::StreamExt;
@@ -113,7 +114,7 @@ impl<'a> Request for ReqwestRequest<'a> {
113114
}
114115

115116
async fn response_data(&self, etag: bool) -> Result<ResponseData, S3Error> {
116-
let response = self.response().await?;
117+
let response = retry! {self.response().await }?;
117118
let status_code = response.status().as_u16();
118119
let mut headers = response.headers().clone();
119120
let response_headers = headers
@@ -145,7 +146,7 @@ impl<'a> Request for ReqwestRequest<'a> {
145146
writer: &mut T,
146147
) -> Result<u16, S3Error> {
147148
use tokio::io::AsyncWriteExt;
148-
let response = self.response().await?;
149+
let response = retry! {self.response().await}?;
149150

150151
let status_code = response.status();
151152
let mut stream = response.bytes_stream();
@@ -158,7 +159,7 @@ impl<'a> Request for ReqwestRequest<'a> {
158159
}
159160

160161
async fn response_data_to_stream(&self) -> Result<ResponseDataStream, S3Error> {
161-
let response = self.response().await?;
162+
let response = retry! {self.response().await}?;
162163
let status_code = response.status();
163164
let stream = response.bytes_stream().map_err(S3Error::Reqwest);
164165

@@ -169,7 +170,7 @@ impl<'a> Request for ReqwestRequest<'a> {
169170
}
170171

171172
async fn response_header(&self) -> Result<(Self::HeaderMap, u16), S3Error> {
172-
let response = self.response().await?;
173+
let response = retry! {self.response().await}?;
173174
let status_code = response.status().as_u16();
174175
let headers = response.headers().clone();
175176
Ok((headers, status_code))

s3/src/utils/mod.rs

+53
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,59 @@ pub(crate) fn error_from_response_data(response_data: ResponseData) -> Result<S3
374374
))
375375
}
376376

377+
/// Retries a given expression a specified number of times with exponential backoff.
378+
///
379+
/// This macro attempts to execute the provided expression up to `N` times, where `N`
380+
/// is the value set by `set_retries`. If the expression returns `Ok`, it returns the value.
381+
/// If the expression returns `Err`, it logs a warning and retries after a delay that increases
382+
/// exponentially with each retry.
383+
///
384+
/// The delay between retries is calculated as `1 * retry_cnt.pow(2)` seconds, where `retry_cnt`
385+
/// is the current retry attempt.
386+
///
387+
/// This macro supports both asynchronous and synchronous contexts:
388+
/// - For `tokio` users, it uses `tokio::time::sleep`.
389+
/// - For `async-std` users, it uses `async_std::task::sleep`.
390+
/// - For synchronous contexts, it uses `std::thread::sleep`.
391+
///
392+
/// # Features
393+
///
394+
/// - `with-tokio`: Uses `tokio::time::sleep` for async retries.
395+
/// - `with-async-std`: Uses `async_std::task::sleep` for async retries.
396+
/// - `sync`: Uses `std::thread::sleep` for sync retries.
397+
///
398+
/// # Errors
399+
///
400+
/// If all retry attempts fail, the last error is returned.
401+
#[macro_export]
402+
macro_rules! retry {
403+
($e:expr) => {{
404+
let mut retry_cnt: u64 = 0;
405+
let max_retries = $crate::get_retries();
406+
407+
loop {
408+
match $e {
409+
Ok(v) => break Ok(v),
410+
Err(e) => {
411+
log::warn!("Retrying {e}");
412+
if retry_cnt >= max_retries {
413+
break Err(e);
414+
}
415+
retry_cnt += 1;
416+
let delay = std::time::Duration::from_secs(1 * retry_cnt.pow(2));
417+
#[cfg(feature = "with-tokio")]
418+
tokio::time::sleep(delay).await;
419+
#[cfg(feature = "with-async-std")]
420+
async_std::task::sleep(delay).await;
421+
#[cfg(feature = "sync")]
422+
std::thread::sleep(delay);
423+
continue;
424+
}
425+
}
426+
}
427+
}};
428+
}
429+
377430
#[cfg(test)]
378431
mod test {
379432
use crate::utils::etag_for_path;

0 commit comments

Comments
 (0)