From b9f93b15d65d81d782c0dda9791a7a292ca6a46f Mon Sep 17 00:00:00 2001 From: cmyui Date: Mon, 8 Jun 2026 18:26:04 -0700 Subject: [PATCH 01/10] Add osu API backoff for beatmap requests --- app/adapters/osu_api_backoff.py | 198 ++++++++++++++++++++++++++++ app/adapters/osu_api_v1.py | 43 +++++- app/adapters/osu_api_v2/api.py | 49 +++++++ app/oauth.py | 16 +++ app/usecases/akatsuki_beatmaps.py | 19 ++- app/usecases/cheesegull_beatmaps.py | 3 + app/usecases/osu_files.py | 17 ++- tests/test_osu_api_backoff.py | 118 +++++++++++++++++ 8 files changed, 456 insertions(+), 7 deletions(-) create mode 100644 app/adapters/osu_api_backoff.py create mode 100644 tests/test_osu_api_backoff.py diff --git a/app/adapters/osu_api_backoff.py b/app/adapters/osu_api_backoff.py new file mode 100644 index 0000000..7192c5c --- /dev/null +++ b/app/adapters/osu_api_backoff.py @@ -0,0 +1,198 @@ +import enum +import logging +import threading +import time +from datetime import datetime +from datetime import timezone +from email.utils import parsedate_to_datetime + +import httpx + +RATE_LIMIT_STATUS_CODES = {403, 429} +DEFAULT_FAILURE_COOLDOWN_SECONDS = 60 +DEFAULT_RATE_LIMIT_COOLDOWN_SECONDS = 60 * 60 + +logger = logging.getLogger(__name__) + + +class CircuitState(str, enum.Enum): + CLOSED = "closed" + OPEN = "open" + HALF_OPEN = "half_open" + + +class OsuApiBackoffError(RuntimeError): + pass + + +class OsuApiBackoff: + def __init__( + self, + *, + failure_threshold: int = 3, + failure_cooldown_seconds: float = DEFAULT_FAILURE_COOLDOWN_SECONDS, + ) -> None: + self._failure_threshold = failure_threshold + self._failure_cooldown_seconds = failure_cooldown_seconds + + self._state = CircuitState.CLOSED + self._consecutive_failures = 0 + self._opened_at = 0.0 + self._cooldown_seconds = failure_cooldown_seconds + self._probe_in_flight = False + self._lock = threading.Lock() + + @property + def state(self) -> CircuitState: + with self._lock: + return self._evaluate_state() + + def raise_if_unavailable(self, *, upstream: str) -> None: + with self._lock: + state = self._evaluate_state() + if state == CircuitState.CLOSED: + return + + if state == CircuitState.HALF_OPEN and not self._probe_in_flight: + self._probe_in_flight = True + logger.info( + "Allowing osu! API canary request", + extra={"upstream": upstream, "circuit_state": state}, + ) + return + + seconds_remaining = 0 + if self._state == CircuitState.OPEN: + seconds_remaining = max( + 0, + int(self._cooldown_seconds - self._elapsed_since_opened()), + ) + raise OsuApiBackoffError( + f"{upstream} is backing off for {seconds_remaining}s", + ) + + def record_success(self, *, upstream: str, endpoint: str) -> None: + with self._lock: + previous_state = self._state + self._state = CircuitState.CLOSED + self._consecutive_failures = 0 + self._opened_at = 0.0 + self._cooldown_seconds = self._failure_cooldown_seconds + self._probe_in_flight = False + + if previous_state == CircuitState.HALF_OPEN: + logger.info( + "Closed osu! API circuit after successful canary request", + extra={ + "upstream": upstream, + "endpoint": endpoint, + "previous_state": previous_state, + "circuit_state": CircuitState.CLOSED, + }, + ) + + def record_failure( + self, + *, + upstream: str, + endpoint: str, + cooldown_seconds: float | None = None, + force_open: bool = False, + ) -> None: + cooldown_seconds = cooldown_seconds or self._failure_cooldown_seconds + + with self._lock: + self._consecutive_failures += 1 + should_open = ( + force_open + or self._state == CircuitState.HALF_OPEN + or self._consecutive_failures >= self._failure_threshold + ) + if not should_open: + return + + previous_state = self._state + self._state = CircuitState.OPEN + self._opened_at = time.monotonic() + self._cooldown_seconds = cooldown_seconds + self._probe_in_flight = False + + logger.warning( + "Opened osu! API circuit", + extra={ + "upstream": upstream, + "endpoint": endpoint, + "previous_state": previous_state, + "circuit_state": CircuitState.OPEN, + "consecutive_failures": self._consecutive_failures, + "cooldown_seconds": cooldown_seconds, + }, + ) + + def apply_if_rate_limited( + self, + response: httpx.Response, + *, + upstream: str, + endpoint: str, + ) -> None: + if response.status_code not in RATE_LIMIT_STATUS_CODES: + return + + cooldown_seconds = _get_rate_limit_cooldown_seconds(response) + self.record_failure( + upstream=upstream, + endpoint=endpoint, + cooldown_seconds=cooldown_seconds, + force_open=True, + ) + raise OsuApiBackoffError( + f"{upstream} returned {response.status_code}; backing off", + ) + + def _evaluate_state(self) -> CircuitState: + if self._state != CircuitState.OPEN: + return self._state + + if self._elapsed_since_opened() < self._cooldown_seconds: + return self._state + + self._state = CircuitState.HALF_OPEN + self._probe_in_flight = False + logger.info( + "Transitioned osu! API circuit to half-open", + extra={ + "circuit_state": CircuitState.HALF_OPEN, + "cooldown_seconds": self._cooldown_seconds, + }, + ) + return self._state + + def _elapsed_since_opened(self) -> float: + return time.monotonic() - self._opened_at + + +def _get_rate_limit_cooldown_seconds(response: httpx.Response) -> int: + retry_after = response.headers.get("Retry-After") + if retry_after is None: + return DEFAULT_RATE_LIMIT_COOLDOWN_SECONDS + + try: + return max(1, int(retry_after)) + except ValueError: + retry_at = _parse_retry_after_datetime(retry_after) + if retry_at is None: + return DEFAULT_RATE_LIMIT_COOLDOWN_SECONDS + + return max(1, int((retry_at - datetime.now(timezone.utc)).total_seconds())) + + +def _parse_retry_after_datetime(retry_after: str) -> datetime | None: + try: + retry_at = parsedate_to_datetime(retry_after) + except (TypeError, ValueError): + return None + + if retry_at.tzinfo is None: + retry_at = retry_at.replace(tzinfo=timezone.utc) + return retry_at diff --git a/app/adapters/osu_api_v1.py b/app/adapters/osu_api_v1.py index 8edd3c7..8a599cc 100644 --- a/app/adapters/osu_api_v1.py +++ b/app/adapters/osu_api_v1.py @@ -7,12 +7,15 @@ from pydantic import BaseModel from app import settings +from app.adapters.osu_api_backoff import OsuApiBackoff +from app.adapters.osu_api_backoff import OsuApiBackoffError from app.common_models import GameMode osu_api_v1_http_client = httpx.AsyncClient( base_url="https://old.ppy.sh/", timeout=httpx.Timeout(15), ) +osu_api_v1_backoff = OsuApiBackoff() class Beatmap(BaseModel): @@ -64,7 +67,10 @@ async def fetch_one_beatmap( ) -> Beatmap | None: assert [beatmap_id, beatmap_md5].count(None) == 1 + osu_api_v1_backoff.raise_if_unavailable(upstream="osu! API v1") + osu_api_response_data: list[dict[str, Any]] | None = None + endpoint = "get_beatmaps" try: osu_api_v1_key = random.choice(settings.OSU_API_V1_API_KEYS_POOL) response = await osu_api_v1_http_client.get( @@ -82,17 +88,32 @@ async def fetch_one_beatmap( "authorized": True, }, ) + osu_api_v1_backoff.apply_if_rate_limited( + response, + upstream="osu! API v1", + endpoint=endpoint, + ) if response.status_code in (404, 451): + osu_api_v1_backoff.record_success( + upstream="osu! API v1", + endpoint=endpoint, + ) return None - if response.status_code == 403: - raise ValueError("osu api is down") from None response.raise_for_status() osu_api_response_data = response.json() if osu_api_response_data == []: + osu_api_v1_backoff.record_success( + upstream="osu! API v1", + endpoint=endpoint, + ) return None assert osu_api_response_data is not None + osu_api_v1_backoff.record_success(upstream="osu! API v1", endpoint=endpoint) return Beatmap(**osu_api_response_data[0]) + except OsuApiBackoffError: + raise except Exception: + osu_api_v1_backoff.record_failure(upstream="osu! API v1", endpoint=endpoint) logging.exception( "Failed to fetch beatmap from osu! API v1", extra={ @@ -104,6 +125,9 @@ async def fetch_one_beatmap( async def fetch_beatmap_osu_file_data(beatmap_id: int) -> bytes | None: + osu_api_v1_backoff.raise_if_unavailable(upstream="osu! API v1") + + endpoint = "osu_file" try: response = await osu_api_v1_http_client.get(f"osu/{beatmap_id}") logging.debug( @@ -113,13 +137,24 @@ async def fetch_beatmap_osu_file_data(beatmap_id: int) -> bytes | None: "authorized": False, }, ) + osu_api_v1_backoff.apply_if_rate_limited( + response, + upstream="osu! API v1", + endpoint=endpoint, + ) if response.status_code in (404, 451): + osu_api_v1_backoff.record_success( + upstream="osu! API v1", + endpoint=endpoint, + ) return None - if response.status_code == 403: - raise ValueError("osu api is down") from None response.raise_for_status() + osu_api_v1_backoff.record_success(upstream="osu! API v1", endpoint=endpoint) return response.read() + except OsuApiBackoffError: + raise except Exception: + osu_api_v1_backoff.record_failure(upstream="osu! API v1", endpoint=endpoint) logging.exception( "Failed to fetch beatmap osu file from osu! API v1", extra={"beatmap_id": beatmap_id}, diff --git a/app/adapters/osu_api_v2/api.py b/app/adapters/osu_api_v2/api.py index 06a26b6..6a1d653 100644 --- a/app/adapters/osu_api_v2/api.py +++ b/app/adapters/osu_api_v2/api.py @@ -5,6 +5,8 @@ from app import oauth from app import settings +from app.adapters.osu_api_backoff import OsuApiBackoff +from app.adapters.osu_api_backoff import OsuApiBackoffError from app.adapters.osu_api_v2.models import BeatmapExtended from app.adapters.osu_api_v2.models import BeatmapsetExtended from app.adapters.osu_api_v2.models import BeatmapsetSearchResponse @@ -19,6 +21,8 @@ OSU_API_V2_TOKEN_ENDPOINT = "https://osu.ppy.sh/oauth/token" +osu_api_v2_backoff = OsuApiBackoff() + osu_api_v2_http_client = httpx.AsyncClient( base_url="https://osu.ppy.sh/api/v2/", auth=oauth.AsyncOAuth( @@ -29,22 +33,39 @@ ), ], token_endpoint=OSU_API_V2_TOKEN_ENDPOINT, + backoff=osu_api_v2_backoff, ), timeout=httpx.Timeout(15), ) async def get_beatmap(beatmap_id: int) -> BeatmapExtended | None: + osu_api_v2_backoff.raise_if_unavailable(upstream="osu! API v2") + osu_api_response_data: dict[str, Any] | None = None + endpoint = "beatmaps" try: response = await osu_api_v2_http_client.get(f"beatmaps/{beatmap_id}") + osu_api_v2_backoff.apply_if_rate_limited( + response, + upstream="osu! API v2", + endpoint=endpoint, + ) if response.status_code in (404, 451): + osu_api_v2_backoff.record_success( + upstream="osu! API v2", + endpoint=endpoint, + ) return None response.raise_for_status() osu_api_response_data = response.json() assert osu_api_response_data is not None + osu_api_v2_backoff.record_success(upstream="osu! API v2", endpoint=endpoint) return BeatmapExtended(**osu_api_response_data) + except OsuApiBackoffError: + raise except Exception: + osu_api_v2_backoff.record_failure(upstream="osu! API v2", endpoint=endpoint) logging.exception( "Failed to fetch beatmap from osu! API v2", extra={ @@ -56,16 +77,32 @@ async def get_beatmap(beatmap_id: int) -> BeatmapExtended | None: async def get_beatmapset(beatmapset_id: int) -> BeatmapsetExtended | None: + osu_api_v2_backoff.raise_if_unavailable(upstream="osu! API v2") + osu_api_response_data: dict[str, Any] | None = None + endpoint = "beatmapsets" try: response = await osu_api_v2_http_client.get(f"beatmapsets/{beatmapset_id}") + osu_api_v2_backoff.apply_if_rate_limited( + response, + upstream="osu! API v2", + endpoint=endpoint, + ) if response.status_code in (404, 451): + osu_api_v2_backoff.record_success( + upstream="osu! API v2", + endpoint=endpoint, + ) return None response.raise_for_status() osu_api_response_data = response.json() assert osu_api_response_data is not None + osu_api_v2_backoff.record_success(upstream="osu! API v2", endpoint=endpoint) return BeatmapsetExtended(**osu_api_response_data) + except OsuApiBackoffError: + raise except Exception: + osu_api_v2_backoff.record_failure(upstream="osu! API v2", endpoint=endpoint) logging.exception( "Failed to fetch beatmapset from osu! API v2", extra={ @@ -93,7 +130,10 @@ async def search_beatmapsets( if [page, cursor_string].count(None) != 1: raise ValueError("Exactly one of page or cursor_string must be provided") + osu_api_v2_backoff.raise_if_unavailable(upstream="osu! API v2") + osu_api_response_data: dict[str, Any] | None = None + endpoint = "beatmapsets/search" try: response = await osu_api_v2_http_client.get( "beatmapsets/search", @@ -112,11 +152,20 @@ async def search_beatmapsets( **({"cursor_string": cursor_string} if cursor_string else {}), }, ) + osu_api_v2_backoff.apply_if_rate_limited( + response, + upstream="osu! API v2", + endpoint=endpoint, + ) response.raise_for_status() osu_api_response_data = response.json() assert osu_api_response_data is not None + osu_api_v2_backoff.record_success(upstream="osu! API v2", endpoint=endpoint) return BeatmapsetSearchResponse(**osu_api_response_data) + except OsuApiBackoffError: + raise except Exception: + osu_api_v2_backoff.record_failure(upstream="osu! API v2", endpoint=endpoint) logging.exception( "Failed to fetch beatmapsets from osu! API v2", extra={ diff --git a/app/oauth.py b/app/oauth.py index abd9340..228a2ce 100644 --- a/app/oauth.py +++ b/app/oauth.py @@ -7,6 +7,8 @@ import httpx from pydantic import BaseModel +from app.adapters.osu_api_backoff import OsuApiBackoff + class OAuthClientCredentials(BaseModel): client_id: str @@ -20,11 +22,13 @@ def __init__( self, client_credential_sets: list[OAuthClientCredentials], token_endpoint: str, + backoff: OsuApiBackoff | None = None, *args: Any, **kwargs: Any, ) -> None: self.client_credential_sets = client_credential_sets self.token_endpoint = token_endpoint + self.backoff = backoff super().__init__(*args, **kwargs) @@ -63,6 +67,12 @@ async def async_auth_flow( if client_credentials.access_token is None: refresh_response = yield self.build_refresh_request(client_credentials) await refresh_response.aread() + if self.backoff is not None: + self.backoff.apply_if_rate_limited( + refresh_response, + upstream="osu! API v2", + endpoint="oauth/token", + ) refresh_response_data = refresh_response.json() if "access_token" not in refresh_response_data: logging.warning( @@ -82,6 +92,12 @@ async def async_auth_flow( while response.status_code == 401: refresh_response = yield self.build_refresh_request(client_credentials) await refresh_response.aread() + if self.backoff is not None: + self.backoff.apply_if_rate_limited( + refresh_response, + upstream="osu! API v2", + endpoint="oauth/token", + ) refresh_response_data = refresh_response.json() if "access_token" not in refresh_response_data: logging.warning( diff --git a/app/usecases/akatsuki_beatmaps.py b/app/usecases/akatsuki_beatmaps.py index af79e58..c901362 100644 --- a/app/usecases/akatsuki_beatmaps.py +++ b/app/usecases/akatsuki_beatmaps.py @@ -4,6 +4,7 @@ from app.adapters import aws_s3 from app.adapters import discord_webhooks from app.adapters import osu_api_v1 +from app.adapters.osu_api_backoff import OsuApiBackoffError from app.common_models import RankedStatus from app.repositories import akatsuki_beatmaps from app.repositories.akatsuki_beatmaps import AkatsukiBeatmap @@ -86,6 +87,8 @@ async def _update_from_osu_api(old_beatmap: AkatsukiBeatmap) -> AkatsukiBeatmap await akatsuki_beatmaps.delete_by_md5(old_beatmap.beatmap_md5) await aws_s3.delete_object(f"/beatmaps/{old_beatmap.beatmap_id}.osu") return None + except OsuApiBackoffError: + return old_beatmap except Exception: # TODO: fallback to beatmap mirror raise @@ -150,7 +153,13 @@ async def _update_from_osu_api(old_beatmap: AkatsukiBeatmap) -> AkatsukiBeatmap async def fetch_one_by_id(beatmap_id: int) -> AkatsukiBeatmap | None: beatmap = await akatsuki_beatmaps.fetch_one_by_id(beatmap_id) if beatmap is None: - osu_api_v1_beatmap = await osu_api_v1.fetch_one_beatmap(beatmap_id=beatmap_id) + try: + osu_api_v1_beatmap = await osu_api_v1.fetch_one_beatmap( + beatmap_id=beatmap_id, + ) + except OsuApiBackoffError: + return None + if osu_api_v1_beatmap is None: return None @@ -180,7 +189,13 @@ async def fetch_one_by_id(beatmap_id: int) -> AkatsukiBeatmap | None: async def fetch_one_by_md5(beatmap_md5: str) -> AkatsukiBeatmap | None: beatmap = await akatsuki_beatmaps.fetch_one_by_md5(beatmap_md5) if beatmap is None: - osu_api_v1_beatmap = await osu_api_v1.fetch_one_beatmap(beatmap_md5=beatmap_md5) + try: + osu_api_v1_beatmap = await osu_api_v1.fetch_one_beatmap( + beatmap_md5=beatmap_md5, + ) + except OsuApiBackoffError: + return None + if osu_api_v1_beatmap is None: return None diff --git a/app/usecases/cheesegull_beatmaps.py b/app/usecases/cheesegull_beatmaps.py index 279750b..dbed7ce 100644 --- a/app/usecases/cheesegull_beatmaps.py +++ b/app/usecases/cheesegull_beatmaps.py @@ -2,6 +2,7 @@ from datetime import datetime from app.adapters import osu_mirrors +from app.adapters.osu_api_backoff import OsuApiBackoffError from app.adapters.osu_api_v2 import api as osu_api_v2 from app.adapters.osu_api_v2.models import BeatmapExtended from app.adapters.osu_api_v2.models import BeatmapsetExtended @@ -224,6 +225,8 @@ async def cheesegull_search( }, ) return cheesegull_beatmapsets + except OsuApiBackoffError: + return None except Exception: logging.exception( "Failed to fetch cheesegull search", diff --git a/app/usecases/osu_files.py b/app/usecases/osu_files.py index 7d6207d..1a8d22a 100644 --- a/app/usecases/osu_files.py +++ b/app/usecases/osu_files.py @@ -3,6 +3,7 @@ from app.adapters import aws_s3 from app.adapters import osu_api_v1 +from app.adapters.osu_api_backoff import OsuApiBackoffError from app.repositories import akatsuki_beatmaps @@ -36,7 +37,21 @@ async def fetch_beatmap_osu_file_data(beatmap_id: int) -> bytes | None: extra={"beatmap_id": beatmap_id}, ) - beatmap_osu_file_data = await osu_api_v1.fetch_beatmap_osu_file_data(beatmap_id) + try: + fresh_beatmap_osu_file_data = await osu_api_v1.fetch_beatmap_osu_file_data( + beatmap_id, + ) + except OsuApiBackoffError: + if beatmap_osu_file_data is not None: + logging.warning( + "Serving stale beatmap s3 osu file cache during osu! API backoff", + extra={"beatmap_id": beatmap_id}, + ) + return beatmap_osu_file_data + + return None + + beatmap_osu_file_data = fresh_beatmap_osu_file_data if beatmap_osu_file_data is None: return None diff --git a/tests/test_osu_api_backoff.py b/tests/test_osu_api_backoff.py new file mode 100644 index 0000000..76efa87 --- /dev/null +++ b/tests/test_osu_api_backoff.py @@ -0,0 +1,118 @@ +import time +import unittest +from unittest.mock import patch + +import httpx + +from app.adapters import osu_api_backoff +from app.adapters.osu_api_backoff import CircuitState +from app.adapters.osu_api_backoff import OsuApiBackoff +from app.adapters.osu_api_backoff import OsuApiBackoffError + + +class OsuApiBackoffTestCase(unittest.TestCase): + def setUp(self) -> None: + self._logger_disabled = osu_api_backoff.logger.disabled + osu_api_backoff.logger.disabled = True + + def tearDown(self) -> None: + osu_api_backoff.logger.disabled = self._logger_disabled + + def test_rate_limit_response_opens_circuit(self) -> None: + backoff = OsuApiBackoff() + response = httpx.Response(403) + + with self.assertRaises(OsuApiBackoffError): + backoff.apply_if_rate_limited( + response, + upstream="osu! API v1", + endpoint="get_beatmaps", + ) + + self.assertEqual(backoff.state, CircuitState.OPEN) + with self.assertRaises(OsuApiBackoffError): + backoff.raise_if_unavailable(upstream="osu! API v1") + + def test_skipped_calls_do_not_extend_cooldown(self) -> None: + backoff = OsuApiBackoff() + started_at = time.monotonic() + + with patch("app.adapters.osu_api_backoff.time") as mock_time: + mock_time.monotonic.return_value = started_at + with self.assertRaises(OsuApiBackoffError): + backoff.apply_if_rate_limited( + httpx.Response(429), + upstream="osu! API v2", + endpoint="beatmapsets/search", + ) + + mock_time.monotonic.return_value = started_at + 10 + for _ in range(5): + with self.assertRaises(OsuApiBackoffError): + backoff.raise_if_unavailable(upstream="osu! API v2") + + mock_time.monotonic.return_value = started_at + 3601 + self.assertEqual(backoff.state, CircuitState.HALF_OPEN) + + def test_only_one_canary_is_allowed_in_half_open(self) -> None: + backoff = OsuApiBackoff() + started_at = time.monotonic() + + with patch("app.adapters.osu_api_backoff.time") as mock_time: + mock_time.monotonic.return_value = started_at + with self.assertRaises(OsuApiBackoffError): + backoff.apply_if_rate_limited( + httpx.Response(403), + upstream="osu! API v1", + endpoint="get_beatmaps", + ) + + mock_time.monotonic.return_value = started_at + 3601 + backoff.raise_if_unavailable(upstream="osu! API v1") + with self.assertRaises(OsuApiBackoffError): + backoff.raise_if_unavailable(upstream="osu! API v1") + + def test_successful_canary_closes_circuit(self) -> None: + backoff = OsuApiBackoff() + started_at = time.monotonic() + + with patch("app.adapters.osu_api_backoff.time") as mock_time: + mock_time.monotonic.return_value = started_at + with self.assertRaises(OsuApiBackoffError): + backoff.apply_if_rate_limited( + httpx.Response(403), + upstream="osu! API v1", + endpoint="get_beatmaps", + ) + + mock_time.monotonic.return_value = started_at + 3601 + backoff.raise_if_unavailable(upstream="osu! API v1") + backoff.record_success(upstream="osu! API v1", endpoint="get_beatmaps") + + self.assertEqual(backoff.state, CircuitState.CLOSED) + backoff.raise_if_unavailable(upstream="osu! API v1") + + def test_failed_canary_reopens_circuit(self) -> None: + backoff = OsuApiBackoff() + started_at = time.monotonic() + + with patch("app.adapters.osu_api_backoff.time") as mock_time: + mock_time.monotonic.return_value = started_at + with self.assertRaises(OsuApiBackoffError): + backoff.apply_if_rate_limited( + httpx.Response(403), + upstream="osu! API v1", + endpoint="get_beatmaps", + ) + + mock_time.monotonic.return_value = started_at + 3601 + backoff.raise_if_unavailable(upstream="osu! API v1") + backoff.record_failure(upstream="osu! API v1", endpoint="get_beatmaps") + + self.assertEqual(backoff.state, CircuitState.OPEN) + with self.assertRaises(OsuApiBackoffError): + backoff.raise_if_unavailable(upstream="osu! API v1") + + +if __name__ == "__main__": + unittest.main() From 1aa95223554b05c0b15ae23afc857fa963a272d2 Mon Sep 17 00:00:00 2001 From: cmyui Date: Mon, 8 Jun 2026 18:31:30 -0700 Subject: [PATCH 02/10] Tune osu API rate-limit probe interval --- app/adapters/osu_api_backoff.py | 2 +- tests/test_osu_api_backoff.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/app/adapters/osu_api_backoff.py b/app/adapters/osu_api_backoff.py index 7192c5c..75851b5 100644 --- a/app/adapters/osu_api_backoff.py +++ b/app/adapters/osu_api_backoff.py @@ -10,7 +10,7 @@ RATE_LIMIT_STATUS_CODES = {403, 429} DEFAULT_FAILURE_COOLDOWN_SECONDS = 60 -DEFAULT_RATE_LIMIT_COOLDOWN_SECONDS = 60 * 60 +DEFAULT_RATE_LIMIT_COOLDOWN_SECONDS = 60 logger = logging.getLogger(__name__) diff --git a/tests/test_osu_api_backoff.py b/tests/test_osu_api_backoff.py index 76efa87..f6ec546 100644 --- a/tests/test_osu_api_backoff.py +++ b/tests/test_osu_api_backoff.py @@ -51,7 +51,7 @@ def test_skipped_calls_do_not_extend_cooldown(self) -> None: with self.assertRaises(OsuApiBackoffError): backoff.raise_if_unavailable(upstream="osu! API v2") - mock_time.monotonic.return_value = started_at + 3601 + mock_time.monotonic.return_value = started_at + 61 self.assertEqual(backoff.state, CircuitState.HALF_OPEN) def test_only_one_canary_is_allowed_in_half_open(self) -> None: @@ -67,7 +67,7 @@ def test_only_one_canary_is_allowed_in_half_open(self) -> None: endpoint="get_beatmaps", ) - mock_time.monotonic.return_value = started_at + 3601 + mock_time.monotonic.return_value = started_at + 61 backoff.raise_if_unavailable(upstream="osu! API v1") with self.assertRaises(OsuApiBackoffError): backoff.raise_if_unavailable(upstream="osu! API v1") @@ -85,7 +85,7 @@ def test_successful_canary_closes_circuit(self) -> None: endpoint="get_beatmaps", ) - mock_time.monotonic.return_value = started_at + 3601 + mock_time.monotonic.return_value = started_at + 61 backoff.raise_if_unavailable(upstream="osu! API v1") backoff.record_success(upstream="osu! API v1", endpoint="get_beatmaps") @@ -105,7 +105,7 @@ def test_failed_canary_reopens_circuit(self) -> None: endpoint="get_beatmaps", ) - mock_time.monotonic.return_value = started_at + 3601 + mock_time.monotonic.return_value = started_at + 61 backoff.raise_if_unavailable(upstream="osu! API v1") backoff.record_failure(upstream="osu! API v1", endpoint="get_beatmaps") From 5dc5ce2fc0324ad1e12c98550d6fcf246ee0c846 Mon Sep 17 00:00:00 2001 From: cmyui Date: Mon, 8 Jun 2026 19:05:43 -0700 Subject: [PATCH 03/10] Cap osu API request rate --- app/adapters/osu_api_backoff.py | 106 +++++++++++++++++++++++++++----- app/adapters/osu_api_v1.py | 3 +- app/adapters/osu_api_v2/api.py | 3 +- tests/test_osu_api_backoff.py | 46 ++++++++++++++ 4 files changed, 141 insertions(+), 17 deletions(-) diff --git a/app/adapters/osu_api_backoff.py b/app/adapters/osu_api_backoff.py index 75851b5..5911e4b 100644 --- a/app/adapters/osu_api_backoff.py +++ b/app/adapters/osu_api_backoff.py @@ -9,8 +9,10 @@ import httpx RATE_LIMIT_STATUS_CODES = {403, 429} -DEFAULT_FAILURE_COOLDOWN_SECONDS = 60 +DEFAULT_FAILURE_COOLDOWN_SECONDS = 10 DEFAULT_RATE_LIMIT_COOLDOWN_SECONDS = 60 +DEFAULT_OSU_API_REQUESTS_PER_MINUTE = 45 +DEFAULT_OSU_API_BURST_SIZE = 5 logger = logging.getLogger(__name__) @@ -25,15 +27,54 @@ class OsuApiBackoffError(RuntimeError): pass +class OsuApiRateLimiter: + def __init__( + self, + *, + requests_per_minute: float = DEFAULT_OSU_API_REQUESTS_PER_MINUTE, + burst_size: float = DEFAULT_OSU_API_BURST_SIZE, + ) -> None: + if requests_per_minute <= 0: + raise ValueError("requests_per_minute must be positive") + if burst_size <= 0: + raise ValueError("burst_size must be positive") + + self._tokens_per_second = requests_per_minute / 60 + self._bucket_size = burst_size + self._tokens = burst_size + self._last_refill_at = time.monotonic() + self._lock = threading.Lock() + + def acquire(self) -> float: + with self._lock: + self._refill() + if self._tokens >= 1: + self._tokens -= 1 + return 0 + + return (1 - self._tokens) / self._tokens_per_second + + def _refill(self) -> None: + now = time.monotonic() + elapsed = now - self._last_refill_at + self._tokens = min( + self._bucket_size, + self._tokens + elapsed * self._tokens_per_second, + ) + self._last_refill_at = now + + class OsuApiBackoff: def __init__( self, *, failure_threshold: int = 3, failure_cooldown_seconds: float = DEFAULT_FAILURE_COOLDOWN_SECONDS, + rate_limiter: OsuApiRateLimiter | None = None, ) -> None: self._failure_threshold = failure_threshold self._failure_cooldown_seconds = failure_cooldown_seconds + self._rate_limiter = rate_limiter self._state = CircuitState.CLOSED self._consecutive_failures = 0 @@ -51,24 +92,36 @@ def raise_if_unavailable(self, *, upstream: str) -> None: with self._lock: state = self._evaluate_state() if state == CircuitState.CLOSED: - return + allow_canary = False - if state == CircuitState.HALF_OPEN and not self._probe_in_flight: + elif state == CircuitState.HALF_OPEN and not self._probe_in_flight: self._probe_in_flight = True - logger.info( - "Allowing osu! API canary request", - extra={"upstream": upstream, "circuit_state": state}, + allow_canary = True + + else: + seconds_remaining = 0 + if self._state == CircuitState.OPEN: + seconds_remaining = max( + 0, + int(self._cooldown_seconds - self._elapsed_since_opened()), + ) + raise OsuApiBackoffError( + f"{upstream} is backing off for {seconds_remaining}s", ) - return - seconds_remaining = 0 - if self._state == CircuitState.OPEN: - seconds_remaining = max( - 0, - int(self._cooldown_seconds - self._elapsed_since_opened()), - ) - raise OsuApiBackoffError( - f"{upstream} is backing off for {seconds_remaining}s", + try: + self._raise_if_rate_limited(upstream=upstream) + except OsuApiBackoffError: + if allow_canary: + with self._lock: + if self._state == CircuitState.HALF_OPEN: + self._probe_in_flight = False + raise + + if allow_canary: + logger.info( + "Allowing osu! API canary request", + extra={"upstream": upstream, "circuit_state": CircuitState.HALF_OPEN}, ) def record_success(self, *, upstream: str, endpoint: str) -> None: @@ -171,6 +224,26 @@ def _evaluate_state(self) -> CircuitState: def _elapsed_since_opened(self) -> float: return time.monotonic() - self._opened_at + def _raise_if_rate_limited(self, *, upstream: str) -> None: + if self._rate_limiter is None: + return + + seconds_until_available = self._rate_limiter.acquire() + if seconds_until_available == 0: + return + + logger.info( + "Skipped osu! API request because local rate limit is exhausted", + extra={ + "upstream": upstream, + "seconds_until_available": seconds_until_available, + }, + ) + raise OsuApiBackoffError( + f"{upstream} local rate limit exhausted; retry in " + f"{seconds_until_available:.1f}s", + ) + def _get_rate_limit_cooldown_seconds(response: httpx.Response) -> int: retry_after = response.headers.get("Retry-After") @@ -196,3 +269,6 @@ def _parse_retry_after_datetime(retry_after: str) -> datetime | None: if retry_at.tzinfo is None: retry_at = retry_at.replace(tzinfo=timezone.utc) return retry_at + + +osu_api_rate_limiter = OsuApiRateLimiter() diff --git a/app/adapters/osu_api_v1.py b/app/adapters/osu_api_v1.py index 8a599cc..02e8c38 100644 --- a/app/adapters/osu_api_v1.py +++ b/app/adapters/osu_api_v1.py @@ -9,13 +9,14 @@ from app import settings from app.adapters.osu_api_backoff import OsuApiBackoff from app.adapters.osu_api_backoff import OsuApiBackoffError +from app.adapters.osu_api_backoff import osu_api_rate_limiter from app.common_models import GameMode osu_api_v1_http_client = httpx.AsyncClient( base_url="https://old.ppy.sh/", timeout=httpx.Timeout(15), ) -osu_api_v1_backoff = OsuApiBackoff() +osu_api_v1_backoff = OsuApiBackoff(rate_limiter=osu_api_rate_limiter) class Beatmap(BaseModel): diff --git a/app/adapters/osu_api_v2/api.py b/app/adapters/osu_api_v2/api.py index 6a1d653..8de08a9 100644 --- a/app/adapters/osu_api_v2/api.py +++ b/app/adapters/osu_api_v2/api.py @@ -7,6 +7,7 @@ from app import settings from app.adapters.osu_api_backoff import OsuApiBackoff from app.adapters.osu_api_backoff import OsuApiBackoffError +from app.adapters.osu_api_backoff import osu_api_rate_limiter from app.adapters.osu_api_v2.models import BeatmapExtended from app.adapters.osu_api_v2.models import BeatmapsetExtended from app.adapters.osu_api_v2.models import BeatmapsetSearchResponse @@ -21,7 +22,7 @@ OSU_API_V2_TOKEN_ENDPOINT = "https://osu.ppy.sh/oauth/token" -osu_api_v2_backoff = OsuApiBackoff() +osu_api_v2_backoff = OsuApiBackoff(rate_limiter=osu_api_rate_limiter) osu_api_v2_http_client = httpx.AsyncClient( base_url="https://osu.ppy.sh/api/v2/", diff --git a/tests/test_osu_api_backoff.py b/tests/test_osu_api_backoff.py index f6ec546..9f08bc0 100644 --- a/tests/test_osu_api_backoff.py +++ b/tests/test_osu_api_backoff.py @@ -8,6 +8,7 @@ from app.adapters.osu_api_backoff import CircuitState from app.adapters.osu_api_backoff import OsuApiBackoff from app.adapters.osu_api_backoff import OsuApiBackoffError +from app.adapters.osu_api_backoff import OsuApiRateLimiter class OsuApiBackoffTestCase(unittest.TestCase): @@ -33,6 +34,51 @@ def test_rate_limit_response_opens_circuit(self) -> None: with self.assertRaises(OsuApiBackoffError): backoff.raise_if_unavailable(upstream="osu! API v1") + def test_local_rate_limiter_caps_outbound_requests(self) -> None: + limiter = OsuApiRateLimiter(requests_per_minute=60, burst_size=1) + backoff = OsuApiBackoff(rate_limiter=limiter) + started_at = time.monotonic() + + with patch("app.adapters.osu_api_backoff.time") as mock_time: + mock_time.monotonic.return_value = started_at + backoff.raise_if_unavailable(upstream="osu! API v1") + + with self.assertRaises(OsuApiBackoffError) as exc: + backoff.raise_if_unavailable(upstream="osu! API v1") + + self.assertIn("local rate limit exhausted", str(exc.exception)) + + mock_time.monotonic.return_value = started_at + 1 + backoff.raise_if_unavailable(upstream="osu! API v1") + + def test_shared_rate_limiter_is_used_across_backoffs(self) -> None: + limiter = OsuApiRateLimiter(requests_per_minute=60, burst_size=1) + v1_backoff = OsuApiBackoff(rate_limiter=limiter) + v2_backoff = OsuApiBackoff(rate_limiter=limiter) + started_at = time.monotonic() + + with patch("app.adapters.osu_api_backoff.time") as mock_time: + mock_time.monotonic.return_value = started_at + v1_backoff.raise_if_unavailable(upstream="osu! API v1") + + with self.assertRaises(OsuApiBackoffError): + v2_backoff.raise_if_unavailable(upstream="osu! API v2") + + def test_generic_failures_use_short_cooldown(self) -> None: + backoff = OsuApiBackoff(failure_threshold=1) + started_at = time.monotonic() + + with patch("app.adapters.osu_api_backoff.time") as mock_time: + mock_time.monotonic.return_value = started_at + backoff.record_failure(upstream="osu! API v1", endpoint="get_beatmaps") + self.assertEqual(backoff.state, CircuitState.OPEN) + + mock_time.monotonic.return_value = started_at + 9 + self.assertEqual(backoff.state, CircuitState.OPEN) + + mock_time.monotonic.return_value = started_at + 10 + self.assertEqual(backoff.state, CircuitState.HALF_OPEN) + def test_skipped_calls_do_not_extend_cooldown(self) -> None: backoff = OsuApiBackoff() started_at = time.monotonic() From 7792395fa433a75909e765a6aff0c602aa3e3170 Mon Sep 17 00:00:00 2001 From: cmyui Date: Mon, 8 Jun 2026 20:36:12 -0700 Subject: [PATCH 04/10] Allow production deploy from backoff branch --- .github/workflows/production-deploy.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/production-deploy.yml b/.github/workflows/production-deploy.yml index bb033f5..c7c13fa 100644 --- a/.github/workflows/production-deploy.yml +++ b/.github/workflows/production-deploy.yml @@ -4,6 +4,7 @@ on: push: branches: - master + - fix/osu-api-backoff-beatmaps concurrency: group: ${{ github.workflow }}-${{ github.ref }} From 9eb8c4724c5aa07200ad64fde23107ff3e45580a Mon Sep 17 00:00:00 2001 From: cmyui Date: Mon, 8 Jun 2026 20:45:56 -0700 Subject: [PATCH 05/10] Route cheesegull mirror fallback by metadata support --- app/adapters/osu_mirrors/__init__.py | 4 +- .../osu_mirrors/backends/osu_direct.py | 7 ++- app/repositories/beatmap_mirror_requests.py | 2 + tests/test_osu_mirrors.py | 43 +++++++++++++++++++ 4 files changed, 53 insertions(+), 3 deletions(-) create mode 100644 tests/test_osu_mirrors.py diff --git a/app/adapters/osu_mirrors/__init__.py b/app/adapters/osu_mirrors/__init__.py index d7e9907..b003cc0 100644 --- a/app/adapters/osu_mirrors/__init__.py +++ b/app/adapters/osu_mirrors/__init__.py @@ -306,7 +306,7 @@ async def fetch_with_fallback( async def fetch_one_cheesegull_beatmap(beatmap_id: int) -> CheesegullBeatmap | None: """Fetch a cheesegull beatmap from the fastest available mirror.""" return await fetch_with_fallback( - resource=MirrorResource.OSZ_FILE, # Using OSZ_FILE for compatibility + resource=MirrorResource.CHEESEGULL_BEATMAP, resource_id=beatmap_id, fetch_func=lambda m: m.fetch_one_cheesegull_beatmap(beatmap_id), ) @@ -317,7 +317,7 @@ async def fetch_one_cheesegull_beatmapset( ) -> CheesegullBeatmapset | None: """Fetch a cheesegull beatmapset from the fastest available mirror.""" return await fetch_with_fallback( - resource=MirrorResource.OSZ_FILE, # Using OSZ_FILE for compatibility + resource=MirrorResource.CHEESEGULL_BEATMAPSET, resource_id=beatmapset_id, fetch_func=lambda m: m.fetch_one_cheesegull_beatmapset(beatmapset_id), ) diff --git a/app/adapters/osu_mirrors/backends/osu_direct.py b/app/adapters/osu_mirrors/backends/osu_direct.py index bee50d3..73a77ad 100644 --- a/app/adapters/osu_mirrors/backends/osu_direct.py +++ b/app/adapters/osu_mirrors/backends/osu_direct.py @@ -11,7 +11,12 @@ class OsuDirectMirror(AbstractBeatmapMirror): name = "osu_direct" base_url = "https://osu.direct" - supported_resources = {MirrorResource.OSZ_FILE, MirrorResource.BACKGROUND_IMAGE} + supported_resources = { + MirrorResource.OSZ_FILE, + MirrorResource.BACKGROUND_IMAGE, + MirrorResource.CHEESEGULL_BEATMAP, + MirrorResource.CHEESEGULL_BEATMAPSET, + } @override async def fetch_one_cheesegull_beatmap( diff --git a/app/repositories/beatmap_mirror_requests.py b/app/repositories/beatmap_mirror_requests.py index 89da1d9..8e1566e 100644 --- a/app/repositories/beatmap_mirror_requests.py +++ b/app/repositories/beatmap_mirror_requests.py @@ -14,6 +14,8 @@ class MirrorResource(StrEnum): OSZ_FILE = "osz_file" BACKGROUND_IMAGE = "background_image" + CHEESEGULL_BEATMAP = "cheesegull_beatmap" + CHEESEGULL_BEATMAPSET = "cheesegull_beatmapset" # TODO: beatmap audio file diff --git a/tests/test_osu_mirrors.py b/tests/test_osu_mirrors.py new file mode 100644 index 0000000..ce5208b --- /dev/null +++ b/tests/test_osu_mirrors.py @@ -0,0 +1,43 @@ +import os +import unittest + +os.environ.setdefault("APP_ENV", "test") +os.environ.setdefault("APP_HOST", "127.0.0.1") +os.environ.setdefault("APP_PORT", "8081") +os.environ.setdefault("CODE_HOTRELOAD", "false") +os.environ.setdefault("OSU_API_V2_CLIENT_ID", "test") +os.environ.setdefault("OSU_API_V2_CLIENT_SECRET", "test") +os.environ.setdefault("OSU_API_V1_API_KEYS_POOL", "test") +os.environ.setdefault("DB_USER", "test") +os.environ.setdefault("DB_PASS", "test") +os.environ.setdefault("DB_HOST", "127.0.0.1") +os.environ.setdefault("DB_PORT", "3306") +os.environ.setdefault("DB_NAME", "test") +os.environ.setdefault("AWS_S3_ENDPOINT_URL", "test") +os.environ.setdefault("AWS_S3_REGION_NAME", "test") +os.environ.setdefault("AWS_S3_BUCKET_NAME", "test") +os.environ.setdefault("AWS_S3_ACCESS_KEY_ID", "test") +os.environ.setdefault("AWS_S3_SECRET_ACCESS_KEY", "test") +os.environ.setdefault("DISCORD_BEATMAP_UPDATES_WEBHOOK_URL", "test") +os.environ.setdefault("MINO_INCREASED_RATELIMIT_KEY", "test") + +from app.adapters import osu_mirrors +from app.repositories.beatmap_mirror_requests import MirrorResource + + +class OsuMirrorsTestCase(unittest.TestCase): + def test_cheesegull_beatmap_metadata_uses_only_supporting_mirrors(self) -> None: + mirrors = osu_mirrors.get_available_mirrors(MirrorResource.CHEESEGULL_BEATMAP) + + self.assertEqual([mirror.name for mirror in mirrors], ["osu_direct"]) + + def test_cheesegull_beatmapset_metadata_uses_only_supporting_mirrors(self) -> None: + mirrors = osu_mirrors.get_available_mirrors( + MirrorResource.CHEESEGULL_BEATMAPSET, + ) + + self.assertEqual([mirror.name for mirror in mirrors], ["osu_direct"]) + + +if __name__ == "__main__": + unittest.main() From 8de8eca115207b30654a06ead8d99ec06b22ee24 Mon Sep 17 00:00:00 2001 From: cmyui Date: Mon, 8 Jun 2026 20:50:26 -0700 Subject: [PATCH 06/10] Fail fast on cheesegull osu API backoff --- app/usecases/cheesegull_beatmaps.py | 5 +- tests/test_cheesegull_beatmaps.py | 77 +++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 tests/test_cheesegull_beatmaps.py diff --git a/app/usecases/cheesegull_beatmaps.py b/app/usecases/cheesegull_beatmaps.py index dbed7ce..fea3292 100644 --- a/app/usecases/cheesegull_beatmaps.py +++ b/app/usecases/cheesegull_beatmaps.py @@ -7,7 +7,6 @@ from app.adapters.osu_api_v2.models import BeatmapExtended from app.adapters.osu_api_v2.models import BeatmapsetExtended from app.adapters.osu_api_v2.models import Category -from app.api.responses import JSONResponse from app.common_models import CheesegullBeatmap from app.common_models import CheesegullBeatmapset from app.common_models import CheesegullRankedStatus @@ -94,6 +93,8 @@ async def fetch_one_cheesegull_beatmap( osu_api_beatmap, ) ) + except OsuApiBackoffError: + return None except Exception: # Fallback to mirror cheesegull_beatmap = await osu_mirrors.fetch_one_cheesegull_beatmap( @@ -139,6 +140,8 @@ async def fetch_one_cheesegull_beatmapset( osu_api_beatmapset, ) ) + except OsuApiBackoffError: + return None except Exception: # Fallback to mirror cheesegull_beatmapset = await osu_mirrors.fetch_one_cheesegull_beatmapset( diff --git a/tests/test_cheesegull_beatmaps.py b/tests/test_cheesegull_beatmaps.py new file mode 100644 index 0000000..06a631c --- /dev/null +++ b/tests/test_cheesegull_beatmaps.py @@ -0,0 +1,77 @@ +import os +import unittest +from unittest.mock import AsyncMock +from unittest.mock import patch + +os.environ.setdefault("APP_ENV", "test") +os.environ.setdefault("APP_HOST", "127.0.0.1") +os.environ.setdefault("APP_PORT", "8081") +os.environ.setdefault("CODE_HOTRELOAD", "false") +os.environ.setdefault("OSU_API_V2_CLIENT_ID", "test") +os.environ.setdefault("OSU_API_V2_CLIENT_SECRET", "test") +os.environ.setdefault("OSU_API_V1_API_KEYS_POOL", "test") +os.environ.setdefault("DB_USER", "test") +os.environ.setdefault("DB_PASS", "test") +os.environ.setdefault("DB_HOST", "127.0.0.1") +os.environ.setdefault("DB_PORT", "3306") +os.environ.setdefault("DB_NAME", "test") +os.environ.setdefault("AWS_S3_ENDPOINT_URL", "test") +os.environ.setdefault("AWS_S3_REGION_NAME", "test") +os.environ.setdefault("AWS_S3_BUCKET_NAME", "test") +os.environ.setdefault("AWS_S3_ACCESS_KEY_ID", "test") +os.environ.setdefault("AWS_S3_SECRET_ACCESS_KEY", "test") +os.environ.setdefault("DISCORD_BEATMAP_UPDATES_WEBHOOK_URL", "test") +os.environ.setdefault("MINO_INCREASED_RATELIMIT_KEY", "test") + +from app.adapters.osu_api_backoff import OsuApiBackoffError +from app.usecases import cheesegull_beatmaps + + +class CheesegullBeatmapsTestCase(unittest.IsolatedAsyncioTestCase): + async def test_beatmap_backoff_does_not_fallback_to_mirrors(self) -> None: + with ( + patch.object( + cheesegull_beatmaps.osu_api_v2, + "get_beatmap", + AsyncMock(side_effect=OsuApiBackoffError), + ), + patch.object( + cheesegull_beatmaps.osu_mirrors, + "fetch_one_cheesegull_beatmap", + AsyncMock(), + ) as mirror_fetch, + ): + result = await cheesegull_beatmaps.fetch_one_cheesegull_beatmap( + 1, + client_ip_address=None, + client_user_agent=None, + ) + + self.assertIsNone(result) + mirror_fetch.assert_not_awaited() + + async def test_beatmapset_backoff_does_not_fallback_to_mirrors(self) -> None: + with ( + patch.object( + cheesegull_beatmaps.osu_api_v2, + "get_beatmapset", + AsyncMock(side_effect=OsuApiBackoffError), + ), + patch.object( + cheesegull_beatmaps.osu_mirrors, + "fetch_one_cheesegull_beatmapset", + AsyncMock(), + ) as mirror_fetch, + ): + result = await cheesegull_beatmaps.fetch_one_cheesegull_beatmapset( + 1, + client_ip_address=None, + client_user_agent=None, + ) + + self.assertIsNone(result) + mirror_fetch.assert_not_awaited() + + +if __name__ == "__main__": + unittest.main() From 03e165f39dc4ea0ab467245a65f09b4a1b9ed95f Mon Sep 17 00:00:00 2001 From: cmyui Date: Mon, 8 Jun 2026 21:48:01 -0700 Subject: [PATCH 07/10] Raise osu API local request budget --- app/adapters/osu_api_backoff.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/adapters/osu_api_backoff.py b/app/adapters/osu_api_backoff.py index 5911e4b..afeb50b 100644 --- a/app/adapters/osu_api_backoff.py +++ b/app/adapters/osu_api_backoff.py @@ -11,8 +11,8 @@ RATE_LIMIT_STATUS_CODES = {403, 429} DEFAULT_FAILURE_COOLDOWN_SECONDS = 10 DEFAULT_RATE_LIMIT_COOLDOWN_SECONDS = 60 -DEFAULT_OSU_API_REQUESTS_PER_MINUTE = 45 -DEFAULT_OSU_API_BURST_SIZE = 5 +DEFAULT_OSU_API_REQUESTS_PER_MINUTE = 600 +DEFAULT_OSU_API_BURST_SIZE = 60 logger = logging.getLogger(__name__) From 41762c6d5f1efaf82da740e907cf3ff748f958b2 Mon Sep 17 00:00:00 2001 From: cmyui Date: Mon, 8 Jun 2026 21:58:50 -0700 Subject: [PATCH 08/10] Stabilize osu API circuit under concurrency --- app/adapters/osu_api_backoff.py | 10 +++++++-- tests/test_osu_api_backoff.py | 40 +++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/app/adapters/osu_api_backoff.py b/app/adapters/osu_api_backoff.py index afeb50b..11cf85a 100644 --- a/app/adapters/osu_api_backoff.py +++ b/app/adapters/osu_api_backoff.py @@ -12,7 +12,7 @@ DEFAULT_FAILURE_COOLDOWN_SECONDS = 10 DEFAULT_RATE_LIMIT_COOLDOWN_SECONDS = 60 DEFAULT_OSU_API_REQUESTS_PER_MINUTE = 600 -DEFAULT_OSU_API_BURST_SIZE = 60 +DEFAULT_OSU_API_BURST_SIZE = 10 logger = logging.getLogger(__name__) @@ -127,6 +127,9 @@ def raise_if_unavailable(self, *, upstream: str) -> None: def record_success(self, *, upstream: str, endpoint: str) -> None: with self._lock: previous_state = self._state + if previous_state == CircuitState.OPEN: + return + self._state = CircuitState.CLOSED self._consecutive_failures = 0 self._opened_at = 0.0 @@ -155,6 +158,10 @@ def record_failure( cooldown_seconds = cooldown_seconds or self._failure_cooldown_seconds with self._lock: + previous_state = self._state + if previous_state == CircuitState.OPEN: + return + self._consecutive_failures += 1 should_open = ( force_open @@ -164,7 +171,6 @@ def record_failure( if not should_open: return - previous_state = self._state self._state = CircuitState.OPEN self._opened_at = time.monotonic() self._cooldown_seconds = cooldown_seconds diff --git a/tests/test_osu_api_backoff.py b/tests/test_osu_api_backoff.py index 9f08bc0..a3a27c5 100644 --- a/tests/test_osu_api_backoff.py +++ b/tests/test_osu_api_backoff.py @@ -100,6 +100,46 @@ def test_skipped_calls_do_not_extend_cooldown(self) -> None: mock_time.monotonic.return_value = started_at + 61 self.assertEqual(backoff.state, CircuitState.HALF_OPEN) + def test_in_flight_success_does_not_close_open_circuit(self) -> None: + backoff = OsuApiBackoff() + + with self.assertRaises(OsuApiBackoffError): + backoff.apply_if_rate_limited( + httpx.Response(429), + upstream="osu! API v2", + endpoint="beatmaps", + ) + + backoff.record_success(upstream="osu! API v2", endpoint="beatmaps") + + self.assertEqual(backoff.state, CircuitState.OPEN) + with self.assertRaises(OsuApiBackoffError): + backoff.raise_if_unavailable(upstream="osu! API v2") + + def test_duplicate_in_flight_failures_do_not_extend_cooldown(self) -> None: + backoff = OsuApiBackoff() + started_at = time.monotonic() + + with patch("app.adapters.osu_api_backoff.time") as mock_time: + mock_time.monotonic.return_value = started_at + with self.assertRaises(OsuApiBackoffError): + backoff.apply_if_rate_limited( + httpx.Response(429), + upstream="osu! API v2", + endpoint="beatmaps", + ) + + mock_time.monotonic.return_value = started_at + 30 + with self.assertRaises(OsuApiBackoffError): + backoff.apply_if_rate_limited( + httpx.Response(429), + upstream="osu! API v2", + endpoint="beatmaps", + ) + + mock_time.monotonic.return_value = started_at + 61 + self.assertEqual(backoff.state, CircuitState.HALF_OPEN) + def test_only_one_canary_is_allowed_in_half_open(self) -> None: backoff = OsuApiBackoff() started_at = time.monotonic() From 5edb61a3500fa825734c763bb0392f67645cf57b Mon Sep 17 00:00:00 2001 From: cmyui Date: Mon, 8 Jun 2026 22:21:21 -0700 Subject: [PATCH 09/10] Validate osu file bodies before caching --- app/usecases/osu_files.py | 22 ++++++++++++++++++++++ tests/test_osu_files.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 tests/test_osu_files.py diff --git a/app/usecases/osu_files.py b/app/usecases/osu_files.py index 1a8d22a..6de4884 100644 --- a/app/usecases/osu_files.py +++ b/app/usecases/osu_files.py @@ -11,6 +11,18 @@ def hash_content(content: bytes) -> str: return hashlib.md5(content).hexdigest() +def _is_valid_osu_file_data(beatmap_osu_file_data: bytes) -> bool: + if not beatmap_osu_file_data: + return False + + text = beatmap_osu_file_data[:8192].decode("utf-8-sig", errors="replace") + lines = text.splitlines() + if not lines or not lines[0].startswith("osu file format v"): + return False + + return "[General]" in text and "[Metadata]" in text + + async def _osu_file_cache_expired( *, beatmap_id: int, @@ -55,6 +67,16 @@ async def fetch_beatmap_osu_file_data(beatmap_id: int) -> bytes | None: if beatmap_osu_file_data is None: return None + if not _is_valid_osu_file_data(beatmap_osu_file_data): + logging.warning( + "Skipping invalid beatmap osu file cache save", + extra={ + "beatmap_id": beatmap_id, + "data_size": len(beatmap_osu_file_data), + }, + ) + return None + await aws_s3.save_object_data( f"/beatmaps/{beatmap_id}.osu", beatmap_osu_file_data, diff --git a/tests/test_osu_files.py b/tests/test_osu_files.py new file mode 100644 index 0000000..0ce4054 --- /dev/null +++ b/tests/test_osu_files.py @@ -0,0 +1,33 @@ +import unittest + +from app.usecases import osu_files + + +class OsuFilesTestCase(unittest.TestCase): + def test_valid_osu_file_data(self) -> None: + data = b"""osu file format v14 + +[General] +AudioFilename: audio.mp3 + +[Metadata] +Title: Test +""" + + self.assertTrue(osu_files._is_valid_osu_file_data(data)) + + def test_html_error_page_is_invalid_osu_file_data(self) -> None: + data = b""" + +403 Forbidden + +""" + + self.assertFalse(osu_files._is_valid_osu_file_data(data)) + + def test_empty_osu_file_data_is_invalid(self) -> None: + self.assertFalse(osu_files._is_valid_osu_file_data(b"")) + + +if __name__ == "__main__": + unittest.main() From be7ca057b610276d62a9aab51d80ca3460a9a385 Mon Sep 17 00:00:00 2001 From: Josh Smith Date: Sat, 27 Jun 2026 16:23:28 -0400 Subject: [PATCH 10/10] Apply suggestion from @cmyui --- .github/workflows/production-deploy.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/production-deploy.yml b/.github/workflows/production-deploy.yml index c7c13fa..bb033f5 100644 --- a/.github/workflows/production-deploy.yml +++ b/.github/workflows/production-deploy.yml @@ -4,7 +4,6 @@ on: push: branches: - master - - fix/osu-api-backoff-beatmaps concurrency: group: ${{ github.workflow }}-${{ github.ref }}