Skip to content

Commit 394032c

Browse files
dmitriyzhukclaude
andcommitted
feat(llm): per-token rate-limit pool with smart selection
Each OAuth/API token now tracks its own cooldown state. When Anthropic returns 429, the token is marked rate-limited (Retry-After respected, or exponential backoff capped at 10min) and selectClientIndex() routes the next call to the first available token instead of retrying the same one. Reduces "shared quota" stalls when multiple agents chew through the same credential set: one token's cooldown no longer blocks the rest. Backward compatible — single-token configs see only the success path of markHealthy() with no behavior change. Auth-error fallback path unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e7ed98a commit 394032c

3 files changed

Lines changed: 176 additions & 50 deletions

File tree

IMPROVEMENT_PLAN.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,9 @@ Currently `src/slices/bot/voice/` only does TTS. Add Whisper-based STT for inbou
165165

166166
| # | Item | Status |
167167
|---|---|---|
168-
| 1 | Auxiliary LLM client | in progress |
168+
| 1 | Auxiliary LLM client | shipped — v0.4.0 |
169+
| 1b | Per-token rate-limit pool (Phase A of credential-pool feature) | shipped — v0.5.0 |
170+
| 1c | Usage reporter to ranch (Phase C of credential-pool feature) | planned — v0.6.0 |
169171
| 2 | Cross-session FTS5 search | planned |
170172
| 3 | Provider plugin pattern | planned |
171173
| 4 | Curator | planned |

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@cleanslice/runtime",
3-
"version": "0.4.0",
3+
"version": "0.5.0",
44
"license": "MIT",
55
"type": "module",
66
"bin": {

src/slices/setup/llm/data/repositories/claude/claude.repository.ts

Lines changed: 172 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,47 @@ function isOverloadedError(err: unknown): boolean {
5353
return msg.includes("overloaded_error") || msg.includes("Overloaded")
5454
}
5555

56+
/** Detect HTTP 429 rate-limit. Per-token problem (cooldown), not per-model. */
57+
function isRateLimitError(err: unknown): boolean {
58+
if (!err || typeof err !== "object") return false
59+
const e = err as Record<string, unknown>
60+
if (e.status === 429) return true
61+
const msg = String(e.message ?? e.error ?? "")
62+
return msg.includes("rate_limit") || msg.includes("Rate limit")
63+
}
64+
65+
/**
66+
* Parse Retry-After header from a rate-limit error if present. Returns ms or
67+
* undefined. Anthropic sets either seconds (number) or HTTP-date.
68+
*/
69+
function parseRetryAfter(err: unknown): number | undefined {
70+
if (!err || typeof err !== "object") return undefined
71+
const headers = (err as { headers?: Record<string, string> }).headers
72+
const raw = headers?.["retry-after"] ?? headers?.["Retry-After"]
73+
if (!raw) return undefined
74+
const n = Number(raw)
75+
if (!isNaN(n) && n > 0) return n * 1000
76+
const date = Date.parse(raw)
77+
if (!isNaN(date)) return Math.max(0, date - Date.now())
78+
return undefined
79+
}
80+
81+
/**
82+
* Per-token rate-limit state. We keep one entry per OAuth token; a token in
83+
* cooldown is skipped by selectClientIndex() until its window expires.
84+
*
85+
* Why per-token, not per-credential: a credential is a *config* concept, but
86+
* Anthropic enforces limits per OAuth subject. Two tokens from the same user
87+
* share a quota in practice, but the SDK gives us no way to know that — we
88+
* track each token independently and let the server tell us via 429.
89+
*/
90+
interface TokenState {
91+
/** Epoch ms; 0 means available. */
92+
cooldownUntil: number
93+
/** For exponential cooldown when 429s repeat without recovery. */
94+
consecutive429s: number
95+
}
96+
5697
/**
5798
* Retry a LLM call up to maxAttempts times with exponential backoff.
5899
* Returns true if succeeded (result via out param), false if all attempts failed with overload.
@@ -103,6 +144,7 @@ export class ClaudeRepository implements ILlmGateway {
103144
private clients: Anthropic[] = [] // OAuth pool (index 0 = primary, 1,2... = fallbacks)
104145
private apiKeyClient: Anthropic | undefined // API key client (last resort)
105146
private currentClientIndex = 0
147+
private tokenStates: TokenState[] = [] // per-client rate-limit state, parallel to clients
106148
private model: string
107149
private fallbackModel: string | undefined
108150
private apiKey: string | undefined
@@ -191,10 +233,64 @@ export class ClaudeRepository implements ILlmGateway {
191233
this.apiKeyClient = undefined
192234
}
193235

236+
// Parallel state array. Always rebuild on init; switchToNextClient may
237+
// also extend this when promoting the apiKeyClient into the pool.
238+
this.tokenStates = this.clients.map(() => ({ cooldownUntil: 0, consecutive429s: 0 }))
239+
194240
console.log(`[llm] claude initialized — oauth=${oauthClients.length}, apiKeyFallback=${apiKeyClient ? "yes" : "no"}`)
195241
this.initialized = true
196242
}
197243

244+
/**
245+
* Pick the best client index for the next call. Prefers any client not in
246+
* cooldown; if every client is currently cooled-down, returns the one whose
247+
* cooldown expires soonest so the next retry has the best chance.
248+
*/
249+
private selectClientIndex(): number {
250+
const now = Date.now()
251+
for (let i = 0; i < this.clients.length; i++) {
252+
if (this.tokenStates[i] && this.tokenStates[i].cooldownUntil <= now) return i
253+
}
254+
// All in cooldown — return the one closest to expiry. Caller will likely
255+
// hit 429 again; that's OK, the cooldown re-extends.
256+
let best = 0
257+
let bestUntil = this.tokenStates[0]?.cooldownUntil ?? 0
258+
for (let i = 1; i < this.clients.length; i++) {
259+
const until = this.tokenStates[i]?.cooldownUntil ?? 0
260+
if (until < bestUntil) {
261+
best = i
262+
bestUntil = until
263+
}
264+
}
265+
return best
266+
}
267+
268+
/**
269+
* Mark a client as rate-limited. Uses Anthropic's Retry-After when present;
270+
* otherwise an exponential backoff capped at 10 minutes — Anthropic OAuth
271+
* limits typically reset within minutes, so longer waits are pointless.
272+
*/
273+
private markRateLimited(index: number, retryAfterMs?: number): void {
274+
const state = this.tokenStates[index]
275+
if (!state) return
276+
state.consecutive429s++
277+
const cooldown = retryAfterMs ?? Math.min(60_000 * Math.pow(2, state.consecutive429s - 1), 600_000)
278+
state.cooldownUntil = Date.now() + cooldown
279+
const available = this.tokenStates.filter(s => s.cooldownUntil <= Date.now()).length
280+
console.warn(
281+
`[llm] token[${index}] rate-limited, cooldown ${Math.round(cooldown / 1000)}s ` +
282+
`(consecutive=${state.consecutive429s}, available=${available}/${this.clients.length})`,
283+
)
284+
}
285+
286+
/** Reset 429 counter on success — token is healthy again. */
287+
private markHealthy(index: number): void {
288+
const state = this.tokenStates[index]
289+
if (state && state.consecutive429s > 0) {
290+
state.consecutive429s = 0
291+
}
292+
}
293+
198294
/** Get current active client. Throws if init never populated any clients. */
199295
private getClient(): Anthropic {
200296
const client = this.clients[this.currentClientIndex] ?? this.clients[0]
@@ -221,6 +317,8 @@ export class ClaudeRepository implements ILlmGateway {
221317
void sendAdminAlert(`🚨 All OAuth tokens exhausted!\n\nUsing API key fallback. Update tokens in .env`)
222318
// Replace current with apiKeyClient
223319
this.clients[this.currentClientIndex] = this.apiKeyClient
320+
// Reset state for the slot — apiKeyClient is fresh, no prior 429s.
321+
this.tokenStates[this.currentClientIndex] = { cooldownUntil: 0, consecutive429s: 0 }
224322
this.apiKeyClient = undefined
225323
return true
226324
}
@@ -272,58 +370,74 @@ export class ClaudeRepository implements ILlmGateway {
272370

273371
const result = await withRetry(
274372
async () => {
373+
// Re-select per attempt so 429-marked tokens are avoided on retry.
374+
this.currentClientIndex = this.selectClientIndex()
375+
const attemptIndex = this.currentClientIndex
275376
let accumulated = ""
276377
const toolCalls: Array<{ name: string; params: unknown }> = []
277378
const pendingTools = new Map<number, { id: string; name: string; jsonStr: string }>()
278379
let streamUsage: { input_tokens: number; output_tokens: number } | undefined
279380
let stopReason: string | undefined
280381

281-
const streamResponse = await this.getClient().messages.stream({
282-
model,
283-
max_tokens: this.maxTokens,
284-
system: systemPrompt,
285-
messages,
286-
...(anthropicTools.length > 0 ? { tools: anthropicTools } : {}),
287-
})
288-
289-
for await (const event of streamResponse) {
290-
if (event.type === "content_block_start" && event.content_block.type === "tool_use") {
291-
pendingTools.set(event.index, {
292-
id: event.content_block.id,
293-
name: event.content_block.name,
294-
jsonStr: "",
295-
})
296-
} else if (event.type === "content_block_delta") {
297-
if (event.delta.type === "text_delta") {
298-
accumulated += event.delta.text
299-
// While inside <thinking> block, show indicator instead of raw content
300-
if (accumulated.includes("<thinking>") && !accumulated.includes("</thinking>")) {
301-
onChunk("💭")
302-
} else {
303-
onChunk(stripThinking(accumulated))
382+
let streamResponse
383+
try {
384+
streamResponse = await this.getClient().messages.stream({
385+
model,
386+
max_tokens: this.maxTokens,
387+
system: systemPrompt,
388+
messages,
389+
...(anthropicTools.length > 0 ? { tools: anthropicTools } : {}),
390+
})
391+
} catch (err) {
392+
if (isRateLimitError(err)) this.markRateLimited(attemptIndex, parseRetryAfter(err))
393+
throw err
394+
}
395+
396+
try {
397+
for await (const event of streamResponse) {
398+
if (event.type === "content_block_start" && event.content_block.type === "tool_use") {
399+
pendingTools.set(event.index, {
400+
id: event.content_block.id,
401+
name: event.content_block.name,
402+
jsonStr: "",
403+
})
404+
} else if (event.type === "content_block_delta") {
405+
if (event.delta.type === "text_delta") {
406+
accumulated += event.delta.text
407+
// While inside <thinking> block, show indicator instead of raw content
408+
if (accumulated.includes("<thinking>") && !accumulated.includes("</thinking>")) {
409+
onChunk("💭")
410+
} else {
411+
onChunk(stripThinking(accumulated))
412+
}
413+
} else if (event.delta.type === "input_json_delta") {
414+
const pending = pendingTools.get(event.index)
415+
if (pending) pending.jsonStr += event.delta.partial_json
304416
}
305-
} else if (event.delta.type === "input_json_delta") {
417+
} else if (event.type === "content_block_stop") {
306418
const pending = pendingTools.get(event.index)
307-
if (pending) pending.jsonStr += event.delta.partial_json
308-
}
309-
} else if (event.type === "content_block_stop") {
310-
const pending = pendingTools.get(event.index)
311-
if (pending) {
312-
try {
313-
toolCalls.push({ name: pending.name, params: JSON.parse(pending.jsonStr || "{}") })
314-
} catch {
315-
toolCalls.push({ name: pending.name, params: {} })
419+
if (pending) {
420+
try {
421+
toolCalls.push({ name: pending.name, params: JSON.parse(pending.jsonStr || "{}") })
422+
} catch {
423+
toolCalls.push({ name: pending.name, params: {} })
424+
}
425+
pendingTools.delete(event.index)
316426
}
317-
pendingTools.delete(event.index)
427+
} else if (event.type === "message_delta") {
428+
if ((event as any).usage) streamUsage = (event as any).usage
429+
if ((event as any).delta?.stop_reason) stopReason = (event as any).delta.stop_reason
430+
} else if (event.type === "message_start" && (event as any).message?.usage) {
431+
streamUsage = (event as any).message.usage
318432
}
319-
} else if (event.type === "message_delta") {
320-
if ((event as any).usage) streamUsage = (event as any).usage
321-
if ((event as any).delta?.stop_reason) stopReason = (event as any).delta.stop_reason
322-
} else if (event.type === "message_start" && (event as any).message?.usage) {
323-
streamUsage = (event as any).message.usage
324433
}
434+
} catch (err) {
435+
if (isRateLimitError(err)) this.markRateLimited(attemptIndex, parseRetryAfter(err))
436+
throw err
325437
}
326438

439+
// Success — clear any prior 429 streak on this token.
440+
this.markHealthy(attemptIndex)
327441
return {
328442
text: stripThinking(accumulated),
329443
toolCalls: toolCalls.length > 0 ? toolCalls : undefined,
@@ -332,7 +446,7 @@ export class ClaudeRepository implements ILlmGateway {
332446
inputTokens: streamUsage.input_tokens,
333447
outputTokens: streamUsage.output_tokens,
334448
totalTokens: streamUsage.input_tokens + streamUsage.output_tokens,
335-
credentialId: `oauth-${this.currentClientIndex}`,
449+
credentialId: `oauth-${attemptIndex}`,
336450
} : undefined,
337451
} as ModelResponse
338452
},
@@ -393,13 +507,22 @@ export class ClaudeRepository implements ILlmGateway {
393507

394508
const result = await withRetry(
395509
async () => {
396-
const response = await this.getClient().messages.create({
397-
model,
398-
max_tokens: this.maxTokens,
399-
system: systemPrompt,
400-
messages,
401-
...(anthropicTools.length > 0 ? { tools: anthropicTools } : {}),
402-
})
510+
// Re-select per attempt so 429-marked tokens are avoided on retry.
511+
this.currentClientIndex = this.selectClientIndex()
512+
const attemptIndex = this.currentClientIndex
513+
let response
514+
try {
515+
response = await this.getClient().messages.create({
516+
model,
517+
max_tokens: this.maxTokens,
518+
system: systemPrompt,
519+
messages,
520+
...(anthropicTools.length > 0 ? { tools: anthropicTools } : {}),
521+
})
522+
} catch (err) {
523+
if (isRateLimitError(err)) this.markRateLimited(attemptIndex, parseRetryAfter(err))
524+
throw err
525+
}
403526

404527
const text = response.content
405528
.filter(b => b.type === "text")
@@ -413,6 +536,7 @@ export class ClaudeRepository implements ILlmGateway {
413536
return { name: block.name, params: block.input }
414537
})
415538

539+
this.markHealthy(attemptIndex)
416540
return {
417541
text: stripThinking(text),
418542
toolCalls: toolCalls.length > 0 ? toolCalls : undefined,
@@ -421,7 +545,7 @@ export class ClaudeRepository implements ILlmGateway {
421545
inputTokens: response.usage.input_tokens,
422546
outputTokens: response.usage.output_tokens,
423547
totalTokens: response.usage.input_tokens + response.usage.output_tokens,
424-
credentialId: `oauth-${this.currentClientIndex}`,
548+
credentialId: `oauth-${attemptIndex}`,
425549
} : undefined,
426550
} as ModelResponse
427551
},

0 commit comments

Comments
 (0)