From 1bf4400396960a66e7dc96b969d47bbd498da094 Mon Sep 17 00:00:00 2001 From: Nikolaus Schuetz Date: Sat, 27 Jun 2026 09:11:51 -0700 Subject: [PATCH] Initialize versioned Dag bundle for callbacks when bundle_version is None When bundle versioning is disabled, DagRun.produce_dag_callback() sets bundle_version to None on the callback request. Callback queueing then skipped bundle.initialize() for versioning-capable bundles, so the queued DagFileInfo was built from an unresolved bundle path. Initialize the bundle whenever it supports versioning, letting it resolve its current effective version, so callbacks always queue a usable path. --- airflow-core/newsfragments/69087.bugfix.rst | 1 + .../src/airflow/dag_processing/manager.py | 10 +++--- .../tests/unit/dag_processing/test_manager.py | 34 +++++++++++++++++-- 3 files changed, 39 insertions(+), 6 deletions(-) create mode 100644 airflow-core/newsfragments/69087.bugfix.rst diff --git a/airflow-core/newsfragments/69087.bugfix.rst b/airflow-core/newsfragments/69087.bugfix.rst new file mode 100644 index 0000000000000..9fd89cb6882c0 --- /dev/null +++ b/airflow-core/newsfragments/69087.bugfix.rst @@ -0,0 +1 @@ +Initialize versioned Dag bundles for callback queueing when bundle_version is None (e.g. with disable_bundle_versioning=True), so callbacks resolve a usable bundle path instead of an uninitialized one. diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index b2428985a6506..ce5dace5fde5d 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -701,22 +701,24 @@ def prepare_callback_bundle(self, request: CallbackRequest) -> BaseDagBundle | N Return the bundle to run the callback against, or ``None`` to skip the callback. Default implementation looks the bundle up via :class:`DagBundlesManager` and, for - versioned requests on bundles that support versioning, calls ``bundle.initialize()``. - Override to source the bundle from an API. + bundles that support versioning, calls ``bundle.initialize()`` so ``bundle.path`` is + resolved before the caller reads it. This happens even when ``request.bundle_version`` + is ``None`` (e.g. ``disable_bundle_versioning=True``), in which case the bundle resolves + its current effective version. Override to source the bundle from an API. """ try: bundle = DagBundlesManager().get_bundle(name=request.bundle_name, version=request.bundle_version) except ValueError: self.log.error("Bundle %s no longer configured, skipping callback", request.bundle_name) return None - if bundle.supports_versioning and request.bundle_version: + if bundle.supports_versioning: try: bundle.initialize() except Exception: self.log.exception( "Error initializing bundle %s version %s for callback, skipping", request.bundle_name, - request.bundle_version, + request.bundle_version if request.bundle_version is not None else "latest", ) return None return bundle diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 4b55b6b2f1ee9..b5ace87c04b5e 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -1967,7 +1967,9 @@ def test_prepare_callback_bundle_initializes_versioned_bundle(self, mock_bundle_ bundle.initialize.assert_called_once() @mock.patch("airflow.dag_processing.manager.DagBundlesManager") - def test_prepare_callback_bundle_skips_initialize_for_unversioned_request(self, mock_bundle_manager): + def test_prepare_callback_bundle_initializes_versioned_bundle_when_version_is_none( + self, mock_bundle_manager + ): manager = DagFileProcessorManager(max_runs=1) bundle = MagicMock(spec=BaseDagBundle) bundle.supports_versioning = True @@ -1984,7 +1986,7 @@ def test_prepare_callback_bundle_skips_initialize_for_unversioned_request(self, ) assert manager.prepare_callback_bundle(request) is bundle - bundle.initialize.assert_not_called() + bundle.initialize.assert_called_once() @mock.patch("airflow.dag_processing.manager.DagBundlesManager") def test_prepare_callback_bundle_skips_initialize_for_non_versioning_bundle(self, mock_bundle_manager): @@ -2071,6 +2073,34 @@ def test_add_callback_queues_file_info_on_success(self, mock_bundle_manager): assert file_info.bundle_name == "testing" assert requests == [request] + @mock.patch("airflow.dag_processing.manager.DagBundlesManager") + def test_add_callback_queues_resolved_path_when_version_is_none(self, mock_bundle_manager): + manager = DagFileProcessorManager(max_runs=1) + initialized_path = Path("/tmp/bundle-initialized") + bundle = MagicMock(spec=BaseDagBundle) + bundle.supports_versioning = True + # A versioned bundle's path is only usable after initialize(); simulate that by + # leaving path unresolved until initialize() materializes it. + bundle.path = Path("/dev/null") + bundle.initialize.side_effect = lambda: setattr(bundle, "path", initialized_path) + mock_bundle_manager.return_value.get_bundle.return_value = bundle + + request = DagCallbackRequest( + filepath="file1.py", + dag_id="dag1", + run_id="run1", + is_failure_callback=False, + bundle_name="testing", + bundle_version=None, + msg=None, + ) + + manager._add_callback_to_queue(request) + + bundle.initialize.assert_called_once() + [(file_info, _)] = manager._callback_to_execute.items() + assert file_info.bundle_path == initialized_path + @mock.patch("airflow.dag_processing.manager.DagBundlesManager") def test_add_callback_skips_when_bundle_init_fails(self, mock_bundle_manager): manager = DagFileProcessorManager(max_runs=1)