From a2f3d47e96e12d155a328e8da9aa8ee24ec78719 Mon Sep 17 00:00:00 2001 From: deepshekhardas Date: Mon, 4 May 2026 12:21:36 +0530 Subject: [PATCH 1/5] chore: router proxy should switch all requests before acking --- .../partitionmigration_gwproc_test.go | 22 +++++++++++++++++-- testhelper/clustertest/routingproxy.go | 12 +++++----- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/integration_test/partitionmigration/partitionmigration_gwproc_test.go b/integration_test/partitionmigration/partitionmigration_gwproc_test.go index d93b569c58..859e608adb 100644 --- a/integration_test/partitionmigration/partitionmigration_gwproc_test.go +++ b/integration_test/partitionmigration/partitionmigration_gwproc_test.go @@ -45,6 +45,23 @@ import ( // 12. Waits for all requests to complete. // 13. Verifies that all requests were received successfully and in order. func TestPartitionMigrationGatewayProcessorMode(t *testing.T) { + for _, tc := range []struct { + name string + extraStressWorkspaces int // number of extra workspace migrations to include (0 = normal mode) + restartProcessorEvery time.Duration // how often to restart processor nodes while migration is ongoing + }{ + {name: "normal", extraStressWorkspaces: 0, restartProcessorEvery: 25 * time.Second}, + {name: "stress_100_workspaces", extraStressWorkspaces: 100, restartProcessorEvery: 30 * time.Second}, + {name: "stress_1000_workspaces", extraStressWorkspaces: 1000, restartProcessorEvery: 35 * time.Second}, + {name: "stress_5000_workspaces", extraStressWorkspaces: 5000, restartProcessorEvery: 50 * time.Second}, + } { + t.Run(tc.name, func(t *testing.T) { + testPartitionMigrationGatewayProcessorMode(t, tc.extraStressWorkspaces, tc.restartProcessorEvery) + }) + } +} + +func testPartitionMigrationGatewayProcessorMode(t *testing.T, extraStressWorkspaces int, restartProcessorEvery time.Duration) { const ( namespace = "namespace123" workspaceID = "workspace123" @@ -54,9 +71,10 @@ func TestPartitionMigrationGatewayProcessorMode(t *testing.T) { numPartitions = 4 // needs to be a power of 2 (e.g., 2, 4, 8, 16, ...) jobsPerPartitionPerSecond = 50 // number of jobs to send per partition per second from the gateway client - restartProcessorEvery = 10 * time.Second // how often to restart processor nodes while migration is ongoing - readExcludeSleep = 5 * time.Second // sleep duration for read exclusion during migration + readExcludeSleep = 15 * time.Second // sleep duration for read exclusion during migration, must not be greater than restartProcessorEvery-5s ) + require.LessOrEqual(t, readExcludeSleep, restartProcessorEvery-5*time.Second, + "readExcludeSleep must not be greater than restartProcessorEvery-5s") // distribute partitions across the 2 nodes equally initialMappings := map[int]int{} diff --git a/testhelper/clustertest/routingproxy.go b/testhelper/clustertest/routingproxy.go index 3af2d10ff4..6db8551e76 100644 --- a/testhelper/clustertest/routingproxy.go +++ b/testhelper/clustertest/routingproxy.go @@ -51,8 +51,8 @@ func NewRoutingProxy(t *testing.T, numPartitions int, partitionMappings map[int] } partitionIdx, _ := partmap.Murmur3Partition32(partitionKey, uint32(numPartitions)) rp.partitionMappingsMu.RLock() + defer rp.partitionMappingsMu.RUnlock() // unlock only after request is processed nodeIndex, ok := rp.partitionMappings[int(partitionIdx)] - rp.partitionMappingsMu.RUnlock() if !ok || nodeIndex < 0 || nodeIndex >= len(rp.backends) { http.Error(w, "no backend for partition", http.StatusBadGateway) return @@ -70,12 +70,10 @@ type routingProxy struct { backends []*httputil.ReverseProxy } -func (rp *routingProxy) UpdatePartitionMapping(partitionIdx, nodeIndex int) { - rp.partitionMappingsMu.Lock() - defer rp.partitionMappingsMu.Unlock() - rp.partitionMappings[partitionIdx] = nodeIndex -} - +// SetPartitionMappings sets the entire partition to node index mapping. This method returns only after acquiring a write lock, +// ensuring that: +// 1. All ongoing requests are processed with the old mapping before the new mapping takes effect. +// 2. Post-return, any new incoming requests will be routed based on the updated mapping. func (rp *routingProxy) SetPartitionMappings(partitionMappings map[int]int) { rp.partitionMappingsMu.Lock() defer rp.partitionMappingsMu.Unlock() From 6937e4eb2a1357c8d81b827852b8e3a845a90d33 Mon Sep 17 00:00:00 2001 From: deepshekhardas Date: Mon, 4 May 2026 13:10:01 +0530 Subject: [PATCH 2/5] chore: increase partition migration test timeouts --- .../partitionmigration/partitionmigration_embedded_test.go | 5 +++-- .../partitionmigration/partitionmigration_gwproc_test.go | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/integration_test/partitionmigration/partitionmigration_embedded_test.go b/integration_test/partitionmigration/partitionmigration_embedded_test.go index aba1a6df71..86f48d9cfb 100644 --- a/integration_test/partitionmigration/partitionmigration_embedded_test.go +++ b/integration_test/partitionmigration/partitionmigration_embedded_test.go @@ -160,9 +160,10 @@ func TestPartitionMigrationEmbeddedMode(t *testing.T) { "Processor.maxLoopSleep": "1s", "Router.eventOrderKeyThreshold": "0", // we need strict event ordering guarantees for this test "Router.noOfWorkers": strconv.Itoa(numPartitions), - "Router.Network.IncludeInstanceIdInHeader": "true", // for debugging in case of receiving out-of-order events + "Router.Network.IncludeInstanceIdInHeader": "true", // for debugging in case of receiving out-of-order events + "Router.jobIterator.maxQueries": "1", "Gateway.allowPartialWriteWithErrors": "false", // not going through the lecacy gateway path - "PartitionMigration.Processor.SourceNode.readExcludeSleep": "5s", // sleep a bit less than the default one to speed up the test + "PartitionMigration.Processor.SourceNode.readExcludeSleep": "15s", // sleep a bit less than the default one to speed up the test "PartitionMigration.SourceNode.inProgressPollSleep": "1s", // poll faster for test speed // we want to create multiple datasets during the test and ensure that migration works correctly with ds limits as well diff --git a/integration_test/partitionmigration/partitionmigration_gwproc_test.go b/integration_test/partitionmigration/partitionmigration_gwproc_test.go index 859e608adb..92e1eb1307 100644 --- a/integration_test/partitionmigration/partitionmigration_gwproc_test.go +++ b/integration_test/partitionmigration/partitionmigration_gwproc_test.go @@ -50,9 +50,9 @@ func TestPartitionMigrationGatewayProcessorMode(t *testing.T) { extraStressWorkspaces int // number of extra workspace migrations to include (0 = normal mode) restartProcessorEvery time.Duration // how often to restart processor nodes while migration is ongoing }{ - {name: "normal", extraStressWorkspaces: 0, restartProcessorEvery: 25 * time.Second}, + {name: "normal", extraStressWorkspaces: 0, restartProcessorEvery: 30 * time.Second}, {name: "stress_100_workspaces", extraStressWorkspaces: 100, restartProcessorEvery: 30 * time.Second}, - {name: "stress_1000_workspaces", extraStressWorkspaces: 1000, restartProcessorEvery: 35 * time.Second}, + {name: "stress_1000_workspaces", extraStressWorkspaces: 1000, restartProcessorEvery: 40 * time.Second}, {name: "stress_5000_workspaces", extraStressWorkspaces: 5000, restartProcessorEvery: 50 * time.Second}, } { t.Run(tc.name, func(t *testing.T) { @@ -224,7 +224,7 @@ func testPartitionMigrationGatewayProcessorMode(t *testing.T, extraStressWorkspa "Router.eventOrderKeyThreshold": "0", // we need strict event ordering guarantees for this test "Router.noOfWorkers": strconv.Itoa(numPartitions), "Router.Network.IncludeInstanceIdInHeader": "true", // for debugging in case of receiving out-of-order events - + "Router.jobIterator.maxQueries": "1", } rsBinaryPath := filepath.Join(t.TempDir(), "rudder-server-binary") rudderserver.BuildRudderServerBinary(t, "../../main.go", rsBinaryPath) From d3430cf1cc98cbc48c999bb7714fba7eef33a8b0 Mon Sep 17 00:00:00 2001 From: deepshekhardas Date: Mon, 4 May 2026 16:42:16 +0530 Subject: [PATCH 3/5] chore(jobsdb): guard getMigrationList against producing datasets larger than maxDSSize --- jobsdb/integration_test.go | 9 +- jobsdb/jobsdb.go | 2 +- jobsdb/migration.go | 17 ++- jobsdb/migration_test.go | 209 +++++++++++++++++++++++++++++++++++++ 4 files changed, 227 insertions(+), 10 deletions(-) diff --git a/jobsdb/integration_test.go b/jobsdb/integration_test.go index 775bf1e7e8..6390b8a75f 100644 --- a/jobsdb/integration_test.go +++ b/jobsdb/integration_test.go @@ -780,11 +780,12 @@ func TestJobsDB(t *testing.T) { require.NoError(t, err) } - trigger() // jobs_3, jobs_4 & jobs_5 will be migrated to jobs_5_1 + trigger() // jobs_3 & jobs_4 will be migrated to jobs_4_1; jobs_5 stays because adding it would exceed maxDSSize dsList = getDSList() - require.Lenf(t, dsList, 2, "dsList length is not 1, got %+v", dsList) - require.Equal(t, prefix+"_jobs_5_1", dsList[0].JobTable) // 12 jobs - require.Equal(t, prefix+"_jobs_6", dsList[1].JobTable) // 0 jobs + require.Lenf(t, dsList, 3, "dsList length is not 2, got %+v", dsList) + require.Equal(t, prefix+"_jobs_4_1", dsList[0].JobTable) // 8 jobs + require.Equal(t, prefix+"_jobs_5", dsList[1].JobTable) // 4 jobs + require.Equal(t, prefix+"_jobs_6", dsList[2].JobTable) // 0 jobs jobsResult, err = jobDB.GetUnprocessed(context.Background(), GetQueryParams{ CustomValFilters: []string{customVal}, diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index d219f97415..cc91379b43 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -1071,7 +1071,7 @@ func (jd *Handle) loadConfig() { // jobMinRowsLeftMigrateThreshold: A DS with a low number of pending rows should be eligible for migration if the number of pending rows are // less than jobMinRowsLeftMigrateThreshold percent of maxDSSize (e.g. if jobMinRowsLeftMigrateThreshold is 0.5 // then DSs that have less than 50% of maxDSSize pending rows are eligible for migration) - jd.conf.migration.jobMinRowsLeftMigrateThreshold = jd.config.GetReloadableFloat64Var(0.4, jd.configKeys("jobMinRowsLeftMigrateThreshold")...) + jd.conf.migration.jobMinRowsLeftMigrateThreshold = jd.config.GetReloadableFloat64Var(0.6, jd.configKeys("jobMinRowsLeftMigrateThreshold")...) // maxMigrateOnce: Maximum number of DSs that are migrated together into one destination jd.conf.migration.maxMigrateOnce = jd.config.GetReloadableIntVar(10, 1, jd.configKeys("maxMigrateOnce")...) // maxMigrateDSProbe: Maximum number of DSs that are checked from left to right if they are eligible for migration diff --git a/jobsdb/migration.go b/jobsdb/migration.go index e48490af57..5b06ecb96d 100644 --- a/jobsdb/migration.go +++ b/jobsdb/migration.go @@ -455,12 +455,19 @@ func (jd *Handle) getMigrationList(dsList []dataSetT) (migrateFrom []dsWithPendi liveDSCount++ } else { if waiting != nil { // have another dataset waiting for a pair - migrateFrom = append(migrateFrom, *waiting, dsWithPendingJobCount{ds: ds, numJobsPending: recordsLeft}) - insertBeforeDS = dsList[idx+1] - pendingJobsCount += waiting.numJobsPending + recordsLeft - liveDSCount += 2 - waiting = nil + if waiting.numJobsPending+recordsLeft > maxDSSize { + waiting = nil + } else { + migrateFrom = append(migrateFrom, *waiting, dsWithPendingJobCount{ds: ds, numJobsPending: recordsLeft}) + insertBeforeDS = dsList[idx+1] + pendingJobsCount += waiting.numJobsPending + recordsLeft + liveDSCount += 2 + waiting = nil + } } else if pendingJobsCount > 0 { // we already know that we'll be migrating another dataset with pending jobs, so can add this one too + if pendingJobsCount+recordsLeft > maxDSSize { + break // adding this dataset would exceed maxDSSize, leave it for the next migration cycle + } migrateFrom = append(migrateFrom, dsWithPendingJobCount{ds: ds, numJobsPending: recordsLeft}) insertBeforeDS = dsList[idx+1] pendingJobsCount += recordsLeft diff --git a/jobsdb/migration_test.go b/jobsdb/migration_test.go index 4dfa2229aa..654a8b7a6b 100644 --- a/jobsdb/migration_test.go +++ b/jobsdb/migration_test.go @@ -651,3 +651,212 @@ func Test_GetColumnConversion(t *testing.T) { require.Error(t, err) }) } + +func TestMigrationMaxDSSizeGuard(t *testing.T) { + _ = startPostgres(t) + + // newJobDB creates a Handle with the given maxDSSize and jobMinRowsLeftMigrateThreshold. + newJobDB := func(t *testing.T, maxDSSize int, threshold float64) (*Handle, chan time.Time, *config.Config) { + t.Helper() + c := config.New() + c.Set("JobsDB.maxDSSize", maxDSSize) + c.Set("JobsDB.jobMinRowsLeftMigrateThreshold", threshold) + triggerAddNewDS := make(chan time.Time) + jd := &Handle{ + TriggerAddNewDS: func() <-chan time.Time { return triggerAddNewDS }, + TriggerMigrateDS: func() <-chan time.Time { return make(chan time.Time) }, + config: c, + } + require.NoError(t, jd.Setup(ReadWrite, true, strings.ToLower(rand.String(5)))) + t.Cleanup(jd.TearDown) + return jd, triggerAddNewDS, c + } + + // addDS stores `jobs` with len(jobs)-pending marked as succeeded, then triggers addNewDS. + // Jobs must be a slice of a larger pre-created slice so that their pre-set IDs match the + // DB-assigned IDs (which start at 1 and increment globally per table). + addDS := func(t *testing.T, jd *Handle, trigger chan time.Time, jobs []*JobT, pending int) { + t.Helper() + require.NoError(t, jd.Store(context.Background(), jobs)) + if terminal := len(jobs) - pending; terminal > 0 { + require.NoError(t, jd.UpdateJobStatus(context.Background(), genJobStatuses(jobs[:terminal], "executing"))) + require.NoError(t, jd.UpdateJobStatus(context.Background(), genJobStatuses(jobs[:terminal], "succeeded"))) + } + trigger <- time.Now() + trigger <- time.Now() + } + + t.Run("accumulation stops before exceeding maxDSSize", func(t *testing.T) { + // maxDSSize=10, threshold=0.7 → pair threshold=7 + // DS1..DS4: 3 pending each (needsPair since 3 < 7) + // DS5: last DS (exempt) + // + // getMigrationList walk: + // DS1 → waiting + // DS2 → pair: 3+3=6 ≤ 10, pendingJobsCount=6 + // DS3 → piggyback: 6+3=9 ≤ 10, pendingJobsCount=9 + // DS4 → piggyback: 9+3=12 > 10 → break + // expected: migrateFrom=[DS1,DS2,DS3], pendingJobsCount=9 ≤ maxDSSize + jd, trigger, c := newJobDB(t, 10, 0.7) + allJobs := genJobs(defaultWorkspaceID, "test", 50, 1) // 4 regular DSes + 1 last, 10 jobs each + for i := range 4 { + addDS(t, jd, trigger, allJobs[i*10:(i+1)*10], 3) + } + require.NoError(t, jd.Store(context.Background(), allJobs[40:])) + + dsList := jd.getDSList() + c.Set("JobsDB."+jd.tablePrefix+"."+"maxMigrateDSProbe", len(dsList)) + + migrateFrom, pendingJobsCount, _, err := jd.getMigrationList(dsList) + require.NoError(t, err) + require.Len(t, migrateFrom, 3) + require.Equal(t, 9, pendingJobsCount) + require.LessOrEqual(t, pendingJobsCount, 10) + }) + + t.Run("pair exceeding maxDSSize is discarded", func(t *testing.T) { + // maxDSSize=10, threshold=0.7 → pair threshold=7 + // DS1: 6 pending (needsPair since 6 < 7) + // DS2: 6 pending → 6+6=12 > maxDSSize → waiting cleared, nothing migrates + // DS3: last DS (exempt) + jd, trigger, c := newJobDB(t, 10, 0.7) + allJobs := genJobs(defaultWorkspaceID, "test", 30, 1) // 2 regular DSes + 1 last, 10 jobs each + addDS(t, jd, trigger, allJobs[:10], 6) + addDS(t, jd, trigger, allJobs[10:20], 6) + require.NoError(t, jd.Store(context.Background(), allJobs[20:])) + + dsList := jd.getDSList() + c.Set("JobsDB."+jd.tablePrefix+"."+"maxMigrateDSProbe", len(dsList)) + + migrateFrom, _, _, err := jd.getMigrationList(dsList) + require.NoError(t, err) + require.Empty(t, migrateFrom) + }) +} + +func TestMigrationSkipsDatasets(t *testing.T) { + config.Reset() + c := config.New() + c.Set("JobsDB.maxDSSize", 10) + + _ = startPostgres(t) + + triggerAddNewDS := make(chan time.Time) + triggerMigrateDS := make(chan time.Time) + + jobDB := Handle{ + TriggerAddNewDS: func() <-chan time.Time { return triggerAddNewDS }, + TriggerMigrateDS: func() <-chan time.Time { return triggerMigrateDS }, + config: c, + } + tablePrefix := strings.ToLower(rand.String(5)) + require.NoError(t, jobDB.Setup(ReadWrite, true, tablePrefix)) + defer jobDB.TearDown() + + const totalDS = 200 + const eligibleDSPos = 150 // 0-indexed position of the dataset we'll make eligible + + // Create totalDS datasets: store jobs then trigger addNewDS to create the next one + for i := range totalDS - 1 { // -1 because Setup already creates DS 1 + _ = i + require.NoError(t, jobDB.Store(context.Background(), genJobs(defaultWorkspaceID, "test", 10, 1))) + triggerAddNewDS <- time.Now() + triggerAddNewDS <- time.Now() + } + // Store jobs in the last DS too + require.NoError(t, jobDB.Store(context.Background(), genJobs(defaultWorkspaceID, "test", 10, 1))) + + dsList := jobDB.getDSList() + require.Len(t, dsList, totalDS) + + // Make all jobs in the dataset at eligibleDSPos terminal (succeeded) + eligibleDS := dsList[eligibleDSPos] + rows, err := jobDB.dbHandle.Query(fmt.Sprintf(`SELECT job_id FROM %q`, eligibleDS.JobTable)) + require.NoError(t, err) + var statusJobs []*JobT + for rows.Next() { + var id int64 + require.NoError(t, rows.Scan(&id)) + statusJobs = append(statusJobs, &JobT{JobID: id}) + } + require.NoError(t, rows.Err()) + _ = rows.Close() + + require.NoError(t, jobDB.UpdateJobStatus(context.Background(), genJobStatuses(statusJobs, "succeeded"))) + + t.Run("intra-invocation skip", func(t *testing.T) { + // Allow probing enough datasets to reach the eligible one in a single call + c.Set("JobsDB."+tablePrefix+"."+"maxMigrateDSProbe", totalDS) + + // Measure first getMigrationList call (full scan, no skip) + checkStart := time.Now() + checkResult, err := jobDB.getMigrationList(dsList, nil) + checkDuration := time.Since(checkStart) + require.NoError(t, err) + require.NotEmpty(t, checkResult.migrateFrom, "should find eligible datasets") + require.NotNil(t, checkResult.firstEligible, "should have firstEligible set") + + // Measure second getMigrationList call (with skipBefore from first call) + lockCheckStart := time.Now() + lockResult, err := jobDB.getMigrationList(dsList, checkResult.firstEligible) + lockCheckDuration := time.Since(lockCheckStart) + require.NoError(t, err) + require.NotEmpty(t, lockResult.migrateFrom) + + // Both calls should find the same eligible datasets + require.Equal(t, len(checkResult.migrateFrom), len(lockResult.migrateFrom)) + for i := range checkResult.migrateFrom { + require.Equal(t, checkResult.migrateFrom[i].ds.Index, lockResult.migrateFrom[i].ds.Index) + } + + t.Logf("check duration (no skip): %v", checkDuration) + t.Logf("lock check duration (skip): %v", lockCheckDuration) + + // The second call with skipBefore should be significantly faster + require.Greater(t, checkDuration, lockCheckDuration, + "getMigrationList with skipBefore should be faster than without", + ) + }) + + t.Run("cross-invocation resume", func(t *testing.T) { + // Set maxMigrateDSProbe to 100 so the first iteration can't reach the + // eligible dataset at position 150. It will need 2 iterations. + c.Set("JobsDB."+tablePrefix+"."+"maxMigrateDSProbe", 100) + jobDB.lastMigrateProbeIndex = nil + + // 1st iteration: probes datasets 1..100, finds nothing, hits probe limit. + // Should store lastMigrateProbeIndex for resumption. + result1, err := jobDB.getMigrationList(dsList, jobDB.lastMigrateProbeIndex) + require.NoError(t, err) + require.Empty(t, result1.migrateFrom, "should not find eligible datasets in first 100") + require.True(t, result1.probeLimitReached, "should hit probe limit") + require.NotNil(t, result1.lastProbed, "should have lastProbed set") + + // Simulate what doMigrateDS does: save the resume point + jobDB.lastMigrateProbeIndex = result1.lastProbed + + // 2nd iteration: resumes from where the first left off, finds the eligible dataset. + resumeStart := time.Now() + result2, err := jobDB.getMigrationList(dsList, jobDB.lastMigrateProbeIndex) + resumeDuration := time.Since(resumeStart) + require.NoError(t, err) + require.NotEmpty(t, result2.migrateFrom, "should find eligible datasets in second iteration") + require.Equal(t, dsList[eligibleDSPos].Index, result2.migrateFrom[0].ds.Index) + + // Compare with a full scan from scratch + c.Set("JobsDB."+tablePrefix+"."+"maxMigrateDSProbe", totalDS) + fullStart := time.Now() + resultFull, err := jobDB.getMigrationList(dsList, nil) + fullDuration := time.Since(fullStart) + require.NoError(t, err) + require.NotEmpty(t, resultFull.migrateFrom) + + t.Logf("full scan duration: %v", fullDuration) + t.Logf("resumed scan duration: %v", resumeDuration) + + // The resumed scan should be faster than a full scan + require.Greater(t, fullDuration, resumeDuration, + "resumed scan should be faster than full scan from scratch", + ) + }) +} From b2a8db1c9313061aa36c2ae662243753b150b619 Mon Sep 17 00:00:00 2001 From: deepshekhardas Date: Mon, 4 May 2026 16:42:58 +0530 Subject: [PATCH 4/5] chore(jobsdb): keep only last dataset for writers instead of last two --- jobsdb/jobsdb.go | 10 +++++----- jobsdb/migration.go | 10 ++-------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index cc91379b43..a649e3ed37 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -1277,12 +1277,12 @@ func (jd *Handle) doRefreshDSList(l lock.LockToken) (dataSetTList, error) { // report table count metrics before shrinking the datasetList jd.statTableCount.Gauge(len(jd.datasetList)) - // if the owner of this jobsdb is a writer, then shrinking datasetList to have only last two datasets - // this shrank datasetList is used to compute DSRangeList - // This is done because, writers don't care about the left datasets in the sorted datasetList + // If the owner of this jobsdb is a writer, then shrinking datasetList to have only last dataset + // which is being written to. + // Writers only write to the last dataset and if this dataset is full, then create a new dataset. if jd.ownerType == Write { - if len(jd.datasetList) > 2 { - jd.datasetList = jd.datasetList[len(jd.datasetList)-2 : len(jd.datasetList)] + if len(jd.datasetList) > 1 { + jd.datasetList = jd.datasetList[len(jd.datasetList)-1 : len(jd.datasetList)] } } diff --git a/jobsdb/migration.go b/jobsdb/migration.go index 5b06ecb96d..8b764faf45 100644 --- a/jobsdb/migration.go +++ b/jobsdb/migration.go @@ -421,14 +421,8 @@ func (jd *Handle) getMigrationList(dsList []dataSetT) (migrateFrom []dsWithPendi } for idx, ds := range dsList { - var idxCheck bool - if jd.ownerType == Read { - // if jobsdb owner is read, exempting the last two datasets from migration. - // This is done to avoid dsList conflicts between reader and writer - idxCheck = idx == len(dsList)-1 || idx == len(dsList)-2 - } else { - idxCheck = idx == len(dsList)-1 - } + // exempting the last dataset from migration since it is the one being currently written to. + idxCheck := idx == len(dsList)-1 if liveDSCount >= jd.conf.migration.maxMigrateOnce.Load() || pendingJobsCount >= maxDSSize || idxCheck { break From 46f6a8701e7aa3a33339be7f77016744b0fb7217 Mon Sep 17 00:00:00 2001 From: deepshekhardas Date: Mon, 4 May 2026 16:43:07 +0530 Subject: [PATCH 5/5] feat: deep job pickups --- processor/processor.go | 56 +++++++++++----------- router/batchrouter/handle.go | 17 ++++--- router/internal/jobiterator/jobiterator.go | 20 +++++--- 3 files changed, 51 insertions(+), 42 deletions(-) diff --git a/processor/processor.go b/processor/processor.go index abb56defe8..5b3330bf32 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -3617,20 +3617,26 @@ func (proc *Handle) getJobsStage(ctx context.Context, partition string) jobsdb.J proc.logger.Debugn("Processor DB Read size", logger.NewIntField("maxEventsToProcess", int64(proc.config.maxEventsToProcess.Load()))) - queryParams := jobsdb.GetQueryParams{ - CustomValFilters: []string{proc.config.GWCustomVal}, - JobsLimit: proc.config.maxEventsToProcess.Load(), - EventsLimit: proc.config.maxEventsToProcess.Load(), - PayloadSizeLimit: proc.adaptiveLimit(proc.payloadLimit.Load()), - } - proc.isolationStrategy.AugmentQueryParams(partition, &queryParams) - - unprocessedList, err := misc.QueryWithRetriesAndNotify(context.Background(), proc.jobdDBQueryRequestTimeout.Load(), proc.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) { - return proc.gatewayDB.GetUnprocessed(ctx, queryParams) - }, proc.sendQueryRetryStats) - if err != nil { - proc.logger.Errorn("Failed to get unprocessed jobs from DB", obskit.Error(err)) - panic(err) + var ( + jobs jobsdb.JobsResult + err error + ) + for query := true; query; { // keep trying to get unprocessed jobs while no jobs are returned because ds limits are being reached + queryParams := jobsdb.GetQueryParams{ + CustomValFilters: []string{proc.config.GWCustomVal}, + JobsLimit: proc.config.maxEventsToProcess.Load(), + EventsLimit: proc.config.maxEventsToProcess.Load(), + PayloadSizeLimit: proc.adaptiveLimit(proc.payloadLimit.Load()), + } + proc.isolationStrategy.AugmentQueryParams(partition, &queryParams) + jobs, err = misc.QueryWithRetriesAndNotify(context.Background(), proc.jobdDBQueryRequestTimeout.Load(), proc.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) { + return proc.gatewayDB.GetUnprocessed(ctx, queryParams) + }, proc.sendQueryRetryStats) + if err != nil { + proc.logger.Errorn("Failed to get unprocessed jobs from DB", obskit.Error(err)) + panic(err) + } + query = len(jobs.Jobs) == 0 && jobs.DSLimitsReached } dbReadTime := time.Since(s) @@ -3638,25 +3644,19 @@ func (proc *Handle) getJobsStage(ctx context.Context, partition string) jobsdb.J var firstJob *jobsdb.JobT var lastJob *jobsdb.JobT - if len(unprocessedList.Jobs) > 0 { - firstJob = unprocessedList.Jobs[0] - lastJob = unprocessedList.Jobs[len(unprocessedList.Jobs)-1] + if len(jobs.Jobs) > 0 { + firstJob = jobs.Jobs[0] + lastJob = jobs.Jobs[len(jobs.Jobs)-1] } proc.pipelineDelayStats(partition, firstJob, lastJob) - // check if there is work to be done - if len(unprocessedList.Jobs) == 0 { - proc.logger.Debugn("Processor DB Read Complete. No GW Jobs to process.") - return unprocessedList - } - proc.logger.Debugn("Processor DB Read Complete", - logger.NewIntField("unprocessedJobs", int64(len(unprocessedList.Jobs))), - logger.NewIntField("totalEvents", int64(unprocessedList.EventsCount))) - proc.stats.statGatewayDBR(partition).Count(len(unprocessedList.Jobs)) - proc.stats.statReadStageCount(partition).Count(len(unprocessedList.Jobs)) + logger.NewIntField("unprocessedJobs", int64(len(jobs.Jobs))), + logger.NewIntField("totalEvents", int64(jobs.EventsCount))) + proc.stats.statGatewayDBR(partition).Count(len(jobs.Jobs)) + proc.stats.statReadStageCount(partition).Count(len(jobs.Jobs)) - return unprocessedList + return jobs } func (proc *Handle) markExecuting(ctx context.Context, partition string, jobs []*jobsdb.JobT) error { diff --git a/router/batchrouter/handle.go b/router/batchrouter/handle.go index 26a955ee66..9d7f26e799 100644 --- a/router/batchrouter/handle.go +++ b/router/batchrouter/handle.go @@ -202,14 +202,17 @@ func (brt *Handle) getWorkerJobs(partition string) (workerJobs []*DestinationJob } brt.isolationStrategy.AugmentQueryParams(partition, &queryParams) - toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) { - return brt.jobsDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams) - }, brt.sendQueryRetryStats) - if err != nil { - brt.logger.Errorn("BRT: Error while reading from DB", obskit.DestinationType(brt.destType), obskit.Error(err)) - panic(err) + for query := true; query; { // keep trying to get jobs while no jobs are returned because ds limits are being reached + toProcess, err := misc.QueryWithRetriesAndNotify(context.Background(), brt.jobdDBQueryRequestTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) (jobsdb.JobsResult, error) { + return brt.jobsDB.GetJobs(ctx, []string{jobsdb.Failed.State, jobsdb.Unprocessed.State}, queryParams) + }, brt.sendQueryRetryStats) + if err != nil { + brt.logger.Errorn("BRT: Error while reading from DB", obskit.DestinationType(brt.destType), obskit.Error(err)) + panic(err) + } + jobs = toProcess.Jobs + query = len(jobs) == 0 && toProcess.DSLimitsReached } - jobs = toProcess.Jobs brtQueryStat.Since(queryStart) sort.Slice(jobs, func(i, j int) bool { diff --git a/router/internal/jobiterator/jobiterator.go b/router/internal/jobiterator/jobiterator.go index 8cd649dc78..705787ed77 100644 --- a/router/internal/jobiterator/jobiterator.go +++ b/router/internal/jobiterator/jobiterator.go @@ -112,21 +112,27 @@ func (ji *Iterator) HasNext() bool { } // try to fetch some more jobs - var err error + var ( + jobs *jobsdb.MoreJobsResult + err error + ) ji.params.JobsLimit = ji.state.jobsLimit ji.state.stats.QueryCount++ - allJobsResult, err := ji.getJobsFn(context.Background(), ji.params, ji.state.continuationToken) - if err != nil { - panic(err) + for query := true; query; { // for getting the first page, keep trying to get jobs while no jobs are returned because ds limits are being reached + jobs, err = ji.getJobsFn(context.Background(), ji.params, ji.state.continuationToken) + if err != nil { + panic(err) + } + query = ji.state.stats.QueryCount == 1 && len(jobs.Jobs) == 0 && jobs.LimitsReached } - ji.state.jobs = allJobsResult.Jobs - ji.state.continuationToken = allJobsResult.More + ji.state.jobs = jobs.Jobs + ji.state.continuationToken = jobs.More jobCount := len(ji.state.jobs) ji.state.jobsLimit -= jobCount ji.state.stats.TotalJobs += jobCount if !ji.state.stats.LimitsReached { - ji.state.stats.LimitsReached = allJobsResult.LimitsReached + ji.state.stats.LimitsReached = jobs.LimitsReached } // reset state