diff --git a/crates/lib/docs_rs_storage/src/backends/s3.rs b/crates/lib/docs_rs_storage/src/backends/s3.rs index c1cd6ccd6..699fccab6 100644 --- a/crates/lib/docs_rs_storage/src/backends/s3.rs +++ b/crates/lib/docs_rs_storage/src/backends/s3.rs @@ -8,7 +8,7 @@ use crate::{ types::FileRange, utils::crc32::crc32_for_path_range, }; -use anyhow::{Context as _, Error}; +use anyhow::{Context as _, Error, bail}; use async_stream::try_stream; use aws_config::BehaviorVersion; use aws_sdk_s3::{ @@ -23,12 +23,24 @@ use base64::{Engine as _, engine::general_purpose::STANDARD as b64}; use chrono::Utc; use docs_rs_headers::{ETag, compute_etag}; use docs_rs_types::CompressionAlgorithm; -use docs_rs_utils::spawn_blocking; +use docs_rs_utils::{retry_backoff, spawn_blocking}; use futures_util::stream::{self, BoxStream, StreamExt, TryStreamExt}; use mime::Mime; use std::path::Path; -use tokio::fs; -use tracing::error; +use tokio::{fs, time}; +use tracing::{error, warn}; + +// S3 error codes that indicate a transient failure for an individual object +// inside a batch `DeleteObjects` response. The HTTP request itself succeeds +// (200), so the SDK's retry layer never sees these — we have to retry them +// ourselves. See: +// https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html +static RETRYABLE_DELETE_OBJECT_ERROR_CODES: [&str; 4] = [ + "InternalError", + "RequestTimeout", + "ServiceUnavailable", + "SlowDown", +]; // error codes to check for when trying to determine if an error is // a "NOT FOUND" error. @@ -52,6 +64,7 @@ const S3_UPLOAD_BUFFER_SIZE: usize = 1024 * 1024; // 1 MiB // normal uploads only work up to 5 GiB. const S3_MULTIPART_UPLOAD_THRESHOLD: u64 = 100 * 1024 * 1024; // 100 MiB const S3_MULTIPART_PART_SIZE: u64 = S3_MULTIPART_UPLOAD_THRESHOLD; // 100 MiB +const S3_DELETE_OBJECTS_LIMIT: usize = 1000; trait S3ResultExt { fn convert_errors(self) -> anyhow::Result; @@ -88,6 +101,7 @@ pub(crate) struct S3Backend { bucket: String, otel_metrics: StorageMetrics, network_parallelism: usize, + max_retries: u32, #[cfg(any(test, feature = "testing"))] temporary: bool, } @@ -122,6 +136,7 @@ impl S3Backend { otel_metrics, bucket: config.s3_bucket.clone(), network_parallelism: config.network_parallelism, + max_retries: config.aws_sdk_max_retries, #[cfg(any(test, feature = "testing"))] temporary: config.s3_bucket_is_temporary, }) @@ -539,37 +554,91 @@ impl StorageBackendMethods for S3Backend { async fn delete_prefix(&self, prefix: &str) -> Result<(), Error> { let stream = self.list_prefix(prefix).await; - let mut chunks = stream.chunks(900); // 1000 is the limit for the delete_objects API - - while let Some(batch) = chunks.next().await { - let batch: Vec<_> = batch.into_iter().collect::>()?; - - let to_delete = Delete::builder() - .set_objects(Some( - batch - .into_iter() - .filter_map(|k| ObjectIdentifier::builder().key(k).build().ok()) - .collect(), - )) - .build() - .context("could not build delete request")?; + stream + .chunks(S3_DELETE_OBJECTS_LIMIT) + .map(|batch| batch.into_iter().collect::, Error>>()) + .try_for_each_concurrent(Some(self.network_parallelism), |batch| { + self.delete_batch_with_retry(batch) + }) + .await + } +} +impl S3Backend { + async fn delete_batch_with_retry(&self, keys: Vec) -> Result<(), Error> { + let mut remaining = keys; + for attempt in 1.. { let resp = self .client .delete_objects() .bucket(&self.bucket) - .delete(to_delete) + .delete(Self::delete_request(remaining)?) .send() .await?; - if let Some(errs) = resp.errors { - for err in &errs { - error!("error deleting file from s3: {:?}", err); - } + let Some(errs) = resp.errors.filter(|e| !e.is_empty()) else { + return Ok(()); + }; + + if attempt > self.max_retries { + bail!( + "errors deleting from s3 failed after {} retries: {:?}", + self.max_retries, + errs + ); + } - anyhow::bail!("deleting from s3 failed"); + let mut retry_keys = Vec::with_capacity(errs.len()); + for aws_sdk_s3::types::Error { + key, code, message, .. + } in errs + { + let retryable = code + .as_deref() + .is_some_and(|c| RETRYABLE_DELETE_OBJECT_ERROR_CODES.contains(&c)); + match (retryable, key) { + (true, Some(key)) => retry_keys.push(key), + (_, key) => { + // `delete_prefix` is not best-effort: once any key has a + // non-retryable delete error, the prefix delete failed. + bail!( + "error deleting file {:?} from s3: code={:?}, message={:?}", + key, + code, + message + ); + } + } } + + let backoff = retry_backoff(attempt); + warn!( + attempt, + retry_count = retry_keys.len(), + ?backoff, + "retrying s3 delete_objects for transient errors", + ); + time::sleep(backoff).await; + remaining = retry_keys; } - Ok(()) + + unreachable!("unbounded retry loop should return or fail internally") + } + + fn delete_request(keys: Vec) -> Result { + let objects: Vec<_> = keys + .into_iter() + .map(|key| { + ObjectIdentifier::builder() + .key(key) + .build() + .expect("can never fail, the keys come from list_prefix") + }) + .collect(); + + Delete::builder() + .set_objects(Some(objects)) + .build() + .context("could not build delete request") } } diff --git a/crates/lib/docs_rs_utils/src/lib.rs b/crates/lib/docs_rs_utils/src/lib.rs index befdf8810..762415a32 100644 --- a/crates/lib/docs_rs_utils/src/lib.rs +++ b/crates/lib/docs_rs_utils/src/lib.rs @@ -139,6 +139,11 @@ pub fn retry(mut f: impl FnMut() -> Result, max_attempts: u32) -> Result Duration { + let sleep_for = 2u32.pow(failed_attempt.max(1)); + Duration::from_secs(sleep_for as u64) +} + pub async fn retry_async Fut>(mut f: F, max_attempts: u32) -> Result where Fut: Future>, @@ -151,12 +156,14 @@ where if attempt > max_attempts { return Err(err); } else { - let sleep_for = 2u32.pow(attempt); + let backoff = retry_backoff(attempt); warn!( "got error on attempt {}, will try again after {}s:\n{:?}", - attempt, sleep_for, err + attempt, + backoff.as_secs(), + err ); - tokio::time::sleep(Duration::from_secs(sleep_for as u64)).await; + tokio::time::sleep(backoff).await; } } }