Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 94 additions & 25 deletions crates/lib/docs_rs_storage/src/backends/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
types::FileRange,
utils::crc32::crc32_for_path_range,
};
use anyhow::{Context as _, Error};
use anyhow::{Context as _, Error, bail};
use async_stream::try_stream;
use aws_config::BehaviorVersion;
use aws_sdk_s3::{
Expand All @@ -23,12 +23,24 @@ use base64::{Engine as _, engine::general_purpose::STANDARD as b64};
use chrono::Utc;
use docs_rs_headers::{ETag, compute_etag};
use docs_rs_types::CompressionAlgorithm;
use docs_rs_utils::spawn_blocking;
use docs_rs_utils::{retry_backoff, spawn_blocking};
use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt};
use mime::Mime;
use std::path::Path;
use tokio::fs;
use tracing::error;
use tokio::{fs, time};
use tracing::{error, warn};

// S3 error codes that indicate a transient failure for an individual object
// inside a batch `DeleteObjects` response. The HTTP request itself succeeds
// (200), so the SDK's retry layer never sees these — we have to retry them
// ourselves. See:
// https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
static RETRYABLE_DELETE_OBJECT_ERROR_CODES: [&str; 4] = [
"InternalError",
"RequestTimeout",
"ServiceUnavailable",
"SlowDown",
];

// error codes to check for when trying to determine if an error is
// a "NOT FOUND" error.
Expand All @@ -52,6 +64,7 @@ const S3_UPLOAD_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB
// normal uploads only work up to 5 GiB.
const S3_MULTIPART_UPLOAD_THRESHOLD: u64 = 100 * 1024 * 1024; // 100 MiB
const S3_MULTIPART_PART_SIZE: u64 = S3_MULTIPART_UPLOAD_THRESHOLD; // 100 MiB
const S3_DELETE_OBJECTS_LIMIT: usize = 1000;

trait S3ResultExt<T> {
fn convert_errors(self) -> anyhow::Result<T>;
Expand Down Expand Up @@ -88,6 +101,7 @@ pub(crate) struct S3Backend {
bucket: String,
otel_metrics: StorageMetrics,
network_parallelism: usize,
max_retries: u32,
#[cfg(any(test, feature = "testing"))]
temporary: bool,
}
Expand Down Expand Up @@ -122,6 +136,7 @@ impl S3Backend {
otel_metrics,
bucket: config.s3_bucket.clone(),
network_parallelism: config.network_parallelism,
max_retries: config.aws_sdk_max_retries,
#[cfg(any(test, feature = "testing"))]
temporary: config.s3_bucket_is_temporary,
})
Expand Down Expand Up @@ -539,37 +554,91 @@ impl StorageBackendMethods for S3Backend {

async fn delete_prefix(&self, prefix: &str) -> Result<(), Error> {
let stream = self.list_prefix(prefix).await;
let mut chunks = stream.chunks(900); // 1000 is the limit for the delete_objects API

while let Some(batch) = chunks.next().await {
let batch: Vec<_> = batch.into_iter().collect::<anyhow::Result<_>>()?;

let to_delete = Delete::builder()
.set_objects(Some(
batch
.into_iter()
.filter_map(|k| ObjectIdentifier::builder().key(k).build().ok())
.collect(),
))
.build()
.context("could not build delete request")?;
stream
.chunks(S3_DELETE_OBJECTS_LIMIT)
.map(|batch| batch.into_iter().collect::<Result<Vec<_>, Error>>())
.try_for_each_concurrent(Some(self.network_parallelism), |batch| {
self.delete_batch_with_retry(batch)
})
.await
}
}

impl S3Backend {
async fn delete_batch_with_retry(&self, keys: Vec<String>) -> Result<(), Error> {
let mut remaining = keys;
for attempt in 1.. {
let resp = self
.client
.delete_objects()
.bucket(&self.bucket)
.delete(to_delete)
.delete(Self::delete_request(remaining)?)
.send()
.await?;

if let Some(errs) = resp.errors {
for err in &errs {
error!("error deleting file from s3: {:?}", err);
}
let Some(errs) = resp.errors.filter(|e| !e.is_empty()) else {
return Ok(());
};

if attempt > self.max_retries {
bail!(
"errors deleting from s3 failed after {} retries: {:?}",
self.max_retries,
errs
);
}

anyhow::bail!("deleting from s3 failed");
let mut retry_keys = Vec::with_capacity(errs.len());
for aws_sdk_s3::types::Error {
key, code, message, ..
} in errs
{
let retryable = code
.as_deref()
.is_some_and(|c| RETRYABLE_DELETE_OBJECT_ERROR_CODES.contains(&c));
match (retryable, key) {
(true, Some(key)) => retry_keys.push(key),
(_, key) => {
// `delete_prefix` is not best-effort: once any key has a
// non-retryable delete error, the prefix delete failed.
bail!(
"error deleting file {:?} from s3: code={:?}, message={:?}",
key,
code,
message
);
}
}
}

let backoff = retry_backoff(attempt);
warn!(
attempt,
retry_count = retry_keys.len(),
?backoff,
"retrying s3 delete_objects for transient errors",
);
time::sleep(backoff).await;
remaining = retry_keys;
}
Ok(())

unreachable!("unbounded retry loop should return or fail internally")
}

fn delete_request(keys: Vec<String>) -> Result<Delete, Error> {
let objects: Vec<_> = keys
.into_iter()
.map(|key| {
ObjectIdentifier::builder()
.key(key)
.build()
.expect("can never fail, the keys come from list_prefix")
})
.collect();

Delete::builder()
.set_objects(Some(objects))
.build()
.context("could not build delete request")
}
}
13 changes: 10 additions & 3 deletions crates/lib/docs_rs_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ pub fn retry<T>(mut f: impl FnMut() -> Result<T>, max_attempts: u32) -> Result<T
unreachable!()
}

pub fn retry_backoff(failed_attempt: u32) -> Duration {
let sleep_for = 2u32.pow(failed_attempt.max(1));
Duration::from_secs(sleep_for as u64)
}

pub async fn retry_async<T, E, Fut, F: FnMut() -> Fut>(mut f: F, max_attempts: u32) -> Result<T, E>
where
Fut: Future<Output = Result<T, E>>,
Expand All @@ -151,12 +156,14 @@ where
if attempt > max_attempts {
return Err(err);
} else {
let sleep_for = 2u32.pow(attempt);
let backoff = retry_backoff(attempt);
warn!(
"got error on attempt {}, will try again after {}s:\n{:?}",
attempt, sleep_for, err
attempt,
backoff.as_secs(),
err
);
tokio::time::sleep(Duration::from_secs(sleep_for as u64)).await;
tokio::time::sleep(backoff).await;
}
}
}
Expand Down
Loading