SQ-874: Ensure dimensions update can only run once in parallel per project#80
Draft
brinkdp wants to merge 14 commits into
Draft
SQ-874: Ensure dimensions update can only run once in parallel per project#80brinkdp wants to merge 14 commits into
brinkdp wants to merge 14 commits into
Conversation
To facilitate reuse of Celery task names across the codebase, e.g. in db entries
Uses the CRUD fns to determine if there are existing active contenders for dimensions update for the project, and if so, returns that job id instead of creating a new one. This should prevent multiple dimensions update jobs from being enqueued at the same time for the same project, which can cause issues since these jobs are not designed to run concurrently.
For the case when a new submission loses the race against an already active dimensions update job
Also more task name class to its own file to avoid circular imports
Allows the user to see a message in their task history with the job id of another dimensions update task if their task was marked as a duplicate of that other task.
Seemed to occur when running the full test stack with pytest -s -v. Did not notice it when running the test file individually.
Contributor
There was a problem hiding this comment.
Pull request overview
Implements project-scoped deduplication for VCF dimensions updates so that, per project, only one “dimensions update” job is effectively allowed to proceed at a time (API-side submission gates + worker-side early-exit gate), reducing duplicate VCF downloads and duplicate worker computation.
Changes:
- Add API submission “reservation + winner” logic for dimensions updates using
TaskHistoryDB(provisional rows + re-check) and return a structured submit result (job_id+outcome). - Add worker-side “race winner” resolution using
TaskHistoryDB+CeleryTaskMetato skip duplicates before the first expensive operation. - Add schema/model/migration support (
task_nameonTaskHistoryDB), centralize Celery task names, and expand CLI + test coverage for the new behavior.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
packages/divbase-api/src/divbase_api/routes/vcf_dimensions.py |
Adds API submission gates and changes PUT response to DimensionsUpdateSubmitResult. |
packages/divbase-api/src/divbase_api/crud/task_history.py |
Adds provisional reservation CRUD + contender detection query for dimensions updates. |
packages/divbase-api/src/divbase_api/worker/crud_dimensions.py |
Adds worker-side winner resolution across TaskHistoryDB/CeleryTaskMeta. |
packages/divbase-api/src/divbase_api/worker/tasks.py |
Adds Gate 3 worker check to skip duplicate dimensions updates early; uses centralized task names. |
packages/divbase-api/src/divbase_api/worker/task_names.py |
Introduces TaskName enum as a single source of truth for Celery task names. |
packages/divbase-api/src/divbase_api/models/task_history.py |
Adds task_name column to TaskHistoryDB model. |
packages/divbase-api/src/divbase_api/migrations/versions/2026-03-13_task_name_to_task_history.py |
Alembic migration adding task_name column + index. |
packages/divbase-api/src/divbase_api/crud/crud_constants.py |
Adds shared TTL/status constants used by both API and worker concurrency logic. |
packages/divbase-lib/src/divbase_lib/api_schemas/vcf_dimensions.py |
Extends dimensions task result schema and adds submit-result schema. |
packages/divbase-cli/src/divbase_cli/cli_commands/dimensions_cli.py |
Updates CLI to parse/handle new submit-result response (new vs existing). |
packages/divbase-cli/src/divbase_cli/display_task_history.py |
Adds display handling for skipped_duplicate dimensions task result. |
tests/unit/divbase_api/test_vcf_dimensions_concurrency_task_submission.py |
Unit tests for API submission gates (existing job reuse, winner/loser behavior). |
tests/unit/divbase_api/test_vcf_dimensions_concurrency_worker.py |
Unit tests for worker-side winner resolution and duplicate skipping result. |
tests/e2e_integration/cli_commands/test_dimensions_cli.py |
Adds an e2e test that submits two near-simultaneous update requests and asserts deduplication. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+172
to
+174
| TaskHistoryDB.task_name == TaskName.UPDATE_VCF_DIMENSIONS.value, | ||
| or_( | ||
| and_( |
Comment on lines
+304
to
+313
| current_job_stmt = select(TaskHistoryDB.id).where( | ||
| TaskHistoryDB.project_id == project_id, | ||
| TaskHistoryDB.task_name == TaskName.UPDATE_VCF_DIMENSIONS.value, | ||
| TaskHistoryDB.task_id == task_id, | ||
| ) | ||
| current_job_id = db.execute(current_job_stmt).scalar_one_or_none() | ||
|
|
||
| # Avoid blocking task if there is CeleryTaskMeta drift. | ||
| if current_job_id is None: | ||
| return True, None, None |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR regards the idea we discussed in #74: "not letting more than 1 dimensions update job be run at any time for 1 project".
This was a little more involved than I first thought. Fun to think about, nevertheless.
Strategy
The risk with the dimensions update command is not really the database entries since it will just update the data with the same values. But it will trigger duplicate VCF downloads and duplicate dimensions calculation in the workers, which is a waste of resources.
One strategy often mentioned online is use row/advisory lock on the postgres table instead of the default MVCC (Multi-Version Concurrency Control). That could backfire with row deadlocks if not implemented well and if unlucky. I was not to keen on this at the moment.
The strategy I went is requires more code, but has for has minimal changes to the db models. Here I am using layered concurrency guards/gates to try to create a Swiss cheese model. The idea is that a single gate will not catch all race conditions, but layered together, they will catch a lot of them. And if a duplicate job still slips through despite all this, well, then the cost is "just" wasted resources. Each gate is a db lookup to see if there is already a concurrent dimensions update job for the given project, with the reasoning that db lookups are cheap compared to the download and calculation..
Implementation
Concurrent tasks submissions can happen in many ways. For instance:
Here I considered these race condition windows. There are probably more, but that would be more granular.
TaskHistoryDBCeleryTaskMetaentry is createdI added
task_nametoTaskHistoryDBto be able to identify dimensions update tasks already from this table (before this was only present inCeleryTaskMeta.I then implemented concurrency gates:
Gate 1 (API endpoint):
TaskHistoryDB.TaskHistoryDB, do not enqueue an dimensions update task to the brokerTaskHistoryDBwithtask_id=None. It hasprovisional_entry_ttl_seconds=120to make a provisional dimensions task “active” for 2 minutes. This was somewhat arbitrarily selected, mostly to avoid a deadlock against stale tasks.TaskHistoryDB.Gate 2 (API endpoint):
TaskHistoryDB, check again if there is another concurrent dimensions update task for the project. The is the second layer in the Swiss cheese.CeleryTaskMetaentry and a celery UUID as usual. ItsTaskHistoryDBentry will be updated fromtask_id=Nonetotask_id=<celery_UUID>.TaskHistoryDB.Gate 3 (Worker):
TaskHistoryDBentries for other dimensions update tasks as before, but also checkCeleryTaskMetafor jobs that have running status in celery.skipped_duplicate. This tasks will have run in a worker and taken up some resources in that manner, but will stop before heavy I/O compute step of the task.Testing for this is a little tricky since it is about concurrent requests. I learned about a cool way of how to use threads for e2e tests to try to fire two API requests at the same time.
Will take a review from Copilot first.
TODOs:
non_indexed_vcfs. Could this be implemented in a lightweight way in the API too?