diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py index c4bd8eceea102..7dd748397cfd7 100644 --- a/airflow-core/src/airflow/models/dagbag.py +++ b/airflow-core/src/airflow/models/dagbag.py @@ -18,6 +18,7 @@ from __future__ import annotations import hashlib +import logging import time from collections.abc import MutableMapping from contextlib import nullcontext @@ -43,6 +44,8 @@ from airflow.models.serialized_dag import SerializedDagModel from airflow.serialization.definitions.dag import SerializedDAG +log = logging.getLogger(__name__) + class _CacheEntry(NamedTuple): """A cached deserialized DAG plus the metadata needed to detect staleness on lookup.""" @@ -104,7 +107,14 @@ def __init__( def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None: """Read and cache a SerializedDAG (with its ``dag_hash`` for staleness detection).""" serdag.load_op_links = self.load_op_links - dag = serdag.dag + try: + dag = serdag.dag + except Exception: + # A serialized blob that exists but cannot be reconstructed (unimportable operator class, + # incompatible serialization version, blob written under a synthetic bundle by + # dag.test()) is treated as "no live definition" so read-only callers return 404, not 500. + log.warning("Failed to deserialize DAG from %r; treating as not found", serdag, exc_info=True) + return None if not dag: return None with self._lock: diff --git a/airflow-core/tests/unit/models/test_dagbag.py b/airflow-core/tests/unit/models/test_dagbag.py index 79668d4fe54f4..1b9dd06e26e16 100644 --- a/airflow-core/tests/unit/models/test_dagbag.py +++ b/airflow-core/tests/unit/models/test_dagbag.py @@ -77,6 +77,26 @@ def test__read_dag_returns_none_when_no_dag(self): assert result is None assert "v1" not in self.db_dag_bag._dags + def test__read_dag_returns_none_when_deserialization_fails(self): + """A blob that exists but cannot be deserialized is treated as absent, not propagated. + + Keeps read-only API callers (e.g. the DAG detail page) returning 404 instead of 500 for a + Dag whose serialized definition references an unimportable class or an incompatible version. + """ + + class _Undeserializable: + load_op_links = True + dag_version_id = "v1" + + @property + def dag(self): + raise ValueError("cannot deserialize") + + result = self.db_dag_bag._read_dag(_Undeserializable()) + + assert result is None + assert "v1" not in self.db_dag_bag._dags + def test_get_dag_fetches_from_db_on_miss(self): """It should query the DB and cache the result (with its hash) when not in cache.""" mock_dag = MagicMock(spec=SerializedDAG)