diff --git a/README.md b/README.md index 8ae1dfd50..769c421f8 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ cd observatory-platform Install dependencies: ```bash -pip install -e .[tests] --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.10.5/constraints-no-providers-3.10.txt +pip install -e .[tests] --constraint https://raw.githubusercontent.com/apache/airflow/constraints-3.1.5/constraints-3.12.txt ``` Run unit tests: diff --git a/observatory_platform/airflow/airflow.py b/observatory_platform/airflow/airflow.py index ea0ed0064..80b04cc88 100644 --- a/observatory_platform/airflow/airflow.py +++ b/observatory_platform/airflow/airflow.py @@ -26,13 +26,14 @@ import six import validators from airflow.exceptions import AirflowException, AirflowNotFoundException -from airflow.hooks.base import BaseHook -from airflow.models import TaskInstance, XCom, DagRun, Connection +from airflow.sdk.bases.hook import BaseHook +from airflow.models import TaskInstance, XCom, DagRun +from airflow.sdk import Connection from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook from airflow.utils.db import provide_session from dateutil.relativedelta import relativedelta from sqlalchemy import and_ -from sqlalchemy.orm import scoped_session, Session +from sqlalchemy.orm import Session from observatory_platform.config import AirflowConns ScheduleInterval = Union[str, timedelta, relativedelta] diff --git a/observatory_platform/airflow/sensors.py b/observatory_platform/airflow/sensors.py index 2d72dbdee..146db4bb9 100644 --- a/observatory_platform/airflow/sensors.py +++ b/observatory_platform/airflow/sensors.py @@ -22,7 +22,7 @@ import pendulum from airflow.models import DagRun -from airflow.sensors.external_task import ExternalTaskSensor +from airflow.providers.standard.sensors.external_task import ExternalTaskSensor from airflow.utils.db import provide_session from airflow.utils.state import State from sqlalchemy.orm.scoping import scoped_session diff --git a/observatory_platform/airflow/tasks.py b/observatory_platform/airflow/tasks.py index b4646cd87..ce844c11d 100644 --- a/observatory_platform/airflow/tasks.py +++ b/observatory_platform/airflow/tasks.py @@ -19,10 +19,10 @@ from typing import List, Optional import airflow -from airflow.decorators import task +from airflow.sdk import task from airflow.exceptions import AirflowNotFoundException from airflow.hooks.base import BaseHook -from airflow.models import Variable +from airflow.sdk import Variable from observatory_platform.google.gke import gke_create_volume, gke_delete_volume diff --git a/observatory_platform/airflow/tests/fixtures/bad_dag.py b/observatory_platform/airflow/tests/fixtures/bad_dag.py index e4ce1af0c..fbfd321c6 100644 --- a/observatory_platform/airflow/tests/fixtures/bad_dag.py +++ b/observatory_platform/airflow/tests/fixtures/bad_dag.py @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:ecff8cd6bee7e5a6dd1a4b40358484eef5c52f0efe1b3c31a00f62a1d7987f81 -size 978 +oid sha256:ca28383c36130ea31943b776b540f15f41de1f0a086e479da28fa9ad46d4c32a +size 1001 diff --git a/observatory_platform/airflow/tests/fixtures/good_dag.py b/observatory_platform/airflow/tests/fixtures/good_dag.py index 5d26c28c0..b00270014 100644 --- a/observatory_platform/airflow/tests/fixtures/good_dag.py +++ b/observatory_platform/airflow/tests/fixtures/good_dag.py @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:321e9a394cad467e2bcc41583e55e45f9ecbaeb6c82a4d518290994cac75b708 -size 1152 +oid sha256:56c228ee6ac13255d4a82270505dccd601326235751d0d0fc0dc3e1f149609f4 +size 1175 diff --git a/observatory_platform/airflow/tests/test_airflow.py b/observatory_platform/airflow/tests/test_airflow.py index 9d0599c5e..4da83359c 100644 --- a/observatory_platform/airflow/tests/test_airflow.py +++ b/observatory_platform/airflow/tests/test_airflow.py @@ -24,14 +24,14 @@ import unittest from unittest.mock import MagicMock, patch -from airflow.decorators import dag from airflow.exceptions import AirflowException, AirflowNotFoundException from airflow.hooks.base import BaseHook -from airflow.models.connection import Connection -from airflow.models.dag import DAG, settings +from airflow.sdk import XCom, Connection +from airflow.models.dag import settings +from airflow.sdk import DAG, dag from airflow.models.xcom import XCom -from airflow.operators.bash import BashOperator -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils import db from airflow.utils.session import provide_session from airflow.utils.state import State diff --git a/observatory_platform/airflow/tests/test_sensors.py b/observatory_platform/airflow/tests/test_sensors.py index bbe31f772..dc76c8ec0 100644 --- a/observatory_platform/airflow/tests/test_sensors.py +++ b/observatory_platform/airflow/tests/test_sensors.py @@ -23,8 +23,8 @@ import time_machine from airflow.exceptions import AirflowException from airflow.models import DagRun, DagModel -from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator +from airflow.sdk import DAG +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils.session import provide_session from airflow.utils.state import State diff --git a/observatory_platform/airflow/workflow.py b/observatory_platform/airflow/workflow.py index 7fc12fdad..1cc86e084 100644 --- a/observatory_platform/airflow/workflow.py +++ b/observatory_platform/airflow/workflow.py @@ -27,7 +27,8 @@ import pendulum from airflow import AirflowException -from airflow.models import DagBag, Variable +from airflow.models import DagBag +from airflow.sdk import Variable from observatory_platform.airflow.airflow import delete_old_xcoms from observatory_platform.config import AirflowVars diff --git a/observatory_platform/google/tests/fixtures/bad_dag.py b/observatory_platform/google/tests/fixtures/bad_dag.py index e4ce1af0c..fbfd321c6 100644 --- a/observatory_platform/google/tests/fixtures/bad_dag.py +++ b/observatory_platform/google/tests/fixtures/bad_dag.py @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:ecff8cd6bee7e5a6dd1a4b40358484eef5c52f0efe1b3c31a00f62a1d7987f81 -size 978 +oid sha256:ca28383c36130ea31943b776b540f15f41de1f0a086e479da28fa9ad46d4c32a +size 1001 diff --git a/observatory_platform/load_dags.py b/observatory_platform/load_dags.py index 587569244..561e1e1a9 100644 --- a/observatory_platform/load_dags.py +++ b/observatory_platform/load_dags.py @@ -15,6 +15,6 @@ # The keywords airflow and DAG are required to load the DAGs from this file, see bullet 2 in the Apache Airflow FAQ: # https://airflow.apache.org/docs/stable/faq.html -from observatory.platform.refactor.workflow import load_dags_from_config +from observatory_platform.airflow.workflow import load_dags_from_config load_dags_from_config() diff --git a/observatory_platform/sandbox/sandbox_environment.py b/observatory_platform/sandbox/sandbox_environment.py index 067a88f09..3122c0b9c 100644 --- a/observatory_platform/sandbox/sandbox_environment.py +++ b/observatory_platform/sandbox/sandbox_environment.py @@ -36,15 +36,19 @@ import os from tempfile import mkdtemp from typing import List, Optional, Set, Union +import uuid import google import pendulum import requests -from airflow import DAG, settings +from airflow import settings +from airflow.sdk import DAG from airflow.models.connection import Connection from airflow.models.dagrun import DagRun +from airflow.models.dag import DagModel from airflow.models.taskinstance import TaskInstance from airflow.models.variable import Variable +from airflow.models.dag_version import DagVersion from airflow.timetables.base import DataInterval from airflow.utils import db from airflow.utils.state import State @@ -101,6 +105,7 @@ def __init__( self.workflows = workflows self.env_vars = env_vars self.temp_dir = mkdtemp() + self.current_dag = None if self.create_gcp_env: self.download_bucket = self.add_bucket(roles=gcs_bucket_roles) @@ -171,6 +176,12 @@ def add_bucket(self, prefix: Optional[str] = None, roles: Optional[Union[Set[str return bucket_name + def _ensure_dag_version(self, dag): + dag_version = DagVersion.get_latest_version(dag.dag_id, session=self.session) + if dag_version is None: + dag_version = DagVersion.write_dag(dag, session=self.session) + return dag_version + def _create_bucket(self, bucket_id: str, roles: Optional[Union[str, Set[str]]] = None) -> None: """Create a Google Cloud Storage Bucket. @@ -263,6 +274,7 @@ def add_variable(self, var: Variable) -> None: :param var: the Airflow variable. :return: None. """ + try: existing_var = self.session.query(Variable).filter(Variable.key == var.key).first() if existing_var: @@ -281,6 +293,8 @@ def add_variable(self, var: Variable) -> None: self.session.add(var) self.session.commit() + os.environ[f"AIRFLOW_VAR_{var.key.upper()}"] = var.val + def add_connection(self, conn: Connection): """Add an Airflow connection to the Observatory environment. @@ -288,9 +302,17 @@ def add_connection(self, conn: Connection): :return: None. """ + # Add to the database for scheduler/legacy logic + existing_conn = self.session.query(Connection).filter(Connection.conn_id == conn.conn_id).first() + if existing_conn: + self.session.delete(existing_conn) + self.session.add(conn) self.session.commit() + env_key = f"AIRFLOW_CONN_{conn.conn_id.upper()}" + os.environ[env_key] = conn.get_uri() + def run_task(self, task_id: str, map_index: int = -1) -> TaskInstance: """Run an Airflow task. @@ -298,13 +320,14 @@ def run_task(self, task_id: str, map_index: int = -1) -> TaskInstance: :param map_index: the map index if the task is a daynamic task :return: None. """ - assert self.dag_run is not None, "with create_dag_run must be called before run_task" - dag = self.dag_run.dag run_id = self.dag_run.run_id - task = dag.get_task(task_id=task_id) - ti = TaskInstance(task, run_id=run_id, map_index=map_index) + task = self.current_dag.get_task(task_id=task_id) + latest_version = DagVersion.get_latest_version(self.dag_run.dag_id) + # latest_version = self._ensure_dag_version(self.current_dag) + # ti = TaskInstance(task=task, dag_version_id=uuid.uuid4(), run_id=run_id, map_index=map_index) + ti = TaskInstance(task=task, dag_version_id=latest_version, run_id=run_id, map_index=map_index) ti.refresh_from_db() # TODO: remove this when this issue fixed / PR merged: https://github.com/apache/airflow/issues/34023#issuecomment-1705761692 @@ -327,7 +350,7 @@ def get_task_instance(self, task_id: str) -> TaskInstance: assert self.dag_run is not None, "with create_dag_run must be called before get_task_instance" run_id = self.dag_run.run_id - task = self.dag_run.dag.get_task(task_id=task_id) + task = self.current_dag.get_task(task_id=task_id) ti = TaskInstance(task, run_id=run_id) ti.refresh_from_db() return ti @@ -338,7 +361,7 @@ def create_dag_run( dag: DAG, logical_date: pendulum.DateTime = None, data_interval: DataInterval = None, - run_type: DagRunType = DagRunType.SCHEDULED, + run_type: str = DagRunType.SCHEDULED, ): """Create a DagRun that can be used when running tasks. During cleanup the DAG run state is updated. @@ -354,19 +377,29 @@ def create_dag_run( if not logical_date: logical_date = data_interval.start elif logical_date: - data_interval = dag.infer_automated_data_interval(logical_date=logical_date) + data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) start_date = data_interval.start else: raise ValueError("Must provide one of `data_inerval` or `logical_date`") + # dag_version = self._ensure_dag_version(dag) try: - self.dag_run = dag.create_dagrun( + self.dag_run = DagRun( + dag_id=dag.dag_id, + run_id=dag.timetable.generate_run_id( + run_type=DagRunType(run_type), run_after=logical_date, data_interval=data_interval + ), state=State.RUNNING, - execution_date=logical_date, + logical_date=logical_date, start_date=start_date, - run_type=run_type, + run_type=DagRunType(run_type), data_interval=data_interval, ) + DagVersion.write_dag(dag.dag_id, "sandbox", session=self.session) + self.session.add(self.dag_run) + self.session.commit() + self.current_dag = dag + yield self.dag_run finally: self.dag_run.update_state() diff --git a/observatory_platform/sandbox/test_utils.py b/observatory_platform/sandbox/test_utils.py index eda7bb059..58cb599fb 100644 --- a/observatory_platform/sandbox/test_utils.py +++ b/observatory_platform/sandbox/test_utils.py @@ -30,10 +30,10 @@ import boto3 import httpretty import pendulum -from airflow import DAG +from airflow.sdk import DAG from airflow.exceptions import AirflowException from airflow.models import DagBag -from airflow.operators.empty import EmptyOperator +from airflow.providers.standard.operators.empty import EmptyOperator from deepdiff import DeepDiff from google.cloud import bigquery, storage from google.cloud.bigquery import SourceFormat diff --git a/observatory_platform/sandbox/tests/fixtures/bad_dag.py b/observatory_platform/sandbox/tests/fixtures/bad_dag.py index e4ce1af0c..fbfd321c6 100644 --- a/observatory_platform/sandbox/tests/fixtures/bad_dag.py +++ b/observatory_platform/sandbox/tests/fixtures/bad_dag.py @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:ecff8cd6bee7e5a6dd1a4b40358484eef5c52f0efe1b3c31a00f62a1d7987f81 -size 978 +oid sha256:ca28383c36130ea31943b776b540f15f41de1f0a086e479da28fa9ad46d4c32a +size 1001 diff --git a/observatory_platform/sandbox/tests/test_sandbox_environment.py b/observatory_platform/sandbox/tests/test_sandbox_environment.py index ad17f638f..67f086f90 100644 --- a/observatory_platform/sandbox/tests/test_sandbox_environment.py +++ b/observatory_platform/sandbox/tests/test_sandbox_environment.py @@ -22,11 +22,10 @@ from datetime import timedelta import pendulum -from airflow.decorators import dag, task, task_group +from airflow.sdk import dag, task, task_group, Variable from airflow.exceptions import AirflowSkipException -from airflow.models.connection import Connection +from airflow.sdk import Connection from airflow.models.dag import ScheduleArg -from airflow.models.variable import Variable from airflow.timetables.base import DataInterval from airflow.utils.state import TaskInstanceState from google.cloud.exceptions import NotFound diff --git a/pyproject.toml b/pyproject.toml index ebc29617a..c5024285a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "observatory-platform" dynamic = ["version"] description = "The Observatory Platform is an environment for fetching, processing and analysing data to understand how well universities operate as Open Knowledge Institutions." -requires-python = ">=3.10" +requires-python = ">=3.12,<3.13" license = { text = "Apache-2.0" } keywords = ["science", "data", "workflows", "academic institutes", "academic-observatory-workflows"] authors = [{ name = "Curtin University", email = "agent@observatory.academy" }] @@ -19,7 +19,7 @@ classifiers = [ "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.12", "Topic :: Scientific/Engineering", "Topic :: Software Development :: Libraries", "Topic :: Software Development :: Libraries :: Python Modules", @@ -28,23 +28,24 @@ classifiers = [ dependencies = [ "setuptools==80.9.0", # Airflow - "apache-airflow[slack]==2.10.5", - "apache-airflow-providers-cncf-kubernetes==7.4.0", - "apache-airflow-providers-common-compat==1.9.0", - "apache-airflow-providers-common-io==1.6.5", - "apache-airflow-providers-common-sql==1.29.0", - "apache-airflow-providers-fab==1.5.2", - "apache-airflow-providers-ftp==3.13.3", - "apache-airflow-providers-http==5.3.2", - "apache-airflow-providers-imap==3.9.4", - "apache-airflow-providers-slack==9.1.0", - "apache-airflow-providers-smtp==2.3.2", - "apache-airflow-providers-sqlite==4.1.3", + "apache-airflow==3.1.5", + "apache-airflow-providers-slack", + "apache-airflow-providers-cncf-kubernetes", + "apache-airflow-providers-common-compat", + "apache-airflow-providers-common-io", + "apache-airflow-providers-common-sql", + "apache-airflow-providers-fab", + "apache-airflow-providers-ftp", + "apache-airflow-providers-http", + "apache-airflow-providers-imap", + "apache-airflow-providers-smtp", + "apache-airflow-providers-sqlite", + "apache-airflow-providers-standard", # Google Cloud "google-crc32c>=1.1.0,<2", "google-cloud-bigquery>=3.2,<4", "google-api-python-client>=2,<3", - "google-cloud-storage>=2.7.0,<3", + "google-cloud-storage>=3.0.0,<4", #"google-auth-oauthlib>=0.4.5,<1", "google-cloud-compute >=1.16.0,<2.0", # File manipulation, reading, writing @@ -58,9 +59,9 @@ dependencies = [ # SFTP "paramiko>=3,<4", # Utils - "natsort>=7.1.1,<8", + "validators", + "natsort>=8,<9", "backoff>=2,<3", - "validators<=0.20.0", "xmltodict", "tenacity>=8.0.0", "python-dateutil",