From 573fc25f9680edef01dd1253bc352efdb8454ab8 Mon Sep 17 00:00:00 2001 From: starship-s <45587122+starship-s@users.noreply.github.com> Date: Mon, 11 May 2026 21:46:24 -0600 Subject: [PATCH] fix(providers): load Codex quota from credential pool --- api/providers.py | 185 +++++++++++++++++++++++++++- tests/test_provider_quota_status.py | 151 +++++++++++++++++++++++ 2 files changed, 335 insertions(+), 1 deletion(-) diff --git a/api/providers.py b/api/providers.py index ed44c94c..9734e111 100644 --- a/api/providers.py +++ b/api/providers.py @@ -125,12 +125,19 @@ def _account_usage_preexec_fn() -> None: _ACCOUNT_USAGE_SUBPROCESS_CODE = r""" +import base64 import json import sys +from datetime import datetime, timezone +from types import SimpleNamespace +from urllib import request as urllib_request from agent.account_usage import fetch_account_usage +_CODEX_DEFAULT_BASE_URL = "https://chatgpt.com/backend-api/codex" + + def _iso(value): if value in (None, ""): return None @@ -165,9 +172,185 @@ def _snapshot_payload(snapshot): } +def _snapshot_available(snapshot): + if snapshot is None: + return False + try: + return bool(getattr(snapshot, "available", False)) + except Exception: + return False + + +def _number(value): + if isinstance(value, bool) or value is None: + return None + if isinstance(value, (int, float)): + return value + try: + text = str(value).strip() + if not text: + return None + number = float(text) + return int(number) if number.is_integer() else number + except Exception: + return None + + +def _parse_dt(value): + if value in (None, ""): + return None + if isinstance(value, (int, float)): + try: + return datetime.fromtimestamp(float(value), tz=timezone.utc) + except Exception: + return None + text = str(value).strip() + if not text: + return None + if text.endswith("Z"): + text = text[:-1] + "+00:00" + try: + dt = datetime.fromisoformat(text) + except ValueError: + return None + return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc) + + +def _title_case_slug(value): + cleaned = str(value or "").strip() + if not cleaned: + return None + return cleaned.replace("_", " ").replace("-", " ").title() + + +def _resolve_codex_usage_url(base_url): + normalized = str(base_url or "").strip().rstrip("/") or _CODEX_DEFAULT_BASE_URL + if normalized.endswith("/codex"): + normalized = normalized[: -len("/codex")] + if "/backend-api" in normalized: + return normalized + "/wham/usage" + return normalized + "/api/codex/usage" + + +def _jwt_claims(token): + if not isinstance(token, str) or token.count(".") != 2: + return {} + payload = token.split(".")[1] + payload += "=" * ((4 - len(payload) % 4) % 4) + try: + claims = json.loads(base64.urlsafe_b64decode(payload.encode("utf-8")).decode("utf-8")) + except Exception: + return {} + return claims if isinstance(claims, dict) else {} + + +def _codex_usage_headers(access_token): + headers = { + "Authorization": "Bearer " + access_token, + "Accept": "application/json", + "User-Agent": "codex_cli_rs/0.0.0 (Hermes WebUI)", + "originator": "codex_cli_rs", + } + auth_claim = _jwt_claims(access_token).get("https://api.openai.com/auth") + account_id = None + if isinstance(auth_claim, dict): + account_id = auth_claim.get("chatgpt_account_id") + if isinstance(account_id, str) and account_id.strip(): + headers["ChatGPT-Account-ID"] = account_id.strip() + return headers + + +def _entry_value(entry, *names): + for name in names: + try: + value = getattr(entry, name) + except Exception: + value = None + if value in (None, ""): + continue + text = str(value).strip() + if text: + return text + return None + + +def _codex_snapshot_from_usage_payload(payload): + if not isinstance(payload, dict): + payload = {} + rate_limit = payload.get("rate_limit") + if not isinstance(rate_limit, dict): + rate_limit = {} + windows = [] + for key, label in (("primary_window", "Session"), ("secondary_window", "Weekly")): + window = rate_limit.get(key) + if not isinstance(window, dict): + continue + used = _number(window.get("used_percent")) + if used is None: + continue + windows.append(SimpleNamespace( + label=label, + used_percent=float(used), + reset_at=_parse_dt(window.get("reset_at")), + detail=None, + )) + + details = [] + credits = payload.get("credits") + if isinstance(credits, dict) and credits.get("has_credits"): + balance = _number(credits.get("balance")) + if balance is not None: + details.append("Credits balance: $" + format(float(balance), ".2f")) + elif credits.get("unlimited"): + details.append("Credits balance: unlimited") + + return SimpleNamespace( + provider="openai-codex", + source="usage_api", + title="Account limits", + plan=_title_case_slug(payload.get("plan_type")), + windows=tuple(windows), + details=tuple(details), + available=bool(windows or details), + unavailable_reason=None, + fetched_at=datetime.now(timezone.utc), + ) + + +def _fetch_codex_account_usage_from_pool(): + try: + from agent.credential_pool import load_pool + + pool = load_pool("openai-codex") + entry = pool.select() if pool is not None else None + if entry is None: + return None + access_token = _entry_value(entry, "runtime_api_key", "access_token") + if not access_token: + return None + base_url = _entry_value(entry, "runtime_base_url", "base_url") or _CODEX_DEFAULT_BASE_URL + request = urllib_request.Request( + _resolve_codex_usage_url(base_url), + headers=_codex_usage_headers(access_token), + ) + with urllib_request.urlopen(request, timeout=15.0) as response: + payload = json.loads(response.read().decode("utf-8") or "{}") + return _codex_snapshot_from_usage_payload(payload) + except Exception: + return None + + provider = sys.argv[1] api_key = sys.argv[2] or None -print(json.dumps(_snapshot_payload(fetch_account_usage(provider, api_key=api_key)))) +try: + snapshot = fetch_account_usage(provider, api_key=api_key) +except Exception: + snapshot = None +if str(provider or "").strip().lower() == "openai-codex" and not _snapshot_available(snapshot): + fallback_snapshot = _fetch_codex_account_usage_from_pool() + if _snapshot_available(fallback_snapshot) or snapshot is None: + snapshot = fallback_snapshot +print(json.dumps(_snapshot_payload(snapshot))) """ # SECTION: Provider ↔ env var mapping diff --git a/tests/test_provider_quota_status.py b/tests/test_provider_quota_status.py index fa2d769b..8da72e98 100644 --- a/tests/test_provider_quota_status.py +++ b/tests/test_provider_quota_status.py @@ -2,10 +2,13 @@ from __future__ import annotations +import base64 import json import inspect import os +import sys import threading +import types import urllib.error from datetime import datetime, timezone from io import BytesIO @@ -275,6 +278,154 @@ def test_codex_account_usage_unavailable_is_sanitized(monkeypatch, tmp_path): assert "secret" not in repr(result).lower() +def test_codex_account_usage_subprocess_falls_back_to_credential_pool(monkeypatch, capsys): + """Codex quota probes should use credential_pool credentials when legacy auth misses.""" + import api.providers as providers + + def b64url(payload: bytes) -> str: + return base64.urlsafe_b64encode(payload).rstrip(b"=").decode("ascii") + + token = ".".join(( + b64url(b'{"alg":"none","typ":"JWT"}'), + b64url(json.dumps({ + "https://api.openai.com/auth": { + "chatgpt_account_id": "acct-test-123", + }, + }).encode("utf-8")), + b64url(b"signature"), + )) + + fetch_calls = [] + load_pool_calls = [] + selected = [] + seen = {} + + agent_mod = types.ModuleType("agent") + agent_mod.__path__ = [] + account_usage_mod = types.ModuleType("agent.account_usage") + credential_pool_mod = types.ModuleType("agent.credential_pool") + + def fake_fetch_account_usage(provider, *, base_url=None, api_key=None): + fetch_calls.append((provider, base_url, api_key)) + return None + + class FakePool: + def select(self): + selected.append(True) + return SimpleNamespace( + runtime_api_key=token, + runtime_base_url="https://chatgpt.com/backend-api/codex", + ) + + def fake_load_pool(provider): + load_pool_calls.append(provider) + return FakePool() + + def fake_urlopen(req, timeout): + seen["url"] = req.full_url + seen["timeout"] = timeout + seen["headers"] = {key.lower(): value for key, value in req.header_items()} + payload = { + "plan_type": "pro", + "rate_limit": { + "primary_window": {"used_percent": 15, "reset_at": 1_900_000_000}, + "secondary_window": {"used_percent": 40, "reset_at": "2030-03-24T12:30:00Z"}, + }, + "credits": {"has_credits": True, "balance": 12.5}, + } + return _FakeResponse(json.dumps(payload).encode("utf-8")) + + account_usage_mod.fetch_account_usage = fake_fetch_account_usage + credential_pool_mod.load_pool = fake_load_pool + monkeypatch.setitem(sys.modules, "agent", agent_mod) + monkeypatch.setitem(sys.modules, "agent.account_usage", account_usage_mod) + monkeypatch.setitem(sys.modules, "agent.credential_pool", credential_pool_mod) + monkeypatch.setattr(providers.urllib.request, "urlopen", fake_urlopen) + monkeypatch.setattr(sys, "argv", ["quota-probe", "openai-codex", ""]) + + exec(providers._ACCOUNT_USAGE_SUBPROCESS_CODE, {"__name__": "__main__"}) + + output = capsys.readouterr().out.strip() + snapshot = json.loads(output) + + assert fetch_calls == [("openai-codex", None, None)] + assert load_pool_calls == ["openai-codex"] + assert selected == [True] + assert seen["url"] == "https://chatgpt.com/backend-api/wham/usage" + assert seen["timeout"] == 15.0 + headers = seen["headers"] + assert headers["authorization"] == f"Bearer {token}" + assert headers["accept"] == "application/json" + assert headers["originator"] == "codex_cli_rs" + assert headers["user-agent"].startswith("codex_cli_rs/") + assert headers["chatgpt-account-id"] == "acct-test-123" + assert snapshot["provider"] == "openai-codex" + assert snapshot["source"] == "usage_api" + assert snapshot["plan"] == "Pro" + assert snapshot["windows"][0]["label"] == "Session" + assert snapshot["windows"][0]["used_percent"] == 15.0 + assert snapshot["windows"][1]["label"] == "Weekly" + assert snapshot["details"] == ["Credits balance: $12.50"] + assert snapshot["available"] is True + assert token not in output + + +def test_codex_account_usage_subprocess_keeps_legacy_reason_when_pool_misses(monkeypatch, capsys): + """A failed pool fallback should not discard the legacy unavailable reason.""" + import api.providers as providers + + fetch_calls = [] + load_pool_calls = [] + + agent_mod = types.ModuleType("agent") + agent_mod.__path__ = [] + account_usage_mod = types.ModuleType("agent.account_usage") + credential_pool_mod = types.ModuleType("agent.credential_pool") + + def fake_fetch_account_usage(provider, *, base_url=None, api_key=None): + fetch_calls.append((provider, base_url, api_key)) + return SimpleNamespace( + provider="openai-codex", + source="usage_api", + title="Account limits", + plan=None, + windows=(), + details=(), + available=False, + unavailable_reason="Codex account limits are not available for this credential.", + fetched_at=datetime(2030, 3, 17, 12, 30, tzinfo=timezone.utc), + ) + + class EmptyPool: + def select(self): + return None + + def fake_load_pool(provider): + load_pool_calls.append(provider) + return EmptyPool() + + def explode_urlopen(*_args, **_kwargs): + raise AssertionError("no network call should happen when the pool has no selected entry") + + account_usage_mod.fetch_account_usage = fake_fetch_account_usage + credential_pool_mod.load_pool = fake_load_pool + monkeypatch.setitem(sys.modules, "agent", agent_mod) + monkeypatch.setitem(sys.modules, "agent.account_usage", account_usage_mod) + monkeypatch.setitem(sys.modules, "agent.credential_pool", credential_pool_mod) + monkeypatch.setattr(providers.urllib.request, "urlopen", explode_urlopen) + monkeypatch.setattr(sys, "argv", ["quota-probe", "openai-codex", ""]) + + exec(providers._ACCOUNT_USAGE_SUBPROCESS_CODE, {"__name__": "__main__"}) + + snapshot = json.loads(capsys.readouterr().out.strip()) + + assert fetch_calls == [("openai-codex", None, None)] + assert load_pool_calls == ["openai-codex"] + assert snapshot["available"] is False + assert snapshot["unavailable_reason"] == "Codex account limits are not available for this credential." + assert snapshot["fetched_at"] == "2030-03-17T12:30:00Z" + + def test_anthropic_oauth_usage_unavailable_reason_is_reported(monkeypatch, tmp_path): """Hermes Agent can report why account limits are not available.""" monkeypatch.setattr(profiles, "get_active_hermes_home", lambda: tmp_path)