fix(providers): load Codex quota from credential pool

This commit is contained in:
starship-s
2026-05-11 21:46:24 -06:00
parent 306dd2bf09
commit 573fc25f96
2 changed files with 335 additions and 1 deletions
+184 -1
View File
@@ -125,12 +125,19 @@ def _account_usage_preexec_fn() -> None:
_ACCOUNT_USAGE_SUBPROCESS_CODE = r"""
import base64
import json
import sys
from datetime import datetime, timezone
from types import SimpleNamespace
from urllib import request as urllib_request
from agent.account_usage import fetch_account_usage
_CODEX_DEFAULT_BASE_URL = "https://chatgpt.com/backend-api/codex"
def _iso(value):
if value in (None, ""):
return None
@@ -165,9 +172,185 @@ def _snapshot_payload(snapshot):
}
def _snapshot_available(snapshot):
if snapshot is None:
return False
try:
return bool(getattr(snapshot, "available", False))
except Exception:
return False
def _number(value):
if isinstance(value, bool) or value is None:
return None
if isinstance(value, (int, float)):
return value
try:
text = str(value).strip()
if not text:
return None
number = float(text)
return int(number) if number.is_integer() else number
except Exception:
return None
def _parse_dt(value):
if value in (None, ""):
return None
if isinstance(value, (int, float)):
try:
return datetime.fromtimestamp(float(value), tz=timezone.utc)
except Exception:
return None
text = str(value).strip()
if not text:
return None
if text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
dt = datetime.fromisoformat(text)
except ValueError:
return None
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
def _title_case_slug(value):
cleaned = str(value or "").strip()
if not cleaned:
return None
return cleaned.replace("_", " ").replace("-", " ").title()
def _resolve_codex_usage_url(base_url):
normalized = str(base_url or "").strip().rstrip("/") or _CODEX_DEFAULT_BASE_URL
if normalized.endswith("/codex"):
normalized = normalized[: -len("/codex")]
if "/backend-api" in normalized:
return normalized + "/wham/usage"
return normalized + "/api/codex/usage"
def _jwt_claims(token):
if not isinstance(token, str) or token.count(".") != 2:
return {}
payload = token.split(".")[1]
payload += "=" * ((4 - len(payload) % 4) % 4)
try:
claims = json.loads(base64.urlsafe_b64decode(payload.encode("utf-8")).decode("utf-8"))
except Exception:
return {}
return claims if isinstance(claims, dict) else {}
def _codex_usage_headers(access_token):
headers = {
"Authorization": "Bearer " + access_token,
"Accept": "application/json",
"User-Agent": "codex_cli_rs/0.0.0 (Hermes WebUI)",
"originator": "codex_cli_rs",
}
auth_claim = _jwt_claims(access_token).get("https://api.openai.com/auth")
account_id = None
if isinstance(auth_claim, dict):
account_id = auth_claim.get("chatgpt_account_id")
if isinstance(account_id, str) and account_id.strip():
headers["ChatGPT-Account-ID"] = account_id.strip()
return headers
def _entry_value(entry, *names):
for name in names:
try:
value = getattr(entry, name)
except Exception:
value = None
if value in (None, ""):
continue
text = str(value).strip()
if text:
return text
return None
def _codex_snapshot_from_usage_payload(payload):
if not isinstance(payload, dict):
payload = {}
rate_limit = payload.get("rate_limit")
if not isinstance(rate_limit, dict):
rate_limit = {}
windows = []
for key, label in (("primary_window", "Session"), ("secondary_window", "Weekly")):
window = rate_limit.get(key)
if not isinstance(window, dict):
continue
used = _number(window.get("used_percent"))
if used is None:
continue
windows.append(SimpleNamespace(
label=label,
used_percent=float(used),
reset_at=_parse_dt(window.get("reset_at")),
detail=None,
))
details = []
credits = payload.get("credits")
if isinstance(credits, dict) and credits.get("has_credits"):
balance = _number(credits.get("balance"))
if balance is not None:
details.append("Credits balance: $" + format(float(balance), ".2f"))
elif credits.get("unlimited"):
details.append("Credits balance: unlimited")
return SimpleNamespace(
provider="openai-codex",
source="usage_api",
title="Account limits",
plan=_title_case_slug(payload.get("plan_type")),
windows=tuple(windows),
details=tuple(details),
available=bool(windows or details),
unavailable_reason=None,
fetched_at=datetime.now(timezone.utc),
)
def _fetch_codex_account_usage_from_pool():
try:
from agent.credential_pool import load_pool
pool = load_pool("openai-codex")
entry = pool.select() if pool is not None else None
if entry is None:
return None
access_token = _entry_value(entry, "runtime_api_key", "access_token")
if not access_token:
return None
base_url = _entry_value(entry, "runtime_base_url", "base_url") or _CODEX_DEFAULT_BASE_URL
request = urllib_request.Request(
_resolve_codex_usage_url(base_url),
headers=_codex_usage_headers(access_token),
)
with urllib_request.urlopen(request, timeout=15.0) as response:
payload = json.loads(response.read().decode("utf-8") or "{}")
return _codex_snapshot_from_usage_payload(payload)
except Exception:
return None
provider = sys.argv[1]
api_key = sys.argv[2] or None
print(json.dumps(_snapshot_payload(fetch_account_usage(provider, api_key=api_key))))
try:
snapshot = fetch_account_usage(provider, api_key=api_key)
except Exception:
snapshot = None
if str(provider or "").strip().lower() == "openai-codex" and not _snapshot_available(snapshot):
fallback_snapshot = _fetch_codex_account_usage_from_pool()
if _snapshot_available(fallback_snapshot) or snapshot is None:
snapshot = fallback_snapshot
print(json.dumps(_snapshot_payload(snapshot)))
"""
# SECTION: Provider ↔ env var mapping
+151
View File
@@ -2,10 +2,13 @@
from __future__ import annotations
import base64
import json
import inspect
import os
import sys
import threading
import types
import urllib.error
from datetime import datetime, timezone
from io import BytesIO
@@ -275,6 +278,154 @@ def test_codex_account_usage_unavailable_is_sanitized(monkeypatch, tmp_path):
assert "secret" not in repr(result).lower()
def test_codex_account_usage_subprocess_falls_back_to_credential_pool(monkeypatch, capsys):
"""Codex quota probes should use credential_pool credentials when legacy auth misses."""
import api.providers as providers
def b64url(payload: bytes) -> str:
return base64.urlsafe_b64encode(payload).rstrip(b"=").decode("ascii")
token = ".".join((
b64url(b'{"alg":"none","typ":"JWT"}'),
b64url(json.dumps({
"https://api.openai.com/auth": {
"chatgpt_account_id": "acct-test-123",
},
}).encode("utf-8")),
b64url(b"signature"),
))
fetch_calls = []
load_pool_calls = []
selected = []
seen = {}
agent_mod = types.ModuleType("agent")
agent_mod.__path__ = []
account_usage_mod = types.ModuleType("agent.account_usage")
credential_pool_mod = types.ModuleType("agent.credential_pool")
def fake_fetch_account_usage(provider, *, base_url=None, api_key=None):
fetch_calls.append((provider, base_url, api_key))
return None
class FakePool:
def select(self):
selected.append(True)
return SimpleNamespace(
runtime_api_key=token,
runtime_base_url="https://chatgpt.com/backend-api/codex",
)
def fake_load_pool(provider):
load_pool_calls.append(provider)
return FakePool()
def fake_urlopen(req, timeout):
seen["url"] = req.full_url
seen["timeout"] = timeout
seen["headers"] = {key.lower(): value for key, value in req.header_items()}
payload = {
"plan_type": "pro",
"rate_limit": {
"primary_window": {"used_percent": 15, "reset_at": 1_900_000_000},
"secondary_window": {"used_percent": 40, "reset_at": "2030-03-24T12:30:00Z"},
},
"credits": {"has_credits": True, "balance": 12.5},
}
return _FakeResponse(json.dumps(payload).encode("utf-8"))
account_usage_mod.fetch_account_usage = fake_fetch_account_usage
credential_pool_mod.load_pool = fake_load_pool
monkeypatch.setitem(sys.modules, "agent", agent_mod)
monkeypatch.setitem(sys.modules, "agent.account_usage", account_usage_mod)
monkeypatch.setitem(sys.modules, "agent.credential_pool", credential_pool_mod)
monkeypatch.setattr(providers.urllib.request, "urlopen", fake_urlopen)
monkeypatch.setattr(sys, "argv", ["quota-probe", "openai-codex", ""])
exec(providers._ACCOUNT_USAGE_SUBPROCESS_CODE, {"__name__": "__main__"})
output = capsys.readouterr().out.strip()
snapshot = json.loads(output)
assert fetch_calls == [("openai-codex", None, None)]
assert load_pool_calls == ["openai-codex"]
assert selected == [True]
assert seen["url"] == "https://chatgpt.com/backend-api/wham/usage"
assert seen["timeout"] == 15.0
headers = seen["headers"]
assert headers["authorization"] == f"Bearer {token}"
assert headers["accept"] == "application/json"
assert headers["originator"] == "codex_cli_rs"
assert headers["user-agent"].startswith("codex_cli_rs/")
assert headers["chatgpt-account-id"] == "acct-test-123"
assert snapshot["provider"] == "openai-codex"
assert snapshot["source"] == "usage_api"
assert snapshot["plan"] == "Pro"
assert snapshot["windows"][0]["label"] == "Session"
assert snapshot["windows"][0]["used_percent"] == 15.0
assert snapshot["windows"][1]["label"] == "Weekly"
assert snapshot["details"] == ["Credits balance: $12.50"]
assert snapshot["available"] is True
assert token not in output
def test_codex_account_usage_subprocess_keeps_legacy_reason_when_pool_misses(monkeypatch, capsys):
"""A failed pool fallback should not discard the legacy unavailable reason."""
import api.providers as providers
fetch_calls = []
load_pool_calls = []
agent_mod = types.ModuleType("agent")
agent_mod.__path__ = []
account_usage_mod = types.ModuleType("agent.account_usage")
credential_pool_mod = types.ModuleType("agent.credential_pool")
def fake_fetch_account_usage(provider, *, base_url=None, api_key=None):
fetch_calls.append((provider, base_url, api_key))
return SimpleNamespace(
provider="openai-codex",
source="usage_api",
title="Account limits",
plan=None,
windows=(),
details=(),
available=False,
unavailable_reason="Codex account limits are not available for this credential.",
fetched_at=datetime(2030, 3, 17, 12, 30, tzinfo=timezone.utc),
)
class EmptyPool:
def select(self):
return None
def fake_load_pool(provider):
load_pool_calls.append(provider)
return EmptyPool()
def explode_urlopen(*_args, **_kwargs):
raise AssertionError("no network call should happen when the pool has no selected entry")
account_usage_mod.fetch_account_usage = fake_fetch_account_usage
credential_pool_mod.load_pool = fake_load_pool
monkeypatch.setitem(sys.modules, "agent", agent_mod)
monkeypatch.setitem(sys.modules, "agent.account_usage", account_usage_mod)
monkeypatch.setitem(sys.modules, "agent.credential_pool", credential_pool_mod)
monkeypatch.setattr(providers.urllib.request, "urlopen", explode_urlopen)
monkeypatch.setattr(sys, "argv", ["quota-probe", "openai-codex", ""])
exec(providers._ACCOUNT_USAGE_SUBPROCESS_CODE, {"__name__": "__main__"})
snapshot = json.loads(capsys.readouterr().out.strip())
assert fetch_calls == [("openai-codex", None, None)]
assert load_pool_calls == ["openai-codex"]
assert snapshot["available"] is False
assert snapshot["unavailable_reason"] == "Codex account limits are not available for this credential."
assert snapshot["fetched_at"] == "2030-03-17T12:30:00Z"
def test_anthropic_oauth_usage_unavailable_reason_is_reported(monkeypatch, tmp_path):
"""Hermes Agent can report why account limits are not available."""
monkeypatch.setattr(profiles, "get_active_hermes_home", lambda: tmp_path)