Skip to content

Fix deferrable Beam Dataflow operators failing with 400 when job ID is missing from stdout#69102

Open
gingeekrishna wants to merge 3 commits into
apache:mainfrom
gingeekrishna:feature/68279-fix-deferrable-beam-dataflow-job-id
Open

Fix deferrable Beam Dataflow operators failing with 400 when job ID is missing from stdout#69102
gingeekrishna wants to merge 3 commits into
apache:mainfrom
gingeekrishna:feature/68279-fix-deferrable-beam-dataflow-job-id

Conversation

@gingeekrishna

@gingeekrishna gingeekrishna commented Jun 28, 2026

Copy link
Copy Markdown

Closes #68279

Problem

When the Dataflow launcher subprocess runs with the default WARNING log level, it does not emit the "Created job with id: [...]" line that the Beam operator parses to capture the Dataflow job ID. This leaves dataflow_job_id = None.

The previous PRs (#67711, #68720) addressed this by adding a fallback after the launcher subprocess finished — but as reviewer @MaksYermak correctly noted, that is not the root cause fix: by the time the launcher exits, the Dataflow job may have already completed, so deferral never gets a chance to free the Airflow worker.

Root Cause Fix

The correct fix is to capture the job ID during the stdout-reading loop, before the launcher finishes, so the operator can truly defer.

Changes

providers/google/.../hooks/dataflow.py

  • Add DataflowHook.fetch_job_id_by_name(prefix_name, location, project_id) — looks up a Dataflow job by name prefix via the API, returning its ID.

providers/apache/beam/.../hooks/beam.py

  • Add import time
  • Add periodic_callback: Callable[[], None] | None = None parameter to run_beam_command(), _start_pipeline(), start_python_pipeline(), and start_java_pipeline()
  • In run_beam_command(): invoke periodic_callback() roughly every 5 seconds while the subprocess is running (using time.monotonic() tracking). After each periodic call, check is_dataflow_job_id_exist_callback() and exit early if the ID has been resolved — before the subprocess finishes.

providers/apache/beam/.../operators/beam.py

  • Add BeamDataflowMixin.__get_dataflow_job_id_poll_callback(): returns a closure that calls DataflowHook.fetch_job_id_by_name() and sets self.dataflow_job_id when a matching job is found; silently retries on transient errors.
  • Update BeamRunPythonPipelineOperator.execute_on_dataflow() and BeamRunJavaPipelineOperator.execute_on_dataflow() to create and pass this callback.

How this fixes the issue

  1. Beam launcher starts (with DataflowRunner) — the Dataflow job is submitted.
  2. Every ~5 s, the periodic callback polls the Dataflow Jobs API by job name prefix.
  3. Once the job appears, dataflow_job_id is set, is_dataflow_job_id_exist_callback() returns True, and the stdout-reading loop exits immediately — before the Dataflow job finishes.
  4. The operator defers successfully, freeing the Airflow worker. The Dataflow job continues running on Google Cloud.

This path is the same whether or not the launcher emits a job-ID line to stdout. If stdout does emit the line, process_line_callback sets dataflow_job_id and the loop exits on the next is_dataflow_job_id_exist_callback() check, as before.

Tests

  • Updated hook-level tests to include periodic_callback=None in run_beam_command mock assertions (all callers that don't pass a periodic_callback).
  • Updated operator-level test_exec_dataflow_runner tests to include periodic_callback=mock.ANY.
  • Added test_exec_dataflow_runner_periodic_callback_fetches_job_id for both BeamRunPythonPipelineOperator and BeamRunJavaPipelineOperator: captures the periodic_callback passed by the operator, calls it directly, and asserts that dataflow_job_id is set by polling fetch_job_id_by_name.

Checklist

  • Root cause fixed (not a post-hoc fallback)
  • Backward compatible: periodic_callback defaults to None; existing callers are unaffected
  • Go operator not touched (it is sync-only, no execute_on_dataflow)
  • Syntax validated with py_compile
  • Newsfragment added: providers/apache/beam/newsfragments/68279.bugfix.rst

… missing from stdout

When the Dataflow launcher process runs with WARNING log level (the default),
it does not emit the "Created job with id" line that the Beam operator parses
to capture the Dataflow job ID. This left dataflow_job_id as None, causing the
deferrable trigger to fail with "400 Request must contain a job and project id".

Fix by adding a periodic_callback parameter to run_beam_command() that is
invoked roughly every 5 seconds while the launcher subprocess is running. The
deferrable Beam operators now pass a callback that polls DataflowHook.fetch_job_id_by_name()
to resolve the job ID by name. Once the ID is set, the stdout-reading loop exits
early so the operator can truly defer, freeing the Airflow worker while the
Dataflow job continues running on Google Cloud.

Fixes apache#68279
@gingeekrishna gingeekrishna requested a review from shahar1 as a code owner June 28, 2026 06:07
Copilot AI review requested due to automatic review settings June 28, 2026 06:07
@boring-cyborg boring-cyborg Bot added area:providers provider:apache-beam provider:google Google (including GCP) related issues labels Jun 28, 2026

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot was unable to review this pull request because the user who requested the review has reached their quota limit.

Comment thread providers/apache/beam/newsfragments/68279.bugfix.rst Outdated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

3 participants