fix(connectors): Add unwrap_envelope transform and envelope detection to fix source-sink format mismatch#3197
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #3197 +/- ##
=============================================
- Coverage 73.78% 56.58% -17.20%
Complexity 943 943
=============================================
Files 1200 1201 +1
Lines 109094 97051 -12043
Branches 85994 73951 -12043
=============================================
- Hits 80492 54916 -25576
- Misses 25866 39544 +13678
+ Partials 2736 2591 -145
🚀 New features to boost your workflow:
|
|
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR. Thank you for your contribution! |
|
@atharvalade please rebase this PR /author |
8defa26 to
7c92361
Compare
done |
|
/ready |
hubcio
left a comment
There was a problem hiding this comment.
review notes (not posted as line comments because targets are out-of-diff or already tracked):
-
the FFI
(consume)return code is still discarded atcore/connectors/runtime/src/sink.rs:642-650(ConsumeCallbackreturnsi32,sdk/src/sink.rs:28-36) andAutoCommit::When(AutoCommitWhen::PollingMessages)atcore/connectors/runtime/src/sink.rs:467commits offsets before the sink runs. this PR widens the blast radius: any envelope-detected batch nowreturn Err(Error::InvalidRecord)from the sink, gets logged, and the offsets have already advanced. tracked in #2927 and #2928 but this PR adds a brand new fail path triggerable purely by source/sink misconfiguration, so they should land together. -
core/connectors/sinks/delta_sink/src/sink.rs:106has the same envelope-vs-flat-JSON mismatch and no detection or unwrap guidance. fixing one sink and leaving the other broken means #3174 is still reachable through the postgres → delta path. either replicate the detection (with the tighter shape suggested below) or, better, documentunwrap_envelopeas the single fix and drop sink-side sniffing entirely. -
neither
core/connectors/sdk/README.mdnorcore/connectors/sinks/iceberg_sink/README.mdmention the newunwrap_envelopetransform or the source-compatibility requirement forpostgres_source→ iceberg. discoverability is the whole point of this fix; please add at least a short entry to both. -
architecturally, sink-side envelope sniffing is the wrong layer. transform discipline + docs is the real fix. if sink-side detection stays, it should be opt-in via config, not a hardcoded postgres-shaped heuristic.
overall, i'm not fan of this PR.
| return false; | ||
| }; | ||
| obj.contains_key("table_name") && obj.contains_key("data") | ||
| } |
There was a problem hiding this comment.
this heuristic checks 2 of the 5 keys on DatabaseRecord (table_name, operation_type, timestamp, data, old_data - see core/connectors/sources/postgres_source/src/lib.rs:110-116).
false positives: any legit iceberg table modeling audit logs, catalogs, or CDC metadata that happens to have table_name + data columns gets the whole batch rejected.
false negatives: Debezium / Kafka-Connect envelopes use before / after / op / source / payload and slip right through into JsonArrowReader, which then writes nulls - the exact bug #3174 is supposed to fix.
also it bakes a postgres-source shape into a generic iceberg sink. options: tighten to the full 4-or-5 key shape, or move detection behind an opt-in detect_envelope config flag, or drop sink-side sniffing entirely and rely on unwrap_envelope + docs.
| .collect(); | ||
|
|
||
| if let Some(first) = msgs.first() | ||
| && looks_like_envelope(first) |
There was a problem hiding this comment.
the sniff only looks at msgs.first(). mixed batch where the first message is flat and the rest are envelopes (or vice versa) skips detection entirely - envelope rows then hit JsonArrowReader and write nulls, which is the silent-corruption mode #3174 was opened for. envelope-first drops the whole batch including any valid flat rows.
either scan all messages, or document and enforce a batch-homogeneity invariant somewhere upstream.
| 'unwrap_envelope' transform with field = \"data\" to your \ | ||
| connector config to extract the inner payload." | ||
| ); | ||
| return Err(Error::InvalidRecord); |
There was a problem hiding this comment.
Error::InvalidRecord is a unit variant; the actionable hint ("add an unwrap_envelope transform with field = data") lives only in the error! log and is lost the moment the caller receives the error. Error::InvalidRecordValue(String) already exists (sdk/src/lib.rs:389) and is used in delta_sink and influxdb_source - switch to it and carry the hint in the variant.
this is also a concrete instance of #3176 (overloaded InvalidRecord).
| (detected 'table_name' + 'data' fields). The Iceberg sink \ | ||
| expects flat JSON matching the target table schema. Add an \ | ||
| 'unwrap_envelope' transform with field = \"data\" to your \ | ||
| connector config to extract the inner payload." |
There was a problem hiding this comment.
two issues with this error message:
-
it hardcodes
field = "data", which only matches the postgres envelope shape. if/when detection broadens to Debezium (after) or other shapes, the suggestion is wrong. -
no connector id / plugin id in the log line, so in a multi-tenant deployment with several iceberg sinks you cannot tell which one rejected the batch.
| warn!( | ||
| "unwrap_envelope: field '{}' not found in payload, passing through unchanged", | ||
| self.field | ||
| ); |
There was a problem hiding this comment.
this is a per-message warn! on a hot path. with the default batch_length = 1000, a single misconfigured connector floods stdout with 1000 identical warnings per poll cycle, masking real errors.
fix options: downgrade to debug!, rate-limit (warn-once per (stream, topic, config)), or count occurrences and emit a single summary per batch.
| /// entire payload with the contents of `data`. | ||
| #[derive(Debug, Serialize, Deserialize)] | ||
| pub struct UnwrapEnvelopeConfig { | ||
| pub field: String, |
There was a problem hiding this comment.
UnwrapEnvelopeConfig.field: String has no validation. an empty string deserializes fine through from_config at transforms/mod.rs:137-141, then every message hits the missing-field branch and warn-spams at message rate (compounds the log flood on the json side).
reject empty field either via a custom Deserialize impl, or inline in UnwrapEnvelope::new, or in the from_config arm.
| ) -> Result<Option<DecodedMessage>, Error> { | ||
| let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload else { | ||
| return Ok(Some(message)); | ||
| }; |
There was a problem hiding this comment.
non-Object JSON (Array, scalar, null) silently passes through with no metric and no log. this is an operational blindspot - if a misconfigured source starts emitting arrays where the sink expects an unwrapped object, there's nothing in the logs that says the transform was a no-op.
at minimum a debug! with the actual payload variant, or a counter, so operators can see when the transform isn't doing anything.
| } | ||
| TransformType::UnwrapEnvelope => { | ||
| let cfg: UnwrapEnvelopeConfig = | ||
| serde_json::from_value(raw.clone()).map_err(|_| Error::InvalidConfig)?; |
There was a problem hiding this comment.
.map_err(|_| Error::InvalidConfig) throws away the serde error detail. for a user with a typo in field or a wrong type, all they see is InvalidConfig - no line number, no field name, no expected-vs-got. this is a pre-existing pattern in every arm of from_config, but this PR adds another instance.
worth either fixing all arms in a follow-up, or at least propagating the serde error through Error::InvalidConfigDetail(String) or similar.
Which issue does this PR close?
Closes #3174
Rationale
Sources (e.g. Postgres) wrap row data in a
DatabaseRecordenvelope while sinks (e.g. Iceberg) expect flat JSON matching the target table schema — no shared contract exists, producing silent null failures.What changed?
The Postgres source emits
{table_name, operation_type, timestamp, data: {...}, old_data}envelopes, but the Iceberg sink's Arrow JSON reader maps these nested structures to top-level fields as null, silently violating non-nullable constraints.This adds a reusable
unwrap_envelopetransform to the connector SDK that extracts a nested field (e.g.data) and promotes it as the top-level payload, plus explicit envelope detection in the Iceberg sink that errors with an actionable message instead of failing silently.Local Execution
AI Usage