From 8a522d111fa0b14a8134582f2a6368129d69fc49 Mon Sep 17 00:00:00 2001 From: Nikolaus Schuetz Date: Sat, 27 Jun 2026 21:55:22 -0700 Subject: [PATCH] Make schema_fields templated in GCSToBigQueryOperator schema_fields was not a template field, so a templated value (a Variable or XComArg resolving to a schema) passed to GCSToBigQueryOperator was never rendered and reached BigQuery unresolved. Mark it templated, matching how BigQueryUpdateTableSchemaOperator already templates schema_fields_updates. --- .../google/cloud/transfers/gcs_to_bigquery.py | 2 + .../cloud/transfers/test_gcs_to_bigquery.py | 38 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py index e57fc20110f83..68a137d7d053b 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py @@ -196,6 +196,7 @@ class GCSToBigQueryOperator(BaseOperator): template_fields: Sequence[str] = ( "bucket", "source_objects", + "schema_fields", "schema_object", "schema_object_bucket", "destination_project_dataset_table", @@ -203,6 +204,7 @@ class GCSToBigQueryOperator(BaseOperator): "src_fmt_configs", "extra_config", ) + template_fields_renderers = {"schema_fields": "json"} template_ext: Sequence[str] = (".sql",) ui_color = "#f0eee4" operator_extra_links = (BigQueryTableLink(),) diff --git a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py index be2f5ed52e5e5..09bef1b433f3d 100644 --- a/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py +++ b/providers/google/tests/unit/google/cloud/transfers/test_gcs_to_bigquery.py @@ -19,6 +19,7 @@ import functools import json +from datetime import datetime from unittest import mock from unittest.mock import MagicMock, call @@ -27,6 +28,7 @@ from google.cloud.exceptions import Conflict from sqlalchemy import select +from airflow import DAG from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models.trigger import Trigger from airflow.providers.common.compat.openlineage.facet import ( @@ -2093,6 +2095,42 @@ def test_src_fmt_configs_and_extra_config_both_applied_with_precedence(self, hoo assert config["load"]["skipLeadingRows"] == 5 assert config["load"]["columnNameCharacterMap"] == "STRICT" + def test_schema_fields_is_templated(self): + """Regression test for #31481. + + ``schema_fields`` must be a template field. The issue passed a + ``MappedArgument`` (from ``.expand()``); such values resolve at render time + via the Resolvable ``resolve()`` protocol rather than Jinja, so an ordinary + DAG (no ``render_template_as_native_obj``) suffices. Before the fix the field + was skipped and the unresolved argument reached BigQuery verbatim. + """ + assert "schema_fields" in GCSToBigQueryOperator.template_fields + assert GCSToBigQueryOperator.template_fields_renderers["schema_fields"] == "json" + + class _SchemaArg: + """Stand-in for a MappedArgument/XComArg resolving to a schema.""" + + def resolve(self, context): + return SCHEMA_FIELDS + + with DAG( + dag_id="test_gcs_to_bq_schema_fields_templating", + schedule=None, + start_date=datetime(2024, 1, 1), + ) as dag: + operator = GCSToBigQueryOperator( + task_id=TASK_ID, + bucket=TEST_BUCKET, + source_objects=TEST_SOURCE_OBJECTS, + destination_project_dataset_table=TEST_EXPLICIT_DEST, + schema_fields=_SchemaArg(), + dag=dag, + ) + + operator.render_template_fields({}) + + assert operator.schema_fields == SCHEMA_FIELDS + @pytest.fixture def create_task_instance(create_task_instance_of_operator, session):