feat(api): OpenAI-compatible /v1 endpoint surface (chat, embeddings, audio, realtime)#854
feat(api): OpenAI-compatible /v1 endpoint surface (chat, embeddings, audio, realtime)#854HexaField wants to merge 2 commits into
Conversation
…audio, realtime)
Adds an industry-standard, OpenAI-compatible HTTP/WS API mounted at
\`/v1\` alongside the existing AD4M WS-RPC transport. Any tool that
speaks OpenAI — the official \`openai\` SDKs, LangChain, LlamaIndex,
the Vercel AI SDK, Cursor/Continue, Open WebUI, \`llm\` — can point
\`base_url\` at the executor and just work.
All five proposal phases land together.
Surface
=======
GET /v1/models list registered models
POST /v1/chat/completions stateless chat, oneshot + SSE streaming
POST /v1/completions legacy text-completion shim
POST /v1/embeddings raw f32 arrays (no zlib+b64)
POST /v1/audio/transcriptions multipart batch whisper
POST /v1/audio/speech TTS, remote OpenAI-compatible passthrough
GET /v1/realtime WS, OpenAI Realtime-style streaming STT
Also mounted at \`/api/v1/openai/v1\` for proxies that hard-code the
existing AD4M prefix.
What changed
============
New module \`api/openai_compat/\` (11 files, ~1200 LOC):
mod.rs module wiring
router.rs axum router mounted at /v1 and /api/v1/openai/v1
types.rs serde structs mirroring OpenAI's wire schema
errors.rs { error: { message, type, param, code } } envelope
model_selector.rs resolve model strings → AD4M model_id (id → name → "default")
models.rs GET /v1/models
chat.rs POST /v1/chat/completions (oneshot + SSE) + completions
embeddings.rs POST /v1/embeddings — raw f32 arrays
audio.rs POST /v1/audio/transcriptions + speech
tts_passthrough.rs OpenAI-compatible upstream passthrough for /v1/audio/speech
realtime.rs GET /v1/realtime WebSocket (subset of OpenAI Realtime STT)
AIService surface additions
============================
AIService::prompt_messages(model_id, messages: Vec<(role, content)>)
Stateless ephemeral task: build → spawn → prompt → remove on the
existing per-model LLM thread. No DB writes.
AIService::prompt_messages_stream(model_id, messages)
Same shape but the LLM thread forwards each Kalosm token through
an mpsc::UnboundedReceiver<String>. Backs stream: true. Local
Kalosm uses ChatResponseBuilder's Stream<Item=String> impl; remote
upstreams degrade to one terminal chunk (native streaming upstream
client is a follow-up).
AIService::transcribe_buffer(model_id, samples, auth_token)
One-shot transcription: open → feed → drain broadcast → close.
Backs the batch /v1/audio/transcriptions endpoint.
LLMTaskRequest::PromptStream new variant in the per-model thread's
command channel; carries token_sender + done_sender.
PromptResult now derives Debug + Clone (needed for oneshot transit).
Auth + billing
===============
Reuses the existing JWT extractor (AuthContext from auth.rs) and
capability framework verbatim. Authorization: Bearer <jwt> (or
?token= for the realtime WS, same as events_ws). Capabilities map:
chat/completions/embeddings → AI_PROMPT
transcriptions + realtime → AI_TRANSCRIBE
speech → AI_PROMPT
models → AI_READ
bill_compute deducts per-user credits with the existing operation
labels (ai_prompt, ai_embedding, ai_transcription, ai_tts).
InsufficientCredits → HTTP 429 with type: "insufficient_quota"
matching OpenAI's quota-exhaustion shape so SDKs retrying on 429 do
the right thing.
Error envelope
===============
Every error response carries:
{ "error": { "message", "type", "param", "code" } }
With correct HTTP status (400 / 401 / 403 / 404 / 429 / 500 / 501).
Audio decode (Phase 3 scope note)
==================================
Batch transcription accepts 16 kHz mono PCM WAV out of the box (the
simplest container that requires no new deps and matches Whisper's
reference format). Other containers (mp3, m4a, ogg, flac) return a
400 pointing at the Content-Type so clients can transcode locally;
full container support via \`symphonia\` + \`rubato\` is a self-
contained follow-up that wasn't worth pulling into the same diff.
TTS backend (Phase 4 scope note)
=================================
/v1/audio/speech forwards to a configured OpenAI-compatible upstream
via the existing reqwest dep (no new deps). The proposal allows
passthrough as a valid Phase-4 deliverable alongside local
Kokoro/Piper backends; the local engines pull in ort + voice model
assets that deserve their own review and didn't make sense to ship
in the same diff. No upstream configured → 501 with a clear message.
Dependency surface
===================
Only change: axum gains the \`multipart\` feature (already had \`ws\`).
No new top-level deps.
Native surface unchanged
=========================
The WS-RPC ai.* methods, core/AIClient, Flux, and the launcher are
untouched. Existing clients see no behaviour change.
Refs: \`PROPOSAL_AI_OPENAI_COMPATIBLE_ENDPOINT.md\`
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 7
🧹 Nitpick comments (2)
rust-executor/src/api/openai_compat/realtime.rs (1)
171-186: ⚖️ Poor tradeoffSynchronous drain may miss transcription deltas between appends.
The
try_recv()loop (line 178) only drains events available at the moment of the append. If Whisper produces transcription output after the drain completes but before the nextappendarrives, those deltas won't be delivered until the subsequent append—or not at all if the client stops sending audio.Consider spawning a background forwarder task that continuously drains the broadcast receiver, or documenting this as a known limitation of the simplified protocol.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rust-executor/src/api/openai_compat/realtime.rs` around lines 171 - 186, The synchronous drain using try_recv() in the while loop only retrieves transcription deltas available at the moment of append, which can miss deltas produced by Whisper after the drain completes but before the next append arrives. To fix this, either spawn a background forwarder task that continuously drains the broadcast receiver (rx) in a separate async task instead of draining synchronously on each append, or add clear documentation explaining this behavior as a known limitation where transcription deltas may be delayed or missed if audio appends stop after Whisper produces output.rust-executor/src/api/openai_compat/chat.rs (1)
247-262: ⚡ Quick win
done_rxresult is ignored; consider using it for accurate billing.The
done_rxreceiver carriesPromptResultwith actualprompt_tokensandcompletion_tokens, butlet _ = done_rx.awaitdiscards them. The flat 1.0 billing charge (line 253-257) could be replaced with token-based billing using these counts for parity with the non-streaming path.Suggested improvement
- if let Some(email) = user_email(&auth_clone) { - let _ = bill_compute( - &email, - 1.0, - "ai_prompt", - Some("v1/chat/completions[stream]"), - ); - } - let _ = done_rx.await; + // Await the final result to get token counts for billing + if let Ok(Ok(result)) = done_rx.await { + if let Some(email) = user_email(&auth_clone) { + let total = (result.prompt_tokens + result.completion_tokens) as f64; + let _ = bill_compute( + &email, + total, // or apply a per-token rate + "ai_prompt", + Some("v1/chat/completions[stream]"), + ); + } + }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rust-executor/src/api/openai_compat/chat.rs` around lines 247 - 262, The code currently discards the result from done_rx.await by using let _ =, but this receiver carries PromptResult containing actual prompt_tokens and completion_tokens that should be used for accurate billing. Instead of the flat 1.0 charge being applied to the bill_compute call, capture the PromptResult from done_rx.await, extract the prompt_tokens and completion_tokens values from it, and use those actual token counts when calling bill_compute to achieve token-based billing parity with the non-streaming chat.completions path.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rust-executor/src/ai_service/mod.rs`:
- Around line 1164-1171: The issue is in the "user" match arm where consecutive
user messages cause content loss. When a second user message arrives
(current_user is Some), the first user message is temporarily saved to
final_prompt, but then overwritten by setting current_user to the new message,
resulting in the first message being discarded. Instead of replacing prev_user
with current_user, concatenate them together or append the new content to the
existing content so both consecutive user messages are preserved. Modify the
logic where prev_user is taken and content is assigned to current_user to
combine the messages rather than discard the earlier one.
In `@rust-executor/src/api/openai_compat/audio.rs`:
- Around line 229-267: The WAV chunk parsing loop validates that the header can
be read with `pos + 8 <= bytes.len()` but fails to validate that the entire
chunk data fits within the buffer before advancing `pos` by the chunk size.
After reading the chunk size value with `u32::from_le_bytes`, add a bounds check
to ensure `pos + size <= bytes.len()` before processing any chunk data. This
check should occur immediately after parsing the size and before the match
statement, so that any chunk with an inflated size is rejected early rather than
allowing `pos` to advance beyond the buffer bounds, which could cause panics or
undefined behavior on subsequent iterations.
In `@rust-executor/src/api/openai_compat/chat.rs`:
- Around line 80-88: The billing check in the code block (lines 80-88) occurs
after the inference work is already completed via prompt_messages, meaning
compute resources are consumed before validating the user has sufficient
credits. Move the credit check before calling prompt_messages so that users are
rejected upfront if they lack sufficient credits, rather than having work
performed that cannot be billed. Alternatively, if post-inference billing is
required, remove the error return from the bill_compute failure handling and
instead log the billing failure while still returning the inference result to
the user.
In `@rust-executor/src/api/openai_compat/embeddings.rs`:
- Around line 55-57: The current pattern in the bill_compute call only matches
and handles BillingError::InsufficientCredits, which means other billing errors
are silently dropped and the function continues to return a successful 200
response, creating unbilled calls when billing is degraded. Change the error
handling to catch all billing failures from bill_compute, not just
InsufficientCredits. When any Err result is returned from bill_compute, return
an appropriate error response (such as 402 Payment Required) instead of allowing
the embedding response to succeed. This ensures that all billing failures,
regardless of the error type, properly reject the request rather than silently
succeeding.
In `@rust-executor/src/api/openai_compat/model_selector.rs`:
- Around line 24-36: The special case handling for the requested model being
"default" is occurring too early in the resolution logic, before checking if
there is an actual model with the id or name "default". Move the `if requested
== "default"` block that calls `Ad4mDb::with_global_instance` to resolve the
default model (currently checking the keyword and using `get_default_model` with
`type_label`) to after the id/name resolution passes so that exact id/name
matches are attempted first, and only if no model is found by id or name should
the keyword "default" be resolved to the configured default model.
In `@rust-executor/src/api/openai_compat/tts_passthrough.rs`:
- Around line 102-114: The first `.find()` call in the candidate selection logic
only checks if m.api.is_some() but doesn't verify that the api_type is OpenAi,
allowing a non-OpenAI model with the requested name to incorrectly match. Add
the same api_type verification to the first `.find()` condition that checks
m.api.as_ref().map(|a| matches!(a.api_type,
ModelApiType::OpenAi)).unwrap_or(false), so both the exact-match path and the
fallback path enforce that the model is OpenAI-compatible.
- Around line 48-58: The url variable constructed at lines 48-54 is not being
used in the client.post() call at line 58. Instead of using the url variable,
the code rebuilds the URL with format!("{url}/v1/audio/speech"), which is
redundant and likely incorrect. Remove this dead code by using the url variable
directly in the client.post() method call, rather than reconstructing it with an
additional format! macro.
---
Nitpick comments:
In `@rust-executor/src/api/openai_compat/chat.rs`:
- Around line 247-262: The code currently discards the result from done_rx.await
by using let _ =, but this receiver carries PromptResult containing actual
prompt_tokens and completion_tokens that should be used for accurate billing.
Instead of the flat 1.0 charge being applied to the bill_compute call, capture
the PromptResult from done_rx.await, extract the prompt_tokens and
completion_tokens values from it, and use those actual token counts when calling
bill_compute to achieve token-based billing parity with the non-streaming
chat.completions path.
In `@rust-executor/src/api/openai_compat/realtime.rs`:
- Around line 171-186: The synchronous drain using try_recv() in the while loop
only retrieves transcription deltas available at the moment of append, which can
miss deltas produced by Whisper after the drain completes but before the next
append arrives. To fix this, either spawn a background forwarder task that
continuously drains the broadcast receiver (rx) in a separate async task instead
of draining synchronously on each append, or add clear documentation explaining
this behavior as a known limitation where transcription deltas may be delayed or
missed if audio appends stop after Whisper produces output.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: eff96a40-6e67-4337-9fbe-d1a34800b5b6
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (14)
rust-executor/Cargo.tomlrust-executor/src/ai_service/mod.rsrust-executor/src/api/mod.rsrust-executor/src/api/openai_compat/audio.rsrust-executor/src/api/openai_compat/chat.rsrust-executor/src/api/openai_compat/embeddings.rsrust-executor/src/api/openai_compat/errors.rsrust-executor/src/api/openai_compat/mod.rsrust-executor/src/api/openai_compat/model_selector.rsrust-executor/src/api/openai_compat/models.rsrust-executor/src/api/openai_compat/realtime.rsrust-executor/src/api/openai_compat/router.rsrust-executor/src/api/openai_compat/tts_passthrough.rsrust-executor/src/api/openai_compat/types.rs
| "user" => { | ||
| if let Some(prev_user) = current_user.take() { | ||
| // user→user with no assistant in between; treat | ||
| // earlier user as a finalised final prompt slot | ||
| // (will be overwritten if more turns follow). | ||
| final_prompt = prev_user; | ||
| } | ||
| current_user = Some(content.clone()); |
There was a problem hiding this comment.
Consecutive user messages cause content loss.
When two user messages appear without an intervening assistant message (e.g., [user1, user2]), user1 is assigned to final_prompt but then immediately overwritten when user2 sets current_user. At the end of parsing, only user2 survives as final_prompt—user1 is silently discarded.
Consider concatenating consecutive user messages or converting earlier ones into the system prompt:
Proposed fix
"user" => {
if let Some(prev_user) = current_user.take() {
- // user→user with no assistant in between; treat
- // earlier user as a finalised final prompt slot
- // (will be overwritten if more turns follow).
- final_prompt = prev_user;
+ // user→user with no assistant in between;
+ // concatenate into a single user turn.
+ current_user = Some(format!("{}\n{}", prev_user, content));
+ } else {
+ current_user = Some(content.clone());
}
- current_user = Some(content.clone());
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "user" => { | |
| if let Some(prev_user) = current_user.take() { | |
| // user→user with no assistant in between; treat | |
| // earlier user as a finalised final prompt slot | |
| // (will be overwritten if more turns follow). | |
| final_prompt = prev_user; | |
| } | |
| current_user = Some(content.clone()); | |
| "user" => { | |
| if let Some(prev_user) = current_user.take() { | |
| // user→user with no assistant in between; | |
| // concatenate into a single user turn. | |
| current_user = Some(format!("{}\n{}", prev_user, content)); | |
| } else { | |
| current_user = Some(content.clone()); | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rust-executor/src/ai_service/mod.rs` around lines 1164 - 1171, The issue is
in the "user" match arm where consecutive user messages cause content loss. When
a second user message arrives (current_user is Some), the first user message is
temporarily saved to final_prompt, but then overwritten by setting current_user
to the new message, resulting in the first message being discarded. Instead of
replacing prev_user with current_user, concatenate them together or append the
new content to the existing content so both consecutive user messages are
preserved. Modify the logic where prev_user is taken and content is assigned to
current_user to combine the messages rather than discard the earlier one.
| while pos + 8 <= bytes.len() { | ||
| let id = &bytes[pos..pos + 4]; | ||
| let size = u32::from_le_bytes([ | ||
| bytes[pos + 4], | ||
| bytes[pos + 5], | ||
| bytes[pos + 6], | ||
| bytes[pos + 7], | ||
| ]) as usize; | ||
| pos += 8; | ||
| match id { | ||
| b"fmt " => { | ||
| if pos + 16 > bytes.len() { | ||
| return Err(OpenAIError::invalid_request("Truncated WAV fmt chunk")); | ||
| } | ||
| audio_format = u16::from_le_bytes([bytes[pos], bytes[pos + 1]]); | ||
| channels = u16::from_le_bytes([bytes[pos + 2], bytes[pos + 3]]); | ||
| sample_rate = u32::from_le_bytes([ | ||
| bytes[pos + 4], | ||
| bytes[pos + 5], | ||
| bytes[pos + 6], | ||
| bytes[pos + 7], | ||
| ]); | ||
| bits_per_sample = u16::from_le_bytes([bytes[pos + 14], bytes[pos + 15]]); | ||
| pos += size; | ||
| } | ||
| b"data" => { | ||
| data_offset = Some(pos); | ||
| data_size = size; | ||
| pos += size; | ||
| } | ||
| _ => { | ||
| // Skip unknown chunks (LIST, INFO, etc). | ||
| pos += size; | ||
| } | ||
| } | ||
| if pos % 2 != 0 { | ||
| pos += 1; // RIFF chunks are word-aligned. | ||
| } | ||
| } |
There was a problem hiding this comment.
Chunk size validation could read past buffer bounds.
The loop checks pos + 8 <= bytes.len() (line 229) but doesn't validate that pos + size stays within bounds before advancing. A malformed WAV with an inflated chunk size could cause pos to overflow or wrap, potentially reading garbage or causing a panic on subsequent iterations.
Proposed fix
while pos + 8 <= bytes.len() {
let id = &bytes[pos..pos + 4];
let size = u32::from_le_bytes([
bytes[pos + 4],
bytes[pos + 5],
bytes[pos + 6],
bytes[pos + 7],
]) as usize;
pos += 8;
+ // Clamp size to remaining bytes to handle malformed/truncated files
+ let size = size.min(bytes.len().saturating_sub(pos));
match id {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| while pos + 8 <= bytes.len() { | |
| let id = &bytes[pos..pos + 4]; | |
| let size = u32::from_le_bytes([ | |
| bytes[pos + 4], | |
| bytes[pos + 5], | |
| bytes[pos + 6], | |
| bytes[pos + 7], | |
| ]) as usize; | |
| pos += 8; | |
| match id { | |
| b"fmt " => { | |
| if pos + 16 > bytes.len() { | |
| return Err(OpenAIError::invalid_request("Truncated WAV fmt chunk")); | |
| } | |
| audio_format = u16::from_le_bytes([bytes[pos], bytes[pos + 1]]); | |
| channels = u16::from_le_bytes([bytes[pos + 2], bytes[pos + 3]]); | |
| sample_rate = u32::from_le_bytes([ | |
| bytes[pos + 4], | |
| bytes[pos + 5], | |
| bytes[pos + 6], | |
| bytes[pos + 7], | |
| ]); | |
| bits_per_sample = u16::from_le_bytes([bytes[pos + 14], bytes[pos + 15]]); | |
| pos += size; | |
| } | |
| b"data" => { | |
| data_offset = Some(pos); | |
| data_size = size; | |
| pos += size; | |
| } | |
| _ => { | |
| // Skip unknown chunks (LIST, INFO, etc). | |
| pos += size; | |
| } | |
| } | |
| if pos % 2 != 0 { | |
| pos += 1; // RIFF chunks are word-aligned. | |
| } | |
| } | |
| while pos + 8 <= bytes.len() { | |
| let id = &bytes[pos..pos + 4]; | |
| let size = u32::from_le_bytes([ | |
| bytes[pos + 4], | |
| bytes[pos + 5], | |
| bytes[pos + 6], | |
| bytes[pos + 7], | |
| ]) as usize; | |
| pos += 8; | |
| // Clamp size to remaining bytes to handle malformed/truncated files | |
| let size = size.min(bytes.len().saturating_sub(pos)); | |
| match id { | |
| b"fmt " => { | |
| if pos + 16 > bytes.len() { | |
| return Err(OpenAIError::invalid_request("Truncated WAV fmt chunk")); | |
| } | |
| audio_format = u16::from_le_bytes([bytes[pos], bytes[pos + 1]]); | |
| channels = u16::from_le_bytes([bytes[pos + 2], bytes[pos + 3]]); | |
| sample_rate = u32::from_le_bytes([ | |
| bytes[pos + 4], | |
| bytes[pos + 5], | |
| bytes[pos + 6], | |
| bytes[pos + 7], | |
| ]); | |
| bits_per_sample = u16::from_le_bytes([bytes[pos + 14], bytes[pos + 15]]); | |
| pos += size; | |
| } | |
| b"data" => { | |
| data_offset = Some(pos); | |
| data_size = size; | |
| pos += size; | |
| } | |
| _ => { | |
| // Skip unknown chunks (LIST, INFO, etc). | |
| pos += size; | |
| } | |
| } | |
| if pos % 2 != 0 { | |
| pos += 1; // RIFF chunks are word-aligned. | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rust-executor/src/api/openai_compat/audio.rs` around lines 229 - 267, The WAV
chunk parsing loop validates that the header can be read with `pos + 8 <=
bytes.len()` but fails to validate that the entire chunk data fits within the
buffer before advancing `pos` by the chunk size. After reading the chunk size
value with `u32::from_le_bytes`, add a bounds check to ensure `pos + size <=
bytes.len()` before processing any chunk data. This check should occur
immediately after parsing the size and before the match statement, so that any
chunk with an inflated size is rejected early rather than allowing `pos` to
advance beyond the buffer bounds, which could cause panics or undefined behavior
on subsequent iterations.
| if let Some(email) = user_email(&auth) { | ||
| if let Err(BillingError::InsufficientCredits) = | ||
| bill_compute(&email, 1.0, "ai_prompt", Some("v1/completions")) | ||
| { | ||
| return Err(OpenAIError::insufficient_quota( | ||
| "Insufficient compute credits", | ||
| )); | ||
| } | ||
| } |
There was a problem hiding this comment.
Billing check after inference creates inconsistent behavior.
The billing check runs after prompt_messages completes (line 76-78), so the compute is consumed before checking credits. If the user has insufficient credits, they receive an error but the work was already done. This differs from typical pre-check patterns that reject requests upfront.
Consider either:
- Checking credits before inference and failing fast
- Removing the error return here (log the billing failure but return the result)
The current approach risks delivering no value despite consuming resources.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rust-executor/src/api/openai_compat/chat.rs` around lines 80 - 88, The
billing check in the code block (lines 80-88) occurs after the inference work is
already completed via prompt_messages, meaning compute resources are consumed
before validating the user has sufficient credits. Move the credit check before
calling prompt_messages so that users are rejected upfront if they lack
sufficient credits, rather than having work performed that cannot be billed.
Alternatively, if post-inference billing is required, remove the error return
from the bill_compute failure handling and instead log the billing failure while
still returning the inference result to the user.
| if let Err(BillingError::InsufficientCredits) = | ||
| bill_compute(&email, 1.0, "ai_embedding", Some("v1/embeddings")) | ||
| { |
There was a problem hiding this comment.
Don’t swallow non-quota billing failures
Lines 55-57 only handle InsufficientCredits; other billing errors are dropped and the embedding response still returns 200. That creates unbilled successful calls when billing is degraded.
Suggested patch
- if let Some(email) = crate::agent::capabilities::user_email_from_token(auth.auth_token.clone())
- {
- if let Err(BillingError::InsufficientCredits) =
- bill_compute(&email, 1.0, "ai_embedding", Some("v1/embeddings"))
- {
- return Err(OpenAIError::insufficient_quota(
- "Insufficient compute credits",
- ));
- }
- }
+ if let Some(email) = crate::agent::capabilities::user_email_from_token(auth.auth_token.clone())
+ {
+ match bill_compute(&email, 1.0, "ai_embedding", Some("v1/embeddings")) {
+ Ok(_) => {}
+ Err(BillingError::InsufficientCredits) => {
+ return Err(OpenAIError::insufficient_quota(
+ "Insufficient compute credits",
+ ));
+ }
+ Err(e) => {
+ return Err(OpenAIError::internal(format!("Billing error: {e}")));
+ }
+ }
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if let Err(BillingError::InsufficientCredits) = | |
| bill_compute(&email, 1.0, "ai_embedding", Some("v1/embeddings")) | |
| { | |
| if let Some(email) = crate::agent::capabilities::user_email_from_token(auth.auth_token.clone()) | |
| { | |
| match bill_compute(&email, 1.0, "ai_embedding", Some("v1/embeddings")) { | |
| Ok(_) => {} | |
| Err(BillingError::InsufficientCredits) => { | |
| return Err(OpenAIError::insufficient_quota( | |
| "Insufficient compute credits", | |
| )); | |
| } | |
| Err(e) => { | |
| return Err(OpenAIError::internal(format!("Billing error: {e}"))); | |
| } | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rust-executor/src/api/openai_compat/embeddings.rs` around lines 55 - 57, The
current pattern in the bill_compute call only matches and handles
BillingError::InsufficientCredits, which means other billing errors are silently
dropped and the function continues to return a successful 200 response, creating
unbilled calls when billing is degraded. Change the error handling to catch all
billing failures from bill_compute, not just InsufficientCredits. When any Err
result is returned from bill_compute, return an appropriate error response (such
as 402 Payment Required) instead of allowing the embedding response to succeed.
This ensures that all billing failures, regardless of the error type, properly
reject the request rather than silently succeeding.
| // Resolve `default` via the DB's per-type pointer. This mirrors | ||
| // `AIService::replace_model_variables` so OpenAI callers can use the | ||
| // same shorthand as native AD4M ones. | ||
| if requested == "default" { | ||
| return Ad4mDb::with_global_instance(|db| db.get_default_model(expected_type.clone())) | ||
| .map_err(|e| OpenAIError::internal(format!("Database error: {e}")))? | ||
| .ok_or_else(|| { | ||
| OpenAIError::not_found(format!( | ||
| "No default {} model configured on this executor", | ||
| type_label(&expected_type) | ||
| )) | ||
| }); | ||
| } |
There was a problem hiding this comment.
Honor the documented resolution precedence for "default"
Line 27 resolves "default" before the id/name passes, which makes any real model id/name "default" unreachable. Move the keyword handling after Lines 42-65 to preserve id/name-first semantics.
Suggested patch
- // Resolve `default` via the DB's per-type pointer. This mirrors
- // `AIService::replace_model_variables` so OpenAI callers can use the
- // same shorthand as native AD4M ones.
- if requested == "default" {
- return Ad4mDb::with_global_instance(|db| db.get_default_model(expected_type.clone()))
- .map_err(|e| OpenAIError::internal(format!("Database error: {e}")))?
- .ok_or_else(|| {
- OpenAIError::not_found(format!(
- "No default {} model configured on this executor",
- type_label(&expected_type)
- ))
- });
- }
-
let models = Ad4mDb::with_global_instance(|db| db.get_models())
.map_err(|e| OpenAIError::internal(format!("Database error listing models: {e}")))?;
@@
if let Some(m) = models.iter().find(|m| m.name == requested) {
@@
return Ok(m.id.clone());
}
+
+ // Pass 3: reserved keyword `default`.
+ if requested == "default" {
+ return Ad4mDb::with_global_instance(|db| db.get_default_model(expected_type.clone()))
+ .map_err(|e| OpenAIError::internal(format!("Database error: {e}")))?
+ .ok_or_else(|| {
+ OpenAIError::not_found(format!(
+ "No default {} model configured on this executor",
+ type_label(&expected_type)
+ ))
+ });
+ }Also applies to: 41-65
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rust-executor/src/api/openai_compat/model_selector.rs` around lines 24 - 36,
The special case handling for the requested model being "default" is occurring
too early in the resolution logic, before checking if there is an actual model
with the id or name "default". Move the `if requested == "default"` block that
calls `Ad4mDb::with_global_instance` to resolve the default model (currently
checking the keyword and using `get_default_model` with `type_label`) to after
the id/name resolution passes so that exact id/name matches are attempted first,
and only if no model is found by id or name should the keyword "default" be
resolved to the configured default model.
| let url = format!( | ||
| "{}/audio/speech", | ||
| upstream | ||
| .base_url | ||
| .trim_end_matches('/') | ||
| .trim_end_matches("/v1"), | ||
| ); | ||
|
|
||
| let client = reqwest::Client::new(); | ||
| let resp = client | ||
| .post(format!("{url}/v1/audio/speech")) |
There was a problem hiding this comment.
Dead code: url on line 48 is immediately discarded.
Line 48 builds a url string including /audio/speech, but line 58 rebuilds the URL from the trimmed base_url and appends /v1/audio/speech again. The intermediate format! at line 48-54 is dead code.
Proposed fix
- let url = format!(
- "{}/audio/speech",
- upstream
- .base_url
- .trim_end_matches('/')
- .trim_end_matches("/v1"),
- );
+ let base = upstream
+ .base_url
+ .trim_end_matches('/')
+ .trim_end_matches("/v1");
let client = reqwest::Client::new();
let resp = client
- .post(format!("{url}/v1/audio/speech"))
+ .post(format!("{base}/v1/audio/speech"))
.bearer_auth(&upstream.api_key)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let url = format!( | |
| "{}/audio/speech", | |
| upstream | |
| .base_url | |
| .trim_end_matches('/') | |
| .trim_end_matches("/v1"), | |
| ); | |
| let client = reqwest::Client::new(); | |
| let resp = client | |
| .post(format!("{url}/v1/audio/speech")) | |
| let base = upstream | |
| .base_url | |
| .trim_end_matches('/') | |
| .trim_end_matches("/v1"); | |
| let client = reqwest::Client::new(); | |
| let resp = client | |
| .post(format!("{base}/v1/audio/speech")) | |
| .bearer_auth(&upstream.api_key) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rust-executor/src/api/openai_compat/tts_passthrough.rs` around lines 48 - 58,
The url variable constructed at lines 48-54 is not being used in the
client.post() call at line 58. Instead of using the url variable, the code
rebuilds the URL with format!("{url}/v1/audio/speech"), which is redundant and
likely incorrect. Remove this dead code by using the url variable directly in
the client.post() method call, rather than reconstructing it with an additional
format! macro.
| let candidate = models | ||
| .iter() | ||
| .find(|m| m.model_type == ModelType::Llm && m.api.is_some() && m.name == requested_model) | ||
| .or_else(|| { | ||
| models.iter().find(|m| { | ||
| m.model_type == ModelType::Llm | ||
| && m.api | ||
| .as_ref() | ||
| .map(|a| matches!(a.api_type, ModelApiType::OpenAi)) | ||
| .unwrap_or(false) | ||
| }) | ||
| }) | ||
| .ok_or(TtsPassthroughError::NotConfigured)?; |
There was a problem hiding this comment.
Exact-match path doesn't verify api_type is OpenAI.
The first .find() at line 104 checks m.api.is_some() but doesn't verify api_type == OpenAi. A model named "tts-1" with a non-OpenAI API (e.g., local) would match, and the passthrough would attempt to use credentials that may not work with an OpenAI-compatible endpoint.
Proposed fix
let candidate = models
.iter()
- .find(|m| m.model_type == ModelType::Llm && m.api.is_some() && m.name == requested_model)
+ .find(|m| {
+ m.model_type == ModelType::Llm
+ && m.api
+ .as_ref()
+ .map(|a| matches!(a.api_type, ModelApiType::OpenAi))
+ .unwrap_or(false)
+ && m.name == requested_model
+ })
.or_else(|| {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let candidate = models | |
| .iter() | |
| .find(|m| m.model_type == ModelType::Llm && m.api.is_some() && m.name == requested_model) | |
| .or_else(|| { | |
| models.iter().find(|m| { | |
| m.model_type == ModelType::Llm | |
| && m.api | |
| .as_ref() | |
| .map(|a| matches!(a.api_type, ModelApiType::OpenAi)) | |
| .unwrap_or(false) | |
| }) | |
| }) | |
| .ok_or(TtsPassthroughError::NotConfigured)?; | |
| let candidate = models | |
| .iter() | |
| .find(|m| { | |
| m.model_type == ModelType::Llm | |
| && m.api | |
| .as_ref() | |
| .map(|a| matches!(a.api_type, ModelApiType::OpenAi)) | |
| .unwrap_or(false) | |
| && m.name == requested_model | |
| }) | |
| .or_else(|| { | |
| models.iter().find(|m| { | |
| m.model_type == ModelType::Llm | |
| && m.api | |
| .as_ref() | |
| .map(|a| matches!(a.api_type, ModelApiType::OpenAi)) | |
| .unwrap_or(false) | |
| }) | |
| }) | |
| .ok_or(TtsPassthroughError::NotConfigured)?; |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rust-executor/src/api/openai_compat/tts_passthrough.rs` around lines 102 -
114, The first `.find()` call in the candidate selection logic only checks if
m.api.is_some() but doesn't verify that the api_type is OpenAi, allowing a
non-OpenAI model with the requested name to incorrectly match. Add the same
api_type verification to the first `.find()` condition that checks
m.api.as_ref().map(|a| matches!(a.api_type,
ModelApiType::OpenAi)).unwrap_or(false), so both the exact-match path and the
fallback path enforce that the model is OpenAI-compatible.
Summary
Adds an industry-standard, OpenAI-compatible HTTP/WebSocket API surface mounted at
/v1, alongside the existing AD4M WS-RPC transport. Any tool that speaks OpenAI — the officialopenaiSDKs, LangChain, LlamaIndex, the Vercel AI SDK, Cursor/Continue, Open WebUI, thellmCLI — can point itsbase_urlat a running AD4M executor and just work.All five phases from the design proposal land together:
GET /v1/models,POST /v1/chat/completions(non-streaming),POST /v1/completions(legacy),POST /v1/embeddings, OpenAI error envelopePOST /v1/chat/completionswithstream: true— token-by-token SSEPOST /v1/audio/transcriptions— multipart upload, batch WhisperPOST /v1/audio/speech— TTSGET /v1/realtime— WebSocket streaming STT (OpenAI Realtime subset)The native WS-RPC
ai.*surface,core/AIClient, Flux, and the launcher are unchanged. Existing clients see no behaviour change.What changed
New module
rust-executor/src/api/openai_compat/mod.rsrouter.rs/v1and/api/v1/openai/v1types.rserrors.rs{ "error": { "message", "type", "param", "code" } }envelope +IntoResponsemodel_selector.rsmodelstrings → AD4Mmodel_id(id → name →"default")models.rsGET /v1/modelschat.rsPOST /v1/chat/completions(oneshot + SSE) andPOST /v1/completionsembeddings.rsPOST /v1/embeddings— rawf32arraysaudio.rsPOST /v1/audio/transcriptions(multipart STT) +POST /v1/audio/speech(TTS)tts_passthrough.rs/v1/audio/speechrealtime.rsGET /v1/realtimeWebSocket (subset of OpenAI Realtime transcription)AIService surface additions
Two new public methods on
AIServiceso the OpenAI layer can drive the existing per-model LLM threads without bypassing them:prompt_messages(model_id, messages: Vec<(role, content)>) -> PromptResult— builds an ephemeral in-memoryAITaskfor the duration of the call (no DB writes), spawns it on the per-model thread, prompts, removes. Stateless by design — backs/v1/chat/completionsand/v1/completions.prompt_messages_stream(model_id, messages) -> (mpsc::Receiver<String>, oneshot::Receiver<PromptResult>)— same shape but the LLM thread forwards each Kalosm token through an mpsc channel. Backsstream: true.Plus:
transcribe_buffer(model_id, samples, auth_token) -> String— open / feed / drain / close loop around the existing transcription engine. Backs the batch/v1/audio/transcriptionsendpoint.LLMTaskRequest::PromptStreamvariant in the per-model thread's command channel. Local Kalosm callstask.run(prompt).into_stream()and forwards tokens; remote upstreams collapse to a single terminal chunk (a native streaming upstream client is a follow-up).Auth + billing
Reuses the existing JWT extractor (
AuthContext) and capability framework verbatim. The OpenAI layer changes the shape of requests, not the policy:Authorization: Bearer <jwt>(or?token=for the WebSocket realtime endpoint, same asevents_ws)AI_PROMPT, transcriptions + realtime →AI_TRANSCRIBE, speech →AI_PROMPT, models →AI_READbill_computededucts per-user credits with the existing operation labels (ai_prompt,ai_embedding,ai_transcription,ai_tts).InsufficientCredits→ HTTP 429 withtype: "insufficient_quota"matching OpenAI's quota-exhaustion shape so SDKs that retry on 429 do the right thing.Error envelope
Every error response carries the OpenAI envelope:
{ "error": { "message": "...", "type": "invalid_request_error", "param": null, "code": "..." } }With correct HTTP status codes: 400 / 401 / 403 / 404 / 429 / 500 / 501.
Audio decode (Phase 3 scope note)
The batch transcription endpoint accepts 16 kHz mono PCM WAV out of the box — the simplest container that requires no new dependencies and matches Whisper's reference format. Other containers (mp3, m4a, ogg, flac) return a clear 400 pointing at the
Content-Typeso clients can transcode locally; full container support viasymphonia+rubatois a self-contained follow-up that wasn't worth pulling into the same diff.TTS backend (Phase 4 scope note)
/v1/audio/speechforwards to a configured OpenAI-compatible upstream viareqwest(no new dependencies). The proposal calls out passthrough as a valid Phase-4 deliverable alongside local Kokoro/Piper backends; the local engines pull inort+ voice model assets that deserve their own review and didn't make sense to ship in the same PR as the rest of the surface.When no upstream is configured the endpoint returns 501 with a clear message pointing at the configuration path.
Routing
The
/v1surface mounts at two paths:/v1/...— the canonical OpenAI path/api/v1/openai/v1/...— alias for proxies that hard-code the/api/v1prefix from AD4M's native surfaceBoth share handlers and
AppState.Test plan
cargo check -p ad4m-executor --lib— cleancargo test -p ad4m-executor --lib api::openai_compat— handler unit testsopenaiPython SDK against a live executor:client.models.list()client.chat.completions.create(stream=False)client.chat.completions.create(stream=True)client.embeddings.create()client.audio.transcriptions.create(file=open("clip.wav", "rb"))client.audio.speech.create()(with TTS upstream configured)openai.audio.transcriptions.stream()(subset)InsufficientCreditsclient.chat.completions.createtext matchesai.promptover WS-RPC for the same model/promptcargo fmt --check+build-and-test)What's still deferred (not in this PR)
symphonia+rubatofor mp3/m4a/ogg/flac decode + resample). Out of scope; clients should transcode to 16 kHz mono WAV today.ort, or Piper).ModelType::TextToSpeechenum variant, a dedicated TTS-typed model registration, and the local backend itself ship in a follow-up.usagecounts. Local Kalosm prompts still reportchars/4estimates; remote upstreams already return exact counts and we forward those. Wiring exact counts on the local path means reading the Kalosm tokenizer's encoded length per request — straightforward but better tracked alongside the per-token billing follow-up.Companion / follow-up artefacts
None for this PR — there's no client-side migration needed. External tools point their
base_urlat the executor and that's it. AD4M-native clients keep usingcore/AIClientagainst the WS-RPC surface unchanged.cc @lucksus @data-bot-coasys for review.
Summary by CodeRabbit
Release Notes
New Features
/v1for chat completions, embeddings, audio transcription, and text-to-speech synthesis.