Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c62bd6d
Define admin roles in config
marius-mather May 23, 2025
8c9eaee
Update .env example with admin_roles
marius-mather May 23, 2025
cc2a151
Add a user_is_admin dependency for checking admin roles
marius-mather May 23, 2025
1a2d5cd
Fix roles claim name: we now prefix with HTTPS (as recommended by Auth0)
marius-mather May 23, 2025
7971073
Remove old admin check
marius-mather May 23, 2025
e88afdc
Update AccessTokenPayload so it's easier to specify roles
marius-mather May 23, 2025
1ef747b
Update User.is_admin() to check configured roles
marius-mather May 23, 2025
243d548
Add a fixture for acting as an admin
marius-mather May 23, 2025
7dc7578
Add admin_roles to mock settings
marius-mather May 23, 2025
8800f50
Rename client_with_settings_override: too clunky for something we use…
marius-mather May 23, 2025
5d4530b
Add data generation for Auth0 user API data
marius-mather May 23, 2025
ffdbbdb
Move user schema tests to test_user_schema
marius-mather May 23, 2025
f72c89a
Initial Auth0 client for making requests to the management API
marius-mather May 23, 2025
0c22725
Initial admin router with one route defined (and all routes protected…
marius-mather May 23, 2025
be557e1
Add admin router to app
marius-mather May 23, 2025
8e6de4d
Add tests for admin endpoints
marius-mather May 23, 2025
56a074d
Force .env file to be ignored in tests
marius-mather May 23, 2025
9b984d7
Don't load env variables in main, just grab the needed value from the…
marius-mather May 23, 2025
184bfa3
Rework how we force the env file to be ignored in tests
marius-mather May 23, 2025
bb08277
Update tests to use the test client
marius-mather May 23, 2025
99bbaca
Style fix
marius-mather May 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ AUTH0_AUDIENCE=https://audience.com/api
# JWT secret key: used to provide some protection around registration
# Generate with: python -c "import secrets; print(secrets.token_urlsafe(32))"
JWT_SECRET_KEY=secret-key
# Note the list syntax pydantic-settings uses
ADMIN_ROLES='["Admin", "GalaxyAdmin"]'
# Comma-separated list of allowed origins. Note we
# don't process this with pydantic-settings as it needs
# to be used before the FastAPI app loads
Expand Down
1 change: 1 addition & 0 deletions auth/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class Settings(BaseSettings):
auth0_audience: str
jwt_secret_key: str
auth0_algorithms: list[str] = ["RS256"]
admin_roles: list[str] = []
# Note we process this separately in app startup as it needs
# to be available before the app starts
cors_allowed_origins: str
Expand Down
31 changes: 18 additions & 13 deletions auth/validator.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from typing import Annotated

import httpx
from fastapi import Depends, HTTPException
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwk, jwt
from jose.exceptions import JWTError

from auth.config import Settings
from auth.config import Settings, get_settings
from schemas.tokens import AccessTokenPayload
from schemas.user import User

Expand Down Expand Up @@ -33,20 +35,12 @@ def verify_jwt(token: str, settings: Settings) -> AccessTokenPayload:
except JWTError as e:
raise HTTPException(status_code=401, detail=f"Invalid token: {e}")

roles_claim = "biocommons.org.au/roles"
roles_claim = "https://biocommons.org.au/roles"
if roles_claim not in payload:
raise HTTPException(
status_code=403, detail=f"Missing required claim: {roles_claim}"
)

roles = payload[roles_claim]
if not isinstance(roles, list) or not any(
"admin" in role.lower() for role in roles
):
raise HTTPException(
status_code=403, detail="Access denied: Insufficient permissions"
)

return AccessTokenPayload(**payload)


Expand All @@ -63,6 +57,17 @@ def get_rsa_key(token: str, settings: Settings) -> jwk.RSAKey | None: # type: i
return None


def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
access_token = verify_jwt(token)
def get_current_user(token: str = Depends(oauth2_scheme),
settings: Settings = Depends(get_settings)) -> User:
access_token = verify_jwt(token, settings=settings)
return User(access_token=access_token)


def user_is_admin(current_user: Annotated[User, Depends(get_current_user)],
settings: Annotated[Settings, Depends(get_settings)]) -> User:
if not current_user.is_admin(settings=settings):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You must be an admin to access this endpoint."
)
return current_user
Empty file added auth0/__init__.py
Empty file.
17 changes: 17 additions & 0 deletions auth0/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
__all__ = ["Auth0Client"]

import httpx

from auth0.schemas import Auth0UserResponse


class Auth0Client:

def __init__(self, domain: str):
self.domain = domain

def get_users(self, access_token: str) -> list[Auth0UserResponse]:
url = f"https://{self.domain}/api/v2/users"
headers = {"Authorization": f"Bearer {access_token}"}
resp = httpx.get(url, headers=headers)
return resp.json()
34 changes: 34 additions & 0 deletions auth0/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, ConfigDict, EmailStr


class Auth0UserResponse(BaseModel):
"""
Response returned by Auth0's /users endpoint.
Note we have our own Auth0User model
that includes specifying the metadata fields we use.
"""
user_id: str
email: EmailStr
email_verified: bool
username: Optional[str] = None
phone_number: Optional[str] = None
phone_verified: Optional[bool] = None
created_at: datetime
updated_at: datetime
identities: List[dict]
app_metadata: Optional[dict] = None
user_metadata: Optional[dict] = None
picture: Optional[str] = None
name: Optional[str] = None
nickname: Optional[str] = None
last_ip: Optional[str] = None
last_login: Optional[datetime] = None
logins_count: Optional[int] = None
blocked: Optional[bool] = None
given_name: Optional[str] = None
family_name: Optional[str] = None

model_config = ConfigDict(extra="allow")
10 changes: 5 additions & 5 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import os

from dotenv import load_dotenv
from dotenv import dotenv_values
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware

from routers import bpa_register, galaxy_register, user
from routers import admin, bpa_register, galaxy_register, user

# Load .env to get CORS_ALLOWED_ORIGINS.
# Note that for most env variables, we use pydantic-settings
# and load them via auth.config. But we need the
# allowed_origins before we load the app
load_dotenv()
env_values = dotenv_values(".env")
ALLOWED_ORIGINS = [
origin.strip() for origin in os.getenv("CORS_ALLOWED_ORIGINS", "").split(",")
origin.strip() for origin in env_values.get("CORS_ALLOWED_ORIGINS", "").split(",")
]

app = FastAPI()
Expand All @@ -30,6 +29,7 @@ def public_route():
return {"message": "AAI Backend API"}


app.include_router(admin.router)
app.include_router(user.router)
app.include_router(bpa_register.router)
app.include_router(galaxy_register.router)
23 changes: 23 additions & 0 deletions routers/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from fastapi import APIRouter, Depends

from auth.config import Settings, get_settings
from auth.management import get_management_token
from auth.validator import user_is_admin
from auth0.client import Auth0Client
from auth0.schemas import Auth0UserResponse

router = APIRouter(prefix="/admin", tags=["admin"],
dependencies=[Depends(user_is_admin)])


def get_auth0_client(settings: Settings = Depends(get_settings)):
return Auth0Client(settings.auth0_domain)


@router.get("/users",
response_model=list[Auth0UserResponse])
def get_users(settings: Settings = Depends(get_settings),
client: Auth0Client = Depends(get_auth0_client)):
token = get_management_token(settings=settings)
resp = client.get_users(token)
return resp
7 changes: 5 additions & 2 deletions schemas/tokens.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Optional

from pydantic import BaseModel, Field
from pydantic import BaseModel, ConfigDict, Field


class AccessTokenPayload(BaseModel):
Expand All @@ -9,7 +9,7 @@ class AccessTokenPayload(BaseModel):
"""

biocommons_roles: list[str] = Field(
alias="biocommons.org.au/roles",
alias="https://biocommons.org.au/roles",
Comment thread
amandazhuyilan marked this conversation as resolved.
description="BioCommons-specific roles assigned to the user",
)
email: Optional[str] = Field(None, description="Email address")
Expand All @@ -20,3 +20,6 @@ class AccessTokenPayload(BaseModel):
iat: int = Field(description="Issued at time (as Unix timestamp)")
azp: Optional[str] = Field(None, description="Authorized party")
permissions: list[str] = Field(description="Permissions granted to the user")

# Set populate_by_name so we can specify biocommons_roles as an argument
model_config = ConfigDict(populate_by_name=True)
6 changes: 4 additions & 2 deletions schemas/user.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from pydantic import BaseModel

from auth.config import Settings

from .tokens import AccessTokenPayload


Expand All @@ -12,13 +14,13 @@ class User(BaseModel):

access_token: AccessTokenPayload

def is_admin(self) -> bool:
def is_admin(self, settings: Settings) -> bool:
"""
Checks if the user has an admin role.
"""
# TODO: Need to finalize exactly what roles make
# a user an admin
for role in self.access_token.biocommons_roles:
if "admin" in role.lower():
if role in settings.admin_roles:
return True
return False
35 changes: 33 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,20 @@
from fastapi.testclient import TestClient

from auth.config import Settings, get_settings
from auth.validator import get_current_user
from main import app
from tests.datagen import AccessTokenPayloadFactory, UserFactory


@pytest.fixture(autouse=True)
def ignore_env_file():
"""
Always ignore the .env file when running tests,
so we get the same behaviour when the .env file is present or not.
"""
def get_settings_no_env_file():
return Settings(_env_file=None)
app.dependency_overrides[get_settings] = get_settings_no_env_file


@pytest.fixture
Expand All @@ -15,12 +28,15 @@ def mock_settings():
auth0_audience="mock-audience",
jwt_secret_key="mock-secret-key",
cors_allowed_origins="https://test",
admin_roles=["Admin"],
auth0_algorithms=["HS256"]
)


@pytest.fixture
def client_with_settings_override(mock_settings):
def test_client(mock_settings):
"""
Override the get_settings dependency to return a mocked Settings object.
"""
# Define override
def override_settings():
return mock_settings
Expand All @@ -34,3 +50,18 @@ def override_settings():

# Reset override
app.dependency_overrides.clear()


@pytest.fixture
def as_admin_user():
"""
Override the get_current_user dependency to return a User object with admin role,
so admin check will pass.
"""
def override_user():
token = AccessTokenPayloadFactory.build(biocommons_roles=["Admin"])
return UserFactory.build(access_token=token)

app.dependency_overrides[get_current_user] = override_user
Comment thread
marius-mather marked this conversation as resolved.
yield
app.dependency_overrides.clear()
4 changes: 4 additions & 0 deletions tests/datagen.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from polyfactory.decorators import post_generated
from polyfactory.factories.pydantic_factory import ModelFactory

from auth0.schemas import Auth0UserResponse
from routers.bpa_register import BPARegistrationRequest
from schemas.galaxy import GalaxyRegistrationData
from schemas.service import Auth0User
Expand All @@ -11,6 +12,9 @@
class AccessTokenPayloadFactory(ModelFactory[AccessTokenPayload]): ...


class Auth0UserResponseFactory(ModelFactory[Auth0UserResponse]): ...


class UserFactory(ModelFactory[User]): ...


Expand Down
46 changes: 46 additions & 0 deletions tests/test_admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import pytest
from fastapi import HTTPException

from auth.validator import get_current_user, user_is_admin
from main import app
from tests.datagen import (
AccessTokenPayloadFactory,
Auth0UserResponseFactory,
UserFactory,
)


def test_get_users_requires_admin_unauthorized(test_client, mocker):
def get_nonadmin_user():
payload = AccessTokenPayloadFactory.build(biocommons_roles=["User"])
return UserFactory.build(access_token=payload)

app.dependency_overrides[get_current_user] = get_nonadmin_user
mocker.patch("routers.admin.get_management_token", return_value="mock_token")
resp = test_client.get("/admin/users")
assert resp.status_code == 403
assert resp.json() == {"detail": "You must be an admin to access this endpoint."}
app.dependency_overrides.clear()


def test_user_is_admin(mock_settings):
payload = AccessTokenPayloadFactory.build(biocommons_roles=["Admin"])
admin_user = UserFactory.build(access_token=payload)
assert user_is_admin(current_user=admin_user, settings=mock_settings)


def test_user_is_admin_nonadmin_user(mock_settings):
payload = AccessTokenPayloadFactory.build(biocommons_roles=["User"])
user = UserFactory.build(access_token=payload)
with pytest.raises(HTTPException, match="You must be an admin to access this endpoint."):
user_is_admin(current_user=user, settings=mock_settings)


def test_get_users(mocker, test_client, as_admin_user):
mocker.patch("routers.admin.get_management_token", return_value="mock_token")
mock_client = mocker.patch("routers.admin.Auth0Client")
users = Auth0UserResponseFactory.batch(3)
mock_client().get_users.return_value = users
resp = test_client.get("/admin/users")
assert resp.status_code == 200
assert len(resp.json()) == 3
Loading