Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion airflow-core/src/airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

import hashlib
import logging
import time
from collections.abc import MutableMapping
from contextlib import nullcontext
Expand All @@ -43,6 +44,8 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.definitions.dag import SerializedDAG

log = logging.getLogger(__name__)


class _CacheEntry(NamedTuple):
"""A cached deserialized DAG plus the metadata needed to detect staleness on lookup."""
Expand Down Expand Up @@ -104,7 +107,14 @@ def __init__(
def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
"""Read and cache a SerializedDAG (with its ``dag_hash`` for staleness detection)."""
serdag.load_op_links = self.load_op_links
dag = serdag.dag
try:
dag = serdag.dag
except Exception:
# A serialized blob that exists but cannot be reconstructed (unimportable operator class,
# incompatible serialization version, blob written under a synthetic bundle by
# dag.test()) is treated as "no live definition" so read-only callers return 404, not 500.
log.warning("Failed to deserialize DAG from %r; treating as not found", serdag, exc_info=True)
return None
if not dag:
return None
with self._lock:
Expand Down
20 changes: 20 additions & 0 deletions airflow-core/tests/unit/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,26 @@ def test__read_dag_returns_none_when_no_dag(self):
assert result is None
assert "v1" not in self.db_dag_bag._dags

def test__read_dag_returns_none_when_deserialization_fails(self):
"""A blob that exists but cannot be deserialized is treated as absent, not propagated.
Keeps read-only API callers (e.g. the DAG detail page) returning 404 instead of 500 for a
Dag whose serialized definition references an unimportable class or an incompatible version.
"""

class _Undeserializable:
load_op_links = True
dag_version_id = "v1"

@property
def dag(self):
raise ValueError("cannot deserialize")

result = self.db_dag_bag._read_dag(_Undeserializable())

assert result is None
assert "v1" not in self.db_dag_bag._dags

def test_get_dag_fetches_from_db_on_miss(self):
"""It should query the DB and cache the result (with its hash) when not in cache."""
mock_dag = MagicMock(spec=SerializedDAG)
Expand Down