Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 106 additions & 26 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,79 @@ use serde_json::{json, Value};
use std::{
collections::{HashMap, HashSet},
error::Error,
fmt::Write as FmtWrite,
future::Future,
io::{ErrorKind, Write},
path::{Path, PathBuf},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::time::{sleep, Instant};

fn reqwest_error(err: reqwest::Error, strip_url: bool) -> (String, bool, bool, bool) {
let is_connect = err.is_connect();
let is_timeout = err.is_timeout();
let is_connection_reset = is_connection_reset_err(&err);

let err = if strip_url { err.without_url() } else { err };

let mut out = String::new();
let mut cur: Option<&dyn Error> = Some(&err);

while let Some(e) = cur {
let _ = write!(
out,
"{}{}",
if out.is_empty() {
""
} else {
"\n\nCaused by: "
},
e
);
cur = e.source();
}

(out, is_connect, is_timeout, is_connection_reset)
}

fn map_reqwest<T>(res: Result<T, reqwest::Error>, strip_url: bool) -> Result<T, ClientError> {
res.map_err(|err| {
let (message, is_connect, is_timeout, is_connection_reset) = reqwest_error(err, strip_url);

ClientError::Reqwest {
message,
is_connect,
is_timeout,
is_connection_reset,
}
})
}

trait ReqwestResultExt<T> {
fn map_reqwest_err(self) -> Result<T, ClientError>;
}

impl<T> ReqwestResultExt<T> for Result<T, reqwest::Error> {
fn map_reqwest_err(self) -> Result<T, ClientError> {
map_reqwest(self, false)
}
}

fn is_connection_reset_err(err: &reqwest::Error) -> bool {
let mut source = err.source();

while let Some(source_err) = source {
if let Some(io_err) = source_err.downcast_ref::<std::io::Error>() {
if io_err.kind() == ErrorKind::ConnectionReset {
return true;
}
}
source = source_err.source();
}

false
}

#[derive(Debug, thiserror::Error)]
pub enum ClientError {
#[error("API error from {url} (status {status}): {body}")]
Expand All @@ -36,8 +102,13 @@ pub enum ClientError {
#[error("OSTree error: {0}")]
Ostree(String),

#[error(transparent)]
Reqwest(#[from] reqwest::Error),
#[error("{message}")]
Reqwest {
message: String,
is_connect: bool,
is_timeout: bool,
is_connection_reset: bool,
},

#[error(transparent)]
Json(#[from] serde_json::Error),
Expand All @@ -46,11 +117,27 @@ pub enum ClientError {
Io(#[from] std::io::Error),
}

impl From<reqwest::Error> for ClientError {
fn from(err: reqwest::Error) -> Self {
let (message, is_connect, is_timeout, is_connection_reset) = reqwest_error(err, false);
ClientError::Reqwest {
message,
is_connect,
is_timeout,
is_connection_reset,
}
}
}

impl ClientError {
pub fn is_retryable(&self) -> bool {
match self {
ClientError::Http { status, .. } => *status >= 500,
ClientError::Reqwest(err) => err.is_connect() || err.is_timeout(),
ClientError::Reqwest {
is_connect,
is_timeout,
..
} => *is_connect || *is_timeout,
_ => false,
}
}
Expand All @@ -73,12 +160,12 @@ impl ClientError {
"message": message,
},
}),
ClientError::Reqwest(_)
ClientError::Reqwest { .. }
| ClientError::Json(_)
| ClientError::Io(_)
| ClientError::Ostree(_) => {
let error_type = match self {
ClientError::Reqwest(_) => "reqwest",
ClientError::Reqwest { .. } => "reqwest",
ClientError::Json(_) => "json",
ClientError::Io(_) => "io",
ClientError::Ostree(_) => "ostree",
Expand Down Expand Up @@ -239,21 +326,6 @@ fn is_build_in_use_purge_error(body: &Value) -> bool {
body.get("message").and_then(Value::as_str) == Some(PURGE_IN_USE_MESSAGE)
}

fn is_connection_reset(err: &reqwest::Error) -> bool {
let mut source = err.source();

while let Some(source_err) = source {
if let Some(io_err) = source_err.downcast_ref::<std::io::Error>() {
if io_err.kind() == ErrorKind::ConnectionReset {
return true;
}
}
source = source_err.source();
}

false
}

fn poll_sleep_duration(iterations_since_change: u32) -> Duration {
if iterations_since_change <= 1 {
Duration::from_secs(1)
Expand Down Expand Up @@ -611,7 +683,7 @@ impl ApiClient {
if let Some(body) = body {
request = request.json(body);
}
let response = request.send().await?;
let response = request.send().await.map_reqwest_err()?;
parse_api_response(url, response).await
})
.await
Expand Down Expand Up @@ -738,7 +810,7 @@ impl ApiClient {
.header(CONTENT_TYPE, "application/json")
.body(body.clone())
.build()?;
let response = self.client.execute(request).await?;
let response = self.client.execute(request).await.map_reqwest_err()?;
let status = response.status();

if status.as_u16() != 200 {
Expand All @@ -749,7 +821,11 @@ impl ApiClient {
});
}

Ok(response.json::<MissingObjectsResponse>().await?.missing)
Ok(response
.json::<MissingObjectsResponse>()
.await
.map_reqwest_err()?
.missing)
})
.await
}
Expand Down Expand Up @@ -784,7 +860,8 @@ impl ApiClient {
.bearer_auth(&self.token)
.multipart(form)
.send()
.await?;
.await
.map_reqwest_err()?;
let status = response.status();

if status.as_u16() != 200 {
Expand All @@ -795,7 +872,7 @@ impl ApiClient {
});
}

let _ = response.bytes().await?;
let _ = response.bytes().await.map_reqwest_err()?;
Ok(())
})
.await
Expand Down Expand Up @@ -1385,7 +1462,10 @@ impl<'a> JobPoller<'a> {
return Err(err);
}
}
Err(ClientError::Reqwest(err)) if is_connection_reset(&err) => {}
Err(ClientError::Reqwest {
is_connection_reset: true,
..
}) => {}
Err(err) => return Err(err),
}

Expand Down
Loading