Skip to content

Deferrable BeamRunPythonPipelineOperator (DataflowRunner) fails with "400 Request must contain a job and project id" when launcher stdout has no job-id line #68279

Description

@evgeniy-b

Under which category would you file this issue?

Providers

Apache Airflow version

3.1.7

What happened and how to reproduce it?

Issue description

A BeamRunPythonPipelineOperator (the Java/Go variants share the code path) with deferrable=True and runner="DataflowRunner" fails with:

airflow.exceptions.AirflowException: 400 Request must contain a job and project id.

whenever the Beam launcher's stdout does not contain a line matching JOB_ID_PATTERN — even though the Dataflow job itself launches and runs to completion. The identical task with deferrable=False succeeds against the same job. So enabling deferrable mode turns a passing task into a failing one, with no other change.

Root cause

The operator only learns its dataflow_job_id by scanning the launcher subprocess's stdout with this regex (providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py):

JOB_ID_PATTERN = re.compile(
    r"Submitted job: (?P<job_id_java>[^\"\n\s]*)|Created job with id: \[(?P<job_id_python>[^\"\n\s]*)\]"
)

If that line never appears, on_new_job_id_callback never fires and self.dataflow_job_id stays None. There is a second consequence that matters: the launcher's stdout-reading loop (run_beam_command / process_fd in providers/apache/beam/.../hooks/beam.py) only short-circuits when is_dataflow_job_id_exist_callback() returns truthy. With no job id ever captured that callback stays False, so the loop never returns early — it blocks reading stdout until the subprocess exits, i.e. until the Dataflow job itself finishes. In the normal case the scan returns as soon as the id line is seen and the remaining wait is delegated to the Dataflow API; here the entire job runs inline before either wait mode is reached.

Why the line is commonly missing. The Beam SDK logs Created job with id: [...] from apache_beam.runners.dataflow.internal.apiclient at INFO level. Python's root logger defaults to WARNING, so a pipeline only emits that line if its own script opts into INFO logging — which the canonical Beam wordcount example does explicitly in its __main__:

logging.getLogger().setLevel(logging.INFO)

Any DataflowRunner pipeline whose entrypoint does not raise the root level to INFO (a common omission) suppresses the job-id line by default, with no code change on the Airflow side and no unusual setup. Note that the INFO lines that do show up in the Airflow task log (Beam version: ..., Running command: ..., Start waiting ...) come from Airflow's own self.log.info(...) in the worker process — not from the launcher subprocess — which is why they appear while the subprocess's job-id line does not. A newer/different Beam SDK that rewords or relevels the line is a second, independent way to miss the regex.

By the time the scan returns, the job has already finished. The two wait modes then diverge only in how they handle the (now-terminal) job with a None id:

  • deferrable=False — the operator calls DataflowHook.wait_for_done(job_id=None, job_name=...). With no id, _DataflowJobsController._get_current_jobs resolves the job by name prefix (_fetch_jobs_by_prefix_name); the job is already JOB_STATE_DONE, so wait_for_done returns after a single lookup (no real polling — there is no Waiting for done. Sleep iteration). The task succeeds.
  • deferrable=True — the operator defers with job_id=self.dataflow_job_id (i.e. None). The DataflowJobStateCompleteTrigger (or DataflowJobStatusTrigger on older google providers) then calls get_job_status(job_id=None); the Dataflow API rejects it with 400 Request must contain a job and project id.; the trigger yields an error TriggerEvent; and execute_complete re-raises it as AirflowException. The task fails.

The deferrable path has no by-name fallback for a missing job id; the sync path does. (Note the deferrable path is also strictly worse here: the job has already completed before the task defers, so deferring buys nothing and then fails on resume.)

Relevant code (paths on main):

  • JOB_ID_PATTERN + process_line_and_extract_dataflow_job_id_callbackproviders/google/src/airflow/providers/google/cloud/hooks/dataflow.py
  • launcher loop that only records the id on a regex match (process_fd / run_beam_command) — providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py
  • defer(... job_id=self.dataflow_job_id ...) in execute_on_dataflowproviders/apache/beam/src/airflow/providers/apache/beam/operators/beam.py
  • execute_complete re-raising the error event — providers/apache/beam/.../operators/beam.py (raise AirflowException(event["message"]))
  • sync fallback to name lookup — DataflowHook.wait_for_done
  • trigger calling the API with job_id=Noneproviders/google/src/airflow/providers/google/cloud/triggers/dataflow.py

Steps to reproduce

Minimal, deterministic reproduction with a real Dataflow job (no mocking of Airflow internals). Verified on Airflow 3.1.7 / apache-beam 2.71.0 / apache-airflow-providers-google 20.0.0 / apache-airflow-providers-apache-beam 6.2.3.

  1. A no-op Beam pipeline that runs on Dataflow and does no logging configuration — it leaves the root logger at its default WARNING, so the Created job with id: [...] INFO line never reaches stdout. The job still submits and runs. (This mirrors any pipeline that doesn't opt into INFO logging.)

    repro_missing_jobid.py (staged in GCS)
    # No logging config on purpose: the SDK's "Created job with id: [...]" line
    # is INFO, the root logger defaults to WARNING, so the line is filtered out.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    
    with beam.Pipeline(options=PipelineOptions()) as p:
        p | "Create" >> beam.Create([1, 2, 3]) | "Noop" >> beam.Map(lambda x: x)
  2. A DAG with two tasks running that same pipeline on DataflowRunner, differing only in deferrable:

    from datetime import datetime
    
    from airflow.models import DAG
    from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
    from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
    
    PROJECT = "<your-project>"
    REGION = "<your-region>"  # e.g. europe-west1
    
    def beam_task(task_id, deferrable):
        return BeamRunPythonPipelineOperator(
            task_id=task_id,
            runner="DataflowRunner",
            py_file="gs://<bucket>/repro_missing_jobid.py",
            pipeline_options={
                "project": PROJECT,
                "region": REGION,
                "temp_location": "gs://<bucket>/tmp/",
            },
            dataflow_config=DataflowConfiguration(project_id=PROJECT, location=REGION),
            deferrable=deferrable,
        )
    
    with DAG("repro_beam_missing_jobid", start_date=datetime(2026, 1, 1),
             schedule=None, catchup=False):
        beam_task("run_sync", deferrable=False)
        beam_task("run_deferrable", deferrable=True)
  3. Trigger the DAG (a triggerer must be running for the deferrable task to resume).

Result:

  • run_syncsuccess.
  • run_deferrablefailed with 400 Request must contain a job and project id.

What you think should happen instead?

At minimum, a deferrable Beam/Dataflow task should not fail when the launcher stdout lacks the job-id line: the synchronous path already handles that exact condition (resolving the finished job by name) and the Dataflow job itself runs fine. Flipping deferrable from False to True should never turn a passing task into a failing one.

But the deeper issue is that the whole job-id detection mechanism is fragile, and it makes a true async path impossible whenever the id is absent from stdout:

  • Job-id discovery depends on scraping a specific SDK log line. JOB_ID_PATTERN matches the exact strings Created job with id: [...] (Python) / Submitted job: ... (Java). That line is emitted at INFO, so it only appears when the pipeline opts the root logger into INFO; it also breaks on any SDK rewording/relevelling or log reformatting. There is no API-based fallback to discover the id of the job that was just launched — so the id is silently lost through a perfectly ordinary configuration.

  • Without the id, there is no real async execution at all. The launcher's stdout loop only short-circuits once the id is seen; with no id it blocks until the subprocess exits — i.e. it runs the entire Dataflow job synchronously on the worker. By the time the operator reaches the deferrable branch the job has already finished, so deferring buys nothing (no worker slot is freed during the job) and then fails on resume because it defers with job_id=None. So the deferrable feature is not merely buggy in this case — it is structurally unable to defer until after the job id is captured, which is exactly what's missing.

A robust fix would decouple job-id resolution from stdout scraping (e.g. resolve the just-launched job's id via the Dataflow API by name before deferring, as the sync path effectively does), so that the deferrable path can defer immediately and the operator no longer depends on a fragile log-line match.

Operating System

No response

Deployment

Google Managed Service for Apache Airflow

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-apache-beam==6.2.3
apache-airflow-providers-google==20.0.0

Official Helm Chart version

Not Applicable

Kubernetes Version

No response

Helm Chart configuration

No response

Docker Image customizations

No response

Anything else?

  • Occurs every time the launcher stdout omits the job-id line; never when it contains it. Because emitting the line requires the pipeline to opt into INFO logging (the root logger defaults to WARNING), any pipeline that doesn't do so fails deterministically in deferrable mode — it is not flaky. With the default append_job_name=True the job name is unique, so a name-based resolution would yield a single job.
  • Background and proposed fixes discussed on PR #67711.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions