Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2628,6 +2628,16 @@ scheduler:
type: float
example: ~
default: "30.0"
dagrun_late_threshold:
description: |
How many seconds after the scheduled time a scheduled DagRun may remain
uncreated or queued before the scheduler includes it in the
``scheduler.dagruns.not_started`` gauge.
Set to ``0.0`` to treat any delay as late.
version_added: 3.2.0
type: float
example: ~
default: "15.0"
scheduler_health_check_threshold:
description: |
If the last scheduler heartbeat happened more than ``[scheduler] scheduler_health_check_threshold``
Expand Down
35 changes: 34 additions & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1723,7 +1723,7 @@ def _run_scheduler_loop(self) -> None:

timers.call_regular_interval(
conf.getfloat("scheduler", "dagrun_metrics_interval", fallback=30.0),
self._emit_running_dags_metric,
self._emit_dagrun_metrics,
)

timers.call_regular_interval(
Expand Down Expand Up @@ -3220,6 +3220,39 @@ def _emit_running_dags_metric(self, *, session: Session = NEW_SESSION) -> None:
running_dags = float(session.scalar(stmt) or 0)
stats.gauge("scheduler.dagruns.running", running_dags)

@provide_session
def _emit_dagrun_metrics(self, *, session: Session = NEW_SESSION) -> None:
    """
    Emit all periodic DagRun gauges through a single shared session.

    The running-DagRuns gauge is emitted first, followed by the not-started gauge.

    :param session: Database session handed to each individual metric emitter.
    """
    emitters = (
        self._emit_running_dags_metric,
        self._emit_not_started_dagruns_metric,
    )
    for emit in emitters:
        emit(session=session)

def _emit_not_started_dagruns_metric(self, *, session: Session) -> None:
    """
    Emit the ``scheduler.dagruns.not_started`` gauge.

    The gauge value is the sum of two counts, both measured against a cutoff of
    "now minus ``[scheduler] dagrun_late_threshold`` seconds":

    * Dags that are not paused, not stale, have no import errors and are not flagged
      ``exceeds_max_non_backfill``, and whose ``next_dagrun_create_after`` is at or before
      the cutoff. These are treated as due runs that have not been created yet.
    * Scheduled DagRuns still in the queued state whose ``run_after`` is at or before the cutoff.

    A value of zero is emitted when nothing matches.

    :param session: Database session used for both count queries.
    """
    # Fall back to the config.yml default so a missing option cannot break metric emission,
    # mirroring how ``dagrun_metrics_interval`` is read. A negative threshold would push the
    # cutoff into the future and report runs that are not even due yet, so clamp it at zero.
    late_threshold = max(0.0, conf.getfloat("scheduler", "dagrun_late_threshold", fallback=15.0))
    cutoff = timezone.utcnow() - timedelta(seconds=late_threshold)

    # Dags whose next scheduled run should already have been created.
    uncreated_count = session.scalar(
        select(func.count())
        .select_from(DagModel)
        .where(
            DagModel.is_paused == expression.false(),
            DagModel.is_stale == expression.false(),
            DagModel.has_import_errors == expression.false(),
            DagModel.exceeds_max_non_backfill == expression.false(),
            DagModel.next_dagrun.is_not(None),
            DagModel.next_dagrun_create_after.is_not(None),
            DagModel.next_dagrun_create_after <= cutoff,
        )
    )
    # Scheduled runs that exist but have not left the queued state.
    queued_count = session.scalar(
        select(func.count())
        .select_from(DagRun)
        .where(
            DagRun.run_type == DagRunType.SCHEDULED,
            DagRun.state == DagRunState.QUEUED,
            DagRun.run_after <= cutoff,
        )
    )
    # ``session.scalar`` may yield None; treat that as zero.
    not_started_count = int(uncreated_count or 0) + int(queued_count or 0)
    stats.gauge("scheduler.dagruns.not_started", not_started_count)

@provide_session
def _emit_pool_metrics(self, *, session: Session = NEW_SESSION) -> None:
from airflow.models.pool import Pool
Expand Down
80 changes: 80 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8998,6 +8998,86 @@ def _fake_gauge(metric: str, value: int, *_, **__):

assert recorded == [("scheduler.dagruns.running", 2)]

@time_machine.travel("2024-01-01T00:00:00+00:00", tick=False)
@conf_vars({("scheduler", "dagrun_late_threshold"): "15.0"})
def test_emit_not_started_dagruns_metric_counts_uncreated_and_queued(
    self, dag_maker, monkeypatch, session
):
    """Expect a gauge value of 2 from a mix of late and on-time Dags and queued runs."""
    now = timezone.utcnow()

    # (dag_id, next_dagrun, next_dagrun_create_after)
    uncreated_cases = (
        ("uncreated_late", now - timedelta(minutes=2), now - timedelta(seconds=60)),
        ("uncreated_on_time", now, now - timedelta(seconds=5)),
    )
    for dag_id, next_dagrun, create_after in uncreated_cases:
        with dag_maker(dag_id):
            pass
        model = dag_maker.dag_model
        model.is_paused = False
        model.is_stale = False
        model.has_import_errors = False
        model.exceeds_max_non_backfill = False
        model.next_dagrun = next_dagrun
        model.next_dagrun_create_after = create_after

    with dag_maker("queued_late"):
        pass

    # (run_id, run_type, logical_date, run_after) -- every run is created in the queued state.
    queued_cases = (
        (
            "scheduled_queued_late",
            DagRunType.SCHEDULED,
            now - timedelta(minutes=2),
            now - timedelta(seconds=60),
        ),
        (
            "scheduled_queued_on_time",
            DagRunType.SCHEDULED,
            now - timedelta(minutes=1),
            now - timedelta(seconds=5),
        ),
        (
            "manual_queued_late",
            DagRunType.MANUAL,
            now - timedelta(minutes=3),
            now - timedelta(seconds=60),
        ),
    )
    for run_id, run_type, logical_date, run_after in queued_cases:
        dag_maker.create_dagrun(
            run_id=run_id,
            run_type=run_type,
            state=DagRunState.QUEUED,
            logical_date=logical_date,
            run_after=run_after,
        )
    session.flush()

    emitted: list[tuple[str, int]] = []

    def _record_gauge(metric: str, value: int, *_, **__):
        emitted.append((metric, value))

    monkeypatch.setattr("airflow._shared.observability.metrics.stats.gauge", _record_gauge, raising=True)

    self.job_runner = SchedulerJobRunner(job=Job(), executors=[self.null_exec])
    self.job_runner._emit_not_started_dagruns_metric(session=session)

    assert emitted == [("scheduler.dagruns.not_started", 2)]

@conf_vars({("scheduler", "dagrun_late_threshold"): "15.0"})
def test_emit_not_started_dagruns_metric_emits_zero_count(self, monkeypatch, session):
    """Expect the gauge to be emitted with value 0 when the test creates no Dags or DagRuns."""
    emitted: list[tuple[str, int]] = []

    # Capture only the metric name and value; any extra gauge arguments are ignored.
    monkeypatch.setattr(
        "airflow._shared.observability.metrics.stats.gauge",
        lambda metric, value, *_, **__: emitted.append((metric, value)),
        raising=True,
    )

    self.job_runner = SchedulerJobRunner(job=Job(), executors=[self.null_exec])
    self.job_runner._emit_not_started_dagruns_metric(session=session)

    assert emitted == [("scheduler.dagruns.not_started", 0)]

# Multi-team scheduling tests
def test_multi_team_get_team_names_for_dag_ids_success(self, dag_maker, session):
"""Test successful team name resolution for multiple DAG IDs."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ metrics:
legacy_name: "-"
name_variables: []

- name: "scheduler.dagruns.not_started"
description: "Number of scheduled DagRuns that have not started after their scheduled time
plus the configured ``[scheduler] dagrun_late_threshold``. This includes due runs without a
DagRun row and scheduled DagRuns still in the queued state."
type: "gauge"
legacy_name: "-"
name_variables: []

- name: "ti.start"
description: "Number of started tasks in a given Dag. Similar to {job_name}_start but for tasks.
Metric with dag_id and task_id tagging."
Expand Down
Loading