diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index 39423755a6841..65414fbe534f2 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2628,6 +2628,16 @@ scheduler:
       type: float
       example: ~
       default: "30.0"
+    dagrun_late_threshold:
+      description: |
+        How many seconds after the scheduled time a scheduled DagRun may remain
+        uncreated or queued before the scheduler includes it in the
+        ``scheduler.dagruns.not_started`` gauge.
+        Set to ``0.0`` to treat any delay as late; negative values are treated as ``0.0``.
+      version_added: 3.2.0
+      type: float
+      example: ~
+      default: "15.0"
     scheduler_health_check_threshold:
       description: |
         If the last scheduler heartbeat happened more than ``[scheduler] scheduler_health_check_threshold``
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index df2abe2aabd06..132d0f05ae181 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1723,7 +1723,7 @@ def _run_scheduler_loop(self) -> None:
 
         timers.call_regular_interval(
             conf.getfloat("scheduler", "dagrun_metrics_interval", fallback=30.0),
-            self._emit_running_dags_metric,
+            self._emit_dagrun_metrics,
         )
 
         timers.call_regular_interval(
@@ -3220,6 +3220,53 @@ def _emit_running_dags_metric(self, *, session: Session = NEW_SESSION) -> None:
         running_dags = float(session.scalar(stmt) or 0)
         stats.gauge("scheduler.dagruns.running", running_dags)
 
+    @provide_session
+    def _emit_dagrun_metrics(self, *, session: Session = NEW_SESSION) -> None:
+        """Emit the running and the not-started DagRun gauges using one shared session."""
+        self._emit_running_dags_metric(session=session)
+        self._emit_not_started_dagruns_metric(session=session)
+
+    def _emit_not_started_dagruns_metric(self, *, session: Session) -> None:
+        """
+        Emit the ``scheduler.dagruns.not_started`` gauge.
+
+        The gauge is the number of schedulable Dags whose next run became due at or before
+        the cutoff but has no DagRun row yet, plus the number of scheduled DagRuns still
+        queued with ``run_after`` at or before the cutoff. The cutoff is the current time
+        minus ``[scheduler] dagrun_late_threshold`` seconds.
+
+        :param session: SQLAlchemy session used for both count queries.
+        """
+        # A negative threshold would move the cutoff into the future and report runs that
+        # are not even due yet, so clamp it at zero.
+        late_threshold = max(0.0, conf.getfloat("scheduler", "dagrun_late_threshold", fallback=15.0))
+        cutoff = timezone.utcnow() - timedelta(seconds=late_threshold)
+
+        uncreated_count = session.scalar(
+            select(func.count())
+            .select_from(DagModel)
+            .where(
+                DagModel.is_paused == expression.false(),
+                DagModel.is_stale == expression.false(),
+                DagModel.has_import_errors == expression.false(),
+                DagModel.exceeds_max_non_backfill == expression.false(),
+                DagModel.next_dagrun.is_not(None),
+                DagModel.next_dagrun_create_after.is_not(None),
+                DagModel.next_dagrun_create_after <= cutoff,
+            )
+        )
+        queued_count = session.scalar(
+            select(func.count())
+            .select_from(DagRun)
+            .where(
+                DagRun.run_type == DagRunType.SCHEDULED,
+                DagRun.state == DagRunState.QUEUED,
+                DagRun.run_after <= cutoff,
+            )
+        )
+        not_started_count = int(uncreated_count or 0) + int(queued_count or 0)
+        stats.gauge("scheduler.dagruns.not_started", not_started_count)
+
     @provide_session
     def _emit_pool_metrics(self, *, session: Session = NEW_SESSION) -> None:
         from airflow.models.pool import Pool
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index e2d5977eb86f7..94a27e182fe2e 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -8998,6 +8998,88 @@ def _fake_gauge(metric: str, value: int, *_, **__):
 
         assert recorded == [("scheduler.dagruns.running", 2)]
 
+    @time_machine.travel("2024-01-01T00:00:00+00:00", tick=False)
+    @conf_vars({("scheduler", "dagrun_late_threshold"): "15.0"})
+    def test_emit_not_started_dagruns_metric_counts_uncreated_and_queued(
+        self, dag_maker, monkeypatch, session
+    ):
+        """Only overdue uncreated runs and overdue queued scheduled runs are counted."""
+        now = timezone.utcnow()
+
+        with dag_maker("uncreated_late"):
+            pass
+        dag_model = dag_maker.dag_model
+        dag_model.is_paused = False
+        dag_model.is_stale = False
+        dag_model.has_import_errors = False
+        dag_model.exceeds_max_non_backfill = False
+        dag_model.next_dagrun = now - timedelta(minutes=2)
+        dag_model.next_dagrun_create_after = now - timedelta(seconds=60)
+
+        with dag_maker("uncreated_on_time"):
+            pass
+        dag_model = dag_maker.dag_model
+        dag_model.is_paused = False
+        dag_model.is_stale = False
+        dag_model.has_import_errors = False
+        dag_model.exceeds_max_non_backfill = False
+        dag_model.next_dagrun = now
+        dag_model.next_dagrun_create_after = now - timedelta(seconds=5)
+
+        with dag_maker("queued_late"):
+            pass
+        dag_maker.create_dagrun(
+            run_id="scheduled_queued_late",
+            run_type=DagRunType.SCHEDULED,
+            state=DagRunState.QUEUED,
+            logical_date=now - timedelta(minutes=2),
+            run_after=now - timedelta(seconds=60),
+        )
+        dag_maker.create_dagrun(
+            run_id="scheduled_queued_on_time",
+            run_type=DagRunType.SCHEDULED,
+            state=DagRunState.QUEUED,
+            logical_date=now - timedelta(minutes=1),
+            run_after=now - timedelta(seconds=5),
+        )
+        dag_maker.create_dagrun(
+            run_id="manual_queued_late",
+            run_type=DagRunType.MANUAL,
+            state=DagRunState.QUEUED,
+            logical_date=now - timedelta(minutes=3),
+            run_after=now - timedelta(seconds=60),
+        )
+        session.flush()
+
+        recorded: list[tuple[str, int]] = []
+
+        def _fake_gauge(metric: str, value: int, *_, **__):
+            recorded.append((metric, value))
+
+        monkeypatch.setattr("airflow._shared.observability.metrics.stats.gauge", _fake_gauge, raising=True)
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+        self.job_runner._emit_not_started_dagruns_metric(session=session)
+
+        assert recorded == [("scheduler.dagruns.not_started", 2)]
+
+    @conf_vars({("scheduler", "dagrun_late_threshold"): "15.0"})
+    def test_emit_not_started_dagruns_metric_emits_zero_count(self, monkeypatch, session):
+        """The gauge is still emitted, with a value of 0, when nothing is late."""
+        recorded: list[tuple[str, int]] = []
+
+        def _fake_gauge(metric: str, value: int, *_, **__):
+            recorded.append((metric, value))
+
+        monkeypatch.setattr("airflow._shared.observability.metrics.stats.gauge", _fake_gauge, raising=True)
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
+        self.job_runner._emit_not_started_dagruns_metric(session=session)
+
+        assert recorded == [("scheduler.dagruns.not_started", 0)]
+
     # Multi-team scheduling tests
     def test_multi_team_get_team_names_for_dag_ids_success(self, dag_maker, session):
         """Test successful team name resolution for multiple DAG IDs."""
diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
index 6c51e32ff7798..14886c5473aae 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
+++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml
@@ -205,6 +205,14 @@ metrics:
     legacy_name: "-"
     name_variables: []
 
+  - name: "scheduler.dagruns.not_started"
+    description: "Number of scheduled DagRuns that have not started after their scheduled time
+      plus the configured ``[scheduler] dagrun_late_threshold``. This includes due runs without a
+      DagRun row and scheduled DagRuns still in the queued state."
+    type: "gauge"
+    legacy_name: "-"
+    name_variables: []
+
   - name: "ti.start"
     description: "Number of started task in a given Dag. Similar to {job_name}_start but for task.
       Metric with dag_id and task_id tagging."