-
Notifications
You must be signed in to change notification settings - Fork 0
fix: apply migration compaction and deep job pickups fixes #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
a2f3d47
6937e4e
d3430cf
b2a8db1
46f6a87
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -651,3 +651,212 @@ | |
| require.Error(t, err) | ||
| }) | ||
| } | ||
|
|
||
| func TestMigrationMaxDSSizeGuard(t *testing.T) { | ||
| _ = startPostgres(t) | ||
|
|
||
| // newJobDB creates a Handle with the given maxDSSize and jobMinRowsLeftMigrateThreshold. | ||
| newJobDB := func(t *testing.T, maxDSSize int, threshold float64) (*Handle, chan time.Time, *config.Config) { | ||
| t.Helper() | ||
| c := config.New() | ||
| c.Set("JobsDB.maxDSSize", maxDSSize) | ||
| c.Set("JobsDB.jobMinRowsLeftMigrateThreshold", threshold) | ||
| triggerAddNewDS := make(chan time.Time) | ||
| jd := &Handle{ | ||
| TriggerAddNewDS: func() <-chan time.Time { return triggerAddNewDS }, | ||
| TriggerMigrateDS: func() <-chan time.Time { return make(chan time.Time) }, | ||
| config: c, | ||
| } | ||
| require.NoError(t, jd.Setup(ReadWrite, true, strings.ToLower(rand.String(5)))) | ||
| t.Cleanup(jd.TearDown) | ||
| return jd, triggerAddNewDS, c | ||
| } | ||
|
|
||
| // addDS stores `jobs` with len(jobs)-pending marked as succeeded, then triggers addNewDS. | ||
| // Jobs must be a slice of a larger pre-created slice so that their pre-set IDs match the | ||
| // DB-assigned IDs (which start at 1 and increment globally per table). | ||
| addDS := func(t *testing.T, jd *Handle, trigger chan time.Time, jobs []*JobT, pending int) { | ||
| t.Helper() | ||
| require.NoError(t, jd.Store(context.Background(), jobs)) | ||
| if terminal := len(jobs) - pending; terminal > 0 { | ||
| require.NoError(t, jd.UpdateJobStatus(context.Background(), genJobStatuses(jobs[:terminal], "executing"))) | ||
| require.NoError(t, jd.UpdateJobStatus(context.Background(), genJobStatuses(jobs[:terminal], "succeeded"))) | ||
| } | ||
| trigger <- time.Now() | ||
| trigger <- time.Now() | ||
| } | ||
|
|
||
| t.Run("accumulation stops before exceeding maxDSSize", func(t *testing.T) { | ||
| // maxDSSize=10, threshold=0.7 → pair threshold=7 | ||
| // DS1..DS4: 3 pending each (needsPair since 3 < 7) | ||
| // DS5: last DS (exempt) | ||
| // | ||
| // getMigrationList walk: | ||
| // DS1 → waiting | ||
| // DS2 → pair: 3+3=6 ≤ 10, pendingJobsCount=6 | ||
| // DS3 → piggyback: 6+3=9 ≤ 10, pendingJobsCount=9 | ||
| // DS4 → piggyback: 9+3=12 > 10 → break | ||
| // expected: migrateFrom=[DS1,DS2,DS3], pendingJobsCount=9 ≤ maxDSSize | ||
| jd, trigger, c := newJobDB(t, 10, 0.7) | ||
| allJobs := genJobs(defaultWorkspaceID, "test", 50, 1) // 4 regular DSes + 1 last, 10 jobs each | ||
| for i := range 4 { | ||
| addDS(t, jd, trigger, allJobs[i*10:(i+1)*10], 3) | ||
| } | ||
| require.NoError(t, jd.Store(context.Background(), allJobs[40:])) | ||
|
|
||
| dsList := jd.getDSList() | ||
| c.Set("JobsDB."+jd.tablePrefix+"."+"maxMigrateDSProbe", len(dsList)) | ||
|
|
||
| migrateFrom, pendingJobsCount, _, err := jd.getMigrationList(dsList) | ||
| require.NoError(t, err) | ||
| require.Len(t, migrateFrom, 3) | ||
| require.Equal(t, 9, pendingJobsCount) | ||
| require.LessOrEqual(t, pendingJobsCount, 10) | ||
| }) | ||
|
|
||
| t.Run("pair exceeding maxDSSize is discarded", func(t *testing.T) { | ||
| // maxDSSize=10, threshold=0.7 → pair threshold=7 | ||
| // DS1: 6 pending (needsPair since 6 < 7) | ||
| // DS2: 6 pending → 6+6=12 > maxDSSize → waiting cleared, nothing migrates | ||
| // DS3: last DS (exempt) | ||
| jd, trigger, c := newJobDB(t, 10, 0.7) | ||
| allJobs := genJobs(defaultWorkspaceID, "test", 30, 1) // 2 regular DSes + 1 last, 10 jobs each | ||
| addDS(t, jd, trigger, allJobs[:10], 6) | ||
| addDS(t, jd, trigger, allJobs[10:20], 6) | ||
| require.NoError(t, jd.Store(context.Background(), allJobs[20:])) | ||
|
|
||
| dsList := jd.getDSList() | ||
| c.Set("JobsDB."+jd.tablePrefix+"."+"maxMigrateDSProbe", len(dsList)) | ||
|
|
||
| migrateFrom, _, _, err := jd.getMigrationList(dsList) | ||
| require.NoError(t, err) | ||
| require.Empty(t, migrateFrom) | ||
| }) | ||
| } | ||
|
|
||
| func TestMigrationSkipsDatasets(t *testing.T) { | ||
| config.Reset() | ||
| c := config.New() | ||
| c.Set("JobsDB.maxDSSize", 10) | ||
|
|
||
| _ = startPostgres(t) | ||
|
|
||
| triggerAddNewDS := make(chan time.Time) | ||
| triggerMigrateDS := make(chan time.Time) | ||
|
|
||
| jobDB := Handle{ | ||
| TriggerAddNewDS: func() <-chan time.Time { return triggerAddNewDS }, | ||
| TriggerMigrateDS: func() <-chan time.Time { return triggerMigrateDS }, | ||
| config: c, | ||
| } | ||
| tablePrefix := strings.ToLower(rand.String(5)) | ||
| require.NoError(t, jobDB.Setup(ReadWrite, true, tablePrefix)) | ||
| defer jobDB.TearDown() | ||
|
|
||
| const totalDS = 200 | ||
| const eligibleDSPos = 150 // 0-indexed position of the dataset we'll make eligible | ||
|
|
||
| // Create totalDS datasets: store jobs then trigger addNewDS to create the next one | ||
| for i := range totalDS - 1 { // -1 because Setup already creates DS 1 | ||
| _ = i | ||
| require.NoError(t, jobDB.Store(context.Background(), genJobs(defaultWorkspaceID, "test", 10, 1))) | ||
| triggerAddNewDS <- time.Now() | ||
| triggerAddNewDS <- time.Now() | ||
| } | ||
| // Store jobs in the last DS too | ||
| require.NoError(t, jobDB.Store(context.Background(), genJobs(defaultWorkspaceID, "test", 10, 1))) | ||
|
|
||
| dsList := jobDB.getDSList() | ||
| require.Len(t, dsList, totalDS) | ||
|
|
||
| // Make all jobs in the dataset at eligibleDSPos terminal (succeeded) | ||
| eligibleDS := dsList[eligibleDSPos] | ||
| rows, err := jobDB.dbHandle.Query(fmt.Sprintf(`SELECT job_id FROM %q`, eligibleDS.JobTable)) | ||
| require.NoError(t, err) | ||
| var statusJobs []*JobT | ||
| for rows.Next() { | ||
| var id int64 | ||
| require.NoError(t, rows.Scan(&id)) | ||
| statusJobs = append(statusJobs, &JobT{JobID: id}) | ||
| } | ||
| require.NoError(t, rows.Err()) | ||
| _ = rows.Close() | ||
|
|
||
| require.NoError(t, jobDB.UpdateJobStatus(context.Background(), genJobStatuses(statusJobs, "succeeded"))) | ||
|
|
||
| t.Run("intra-invocation skip", func(t *testing.T) { | ||
| // Allow probing enough datasets to reach the eligible one in a single call | ||
| c.Set("JobsDB."+tablePrefix+"."+"maxMigrateDSProbe", totalDS) | ||
|
|
||
| // Measure first getMigrationList call (full scan, no skip) | ||
| checkStart := time.Now() | ||
| checkResult, err := jobDB.getMigrationList(dsList, nil) | ||
|
Check failure on line 793 in jobsdb/migration_test.go
|
||
| checkDuration := time.Since(checkStart) | ||
| require.NoError(t, err) | ||
| require.NotEmpty(t, checkResult.migrateFrom, "should find eligible datasets") | ||
| require.NotNil(t, checkResult.firstEligible, "should have firstEligible set") | ||
|
|
||
| // Measure second getMigrationList call (with skipBefore from first call) | ||
| lockCheckStart := time.Now() | ||
| lockResult, err := jobDB.getMigrationList(dsList, checkResult.firstEligible) | ||
|
Check failure on line 801 in jobsdb/migration_test.go
|
||
| lockCheckDuration := time.Since(lockCheckStart) | ||
| require.NoError(t, err) | ||
| require.NotEmpty(t, lockResult.migrateFrom) | ||
|
|
||
| // Both calls should find the same eligible datasets | ||
| require.Equal(t, len(checkResult.migrateFrom), len(lockResult.migrateFrom)) | ||
| for i := range checkResult.migrateFrom { | ||
| require.Equal(t, checkResult.migrateFrom[i].ds.Index, lockResult.migrateFrom[i].ds.Index) | ||
| } | ||
|
|
||
| t.Logf("check duration (no skip): %v", checkDuration) | ||
| t.Logf("lock check duration (skip): %v", lockCheckDuration) | ||
|
|
||
| // The second call with skipBefore should be significantly faster | ||
| require.Greater(t, checkDuration, lockCheckDuration, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P2: Avoid strict duration comparisons in tests; they are nondeterministic and can cause flaky CI failures. Prompt for AI agents |
||
| "getMigrationList with skipBefore should be faster than without", | ||
| ) | ||
| }) | ||
|
|
||
| t.Run("cross-invocation resume", func(t *testing.T) { | ||
| // Set maxMigrateDSProbe to 100 so the first iteration can't reach the | ||
| // eligible dataset at position 150. It will need 2 iterations. | ||
| c.Set("JobsDB."+tablePrefix+"."+"maxMigrateDSProbe", 100) | ||
| jobDB.lastMigrateProbeIndex = nil | ||
|
Check failure on line 825 in jobsdb/migration_test.go
|
||
|
|
||
| // 1st iteration: probes datasets 1..100, finds nothing, hits probe limit. | ||
| // Should store lastMigrateProbeIndex for resumption. | ||
| result1, err := jobDB.getMigrationList(dsList, jobDB.lastMigrateProbeIndex) | ||
|
Check failure on line 829 in jobsdb/migration_test.go
|
||
| require.NoError(t, err) | ||
| require.Empty(t, result1.migrateFrom, "should not find eligible datasets in first 100") | ||
| require.True(t, result1.probeLimitReached, "should hit probe limit") | ||
| require.NotNil(t, result1.lastProbed, "should have lastProbed set") | ||
|
|
||
| // Simulate what doMigrateDS does: save the resume point | ||
| jobDB.lastMigrateProbeIndex = result1.lastProbed | ||
|
Check failure on line 836 in jobsdb/migration_test.go
|
||
|
|
||
| // 2nd iteration: resumes from where the first left off, finds the eligible dataset. | ||
| resumeStart := time.Now() | ||
| result2, err := jobDB.getMigrationList(dsList, jobDB.lastMigrateProbeIndex) | ||
|
Check failure on line 840 in jobsdb/migration_test.go
|
||
| resumeDuration := time.Since(resumeStart) | ||
| require.NoError(t, err) | ||
| require.NotEmpty(t, result2.migrateFrom, "should find eligible datasets in second iteration") | ||
| require.Equal(t, dsList[eligibleDSPos].Index, result2.migrateFrom[0].ds.Index) | ||
|
|
||
| // Compare with a full scan from scratch | ||
| c.Set("JobsDB."+tablePrefix+"."+"maxMigrateDSProbe", totalDS) | ||
| fullStart := time.Now() | ||
| resultFull, err := jobDB.getMigrationList(dsList, nil) | ||
| fullDuration := time.Since(fullStart) | ||
| require.NoError(t, err) | ||
| require.NotEmpty(t, resultFull.migrateFrom) | ||
|
|
||
| t.Logf("full scan duration: %v", fullDuration) | ||
| t.Logf("resumed scan duration: %v", resumeDuration) | ||
|
|
||
| // The resumed scan should be faster than a full scan | ||
| require.Greater(t, fullDuration, resumeDuration, | ||
| "resumed scan should be faster than full scan from scratch", | ||
| ) | ||
| }) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P3: Assertion message is inconsistent with the asserted length (`3` vs. the message saying `2`).
Prompt for AI agents