Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions app/adapters/osu_mirrors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ def get_available_mirrors(
- Circuit breaker state (not open)
- Rate limiter (has capacity)

Returns mirrors sorted by latency EMA (fastest first).
Returns mirrors sorted by composite score (latency weighted by reliability).
"""
available = [
mirror
for mirror in BEATMAP_MIRRORS
if resource in mirror.supported_resources and mirror.health.is_available()
]
# Sort by latency (fastest first)
available.sort(key=lambda m: m.health.latency_ema)
# Sort by composite score (latency weighted by reliability, lower is better)
available.sort(key=lambda m: m.health.score())
return available


Expand Down Expand Up @@ -242,9 +242,11 @@ async def fetch_with_fallback(
)
return response.data

# Hedged request failed, try remaining mirrors sequentially
# Hedged request failed, try remaining mirrors sequentially.
# Use wait_for_availability() so we briefly wait for rate limiter
# tokens instead of skipping healthy-but-throttled mirrors.
for mirror in available[HEDGE_COUNT:]:
if not mirror.health.is_available():
if not await mirror.health.wait_for_availability():
continue

started_at = time.time()
Expand Down
142 changes: 142 additions & 0 deletions app/adapters/osu_mirrors/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import asyncio
import logging
from abc import ABC
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
from typing import ClassVar
Expand All @@ -14,6 +17,14 @@
from app.repositories.beatmap_mirror_requests import MirrorResource

T = TypeVar("T", covariant=True)
_R = TypeVar("_R")

RETRYABLE_STATUSES = frozenset({429, 500, 502, 503})
NOT_FOUND_STATUSES = frozenset({404, 451})

MAX_RETRIES = 1
RETRY_BACKOFF_SECONDS = 1.0
MAX_RESPONSE_BYTES = 100 * 1024 * 1024 # 100 MB


@dataclass
Expand Down Expand Up @@ -47,6 +58,137 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.health = MirrorHealth(rate_limiter=rate_limiter)
super().__init__(*args, **kwargs)

def _extra_headers(self) -> dict[str, str]:
    """Return the headers this mirror adds to every request.

    The base implementation contributes nothing; subclasses override
    this hook to supply mirror-specific headers.
    """
    no_headers: dict[str, str] = {}
    return no_headers

async def _fetch(
    self,
    url: str,
    parse: Callable[[httpx.Response], _R],
    *,
    extra_headers: dict[str, str] | None = None,
    max_response_bytes: int | None = None,
) -> BeatmapMirrorResponse[_R | None]:
    """Make an HTTP GET request with retry logic and error handling.

    Every failure is reported through the returned
    ``BeatmapMirrorResponse`` (``is_success=False``) rather than raised.

    Args:
        url: Absolute URL to request.
        parse: Callable that turns the HTTP response into the payload
            stored in ``data``. Any exception it raises is logged and
            reported as a failed response.
        extra_headers: Per-call headers. They are merged over
            ``self._extra_headers()`` and win on key collisions.
        max_response_bytes: When set, a response whose ``Content-Length``
            header declares more than this many bytes is rejected. A
            missing or non-numeric header is treated as "size unknown"
            and the response is accepted.

    Returns:
        * ``is_success=True, data=None`` for statuses in
          ``NOT_FOUND_STATUSES`` (the mirror answered; it just does not
          have the resource).
        * ``is_success=True`` with the parsed payload on success.
        * ``is_success=False`` for transport errors or statuses in
          ``RETRYABLE_STATUSES`` once retries are exhausted, for any
          other error status, for an oversized response, or for a
          ``parse`` failure.

    Retries happen up to ``MAX_RETRIES`` times, sleeping
    ``RETRY_BACKOFF_SECONDS * (attempt + 1)`` between attempts, i.e. a
    linearly growing delay. After each response the rate limit headers
    are handed to ``_check_rate_limit_headers``.
    """
    merged_headers = {**self._extra_headers(), **(extra_headers or {})}
    response: httpx.Response | None = None

    for attempt in range(MAX_RETRIES + 1):
        try:
            response = await self.http_client.get(
                url,
                # Pass None rather than an empty mapping when there is
                # nothing to add.
                headers=merged_headers or None,
            )
        except Exception as exc:
            # Deliberately broad: this is the boundary where any
            # request failure becomes a failed response object.
            if attempt < MAX_RETRIES:
                await asyncio.sleep(RETRY_BACKOFF_SECONDS * (attempt + 1))
                continue
            return BeatmapMirrorResponse(
                data=None,
                is_success=False,
                request_url=None,
                status_code=None,
                error_message=str(exc),
            )

        self._check_rate_limit_headers(response)

        if response.status_code in NOT_FOUND_STATUSES:
            # A definitive "not here" answer counts as a successful
            # request that yielded no data.
            return BeatmapMirrorResponse(
                data=None,
                is_success=True,
                request_url=str(response.request.url),
                status_code=response.status_code,
            )

        if response.status_code in RETRYABLE_STATUSES:
            if attempt < MAX_RETRIES:
                await asyncio.sleep(RETRY_BACKOFF_SECONDS * (attempt + 1))
                continue
            return BeatmapMirrorResponse(
                data=None,
                is_success=False,
                request_url=str(response.request.url),
                status_code=response.status_code,
                error_message=f"HTTP {response.status_code} after retries",
            )

        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            return BeatmapMirrorResponse(
                data=None,
                is_success=False,
                request_url=str(response.request.url),
                status_code=response.status_code,
                error_message=str(exc),
            )

        if max_response_bytes is not None:
            content_length = response.headers.get("content-length")
            try:
                declared_size = (
                    int(content_length) if content_length is not None else None
                )
            except ValueError:
                # A malformed header must not crash the fetch; fall back
                # to the same path as a missing header.
                declared_size = None
            if declared_size is not None and declared_size > max_response_bytes:
                return BeatmapMirrorResponse(
                    data=None,
                    is_success=False,
                    request_url=str(response.request.url),
                    status_code=response.status_code,
                    error_message=f"Response too large: {content_length} bytes",
                )

        try:
            data = parse(response)
        except Exception as exc:
            logging.warning(
                "Failed to parse mirror response",
                extra={
                    "mirror_name": self.name,
                    "url": url,
                    "error": str(exc),
                },
            )
            return BeatmapMirrorResponse(
                data=None,
                is_success=False,
                request_url=str(response.request.url),
                status_code=response.status_code,
                error_message=str(exc),
            )

        return BeatmapMirrorResponse(
            data=data,
            is_success=True,
            request_url=str(response.request.url),
            status_code=response.status_code,
        )

    # Only reachable if MAX_RETRIES is negative (the loop body never
    # runs); otherwise the final attempt always returns above. Kept so
    # every path yields a response and the type checker is satisfied.
    return BeatmapMirrorResponse(
        data=None,
        is_success=False,
        request_url=None,
        status_code=None,
        error_message="All retries exhausted",
    )

def _check_rate_limit_headers(self, response: httpx.Response) -> None:
    """Forward the upstream's remaining-quota header to the health tracker.

    Reads ``x-ratelimit-remaining`` from the response. When the header
    is present, its integer value is passed to
    ``self.health.apply_rate_limit_pressure``. A missing header is a
    no-op, and a ``ValueError`` (e.g. a non-numeric value) is ignored.
    """
    header_value = response.headers.get("x-ratelimit-remaining")
    if header_value is None:
        return

    try:
        self.health.apply_rate_limit_pressure(int(header_value))
    except ValueError:
        # Best effort only: an unusable header value must not fail the
        # request that carried it.
        return

async def fetch_one_cheesegull_beatmap(
self,
beatmap_id: int,
Expand Down
76 changes: 14 additions & 62 deletions app/adapters/osu_mirrors/backends/mino.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import logging

import httpx
from typing_extensions import override

from app import settings
from app.adapters.osu_mirrors.backends import MAX_RESPONSE_BYTES
from app.adapters.osu_mirrors.backends import AbstractBeatmapMirror
from app.adapters.osu_mirrors.backends import BeatmapMirrorResponse
from app.repositories.beatmap_mirror_requests import MirrorResource
Expand All @@ -14,76 +12,30 @@ class MinoMirror(AbstractBeatmapMirror):
base_url = "https://catboy.best"
supported_resources = {MirrorResource.OSZ_FILE, MirrorResource.BACKGROUND_IMAGE}

@override
def _extra_headers(self) -> dict[str, str]:
    """Return the ``x-ratelimit-key`` header built from the configured key."""
    ratelimit_key = settings.MINO_INCREASED_RATELIMIT_KEY
    return {"x-ratelimit-key": ratelimit_key}

@override
async def fetch_beatmap_zip_data(
    self,
    beatmapset_id: int,
) -> BeatmapMirrorResponse[bytes | None]:
    """Fetch the raw bytes served at ``{base_url}/d/{beatmapset_id}``.

    Delegates to the shared ``_fetch`` helper instead of repeating the
    request and error handling inline. The response body is read as
    bytes, and ``MAX_RESPONSE_BYTES`` is passed as the size limit.

    Args:
        beatmapset_id: Identifier interpolated into the download URL.

    Returns:
        The ``BeatmapMirrorResponse`` produced by ``_fetch``.
    """
    return await self._fetch(
        f"{self.base_url}/d/{beatmapset_id}",
        lambda r: r.read(),
        max_response_bytes=MAX_RESPONSE_BYTES,
    )

@override
async def fetch_beatmap_background_image(
    self,
    beatmap_id: int,
) -> BeatmapMirrorResponse[bytes | None]:
    """Fetch the raw bytes at ``{base_url}/preview/background/{beatmap_id}``.

    Delegates to the shared ``_fetch`` helper instead of repeating the
    request and error handling inline. The response body is read as
    bytes. No size limit is passed for this resource.

    Args:
        beatmap_id: Identifier interpolated into the preview URL.

    Returns:
        The ``BeatmapMirrorResponse`` produced by ``_fetch``.
    """
    return await self._fetch(
        f"{self.base_url}/preview/background/{beatmap_id}",
        lambda r: r.read(),
    )


class MinoCentralMirror(MinoMirror):
Expand Down
36 changes: 6 additions & 30 deletions app/adapters/osu_mirrors/backends/nerinyan.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import logging

import httpx
from typing_extensions import override

from app.adapters.osu_mirrors.backends import MAX_RESPONSE_BYTES
from app.adapters.osu_mirrors.backends import AbstractBeatmapMirror
from app.adapters.osu_mirrors.backends import BeatmapMirrorResponse
from app.repositories.beatmap_mirror_requests import MirrorResource
Expand All @@ -18,30 +16,8 @@ async def fetch_beatmap_zip_data(
self,
beatmapset_id: int,
) -> BeatmapMirrorResponse[bytes | None]:
response: httpx.Response | None = None
try:
response = await self.http_client.get(
f"{self.base_url}/d/{beatmapset_id}",
)
if response.status_code in (404, 451):
return BeatmapMirrorResponse(
data=None,
is_success=True,
request_url=str(response.request.url),
status_code=response.status_code,
)
response.raise_for_status()
return BeatmapMirrorResponse(
data=response.read(),
is_success=True,
request_url=str(response.request.url),
status_code=response.status_code,
)
except Exception as exc:
return BeatmapMirrorResponse(
data=None,
is_success=False,
request_url=str(response.request.url) if response else None,
status_code=response.status_code if response else None,
error_message=str(exc),
)
return await self._fetch(
f"{self.base_url}/d/{beatmapset_id}",
lambda r: r.read(),
max_response_bytes=MAX_RESPONSE_BYTES,
)
Loading