From 1d227f178ba045f68c93ba75858324ffd157f319 Mon Sep 17 00:00:00 2001 From: justinpakzad <114518232+justinpakzad@users.noreply.github.com> Date: Sat, 27 Jun 2026 16:08:40 -0400 Subject: [PATCH] Add team_name tags to Amazon executor metrics --- .../executors/aws_lambda/lambda_executor.py | 9 +++++-- .../aws/executors/batch/batch_executor.py | 10 ++++--- .../amazon/aws/executors/ecs/ecs_executor.py | 9 +++++-- .../aws_lambda/test_lambda_executor.py | 27 +++++++++++++++++++ .../executors/batch/test_batch_executor.py | 25 +++++++++++++++++ .../aws/executors/ecs/test_ecs_executor.py | 27 ++++++++++++++++++- 6 files changed, 99 insertions(+), 8 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py index 72455165e83e2..490f609b0d126 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py @@ -44,6 +44,7 @@ from airflow.providers.amazon.aws.hooks.sqs import SqsHook from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_3_PLUS from airflow.providers.common.compat.sdk import AirflowException, Stats, timezone +from airflow.utils.helpers import prune_dict if TYPE_CHECKING: from sqlalchemy.orm import Session @@ -94,7 +95,9 @@ def __init__(self, *args, **kwargs): from airflow.providers.common.compat.sdk import conf self.conf = conf - + # Backwards compatibility for Airflow versions that do not define team_name. + if not hasattr(self, "team_name"): + self.team_name = None self.lambda_function_name = self.conf.get(CONFIG_GROUP_NAME, AllLambdaConfigKeys.FUNCTION_NAME) self.sqs_queue_url = self.conf.get(CONFIG_GROUP_NAME, AllLambdaConfigKeys.QUEUE_URL) self.dlq_url = self.conf.get(CONFIG_GROUP_NAME, AllLambdaConfigKeys.DLQ_URL) @@ -558,7 +561,9 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task :param tis: The task instances to adopt. """ - with Stats.timer("lambda_executor.adopt_task_instances.duration"): + with Stats.timer( + "lambda_executor.adopt_task_instances.duration", tags=prune_dict({"team_name": self.team_name}) + ): adopted_tis: list[TaskInstance] = [] if serialized_workload_keys := [ diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py index 74614961f89e9..8d4eaba85e502 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/batch/batch_executor.py @@ -35,7 +35,7 @@ from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_3_PLUS from airflow.providers.common.compat.sdk import AirflowException, Stats, timezone -from airflow.utils.helpers import merge_dicts +from airflow.utils.helpers import merge_dicts, prune_dict if TYPE_CHECKING: from sqlalchemy.orm import Session @@ -116,7 +116,9 @@ def __init__(self, *args, **kwargs): from airflow.providers.common.compat.sdk import conf self.conf = conf - + # Backwards compatibility for Airflow versions that do not define team_name. + if not hasattr(self, "team_name"): + self.team_name = None self.attempts_since_last_successful_connection = 0 self.load_batch_connection(check_connection=False) self.IS_BOTO_CONNECTION_HEALTHY = False @@ -540,7 +542,9 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task Anything that is not adopted will be cleared by the scheduler and becomes eligible for re-scheduling. """ - with Stats.timer("batch_executor.adopt_task_instances.duration"): + with Stats.timer( + "batch_executor.adopt_task_instances.duration", tags=prune_dict({"team_name": self.team_name}) + ): adopted_tis: list[TaskInstance] = [] if job_ids := [ti.external_executor_id for ti in tis if ti.external_executor_id]: diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py index 644fbb29ed634..3e3f882b9dd42 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py @@ -50,7 +50,7 @@ from airflow.providers.amazon.aws.hooks.ecs import EcsHook from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_3_PLUS from airflow.providers.common.compat.sdk import AirflowException, Stats, timezone -from airflow.utils.helpers import merge_dicts +from airflow.utils.helpers import merge_dicts, prune_dict from airflow.utils.state import State if TYPE_CHECKING: @@ -126,6 +126,9 @@ def __init__(self, *args, **kwargs): from airflow.providers.common.compat.sdk import conf self.conf = conf + # Backwards compatibility for Airflow versions that do not define team_name. + if not hasattr(self, "team_name"): + self.team_name = None self.cluster = self.conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.CLUSTER) self.container_name = self.conf.get(CONFIG_GROUP_NAME, AllEcsConfigKeys.CONTAINER_NAME) @@ -641,7 +644,9 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task Anything that is not adopted will be cleared by the scheduler and becomes eligible for re-scheduling. """ - with Stats.timer("ecs_executor.adopt_task_instances.duration"): + with Stats.timer( + "ecs_executor.adopt_task_instances.duration", tags=prune_dict({"team_name": self.team_name}) + ): adopted_tis: list[TaskInstance] = [] if task_arns := [ti.external_executor_id for ti in tis if ti.external_executor_id]: diff --git a/providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/test_lambda_executor.py b/providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/test_lambda_executor.py index 284865285f164..6613b8e39d76f 100644 --- a/providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/test_lambda_executor.py +++ b/providers/amazon/tests/unit/amazon/aws/executors/aws_lambda/test_lambda_executor.py @@ -1082,6 +1082,33 @@ def test_try_adopt_task_instances_callback(self, mock_executor): assert len(not_adopted) == 0 + @pytest.mark.parametrize( + ("team_name", "expected_tags"), + [ + pytest.param(None, {}, id="without_team"), + pytest.param( + "team_a", + {"team_name": "team_a"}, + id="with_team", + marks=pytest.mark.skipif( + not AIRFLOW_V_3_1_PLUS, reason="Multi-team support requires Airflow 3.1+" + ), + ), + ], + ) + @mock.patch.object(lambda_executor.Stats, "timer") + def test_try_adopt_task_instances_emits_team_name_tag( + self, mock_timer, mock_executor, team_name, expected_tags + ): + """Test that the adopt task instances duration metric is tagged with the team name.""" + mock_executor.team_name = team_name + + mock_executor.try_adopt_task_instances([]) + + mock_timer.assert_called_once_with( + "lambda_executor.adopt_task_instances.duration", tags=expected_tags + ) + @mock.patch("airflow.providers.amazon.aws.executors.aws_lambda.lambda_executor.timezone") def test_end_timeout(self, mock_timezone, mock_executor, mock_airflow_key): """Test that executor can end successfully; waiting for all workloads to naturally exit.""" diff --git a/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py b/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py index 211ed64acf604..f5800232610ca 100644 --- a/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py +++ b/providers/amazon/tests/unit/amazon/aws/executors/batch/test_batch_executor.py @@ -931,6 +931,31 @@ def test_team_config(self): assert submit_kwargs["jobDefinition"] == "some-job-def" assert submit_kwargs["jobName"] == "some-job-name" + @pytest.mark.parametrize( + ("team_name", "expected_tags"), + [ + pytest.param(None, {}, id="without_team"), + pytest.param( + "team_a", + {"team_name": "team_a"}, + id="with_team", + marks=pytest.mark.skipif( + not AIRFLOW_V_3_1_PLUS, reason="Multi-team support requires Airflow 3.1+" + ), + ), + ], + ) + @mock.patch.object(batch_executor.Stats, "timer") + def test_try_adopt_task_instances_emits_team_name_tag( + self, mock_timer, mock_executor, team_name, expected_tags + ): + """Test that the adopt task instances duration metric is tagged with the team name.""" + mock_executor.team_name = team_name + + mock_executor.try_adopt_task_instances([]) + + mock_timer.assert_called_once_with("batch_executor.adopt_task_instances.duration", tags=expected_tags) + class TestBatchExecutorConfig: @staticmethod diff --git a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py index de5a17acf9666..b7c25d51277b4 100644 --- a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py +++ b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py @@ -60,7 +60,7 @@ from tests_common import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES from tests_common.test_utils.config import conf_vars -from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_3_PLUS +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, AIRFLOW_V_3_3_PLUS airflow_version = VersionInfo(*map(int, airflow_version_str.split(".")[:3])) @@ -1334,6 +1334,31 @@ def test_try_adopt_task_instances(self, mock_executor): # The remaining one task is unable to be adopted. assert len(not_adopted_tasks) == 1 + @pytest.mark.parametrize( + ("team_name", "expected_tags"), + [ + pytest.param(None, {}, id="without_team"), + pytest.param( + "team_a", + {"team_name": "team_a"}, + id="with_team", + marks=pytest.mark.skipif( + not AIRFLOW_V_3_1_PLUS, reason="Multi-team support requires Airflow 3.1+" + ), + ), + ], + ) + @mock.patch.object(ecs_executor.Stats, "timer") + def test_try_adopt_task_instances_emits_team_name_tag( + self, mock_timer, mock_executor, team_name, expected_tags + ): + """Test that the adopt task instances duration metric is tagged with the team name.""" + mock_executor.team_name = team_name + + mock_executor.try_adopt_task_instances([]) + + mock_timer.assert_called_once_with("ecs_executor.adopt_task_instances.duration", tags=expected_tags) + class TestEcsExecutorConfig: @pytest.fixture