Skip to content
Draft
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ cd observatory-platform

Install dependencies:
```bash
pip install -e .[tests] --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.10.5/constraints-no-providers-3.10.txt
pip install -e .[tests] --constraint https://raw.githubusercontent.com/apache/airflow/constraints-3.1.5/constraints-3.12.txt
```

Run unit tests:
Expand Down
7 changes: 4 additions & 3 deletions observatory_platform/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
import six
import validators
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.models import TaskInstance, XCom, DagRun, Connection
from airflow.sdk.bases.hook import BaseHook
from airflow.models import TaskInstance, XCom, DagRun
from airflow.sdk import Connection
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.utils.db import provide_session
from dateutil.relativedelta import relativedelta
from sqlalchemy import and_
from sqlalchemy.orm import scoped_session, Session
from sqlalchemy.orm import Session
from observatory_platform.config import AirflowConns

ScheduleInterval = Union[str, timedelta, relativedelta]
Expand Down
2 changes: 1 addition & 1 deletion observatory_platform/airflow/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import pendulum
from airflow.models import DagRun
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
from airflow.utils.db import provide_session
from airflow.utils.state import State
from sqlalchemy.orm.scoping import scoped_session
Expand Down
4 changes: 2 additions & 2 deletions observatory_platform/airflow/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
from typing import List, Optional

import airflow
from airflow.decorators import task
from airflow.sdk import task
from airflow.exceptions import AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.sdk import Variable

from observatory_platform.google.gke import gke_create_volume, gke_delete_volume

Expand Down
4 changes: 2 additions & 2 deletions observatory_platform/airflow/tests/fixtures/bad_dag.py
Git LFS file not shown
4 changes: 2 additions & 2 deletions observatory_platform/airflow/tests/fixtures/good_dag.py
Git LFS file not shown
10 changes: 5 additions & 5 deletions observatory_platform/airflow/tests/test_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import unittest
from unittest.mock import MagicMock, patch

from airflow.decorators import dag
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection
from airflow.models.dag import DAG, settings
from airflow.sdk import XCom, Connection
from airflow.models.dag import settings
from airflow.sdk import DAG, dag
from airflow.models.xcom import XCom
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.utils import db
from airflow.utils.session import provide_session
from airflow.utils.state import State
Expand Down
4 changes: 2 additions & 2 deletions observatory_platform/airflow/tests/test_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import time_machine
from airflow.exceptions import AirflowException
from airflow.models import DagRun, DagModel
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.utils.session import provide_session
from airflow.utils.state import State

Expand Down
3 changes: 2 additions & 1 deletion observatory_platform/airflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@

import pendulum
from airflow import AirflowException
from airflow.models import DagBag, Variable
from airflow.models import DagBag
from airflow.sdk import Variable

from observatory_platform.airflow.airflow import delete_old_xcoms
from observatory_platform.config import AirflowVars
Expand Down
4 changes: 2 additions & 2 deletions observatory_platform/google/tests/fixtures/bad_dag.py
Git LFS file not shown
2 changes: 1 addition & 1 deletion observatory_platform/load_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
# The keywords airflow and DAG are required to load the DAGs from this file, see bullet 2 in the Apache Airflow FAQ:
# https://airflow.apache.org/docs/stable/faq.html

from observatory.platform.refactor.workflow import load_dags_from_config
from observatory_platform.airflow.workflow import load_dags_from_config

load_dags_from_config()
55 changes: 44 additions & 11 deletions observatory_platform/sandbox/sandbox_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,19 @@
import os
from tempfile import mkdtemp
from typing import List, Optional, Set, Union
import uuid

import google
import pendulum
import requests
from airflow import DAG, settings
from airflow import settings
from airflow.sdk import DAG
from airflow.models.connection import Connection
from airflow.models.dagrun import DagRun
from airflow.models.dag import DagModel
from airflow.models.taskinstance import TaskInstance
from airflow.models.variable import Variable
from airflow.models.dag_version import DagVersion
from airflow.timetables.base import DataInterval
from airflow.utils import db
from airflow.utils.state import State
Expand Down Expand Up @@ -101,6 +105,7 @@ def __init__(
self.workflows = workflows
self.env_vars = env_vars
self.temp_dir = mkdtemp()
self.current_dag = None

if self.create_gcp_env:
self.download_bucket = self.add_bucket(roles=gcs_bucket_roles)
Expand Down Expand Up @@ -171,6 +176,12 @@ def add_bucket(self, prefix: Optional[str] = None, roles: Optional[Union[Set[str

return bucket_name

def _ensure_dag_version(self, dag):
    """Return the latest DagVersion stored for a DAG, writing one if none exists.

    :param dag: the DAG whose version record is required; only ``dag.dag_id`` is read.
    :return: the existing DagVersion, or the newly written one.
    """

    dag_version = DagVersion.get_latest_version(dag.dag_id, session=self.session)
    if dag_version is None:
        # write_dag accepts keyword-only arguments and needs the DAG id plus a bundle
        # name rather than the DAG object. "sandbox" is the bundle name this file
        # already passes to write_dag when creating a DAG run.
        dag_version = DagVersion.write_dag(
            dag_id=dag.dag_id,
            bundle_name="sandbox",
            session=self.session,
        )
    return dag_version

def _create_bucket(self, bucket_id: str, roles: Optional[Union[str, Set[str]]] = None) -> None:
"""Create a Google Cloud Storage Bucket.

Expand Down Expand Up @@ -263,6 +274,7 @@ def add_variable(self, var: Variable) -> None:
:param var: the Airflow variable.
:return: None.
"""

try:
existing_var = self.session.query(Variable).filter(Variable.key == var.key).first()
if existing_var:
Expand All @@ -281,30 +293,41 @@ def add_variable(self, var: Variable) -> None:
self.session.add(var)
self.session.commit()

os.environ[f"AIRFLOW_VAR_{var.key.upper()}"] = var.val

def add_connection(self, conn: Connection):
    """Register an Airflow connection with the Observatory environment.

    The connection is stored in the metadata database, replacing any row that has the
    same ``conn_id``, and its URI is exported as an ``AIRFLOW_CONN_<CONN_ID>``
    environment variable.

    :param conn: the Airflow connection.
    :return: None.
    """

    # Persist to the database for scheduler/legacy logic, dropping any stale row first
    same_id_query = self.session.query(Connection).filter(Connection.conn_id == conn.conn_id)
    stale_conn = same_id_query.first()
    if stale_conn:
        self.session.delete(stale_conn)

    self.session.add(conn)
    self.session.commit()

    # Export the connection URI through the process environment as well
    env_name = f"AIRFLOW_CONN_{conn.conn_id.upper()}"
    os.environ[env_name] = conn.get_uri()

def run_task(self, task_id: str, map_index: int = -1) -> TaskInstance:
"""Run an Airflow task.

:param task_id: the Airflow task identifier.
:param map_index: the map index if the task is a dynamic task.
:return: None.
"""

assert self.dag_run is not None, "with create_dag_run must be called before run_task"

dag = self.dag_run.dag
run_id = self.dag_run.run_id
task = dag.get_task(task_id=task_id)
ti = TaskInstance(task, run_id=run_id, map_index=map_index)
task = self.current_dag.get_task(task_id=task_id)
latest_version = DagVersion.get_latest_version(self.dag_run.dag_id)
# latest_version = self._ensure_dag_version(self.current_dag)
# ti = TaskInstance(task=task, dag_version_id=uuid.uuid4(), run_id=run_id, map_index=map_index)
ti = TaskInstance(task=task, dag_version_id=latest_version, run_id=run_id, map_index=map_index)
ti.refresh_from_db()

# TODO: remove this when this issue fixed / PR merged: https://github.com/apache/airflow/issues/34023#issuecomment-1705761692
Expand All @@ -327,7 +350,7 @@ def get_task_instance(self, task_id: str) -> TaskInstance:
assert self.dag_run is not None, "with create_dag_run must be called before get_task_instance"

run_id = self.dag_run.run_id
task = self.dag_run.dag.get_task(task_id=task_id)
task = self.current_dag.get_task(task_id=task_id)
ti = TaskInstance(task, run_id=run_id)
ti.refresh_from_db()
return ti
Expand All @@ -338,7 +361,7 @@ def create_dag_run(
dag: DAG,
logical_date: pendulum.DateTime = None,
data_interval: DataInterval = None,
run_type: DagRunType = DagRunType.SCHEDULED,
run_type: str = DagRunType.SCHEDULED,
):
"""Create a DagRun that can be used when running tasks.
During cleanup the DAG run state is updated.
Expand All @@ -354,19 +377,29 @@ def create_dag_run(
if not logical_date:
logical_date = data_interval.start
elif logical_date:
data_interval = dag.infer_automated_data_interval(logical_date=logical_date)
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
start_date = data_interval.start
else:
raise ValueError("Must provide one of `data_inerval` or `logical_date`")

# dag_version = self._ensure_dag_version(dag)
try:
self.dag_run = dag.create_dagrun(
self.dag_run = DagRun(
dag_id=dag.dag_id,
run_id=dag.timetable.generate_run_id(
run_type=DagRunType(run_type), run_after=logical_date, data_interval=data_interval
),
state=State.RUNNING,
execution_date=logical_date,
logical_date=logical_date,
start_date=start_date,
run_type=run_type,
run_type=DagRunType(run_type),
data_interval=data_interval,
)
DagVersion.write_dag(dag.dag_id, "sandbox", session=self.session)
self.session.add(self.dag_run)
self.session.commit()
self.current_dag = dag

yield self.dag_run
finally:
self.dag_run.update_state()
Expand Down
4 changes: 2 additions & 2 deletions observatory_platform/sandbox/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import boto3
import httpretty
import pendulum
from airflow import DAG
from airflow.sdk import DAG
from airflow.exceptions import AirflowException
from airflow.models import DagBag
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from deepdiff import DeepDiff
from google.cloud import bigquery, storage
from google.cloud.bigquery import SourceFormat
Expand Down
4 changes: 2 additions & 2 deletions observatory_platform/sandbox/tests/fixtures/bad_dag.py
Git LFS file not shown
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@
from datetime import timedelta

import pendulum
from airflow.decorators import dag, task, task_group
from airflow.sdk import dag, task, task_group, Variable
from airflow.exceptions import AirflowSkipException
from airflow.models.connection import Connection
from airflow.sdk import Connection
from airflow.models.dag import ScheduleArg
from airflow.models.variable import Variable
from airflow.timetables.base import DataInterval
from airflow.utils.state import TaskInstanceState
from google.cloud.exceptions import NotFound
Expand Down
35 changes: 18 additions & 17 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
name = "observatory-platform"
dynamic = ["version"]
description = "The Observatory Platform is an environment for fetching, processing and analysing data to understand how well universities operate as Open Knowledge Institutions."
requires-python = ">=3.10"
requires-python = ">=3.12,<3.13"
license = { text = "Apache-2.0" }
keywords = ["science", "data", "workflows", "academic institutes", "academic-observatory-workflows"]
authors = [{ name = "Curtin University", email = "agent@observatory.academy" }]
Expand All @@ -19,7 +19,7 @@ classifiers = [
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.12",
"Topic :: Scientific/Engineering",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
Expand All @@ -28,23 +28,24 @@ classifiers = [
dependencies = [
"setuptools==80.9.0",
# Airflow
"apache-airflow[slack]==2.10.5",
"apache-airflow-providers-cncf-kubernetes==7.4.0",
"apache-airflow-providers-common-compat==1.9.0",
"apache-airflow-providers-common-io==1.6.5",
"apache-airflow-providers-common-sql==1.29.0",
"apache-airflow-providers-fab==1.5.2",
"apache-airflow-providers-ftp==3.13.3",
"apache-airflow-providers-http==5.3.2",
"apache-airflow-providers-imap==3.9.4",
"apache-airflow-providers-slack==9.1.0",
"apache-airflow-providers-smtp==2.3.2",
"apache-airflow-providers-sqlite==4.1.3",
"apache-airflow==3.1.5",
"apache-airflow-providers-slack",
"apache-airflow-providers-cncf-kubernetes",
"apache-airflow-providers-common-compat",
"apache-airflow-providers-common-io",
"apache-airflow-providers-common-sql",
"apache-airflow-providers-fab",
"apache-airflow-providers-ftp",
"apache-airflow-providers-http",
"apache-airflow-providers-imap",
"apache-airflow-providers-smtp",
"apache-airflow-providers-sqlite",
"apache-airflow-providers-standard",
# Google Cloud
"google-crc32c>=1.1.0,<2",
"google-cloud-bigquery>=3.2,<4",
"google-api-python-client>=2,<3",
"google-cloud-storage>=2.7.0,<3",
"google-cloud-storage>=3.0.0,<4",
#"google-auth-oauthlib>=0.4.5,<1",
"google-cloud-compute >=1.16.0,<2.0",
# File manipulation, reading, writing
Expand All @@ -58,9 +59,9 @@ dependencies = [
# SFTP
"paramiko>=3,<4",
# Utils
"natsort>=7.1.1,<8",
"validators",
"natsort>=8,<9",
"backoff>=2,<3",
"validators<=0.20.0",
"xmltodict",
"tenacity>=8.0.0",
"python-dateutil",
Expand Down
Loading