Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions processor/pipeline_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/types"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/utils/tracing"
Expand Down Expand Up @@ -210,7 +209,7 @@ func (w *pipelineWorker) start() {
ctx: subJob.ctx,
rsourcesStats: subJob.rsourcesStats,
dedupKeys: make(map[string]struct{}),
procErrorJobsByDestID: make(map[string][]*jobsdb.JobT),
procErrorJobsByDestID: make(map[string][]procErrorJob),
sourceDupStats: make(map[dupStatKey]int),
start: subJob.start,
}
Expand Down
21 changes: 14 additions & 7 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@
return fields[0], fields[1]
}

func (proc *Handle) recordEventDeliveryStatus(jobsByDestID map[string][]*jobsdb.JobT) {
func (proc *Handle) recordEventDeliveryStatus(jobsByDestID map[string][]procErrorJob) {
for destID, jobs := range jobsByDestID {
if !proc.destDebugger.HasUploadEnabled(destID) {
continue
Expand Down Expand Up @@ -2519,7 +2519,7 @@
defer proc.stats.statDtransformStageCount(partition).Count(len(in.statusList))
}

procErrorJobsByDestID := make(map[string][]*jobsdb.JobT)
procErrorJobsByDestID := make(map[string][]procErrorJob)
var batchDestJobs []*jobsdb.JobT
var destJobs []*jobsdb.JobT
var droppedJobs []*jobsdb.JobT
Expand Down Expand Up @@ -2567,7 +2567,7 @@
routerDestIDs = lo.Assign(routerDestIDs, o.routerDestIDs)
in.reportMetrics = append(in.reportMetrics, o.reportMetrics...)
for k, v := range o.errorsPerDestID {
procErrorJobsByDestID[k] = append(procErrorJobsByDestID[k], v...)

Check failure on line 2570 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (oss)

cannot use v (variable of type []*jobsdb.JobT) as []procErrorJob value in argument to append

Check failure on line 2570 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (enterprise)

cannot use v (variable of type []*jobsdb.JobT) as []procErrorJob value in argument to append

Check failure on line 2570 in processor/processor.go

View workflow job for this annotation

GitHub Actions / lint

cannot use v (variable of type []*jobsdb.JobT) as []procErrorJob value in argument to append

Check failure on line 2570 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Unit

cannot use v (variable of type []*jobsdb.JobT) as []procErrorJob value in argument to append
}
}

Expand All @@ -2593,6 +2593,13 @@
}
}

// procErrorJob wraps a jobsdb.JobT with the original parsed events,
// avoiding an expensive marshal/unmarshal round-trip through EventPayload.
type procErrorJob struct {
*jobsdb.JobT
events []types.SingularEventT

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: procErrorJob.events is introduced but never used; delivery-status logic still unmarshals EventPayload, so the intended round-trip optimization is not applied.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At processor/processor.go, line 2600:

<comment>`procErrorJob.events` is introduced but never used; delivery-status logic still unmarshals `EventPayload`, so the intended round-trip optimization is not applied.</comment>

<file context>
@@ -2593,6 +2593,13 @@ func (proc *Handle) destinationTransformStage(partition string, in *userTransfor
+// avoiding an expensive marshal/unmarshal round-trip through EventPayload.
+type procErrorJob struct {
+	*jobsdb.JobT
+	events []types.SingularEventT
+}
+
</file context>

}

type storeMessage struct {
ctx context.Context
trackedUsersReports []*trackedusers.UsersReport
Expand All @@ -2601,7 +2608,7 @@
batchDestJobs []*jobsdb.JobT
droppedJobs []*jobsdb.JobT

procErrorJobsByDestID map[string][]*jobsdb.JobT
procErrorJobsByDestID map[string][]procErrorJob
routerDestIDs []string

reportMetrics []*reportingtypes.PUReportedMetric
Expand Down Expand Up @@ -2943,7 +2950,7 @@
commonMetaData := eventList[0].Metadata.CommonMetadata()

reportMetrics := make([]*reportingtypes.PUReportedMetric, 0)
procErrorJobsByDestID := make(map[string][]*jobsdb.JobT)
procErrorJobsByDestID := make(map[string][]procErrorJob)
droppedJobs := make([]*jobsdb.JobT, 0)

proc.config.configSubscriberLock.RLock()
Expand Down Expand Up @@ -3140,9 +3147,9 @@
nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, eventList, commonMetaData, eventsByMessageID, inPU, reportingtypes.USER_TRANSFORMER)
droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventList), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...)
if _, ok := procErrorJobsByDestID[destID]; !ok {
procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
procErrorJobsByDestID[destID] = make([]procErrorJob, 0)
}
procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...)

Check failure on line 3152 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (oss)

cannot use nonSuccessMetrics.failedJobs (variable of type []*jobsdb.JobT) as []procErrorJob value in argument to append

Check failure on line 3152 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (enterprise)

cannot use nonSuccessMetrics.failedJobs (variable of type []*jobsdb.JobT) as []procErrorJob value in argument to append

Check failure on line 3152 in processor/processor.go

View workflow job for this annotation

GitHub Actions / lint

cannot use nonSuccessMetrics.failedJobs (variable of type []*jobsdb.JobT) as []procErrorJob value in argument to append

Check failure on line 3152 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Unit

cannot use nonSuccessMetrics.failedJobs (variable of type []*jobsdb.JobT) as []procErrorJob value in argument to append
userTransformationStat.numOutputSuccessEvents.Count(len(eventsToTransform))
userTransformationStat.numOutputFailedEvents.Count(len(nonSuccessMetrics.failedJobs))
userTransformationStat.numOutputFilteredEvents.Count(len(nonSuccessMetrics.filteredJobs))
Expand Down Expand Up @@ -3187,7 +3194,7 @@
eventsToTransform: eventsToTransform,
commonMetaData: commonMetaData,
reportMetrics: reportMetrics,
procErrorJobsByDestID: procErrorJobsByDestID,

Check failure on line 3197 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (oss)

cannot use procErrorJobsByDestID (variable of type map[string][]procErrorJob) as map[string][]*jobsdb.JobT value in struct literal

Check failure on line 3197 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (enterprise)

cannot use procErrorJobsByDestID (variable of type map[string][]procErrorJob) as map[string][]*jobsdb.JobT value in struct literal

Check failure on line 3197 in processor/processor.go

View workflow job for this annotation

GitHub Actions / lint

cannot use procErrorJobsByDestID (variable of type map[string][]procErrorJob) as map[string][]*jobsdb.JobT value in struct literal

Check failure on line 3197 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Unit

cannot use procErrorJobsByDestID (variable of type map[string][]procErrorJob) as map[string][]*jobsdb.JobT value in struct literal
droppedJobs: droppedJobs,
eventsByMessageID: eventsByMessageID,
srcAndDestKey: srcAndDestKey,
Expand Down Expand Up @@ -3231,9 +3238,9 @@
nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, eventList, commonMetaData, eventsByMessageID, inPU, reportingtypes.EVENT_FILTER)
droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventsToTransform), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...)
if _, ok := procErrorJobsByDestID[destID]; !ok {
procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
procErrorJobsByDestID[destID] = make([]procErrorJob, 0)
}
procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...)

Check failure on line 3243 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (oss)

cannot use nonSuccessMetrics.failedJobs (variable of type []*jobsdb.JobT) as []procErrorJob value in argument to append

Check failure on line 3243 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (enterprise)

cannot use nonSuccessMetrics.failedJobs (variable of type []*jobsdb.JobT) as []procErrorJob value in argument to append

Check failure on line 3243 in processor/processor.go

View workflow job for this annotation

GitHub Actions / lint

cannot use nonSuccessMetrics.failedJobs (variable of type []*jobsdb.JobT) as []procErrorJob value in argument to append

Check failure on line 3243 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Unit

cannot use nonSuccessMetrics.failedJobs (variable of type []*jobsdb.JobT) as []procErrorJob value in argument to append
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, connection, inPU, reportingtypes.EVENT_FILTER)
proc.logger.Debugn("Supported messages filtering output size", logger.NewIntField("eventCount", int64(len(eventsToTransform))))

Expand All @@ -3260,7 +3267,7 @@
eventsToTransform: eventsToTransform,
commonMetaData: commonMetaData,
reportMetrics: reportMetrics,
procErrorJobsByDestID: procErrorJobsByDestID,

Check failure on line 3270 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (oss)

cannot use procErrorJobsByDestID (variable of type map[string][]procErrorJob) as map[string][]*jobsdb.JobT value in struct literal

Check failure on line 3270 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (enterprise)

cannot use procErrorJobsByDestID (variable of type map[string][]procErrorJob) as map[string][]*jobsdb.JobT value in struct literal

Check failure on line 3270 in processor/processor.go

View workflow job for this annotation

GitHub Actions / lint

cannot use procErrorJobsByDestID (variable of type map[string][]procErrorJob) as map[string][]*jobsdb.JobT value in struct literal

Check failure on line 3270 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Unit

cannot use procErrorJobsByDestID (variable of type map[string][]procErrorJob) as map[string][]*jobsdb.JobT value in struct literal
droppedJobs: droppedJobs,
eventsByMessageID: eventsByMessageID,
srcAndDestKey: srcAndDestKey,
Expand Down Expand Up @@ -3325,7 +3332,7 @@
data.droppedJobs = append(data.droppedJobs, append(proc.getDroppedJobs(response, data.eventsToTransform), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...)

if _, ok := data.procErrorJobsByDestID[destID]; !ok {
data.procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
data.procErrorJobsByDestID[destID] = make([]procErrorJob, 0)

Check failure on line 3335 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (oss)

cannot use make([]procErrorJob, 0) (value of type []procErrorJob) as []*jobsdb.JobT value in assignment

Check failure on line 3335 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Integration (enterprise)

cannot use make([]procErrorJob, 0) (value of type []procErrorJob) as []*jobsdb.JobT value in assignment

Check failure on line 3335 in processor/processor.go

View workflow job for this annotation

GitHub Actions / lint

cannot use make([]procErrorJob, 0) (value of type []procErrorJob) as []*jobsdb.JobT value in assignment

Check failure on line 3335 in processor/processor.go

View workflow job for this annotation

GitHub Actions / Unit

cannot use make([]procErrorJob, 0) (value of type []procErrorJob) as []*jobsdb.JobT value in assignment
}
data.procErrorJobsByDestID[destID] = append(data.procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...)

Expand Down
Loading