diff --git a/app/adapters/osu_mirrors/__init__.py b/app/adapters/osu_mirrors/__init__.py index d7e9907..6f712a1 100644 --- a/app/adapters/osu_mirrors/__init__.py +++ b/app/adapters/osu_mirrors/__init__.py @@ -75,15 +75,15 @@ def get_available_mirrors( - Circuit breaker state (not open) - Rate limiter (has capacity) - Returns mirrors sorted by latency EMA (fastest first). + Returns mirrors sorted by composite score (latency weighted by reliability). """ available = [ mirror for mirror in BEATMAP_MIRRORS if resource in mirror.supported_resources and mirror.health.is_available() ] - # Sort by latency (fastest first) - available.sort(key=lambda m: m.health.latency_ema) + # Sort by composite score (latency weighted by reliability, lower is better) + available.sort(key=lambda m: m.health.score()) return available @@ -242,9 +242,11 @@ async def fetch_with_fallback( ) return response.data - # Hedged request failed, try remaining mirrors sequentially + # Hedged request failed, try remaining mirrors sequentially. + # Use wait_for_availability() so we briefly wait for rate limiter + # tokens instead of skipping healthy-but-throttled mirrors. for mirror in available[HEDGE_COUNT:]: - if not mirror.health.is_available(): + if not await mirror.health.wait_for_availability(): continue started_at = time.time() diff --git a/app/adapters/osu_mirrors/backends/__init__.py b/app/adapters/osu_mirrors/backends/__init__.py index 16aaf30..7dd4416 100644 --- a/app/adapters/osu_mirrors/backends/__init__.py +++ b/app/adapters/osu_mirrors/backends/__init__.py @@ -1,4 +1,7 @@ +import asyncio +import logging from abc import ABC +from collections.abc import Callable from dataclasses import dataclass from typing import Any from typing import ClassVar @@ -14,6 +17,14 @@ from app.repositories.beatmap_mirror_requests import MirrorResource T = TypeVar("T", covariant=True) +_R = TypeVar("_R") + +RETRYABLE_STATUSES = frozenset({429, 500, 502, 503}) +NOT_FOUND_STATUSES = frozenset({404, 451}) + +MAX_RETRIES = 1 +RETRY_BACKOFF_SECONDS = 1.0 +MAX_RESPONSE_BYTES = 100 * 1024 * 1024 # 100 MB @dataclass @@ -47,6 +58,137 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.health = MirrorHealth(rate_limiter=rate_limiter) super().__init__(*args, **kwargs) + def _extra_headers(self) -> dict[str, str]: + """Override to add mirror-specific request headers.""" + return {} + + async def _fetch( + self, + url: str, + parse: Callable[[httpx.Response], _R], + *, + extra_headers: dict[str, str] | None = None, + max_response_bytes: int | None = None, + ) -> BeatmapMirrorResponse[_R | None]: + """Make an HTTP GET request with retry logic and error handling. + + Retries on transient HTTP errors (429, 500, 502, 503) up to + MAX_RETRIES times with exponential backoff. Checks upstream + rate limit headers and applies backpressure to the token bucket. + Optionally rejects responses exceeding max_response_bytes. + """ + merged_headers = {**self._extra_headers(), **(extra_headers or {})} + response: httpx.Response | None = None + + for attempt in range(MAX_RETRIES + 1): + try: + response = await self.http_client.get( + url, + headers=merged_headers or None, + ) + except Exception as exc: + if attempt < MAX_RETRIES: + await asyncio.sleep(RETRY_BACKOFF_SECONDS * (attempt + 1)) + continue + return BeatmapMirrorResponse( + data=None, + is_success=False, + request_url=None, + status_code=None, + error_message=str(exc), + ) + + self._check_rate_limit_headers(response) + + if response.status_code in NOT_FOUND_STATUSES: + return BeatmapMirrorResponse( + data=None, + is_success=True, + request_url=str(response.request.url), + status_code=response.status_code, + ) + + if response.status_code in RETRYABLE_STATUSES: + if attempt < MAX_RETRIES: + await asyncio.sleep(RETRY_BACKOFF_SECONDS * (attempt + 1)) + continue + return BeatmapMirrorResponse( + data=None, + is_success=False, + request_url=str(response.request.url), + status_code=response.status_code, + error_message=f"HTTP {response.status_code} after retries", + ) + + try: + response.raise_for_status() + except httpx.HTTPStatusError as exc: + return BeatmapMirrorResponse( + data=None, + is_success=False, + request_url=str(response.request.url), + status_code=response.status_code, + error_message=str(exc), + ) + + if max_response_bytes is not None: + content_length = response.headers.get("content-length") + if ( + content_length is not None + and int(content_length) > max_response_bytes + ): + return BeatmapMirrorResponse( + data=None, + is_success=False, + request_url=str(response.request.url), + status_code=response.status_code, + error_message=f"Response too large: {content_length} bytes", + ) + + try: + data = parse(response) + except Exception as exc: + logging.warning( + "Failed to parse mirror response", + extra={ + "mirror_name": self.name, + "url": url, + "error": str(exc), + }, + ) + return BeatmapMirrorResponse( + data=None, + is_success=False, + request_url=str(response.request.url), + status_code=response.status_code, + error_message=str(exc), + ) + + return BeatmapMirrorResponse( + data=data, + is_success=True, + request_url=str(response.request.url), + status_code=response.status_code, + ) + + # Unreachable, but satisfies the type checker + return BeatmapMirrorResponse( + data=None, + is_success=False, + request_url=None, + status_code=None, + error_message="All retries exhausted", + ) + + def _check_rate_limit_headers(self, response: httpx.Response) -> None: + """Check rate limit headers and apply backpressure to token bucket.""" + remaining = response.headers.get("x-ratelimit-remaining") + if remaining is not None: + try: + self.health.apply_rate_limit_pressure(int(remaining)) + except ValueError: + pass + async def fetch_one_cheesegull_beatmap( self, beatmap_id: int, diff --git a/app/adapters/osu_mirrors/backends/mino.py b/app/adapters/osu_mirrors/backends/mino.py index 9b39104..afeb416 100644 --- a/app/adapters/osu_mirrors/backends/mino.py +++ b/app/adapters/osu_mirrors/backends/mino.py @@ -1,9 +1,7 @@ -import logging - -import httpx from typing_extensions import override from app import settings +from app.adapters.osu_mirrors.backends import MAX_RESPONSE_BYTES from app.adapters.osu_mirrors.backends import AbstractBeatmapMirror from app.adapters.osu_mirrors.backends import BeatmapMirrorResponse from app.repositories.beatmap_mirror_requests import MirrorResource @@ -14,76 +12,30 @@ class MinoMirror(AbstractBeatmapMirror): base_url = "https://catboy.best" supported_resources = {MirrorResource.OSZ_FILE, MirrorResource.BACKGROUND_IMAGE} + @override + def _extra_headers(self) -> dict[str, str]: + return {"x-ratelimit-key": settings.MINO_INCREASED_RATELIMIT_KEY} + @override async def fetch_beatmap_zip_data( self, beatmapset_id: int, ) -> BeatmapMirrorResponse[bytes | None]: - response: httpx.Response | None = None - try: - response = await self.http_client.get( - f"{self.base_url}/d/{beatmapset_id}", - headers={"x-ratelimit-key": settings.MINO_INCREASED_RATELIMIT_KEY}, - ) - if response.status_code in (404, 451): - return BeatmapMirrorResponse( - data=None, - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - response.raise_for_status() - return BeatmapMirrorResponse( - data=response.read(), - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - except Exception as exc: - return BeatmapMirrorResponse( - data=None, - is_success=False, - request_url=str(response.request.url) if response else None, - status_code=response.status_code if response else None, - error_message=str(exc), - ) + return await self._fetch( + f"{self.base_url}/d/{beatmapset_id}", + lambda r: r.read(), + max_response_bytes=MAX_RESPONSE_BYTES, + ) @override async def fetch_beatmap_background_image( self, beatmap_id: int, ) -> BeatmapMirrorResponse[bytes | None]: - response: httpx.Response | None = None - try: - response = await self.http_client.get( - f"{self.base_url}/preview/background/{beatmap_id}", - ) - if response.status_code in (404, 451): - return BeatmapMirrorResponse( - data=None, - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - response.raise_for_status() - return BeatmapMirrorResponse( - data=response.read(), - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - except Exception as exc: - logging.warning( - "Failed to fetch beatmap background from catboy.best", - exc_info=True, - ) - return BeatmapMirrorResponse( - data=None, - is_success=False, - request_url=str(response.request.url) if response else None, - status_code=response.status_code if response else None, - error_message=str(exc), - ) + return await self._fetch( + f"{self.base_url}/preview/background/{beatmap_id}", + lambda r: r.read(), + ) class MinoCentralMirror(MinoMirror): diff --git a/app/adapters/osu_mirrors/backends/nerinyan.py b/app/adapters/osu_mirrors/backends/nerinyan.py index 351794b..e18c1d0 100644 --- a/app/adapters/osu_mirrors/backends/nerinyan.py +++ b/app/adapters/osu_mirrors/backends/nerinyan.py @@ -1,8 +1,6 @@ -import logging - -import httpx from typing_extensions import override +from app.adapters.osu_mirrors.backends import MAX_RESPONSE_BYTES from app.adapters.osu_mirrors.backends import AbstractBeatmapMirror from app.adapters.osu_mirrors.backends import BeatmapMirrorResponse from app.repositories.beatmap_mirror_requests import MirrorResource @@ -18,30 +16,8 @@ async def fetch_beatmap_zip_data( self, beatmapset_id: int, ) -> BeatmapMirrorResponse[bytes | None]: - response: httpx.Response | None = None - try: - response = await self.http_client.get( - f"{self.base_url}/d/{beatmapset_id}", - ) - if response.status_code in (404, 451): - return BeatmapMirrorResponse( - data=None, - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - response.raise_for_status() - return BeatmapMirrorResponse( - data=response.read(), - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - except Exception as exc: - return BeatmapMirrorResponse( - data=None, - is_success=False, - request_url=str(response.request.url) if response else None, - status_code=response.status_code if response else None, - error_message=str(exc), - ) + return await self._fetch( + f"{self.base_url}/d/{beatmapset_id}", + lambda r: r.read(), + max_response_bytes=MAX_RESPONSE_BYTES, + ) diff --git a/app/adapters/osu_mirrors/backends/osu_direct.py b/app/adapters/osu_mirrors/backends/osu_direct.py index bee50d3..a5e215e 100644 --- a/app/adapters/osu_mirrors/backends/osu_direct.py +++ b/app/adapters/osu_mirrors/backends/osu_direct.py @@ -1,6 +1,6 @@ -import httpx from typing_extensions import override +from app.adapters.osu_mirrors.backends import MAX_RESPONSE_BYTES from app.adapters.osu_mirrors.backends import AbstractBeatmapMirror from app.adapters.osu_mirrors.backends import BeatmapMirrorResponse from app.common_models import CheesegullBeatmap @@ -18,129 +18,38 @@ async def fetch_one_cheesegull_beatmap( self, beatmap_id: int, ) -> BeatmapMirrorResponse[CheesegullBeatmap | None]: - response: httpx.Response | None = None - try: - response = await self.http_client.get( - f"{self.base_url}/api/b/{beatmap_id}", - ) - if response.status_code in (404, 451): - return BeatmapMirrorResponse( - data=None, - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - response.raise_for_status() - return BeatmapMirrorResponse( - data=CheesegullBeatmap.model_validate(response.json()), - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - except Exception as exc: - return BeatmapMirrorResponse( - data=None, - is_success=False, - request_url=str(response.request.url) if response else None, - status_code=response.status_code if response else None, - error_message=str(exc), - ) + return await self._fetch( + f"{self.base_url}/api/b/{beatmap_id}", + lambda r: CheesegullBeatmap.model_validate(r.json()), + ) @override async def fetch_one_cheesegull_beatmapset( self, beatmapset_id: int, ) -> BeatmapMirrorResponse[CheesegullBeatmapset | None]: - response: httpx.Response | None = None - try: - response = await self.http_client.get( - f"{self.base_url}/api/s/{beatmapset_id}", - ) - if response.status_code in (404, 451): - return BeatmapMirrorResponse( - data=None, - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - response.raise_for_status() - return BeatmapMirrorResponse( - data=CheesegullBeatmapset.model_validate(response.json()), - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - except Exception as exc: - return BeatmapMirrorResponse( - data=None, - is_success=False, - request_url=str(response.request.url) if response else None, - status_code=response.status_code if response else None, - error_message=str(exc), - ) + return await self._fetch( + f"{self.base_url}/api/s/{beatmapset_id}", + lambda r: CheesegullBeatmapset.model_validate(r.json()), + ) @override async def fetch_beatmap_zip_data( self, beatmapset_id: int, ) -> BeatmapMirrorResponse[bytes | None]: - response: httpx.Response | None = None - try: - response = await self.http_client.get( - f"{self.base_url}/api/d/{beatmapset_id}", - ) - if response.status_code in (404, 451): - return BeatmapMirrorResponse( - data=None, - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - response.raise_for_status() - return BeatmapMirrorResponse( - data=response.read(), - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - except Exception as exc: - return BeatmapMirrorResponse( - data=None, - is_success=False, - request_url=str(response.request.url) if response else None, - status_code=response.status_code if response else None, - error_message=str(exc), - ) + return await self._fetch( + f"{self.base_url}/api/d/{beatmapset_id}", + lambda r: r.read(), + max_response_bytes=MAX_RESPONSE_BYTES, + ) @override async def fetch_beatmap_background_image( self, beatmap_id: int, ) -> BeatmapMirrorResponse[bytes | None]: - response: httpx.Response | None = None - try: - response = await self.http_client.get( - f"{self.base_url}/api/media/background/{beatmap_id}", - ) - if response.status_code in (404, 451): - return BeatmapMirrorResponse( - data=None, - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - response.raise_for_status() - return BeatmapMirrorResponse( - data=response.read(), - is_success=True, - request_url=str(response.request.url), - status_code=response.status_code, - ) - except Exception as exc: - return BeatmapMirrorResponse( - data=None, - is_success=False, - request_url=str(response.request.url) if response else None, - status_code=response.status_code if response else None, - error_message=str(exc), - ) + return await self._fetch( + f"{self.base_url}/api/media/background/{beatmap_id}", + lambda r: r.read(), + ) diff --git a/app/adapters/osu_mirrors/resilience.py b/app/adapters/osu_mirrors/resilience.py index c7a764f..1d03741 100644 --- a/app/adapters/osu_mirrors/resilience.py +++ b/app/adapters/osu_mirrors/resilience.py @@ -1,3 +1,4 @@ +import asyncio import time from dataclasses import dataclass from dataclasses import field @@ -10,6 +11,11 @@ class CircuitState(StrEnum): HALF_OPEN = "half_open" # Testing recovery, one request allowed +MAX_COOLDOWN_SECONDS = 600.0 # 10 minutes +RATE_LIMIT_REMAINING_THRESHOLD = 10 +MAX_RATE_LIMIT_WAIT_SECONDS = 2.0 + + @dataclass class CircuitBreaker: """ @@ -17,27 +23,42 @@ class CircuitBreaker: Tracks failures and temporarily stops sending requests to a failing backend. After a cooldown period, allows a single probe request to test recovery. + + Uses exponential backoff on cooldown: each time a probe fails and the + circuit re-opens, the cooldown doubles (up to MAX_COOLDOWN_SECONDS). + A successful probe resets the cooldown to the base value. """ failure_threshold: int = 3 - cooldown_seconds: float = 30.0 + base_cooldown_seconds: float = 30.0 state: CircuitState = field(default=CircuitState.CLOSED) consecutive_failures: int = field(default=0) opened_at: float | None = field(default=None) + current_cooldown: float = field(init=False) + + def __post_init__(self) -> None: + self.current_cooldown = self.base_cooldown_seconds def record_success(self) -> None: """Record a successful request. Resets failure count and closes circuit.""" self.consecutive_failures = 0 self.state = CircuitState.CLOSED self.opened_at = None + self.current_cooldown = self.base_cooldown_seconds def record_failure(self) -> None: """Record a failed request. Opens circuit if threshold is exceeded.""" self.consecutive_failures += 1 if self.consecutive_failures >= self.failure_threshold: + was_half_open = self.state == CircuitState.HALF_OPEN self.state = CircuitState.OPEN self.opened_at = time.time() + if was_half_open: + self.current_cooldown = min( + self.current_cooldown * 2, + MAX_COOLDOWN_SECONDS, + ) def should_allow_request(self) -> bool: """Check if a request should be allowed through the circuit.""" @@ -48,7 +69,7 @@ def should_allow_request(self) -> bool: if self.opened_at is None: return True elapsed = time.time() - self.opened_at - if elapsed >= self.cooldown_seconds: + if elapsed >= self.current_cooldown: # Transition to half-open, allow one probe request self.state = CircuitState.HALF_OPEN return True @@ -113,7 +134,7 @@ class MirrorHealth: """ Tracks health metrics for a single mirror. - Combines circuit breaker, rate limiting, and latency tracking. + Combines circuit breaker, rate limiting, and latency/failure tracking. """ circuit: CircuitBreaker = field(default_factory=CircuitBreaker) @@ -123,23 +144,56 @@ class MirrorHealth: latency_ema: float = field(default=1.0) latency_ema_alpha: float = field(default=0.3) # Weight for new observations + # Exponential moving average of failure rate (0.0 = all success, 1.0 = all failure) + failure_ema: float = field(default=0.0) + failure_ema_alpha: float = field(default=0.2) + + def score(self) -> float: + """Composite score for mirror ordering (lower is better). + + Penalizes mirrors with higher failure rates so that a fast + but unreliable mirror ranks below a slower but reliable one. + """ + return self.latency_ema * (1.0 + 3.0 * self.failure_ema) + def is_available(self) -> bool: - """Check if this mirror is available for requests.""" + """Check if this mirror is available for requests (non-blocking).""" if not self.circuit.should_allow_request(): return False if self.rate_limiter is not None and not self.rate_limiter.try_acquire(): return False return True + async def wait_for_availability(self) -> bool: + """Wait briefly for rate limiter tokens. Returns False if circuit is open.""" + if not self.circuit.should_allow_request(): + return False + if self.rate_limiter is not None: + wait = self.rate_limiter.time_until_available() + if wait > MAX_RATE_LIMIT_WAIT_SECONDS: + return False + if wait > 0: + await asyncio.sleep(wait) + self.rate_limiter.try_acquire() + return True + def record_success(self, latency_seconds: float) -> None: """Record a successful request with its latency.""" self.circuit.record_success() - # Update EMA: new_ema = alpha * observation + (1 - alpha) * old_ema self.latency_ema = ( self.latency_ema_alpha * latency_seconds + (1 - self.latency_ema_alpha) * self.latency_ema ) + self.failure_ema = (1 - self.failure_ema_alpha) * self.failure_ema def record_failure(self) -> None: """Record a failed request.""" self.circuit.record_failure() + self.failure_ema = ( + self.failure_ema_alpha + (1 - self.failure_ema_alpha) * self.failure_ema + ) + + def apply_rate_limit_pressure(self, remaining: int) -> None: + """Drain rate limiter tokens when upstream reports low remaining quota.""" + if self.rate_limiter is not None and remaining < RATE_LIMIT_REMAINING_THRESHOLD: + self.rate_limiter.tokens = 0 diff --git a/app/repositories/beatmap_mirror_requests.py b/app/repositories/beatmap_mirror_requests.py index 89da1d9..3167204 100644 --- a/app/repositories/beatmap_mirror_requests.py +++ b/app/repositories/beatmap_mirror_requests.py @@ -1,4 +1,3 @@ -import math from datetime import datetime from enum import StrEnum @@ -6,10 +5,6 @@ from app import state -# Give new mirrors a fair shot -# to get their foot in the race -MIRROR_INITIAL_WEIGHT = 100 - class MirrorResource(StrEnum): OSZ_FILE = "osz_file" @@ -30,55 +25,6 @@ class BeatmapMirrorRequest(BaseModel): resource: MirrorResource -class BeatmapMirrorScore(BaseModel): - mirror_name: str - score: float - - -async def get_mirror_weight(mirror_name: str, resource: MirrorResource) -> int: - """Give the mirror a weighting based on its latency and failure rate.""" - p75_success_ms_latency = await state.database.fetch_val( - """\ - WITH request_latencies AS ( - SELECT (ended_at - started_at) * 1000 AS ms_elapsed, - PERCENT_RANK() OVER (ORDER BY ended_at - started_at) p - FROM beatmap_mirror_requests - WHERE started_at > NOW() - INTERVAL 4 HOUR - AND mirror_name = :mirror_name - AND resource = :resource - AND success = 1 - ) - SELECT DISTINCT first_value(ms_elapsed) OVER ( - ORDER BY CASE WHEN p <= 0.75 THEN p END DESC - ) p75_success_ms_latency - FROM request_latencies - """, - {"mirror_name": mirror_name, "resource": resource.value}, - ) - if p75_success_ms_latency is None: - return MIRROR_INITIAL_WEIGHT - - failure_rate = await state.database.fetch_val( - """\ - SELECT AVG(success = 0) - FROM beatmap_mirror_requests - WHERE started_at > NOW() - INTERVAL 4 HOUR - AND mirror_name = :mirror_name - AND resource = :resource - """, - {"mirror_name": mirror_name, "resource": resource.value}, - ) - if failure_rate is None: - return MIRROR_INITIAL_WEIGHT - - # https://www.desmos.com/calculator/wxpsjhdby9 - latency_weight = 1000 * math.exp(-1 / 1000 * float(p75_success_ms_latency)) - failure_weight = math.exp(-30 * failure_rate) - # TODO: integrate `mirror_cache_age` into the weight calculation - weight = max(1, int(latency_weight * failure_weight)) - return weight - - async def create( request_url: str, api_key_id: str | None,