diff --git a/airflow-core/newsfragments/69087.bugfix.rst b/airflow-core/newsfragments/69087.bugfix.rst new file mode 100644 index 0000000000000..9fd89cb6882c0 --- /dev/null +++ b/airflow-core/newsfragments/69087.bugfix.rst @@ -0,0 +1 @@ +Initialize versioned Dag bundles for callback queueing when bundle_version is None (e.g. with disable_bundle_versioning=True), so callbacks resolve a usable bundle path instead of an uninitialized one. diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index b2428985a6506..ce5dace5fde5d 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -701,22 +701,24 @@ def prepare_callback_bundle(self, request: CallbackRequest) -> BaseDagBundle | N Return the bundle to run the callback against, or ``None`` to skip the callback. Default implementation looks the bundle up via :class:`DagBundlesManager` and, for - versioned requests on bundles that support versioning, calls ``bundle.initialize()``. - Override to source the bundle from an API. + bundles that support versioning, calls ``bundle.initialize()`` so ``bundle.path`` is + resolved before the caller reads it. This happens even when ``request.bundle_version`` + is ``None`` (e.g. ``disable_bundle_versioning=True``), in which case the bundle resolves + its current effective version. Override to source the bundle from an API. """ try: bundle = DagBundlesManager().get_bundle(name=request.bundle_name, version=request.bundle_version) except ValueError: self.log.error("Bundle %s no longer configured, skipping callback", request.bundle_name) return None - if bundle.supports_versioning and request.bundle_version: + if bundle.supports_versioning: try: bundle.initialize() except Exception: self.log.exception( "Error initializing bundle %s version %s for callback, skipping", request.bundle_name, - request.bundle_version, + request.bundle_version if request.bundle_version is not None else "latest", ) return None return bundle diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 4b55b6b2f1ee9..b5ace87c04b5e 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -1967,7 +1967,9 @@ def test_prepare_callback_bundle_initializes_versioned_bundle(self, mock_bundle_ bundle.initialize.assert_called_once() @mock.patch("airflow.dag_processing.manager.DagBundlesManager") - def test_prepare_callback_bundle_skips_initialize_for_unversioned_request(self, mock_bundle_manager): + def test_prepare_callback_bundle_initializes_versioned_bundle_when_version_is_none( + self, mock_bundle_manager + ): manager = DagFileProcessorManager(max_runs=1) bundle = MagicMock(spec=BaseDagBundle) bundle.supports_versioning = True @@ -1984,7 +1986,7 @@ def test_prepare_callback_bundle_skips_initialize_for_unversioned_request(self, ) assert manager.prepare_callback_bundle(request) is bundle - bundle.initialize.assert_not_called() + bundle.initialize.assert_called_once() @mock.patch("airflow.dag_processing.manager.DagBundlesManager") def test_prepare_callback_bundle_skips_initialize_for_non_versioning_bundle(self, mock_bundle_manager): @@ -2071,6 +2073,34 @@ def test_add_callback_queues_file_info_on_success(self, mock_bundle_manager): assert file_info.bundle_name == "testing" assert requests == [request] + @mock.patch("airflow.dag_processing.manager.DagBundlesManager") + def test_add_callback_queues_resolved_path_when_version_is_none(self, mock_bundle_manager): + manager = DagFileProcessorManager(max_runs=1) + initialized_path = Path("/tmp/bundle-initialized") + bundle = MagicMock(spec=BaseDagBundle) + bundle.supports_versioning = True + # A versioned bundle's path is only usable after initialize(); simulate that by + # leaving path unresolved until initialize() materializes it. + bundle.path = Path("/dev/null") + bundle.initialize.side_effect = lambda: setattr(bundle, "path", initialized_path) + mock_bundle_manager.return_value.get_bundle.return_value = bundle + + request = DagCallbackRequest( + filepath="file1.py", + dag_id="dag1", + run_id="run1", + is_failure_callback=False, + bundle_name="testing", + bundle_version=None, + msg=None, + ) + + manager._add_callback_to_queue(request) + + bundle.initialize.assert_called_once() + [(file_info, _)] = manager._callback_to_execute.items() + assert file_info.bundle_path == initialized_path + @mock.patch("airflow.dag_processing.manager.DagBundlesManager") def test_add_callback_skips_when_bundle_init_fails(self, mock_bundle_manager): manager = DagFileProcessorManager(max_runs=1)