Skip to content

Commit bee0cb5

Browse files
authored
fix: Support OPA for Airflow when user role is not 'Admin' (#1512)
* fix: Support OPA for Airflow when user role is not 'Admin' * refactor: structure mocks in tests differently * refactor: cleanups
1 parent 959101d commit bee0cb5

3 files changed

Lines changed: 300 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ All notable changes to this project will be documented in this file.
2424
- vector: Look for SBOM in correct location ([#1471]).
2525
- vector: Use correct license ([#1476]).
2626
- trino: Build a patched Airlift from source and depend on it to backport [airlift/airlift#1943](https://github.com/airlift/airlift/pull/1943), applying the configured max response header size to Jetty's `maxResponseHeaderSize` ([#1510]).
27+
- airflow: Route DAG listings and menu items through OPA in the Airflow 3 OPA auth manager, and wire the OPA cache on the FastAPI api-server init path ([#1512]).
2728

2829
### Removed
2930

@@ -42,6 +43,7 @@ All notable changes to this project will be documented in this file.
4243
[#1493]: https://github.com/stackabletech/docker-images/pull/1493
4344
[#1509]: https://github.com/stackabletech/docker-images/pull/1509
4445
[#1510]: https://github.com/stackabletech/docker-images/pull/1510
46+
[#1512]: https://github.com/stackabletech/docker-images/pull/1512
4547

4648
## [26.3.0] - 2026-03-16
4749

airflow/opa-auth-manager/airflow-3/opa_auth_manager/opa_fab_auth_manager.py

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,18 @@
1919
PoolDetails,
2020
VariableDetails,
2121
)
22+
from airflow.api_fastapi.common.types import MenuItem
2223
from airflow.configuration import conf
24+
from airflow.models.dag import DagModel
2325
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager
2426
from airflow.providers.fab.auth_manager.models import User
2527
from airflow.stats import Stats
2628
from airflow.utils.log.logging_mixin import LoggingMixin
29+
from airflow.utils.session import NEW_SESSION, provide_session
2730
from cachetools import TTLCache, cachedmethod
2831
from overrides import override
32+
from sqlalchemy import select
33+
from sqlalchemy.orm import Session
2934

3035
METRIC_NAME_OPA_CACHE_LIMIT_REACHED = "opa_cache_limit_reached"
3136

@@ -75,12 +80,38 @@ class OpaFabAuthManager(FabAuthManager, LoggingMixin):
7580
AUTH_OPA_REQUEST_TIMEOUT_DEFAULT = 10
7681

7782
@override
78-
def init_flask_resources(self) -> None:
83+
def init(self) -> None:
7984
"""
8085
Run operations when Airflow is initializing.
86+
87+
Called by the FastAPI api-server during startup.
88+
"""
89+
90+
super().init()
91+
self._init_opa_resources()
92+
93+
@override
94+
def init_flask_resources(self) -> None:
95+
"""
96+
Run operations when the Flask app (FAB UI) is initializing.
8197
"""
8298

8399
super().init_flask_resources()
100+
self._init_opa_resources()
101+
102+
def _init_opa_resources(self) -> None:
103+
"""
104+
Set up the OPA cache and HTTP session.
105+
106+
Called from both ``init`` (FastAPI api-server) and
107+
``init_flask_resources`` (Flask AppBuilder). In Airflow 3 both run
108+
during a single api-server startup but on *different*
109+
OpaFabAuthManager instances — ``init_appbuilder`` calls
110+
``create_auth_manager()`` again, which constructs a fresh instance.
111+
The api-server instance is reachable via
112+
``request.app.state.auth_manager``; the FAB instance is returned by
113+
``get_auth_manager()``. Both need their own cache and session.
114+
"""
84115

85116
Stats.incr(METRIC_NAME_OPA_CACHE_LIMIT_REACHED, count=0)
86117

@@ -121,9 +152,6 @@ def _is_authorized_in_opa(self, endpoint: str, input: OpaInput) -> bool:
121152

122153
self.log.debug("Forward authorization request to OPA")
123154

124-
opa_url = conf.get(
125-
"core", "AUTH_OPA_REQUEST_URL", fallback=self.AUTH_OPA_REQUEST_URL_DEFAULT
126-
)
127155
opa_url = conf.get(
128156
"core", "AUTH_OPA_REQUEST_URL", fallback=self.AUTH_OPA_REQUEST_URL_DEFAULT
129157
)
@@ -551,3 +579,70 @@ def is_authorized_custom_view(
551579
}
552580
),
553581
)
582+
583+
@provide_session
584+
@override
585+
def get_authorized_dag_ids(
586+
self,
587+
*,
588+
user: User,
589+
method: ResourceMethod = "GET",
590+
session: Session = NEW_SESSION,
591+
) -> set[str]:
592+
# FabAuthManager's implementation consults the user's FAB DB role
593+
# permissions and bypasses is_authorized_dag entirely, which makes any
594+
# user without a FAB role (e.g. the default Public role) see an empty
595+
# DAG list even when OPA would allow them. List all DAG ids and filter
596+
# them via is_authorized_dag → OPA directly, without going through
597+
# filter_authorized_dag_ids — the Fab base class may add a DB-backed
598+
# override of that method in the future.
599+
dag_ids = {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
600+
return {
601+
dag_id
602+
for dag_id in dag_ids
603+
if self.is_authorized_dag(
604+
method=method, details=DagDetails(id=dag_id), user=user
605+
)
606+
}
607+
608+
@override
609+
def filter_authorized_menu_items(
610+
self, menu_items: list[MenuItem], user: User
611+
) -> list[MenuItem]:
612+
# FabAuthManager filters menu items via the FAB role permissions in the
613+
# DB, which yields an empty menu for users without FAB perms even when
614+
# OPA grants access. Route each menu item through the matching
615+
# OPA-backed is_authorized_* call instead.
616+
return [
617+
item for item in menu_items if self._is_menu_item_authorized(item, user)
618+
]
619+
620+
def _is_menu_item_authorized(self, menu_item: MenuItem, user: User) -> bool:
621+
if menu_item == MenuItem.ASSETS:
622+
return self.is_authorized_asset(method="GET", user=user)
623+
if menu_item == MenuItem.AUDIT_LOG:
624+
return self.is_authorized_dag(
625+
method="GET", access_entity=DagAccessEntity.AUDIT_LOG, user=user
626+
)
627+
if menu_item == MenuItem.CONFIG:
628+
return self.is_authorized_configuration(method="GET", user=user)
629+
if menu_item == MenuItem.CONNECTIONS:
630+
return self.is_authorized_connection(method="GET", user=user)
631+
if menu_item == MenuItem.DAGS:
632+
return self.is_authorized_dag(method="GET", user=user)
633+
if menu_item == MenuItem.DOCS:
634+
return self.is_authorized_view(access_view=AccessView.DOCS, user=user)
635+
if menu_item == MenuItem.PLUGINS:
636+
return self.is_authorized_view(access_view=AccessView.PLUGINS, user=user)
637+
if menu_item == MenuItem.POOLS:
638+
return self.is_authorized_pool(method="GET", user=user)
639+
if menu_item == MenuItem.PROVIDERS:
640+
return self.is_authorized_view(access_view=AccessView.PROVIDERS, user=user)
641+
if menu_item == MenuItem.VARIABLES:
642+
return self.is_authorized_variable(method="GET", user=user)
643+
if menu_item == MenuItem.XCOMS:
644+
return self.is_authorized_dag(
645+
method="GET", access_entity=DagAccessEntity.XCOM, user=user
646+
)
647+
self.log.warning("Unknown menu item %s — denying", menu_item)
648+
return False

airflow/opa-auth-manager/airflow-3/tests/test_opa_fab_auth_manager.py

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
# Then we could run these tests against the Airflow instance and use the Airflow API to
77
# actually test the effect of Rego policies on user authorization.
88
#
9+
from types import SimpleNamespace
910
from unittest import mock
1011
from unittest.mock import Mock
1112

@@ -14,6 +15,7 @@
1415
DagAccessEntity,
1516
DagDetails,
1617
)
18+
from airflow.api_fastapi.common.types import MenuItem
1719
from airflow.providers.fab.www.extensions.init_appbuilder import init_appbuilder
1820
from airflow.providers.fab.www.security.permissions import (
1921
ACTION_CAN_CREATE,
@@ -55,7 +57,52 @@ def auth_manager_with_appbuilder(flask_app):
5557
return auth_manager
5658

5759

60+
@pytest.fixture
61+
def mock_opa(monkeypatch):
62+
"""
63+
Replace the OPA HTTP boundary (``call_opa``) so tests exercise the real
64+
``is_authorized_*`` → ``_is_authorized_in_opa`` → ``OpaInput`` chain
65+
without making network calls.
66+
67+
Set ``mock_opa.decide = lambda endpoint, body: bool`` to drive per-request
68+
decisions. Default returns ``False`` (deny). Recorded ``(endpoint, body)``
69+
pairs are available as ``mock_opa.calls``.
70+
"""
71+
state = SimpleNamespace(
72+
decide=lambda endpoint, body: False,
73+
calls=[],
74+
)
75+
76+
def fake_call_opa(self, url, json, timeout):
77+
endpoint = url.rsplit("/", 1)[-1]
78+
state.calls.append((endpoint, json))
79+
response = Mock()
80+
response.json.return_value = {"result": state.decide(endpoint, json)}
81+
return response
82+
83+
monkeypatch.setattr(OpaFabAuthManager, "call_opa", fake_call_opa)
84+
return state
85+
86+
87+
def _make_user(name="jane.doe", user_id="1"):
88+
user = Mock()
89+
user.get_id.return_value = user_id
90+
user.get_name.return_value = name
91+
return user
92+
93+
5894
class TestOpaFabAuthManager:
95+
def test_init_wires_opa_cache_for_fastapi_apiserver(self):
96+
# The FastAPI api-server calls auth_manager.init() instead of
97+
# init_flask_resources(). Without wiring the cache from init() too,
98+
# any is_authorized_* call from a REST handler crashes with
99+
# AttributeError: 'OpaFabAuthManager' object has no attribute 'opa_cache'.
100+
auth_manager = OpaFabAuthManager()
101+
auth_manager.init()
102+
103+
assert auth_manager.opa_cache is not None
104+
assert auth_manager.opa_session is not None
105+
59106
@pytest.mark.parametrize(
60107
"method, dag_access_entity, dag_details, user_permissions, expected_opa_result, expected_result",
61108
[
@@ -228,3 +275,155 @@ def test_is_authorized_dag(
228275
user=user,
229276
)
230277
assert result == expected_result
278+
279+
def test_get_authorized_dag_ids_uses_opa_not_fab_db(
280+
self, auth_manager_with_appbuilder, mock_opa
281+
):
282+
# Repro for the OPA listing bug: a user with no FAB permissions
283+
# (e.g. the default Public role) must still see the DAGs that OPA
284+
# allows. The FabAuthManager base override would return set() here
285+
# because it reads roles from the metadata DB.
286+
user = _make_user()
287+
288+
session = Mock()
289+
session.execute.return_value = [
290+
Mock(dag_id="allowed_dag"),
291+
Mock(dag_id="denied_dag"),
292+
]
293+
294+
mock_opa.decide = lambda endpoint, body: (
295+
endpoint == "is_authorized_dag"
296+
and body["input"]["details"]["id"] == "allowed_dag"
297+
)
298+
299+
result = auth_manager_with_appbuilder.get_authorized_dag_ids(
300+
user=user, method="GET", session=session
301+
)
302+
303+
assert result == {"allowed_dag"}
304+
# Every DAG id was offered to OPA — confirms per-item delegation
305+
# rather than a global FAB role lookup.
306+
asked = {body["input"]["details"]["id"] for _, body in mock_opa.calls}
307+
assert asked == {"allowed_dag", "denied_dag"}
308+
309+
def test_get_authorized_dag_ids_provides_session_when_caller_omits_it(
310+
self, auth_manager_with_appbuilder, mock_opa
311+
):
312+
# Real callers (api_fastapi/core_api/security.py) don't pass `session`.
313+
# Our override must rely on @provide_session to inject one; previously
314+
# it forwarded the default NEW_SESSION (None) and crashed with
315+
# 'NoneType' has no attribute 'execute'.
316+
user = _make_user()
317+
318+
session = Mock()
319+
session.execute.return_value = [Mock(dag_id="allowed_dag")]
320+
mock_opa.decide = lambda endpoint, body: True
321+
322+
with mock.patch("airflow.utils.session.create_session") as mock_create_session:
323+
mock_create_session.return_value.__enter__.return_value = session
324+
result = auth_manager_with_appbuilder.get_authorized_dag_ids(
325+
user=user, method="GET"
326+
)
327+
328+
assert result == {"allowed_dag"}
329+
mock_create_session.assert_called_once()
330+
331+
@pytest.mark.parametrize(
332+
"menu_item, expected_endpoint, expected_input_subset",
333+
[
334+
(MenuItem.ASSETS, "is_authorized_asset", {"method": "GET"}),
335+
(
336+
MenuItem.AUDIT_LOG,
337+
"is_authorized_dag",
338+
{"method": "GET", "access_entity": "AUDIT_LOG"},
339+
),
340+
(MenuItem.CONFIG, "is_authorized_configuration", {"method": "GET"}),
341+
(MenuItem.CONNECTIONS, "is_authorized_connection", {"method": "GET"}),
342+
(
343+
MenuItem.DAGS,
344+
"is_authorized_dag",
345+
{"method": "GET", "access_entity": None},
346+
),
347+
(MenuItem.DOCS, "is_authorized_view", {"access_view": "DOCS"}),
348+
(MenuItem.PLUGINS, "is_authorized_view", {"access_view": "PLUGINS"}),
349+
(MenuItem.POOLS, "is_authorized_pool", {"method": "GET"}),
350+
(MenuItem.PROVIDERS, "is_authorized_view", {"access_view": "PROVIDERS"}),
351+
(MenuItem.VARIABLES, "is_authorized_variable", {"method": "GET"}),
352+
(
353+
MenuItem.XCOMS,
354+
"is_authorized_dag",
355+
{"method": "GET", "access_entity": "XCOM"},
356+
),
357+
],
358+
)
359+
def test_filter_authorized_menu_items_routes_through_opa(
360+
self,
361+
menu_item,
362+
expected_endpoint,
363+
expected_input_subset,
364+
auth_manager_with_appbuilder,
365+
mock_opa,
366+
):
367+
# Each MenuItem must trigger a request to the matching OPA endpoint
368+
# with the expected input, so menu visibility is OPA-driven rather
369+
# than FAB-DB-driven, and the Rego wire contract is documented.
370+
user = _make_user()
371+
372+
mock_opa.decide = lambda endpoint, body: True
373+
allowed = auth_manager_with_appbuilder.filter_authorized_menu_items(
374+
[menu_item], user=user
375+
)
376+
assert allowed == [menu_item]
377+
378+
assert len(mock_opa.calls) == 1
379+
endpoint, body = mock_opa.calls[0]
380+
assert endpoint == expected_endpoint
381+
assert expected_input_subset.items() <= body["input"].items()
382+
383+
# Deny path: a fresh OPA decision actually filters the item out,
384+
# proving the dispatch consults OPA rather than always allowing.
385+
auth_manager_with_appbuilder.opa_cache.clear()
386+
mock_opa.decide = lambda endpoint, body: False
387+
denied = auth_manager_with_appbuilder.filter_authorized_menu_items(
388+
[menu_item], user=user
389+
)
390+
assert denied == []
391+
392+
def test_filter_authorized_menu_items_denies_unknown(
393+
self, auth_manager_with_appbuilder, mock_opa
394+
):
395+
# A MenuItem value not handled by _is_menu_item_authorized (e.g. one
396+
# introduced in a future Airflow version) must fail closed: denied
397+
# without consulting OPA, so a new UI surface isn't silently exposed
398+
# before the dispatch table is updated.
399+
unknown = Mock(spec=MenuItem)
400+
unknown.name = "FUTURE_THING"
401+
402+
result = auth_manager_with_appbuilder.filter_authorized_menu_items(
403+
[unknown], user=_make_user()
404+
)
405+
406+
assert result == []
407+
assert mock_opa.calls == []
408+
409+
def test_filter_authorized_menu_items_preserves_order_and_filters(
410+
self, auth_manager_with_appbuilder, mock_opa
411+
):
412+
user = _make_user()
413+
414+
def decide(endpoint, body):
415+
if endpoint == "is_authorized_dag":
416+
# Allow DAGs root menu, deny the XCOM access entity.
417+
return body["input"].get("access_entity") is None
418+
if endpoint == "is_authorized_connection":
419+
return True
420+
return False
421+
422+
mock_opa.decide = decide
423+
424+
result = auth_manager_with_appbuilder.filter_authorized_menu_items(
425+
[MenuItem.DAGS, MenuItem.DOCS, MenuItem.CONNECTIONS, MenuItem.XCOMS],
426+
user=user,
427+
)
428+
429+
assert result == [MenuItem.DAGS, MenuItem.CONNECTIONS]

0 commit comments

Comments
 (0)