Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion routers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from fastapi import APIRouter, Depends, HTTPException, Path
from fastapi.params import Query
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import func, or_
from sqlmodel import Session, select

from auth.validator import get_current_user, user_is_admin
Expand Down Expand Up @@ -106,14 +107,16 @@ def get_filter_options():
response_model=list[BiocommonsUserResponse])
def get_users(db_session: Annotated[Session, Depends(get_db_session)],
pagination: Annotated[PaginationParams, Depends(get_pagination_params)],
filter_by: str = Query(None, description="Filter users by group ('tsi', 'bpa_galaxy') or platform ('galaxy', 'bpa_data_portal')")):
filter_by: str = Query(None, description="Filter users by group ('tsi', 'bpa_galaxy') or platform ('galaxy', 'bpa_data_portal')"),
search: str = Query(None, description="Search users by username or email")):
"""
Get all users from the database with pagination and optional filtering.

Args:
filter_by: Optional filter parameter. Can be:
- Group bundle names: 'tsi', 'bpa_galaxy'
- Platform names: 'galaxy', 'bpa_data_portal'
search: Optional search parameter for username or email
"""
base_query = select(BiocommonsUser)

Expand All @@ -138,6 +141,24 @@ def get_users(db_session: Annotated[Session, Depends(get_db_session)],
detail=f"Invalid filter_by value '{filter_by}'"
)

if search:
s = search.strip().lower()

if "@" in s:
base_query = base_query.where(
or_(
func.lower(BiocommonsUser.email) == s,
func.lower(BiocommonsUser.email).ilike(f"%{s}%")
)
)
else:
base_query = base_query.where(
or_(
func.lower(BiocommonsUser.username).ilike(f"%{s}%"),
func.lower(BiocommonsUser.email).ilike(f"%{s}%")
)
)

user_query = base_query.offset(pagination.start_index).limit(pagination.per_page)
users = db_session.exec(user_query).all()
return users
Expand Down
2 changes: 1 addition & 1 deletion schemas/biocommons.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ class Auth0UserData(BaseModel):

created_at: datetime
email: EmailStr
username: BiocommonsUsername
Comment thread
minh-biocommons marked this conversation as resolved.
username: Optional[BiocommonsUsername] = None
email_verified: bool
identities: List[Identity]
name: str
Expand Down
149 changes: 149 additions & 0 deletions tests/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,155 @@ def test_get_users_invalid_filter(test_client, as_admin_user, test_db_session):
assert "Invalid filter_by value 'invalid_filter'" in resp.json()["detail"]


def test_get_users_search_by_email_exact(test_client, as_admin_user, test_db_session):
from tests.db.datagen import BiocommonsUserFactory

user1 = BiocommonsUserFactory.build(email="john.doe@example.com", username="johndoe")
user2 = BiocommonsUserFactory.build(email="jane.smith@example.com", username="janesmith")
user3 = BiocommonsUserFactory.build(email="bob.wilson@example.com", username="bobwilson")

for user in [user1, user2, user3]:
test_db_session.add(user)
test_db_session.commit()

resp = test_client.get("/admin/users?search=john.doe@example.com")
assert resp.status_code == 200
results = resp.json()
assert len(results) == 1
assert results[0]["email"] == "john.doe@example.com"


def test_get_users_search_by_email_partial(test_client, as_admin_user, test_db_session):
from tests.db.datagen import BiocommonsUserFactory

user1 = BiocommonsUserFactory.build(email="john.doe@example.com", username="johndoe")
user2 = BiocommonsUserFactory.build(email="jane.smith@example.com", username="janesmith")
user3 = BiocommonsUserFactory.build(email="bob.wilson@different.com", username="bobwilson")

for user in [user1, user2, user3]:
test_db_session.add(user)
test_db_session.commit()

resp = test_client.get("/admin/users?search=example.com")
assert resp.status_code == 200
results = resp.json()
assert len(results) == 2
emails = [user["email"] for user in results]
assert "john.doe@example.com" in emails
assert "jane.smith@example.com" in emails
assert "bob.wilson@different.com" not in emails


def test_get_users_search_by_username(test_client, as_admin_user, test_db_session):
from tests.db.datagen import BiocommonsUserFactory

user1 = BiocommonsUserFactory.build(email="john@example.com", username="johndoe")
user2 = BiocommonsUserFactory.build(email="jane@example.com", username="janesmith")
user3 = BiocommonsUserFactory.build(email="bob@example.com", username="bobwilson")

for user in [user1, user2, user3]:
test_db_session.add(user)
test_db_session.commit()

resp = test_client.get("/admin/users?search=john")
assert resp.status_code == 200
results = resp.json()
assert len(results) == 1
assert results[0]["username"] == "johndoe"


def test_get_users_search_by_username_partial(test_client, as_admin_user, test_db_session):
from tests.db.datagen import BiocommonsUserFactory

user1 = BiocommonsUserFactory.build(email="john@example.com", username="johnsmith")
user2 = BiocommonsUserFactory.build(email="jane@example.com", username="johndoe")
user3 = BiocommonsUserFactory.build(email="bob@example.com", username="bobwilson")

for user in [user1, user2, user3]:
test_db_session.add(user)
test_db_session.commit()

resp = test_client.get("/admin/users?search=john")
assert resp.status_code == 200
results = resp.json()
assert len(results) == 2
usernames = [user["username"] for user in results]
assert "johnsmith" in usernames
assert "johndoe" in usernames
assert "bobwilson" not in usernames


def test_get_users_search_case_insensitive(test_client, as_admin_user, test_db_session):
from tests.db.datagen import BiocommonsUserFactory

user1 = BiocommonsUserFactory.build(email="John.Doe@Example.Com", username="JohnDoe")

test_db_session.add(user1)
test_db_session.commit()

resp = test_client.get("/admin/users?search=JOHN")
assert resp.status_code == 200
results = resp.json()
assert len(results) == 1
assert results[0]["username"] == "JohnDoe"

resp = test_client.get("/admin/users?search=john.doe@example.com")
assert resp.status_code == 200
results = resp.json()
assert len(results) == 1
assert results[0]["email"] == "John.Doe@Example.Com"


def test_get_users_search_empty_string(test_client, as_admin_user, test_db_session):
from tests.db.datagen import BiocommonsUserFactory

users = BiocommonsUserFactory.batch(3)
for user in users:
test_db_session.add(user)
test_db_session.commit()

resp = test_client.get("/admin/users?search=")
assert resp.status_code == 200
results = resp.json()
assert len(results) == 3

resp = test_client.get("/admin/users?search= ")
assert resp.status_code == 200
results = resp.json()
assert len(results) == 3


def test_get_users_search_with_filter(test_client, as_admin_user, test_db_session):
from db.models import ApprovalStatusEnum, PlatformEnum, PlatformMembership
from tests.db.datagen import BiocommonsUserFactory

user1 = BiocommonsUserFactory.build(email="john@example.com", username="johndoe")
user2 = BiocommonsUserFactory.build(email="jane@example.com", username="janesmith")

for user in [user1, user2]:
test_db_session.add(user)
test_db_session.commit()

membership = PlatformMembership(
user_id=user1.id,
platform_id=PlatformEnum.GALAXY,
approval_status=ApprovalStatusEnum.APPROVED
)
test_db_session.add(membership)
test_db_session.commit()

resp = test_client.get("/admin/users?filter_by=galaxy&search=john")
assert resp.status_code == 200
results = resp.json()
assert len(results) == 1
assert results[0]["username"] == "johndoe"

resp = test_client.get("/admin/users?filter_by=galaxy&search=jane")
assert resp.status_code == 200
results = resp.json()
assert len(results) == 0


def test_get_filter_options(test_client, as_admin_user):
resp = test_client.get("/admin/filters")
assert resp.status_code == 200
Expand Down