diff --git a/processor/pipeline_worker.go b/processor/pipeline_worker.go index 8f6e6cb072..1e324e1bf4 100644 --- a/processor/pipeline_worker.go +++ b/processor/pipeline_worker.go @@ -9,7 +9,6 @@ import ( "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" - "github.com/rudderlabs/rudder-server/jobsdb" "github.com/rudderlabs/rudder-server/processor/types" "github.com/rudderlabs/rudder-server/rruntime" "github.com/rudderlabs/rudder-server/utils/tracing" @@ -210,7 +209,7 @@ func (w *pipelineWorker) start() { ctx: subJob.ctx, rsourcesStats: subJob.rsourcesStats, dedupKeys: make(map[string]struct{}), - procErrorJobsByDestID: make(map[string][]*jobsdb.JobT), + procErrorJobsByDestID: make(map[string][]procErrorJob), sourceDupStats: make(map[dupStatKey]int), start: subJob.start, } diff --git a/processor/processor.go b/processor/processor.go index abb56defe8..acc5085182 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -974,7 +974,7 @@ func getSourceAndDestIDsFromKey(key string) (sourceID, destID string) { return fields[0], fields[1] } -func (proc *Handle) recordEventDeliveryStatus(jobsByDestID map[string][]*jobsdb.JobT) { +func (proc *Handle) recordEventDeliveryStatus(jobsByDestID map[string][]procErrorJob) { for destID, jobs := range jobsByDestID { if !proc.destDebugger.HasUploadEnabled(destID) { continue @@ -2519,7 +2519,7 @@ func (proc *Handle) destinationTransformStage(partition string, in *userTransfor defer proc.stats.statDtransformStageCount(partition).Count(len(in.statusList)) } - procErrorJobsByDestID := make(map[string][]*jobsdb.JobT) + procErrorJobsByDestID := make(map[string][]procErrorJob) var batchDestJobs []*jobsdb.JobT var destJobs []*jobsdb.JobT var droppedJobs []*jobsdb.JobT @@ -2593,6 +2593,13 @@ func (proc *Handle) destinationTransformStage(partition string, in *userTransfor } } +// procErrorJob wraps a jobsdb.JobT with the original parsed events, +// avoiding an expensive marshal/unmarshal round-trip through EventPayload. +type procErrorJob struct { + *jobsdb.JobT + events []types.SingularEventT +} + type storeMessage struct { ctx context.Context trackedUsersReports []*trackedusers.UsersReport @@ -2601,7 +2608,7 @@ type storeMessage struct { batchDestJobs []*jobsdb.JobT droppedJobs []*jobsdb.JobT - procErrorJobsByDestID map[string][]*jobsdb.JobT + procErrorJobsByDestID map[string][]procErrorJob routerDestIDs []string reportMetrics []*reportingtypes.PUReportedMetric @@ -2943,7 +2950,7 @@ func (proc *Handle) userTransformAndFilter(ctx context.Context, partition, srcAn commonMetaData := eventList[0].Metadata.CommonMetadata() reportMetrics := make([]*reportingtypes.PUReportedMetric, 0) - procErrorJobsByDestID := make(map[string][]*jobsdb.JobT) + procErrorJobsByDestID := make(map[string][]procErrorJob) droppedJobs := make([]*jobsdb.JobT, 0) proc.config.configSubscriberLock.RLock() @@ -3140,7 +3147,7 @@ func (proc *Handle) userTransformAndFilter(ctx context.Context, partition, srcAn nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, eventList, commonMetaData, eventsByMessageID, inPU, reportingtypes.USER_TRANSFORMER) droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventList), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...) if _, ok := procErrorJobsByDestID[destID]; !ok { - procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0) + procErrorJobsByDestID[destID] = make([]procErrorJob, 0) } procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...) userTransformationStat.numOutputSuccessEvents.Count(len(eventsToTransform)) @@ -3231,7 +3238,7 @@ func (proc *Handle) userTransformAndFilter(ctx context.Context, partition, srcAn nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, eventList, commonMetaData, eventsByMessageID, inPU, reportingtypes.EVENT_FILTER) droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventsToTransform), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...) if _, ok := procErrorJobsByDestID[destID]; !ok { - procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0) + procErrorJobsByDestID[destID] = make([]procErrorJob, 0) } procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...) eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, connection, inPU, reportingtypes.EVENT_FILTER) @@ -3325,7 +3332,7 @@ func (proc *Handle) destTransform(ctx context.Context, data userTransformAndFilt data.droppedJobs = append(data.droppedJobs, append(proc.getDroppedJobs(response, data.eventsToTransform), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...) if _, ok := data.procErrorJobsByDestID[destID]; !ok { - data.procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0) + data.procErrorJobsByDestID[destID] = make([]procErrorJob, 0) } data.procErrorJobsByDestID[destID] = append(data.procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...)