Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/69087.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Initialize versioned Dag bundles for callback queueing when bundle_version is None (e.g. with disable_bundle_versioning=True), so callbacks resolve a usable bundle path instead of an uninitialized one.
10 changes: 6 additions & 4 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,22 +701,24 @@ def prepare_callback_bundle(self, request: CallbackRequest) -> BaseDagBundle | N
Return the bundle to run the callback against, or ``None`` to skip the callback.

Default implementation looks the bundle up via :class:`DagBundlesManager` and, for
versioned requests on bundles that support versioning, calls ``bundle.initialize()``.
Override to source the bundle from an API.
bundles that support versioning, calls ``bundle.initialize()`` so ``bundle.path`` is
resolved before the caller reads it. This happens even when ``request.bundle_version``
is ``None`` (e.g. ``disable_bundle_versioning=True``), in which case the bundle resolves
its current effective version. Override to source the bundle from an API.
"""
try:
bundle = DagBundlesManager().get_bundle(name=request.bundle_name, version=request.bundle_version)
except ValueError:
self.log.error("Bundle %s no longer configured, skipping callback", request.bundle_name)
return None
if bundle.supports_versioning and request.bundle_version:
if bundle.supports_versioning:
try:
bundle.initialize()
except Exception:
self.log.exception(
"Error initializing bundle %s version %s for callback, skipping",
request.bundle_name,
request.bundle_version,
request.bundle_version if request.bundle_version is not None else "latest",
)
Comment thread
nikolauspschuetz marked this conversation as resolved.
return None
return bundle
Expand Down
34 changes: 32 additions & 2 deletions airflow-core/tests/unit/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1967,7 +1967,9 @@ def test_prepare_callback_bundle_initializes_versioned_bundle(self, mock_bundle_
bundle.initialize.assert_called_once()

@mock.patch("airflow.dag_processing.manager.DagBundlesManager")
def test_prepare_callback_bundle_skips_initialize_for_unversioned_request(self, mock_bundle_manager):
def test_prepare_callback_bundle_initializes_versioned_bundle_when_version_is_none(
self, mock_bundle_manager
):
manager = DagFileProcessorManager(max_runs=1)
bundle = MagicMock(spec=BaseDagBundle)
bundle.supports_versioning = True
Expand All @@ -1984,7 +1986,7 @@ def test_prepare_callback_bundle_skips_initialize_for_unversioned_request(self,
)

assert manager.prepare_callback_bundle(request) is bundle
bundle.initialize.assert_not_called()
bundle.initialize.assert_called_once()

@mock.patch("airflow.dag_processing.manager.DagBundlesManager")
def test_prepare_callback_bundle_skips_initialize_for_non_versioning_bundle(self, mock_bundle_manager):
Expand Down Expand Up @@ -2071,6 +2073,34 @@ def test_add_callback_queues_file_info_on_success(self, mock_bundle_manager):
assert file_info.bundle_name == "testing"
assert requests == [request]

@mock.patch("airflow.dag_processing.manager.DagBundlesManager")
def test_add_callback_queues_resolved_path_when_version_is_none(self, mock_bundle_manager):
manager = DagFileProcessorManager(max_runs=1)
initialized_path = Path("/tmp/bundle-initialized")
bundle = MagicMock(spec=BaseDagBundle)
bundle.supports_versioning = True
# A versioned bundle's path is only usable after initialize(); simulate that by
# leaving path unresolved until initialize() materializes it.
bundle.path = Path("/dev/null")
bundle.initialize.side_effect = lambda: setattr(bundle, "path", initialized_path)
mock_bundle_manager.return_value.get_bundle.return_value = bundle

request = DagCallbackRequest(
filepath="file1.py",
dag_id="dag1",
run_id="run1",
is_failure_callback=False,
bundle_name="testing",
bundle_version=None,
msg=None,
)

manager._add_callback_to_queue(request)

bundle.initialize.assert_called_once()
[(file_info, _)] = manager._callback_to_execute.items()
assert file_info.bundle_path == initialized_path

@mock.patch("airflow.dag_processing.manager.DagBundlesManager")
def test_add_callback_skips_when_bundle_init_fails(self, mock_bundle_manager):
manager = DagFileProcessorManager(max_runs=1)
Expand Down