Skip to content

airbyte-to-flow: support STREAM state messages#359

Open
Alex-Bair wants to merge 4 commits into
masterfrom
bair/airbyte-to-flow-support-stream-state-messages
Open

airbyte-to-flow: support STREAM state messages#359
Alex-Bair wants to merge 4 commits into
masterfrom
bair/airbyte-to-flow-support-stream-state-messages

Conversation

@Alex-Bair

@Alex-Bair Alex-Bair commented Jun 29, 2026

Copy link
Copy Markdown
Member

Problem

Active source-amazon-ads captures have been failing with:

  SchemaValidationError: Schema validation failed:
  - "sponsored_brands_report_stream" is not of type "object" (instance_path='')

This started after the connector was bumped from 3.0.0 → 7.3.1 in #356.

Root cause

Airbyte deprecated the LEGACY opaque-data state format (Platform v0.62.4 / CDK ≥ 1.3) in favor of per-stream STREAM (and GLOBAL) state. The 3.0.0 → 7.3.1 bump in source-amazon-ads crossed that boundary, but airbyte-to-flow only understood the old form, so it broke in both directions:

  • Inbound: ATF wrote Flow's persisted {stream: state} object verbatim to the --state file. The modern CDK's read_state expects a list of AirbyteStateMessages, so it iterated the object's keys (stream-name strings) and rejected them as not objects - the error above.
  • Outbound: STREAM messages carry no top-level data, so they failed to deserialize and were silently dropped, meaning state never advanced even once the inbound crash is fixed.

Solution

Flow still persists state as a {stream: state} object. airbyte-to-flow now translates between that and whatever the connector speaks:

  • State: data is now optional, plus a stream variant (and type/global captured for diagnostics).
  • Outbound (state_to_checkpoint): forward LEGACY data verbatim; persist STREAM state as {stream: state} via RFC 7396 merge patch. Log-and-skip anything else (e.g. GLOBAL) so the capture continues rather than erroring or dropping silently.
  • Inbound (convert_state_for_connector): convert the persisted object into a STREAM list gated on AIRBYTE_TO_FLOW_STATE_FORMAT=per_stream in each connector's Dockerfile. Connectors without it get verbatim state, so existing LEGACY connectors are unchanged.

References

Airbyte's documentation on the different state message types.
Slack thread containing details about the escalation motivating these changes.

Connectors built on modern Airbyte CDKs emit and expect per-stream `STREAM`
state, whereas older connectors used the `LEGACY` opaque `data` object. Airbyte
deprecated the `LEGACY` format (Platform v0.62.4 / CDK >= 1.3), so bumping a
connector across that boundary changes its state contract. `airbyte-to-flow` only
understood the `LEGACY` form, which broke both directions:

  - Inbound: ATF wrote Flow's persisted `{stream: state}` object verbatim to the
    `--state` file. A modern CDK's `read_state` expects a list of
    `AirbyteStateMessage`s, so it iterated the object's keys and failed with
    `"<stream name>" is not of type "object"`, breaking active captures.
  - Outbound: `STREAM` state messages carry no top-level `data`, so they failed to
    deserialize and were silently dropped, meaning state never advanced.

This commit fixes that by teaching ATF to translate between Flow's
`{stream: state}` object and Airbyte's new per-stream form, in both directions:

  - State: make `data` optional and add `stream` (`STREAM` payload). Also capture
    `type`/`global` as plain fields for diagnostics.
  - Outbound (`state_to_checkpoint`): forward a `LEGACY` `data` blob verbatim with
    its merge flag; persist `STREAM` state as `{stream: state}` reduced via RFC
    7396 merge patch so streams accumulate. Log-and-skip anything else (e.g.
    `GLOBAL`) so the capture continues instead of erroring or dropping silently.
  - Inbound (`convert_state_for_connector`): convert the persisted object into a
    per-stream `STREAM` list.

Gate the inbound conversion on the `AIRBYTE_TO_FLOW_STATE_FORMAT=per_stream` env
var. Without it, state is written verbatim so `LEGACY`-state connectors
are unchanged.

Includes unit tests for both conversion directions and the round-trip, plus
`README`/comment updates describing the `LEGACY`/`STREAM`/`GLOBAL` state types.
`source-amazon-ads` 7.3.1 is built on a modern Airbyte CDK that reads `--state`
as a list of per-stream AirbyteStateMessages rather than the legacy state
object. Set `AIRBYTE_TO_FLOW_STATE_FORMAT=per_stream` so `airbyte-to-flow` converts
the persisted state object into that form. Without it the connector's `read_state`
fails with `"<stream name>" is not of type "object"` and active captures break.
@Alex-Bair Alex-Bair requested a review from a team June 29, 2026 18:57

@nicolaslazo nicolaslazo left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, thanks for covering this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants