fix: apply migration compaction and deep job pickups fixes#1
fix: apply migration compaction and deep job pickups fixes#1deepshekhardas wants to merge 5 commits into
Conversation
…er than maxDSSize
There was a problem hiding this comment.
4 issues found across 10 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="router/internal/jobiterator/jobiterator.go">
<violation number="1" location="router/internal/jobiterator/jobiterator.go:127">
P1: Use `DSLimitsReached` here; `LimitsReached` is a different jobs/events/payload limit, so the deep-pickup retry won't run when dataset limits are hit.</violation>
</file>
<file name="jobsdb/integration_test.go">
<violation number="1" location="jobsdb/integration_test.go:785">
P3: Assertion message is inconsistent with the asserted length (`3` vs message saying `2`).</violation>
</file>
<file name="jobsdb/migration_test.go">
<violation number="1" location="jobsdb/migration_test.go:816">
P2: Avoid strict duration comparisons in tests; they are nondeterministic and can cause flaky CI failures.</violation>
</file>
<file name="router/batchrouter/handle.go">
<violation number="1" location="router/batchrouter/handle.go:205">
P1: The new deep-pickup loop can run forever when DS limits remain reached, because it is unbounded and uses a non-cancelable context.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
| if err != nil { | ||
| panic(err) | ||
| } | ||
| query = ji.state.stats.QueryCount == 1 && len(jobs.Jobs) == 0 && jobs.LimitsReached |
There was a problem hiding this comment.
P1: Use DSLimitsReached here; LimitsReached is a different jobs/events/payload limit, so the deep-pickup retry won't run when dataset limits are hit.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At router/internal/jobiterator/jobiterator.go, line 127:
<comment>Use `DSLimitsReached` here; `LimitsReached` is a different jobs/events/payload limit, so the deep-pickup retry won't run when dataset limits are hit.</comment>
<file context>
@@ -112,21 +112,27 @@ func (ji *Iterator) HasNext() bool {
+ if err != nil {
+ panic(err)
+ }
+ query = ji.state.stats.QueryCount == 1 && len(jobs.Jobs) == 0 && jobs.LimitsReached
}
- ji.state.jobs = allJobsResult.Jobs
</file context>
| query = ji.state.stats.QueryCount == 1 && len(jobs.Jobs) == 0 && jobs.LimitsReached | |
| query = ji.state.stats.QueryCount == 1 && len(jobs.Jobs) == 0 && jobs.DSLimitsReached |
| if err != nil { | ||
| brt.logger.Errorn("BRT: Error while reading from DB", obskit.DestinationType(brt.destType), obskit.Error(err)) | ||
| panic(err) | ||
| for query := true; query; { // keep trying to get jobs while no jobs are returned because ds limits are being reached |
There was a problem hiding this comment.
P1: The new deep-pickup loop can run forever when DS limits remain reached, because it is unbounded and uses a non-cancelable context.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At router/batchrouter/handle.go, line 205:
<comment>The new deep-pickup loop can run forever when DS limits remain reached, because it is unbounded and uses a non-cancelable context.</comment>
<file context>
@@ -202,14 +202,17 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob
- if err != nil {
- brt.logger.Errorn("BRT: Error while reading from DB", obskit.DestinationType(brt.destType), obskit.Error(err))
- panic(err)
+ for query := true; query; { // keep trying to get jobs while no jobs are returned because ds limits are being reached
+ toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) {
+ return brt.jobsDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams)
</file context>
| t.Logf("lock check duration (skip): %v", lockCheckDuration) | ||
|
|
||
| // The second call with skipBefore should be significantly faster | ||
| require.Greater(t, checkDuration, lockCheckDuration, |
There was a problem hiding this comment.
P2: Avoid strict duration comparisons in tests; they are nondeterministic and can cause flaky CI failures.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At jobsdb/migration_test.go, line 816:
<comment>Avoid strict duration comparisons in tests; they are nondeterministic and can cause flaky CI failures.</comment>
<file context>
@@ -651,3 +651,212 @@ func Test_GetColumnConversion(t *testing.T) {
+ t.Logf("lock check duration (skip): %v", lockCheckDuration)
+
+ // The second call with skipBefore should be significantly faster
+ require.Greater(t, checkDuration, lockCheckDuration,
+ "getMigrationList with skipBefore should be faster than without",
+ )
</file context>
| require.Lenf(t, dsList, 2, "dsList length is not 1, got %+v", dsList) | ||
| require.Equal(t, prefix+"_jobs_5_1", dsList[0].JobTable) // 12 jobs | ||
| require.Equal(t, prefix+"_jobs_6", dsList[1].JobTable) // 0 jobs | ||
| require.Lenf(t, dsList, 3, "dsList length is not 2, got %+v", dsList) |
There was a problem hiding this comment.
P3: Assertion message is inconsistent with the asserted length (3 vs message saying 2).
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At jobsdb/integration_test.go, line 785:
<comment>Assertion message is inconsistent with the asserted length (`3` vs message saying `2`).</comment>
<file context>
@@ -780,11 +780,12 @@ func TestJobsDB(t *testing.T) {
- require.Lenf(t, dsList, 2, "dsList length is not 1, got %+v", dsList)
- require.Equal(t, prefix+"_jobs_5_1", dsList[0].JobTable) // 12 jobs
- require.Equal(t, prefix+"_jobs_6", dsList[1].JobTable) // 0 jobs
+ require.Lenf(t, dsList, 3, "dsList length is not 2, got %+v", dsList)
+ require.Equal(t, prefix+"_jobs_4_1", dsList[0].JobTable) // 8 jobs
+ require.Equal(t, prefix+"_jobs_5", dsList[1].JobTable) // 4 jobs
</file context>
| require.Lenf(t, dsList, 3, "dsList length is not 2, got %+v", dsList) | |
| require.Lenf(t, dsList, 3, "dsList length is not 3, got %+v", dsList) |
This PR applies fixes from two key PRs:
PR rudderlabs#6844 - Migration Compaction V2:
PR rudderlabs#6850 - Deep Job Pickups:
All changes have been applied with conflict resolution.
Summary by cubic
Stabilizes migration compaction and router partition switchovers, and adds deep job pickups to prevent job starvation when dataset limits block reads. This improves reliability during heavy migrations.
New Features
processor,router/batchrouter, and the job iterator now retry when reads return no jobs due to DS limits, until work is available.Bug Fixes
jobsdb.getMigrationList: enforcesmaxDSSize, skips oversize pairs, stops piggybacking before the limit; writers keep only the last dataset; defaultjobMinRowsLeftMigrateThresholdraised to 0.6.Written for commit 46f6a87. Summary will update on new commits.