diff --git a/src/app/src/renderer/utils/backendInstaller.ts b/src/app/src/renderer/utils/backendInstaller.ts index ab1883383..1545aba82 100644 --- a/src/app/src/renderer/utils/backendInstaller.ts +++ b/src/app/src/renderer/utils/backendInstaller.ts @@ -662,6 +662,34 @@ function extractExplicitBackend(loadBody?: Record): { recipe: s return null; } +/** + * Wait for an in-flight model download started by another caller (or tab) + * instead of starting a second /pull that would race on the server. + */ +async function awaitExistingModelDownload(modelName: string): Promise { + const downloadId = downloadTracker.getStableDownloadId(modelName, 'model'); + downloadTracker.startServerPolling(); + + const serverDownloads = await downloadTracker.hydrateFromServer(); + const active = serverDownloads.find( + item => item.model_name === modelName && + (item.running === true || item.status === 'downloading'), + ); + if (active) { + downloadTracker.applyServerDownload(active); + } else if (!downloadTracker.isActive(modelName)) { + return; + } + + await waitForServerDownloadTerminal( + downloadId, + modelName, + new AbortController(), + () => undefined, + Boolean(active), + ); +} + /** * Universal pre-flight check for all inference requests. * Ensures backend is installed, model is downloaded, and model is loaded — @@ -791,7 +819,12 @@ async function ensureModelReadyInternal( // Step 5: Pull model if not downloaded (shows in Download Manager) if (!isDownloaded) { - await pullModel(modelName, { declaredSizeGB: modelsData[modelName]?.size }); + if (downloadTracker.isActive(modelName) || + await downloadTracker.hasActiveServerDownload(modelName)) { + await awaitExistingModelDownload(modelName); + } else { + await pullModel(modelName, { declaredSizeGB: modelsData[modelName]?.size }); + } } // Step 6: Load model into memory (merge loadBody if provided) diff --git a/src/cpp/include/lemon/model_manager.h b/src/cpp/include/lemon/model_manager.h index cdcf844dc..66ae5be8d 100644 --- a/src/cpp/include/lemon/model_manager.h +++ b/src/cpp/include/lemon/model_manager.h @@ -326,6 +326,13 @@ class ModelManager { bool refresh_user_models_from_disk_for_lookup(const std::string& model_name); void rebuild_public_model_aliases_locked(); + + // One download at a time per model. Concurrent /pull, /load, and auto-load + // paths otherwise race in HttpClient and corrupt shared .partial files. + std::shared_ptr get_model_download_lock(const std::string& model_name); + + std::mutex model_download_locks_mutex_; + std::map> model_download_locks_; }; } // namespace lemon diff --git a/src/cpp/server/model_manager.cpp b/src/cpp/server/model_manager.cpp index 1036e6bb6..7171fc5f0 100644 --- a/src/cpp/server/model_manager.cpp +++ b/src/cpp/server/model_manager.cpp @@ -3137,6 +3137,19 @@ bool ModelManager::is_model_downloaded(const std::string& model_name) { return false; } +std::shared_ptr ModelManager::get_model_download_lock(const std::string& model_name) { + const std::string lock_key = resolve_model_name(model_name); + std::lock_guard guard(model_download_locks_mutex_); + if (auto it = model_download_locks_.find(lock_key); it != model_download_locks_.end()) { + if (auto existing = it->second.lock()) { + return existing; + } + } + auto lock = std::make_shared(); + model_download_locks_[lock_key] = lock; + return lock; +} + void ModelManager::download_registered_model(const ModelInfo& info, bool do_not_upgrade, DownloadProgressCallback progress_callback) { // Cloud models have no local artifacts; "downloading" is a no-op. if (info.recipe == "cloud") { @@ -3144,6 +3157,14 @@ void ModelManager::download_registered_model(const ModelInfo& info, bool do_not_ return; } + auto model_lock = get_model_download_lock(info.model_name); + std::lock_guard download_guard(*model_lock); + + // Another caller may have finished while we waited for the model lock. + if (do_not_upgrade && is_model_downloaded(info.model_name)) { + return; + } + // Use recipe-specific download paths if (info.recipe == "flm") { download_from_flm(info.checkpoint(), do_not_upgrade, progress_callback); diff --git a/src/cpp/server/server.cpp b/src/cpp/server/server.cpp index b95176957..2aa65226a 100644 --- a/src/cpp/server/server.cpp +++ b/src/cpp/server/server.cpp @@ -3426,8 +3426,33 @@ void Server::handle_pull(const httplib::Request& req, httplib::Response& res) { // response exactly as before. stream_download_operation(res, operation); } else { - // Legacy synchronous mode - blocks until complete - model_manager_->download_model(model_name, request_json, do_not_upgrade); + // Legacy synchronous mode - blocks until complete. Route through the + // shared download job registry so sync /pull deduplicates with any + // in-flight server-owned or SSE pull for the same model. + auto operation = [this, model_name, request_json, do_not_upgrade](DownloadProgressCallback progress_cb) { + model_manager_->download_model(model_name, request_json, do_not_upgrade, progress_cb); + }; + auto job = start_download_job("model:" + model_name, "model", model_name, operation); + join_download_job(job); + + std::string error_message; + std::string error_code; + { + std::lock_guard lock(downloads_mutex_); + if (job->status == "error") { + error_message = job->error; + if (job->progress.contains("code") && job->progress["code"].is_string()) { + error_code = job->progress["code"].get(); + } + } + } + + if (!error_message.empty()) { + if (error_code == lemon::kUnknownModelErrorCode) { + throw lemon::UnknownModelError(error_message); + } + throw std::runtime_error(error_message); + } nlohmann::json response = {{"status", "success"}, {"model_name", model_name}}; res.set_content(response.dump(), "application/json"); diff --git a/src/cpp/server/utils/http_client.cpp b/src/cpp/server/utils/http_client.cpp index 471f8040f..9bf99eb8b 100644 --- a/src/cpp/server/utils/http_client.cpp +++ b/src/cpp/server/utils/http_client.cpp @@ -13,6 +13,8 @@ #include #include #include +#include +#include #include #include @@ -768,11 +770,51 @@ DownloadResult HttpClient::download_attempt(const std::string& url, return result; } +// Serialize concurrent downloads to the same output path. Without this, two +// callers (e.g. /pull and /load racing, or duplicate /pull requests) can both +// write to the same .partial file and corrupt the on-disk bytes. +struct PathDownloadLockRegistry { + std::mutex registry_mutex; + std::unordered_map> locks; + + std::shared_ptr acquire(const std::string& output_path) { + const std::string key = download_lock_key(output_path); + std::lock_guard guard(registry_mutex); + if (auto it = locks.find(key); it != locks.end()) { + if (auto existing = it->second.lock()) { + return existing; + } + } + auto lock = std::make_shared(); + locks[key] = lock; + return lock; + } + +private: + static std::string download_lock_key(const std::string& output_path) { + fs::path path = path_from_utf8(output_path); + std::error_code ec; + fs::path normalized = fs::weakly_canonical(path, ec); + if (ec) { + normalized = fs::absolute(path, ec); + if (ec) { + normalized = path; + } + } + return path_to_utf8(normalized); + } +}; + +static PathDownloadLockRegistry g_path_download_locks; + DownloadResult HttpClient::download_file(const std::string& url, const std::string& output_path, ProgressCallback callback, const std::map& headers, const DownloadOptions& options) { + auto path_lock = g_path_download_locks.acquire(output_path); + std::lock_guard path_guard(*path_lock); + DownloadResult final_result; int retry_delay_ms = options.initial_retry_delay_ms; const ExpectedHash expected_hash = parse_expected_hash(options);