Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/cpp/include/lemon/utils/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ struct DownloadOptions {
int low_speed_limit = 0; // Minimum bytes/sec before timeout (disabled — 0 = no limit)
int low_speed_time = 0; // Seconds below low_speed_limit before timeout (disabled)
int connect_timeout = 30; // Connection timeout in seconds
int no_progress_timeout = 60; // Seconds without byte progress before aborting (0 = disabled)
bool huggingface_range_retry = true;
bool force_initial_range_request = false;

// Optional content verification. expected_hash accepts plain hex or
// prefixed values like "sha256:<hex>", "sha1:<hex>", or
Expand Down Expand Up @@ -122,7 +125,8 @@ class HttpClient {
size_t resume_from,
ProgressCallback callback,
const std::map<std::string, std::string>& headers,
const DownloadOptions& options);
const DownloadOptions& options,
bool initial_range_request);
};

// Creates a throttled progress callback that prints at most once per second.
Expand Down
121 changes: 99 additions & 22 deletions src/cpp/server/utils/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,43 @@ static std::string lower_copy(std::string value) {
return value;
}

// Returns true when `value` begins with `prefix`.
// An empty prefix matches every string, including the empty string.
static bool starts_with(const std::string& value, const std::string& prefix) {
    // A prefix longer than the value can never match.
    if (prefix.size() > value.size()) {
        return false;
    }
    return value.compare(0, prefix.size(), prefix) == 0;
}

// Reports whether `url` looks like a Hugging Face "resolve" URL.
//
// The URL is passed through trim_copy() and lower_copy() before matching.
// It qualifies when it starts with the huggingface.co or hf.co host over
// http or https, followed by either '/' (path) or ':' (explicit port), and
// it also contains the path segment "/resolve/" somewhere in the string.
static bool is_huggingface_resolve_url(const std::string& url) {
    static const char* const kHostPrefixes[] = {
        "https://huggingface.co/",
        "https://huggingface.co:",
        "http://huggingface.co/",
        "http://huggingface.co:",
        "https://hf.co/",
        "https://hf.co:",
        "http://hf.co/",
        "http://hf.co:",
    };

    const std::string normalized = lower_copy(trim_copy(url));
    if (normalized.find("/resolve/") == std::string::npos) {
        return false;
    }

    for (const char* host_prefix : kHostPrefixes) {
        if (starts_with(normalized, host_prefix)) {
            return true;
        }
    }
    return false;
}

// Decides whether a download that starts at offset 0 should still be sent as
// an explicit range request.
//
// Returns true when options.force_initial_range_request is set. Otherwise
// returns true only when all of the following hold:
//   - retrying_without_partial is true,
//   - options.huggingface_range_retry is enabled,
//   - url is a Hugging Face resolve URL.
static bool should_use_initial_range_request(const std::string& url,
                                             const DownloadOptions& options,
                                             bool retrying_without_partial) {
    if (options.force_initial_range_request) {
        return true;
    }
    if (!retrying_without_partial || !options.huggingface_range_retry) {
        return false;
    }
    return is_huggingface_resolve_url(url);
}

// Resolves the stall timeout (in seconds) to apply to a download of `url`.
//
// A positive options.no_progress_timeout is used as-is. When the option is
// zero or negative, Hugging Face resolve URLs still get a 60 second timeout;
// every other URL gets 0.
// NOTE(review): a caller that sets no_progress_timeout to 0 therefore does not
// disable the timeout for Hugging Face resolve URLs — confirm this is intended.
static int effective_no_progress_timeout(const std::string& url,
                                         const DownloadOptions& options) {
    const int configured = options.no_progress_timeout;
    if (configured <= 0) {
        return is_huggingface_resolve_url(url) ? 60 : 0;
    }
    return configured;
}

// Converts a curl_off_t byte count to size_t, mapping zero and negative
// values to 0.
static size_t curl_off_to_size(curl_off_t value) {
    if (value <= 0) {
        return 0;
    }
    return static_cast<size_t>(value);
}

static bool is_hex_digest(const std::string& value, size_t expected_len) {
if (value.size() != expected_len) {
return false;
Expand Down Expand Up @@ -249,10 +286,13 @@ static size_t write_file_callback(void* ptr, size_t size, size_t nmemb, void* st
return written;
}

// Per-transfer state handed to progress_callback through its client pointer.
struct ProgressData {
    // Optional user callback; returning false cancels the transfer.
    ProgressCallback callback;
    // Set to true when the user callback returns false.
    bool cancelled = false;
    // Set to true when the no-progress timeout expires.
    bool stalled = false;
    // Seconds without new bytes before the transfer is aborted; values <= 0
    // turn the stall check off.
    int no_progress_timeout = 0;
    // Largest downloaded-byte count reported so far.
    curl_off_t last_downloaded = 0;
    // Time at which last_downloaded last increased (initially: construction time).
    std::chrono::steady_clock::time_point last_progress_time =
        std::chrono::steady_clock::now();
};

static fs::path get_disk_space_probe_path(const fs::path& output_path) {
Expand All @@ -270,25 +310,38 @@ static fs::path get_disk_space_probe_path(const fs::path& output_path) {
return fs::path(".");
}

// CURL transfer-info callback. Returning non-zero aborts the transfer.
//
// clientp is expected to point at a ProgressData; a null pointer makes the
// callback a no-op. On every invocation it:
//   1. aborts immediately if the transfer was already cancelled;
//   2. forwards (dlnow, dltotal) to the user callback, if any, and marks the
//      transfer cancelled and aborts when that callback returns false;
//   3. tracks byte progress and, when no_progress_timeout is positive, marks
//      the transfer stalled and aborts once that many seconds have elapsed
//      without dlnow increasing.
// The upload counters are unused.
static int progress_callback(void* clientp, curl_off_t dltotal, curl_off_t dlnow,
                             curl_off_t ultotal, curl_off_t ulnow) {
    (void)ultotal;
    (void)ulnow;

    ProgressData* data = static_cast<ProgressData*>(clientp);
    if (!data) return 0;

    if (data->cancelled) {
        return 1;
    }

    // The user callback is invoked even when the total size is still unknown
    // (dltotal <= 0 is reported to it as 0).
    if (data->callback &&
        !data->callback(curl_off_to_size(dlnow), curl_off_to_size(dltotal))) {
        data->cancelled = true;
        return 1;
    }

    const auto now = std::chrono::steady_clock::now();
    if (dlnow > data->last_downloaded) {
        // New bytes arrived: restart the idle timer.
        data->last_downloaded = dlnow;
        data->last_progress_time = now;
    } else if (data->no_progress_timeout > 0) {
        const auto idle_seconds = std::chrono::duration_cast<std::chrono::seconds>(
            now - data->last_progress_time).count();
        if (idle_seconds >= data->no_progress_timeout) {
            data->stalled = true;
            return 1;
        }
    }

    return 0;
}

HttpResponse HttpClient::get(const std::string& url,
Expand Down Expand Up @@ -544,7 +597,8 @@ DownloadResult HttpClient::download_attempt(const std::string& url,
size_t resume_from,
ProgressCallback callback,
const std::map<std::string, std::string>& headers,
const DownloadOptions& options) {
const DownloadOptions& options,
bool initial_range_request) {
DownloadResult result;

CURL* curl = curl_easy_init();
Expand Down Expand Up @@ -579,12 +633,17 @@ DownloadResult HttpClient::download_attempt(const std::string& url,

if (resume_from > 0) {
curl_easy_setopt(curl, CURLOPT_RESUME_FROM_LARGE, static_cast<curl_off_t>(resume_from));
} else if (initial_range_request) {
curl_easy_setopt(curl, CURLOPT_RANGE, "0-");
}

const int no_progress_timeout = effective_no_progress_timeout(url, options);
ProgressData* prog_data = nullptr;
if (callback) {
if (callback || no_progress_timeout > 0) {
prog_data = new ProgressData();
prog_data->callback = callback;
prog_data->no_progress_timeout = no_progress_timeout;
prog_data->last_progress_time = std::chrono::steady_clock::now();
curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, progress_callback);
curl_easy_setopt(curl, CURLOPT_XFERINFODATA, prog_data);
curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L);
Expand All @@ -602,8 +661,8 @@ DownloadResult HttpClient::download_attempt(const std::string& url,

CURLcode res = curl_easy_perform(curl);

// Check if download was cancelled by user callback
bool was_cancelled = (prog_data && prog_data->cancelled);
bool was_stalled = (prog_data && prog_data->stalled);

curl_off_t downloaded = 0;
curl_off_t total = 0;
Expand All @@ -622,11 +681,28 @@ DownloadResult HttpClient::download_attempt(const std::string& url,

curl_easy_cleanup(curl);

// Clean up progress data
if (prog_data) {
delete prog_data;
}

if (was_stalled) {
size_t current_file_size = 0;
if (fs::exists(output_path_fs)) {
current_file_size = fs::file_size(output_path_fs);
}

result.cancelled = false;
result.can_resume = current_file_size > 0;
std::ostringstream oss;
oss << "Download stalled: no bytes received for "
<< no_progress_timeout << " seconds";
if (current_file_size > 0) {
oss << "\n Partial file size: " << (current_file_size / (1024.0 * 1024.0)) << " MB (resumable)";
}
result.error_message = oss.str();
return result;
}

// Handle user cancellation
if (was_cancelled || res == CURLE_ABORTED_BY_CALLBACK) {
result.cancelled = true;
Expand Down Expand Up @@ -880,20 +956,21 @@ DownloadResult HttpClient::download_file(const std::string& url,

ProgressCallback adjusted_callback = nullptr;
if (callback) {
// Adjust progress to account for resume offset
// curl reports: current = bytes downloaded this session, total = remaining bytes
// We want to show: (resume_offset + current) / (resume_offset + total)
adjusted_callback = [callback, resume_offset](size_t current, size_t total) -> bool {
if (total > 0) { // Only call when we have valid progress info
if (total > 0) {
return callback(resume_offset + current, resume_offset + total);
}
return true; // Continue if no progress info yet
return callback(resume_offset + current, 0);
};
}

// Download to .partial file
const bool retrying_without_partial = (attempt > 0 && resume_offset == 0);
const bool initial_range_request =
should_use_initial_range_request(url, options, retrying_without_partial);

final_result = download_attempt(url, partial_path, resume_offset,
adjusted_callback, headers, options);
adjusted_callback, headers, options,
initial_range_request);

// If cancelled by user, return immediately without retrying
if (final_result.cancelled) {
Expand Down
Loading