diff --git a/CMakeLists.txt b/CMakeLists.txt
index 311b19e3a..61dce4c63 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -408,8 +408,8 @@ add_compile_definitions(CPPHTTPLIB_THREAD_POOL_COUNT=8)
# Platform-specific compiler definitions
if(WIN32)
- # Set Windows target version to Windows 10 for httplib v0.26.0
- add_compile_definitions(_WIN32_WINNT=0x0A00)
+ # Set Windows target version to Windows 10 for httplib v0.26.0, and prevent Windows.h min/max macro definitions
+ add_compile_definitions(_WIN32_WINNT=0x0A00 NOMINMAX)
if(MSVC)
# Add security-hardening compiler flags for MSVC
# Control Flow Guard - prevents control flow hijacking
@@ -596,6 +596,7 @@ set(SOURCES_CORE
src/cpp/server/system_info.cpp
src/cpp/server/recipe_options.cpp
src/cpp/server/runtime_config.cpp
+ src/cpp/server/telemetry.cpp
src/cpp/server/logging_config.cpp
src/cpp/server/log_stream.cpp
src/cpp/server/prometheus_metrics.cpp
@@ -1834,3 +1835,24 @@ if(EXISTS "${_AUTO_TUNE_TEST_SRC}")
include(CTest)
add_test(NAME AutoTuneTest COMMAND test_auto_tune)
endif()
+
+set(_TELEMETRY_HELPERS_TEST_SRC "${CMAKE_CURRENT_SOURCE_DIR}/test/cpp/test_telemetry_helpers.cpp")
+if(EXISTS "${_TELEMETRY_HELPERS_TEST_SRC}")
+ add_executable(test_telemetry_helpers test/cpp/test_telemetry_helpers.cpp)
+ target_link_libraries(test_telemetry_helpers PRIVATE lemonade-server-core)
+ add_test(NAME TelemetryHelpersTest COMMAND test_telemetry_helpers)
+endif()
+
+set(_CONFIG_MIGRATION_TEST_SRC "${CMAKE_CURRENT_SOURCE_DIR}/test/cpp/test_config_migration.cpp")
+if(EXISTS "${_CONFIG_MIGRATION_TEST_SRC}")
+ add_executable(test_config_migration test/cpp/test_config_migration.cpp)
+ target_link_libraries(test_config_migration PRIVATE lemonade-server-core)
+ add_test(NAME ConfigMigrationTest COMMAND test_config_migration)
+endif()
+
+set(_CONFIG_TELEMETRY_TEST_SRC "${CMAKE_CURRENT_SOURCE_DIR}/test/cpp/test_config_telemetry.cpp")
+if(EXISTS "${_CONFIG_TELEMETRY_TEST_SRC}")
+ add_executable(test_config_telemetry test/cpp/test_config_telemetry.cpp)
+ target_link_libraries(test_config_telemetry PRIVATE lemonade-server-core)
+ add_test(NAME ConfigTelemetryTest COMMAND test_config_telemetry)
+endif()
diff --git a/docs/api/lemonade.md b/docs/api/lemonade.md
index bd8470820..2be0391f5 100644
--- a/docs/api/lemonade.md
+++ b/docs/api/lemonade.md
@@ -27,6 +27,7 @@ We have designed a set of Lemonade-specific endpoints to enable client applicati
| `WS` | [`/logs/stream`](#log-streaming-api-websocket) | Log Streaming |
| `GET` | [`/live`](#get-live) | Check server liveness for load balancers and orchestrators |
| `GET` | [`/metrics`](#get-metrics) | Prometheus metrics scrape endpoint |
+| `POST` | [`/internal/telemetry/flush`](#post-internaltelemetryflush) | Force-flush all queued telemetry trace spans |
## `POST /v1/pull`

@@ -683,6 +684,9 @@ curl http://localhost:13305/v1/health
"llm":1,
"reranking":1,
"tts":1
+ },
+ "telemetry": {
+ "enabled": false
}
}
```
@@ -712,6 +716,9 @@ curl http://localhost:13305/v1/health
- `image` - Maximum image models
- `tts` - Maximum text-to-speech models
- `websocket_port` - *(optional)* Port of the WebSocket server for the [Realtime Audio Transcription API](./openai.md#ws-realtime) and [Log Streaming API](#log-streaming-api-websocket). Only present when the WebSocket server is running. The port is OS-assigned or set via `--websocket-port`.
+- `telemetry` - Structured telemetry state object:
+ - `enabled` - Boolean indicating if telemetry collection is active
+ - `captures` - *(optional)* Array of captured telemetry components (e.g., `["inputs", "outputs", "thinking"]`), only present when `enabled` is `true`.
## `GET /v1/stats`

@@ -1383,3 +1390,32 @@ curl http://localhost:13305/live
```json
{"status":"ok"}
```
+
+## Internal Endpoints
+
+Internal endpoints are used for server control and configuration. By default, they are secured by `LEMONADE_ADMIN_API_KEY` (if set) to separate control privileges from standard inference operations.
+
+## `POST /internal/telemetry/flush`
+
+
+Forces the in-memory telemetry queue to flush all buffered trace spans immediately to the configured OTLP collector. This call blocks until all currently queued spans are serialized and sent.
+
+#### Parameters
+
+None.
+
+Example request:
+
+```bash
+curl -X POST http://localhost:13305/internal/telemetry/flush
+```
+
+#### Response Format
+
+Returns a JSON object indicating successful completion of the flush operation:
+
+```json
+{
+ "status": "flushed"
+}
+```
diff --git a/docs/guide/cli.md b/docs/guide/cli.md
index 50d388bbb..6c6945a0e 100644
--- a/docs/guide/cli.md
+++ b/docs/guide/cli.md
@@ -16,6 +16,7 @@ The `lemonade` CLI is the primary tool for interacting with Lemonade Server from
- [Options for launch](#options-for-launch)
- [Options for bench](#options-for-bench)
- [Options for scan](#options-for-scan)
+- [Options for telemetry](#options-for-telemetry)
## Commands
@@ -38,6 +39,7 @@ The `lemonade` CLI is the primary tool for interacting with Lemonade Server from
| `backends` | List supported recipes and backends or list all available recipes and backends with `--all`. Use `install` or `uninstall` to manage backends. |
| `cloud` | Manage cloud OpenAI-compatible providers. See command options [below](#options-for-cloud). |
| `scan` | Scan for network beacons on the local network. See command options [below](#options-for-scan). |
+| `telemetry` | Dynamically enable or disable telemetry tracing. See command options [below](#options-for-telemetry). |
### Model Management
@@ -659,6 +661,29 @@ lemonade scan
lemonade scan --duration 5
```
+## Options for telemetry
+
+Dynamically toggle telemetry tracing on the server. This setting is applied immediately in-memory without requiring a server restart, but is not persisted to the server's `config.json` (meaning it will revert when the server restarts).
+
+```bash
+lemonade telemetry
+```
+
+| Argument | Description |
+|----------|-------------|
+| `on` | Enable telemetry tracing. |
+| `off` | Disable telemetry tracing. |
+
+**Examples:**
+
+```bash
+# Enable telemetry tracing dynamically
+lemonade telemetry on
+
+# Disable telemetry tracing dynamically
+lemonade telemetry off
+```
+
## Options for bench
The `bench` command measures chat completion performance (TTFT and tokens-per-second) for one or more models across one or more installed backends, context sizes, and scenario workloads. It sends `POST /api/v1/chat/completions` requests and extracts timing data from the server response.
diff --git a/docs/guide/configuration/README.md b/docs/guide/configuration/README.md
index 93977148c..01186bb72 100644
--- a/docs/guide/configuration/README.md
+++ b/docs/guide/configuration/README.md
@@ -33,7 +33,7 @@ Values set in the user's `config.json` always take precedence over these seeded
```json
{
- "config_version": 1,
+ "config_version": 2,
"port": 13305,
"host": "localhost",
"log_level": "info",
@@ -83,13 +83,30 @@ Values set in the user's `config.json` always take precedence over these seeded
"vulkan_bin": "builtin"
},
"flm": {
- "args": "",
+ "args": ""
},
"ryzenai": {
"server_bin": "builtin"
},
"kokoro": {
"cpu_bin": "builtin"
+ },
+ "telemetry": {
+ "enabled": false,
+ "hide_inputs": false,
+ "hide_outputs": false,
+ "hide_thinking": false,
+ "max_queue_capacity": 1000,
+ "otlp": {
+ "endpoint": "http://localhost:4318/v1/traces",
+ "protocol": "http/protobuf",
+ "semantics": ["openinference", "otel_genai"],
+ "headers": {},
+ "max_retries": 0,
+ "retry_backoff_base_s": 5.0,
+ "send_batch_size": 100,
+ "batch_timeout_s": 1.0
+ }
}
}
```
@@ -171,6 +188,53 @@ Backend-specific settings are nested under their backend name:
API keys for these providers are **not** stored in `config.json` — they live in `LEMONADE__API_KEY` env vars (persistent) or `lemond` process memory via `POST /v1/cloud/auth` (ephemeral). Manage providers with `lemonade cloud install/uninstall/auth/list` rather than editing this section by hand.
+**telemetry** — Unified telemetry and tracing configurations:
+
+| Key | Type | Default | Description |
+|-----|------|---------|-------------|
+| `enabled` | bool | false | Enable or disable telemetry tracing. |
+| `hide_inputs` | bool | false | Redact prompt message content from spans. |
+| `hide_outputs` | bool | false | Redact generated assistant message content from spans. |
+| `hide_thinking` | bool | false | Redact reasoning/thought content from spans. |
+| `max_queue_capacity` | int | 1000 | The maximum capacity of the in-memory telemetry queue buffer. Oldest spans are dropped when full. Must be `> 0`. |
+| `otlp` | object | (nested object) | Sub-block grouping OTLP transport details (see below). |
+
+**telemetry.otlp** — Nested OTLP settings:
+
+| Key | Type | Default | Description |
+|-----|------|---------|-------------|
+| `endpoint` | string | "http://localhost:4318/v1/traces" | The OTLP endpoint to send traces to. |
+| `protocol` | string | "http/protobuf" | Supported OTLP trace protocol: `"http/protobuf"` or `"http/json"`. |
+| `semantics` | array of strings | ["openinference", "otel_genai"] | Active trace semantics. Supported values: `"openinference"` and `"otel_genai"`. |
+| `headers` | object | {} | Map of custom HTTP headers to pass to the OTLP receiver. |
+| `max_retries` | int | 0 | Maximum number of retry attempts for failed exports. Set to `0` to disable retries and discard failed spans immediately. Must be `>= 0`. |
+| `retry_backoff_base_s` | double | 5.0 | Base delay in seconds for exponential backoff retries. Must be `>= 0`. |
+| `send_batch_size` | int | 100 | Target maximum number of spans to group in a single batched OTLP request. Must be `>= 1`. |
+| `batch_timeout_s` | double | 1.0 | Maximum time to wait in seconds before exporting a partially filled batch of spans. Must be `> 0`. |
+
+#### Telemetry and Tracing Details
+
+Lemonade uses a unified telemetry subsystem to trace requests and capture critical execution spans. The following technical behaviors apply:
+
+- **Multi-Standard Semantic Conventions**: Supports exporting traces using two co-existing semantics:
+ - **OpenInference**: Uses Arize Phoenix-compatible properties (always prefixed with `openinference.span.kind`, `llm.model_name`, `llm.token_count.*`).
+ - **OpenTelemetry GenAI**: Uses standard OpenTelemetry GenAI properties (`gen_ai.system`, `gen_ai.request.model`, `gen_ai.usage.input_tokens`, `gen_ai.input.messages`, `gen_ai.output.messages`).
+ When both semantics are specified in `telemetry.otlp.semantics`, trace spans carry attributes for both conventions in a single network payload. This allows the collector to parse either convention without duplicate network requests.
+- **Dynamic Attribute Prefixing**: Span attributes are dynamically prefixed based on the query type to simplify filtering:
+ - `llm.*` for standard chat and completion spans.
+ - `embedding.*` for text embedding generation spans.
+ - `reranker.*` for document reranking spans.
+- **Token Tracking**: Captures and reports token usage metrics using semantic attributes depending on the enabled semantics:
+ - For **OpenInference**: Token count is prefixed with `llm.token_count` across all span kinds (`llm.token_count.prompt`, `llm.token_count.completion`, `llm.token_count.total`) alongside legacy keys like `llm.usage.prompt_tokens`.
+ - For **OpenTelemetry GenAI**: Token count uses standard fields like `gen_ai.usage.input_tokens` and `gen_ai.usage.output_tokens`.
+- **Calculated Performance Metrics**: In streaming mode, the server automatically computes and records throughput (`llm.performance.tokens_per_second` / `gen_ai.usage.tokens_per_second` depending on semantic conventions) and prefill latency (`llm.performance.time_to_first_token` / `gen_ai.performance.time_to_first_token`) if not natively returned by the backend (e.g., for vLLM and Cloud models).
+- **vLLM Engine Telemetry**: For the vLLM backend, the server queries the local `/metrics` endpoint on completion to attach scheduler queue metrics (`llm.vllm.num_requests_waiting`, `llm.vllm.num_requests_running`, `llm.vllm.num_requests_swapped`) and KV cache utilization (`llm.vllm.gpu_cache_usage_factor`, `llm.vllm.cpu_cache_usage_factor`) directly to the trace spans.
+- **Reasoning Model Support**: For reasoning models (e.g., DeepSeek models), the server extracts and records `reasoning_content` from the assistant's generation. Any variant thought-termination tags (e.g., ``) are automatically standardized to the canonical `` tag.
+- **Exporter Retry Backoff**: When retries are enabled (i.e., `max_retries > 0`), the exporter uses an exponential backoff strategy combined with randomized jitter for failed posts. The base retry interval starts at `retry_backoff_base_s` seconds (defaulting to 5), doubling on each subsequent failure (e.g., 5s, 10s, 20s, 40s), up to a maximum cap of 60 seconds. A randomized jitter factor between `0.5` and `1.5` is applied to each calculated delay to prevent a "thundering herd" when the collector recovers. Permanent client errors (`4xx` HTTP status codes, excluding `429 Too Many Requests`) are classified as non-retryable and cause the batch to be dropped immediately to save resources.
+- **OTLP Trace Batching**: Spans are aggregated in an in-memory queue buffer and exported in batches to minimize network overhead and maximize compression efficiency. Batching operates on a dual-trigger system: a batch is immediately serialized and dispatched if it reaches `send_batch_size` (default: `100`), or if `batch_timeout_s` (default: `1.0` second) has elapsed since the oldest span in the batch arrived. All remaining traces are flushed cleanly to the OTel collector upon server shutdown. Users can also trigger a manual flush at any time via the `POST /internal/telemetry/flush` endpoint.
+- **Request Failure Tracing**: Captures request failures directly on the telemetry spans. If a model fails to load, a request is rejected by the router, or a streaming connection encounters an exception or a non-200 HTTP status code from the backend, the span is ended with `Error` status and the specific error message is attached.
+- **Queue Blocking & Thundering Herd Prevention**: To prevent client requests from hanging and to avoid exhausting resources when the telemetry receiver endpoint is down, Lemonade employs a fail-fast mechanism. The exporter memory buffer is strictly bounded to a capacity of `max_queue_capacity` spans (default: `1000`). When full, a head-drop (FIFO) eviction policy is applied to drop the oldest telemetry spans to make room for newer ones, prioritizing current application state. If a telemetry transmission task fails all of its retries and is dropped, the endpoint is marked as **unreachable**. While in this unreachable state, subsequent spans in the transmission queue are attempted only once and immediately dropped without backoff delay if they fail, preventing the telemetry queue from blocking server operations. A single successful span delivery to the endpoint automatically resets the unreachable state and restores normal retry behavior.
+
### Backend binary selection
Every `*_bin` key (e.g. `llamacpp.vulkan_bin`, `whispercpp.cpu_bin`, `sdcpp.rocm_bin`) accepts the same set of values:
@@ -185,7 +249,7 @@ Every `*_bin` key (e.g. `llamacpp.vulkan_bin`, `whispercpp.cpu_bin`, `sdcpp.rocm
> Note: the `latest` setting is experimental.
-> **Important — `llamacpp.rocm_bin` version tags are channel-specific.** Each ROCm channel downloads from a different GitHub repository, so you must set the correct `rocm_channel` before pinning `rocm_bin` to a specific tag. See [Pinning to a Specific Version Tag](./llamacpp.md#pinning-to-a-specific-version-tag) for details.
+> Note: `llamacpp.rocm_bin` version tags are channel-specific. Each ROCm channel downloads from a different GitHub repository, so you must set the correct `rocm_channel` before pinning `rocm_bin` to a specific tag. See [Pinning to a Specific Version Tag](./llamacpp.md#pinning-to-a-specific-version-tag) for details.
Examples:
diff --git a/docs/guide/telemetry.md b/docs/guide/telemetry.md
new file mode 100644
index 000000000..6776226ed
--- /dev/null
+++ b/docs/guide/telemetry.md
@@ -0,0 +1,166 @@
+# Telemetry Guide
+
+Lemonade provides a unified, zero-dependency OpenTelemetry (OTLP) telemetry subsystem designed to trace inference requests and export metrics. This allows developers, prompt engineers, and system administrators to monitor token usage, latencies, request context, and scheduler performance.
+
+---
+
+## Architecture & Concepts
+
+Lemonade exports trace spans directly to any OpenTelemetry-compatible collector or application performance monitor (APM) using the OTLP wire protocol.
+
+```mermaid
+graph TD
+ Client[Application Client] -->|Inference Requests| Lemonade[Lemonade Server]
+ Lemonade -->|OTLP Traces / JSON or Protobuf| Collector[OTel Collector / SaaS APM]
+ Collector -->|Visualization / Analysis| UI[Arize Phoenix / Datadog / Honeycomb]
+```
+
+### Supported Semantic Conventions
+
+Lemonade supports two co-existing trace formats:
+
+1. **OpenInference (`openinference.*`)**:
+ - A mature convention designed specifically for LLM and agentic application tracing.
+ - Deeply integrated into AI evaluation suites (e.g., **Arize Phoenix**, **Langfuse**).
+ - Captures rich LLM attributes (such as `llm.token_count.prompt`, `llm.token_count.completion`, and raw inputs/outputs/thinking).
+2. **OpenTelemetry GenAI (`gen_ai.*`)**:
+ - The official, vendor-neutral standard defined by the OpenTelemetry community.
+ - Supported by mainstream APM backends (e.g., **Honeycomb**, **Datadog**, **Grafana**).
+ - Captures standardized attributes (such as `gen_ai.system`, `gen_ai.request.model`, and `gen_ai.usage.input_tokens`).
+
+Users can specify one or both formats. When both are enabled, Lemonade packs attributes for both conventions into a **single pass** and exports them in a **single network payload** to eliminate network overhead.
+
+---
+
+## Configuration
+
+Telemetry is configured under the `telemetry` block in your `config.json` or managed dynamically via the Lemonade CLI/API.
+
+### Settings Reference
+
+| Key | Type | Default | Description |
+|-----|------|---------|-------------|
+| `telemetry.enabled` | boolean | `false` | Set to `true` to enable tracing. |
+| `telemetry.hide_inputs` | boolean | `false` | Redacts raw prompt inputs from trace attributes to protect privacy. |
+| `telemetry.hide_outputs` | boolean | `false` | Redacts assistant output text from trace attributes. |
+| `telemetry.hide_thinking` | boolean | `false` | Redacts internal reasoning/thinking blocks from trace attributes. |
+| `telemetry.max_queue_capacity` | int | `1000` | Target memory buffer capacity for queued spans. When exceeded, the oldest spans are evicted first (FIFO). |
+| `telemetry.otlp.endpoint` | string | `"http://localhost:4318/v1/traces"` | OTLP HTTP receiver endpoint URL. |
+| `telemetry.otlp.protocol` | string | `"http/protobuf"` | Encoding protocol: `"http/protobuf"` or `"http/json"`. |
+| `telemetry.otlp.semantics` | array | `["openinference", "otel_genai"]` | Enabled conventions. Supported: `"openinference"`, `"otel_genai"`. |
+| `telemetry.otlp.headers` | object | `{}` | Key-value pairs for HTTP request headers (e.g., authorization API keys). |
+| `telemetry.otlp.max_retries` | int | `0` | Max export retry attempts for transient server errors. Set to `0` to disable retries. |
+| `telemetry.otlp.retry_backoff_base_s` | double | `5.0` | Base exponential backoff delay in seconds. |
+| `telemetry.otlp.send_batch_size` | int | `100` | Target span batch size to dispatch in a single HTTP request. |
+| `telemetry.otlp.batch_timeout_s` | double | `1.0` | Max buffer timeout in seconds before exporting a partial batch. |
+
+---
+
+## Dynamic Control via CLI & API
+
+You can toggle telemetry dynamically while the server is running without restarting.
+
+### Via Lemonade CLI
+
+```bash
+# Enable telemetry
+lemonade telemetry on
+
+# Disable telemetry
+lemonade telemetry off
+
+# View current configuration
+lemonade config
+```
+
+### Via Configuration API
+
+Internal configuration is updated via the `/internal/set` endpoint:
+
+```bash
+# Enable telemetry and select only OpenTelemetry GenAI semantics
+curl -X POST http://localhost:13305/internal/set \
+ -H "Content-Type: application/json" \
+ -d '{
+ "telemetry": {
+ "enabled": true,
+ "otlp": {
+ "semantics": ["otel_genai"]
+ }
+ }
+ }'
+```
+
+### Forcing a Flush
+
+Spans are buffered in memory to optimize networking. You can force-flush the telemetry queue at any time:
+
+```bash
+curl -X POST http://localhost:13305/internal/telemetry/flush
+```
+
+---
+
+## Integration Guides
+
+### 1. Local LLM Observability with Arize Phoenix
+
+[Arize Phoenix](https://phoenix.arize.com/) is an open-source AI observability platform that runs locally. It consumes the `openinference` format.
+
+#### Quick Start with Podman
+
+To run Arize Phoenix in the foreground (interactive mode, automatically removed on exit):
+```bash
+podman run --rm -it --name phoenix \
+ -p 6006:6006 -p 4317:4317 -p 4318:4318 \
+ docker.io/arizeai/phoenix:latest
+```
+
+Alternatively, to run it in the background (detached mode):
+```bash
+podman run -d --name phoenix \
+ -p 6006:6006 -p 4317:4317 -p 4318:4318 \
+ docker.io/arizeai/phoenix:latest
+```
+
+Once running, configure Lemonade to point to the local Phoenix OTLP endpoint:
+```bash
+lemonade config set telemetry.enabled=true \
+ telemetry.otlp.endpoint=http://localhost:4318/v1/traces \
+ telemetry.otlp.semantics='["openinference"]'
+```
+Navigate to `http://localhost:6006` in your browser to inspect trace flows.
+
+---
+
+### 2. Cloud Observability (SaaS APM)
+
+If you use a cloud-hosted observability provider (such as Honeycomb, Datadog, or New Relic), you can configure Lemonade to stream trace data directly over HTTPS without running local collectors.
+
+#### Example: Direct-to-Honeycomb
+
+To send trace details to Honeycomb (using `otel_genai` semantics):
+
+```bash
+lemonade config set telemetry.enabled=true \
+ telemetry.otlp.endpoint=https://api.honeycomb.io/v1/traces \
+ telemetry.otlp.semantics='["otel_genai"]' \
+ telemetry.otlp.headers='{"x-honeycomb-team": "YOUR_API_KEY"}'
+```
+
+---
+
+## Privacy & Redaction
+
+By default, Lemonade captures prompts, completions, and reasoning content in span attributes to allow full evaluation and debugging.
+
+If you are running in a privacy-sensitive environment, redact this content from outgoing telemetry payloads:
+
+```bash
+# Redact inputs, outputs, and reasoning steps
+lemonade config set telemetry.hide_inputs=true \
+ telemetry.hide_outputs=true \
+ telemetry.hide_thinking=true
+```
+
+When redacted, metadata (tokens, latency, model names, status codes) is still reported, but the actual text payloads are replaced with empty values.
diff --git a/src/cpp/cli/main.cpp b/src/cpp/cli/main.cpp
index a0f39c830..27e32fce1 100644
--- a/src/cpp/cli/main.cpp
+++ b/src/cpp/cli/main.cpp
@@ -177,6 +177,9 @@ struct CliConfig {
std::string cloud_api_key;
bool cloud_allow_insecure_http = false;
+ // Telemetry toggle options
+ std::string telemetry_status;
+
// Chat REPL options
bool chat_cli = false;
bool chat_no_stream = false;
@@ -991,17 +994,40 @@ static int handle_config_set(lemonade::LemonadeClient& client,
std::string key = arg.substr(0, eq_pos);
std::string value = arg.substr(eq_pos + 1);
- size_t dot_pos = key.find('.');
- if (dot_pos != std::string::npos) {
- std::string section = key.substr(0, dot_pos);
- std::string field = normalize_key(key.substr(dot_pos + 1));
+ std::vector path;
+ size_t last_pos = 0;
+ while (true) {
+ size_t next_dot = key.find('.', last_pos);
+ if (next_dot == std::string::npos) {
+ std::string part = key.substr(last_pos);
+ if (!part.empty()) {
+ path.push_back(normalize_key(part));
+ }
+ break;
+ }
+ std::string part = key.substr(last_pos, next_dot - last_pos);
+ if (!part.empty()) {
+ path.push_back(normalize_key(part));
+ }
+ last_pos = next_dot + 1;
+ }
+
+ if (path.empty()) {
+ std::cerr << "Error: empty key in '" << arg << "'" << std::endl;
+ return 1;
+ }
- if (!updates.contains(section)) {
- updates[section] = nlohmann::json::object();
+ nlohmann::json* current = &updates;
+ for (size_t i = 0; i < path.size(); ++i) {
+ const std::string& k = path[i];
+ if (i == path.size() - 1) {
+ (*current)[k] = parse_typed_value(value);
+ } else {
+ if (!current->contains(k) || !(*current)[k].is_object()) {
+ (*current)[k] = nlohmann::json::object();
+ }
+ current = &((*current)[k]);
}
- updates[section][field] = parse_typed_value(value);
- } else {
- updates[normalize_key(key)] = parse_typed_value(value);
}
}
@@ -1170,6 +1196,11 @@ int main(int argc, char* argv[]) {
CLI::App* logs_cmd = app.add_subcommand("logs", "Open server logs in the web UI")->group("Server");
CLI::App* scan_cmd = app.add_subcommand("scan", "Scan for network beacons")->group("Server");
+ CLI::App* telemetry_cmd = app.add_subcommand("telemetry", "Toggle server telemetry on or off")->group("Server");
+ telemetry_cmd->add_option("status", config.telemetry_status, "Telemetry status: 'on' or 'off'")
+ ->required()
+ ->check(CLI::IsMember({"on", "off"}));
+
// Config commands
CLI::App* config_cmd = app.add_subcommand("config", "View or modify server configuration")->group("Server");
CLI::App* config_set_cmd = config_cmd->add_subcommand("set", "Set configuration values (e.g., llamacpp.backend=rocm port=8123)")->group("Subcommands");
@@ -1507,6 +1538,23 @@ int main(int argc, char* argv[]) {
return 0;
} else if (scan_cmd->count() > 0) {
return handle_scan_command(config);
+ } else if (telemetry_cmd->count() > 0) {
+ bool enable = (config.telemetry_status == "on");
+ nlohmann::json body = {{"telemetry", {{"enabled", enable}}}};
+ try {
+ std::string res_str = client.make_request("/internal/set", "POST", body.dump(), "application/json");
+ auto json_res = nlohmann::json::parse(res_str);
+ if (json_res.contains("status") && json_res["status"] == "success") {
+ std::cout << "Telemetry successfully turned " << (enable ? "on" : "off") << "." << std::endl;
+ return 0;
+ } else {
+ std::cerr << "Failed to toggle telemetry: " << res_str << std::endl;
+ return 1;
+ }
+ } catch (const std::exception& e) {
+ std::cerr << "Error toggling telemetry: " << e.what() << std::endl;
+ return 1;
+ }
} else if (config_cmd->count() > 0) {
if (config_set_cmd->count() > 0) {
return handle_config_set(client, config_set_cmd->remaining());
diff --git a/src/cpp/include/lemon/backends/vllm_server.h b/src/cpp/include/lemon/backends/vllm_server.h
index 62ec94af2..4deeffbda 100644
--- a/src/cpp/include/lemon/backends/vllm_server.h
+++ b/src/cpp/include/lemon/backends/vllm_server.h
@@ -7,6 +7,8 @@
namespace lemon {
namespace backends {
+std::map parse_vllm_metrics_text(const std::string& body);
+
class VLLMServer : public WrappedServer {
public:
static InstallParams get_install_params(const std::string& backend, const std::string& version);
@@ -43,6 +45,10 @@ class VLLMServer : public WrappedServer {
long timeout_seconds = 0,
TelemetryCallback telemetry_callback = nullptr) override;
+ std::map get_additional_telemetry() override;
+ std::string get_additional_telemetry_url() const override;
+ std::function(const std::string&)> get_additional_telemetry_parser() const override;
+
};
} // namespace backends
diff --git a/src/cpp/include/lemon/config_file.h b/src/cpp/include/lemon/config_file.h
index ec56c17fb..4dfaed8a2 100644
--- a/src/cpp/include/lemon/config_file.h
+++ b/src/cpp/include/lemon/config_file.h
@@ -73,9 +73,6 @@ static inline bool config_migrate(json& config,
current_version = 2;
}
- // Run any future migrations here as the version increments.
- // if (current_version < 3) { ... }
-
return true;
}
diff --git a/src/cpp/include/lemon/router.h b/src/cpp/include/lemon/router.h
index e98a8b11d..af35e6153 100644
--- a/src/cpp/include/lemon/router.h
+++ b/src/cpp/include/lemon/router.h
@@ -24,6 +24,10 @@ using json = nlohmann::json;
class CloudProviderRegistry;
+namespace telemetry {
+class InferenceSpan;
+}
+
struct ModelTelemetryIdentity {
std::string model_name;
std::string checkpoint;
@@ -189,7 +193,7 @@ class Router {
auto execute_inference(const json& request, Func&& inference_func) -> decltype(inference_func(nullptr));
template
- void execute_streaming(const std::string& request_body, httplib::DataSink& sink, Func&& streaming_func);
+ void execute_streaming(const std::string& request_body, httplib::DataSink& sink, Func&& streaming_func, std::shared_ptr span = nullptr);
};
} // namespace lemon
diff --git a/src/cpp/include/lemon/runtime_config.h b/src/cpp/include/lemon/runtime_config.h
index 9dbc422f6..2e0d00e3c 100644
--- a/src/cpp/include/lemon/runtime_config.h
+++ b/src/cpp/include/lemon/runtime_config.h
@@ -35,6 +35,22 @@ class RuntimeConfig {
bool auto_evict() const;
double auto_evict_threshold_pct() const;
+ // Telemetry settings
+ bool telemetry_enabled() const;
+ bool telemetry_hide_inputs() const;
+ bool telemetry_hide_outputs() const;
+ bool telemetry_hide_thinking() const;
+ int telemetry_max_queue_capacity() const;
+ int telemetry_max_attribute_length() const;
+ std::string telemetry_otlp_endpoint() const;
+ std::string telemetry_otlp_protocol() const;
+ std::vector telemetry_otlp_semantics() const;
+ std::map telemetry_otlp_headers() const;
+ int telemetry_otlp_max_retries() const;
+ double telemetry_otlp_retry_backoff_base_s() const;
+ int telemetry_otlp_send_batch_size() const;
+ double telemetry_otlp_batch_timeout_s() const;
+
// Feature flags
bool offline() const;
@@ -114,6 +130,13 @@ class RuntimeConfig {
// backend sections).
void apply_changes(const json& changes, json& applied_diff);
+ // Helpers to retrieve configuration options with environment variable override
+ // and fallback to config JSON under a shared lock.
+ bool get_bool_opt(const char* env_name, const std::vector& path, bool default_val) const;
+ int get_int_opt(const char* env_name, const std::vector& path, int default_val) const;
+ double get_double_opt(const char* env_name, const std::vector& path, double default_val) const;
+ std::string get_string_opt(const char* env_name, const std::vector& path, const std::string& default_val) const;
+
mutable std::shared_mutex mutex_;
// Config stored as nested JSON matching config.json structure.
diff --git a/src/cpp/include/lemon/streaming_proxy.h b/src/cpp/include/lemon/streaming_proxy.h
index 4d56b702a..4441a5547 100644
--- a/src/cpp/include/lemon/streaming_proxy.h
+++ b/src/cpp/include/lemon/streaming_proxy.h
@@ -19,6 +19,7 @@ class StreamingProxy {
int output_tokens = 0;
double time_to_first_token = 0.0;
double tokens_per_second = 0.0;
+ std::string error_message = "";
void print() const {
if (input_tokens > 0 || output_tokens > 0) {
@@ -51,6 +52,8 @@ class StreamingProxy {
std::function on_chunk = nullptr
);
+ static void process_sse_lines(std::string& line_buffer, std::function line_callback);
+
private:
static TelemetryData parse_telemetry(const std::string& buffer);
};
diff --git a/src/cpp/include/lemon/wrapped_server.h b/src/cpp/include/lemon/wrapped_server.h
index f3ec74da4..576c1d091 100644
--- a/src/cpp/include/lemon/wrapped_server.h
+++ b/src/cpp/include/lemon/wrapped_server.h
@@ -315,9 +315,10 @@ class WrappedServer : public ICompletionServer {
// Forward streaming requests to the wrapped server (public for Router access)
// Virtual so backends can transform request (e.g., FLM needs checkpoint in model field)
using TelemetryCallback = std::function;
+ int output_tokens,
+ double time_to_first_token,
+ double tokens_per_second,
+ const std::string& error_message)>;
virtual void forward_streaming_request(const std::string& endpoint,
const std::string& request_body,
@@ -333,6 +334,18 @@ class WrappedServer : public ICompletionServer {
Telemetry get_telemetry() const { return telemetry_; }
+ virtual std::map get_additional_telemetry() {
+ return {};
+ }
+
+ virtual std::string get_additional_telemetry_url() const {
+ return "";
+ }
+
+ virtual std::function(const std::string&)> get_additional_telemetry_parser() const {
+ return nullptr;
+ }
+
// Mark observable backend progress. Streaming proxies call this for every
// delivered chunk; non-streaming requests call it on start/finish and when
// the watchdog observes a healthy out-of-band probe.
diff --git a/src/cpp/resources/defaults.json b/src/cpp/resources/defaults.json
index f79396266..78200aca6 100644
--- a/src/cpp/resources/defaults.json
+++ b/src/cpp/resources/defaults.json
@@ -67,5 +67,22 @@
"cpu_args": "",
"cpu_bin": "builtin"
},
- "cloud_providers": []
+ "cloud_providers": [],
+ "telemetry": {
+ "enabled": false,
+ "hide_inputs": false,
+ "hide_outputs": false,
+ "hide_thinking": false,
+ "max_queue_capacity": 1000,
+ "otlp": {
+ "endpoint": "http://localhost:4318/v1/traces",
+ "protocol": "http/protobuf",
+ "semantics": ["openinference", "otel_genai"],
+ "headers": {},
+ "max_retries": 0,
+ "retry_backoff_base_s": 5.0,
+ "send_batch_size": 100,
+ "batch_timeout_s": 1.0
+ }
+ }
}
diff --git a/src/cpp/server/backends/cloud_server.cpp b/src/cpp/server/backends/cloud_server.cpp
index b29fee4cc..85002fc40 100644
--- a/src/cpp/server/backends/cloud_server.cpp
+++ b/src/cpp/server/backends/cloud_server.cpp
@@ -553,7 +553,6 @@ void CloudServer::forward_streaming_request(const std::string& endpoint,
// reconstruct from chunked output, and matching local backends here
// would only diverge subtly. Passing the callback through preserves the
// contract for callers that pass one in.
- (void) telemetry_callback;
auto sse_error = [](const std::string& message, const std::string& type,
const json& extra = json::object()) {
json err = {{"error", {{"message", message}, {"type", type}}}};
@@ -567,6 +566,9 @@ void CloudServer::forward_streaming_request(const std::string& endpoint,
std::string error_msg = sse_error("Cloud model not loaded", "model_not_loaded");
sink.write(error_msg.c_str(), error_msg.size());
sink.done();
+ if (telemetry_callback) {
+ telemetry_callback(0, 0, 0.0, 0.0, "Cloud model not loaded");
+ }
return;
}
@@ -605,6 +607,9 @@ void CloudServer::forward_streaming_request(const std::string& endpoint,
std::string error_msg = missing_creds_sse();
sink.write(error_msg.c_str(), error_msg.size());
sink.done();
+ if (telemetry_callback) {
+ telemetry_callback(0, 0, 0.0, 0.0, "Missing API credentials");
+ }
return;
}
@@ -629,6 +634,36 @@ void CloudServer::forward_streaming_request(const std::string& endpoint,
bool has_done_marker = false;
bool streaming_mode = false;
bool first_chunk = true;
+
+ int input_tokens = 0;
+ int output_tokens = 0;
+ double time_to_first_token = 0.0;
+ double tokens_per_second = 0.0;
+ bool has_first_token = false;
+ const auto start_time = std::chrono::steady_clock::now();
+ std::string sse_line_buffer;
+
+ auto process_cloud_line = [&](const std::string& line) {
+ std::string json_str;
+ if (line.find("data: ") == 0) {
+ json_str = line.substr(6);
+ }
+ if (!json_str.empty() && json_str != "[DONE]") {
+ try {
+ auto chunk = json::parse(json_str);
+ if (chunk.contains("usage") && !chunk["usage"].is_null()) {
+ auto usage = chunk["usage"];
+ if (usage.contains("prompt_tokens") && usage["prompt_tokens"].is_number()) {
+ input_tokens = usage["prompt_tokens"].get();
+ }
+ if (usage.contains("completion_tokens") && usage["completion_tokens"].is_number()) {
+ output_tokens = usage["completion_tokens"].get();
+ }
+ }
+ } catch (...) {}
+ }
+ };
+
auto result = utils::HttpClient::post_stream(
url,
forwarded_body,
@@ -647,6 +682,17 @@ void CloudServer::forward_streaming_request(const std::string& endpoint,
if (std::string_view(data, length).find("[DONE]") != std::string_view::npos) {
has_done_marker = true;
}
+
+ // Parse SSE lines
+ sse_line_buffer.append(data, length);
+ StreamingProxy::process_sse_lines(sse_line_buffer, process_cloud_line);
+
+ if (!has_first_token && std::string_view(data, length).find("data: ") != std::string_view::npos) {
+ has_first_token = true;
+ time_to_first_token = std::chrono::duration(
+ std::chrono::steady_clock::now() - start_time).count();
+ }
+
return sink.write(data, length);
}
body_buffer.append(data, length);
@@ -664,6 +710,9 @@ void CloudServer::forward_streaming_request(const std::string& endpoint,
"cloud (" + provider_ + ") request failed", "backend_error", extra);
sink.write(error_msg.c_str(), error_msg.size());
sink.done();
+ if (telemetry_callback) {
+ telemetry_callback(0, 0, 0.0, 0.0, "cloud (" + provider_ + ") request failed with status " + std::to_string(result.status_code));
+ }
return;
}
@@ -678,6 +727,17 @@ void CloudServer::forward_streaming_request(const std::string& endpoint,
sink.write(done_marker, std::strlen(done_marker));
}
sink.done();
+
+ if (telemetry_callback) {
+ if (output_tokens > 0 && time_to_first_token > 0.0) {
+ double duration = std::chrono::duration(std::chrono::steady_clock::now() - start_time).count();
+ double generation_duration = duration - time_to_first_token;
+ if (generation_duration > 0.0) {
+ tokens_per_second = output_tokens / generation_duration;
+ }
+ }
+ telemetry_callback(input_tokens, output_tokens, time_to_first_token, tokens_per_second, "");
+ }
} else {
auto result = utils::HttpClient::post_stream(
url,
@@ -690,11 +750,21 @@ void CloudServer::forward_streaming_request(const std::string& endpoint,
);
if (result.status_code != 200) {
LOG(ERROR, "Cloud") << "Provider returned status " << result.status_code << std::endl;
+ if (telemetry_callback) {
+ telemetry_callback(0, 0, 0.0, 0.0, "status_code " + std::to_string(result.status_code));
+ }
+ } else {
+ if (telemetry_callback) {
+ telemetry_callback(0, 0, 0.0, 0.0, "");
+ }
}
sink.done();
}
} catch (const std::exception& e) {
LOG(ERROR, "Cloud") << "Streaming request failed: " << e.what() << std::endl;
+ if (telemetry_callback) {
+ telemetry_callback(0, 0, 0.0, 0.0, e.what());
+ }
try {
std::string error_msg = sse_error(e.what(), "streaming_error");
sink.write(error_msg.c_str(), error_msg.size());
diff --git a/src/cpp/server/backends/vllm_server.cpp b/src/cpp/server/backends/vllm_server.cpp
index 7584d56d9..9241c9b40 100644
--- a/src/cpp/server/backends/vllm_server.cpp
+++ b/src/cpp/server/backends/vllm_server.cpp
@@ -275,18 +275,21 @@ void VLLMServer::forward_streaming_request(const std::string& endpoint,
Telemetry telemetry;
bool has_telemetry = false;
+ std::string telemetry_error = "";
WrappedServer::forward_streaming_request(
endpoint, body, sink, sse, timeout_seconds,
- [&telemetry, &has_telemetry](int input_tokens,
+ [&telemetry, &has_telemetry, &telemetry_error](int input_tokens,
int output_tokens,
double time_to_first_token,
- double tokens_per_second) {
+ double tokens_per_second,
+ const std::string& error_message) {
has_telemetry = true;
telemetry.input_tokens = input_tokens;
telemetry.output_tokens = output_tokens;
telemetry.time_to_first_token = time_to_first_token;
telemetry.tokens_per_second = tokens_per_second;
+ telemetry_error = error_message;
});
if (has_telemetry) {
@@ -304,10 +307,71 @@ void VLLMServer::forward_streaming_request(const std::string& endpoint,
telemetry_callback(telemetry.input_tokens,
telemetry.output_tokens,
telemetry.time_to_first_token,
- telemetry.tokens_per_second);
+ telemetry.tokens_per_second,
+ telemetry_error);
}
}
}
+std::map parse_vllm_metrics_text(const std::string& body) {
+ std::map telemetry_attrs;
+ std::istringstream stream(body);
+ std::string line;
+ std::vector> keys = {
+ {"vllm:gpu_cache_usage_factor", "llm.vllm.gpu_cache_usage_factor"},
+ {"vllm:cpu_cache_usage_factor", "llm.vllm.cpu_cache_usage_factor"},
+ {"vllm:num_requests_waiting", "llm.vllm.num_requests_waiting"},
+ {"vllm:num_requests_running", "llm.vllm.num_requests_running"},
+ {"vllm:num_requests_swapped", "llm.vllm.num_requests_swapped"}
+ };
+ while (std::getline(stream, line)) {
+ for (const auto& [prom_key, span_key] : keys) {
+ if (line.rfind(prom_key, 0) == 0) {
+ size_t last_space = line.find_last_of(" \t");
+ if (last_space != std::string::npos) {
+ try {
+ double val = std::stod(line.substr(last_space + 1));
+ telemetry_attrs[span_key] = val;
+ } catch (...) {}
+ }
+ break;
+ }
+ }
+ }
+ return telemetry_attrs;
+}
+
+std::map VLLMServer::get_additional_telemetry() {
+ if (get_backend_port() <= 0) {
+ return {};
+ }
+
+ std::string url = "http://127.0.0.1:" + std::to_string(get_backend_port()) + "/metrics";
+ try {
+ auto response = utils::HttpClient::get(url, {}, 1);
+ if (response.status_code == 200) {
+ return parse_vllm_metrics_text(response.body);
+ }
+ } catch (const std::exception& e) {
+ LOG(DEBUG, "vLLM") << "Failed to fetch metrics from vllm-server: " << e.what() << std::endl;
+ } catch (...) {
+ LOG(DEBUG, "vLLM") << "Failed to fetch metrics from vllm-server: unknown error" << std::endl;
+ }
+ return {};
+}
+
+std::string VLLMServer::get_additional_telemetry_url() const {
+ if (get_backend_port() <= 0) {
+ return "";
+ }
+ return "http://127.0.0.1:" + std::to_string(get_backend_port()) + "/metrics";
+}
+
+std::function(const std::string&)> VLLMServer::get_additional_telemetry_parser() const {
+ return [](const std::string& body) {
+ return parse_vllm_metrics_text(body);
+ };
+}
+
} // namespace backends
} // namespace lemon
diff --git a/src/cpp/server/main.cpp b/src/cpp/server/main.cpp
index 861858291..617af8cc4 100644
--- a/src/cpp/server/main.cpp
+++ b/src/cpp/server/main.cpp
@@ -115,6 +115,23 @@ int main(int argc, char** argv) {
if (!config->extra_models_dir().empty()) {
LOG(INFO) << " Extra models dir: " << config->extra_models_dir() << std::endl;
}
+ if (config->telemetry_enabled()) {
+ std::string endpoint = config->telemetry_otlp_endpoint();
+ std::vector semantics = config->telemetry_otlp_semantics();
+ std::string semantics_str = "";
+ for (size_t i = 0; i < semantics.size(); ++i) {
+ if (i > 0) semantics_str += ", ";
+ semantics_str += semantics[i];
+ }
+ if (endpoint.empty()) {
+ LOG(INFO) << " Telemetry: enabled (no endpoint configured, semantics: [" << semantics_str << "])" << std::endl;
+ } else {
+ LOG(INFO) << " Telemetry: enabled (" << config->telemetry_otlp_protocol()
+ << " -> " << endpoint << ", semantics: [" << semantics_str << "])" << std::endl;
+ }
+ } else {
+ LOG(INFO) << " Telemetry: disabled" << std::endl;
+ }
Server server(config, cli_config.cache_dir);
diff --git a/src/cpp/server/router.cpp b/src/cpp/server/router.cpp
index b3ec22c3b..4d5aa3dc5 100644
--- a/src/cpp/server/router.cpp
+++ b/src/cpp/server/router.cpp
@@ -10,17 +10,26 @@
#include "lemon/backends/sd_server.h"
#include "lemon/backends/vllm_server.h"
#include "lemon/server_capabilities.h"
+#include "lemon/streaming_proxy.h"
#include "lemon/error_types.h"
#include "lemon/recipe_options.h"
#include "lemon/auto_tune.h"
-#include
+#include "telemetry.h"
#include
+#include
+#include
+#include
+#include
+#include
#include "lemon/utils/aixlog.hpp"
#include "lemon/global_vram_monitor.h"
#include "lemon/eviction_engine.h"
+#include "lemon/utils/http_client.h"
namespace lemon {
+
+
Router::Router(RuntimeConfig* config, ModelManager* model_manager, BackendManager* backend_manager)
: config_(config), model_manager_(model_manager), backend_manager_(backend_manager) {
@@ -887,7 +896,7 @@ auto Router::execute_inference(const json& request, Func&& inference_func) -> de
// Template method for streaming execution
template
-void Router::execute_streaming(const std::string& request_body, httplib::DataSink& sink, Func&& streaming_func) {
+void Router::execute_streaming(const std::string& request_body, httplib::DataSink& sink, Func&& streaming_func, std::shared_ptr span) {
WrappedServer* server = nullptr;
std::string requested_model;
@@ -994,6 +1003,10 @@ void Router::execute_streaming(const std::string& request_body, httplib::DataSin
continue;
}
+ if (span) {
+ span->end_with_error(e.what());
+ }
+
json error = ErrorResponse::create(
std::string("Backend for model '") + requested_model +
"' crashed before streaming started and could not be reloaded: " + e.what(),
@@ -1012,39 +1025,253 @@ void Router::execute_streaming(const std::string& request_body, httplib::DataSin
}
json Router::chat_completion(const json& request) {
- return execute_inference(request, [&](WrappedServer* server) {
- return server->chat_completion(request);
- });
+ std::string requested_model = request.value("model", "");
+ std::shared_ptr span = telemetry::TelemetryTracker::start_span("LLM", "chat.completions", requested_model, request);
+
+ try {
+ WrappedServer* active_server = nullptr;
+ json response = execute_inference(request, [&](WrappedServer* server) {
+ active_server = server;
+ ModelTelemetryIdentity identity = get_telemetry_identity(server);
+ if (span) {
+ span->set_attribute("llm.backend", identity.recipe);
+ span->set_attribute("llm.device_type", identity.device);
+ span->set_attribute("llm.checkpoint", identity.checkpoint);
+ span->set_attribute("llm.recipe", identity.recipe);
+ if (request.contains("temperature")) span->set_attribute("llm.config.temperature", request["temperature"]);
+ if (request.contains("top_p")) span->set_attribute("llm.config.top_p", request["top_p"]);
+ if (request.contains("max_tokens")) span->set_attribute("llm.config.max_tokens", request["max_tokens"]);
+ if (request.contains("max_completion_tokens")) span->set_attribute("llm.config.max_completion_tokens", request["max_completion_tokens"]);
+ }
+ return server->chat_completion(request);
+ });
+
+ if (span) {
+ if (response.contains("error")) {
+ std::string error_msg = "Request failed";
+ if (response["error"].contains("message") && response["error"]["message"].is_string()) {
+ error_msg = response["error"]["message"].get();
+ }
+ span->end_with_error(error_msg);
+ } else {
+ nlohmann::json usage_payload = nlohmann::json::object();
+ std::string text_output = "";
+ if (response.contains("usage")) {
+ auto usage = response["usage"];
+ if (usage.contains("prompt_tokens")) usage_payload["prompt_tokens"] = usage["prompt_tokens"].get();
+ if (usage.contains("completion_tokens")) usage_payload["completion_tokens"] = usage["completion_tokens"].get();
+ }
+ if (response.contains("timings")) {
+ auto timings = response["timings"];
+ if (timings.contains("prompt_n")) usage_payload["prompt_tokens"] = timings["prompt_n"].get();
+ if (timings.contains("predicted_n")) usage_payload["completion_tokens"] = timings["predicted_n"].get();
+
+ if (timings.contains("prompt_ms") && timings.contains("prompt_n")) {
+ double prompt_ms = timings["prompt_ms"].get();
+ if (prompt_ms > 0) {
+ span->set_attribute("llm.performance.time_to_first_token", prompt_ms / 1000.0);
+ }
+ }
+ if (timings.contains("predicted_ms") && timings.contains("predicted_n")) {
+ double predicted_ms = timings["predicted_ms"].get();
+ int predicted_n = timings["predicted_n"].get();
+ if (predicted_ms > 0 && predicted_n > 0) {
+ span->set_attribute("llm.performance.tokens_per_second", (predicted_n / (predicted_ms / 1000.0)));
+ }
+ }
+ }
+
+ if (response.contains("choices") && response["choices"].is_array() && !response["choices"].empty()) {
+ auto choice = response["choices"][0];
+ std::string reasoning_output = "";
+ if (choice.contains("message")) {
+ auto msg = choice["message"];
+ if (msg.contains("reasoning_content") && msg["reasoning_content"].is_string()) {
+ reasoning_output = msg["reasoning_content"].get();
+ } else if (msg.contains("thinking") && msg["thinking"].is_string()) {
+ reasoning_output = msg["thinking"].get();
+ }
+ if (msg.contains("content") && msg["content"].is_string()) {
+ text_output = msg["content"].get();
+ }
+ }
+ if (!reasoning_output.empty()) {
+ text_output = "\n" + reasoning_output + "\n\n" + text_output;
+ }
+ }
+
+ std::string url;
+ std::function(const std::string&)> parser;
+ if (active_server) {
+ url = active_server->get_additional_telemetry_url();
+ parser = active_server->get_additional_telemetry_parser();
+ }
+ telemetry::end_llm_span_async(span, url, parser, usage_payload, text_output);
+ }
+ }
+ return response;
+ } catch (const std::exception& e) {
+ if (span) span->end_with_error(e.what());
+ throw;
+ }
}
json Router::completion(const json& request) {
- return execute_inference(request, [&](WrappedServer* server) {
- return server->completion(request);
- });
+ std::string requested_model = request.value("model", "");
+ std::shared_ptr span = telemetry::TelemetryTracker::start_span("LLM", "completions", requested_model, request);
+
+ try {
+ WrappedServer* active_server = nullptr;
+ json response = execute_inference(request, [&](WrappedServer* server) {
+ active_server = server;
+ ModelTelemetryIdentity identity = get_telemetry_identity(server);
+ if (span) {
+ span->set_attribute("llm.backend", identity.recipe);
+ span->set_attribute("llm.device_type", identity.device);
+ span->set_attribute("llm.checkpoint", identity.checkpoint);
+ span->set_attribute("llm.recipe", identity.recipe);
+ if (request.contains("temperature")) span->set_attribute("llm.config.temperature", request["temperature"]);
+ if (request.contains("top_p")) span->set_attribute("llm.config.top_p", request["top_p"]);
+ if (request.contains("max_tokens")) span->set_attribute("llm.config.max_tokens", request["max_tokens"]);
+ }
+ return server->completion(request);
+ });
+
+ if (span) {
+ if (response.contains("error")) {
+ std::string error_msg = "Request failed";
+ if (response["error"].contains("message") && response["error"]["message"].is_string()) {
+ error_msg = response["error"]["message"].get();
+ }
+ span->end_with_error(error_msg);
+ } else {
+ nlohmann::json usage_payload = nlohmann::json::object();
+ std::string text_output = "";
+ if (response.contains("usage")) {
+ auto usage = response["usage"];
+ if (usage.contains("prompt_tokens")) usage_payload["prompt_tokens"] = usage["prompt_tokens"].get();
+ if (usage.contains("completion_tokens")) usage_payload["completion_tokens"] = usage["completion_tokens"].get();
+ }
+
+ if (response.contains("choices") && response["choices"].is_array() && !response["choices"].empty()) {
+ auto choice = response["choices"][0];
+ if (choice.contains("text") && choice["text"].is_string()) {
+ text_output = choice["text"].get();
+ }
+ }
+
+ std::string url;
+ std::function(const std::string&)> parser;
+ if (active_server) {
+ url = active_server->get_additional_telemetry_url();
+ parser = active_server->get_additional_telemetry_parser();
+ }
+ telemetry::end_llm_span_async(span, url, parser, usage_payload, text_output);
+ }
+ }
+ return response;
+ } catch (const std::exception& e) {
+ if (span) span->end_with_error(e.what());
+ throw;
+ }
}
json Router::embeddings(const json& request) {
- return execute_inference(request, [&](WrappedServer* server) {
- auto embeddings_server = dynamic_cast(server);
- if (!embeddings_server) {
- return ErrorResponse::from_exception(
- UnsupportedOperationException("Embeddings", device_type_to_string(server->get_device_type()))
- );
+ std::string requested_model = request.value("model", "");
+ std::shared_ptr span = telemetry::TelemetryTracker::start_span("EMBEDDING", "embeddings", requested_model, request);
+
+ try {
+ json response = execute_inference(request, [&](WrappedServer* server) {
+ ModelTelemetryIdentity identity = get_telemetry_identity(server);
+ if (span) {
+ span->set_attribute("embedding.backend", identity.recipe);
+ span->set_attribute("embedding.device_type", identity.device);
+ span->set_attribute("embedding.checkpoint", identity.checkpoint);
+ span->set_attribute("embedding.recipe", identity.recipe);
+ }
+ auto embeddings_server = dynamic_cast(server);
+ if (!embeddings_server) {
+ return ErrorResponse::from_exception(
+ UnsupportedOperationException("Embeddings", device_type_to_string(server->get_device_type()))
+ );
+ }
+ return embeddings_server->embeddings(request);
+ });
+
+ if (span) {
+ if (response.contains("error")) {
+ std::string error_msg = "Request failed";
+ if (response["error"].contains("message") && response["error"]["message"].is_string()) {
+ error_msg = response["error"]["message"].get();
+ }
+ span->end_with_error(error_msg);
+ } else {
+ nlohmann::json usage_payload = nlohmann::json::object();
+ if (response.contains("usage")) {
+ auto usage = response["usage"];
+ if (usage.contains("prompt_tokens")) usage_payload["prompt_tokens"] = usage["prompt_tokens"].get();
+ if (usage.contains("total_tokens")) usage_payload["total_tokens"] = usage["total_tokens"].get();
+ }
+ std::string output_dump = "";
+ if (response.contains("data") && response["data"].is_array()) {
+ output_dump = "Embeddings data with " + std::to_string(response["data"].size()) + " vectors.";
+ } else {
+ output_dump = response.dump();
+ }
+ span->end_with_success(usage_payload, output_dump);
+ }
}
- return embeddings_server->embeddings(request);
- });
+ return response;
+ } catch (const std::exception& e) {
+ if (span) span->end_with_error(e.what());
+ throw;
+ }
}
json Router::reranking(const json& request) {
- return execute_inference(request, [&](WrappedServer* server) {
- auto reranking_server = dynamic_cast(server);
- if (!reranking_server) {
- return ErrorResponse::from_exception(
- UnsupportedOperationException("Reranking", device_type_to_string(server->get_device_type()))
- );
+ std::string requested_model = request.value("model", "");
+ std::shared_ptr span = telemetry::TelemetryTracker::start_span("RERANKER", "reranking", requested_model, request);
+
+ try {
+ json response = execute_inference(request, [&](WrappedServer* server) {
+ ModelTelemetryIdentity identity = get_telemetry_identity(server);
+ if (span) {
+ span->set_attribute("reranker.backend", identity.recipe);
+ span->set_attribute("reranker.device_type", identity.device);
+ span->set_attribute("reranker.checkpoint", identity.checkpoint);
+ span->set_attribute("reranker.recipe", identity.recipe);
+ }
+ auto reranking_server = dynamic_cast(server);
+ if (!reranking_server) {
+ return ErrorResponse::from_exception(
+ UnsupportedOperationException("Reranking", device_type_to_string(server->get_device_type()))
+ );
+ }
+ return reranking_server->reranking(request);
+ });
+
+ if (span) {
+ if (response.contains("error")) {
+ std::string error_msg = "Request failed";
+ if (response["error"].contains("message") && response["error"]["message"].is_string()) {
+ error_msg = response["error"]["message"].get();
+ }
+ span->end_with_error(error_msg);
+ } else {
+ nlohmann::json usage_payload = nlohmann::json::object();
+ if (response.contains("usage")) {
+ auto usage = response["usage"];
+ if (usage.contains("prompt_tokens")) usage_payload["prompt_tokens"] = usage["prompt_tokens"].get();
+ if (usage.contains("total_tokens")) usage_payload["total_tokens"] = usage["total_tokens"].get();
+ }
+ span->end_with_success(usage_payload, response.dump());
+ }
}
- return reranking_server->reranking(request);
- });
+ return response;
+ } catch (const std::exception& e) {
+ if (span) span->end_with_error(e.what());
+ throw;
+ }
}
json Router::get_slots() {
@@ -1399,48 +1626,381 @@ void Router::update_prompt_tokens(const std::string& model_name, int prompt_toke
}
void Router::chat_completion_stream(const std::string& request_body, httplib::DataSink& sink) {
- execute_streaming(request_body, sink, [&](WrappedServer* server) {
- ModelTelemetryIdentity identity = get_telemetry_identity(server);
- server->forward_streaming_request("/v1/chat/completions", request_body, sink, true, 0,
- [this, identity](int input_tokens,
- int output_tokens,
- double time_to_first_token,
- double tokens_per_second) {
- record_telemetry_for_model(identity, input_tokens, output_tokens,
- time_to_first_token, tokens_per_second);
- record_prompt_tokens_for_model(identity, input_tokens);
- });
- });
+ json request_json;
+ try {
+ request_json = json::parse(request_body);
+ } catch (...) {}
+ std::string requested_model = request_json.value("model", "");
+
+ std::shared_ptr span = telemetry::TelemetryTracker::start_span("LLM", "chat.completions", requested_model, request_json);
+
+ bool hide_outputs = false;
+ bool hide_thinking = false;
+ if (auto* config = RuntimeConfig::global()) {
+ hide_outputs = config->telemetry_hide_outputs();
+ hide_thinking = config->telemetry_hide_thinking();
+ }
+
+ auto accumulated_text = std::make_shared();
+ auto accumulated_reasoning = std::make_shared();
+ auto line_buffer = std::make_shared();
+
+ httplib::DataSink telemetry_sink;
+ telemetry_sink.write = [accumulated_text, accumulated_reasoning, line_buffer, &sink, hide_outputs, hide_thinking](const char* data, size_t len) -> bool {
+ bool success = false;
+ if (sink.write) {
+ success = sink.write(data, len);
+ }
+ line_buffer->append(data, len);
+ StreamingProxy::process_sse_lines(*line_buffer, [accumulated_text, accumulated_reasoning, hide_outputs, hide_thinking](const std::string& line) {
+ if (line.rfind("data: ", 0) == 0) {
+ std::string json_str = line.substr(6);
+ if (json_str.find("[DONE]") == std::string::npos) {
+ try {
+ auto parsed = json::parse(json_str);
+ if (parsed.contains("choices") && parsed["choices"].is_array() && !parsed["choices"].empty()) {
+ auto delta = parsed["choices"][0]["delta"];
+ if (!hide_thinking) {
+ if (delta.contains("reasoning_content") && delta["reasoning_content"].is_string()) {
+ *accumulated_reasoning += delta["reasoning_content"].get();
+ } else if (delta.contains("thinking") && delta["thinking"].is_string()) {
+ *accumulated_reasoning += delta["thinking"].get();
+ }
+ }
+ if (!hide_outputs) {
+ if (delta.contains("content") && delta["content"].is_string()) {
+ *accumulated_text += delta["content"].get();
+ }
+ }
+ }
+ } catch (...) {}
+ }
+ }
+ });
+ return success;
+ };
+
+ telemetry_sink.is_writable = [&sink]() -> bool {
+ return sink.is_writable ? sink.is_writable() : true;
+ };
+
+ telemetry_sink.done = [&sink]() {
+ if (sink.done) {
+ sink.done();
+ }
+ };
+
+ telemetry_sink.done_with_trailer = [&sink](const httplib::Headers& trailer) {
+ if (sink.done_with_trailer) {
+ sink.done_with_trailer(trailer);
+ } else if (sink.done) {
+ sink.done();
+ }
+ };
+
+ try {
+ execute_streaming(request_body, telemetry_sink, [&](WrappedServer* server) {
+ ModelTelemetryIdentity identity = get_telemetry_identity(server);
+
+ if (span) {
+ span->set_attribute("llm.backend", identity.recipe);
+ span->set_attribute("llm.device_type", identity.device);
+ span->set_attribute("llm.checkpoint", identity.checkpoint);
+ span->set_attribute("llm.recipe", identity.recipe);
+ if (request_json.contains("temperature")) span->set_attribute("llm.config.temperature", request_json["temperature"]);
+ if (request_json.contains("top_p")) span->set_attribute("llm.config.top_p", request_json["top_p"]);
+ if (request_json.contains("max_tokens")) span->set_attribute("llm.config.max_tokens", request_json["max_tokens"]);
+ if (request_json.contains("max_completion_tokens")) span->set_attribute("llm.config.max_completion_tokens", request_json["max_completion_tokens"]);
+ }
+
+ server->forward_streaming_request("/v1/chat/completions", request_body, telemetry_sink, true, 0,
+ [this, identity, span, accumulated_text, accumulated_reasoning, server](int input_tokens,
+ int output_tokens,
+ double time_to_first_token,
+ double tokens_per_second,
+ const std::string& error_message) {
+ if (!error_message.empty()) {
+ if (span) {
+ span->end_with_error(error_message);
+ }
+ return;
+ }
+ record_telemetry_for_model(identity, input_tokens, output_tokens,
+ time_to_first_token, tokens_per_second);
+ record_prompt_tokens_for_model(identity, input_tokens);
+
+ if (span) {
+ nlohmann::json usage_payload = {
+ {"prompt_tokens", input_tokens},
+ {"completion_tokens", output_tokens}
+ };
+ span->set_attribute("llm.performance.time_to_first_token", time_to_first_token);
+ span->set_attribute("llm.performance.tokens_per_second", tokens_per_second);
+ std::string final_output = *accumulated_text;
+ if (!accumulated_reasoning->empty()) {
+ final_output = "\n" + *accumulated_reasoning + "\n\n" + final_output;
+ }
+
+ std::string url;
+ std::function(const std::string&)> parser;
+ if (server) {
+ url = server->get_additional_telemetry_url();
+ parser = server->get_additional_telemetry_parser();
+ }
+ telemetry::end_llm_span_async(span, url, parser, usage_payload, final_output);
+ }
+ });
+ }, span);
+ } catch (const std::exception& e) {
+ if (span) span->end_with_error(e.what());
+ throw;
+ } catch (...) {
+ if (span) span->end_with_error("Unknown error during streaming");
+ throw;
+ }
}
void Router::completion_stream(const std::string& request_body, httplib::DataSink& sink) {
- execute_streaming(request_body, sink, [&](WrappedServer* server) {
- ModelTelemetryIdentity identity = get_telemetry_identity(server);
- server->forward_streaming_request("/v1/completions", request_body, sink, true, 0,
- [this, identity](int input_tokens,
- int output_tokens,
- double time_to_first_token,
- double tokens_per_second) {
- record_telemetry_for_model(identity, input_tokens, output_tokens,
- time_to_first_token, tokens_per_second);
- record_prompt_tokens_for_model(identity, input_tokens);
- });
- });
+ json request_json;
+ try {
+ request_json = json::parse(request_body);
+ } catch (...) {}
+ std::string requested_model = request_json.value("model", "");
+
+ std::shared_ptr span = telemetry::TelemetryTracker::start_span("LLM", "completions", requested_model, request_json);
+
+ bool hide_outputs = false;
+ if (auto* config = RuntimeConfig::global()) {
+ hide_outputs = config->telemetry_hide_outputs();
+ }
+
+ auto accumulated_text = std::make_shared();
+ auto line_buffer = std::make_shared();
+
+ httplib::DataSink telemetry_sink;
+ telemetry_sink.write = [accumulated_text, line_buffer, &sink, hide_outputs](const char* data, size_t len) -> bool {
+ bool success = false;
+ if (sink.write) {
+ success = sink.write(data, len);
+ }
+ line_buffer->append(data, len);
+ StreamingProxy::process_sse_lines(*line_buffer, [accumulated_text, hide_outputs](const std::string& line) {
+ if (line.rfind("data: ", 0) == 0) {
+ std::string json_str = line.substr(6);
+ if (json_str.find("[DONE]") == std::string::npos) {
+ try {
+ auto parsed = json::parse(json_str);
+ if (parsed.contains("choices") && parsed["choices"].is_array() && !parsed["choices"].empty()) {
+ auto choice = parsed["choices"][0];
+ if (!hide_outputs) {
+ if (choice.contains("text") && choice["text"].is_string()) {
+ *accumulated_text += choice["text"].get();
+ }
+ }
+ }
+ } catch (...) {}
+ }
+ }
+ });
+ return success;
+ };
+
+ telemetry_sink.is_writable = [&sink]() -> bool {
+ return sink.is_writable ? sink.is_writable() : true;
+ };
+
+ telemetry_sink.done = [&sink]() {
+ if (sink.done) {
+ sink.done();
+ }
+ };
+
+ telemetry_sink.done_with_trailer = [&sink](const httplib::Headers& trailer) {
+ if (sink.done_with_trailer) {
+ sink.done_with_trailer(trailer);
+ } else if (sink.done) {
+ sink.done();
+ }
+ };
+
+ try {
+ execute_streaming(request_body, telemetry_sink, [&](WrappedServer* server) {
+ ModelTelemetryIdentity identity = get_telemetry_identity(server);
+
+ if (span) {
+ span->set_attribute("llm.backend", identity.recipe);
+ span->set_attribute("llm.device_type", identity.device);
+ span->set_attribute("llm.checkpoint", identity.checkpoint);
+ span->set_attribute("llm.recipe", identity.recipe);
+ if (request_json.contains("temperature")) span->set_attribute("llm.config.temperature", request_json["temperature"]);
+ if (request_json.contains("top_p")) span->set_attribute("llm.config.top_p", request_json["top_p"]);
+ if (request_json.contains("max_tokens")) span->set_attribute("llm.config.max_tokens", request_json["max_tokens"]);
+ }
+
+ server->forward_streaming_request("/v1/completions", request_body, telemetry_sink, true, 0,
+ [this, identity, span, accumulated_text, server](int input_tokens,
+ int output_tokens,
+ double time_to_first_token,
+ double tokens_per_second,
+ const std::string& error_message) {
+ if (!error_message.empty()) {
+ if (span) {
+ span->end_with_error(error_message);
+ }
+ return;
+ }
+ record_telemetry_for_model(identity, input_tokens, output_tokens,
+ time_to_first_token, tokens_per_second);
+ record_prompt_tokens_for_model(identity, input_tokens);
+
+ if (span) {
+ nlohmann::json usage_payload = {
+ {"prompt_tokens", input_tokens},
+ {"completion_tokens", output_tokens}
+ };
+ span->set_attribute("llm.performance.time_to_first_token", time_to_first_token);
+ span->set_attribute("llm.performance.tokens_per_second", tokens_per_second);
+
+ std::string url;
+ std::function(const std::string&)> parser;
+ if (server) {
+ url = server->get_additional_telemetry_url();
+ parser = server->get_additional_telemetry_parser();
+ }
+ telemetry::end_llm_span_async(span, url, parser, usage_payload, *accumulated_text);
+ }
+ });
+ }, span);
+ } catch (const std::exception& e) {
+ if (span) span->end_with_error(e.what());
+ throw;
+ } catch (...) {
+ if (span) span->end_with_error("Unknown error during streaming");
+ throw;
+ }
}
void Router::responses_stream(const std::string& request_body, httplib::DataSink& sink) {
- execute_streaming(request_body, sink, [&](WrappedServer* server) {
- ModelTelemetryIdentity identity = get_telemetry_identity(server);
- server->forward_streaming_request("/v1/responses", request_body, sink, true, 0,
- [this, identity](int input_tokens,
- int output_tokens,
- double time_to_first_token,
- double tokens_per_second) {
- record_telemetry_for_model(identity, input_tokens, output_tokens,
- time_to_first_token, tokens_per_second);
- record_prompt_tokens_for_model(identity, input_tokens);
- });
- });
+ json request_json;
+ try {
+ request_json = json::parse(request_body);
+ } catch (...) {}
+ std::string requested_model = request_json.value("model", "");
+
+ std::shared_ptr span = telemetry::TelemetryTracker::start_span("LLM", "responses", requested_model, request_json);
+
+ bool hide_outputs = false;
+ if (auto* config = RuntimeConfig::global()) {
+ hide_outputs = config->telemetry_hide_outputs();
+ }
+
+ auto accumulated_text = std::make_shared();
+ auto line_buffer = std::make_shared();
+
+ httplib::DataSink telemetry_sink;
+ telemetry_sink.write = [accumulated_text, line_buffer, &sink, hide_outputs](const char* data, size_t len) -> bool {
+ bool success = false;
+ if (sink.write) {
+ success = sink.write(data, len);
+ }
+ line_buffer->append(data, len);
+ StreamingProxy::process_sse_lines(*line_buffer, [accumulated_text, hide_outputs](const std::string& line) {
+ if (line.rfind("data: ", 0) == 0) {
+ std::string json_str = line.substr(6);
+ if (json_str.find("[DONE]") == std::string::npos) {
+ try {
+ auto parsed = json::parse(json_str);
+ if (!hide_outputs) {
+ if (parsed.contains("choices") && parsed["choices"].is_array() && !parsed["choices"].empty()) {
+ auto delta = parsed["choices"][0]["delta"];
+ if (delta.contains("content") && delta["content"].is_string()) {
+ *accumulated_text += delta["content"].get();
+ }
+ }
+ if (parsed.contains("response") && parsed["response"].is_string()) {
+ *accumulated_text += parsed["response"].get();
+ }
+ }
+ } catch (...) {}
+ }
+ }
+ });
+ return success;
+ };
+
+ telemetry_sink.is_writable = [&sink]() -> bool {
+ return sink.is_writable ? sink.is_writable() : true;
+ };
+
+ telemetry_sink.done = [&sink]() {
+ if (sink.done) {
+ sink.done();
+ }
+ };
+
+ telemetry_sink.done_with_trailer = [&sink](const httplib::Headers& trailer) {
+ if (sink.done_with_trailer) {
+ sink.done_with_trailer(trailer);
+ } else if (sink.done) {
+ sink.done();
+ }
+ };
+
+ try {
+ execute_streaming(request_body, telemetry_sink, [&](WrappedServer* server) {
+ ModelTelemetryIdentity identity = get_telemetry_identity(server);
+
+ if (span) {
+ span->set_attribute("llm.backend", identity.recipe);
+ span->set_attribute("llm.device_type", identity.device);
+ span->set_attribute("llm.checkpoint", identity.checkpoint);
+ span->set_attribute("llm.recipe", identity.recipe);
+ if (request_json.contains("temperature")) span->set_attribute("llm.config.temperature", request_json["temperature"]);
+ if (request_json.contains("top_p")) span->set_attribute("llm.config.top_p", request_json["top_p"]);
+ if (request_json.contains("max_tokens")) span->set_attribute("llm.config.max_tokens", request_json["max_tokens"]);
+ }
+
+ server->forward_streaming_request("/v1/responses", request_body, telemetry_sink, true, 0,
+ [this, identity, span, accumulated_text, server](int input_tokens,
+ int output_tokens,
+ double time_to_first_token,
+ double tokens_per_second,
+ const std::string& error_message) {
+ if (!error_message.empty()) {
+ if (span) {
+ span->end_with_error(error_message);
+ }
+ return;
+ }
+ record_telemetry_for_model(identity, input_tokens, output_tokens,
+ time_to_first_token, tokens_per_second);
+ record_prompt_tokens_for_model(identity, input_tokens);
+
+ if (span) {
+ nlohmann::json usage_payload = {
+ {"prompt_tokens", input_tokens},
+ {"completion_tokens", output_tokens}
+ };
+ span->set_attribute("llm.performance.time_to_first_token", time_to_first_token);
+ span->set_attribute("llm.performance.tokens_per_second", tokens_per_second);
+
+ std::string url;
+ std::function(const std::string&)> parser;
+ if (server) {
+ url = server->get_additional_telemetry_url();
+ parser = server->get_additional_telemetry_parser();
+ }
+ telemetry::end_llm_span_async(span, url, parser, usage_payload, *accumulated_text);
+ }
+ });
+ }, span);
+ } catch (const std::exception& e) {
+ if (span) span->end_with_error(e.what());
+ throw;
+ } catch (...) {
+ if (span) span->end_with_error("Unknown error during streaming");
+ throw;
+ }
}
int Router::count_pinned_servers_by_type(ModelType type) const {
diff --git a/src/cpp/server/runtime_config.cpp b/src/cpp/server/runtime_config.cpp
index 261de4477..401171a84 100644
--- a/src/cpp/server/runtime_config.cpp
+++ b/src/cpp/server/runtime_config.cpp
@@ -8,6 +8,7 @@
#include
#include
#include
+#include
#include
namespace fs = std::filesystem;
@@ -285,6 +286,85 @@ std::string RuntimeConfig::rocm_channel_for_recipe(const std::string& recipe) co
return channel;
}
+bool RuntimeConfig::telemetry_enabled() const {
+ return get_bool_opt(nullptr, {"telemetry", "enabled"}, false);
+}
+
+bool RuntimeConfig::telemetry_hide_inputs() const {
+ return get_bool_opt(nullptr, {"telemetry", "hide_inputs"}, false);
+}
+
+bool RuntimeConfig::telemetry_hide_outputs() const {
+ return get_bool_opt(nullptr, {"telemetry", "hide_outputs"}, false);
+}
+
+bool RuntimeConfig::telemetry_hide_thinking() const {
+ return get_bool_opt(nullptr, {"telemetry", "hide_thinking"}, false);
+}
+
+int RuntimeConfig::telemetry_max_queue_capacity() const {
+ return get_int_opt(nullptr, {"telemetry", "max_queue_capacity"}, 1000);
+}
+
+int RuntimeConfig::telemetry_max_attribute_length() const {
+ return get_int_opt(nullptr, {"telemetry", "max_attribute_length"}, 4096);
+}
+
+std::string RuntimeConfig::telemetry_otlp_endpoint() const {
+ return get_string_opt(nullptr, {"telemetry", "otlp", "endpoint"}, "http://localhost:4318/v1/traces");
+}
+
+std::string RuntimeConfig::telemetry_otlp_protocol() const {
+ return get_string_opt(nullptr, {"telemetry", "otlp", "protocol"}, "http/protobuf");
+}
+
+std::vector RuntimeConfig::telemetry_otlp_semantics() const {
+ std::shared_lock lock(mutex_);
+ std::vector semantics;
+ if (config_.contains("telemetry") && config_["telemetry"].is_object() &&
+ config_["telemetry"].contains("otlp") && config_["telemetry"]["otlp"].is_object() &&
+ config_["telemetry"]["otlp"].contains("semantics") && config_["telemetry"]["otlp"]["semantics"].is_array()) {
+ for (const auto& item : config_["telemetry"]["otlp"]["semantics"]) {
+ if (item.is_string()) {
+ semantics.push_back(item.get());
+ }
+ }
+ }
+ return semantics;
+}
+
+std::map RuntimeConfig::telemetry_otlp_headers() const {
+ std::map headers;
+ std::shared_lock lock(mutex_);
+ if (config_.contains("telemetry") && config_["telemetry"].is_object() &&
+ config_["telemetry"].contains("otlp") && config_["telemetry"]["otlp"].is_object() &&
+ config_["telemetry"]["otlp"].contains("headers") && config_["telemetry"]["otlp"]["headers"].is_object()) {
+ const auto& headers_json = config_["telemetry"]["otlp"]["headers"];
+ for (auto& [key, value] : headers_json.items()) {
+ if (value.is_string()) {
+ headers[key] = value.get();
+ }
+ }
+ }
+ return headers;
+}
+
+int RuntimeConfig::telemetry_otlp_max_retries() const {
+ return get_int_opt(nullptr, {"telemetry", "otlp", "max_retries"}, 0);
+}
+
+double RuntimeConfig::telemetry_otlp_retry_backoff_base_s() const {
+ return get_double_opt(nullptr, {"telemetry", "otlp", "retry_backoff_base_s"}, 5.0);
+}
+
+int RuntimeConfig::telemetry_otlp_send_batch_size() const {
+ return get_int_opt(nullptr, {"telemetry", "otlp", "send_batch_size"}, 100);
+}
+
+double RuntimeConfig::telemetry_otlp_batch_timeout_s() const {
+ return get_double_opt(nullptr, {"telemetry", "otlp", "batch_timeout_s"}, 1.0);
+}
+
json RuntimeConfig::backend_config(const std::string& backend_name) const {
std::shared_lock lock(mutex_);
if (config_.contains(backend_name) && config_[backend_name].is_object()) {
@@ -495,6 +575,124 @@ void RuntimeConfig::validate(const std::string& key, const json& value) const {
if (channel != "stable" && channel != "nightly") {
throw std::invalid_argument("'rocm_channel' must be either 'stable', or 'nightly'");
}
+ } else if (key == "telemetry") {
+ if (!value.is_object()) {
+ throw std::invalid_argument("'telemetry' must be an object");
+ }
+ static const std::unordered_set valid_telemetry_keys = {
+ "enabled", "hide_inputs", "hide_outputs", "hide_thinking", "max_queue_capacity", "max_attribute_length", "otlp"
+ };
+ for (auto& [t_key, t_val] : value.items()) {
+ if (valid_telemetry_keys.find(t_key) == valid_telemetry_keys.end()) {
+ throw std::invalid_argument("Unknown config key: 'telemetry." + t_key + "'");
+ }
+ }
+ if (value.contains("enabled") && !value["enabled"].is_boolean()) {
+ throw std::invalid_argument("'telemetry.enabled' must be a boolean");
+ }
+ if (value.contains("hide_inputs") && !value["hide_inputs"].is_boolean()) {
+ throw std::invalid_argument("'telemetry.hide_inputs' must be a boolean");
+ }
+ if (value.contains("hide_outputs") && !value["hide_outputs"].is_boolean()) {
+ throw std::invalid_argument("'telemetry.hide_outputs' must be a boolean");
+ }
+ if (value.contains("hide_thinking") && !value["hide_thinking"].is_boolean()) {
+ throw std::invalid_argument("'telemetry.hide_thinking' must be a boolean");
+ }
+ if (value.contains("max_queue_capacity")) {
+ if (!value["max_queue_capacity"].is_number_integer()) {
+ throw std::invalid_argument("'telemetry.max_queue_capacity' must be an integer");
+ }
+ if (value["max_queue_capacity"].get() <= 0) {
+ throw std::invalid_argument("'telemetry.max_queue_capacity' must be > 0");
+ }
+ }
+ if (value.contains("max_attribute_length")) {
+ if (!value["max_attribute_length"].is_number_integer()) {
+ throw std::invalid_argument("'telemetry.max_attribute_length' must be an integer");
+ }
+ if (value["max_attribute_length"].get() <= 0) {
+ throw std::invalid_argument("'telemetry.max_attribute_length' must be > 0");
+ }
+ }
+ if (value.contains("otlp")) {
+ const auto& otlp = value["otlp"];
+ if (!otlp.is_object()) {
+ throw std::invalid_argument("'telemetry.otlp' must be an object");
+ }
+ static const std::unordered_set valid_otlp_keys = {
+ "endpoint", "protocol", "semantics", "headers", "max_retries",
+ "retry_backoff_base_s", "send_batch_size", "batch_timeout_s"
+ };
+ for (auto& [otlp_key, otlp_val] : otlp.items()) {
+ if (valid_otlp_keys.find(otlp_key) == valid_otlp_keys.end()) {
+ throw std::invalid_argument("Unknown config key: 'telemetry.otlp." + otlp_key + "'");
+ }
+ }
+ if (otlp.contains("endpoint") && !otlp["endpoint"].is_string()) {
+ throw std::invalid_argument("'telemetry.otlp.endpoint' must be a string");
+ }
+ if (otlp.contains("protocol")) {
+ if (!otlp["protocol"].is_string()) {
+ throw std::invalid_argument("'telemetry.otlp.protocol' must be a string");
+ }
+ std::string proto = otlp["protocol"].get();
+ if (proto != "http/protobuf" && proto != "http/json") {
+ throw std::invalid_argument("'telemetry.otlp.protocol' must be either 'http/protobuf', or 'http/json'");
+ }
+ }
+ if (otlp.contains("semantics")) {
+ if (!otlp["semantics"].is_array()) {
+ throw std::invalid_argument("'telemetry.otlp.semantics' must be an array");
+ }
+ for (const auto& item : otlp["semantics"]) {
+ if (!item.is_string()) {
+ throw std::invalid_argument("'telemetry.otlp.semantics' items must be strings");
+ }
+ std::string sem = item.get();
+ if (sem != "openinference" && sem != "otel_genai") {
+ throw std::invalid_argument("'telemetry.otlp.semantics' items must be 'openinference' or 'otel_genai'");
+ }
+ }
+ }
+ if (otlp.contains("headers") && !otlp["headers"].is_object()) {
+ throw std::invalid_argument("'telemetry.otlp.headers' must be an object");
+ }
+ if (otlp.contains("max_retries")) {
+ if (!otlp["max_retries"].is_number_integer()) {
+ throw std::invalid_argument("'telemetry.otlp.max_retries' must be an integer");
+ }
+ if (otlp["max_retries"].get() < 0) {
+ throw std::invalid_argument("'telemetry.otlp.max_retries' must be >= 0");
+ }
+ }
+ if (otlp.contains("retry_backoff_base_s")) {
+ if (!otlp["retry_backoff_base_s"].is_number()) {
+ throw std::invalid_argument("'telemetry.otlp.retry_backoff_base_s' must be a number");
+ }
+ double base_val = otlp["retry_backoff_base_s"].get();
+ if (base_val < 0.0) {
+ throw std::invalid_argument("'telemetry.otlp.retry_backoff_base_s' must be >= 0");
+ }
+ }
+ if (otlp.contains("send_batch_size")) {
+ if (!otlp["send_batch_size"].is_number_integer()) {
+ throw std::invalid_argument("'telemetry.otlp.send_batch_size' must be an integer");
+ }
+ if (otlp["send_batch_size"].get() < 1) {
+ throw std::invalid_argument("'telemetry.otlp.send_batch_size' must be >= 1");
+ }
+ }
+ if (otlp.contains("batch_timeout_s")) {
+ if (!otlp["batch_timeout_s"].is_number()) {
+ throw std::invalid_argument("'telemetry.otlp.batch_timeout_s' must be a number");
+ }
+ double to_val = otlp["batch_timeout_s"].get();
+ if (to_val <= 0.0) {
+ throw std::invalid_argument("'telemetry.otlp.batch_timeout_s' must be positive");
+ }
+ }
+ }
} else if (is_backend_name(key)) {
if (!value.is_object()) {
throw std::invalid_argument("'" + key + "' must be an object");
@@ -574,6 +772,37 @@ void RuntimeConfig::apply_changes(const json& changes, json& applied_diff) {
applied_diff[key][sub_key] = sub_value;
}
}
+ } else if (key == "telemetry" && value.is_object()) {
+ if (!config_.contains("telemetry") || !config_["telemetry"].is_object()) {
+ config_["telemetry"] = json::object();
+ }
+ for (auto& [t_key, t_val] : value.items()) {
+ if (t_key == "otlp" && t_val.is_object()) {
+ if (!config_["telemetry"].contains("otlp") || !config_["telemetry"]["otlp"].is_object()) {
+ config_["telemetry"]["otlp"] = json::object();
+ }
+ for (auto& [otlp_key, otlp_val] : t_val.items()) {
+ if (!config_["telemetry"]["otlp"].contains(otlp_key) || config_["telemetry"]["otlp"][otlp_key] != otlp_val) {
+ config_["telemetry"]["otlp"][otlp_key] = otlp_val;
+ if (!applied_diff.contains("telemetry")) {
+ applied_diff["telemetry"] = json::object();
+ }
+ if (!applied_diff["telemetry"].contains("otlp")) {
+ applied_diff["telemetry"]["otlp"] = json::object();
+ }
+ applied_diff["telemetry"]["otlp"][otlp_key] = otlp_val;
+ }
+ }
+ } else {
+ if (!config_["telemetry"].contains(t_key) || config_["telemetry"][t_key] != t_val) {
+ config_["telemetry"][t_key] = t_val;
+ if (!applied_diff.contains("telemetry")) {
+ applied_diff["telemetry"] = json::object();
+ }
+ applied_diff["telemetry"][t_key] = t_val;
+ }
+ }
+ }
} else {
if (!config_.contains(key) || config_[key] != value) {
config_[key] = value;
@@ -625,4 +854,73 @@ json RuntimeConfig::snapshot() const {
return config_;
}
+bool RuntimeConfig::get_bool_opt(const char* env_name, const std::vector& path, bool default_val) const {
+ if (env_name) {
+ if (const char* env = std::getenv(env_name)) {
+ std::string s(env);
+ return s == "1" || s == "true" || s == "True" || s == "TRUE" || s == "yes" || s == "YES";
+ }
+ }
+ std::shared_lock lock(mutex_);
+ const json* current = &config_;
+ for (const auto& key : path) {
+ if (!current->is_object() || !current->contains(key)) return default_val;
+ current = &((*current)[key]);
+ }
+ return current->is_boolean() ? current->get() : default_val;
+}
+
+int RuntimeConfig::get_int_opt(const char* env_name, const std::vector& path, int default_val) const {
+ if (env_name) {
+ if (const char* env = std::getenv(env_name)) {
+ try { return std::stoi(env); } catch (...) {}
+ }
+ }
+ std::shared_lock lock(mutex_);
+ const json* current = &config_;
+ for (const auto& key : path) {
+ if (!current->is_object() || !current->contains(key)) return default_val;
+ current = &((*current)[key]);
+ }
+ try {
+ if (current->is_number_integer()) return current->get();
+ if (current->is_string()) return std::stoi(current->get());
+ } catch (...) {}
+ return default_val;
+}
+
+double RuntimeConfig::get_double_opt(const char* env_name, const std::vector& path, double default_val) const {
+ if (env_name) {
+ if (const char* env = std::getenv(env_name)) {
+ try { return std::stod(env); } catch (...) {}
+ }
+ }
+ std::shared_lock lock(mutex_);
+ const json* current = &config_;
+ for (const auto& key : path) {
+ if (!current->is_object() || !current->contains(key)) return default_val;
+ current = &((*current)[key]);
+ }
+ try {
+ if (current->is_number()) return current->get();
+ if (current->is_string()) return std::stod(current->get());
+ } catch (...) {}
+ return default_val;
+}
+
+std::string RuntimeConfig::get_string_opt(const char* env_name, const std::vector& path, const std::string& default_val) const {
+ if (env_name) {
+ if (const char* env = std::getenv(env_name)) {
+ return std::string(env);
+ }
+ }
+ std::shared_lock lock(mutex_);
+ const json* current = &config_;
+ for (const auto& key : path) {
+ if (!current->is_object() || !current->contains(key)) return default_val;
+ current = &((*current)[key]);
+ }
+ return current->is_string() ? current->get() : default_val;
+}
+
} // namespace lemon
diff --git a/src/cpp/server/server.cpp b/src/cpp/server/server.cpp
index 10724bbfa..32cc45f1a 100644
--- a/src/cpp/server/server.cpp
+++ b/src/cpp/server/server.cpp
@@ -15,6 +15,7 @@
#include "lemon/logging_config.h"
#include "lemon/prometheus_metrics.h"
#include "lemon/runtime_config.h"
+#include "telemetry.h"
#include "lemon/system_info.h"
#include "lemon/version.h"
#include
@@ -572,24 +573,24 @@ void Server::setup_routes(httplib::Server &web_server) {
web_server.Post("/api/v1/" + endpoint, handler);
web_server.Post("/v0/" + endpoint, handler);
web_server.Post("/v1/" + endpoint, handler);
- // Also register as GET for HEAD request support (HEAD uses GET handler)
- // Return 405 Method Not Allowed (endpoint exists but wrong method)
- web_server.Get("/api/v0/" + endpoint, [](const httplib::Request&, httplib::Response& res) {
- res.status = 405;
- res.set_content("{\"error\": \"Method Not Allowed. Use POST for this endpoint\"}", "application/json");
- });
- web_server.Get("/api/v1/" + endpoint, [](const httplib::Request&, httplib::Response& res) {
- res.status = 405;
- res.set_content("{\"error\": \"Method Not Allowed. Use POST for this endpoint\"}", "application/json");
- });
- web_server.Get("/v0/" + endpoint, [](const httplib::Request&, httplib::Response& res) {
- res.status = 405;
- res.set_content("{\"error\": \"Method Not Allowed. Use POST for this endpoint\"}", "application/json");
- });
- web_server.Get("/v1/" + endpoint, [](const httplib::Request&, httplib::Response& res) {
- res.status = 405;
- res.set_content("{\"error\": \"Method Not Allowed. Use POST for this endpoint\"}", "application/json");
- });
+ if (endpoint != "params") {
+ web_server.Get("/api/v0/" + endpoint, [](const httplib::Request&, httplib::Response& res) {
+ res.status = 405;
+ res.set_content("{\"error\": \"Method Not Allowed. Use POST for this endpoint\"}", "application/json");
+ });
+ web_server.Get("/api/v1/" + endpoint, [](const httplib::Request&, httplib::Response& res) {
+ res.status = 405;
+ res.set_content("{\"error\": \"Method Not Allowed. Use POST for this endpoint\"}", "application/json");
+ });
+ web_server.Get("/v0/" + endpoint, [](const httplib::Request&, httplib::Response& res) {
+ res.status = 405;
+ res.set_content("{\"error\": \"Method Not Allowed. Use POST for this endpoint\"}", "application/json");
+ });
+ web_server.Get("/v1/" + endpoint, [](const httplib::Request&, httplib::Response& res) {
+ res.status = 405;
+ res.set_content("{\"error\": \"Method Not Allowed. Use POST for this endpoint\"}", "application/json");
+ });
+ }
};
// Health check
@@ -722,6 +723,10 @@ void Server::setup_routes(httplib::Server &web_server) {
handle_params(req, res);
});
+ register_get("params", [this](const httplib::Request& req, httplib::Response& res) {
+ handle_config_get(req, res);
+ });
+
// Backend management endpoints
register_post("install", [this](const httplib::Request& req, httplib::Response& res) {
handle_install(req, res);
@@ -757,6 +762,12 @@ void Server::setup_routes(httplib::Server &web_server) {
handle_shutdown(req, res);
});
+ web_server.Post("/internal/telemetry/flush", [](const httplib::Request& req, httplib::Response& res) {
+ lemon::telemetry::flush();
+ res.status = 200;
+ res.set_content(nlohmann::json{{"status", "flushed"}}.dump(), "application/json");
+ });
+
web_server.Post("/internal/pin", [this](const httplib::Request& req, httplib::Response& res) {
handle_pin(req, res);
});
@@ -1630,6 +1641,10 @@ void Server::stop() {
LOG(ERROR, "Server") << "Error during cleanup: " << e.what() << std::endl;
}
}
+
+ LOG(INFO, "Server") << "Shutting down telemetry queue..." << std::endl;
+ lemon::telemetry::shutdown();
+
LOG(INFO, "Server") << "Cleanup complete" << std::endl;
}
@@ -1846,6 +1861,27 @@ void Server::handle_health(const httplib::Request& req, httplib::Response& res)
// Add version information
response["version"] = LEMON_VERSION_STRING;
+ // Add telemetry state using in-memory runtime configuration
+ if (config_) {
+ nlohmann::json telemetry_info = {
+ {"enabled", config_->telemetry_enabled()}
+ };
+ if (config_->telemetry_enabled()) {
+ std::vector captures;
+ if (!config_->telemetry_hide_inputs()) {
+ captures.push_back("inputs");
+ }
+ if (!config_->telemetry_hide_outputs()) {
+ captures.push_back("outputs");
+ }
+ if (!config_->telemetry_hide_thinking()) {
+ captures.push_back("thinking");
+ }
+ telemetry_info["captures"] = captures;
+ }
+ response["telemetry"] = telemetry_info;
+ }
+
// Add model loaded information like Python implementation
std::string loaded_model = router_->get_loaded_model();
@@ -2121,11 +2157,19 @@ void Server::handle_chat_completions(const httplib::Request& req, httplib::Respo
}
}
+ std::string requested_model;
+ if (request_json.contains("model") && request_json["model"].is_string()) {
+ requested_model = request_json["model"].get();
+ }
+ auto span = telemetry::TelemetryTracker::start_span("LLM", "chat.completions", requested_model, request_json);
+
// Handle model loading/switching
if (request_json.contains("model")) {
- std::string requested_model = request_json["model"];
try {
auto_load_model_if_needed(requested_model);
+ if (span) {
+ span->cancel();
+ }
} catch (const std::exception& e) {
LOG(ERROR, "Server") << "Failed to load model: " << e.what() << std::endl;
auto error_response = create_model_error(requested_model, e.what());
@@ -2133,12 +2177,19 @@ void Server::handle_chat_completions(const httplib::Request& req, httplib::Respo
std::string error_code = error_response["error"]["code"].get();
res.status = get_http_status_from_error(error_code);
res.set_content(error_response.dump(), "application/json");
+
+ if (span) {
+ span->end_with_error(e.what());
+ }
return;
}
} else if (!router_->is_model_loaded()) {
LOG(ERROR, "Server") << "No model loaded and no model specified in request" << std::endl;
res.status = 400;
res.set_content("{\"error\": \"No model loaded and no model specified in request\"}", "application/json");
+ if (span) {
+ span->cancel();
+ }
return;
}
@@ -2148,9 +2199,16 @@ void Server::handle_chat_completions(const httplib::Request& req, httplib::Respo
LOG(ERROR, "Server") << "Model does not support chat completion" << std::endl;
res.status = 400;
res.set_content(R"({"error": {"message": "This model does not support chat completion. Only LLM models support this endpoint.", "type": "invalid_request_error"}})", "application/json");
+ if (span) {
+ span->cancel();
+ }
return;
}
+ if (span) {
+ span->cancel();
+ }
+
// Check if streaming is requested
bool is_streaming = request_json.contains("stream") && request_json["stream"].get();
@@ -2191,9 +2249,7 @@ void Server::handle_chat_completions(const httplib::Request& req, httplib::Respo
return false; // We're done after the first call
}
- // Use unified Router path for streaming
router_->chat_completion_stream(request_body, sink);
-
return false;
}
);
@@ -2309,10 +2365,13 @@ void Server::handle_chat_completions(const httplib::Request& req, httplib::Respo
router_->update_prompt_tokens(request_json.value("model", ""), prompt_tokens);
}
}
+
+
}
} catch (const std::exception& e) {
LOG(ERROR, "Server") << "Chat completion failed: " << e.what() << std::endl;
+
res.status = 500;
nlohmann::json error = {{"error", e.what()}};
res.set_content(error.dump(), "application/json");
@@ -2327,11 +2386,19 @@ void Server::handle_completions(const httplib::Request& req, httplib::Response&
// Must be done before any model_manager/router lookups and before forwarding
normalize_client_model_name(request_json);
+ std::string requested_model;
+ if (request_json.contains("model") && request_json["model"].is_string()) {
+ requested_model = request_json["model"].get();
+ }
+ auto span = telemetry::TelemetryTracker::start_span("LLM", "completions", requested_model, request_json);
+
// Handle model loading/switching (same logic as chat_completions)
if (request_json.contains("model")) {
- std::string requested_model = request_json["model"];
try {
auto_load_model_if_needed(requested_model);
+ if (span) {
+ span->cancel();
+ }
} catch (const std::exception& e) {
LOG(ERROR, "Server") << "Failed to load model: " << e.what() << std::endl;
auto error_response = create_model_error(requested_model, e.what());
@@ -2339,12 +2406,19 @@ void Server::handle_completions(const httplib::Request& req, httplib::Response&
std::string error_code = error_response["error"]["code"].get();
res.status = get_http_status_from_error(error_code);
res.set_content(error_response.dump(), "application/json");
+
+ if (span) {
+ span->end_with_error(e.what());
+ }
return;
}
} else if (!router_->is_model_loaded()) {
LOG(ERROR, "Server") << "No model loaded and no model specified in request" << std::endl;
res.status = 400;
res.set_content("{\"error\": \"No model loaded and no model specified in request\"}", "application/json");
+ if (span) {
+ span->cancel();
+ }
return;
}
@@ -2354,9 +2428,16 @@ void Server::handle_completions(const httplib::Request& req, httplib::Response&
LOG(ERROR, "Server") << "Model does not support completion" << std::endl;
res.status = 400;
res.set_content(R"({"error": {"message": "This model does not support completion. Only LLM models support this endpoint.", "type": "invalid_request_error"}})", "application/json");
+ if (span) {
+ span->cancel();
+ }
return;
}
+ if (span) {
+ span->cancel();
+ }
+
// Check if streaming is requested
bool is_streaming = request_json.contains("stream") && request_json["stream"].get();
@@ -2507,26 +2588,45 @@ void Server::handle_embeddings(const httplib::Request& req, httplib::Response& r
try {
auto request_json = nlohmann::json::parse(req.body);
+ std::string requested_model;
+ if (request_json.contains("model") && request_json["model"].is_string()) {
+ requested_model = request_json["model"].get();
+ }
+ auto span = telemetry::TelemetryTracker::start_span("EMBEDDING", "embeddings", requested_model, request_json);
+
// Handle model loading/switching using helper function
if (request_json.contains("model")) {
- std::string requested_model = request_json["model"];
try {
auto_load_model_if_needed(requested_model);
+ if (span) {
+ span->cancel();
+ }
} catch (const std::exception& e) {
LOG(ERROR, "Server") << "Failed to load model: " << e.what() << std::endl;
auto error_response = create_model_error(requested_model, e.what());
std::string error_code = error_response["error"]["code"].get();
res.status = get_http_status_from_error(error_code);
res.set_content(error_response.dump(), "application/json");
+
+ if (span) {
+ span->end_with_error(e.what());
+ }
return;
}
} else if (!router_->is_model_loaded()) {
LOG(ERROR, "Server") << "No model loaded and no model specified in request" << std::endl;
res.status = 400;
res.set_content("{\"error\": \"No model loaded and no model specified in request\"}", "application/json");
+ if (span) {
+ span->cancel();
+ }
return;
}
+ if (span) {
+ span->cancel();
+ }
+
// Call router's embeddings method
auto response = router_->embeddings(request_json);
res.set_content(response.dump(), "application/json");
@@ -2543,26 +2643,45 @@ void Server::handle_reranking(const httplib::Request& req, httplib::Response& re
try {
auto request_json = nlohmann::json::parse(req.body);
+ std::string requested_model;
+ if (request_json.contains("model") && request_json["model"].is_string()) {
+ requested_model = request_json["model"].get();
+ }
+ auto span = telemetry::TelemetryTracker::start_span("RERANKER", "reranking", requested_model, request_json);
+
// Handle model loading/switching using helper function
if (request_json.contains("model")) {
- std::string requested_model = request_json["model"];
try {
auto_load_model_if_needed(requested_model);
+ if (span) {
+ span->cancel();
+ }
} catch (const std::exception& e) {
LOG(ERROR, "Server") << "Failed to load model: " << e.what() << std::endl;
auto error_response = create_model_error(requested_model, e.what());
std::string error_code = error_response["error"]["code"].get();
res.status = get_http_status_from_error(error_code);
res.set_content(error_response.dump(), "application/json");
+
+ if (span) {
+ span->end_with_error(e.what());
+ }
return;
}
} else if (!router_->is_model_loaded()) {
LOG(ERROR, "Server") << "No model loaded and no model specified in request" << std::endl;
res.status = 400;
res.set_content("{\"error\": \"No model loaded and no model specified in request\"}", "application/json");
+ if (span) {
+ span->cancel();
+ }
return;
}
+ if (span) {
+ span->cancel();
+ }
+
// Call router's reranking method
auto response = router_->reranking(request_json);
res.set_content(response.dump(), "application/json");
@@ -4770,6 +4889,16 @@ void Server::apply_config_side_effects(const json& applied_changes) {
LOG(INFO, "Server") << "Models dir changed to: " << dir << std::endl;
utils::set_models_dir(dir);
model_manager_->invalidate_models_cache();
+ } else if (key == "telemetry") {
+ if (value.is_object()) {
+ if (value.contains("enabled")) {
+ bool enabled = config_->telemetry_enabled();
+ LOG(INFO, "Server") << "Telemetry " << (enabled ? "enabled" : "disabled") << std::endl;
+ }
+ if (value.contains("otlp") && value["otlp"].is_object() && value["otlp"].contains("endpoint")) {
+ LOG(INFO, "Server") << "Telemetry endpoint changed to: " << config_->telemetry_otlp_endpoint() << std::endl;
+ }
+ }
} else if (value.is_object()) {
// Nested backend section change (llamacpp / whispercpp / sdcpp / ryzenai / kokoro).
// Recipe defaults (e.g. default_backend) are derived from these settings, so
diff --git a/src/cpp/server/streaming_proxy.cpp b/src/cpp/server/streaming_proxy.cpp
index abb22fc02..3af675602 100644
--- a/src/cpp/server/streaming_proxy.cpp
+++ b/src/cpp/server/streaming_proxy.cpp
@@ -17,23 +17,69 @@ void StreamingProxy::forward_sse_stream(
long timeout_seconds,
std::function on_chunk) {
- std::string telemetry_buffer;
+ TelemetryData telemetry;
+ std::string line_buffer;
bool stream_error = false;
bool has_done_marker = false;
bool has_first_token = false;
double time_to_first_token = 0.0;
const auto start_time = std::chrono::steady_clock::now();
+ auto process_line = [&telemetry](const std::string& line) {
+ std::string json_str;
+ if (line.find("data: ") == 0) {
+ json_str = line.substr(6);
+ } else if (line.find("ChatCompletionChunk: ") == 0) {
+ json_str = line.substr(21);
+ }
+ if (!json_str.empty() && json_str != "[DONE]") {
+ try {
+ auto chunk = json::parse(json_str);
+ if (chunk.contains("usage")) {
+ auto usage = chunk["usage"];
+ if (usage.contains("prompt_tokens")) {
+ telemetry.input_tokens = usage["prompt_tokens"].get();
+ }
+ if (usage.contains("completion_tokens")) {
+ telemetry.output_tokens = usage["completion_tokens"].get();
+ }
+ if (usage.contains("prefill_duration_ttft")) {
+ telemetry.time_to_first_token = usage["prefill_duration_ttft"].get();
+ }
+ if (usage.contains("decoding_speed_tps")) {
+ telemetry.tokens_per_second = usage["decoding_speed_tps"].get();
+ }
+ }
+ if (chunk.contains("timings")) {
+ auto timings = chunk["timings"];
+ if (timings.contains("prompt_n")) {
+ telemetry.input_tokens = timings["prompt_n"].get();
+ }
+ if (timings.contains("predicted_n")) {
+ telemetry.output_tokens = timings["predicted_n"].get();
+ }
+ if (timings.contains("prompt_ms")) {
+ telemetry.time_to_first_token = timings["prompt_ms"].get() / 1000.0;
+ }
+ if (timings.contains("predicted_per_second")) {
+ telemetry.tokens_per_second = timings["predicted_per_second"].get();
+ }
+ }
+ } catch (...) {}
+ }
+ };
+
auto result = utils::HttpClient::post_stream(
backend_url,
request_body,
- [&sink, &telemetry_buffer, &has_done_marker, &has_first_token,
- &time_to_first_token, &start_time, &on_chunk](const char* data, size_t length) {
+ [&sink, &line_buffer, &has_done_marker, &has_first_token,
+ &time_to_first_token, &start_time, &on_chunk, &process_line](const char* data, size_t length) {
if (on_chunk) {
on_chunk();
}
- telemetry_buffer.append(data, length);
+ line_buffer.append(data, length);
+ process_sse_lines(line_buffer, process_line);
std::string chunk(data, length);
if (!has_first_token && chunk.find("data: ") != std::string::npos) {
@@ -62,6 +108,7 @@ void StreamingProxy::forward_sse_stream(
if (result.status_code != 200) {
stream_error = true;
LOG(ERROR, "StreamingProxy") << "Backend returned error: " << result.status_code << std::endl;
+ telemetry.error_message = "Backend returned error status code: " + std::to_string(result.status_code);
}
if (transport_interrupted && !has_done_marker) {
@@ -89,10 +136,24 @@ void StreamingProxy::forward_sse_stream(
LOG(INFO, "Server") << "Streaming completed - 200 OK" << std::endl;
- auto telemetry = parse_telemetry(telemetry_buffer);
+ if (!line_buffer.empty()) {
+ if (line_buffer.back() == '\r') {
+ line_buffer.pop_back();
+ }
+ process_line(line_buffer);
+ }
+
if (telemetry.time_to_first_token <= 0.0) {
telemetry.time_to_first_token = time_to_first_token;
}
+ if (telemetry.tokens_per_second <= 0.0 && telemetry.output_tokens > 0) {
+ double total_duration = std::chrono::duration(
+ std::chrono::steady_clock::now() - start_time).count();
+ double decode_duration = total_duration - telemetry.time_to_first_token;
+ if (decode_duration > 0.0) {
+ telemetry.tokens_per_second = telemetry.output_tokens / decode_duration;
+ }
+ }
telemetry.print();
if (on_complete) {
@@ -100,6 +161,9 @@ void StreamingProxy::forward_sse_stream(
}
} else {
sink.done();
+ if (on_complete) {
+ on_complete(telemetry);
+ }
}
}
@@ -233,4 +297,16 @@ StreamingProxy::TelemetryData StreamingProxy::parse_telemetry(const std::string&
return telemetry;
}
+void StreamingProxy::process_sse_lines(std::string& line_buffer, std::function line_callback) {
+ size_t pos;
+ while ((pos = line_buffer.find('\n')) != std::string::npos) {
+ std::string line = line_buffer.substr(0, pos);
+ line_buffer.erase(0, pos + 1);
+ if (!line.empty() && line.back() == '\r') {
+ line.pop_back();
+ }
+ line_callback(line);
+ }
+}
+
} // namespace lemon
diff --git a/src/cpp/server/telemetry.cpp b/src/cpp/server/telemetry.cpp
new file mode 100644
index 000000000..570c5af69
--- /dev/null
+++ b/src/cpp/server/telemetry.cpp
@@ -0,0 +1,1215 @@
+#include "telemetry.h"
+#include "lemon/runtime_config.h"
+#include "lemon/utils/aixlog.hpp"
+#include "lemon/utils/http_client.h"
+#include "lemon/version.h"
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace lemon::telemetry {
+
+namespace {
+
+class ProtoWriter {
+public:
+ std::string buf;
+
+ void write_varint(uint64_t val) {
+ while (val >= 0x80) {
+ buf.push_back(static_cast((val & 0x7F) | 0x80));
+ val >>= 7;
+ }
+ buf.push_back(static_cast(val & 0x7F));
+ }
+
+ void write_key(uint32_t field_num, uint32_t wire_type) {
+ write_varint((field_num << 3) | wire_type);
+ }
+
+ void write_int64(uint32_t field_num, int64_t val) {
+ write_key(field_num, 0);
+ write_varint(static_cast(val));
+ }
+
+ void write_uint32(uint32_t field_num, uint32_t val) {
+ write_key(field_num, 0);
+ write_varint(val);
+ }
+
+ void write_bool(uint32_t field_num, bool val) {
+ write_key(field_num, 0);
+ buf.push_back(val ? 1 : 0);
+ }
+
+ void write_fixed64(uint32_t field_num, uint64_t val) {
+ write_key(field_num, 1);
+ for (int i = 0; i < 8; ++i) {
+ buf.push_back(static_cast((val >> (i * 8)) & 0xFF));
+ }
+ }
+
+ void write_string(uint32_t field_num, const std::string& val) {
+ write_key(field_num, 2);
+ write_varint(val.size());
+ buf.append(val);
+ }
+
+ void write_bytes(uint32_t field_num, const std::string& val) {
+ write_key(field_num, 2);
+ write_varint(val.size());
+ buf.append(val);
+ }
+
+ void write_message(uint32_t field_num, const ProtoWriter& sub) {
+ write_key(field_num, 2);
+ write_varint(sub.buf.size());
+ buf.append(sub.buf);
+ }
+};
+
+} // namespace
+
+inline uint8_t hex_char_to_val(char c) {
+ if (c >= '0' && c <= '9') return c - '0';
+ if (c >= 'a' && c <= 'f') return c - 'a' + 10;
+ if (c >= 'A' && c <= 'F') return c - 'A' + 10;
+ return 0;
+}
+
+std::string hex_to_bytes(const std::string& hex) {
+ if (hex.length() % 2 != 0) {
+ throw std::invalid_argument("hex string must have an even length");
+ }
+ std::string bytes;
+ bytes.reserve(hex.length() / 2);
+ for (size_t i = 0; i < hex.length(); i += 2) {
+ uint8_t high = hex_char_to_val(hex[i]);
+ uint8_t low = hex_char_to_val(hex[i + 1]);
+ bytes.push_back(static_cast((high << 4) | low));
+ }
+ return bytes;
+}
+
+std::string strip_thinking(const std::string& text) {
+ std::string result = text;
+ std::vector> tag_pairs = {
+ {"<|think|>", "|think|>"},
+ {"<|think|>", ""},
+ {"<|think|>", "<|turn>"},
+ {"", ""},
+ {"", ""}
+ };
+ for (const auto& [start_tag, end_tag] : tag_pairs) {
+ size_t start_pos = 0;
+ while ((start_pos = result.find(start_tag, start_pos)) != std::string::npos) {
+ size_t end_pos = result.find(end_tag, start_pos + start_tag.length());
+ if (end_pos != std::string::npos) {
+ size_t erase_len = end_pos - start_pos;
+ if (end_tag == "|think|>" || end_tag == "" || end_tag == "") {
+ erase_len += end_tag.length();
+ }
+ result.erase(start_pos, erase_len);
+ } else {
+ result.erase(start_pos);
+ break;
+ }
+ }
+ }
+ size_t first = result.find_first_not_of(" \t\r\n");
+ if (first == std::string::npos) {
+ return "";
+ }
+ size_t last = result.find_last_not_of(" \t\r\n");
+ return result.substr(first, last - first + 1);
+}
+
+std::string standardize_thinking(const std::string& text) {
+ std::string result = text;
+ std::vector start_tags = {"<|think|>", ""};
+ for (const auto& tag : start_tags) {
+ size_t pos = 0;
+ while ((pos = result.find(tag, pos)) != std::string::npos) {
+ result.replace(pos, tag.length(), "");
+ pos += 7;
+ }
+ }
+
+ std::vector end_tags = {"|think|>", "", ""};
+ for (const auto& tag : end_tags) {
+ size_t pos = 0;
+ while ((pos = result.find(tag, pos)) != std::string::npos) {
+ result.replace(pos, tag.length(), "");
+ pos += 8;
+ }
+ }
+
+ std::vector transition_tags = {"", "<|turn>"};
+ for (const auto& transition_tag : transition_tags) {
+ size_t search_pos = 0;
+ while (true) {
+ size_t think_pos = result.find("", search_pos);
+ if (think_pos == std::string::npos) {
+ break;
+ }
+ size_t next_close = result.find("", think_pos + 7);
+ size_t transition_pos = result.find(transition_tag, think_pos + 7);
+
+ if (transition_pos != std::string::npos && (next_close == std::string::npos || next_close > transition_pos)) {
+ result.insert(transition_pos, "\n");
+ search_pos = transition_pos + 9 + transition_tag.length();
+ } else {
+ search_pos = (next_close != std::string::npos) ? next_close + 8 : std::string::npos;
+ if (search_pos == std::string::npos) {
+ break;
+ }
+ }
+ }
+ }
+
+ size_t search_start = 0;
+ while (true) {
+ size_t first_think = result.find("", search_start);
+ if (first_think == std::string::npos) {
+ break;
+ }
+ size_t next_pos = first_think + 7;
+ while (next_pos < result.size() && (result[next_pos] == ' ' || result[next_pos] == '\t' || result[next_pos] == '\r' || result[next_pos] == '\n')) {
+ next_pos++;
+ }
+ if (next_pos + 7 <= result.size() && result.substr(next_pos, 7) == "") {
+ result.erase(first_think + 7, (next_pos + 7) - (first_think + 7));
+ search_start = first_think;
+ } else {
+ search_start = first_think + 7;
+ }
+ }
+
+ size_t search_end = 0;
+ while (true) {
+ size_t first_close = result.find("", search_end);
+ if (first_close == std::string::npos) {
+ break;
+ }
+ size_t next_pos = first_close + 8;
+ while (next_pos < result.size() && (result[next_pos] == ' ' || result[next_pos] == '\t' || result[next_pos] == '\r' || result[next_pos] == '\n')) {
+ next_pos++;
+ }
+ if (next_pos + 8 <= result.size() && result.substr(next_pos, 8) == "") {
+ result.erase(first_close + 8, (next_pos + 8) - (first_close + 8));
+ search_end = first_close;
+ } else {
+ search_end = first_close + 8;
+ }
+ }
+
+ return result;
+}
+
+static std::string serialize_json_batch(const std::vector& spans) {
+ nlohmann::json otlp_payload = {
+ {"resourceSpans", nlohmann::json::array({
+ {
+ {"resource", {
+ {"attributes", nlohmann::json::array({
+ {{"key", "service.name"}, {"value", {{"stringValue", "lemonade-server"}}}},
+ {{"key", "service.version"}, {"value", {{"stringValue", LEMON_VERSION_STRING}}}}
+ })}
+ }},
+ {"scopeSpans", nlohmann::json::array({
+ {
+ {"scope", {{"name", "lemonade-server"}, {"version", LEMON_VERSION_STRING}}},
+ {"spans", nlohmann::json::array()}
+ }
+ })}
+ }
+ })}
+ };
+
+ auto& spans_arr = otlp_payload["resourceSpans"][0]["scopeSpans"][0]["spans"];
+ for (const auto& span : spans) {
+ spans_arr.push_back(span);
+ }
+ return otlp_payload.dump();
+}
+
+static std::string serialize_protobuf_batch(const std::vector& spans) {
+ ProtoWriter scope_spans_msg;
+
+ ProtoWriter scope_msg;
+ scope_msg.write_string(1, "lemonade-server");
+ scope_msg.write_string(2, LEMON_VERSION_STRING);
+ scope_spans_msg.write_message(1, scope_msg);
+
+ for (const auto& span_details : spans) {
+ ProtoWriter span_msg;
+
+ std::string trace_id_hex = span_details["traceId"].get();
+ span_msg.write_bytes(1, hex_to_bytes(trace_id_hex));
+
+ std::string span_id_hex = span_details["spanId"].get();
+ span_msg.write_bytes(2, hex_to_bytes(span_id_hex));
+
+ span_msg.write_string(5, span_details["name"].get());
+
+ span_msg.write_uint32(6, span_details["kind"].get());
+
+ uint64_t start_nano = std::stoull(span_details["startTimeUnixNano"].get());
+ span_msg.write_fixed64(7, start_nano);
+
+ uint64_t end_nano = std::stoull(span_details["endTimeUnixNano"].get());
+ span_msg.write_fixed64(8, end_nano);
+
+ if (span_details.contains("attributes") && span_details["attributes"].is_array()) {
+ for (const auto& attr : span_details["attributes"]) {
+ std::string key = attr["key"].get();
+ auto val_obj = attr["value"];
+ ProtoWriter kv;
+ kv.write_string(1, key);
+
+ ProtoWriter any_val;
+ if (val_obj.contains("stringValue")) {
+ any_val.write_string(1, val_obj["stringValue"].get());
+ } else if (val_obj.contains("intValue")) {
+ any_val.write_int64(3, val_obj["intValue"].get());
+ } else if (val_obj.contains("boolValue")) {
+ any_val.write_bool(2, val_obj["boolValue"].get());
+ } else if (val_obj.contains("doubleValue")) {
+ union { double d; uint64_t u; } u_val;
+ u_val.d = val_obj["doubleValue"].get();
+ any_val.write_fixed64(4, u_val.u);
+ }
+ kv.write_message(2, any_val);
+ span_msg.write_message(9, kv);
+ }
+ }
+
+ if (span_details.contains("status")) {
+ auto status_json = span_details["status"];
+ ProtoWriter status_msg;
+ if (status_json.contains("message")) {
+ status_msg.write_string(2, status_json["message"].get());
+ }
+ status_msg.write_uint32(3, status_json["code"].get());
+ span_msg.write_message(15, status_msg);
+ }
+
+ scope_spans_msg.write_message(2, span_msg);
+ }
+
+ ProtoWriter res_attr1;
+ {
+ ProtoWriter any_val;
+ any_val.write_string(1, "lemonade-server");
+ res_attr1.write_string(1, "service.name");
+ res_attr1.write_message(2, any_val);
+ }
+ ProtoWriter res_attr2;
+ {
+ ProtoWriter any_val;
+ any_val.write_string(1, LEMON_VERSION_STRING);
+ res_attr2.write_string(1, "service.version");
+ res_attr2.write_message(2, any_val);
+ }
+
+ ProtoWriter resource_msg;
+ resource_msg.write_message(1, res_attr1);
+ resource_msg.write_message(1, res_attr2);
+
+ ProtoWriter resource_spans_msg;
+ resource_spans_msg.write_message(1, resource_msg);
+ resource_spans_msg.write_message(2, scope_spans_msg);
+
+ ProtoWriter request_msg;
+ request_msg.write_message(1, resource_spans_msg);
+
+ return request_msg.buf;
+}
+
+class MetricsWorker {
+public:
+ MetricsWorker() : shutdown_(false), processing_(false) {
+ worker_thread_ = std::thread(&MetricsWorker::run, this);
+ }
+
+ ~MetricsWorker() {
+ stop();
+ }
+
+ void stop() {
+ {
+ std::lock_guard lock(mutex_);
+ if (shutdown_) return;
+ shutdown_ = true;
+ }
+ cv_.notify_all();
+ cv_drain_.notify_all();
+ if (worker_thread_.joinable()) {
+ worker_thread_.join();
+ }
+ }
+
+ bool enqueue(std::shared_ptr span,
+ const std::string& url,
+ std::function(const std::string&)> parser,
+ const nlohmann::json& usage_payload,
+ const std::string& text_output) {
+ std::lock_guard lock(mutex_);
+ if (queue_.size() >= 100) {
+ return false;
+ }
+ queue_.push({span, url, parser, usage_payload, text_output});
+ cv_.notify_one();
+ return true;
+ }
+
+ void drain() {
+ std::unique_lock lock(mutex_);
+ cv_drain_.wait(lock, [this]() { return queue_.empty() && !processing_; });
+ }
+
+private:
+ struct Task {
+ std::shared_ptr span;
+ std::string metrics_url;
+ std::function(const std::string&)> parser;
+ nlohmann::json usage_payload;
+ std::string text_output;
+ };
+
+ std::queue queue_;
+ std::mutex mutex_;
+ std::condition_variable cv_;
+ std::condition_variable cv_drain_;
+ bool shutdown_ = false;
+ bool processing_ = false;
+ std::thread worker_thread_;
+
+ void run() {
+ while (true) {
+ Task task;
+ {
+ std::unique_lock lock(mutex_);
+ cv_.wait(lock, [this]() { return shutdown_ || !queue_.empty(); });
+ if (shutdown_ && queue_.empty()) {
+ break;
+ }
+ task = std::move(queue_.front());
+ queue_.pop();
+ processing_ = true;
+ }
+
+ if (!task.metrics_url.empty() && task.parser) {
+ try {
+ auto response = utils::HttpClient::get(task.metrics_url, {}, 1);
+ if (response.status_code == 200) {
+ auto extra = task.parser(response.body);
+ for (const auto& [k, v] : extra) {
+ task.span->set_attribute(k, v);
+ }
+ }
+ } catch (const std::exception& e) {
+ LOG(DEBUG, "Telemetry") << "Failed to fetch metrics in background: " << e.what() << std::endl;
+ } catch (...) {
+ LOG(DEBUG, "Telemetry") << "Failed to fetch metrics in background: unknown error" << std::endl;
+ }
+ }
+ task.span->end_with_success(task.usage_payload, task.text_output);
+
+ {
+ std::lock_guard lock(mutex_);
+ processing_ = false;
+ if (queue_.empty()) {
+ cv_drain_.notify_all();
+ }
+ }
+ }
+ }
+};
+
+static MetricsWorker g_metrics_worker;
+
+class TelemetryQueue {
+private:
+ struct Task {
+ nlohmann::json span_details;
+ std::string endpoint;
+ std::map headers;
+ std::string protocol;
+ std::chrono::steady_clock::time_point arrival_time;
+ };
+
+ static constexpr size_t MAX_CAPACITY = 1000;
+ std::deque queue_;
+ std::mutex mutex_;
+ std::condition_variable cv_;
+ std::thread worker_;
+ bool shutdown_ = false;
+ size_t dropped_spans_count_ = 0;
+ bool endpoint_unreachable_ = false;
+ std::string last_endpoint_;
+ bool last_enabled_ = false;
+ bool flush_requested_ = false;
+ std::condition_variable cv_flush_;
+
+ void worker_loop() {
+ while (true) {
+ std::vector batch_spans;
+ std::string batch_endpoint;
+ std::map batch_headers;
+ std::string batch_protocol;
+
+ {
+ std::unique_lock lock(mutex_);
+
+ while (true) {
+ if (shutdown_ && queue_.empty()) {
+ if (flush_requested_) {
+ flush_requested_ = false;
+ cv_flush_.notify_all();
+ }
+ return;
+ }
+ if (shutdown_ && !queue_.empty()) {
+ const auto& oldest_task = queue_.front();
+ batch_endpoint = oldest_task.endpoint;
+ batch_headers = oldest_task.headers;
+ batch_protocol = oldest_task.protocol;
+
+ auto it = queue_.begin();
+ while (it != queue_.end()) {
+ if (it->endpoint == batch_endpoint &&
+ it->headers == batch_headers &&
+ it->protocol == batch_protocol) {
+ batch_spans.push_back(std::move(it->span_details));
+ it = queue_.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ break;
+ }
+ if (flush_requested_) {
+ if (queue_.empty()) {
+ flush_requested_ = false;
+ cv_flush_.notify_all();
+ break;
+ }
+ const auto& oldest_task = queue_.front();
+ batch_endpoint = oldest_task.endpoint;
+ batch_headers = oldest_task.headers;
+ batch_protocol = oldest_task.protocol;
+
+ int batch_size = 100;
+ if (auto* config = RuntimeConfig::global()) {
+ batch_size = config->telemetry_otlp_send_batch_size();
+ }
+
+ auto it = queue_.begin();
+ while (it != queue_.end() && static_cast(batch_spans.size()) < batch_size) {
+ if (it->endpoint == batch_endpoint &&
+ it->headers == batch_headers &&
+ it->protocol == batch_protocol) {
+ batch_spans.push_back(std::move(it->span_details));
+ it = queue_.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ LOG(INFO, "Telemetry") << "Flush requested. Exporting batch of "
+ << batch_spans.size() << " spans..." << std::endl;
+ break;
+ }
+ if (queue_.empty()) {
+ cv_.wait(lock);
+ continue;
+ }
+
+ const auto& oldest_task = queue_.front();
+ std::string target_endpoint = oldest_task.endpoint;
+ std::map target_headers = oldest_task.headers;
+ std::string target_protocol = oldest_task.protocol;
+ auto oldest_arrival = oldest_task.arrival_time;
+
+ int batch_size = 100;
+ double timeout_s = 1.0;
+ if (auto* config = RuntimeConfig::global()) {
+ batch_size = config->telemetry_otlp_send_batch_size();
+ timeout_s = config->telemetry_otlp_batch_timeout_s();
+ }
+
+ int matching_count = 0;
+ for (const auto& task : queue_) {
+ if (task.endpoint == target_endpoint &&
+ task.headers == target_headers &&
+ task.protocol == target_protocol) {
+ matching_count++;
+ }
+ }
+
+ auto now = std::chrono::steady_clock::now();
+ double elapsed_s = std::chrono::duration(now - oldest_arrival).count();
+
+ if (matching_count >= batch_size || elapsed_s >= timeout_s) {
+ batch_endpoint = target_endpoint;
+ batch_headers = target_headers;
+ batch_protocol = target_protocol;
+
+ auto it = queue_.begin();
+ while (it != queue_.end() && static_cast(batch_spans.size()) < batch_size) {
+ if (it->endpoint == target_endpoint &&
+ it->headers == target_headers &&
+ it->protocol == target_protocol) {
+ batch_spans.push_back(std::move(it->span_details));
+ it = queue_.erase(it);
+ } else {
+ ++it;
+ }
+ }
+
+ LOG(INFO, "Telemetry") << "Batch target size reached or timeout elapsed. Exporting batch of "
+ << batch_spans.size() << " spans..." << std::endl;
+ break;
+ } else {
+ double remaining_s = timeout_s - elapsed_s;
+ if (remaining_s < 0) remaining_s = 0;
+ cv_.wait_for(lock, std::chrono::duration(remaining_s));
+ }
+ }
+ }
+
+ if (batch_spans.empty()) {
+ continue;
+ }
+
+ std::string payload;
+ if (batch_protocol == "http/json") {
+ batch_headers["Content-Type"] = "application/json";
+ payload = serialize_json_batch(batch_spans);
+ } else {
+ batch_headers["Content-Type"] = "application/x-protobuf";
+ payload = serialize_protobuf_batch(batch_spans);
+ }
+
+ int max_retries = 0;
+ if (auto* config = RuntimeConfig::global()) {
+ max_retries = config->telemetry_otlp_max_retries();
+ }
+
+ bool bypass_retries = false;
+ {
+ std::unique_lock lock(mutex_);
+ bypass_retries = endpoint_unreachable_;
+ }
+ int retries = 0;
+ while (true) {
+ bool success = false;
+ bool retryable = true;
+ std::string error_detail;
+ try {
+ auto response = utils::HttpClient::post(batch_endpoint, payload, batch_headers, 3);
+ if (response.status_code >= 200 && response.status_code < 300) {
+ success = true;
+ LOG(INFO, "Telemetry") << "Successfully sent telemetry batch." << std::endl;
+ {
+ std::unique_lock lock(mutex_);
+ endpoint_unreachable_ = false;
+ }
+ } else {
+ error_detail = "Status: " + std::to_string(response.status_code) + ", Response: " + response.body;
+ if (response.status_code >= 400 && response.status_code < 500 && response.status_code != 429) {
+ retryable = false;
+ }
+ }
+ } catch (const std::exception& e) {
+ error_detail = e.what();
+ } catch (...) {
+ error_detail = "Unknown exception";
+ }
+
+ if (success) {
+ break;
+ }
+
+ LOG(ERROR, "Telemetry") << "Failed to send telemetry batch. Telemetry receiver may be down or unreachable." << std::endl;
+ LOG(DEBUG, "Telemetry") << "Telemetry batch failure details: " << error_detail << std::endl;
+
+ if (!retryable) {
+ LOG(WARNING, "Telemetry") << "Telemetry batch dropped immediately due to non-retryable HTTP error." << std::endl;
+ break;
+ }
+
+ if (!bypass_retries && retries < max_retries) {
+ retries++;
+ double backoff_base = 5.0;
+ if (auto* config = RuntimeConfig::global()) {
+ backoff_base = config->telemetry_otlp_retry_backoff_base_s();
+ }
+ int shift = (std::min)(retries - 1, 10);
+ double delay = (std::min)(backoff_base * (1 << shift), 60.0);
+
+ thread_local std::mt19937 gen(std::random_device{}());
+ std::uniform_real_distribution dist(0.5, 1.5);
+ double delay_with_jitter = delay * dist(gen);
+
+ LOG(INFO, "Telemetry") << "Retrying batch in " << delay_with_jitter << " seconds (with jitter, attempt " << retries << " of " << max_retries << ")..." << std::endl;
+
+ bool local_shutdown = false;
+ bool local_flush_requested = false;
+ {
+ std::unique_lock lock(mutex_);
+ cv_.wait_for(lock, std::chrono::duration(delay_with_jitter), [this]() { return shutdown_ || flush_requested_; });
+ local_shutdown = shutdown_;
+ local_flush_requested = flush_requested_;
+ }
+ if (local_shutdown) {
+ LOG(INFO, "Telemetry") << "Shutdown requested during retry sleep. Aborting." << std::endl;
+ return;
+ }
+ if (local_flush_requested) {
+ LOG(INFO, "Telemetry") << "Flush requested during retry sleep. Aborting retries for this batch." << std::endl;
+ break;
+ }
+ } else {
+ {
+ std::unique_lock lock(mutex_);
+ endpoint_unreachable_ = true;
+ }
+ if (max_retries > 0) {
+ LOG(WARNING, "Telemetry") << "Max retries reached (" << max_retries << ") or endpoint unreachable. Telemetry batch dropped." << std::endl;
+ } else {
+ LOG(WARNING, "Telemetry") << "Telemetry batch dropped (retries disabled)." << std::endl;
+ }
+ break;
+ }
+ }
+ }
+ }
+
+public:
+ TelemetryQueue() {
+ worker_ = std::thread(&TelemetryQueue::worker_loop, this);
+ }
+
+ void flush() {
+ std::unique_lock lock(mutex_);
+ if (queue_.empty()) {
+ return;
+ }
+ flush_requested_ = true;
+ cv_.notify_one();
+ cv_flush_.wait(lock, [this]() { return !flush_requested_; });
+ }
+
+ void reset_unreachable() {
+ std::unique_lock lock(mutex_);
+ endpoint_unreachable_ = false;
+ }
+
+ void shutdown() {
+ {
+ std::unique_lock lock(mutex_);
+ if (shutdown_) return;
+ shutdown_ = true;
+ }
+ cv_.notify_one();
+ if (worker_.joinable()) {
+ worker_.join();
+ }
+ }
+
+ ~TelemetryQueue() {
+ shutdown();
+ }
+
+ void push(nlohmann::json span_details, std::string endpoint, std::map headers, std::string protocol) {
+ std::unique_lock lock(mutex_);
+ if (shutdown_) return;
+
+ if (endpoint != last_endpoint_ || !last_enabled_) {
+ last_endpoint_ = endpoint;
+ last_enabled_ = true;
+ endpoint_unreachable_ = false;
+ }
+
+ size_t max_capacity = MAX_CAPACITY;
+ if (auto* config = RuntimeConfig::global()) {
+ max_capacity = config->telemetry_max_queue_capacity();
+ }
+
+ if (queue_.size() >= max_capacity) {
+ dropped_spans_count_++;
+ if (dropped_spans_count_ % 100 == 1) {
+ LOG(WARNING, "Telemetry") << "Telemetry queue full (capacity " << max_capacity
+ << "). Dropped oldest span. Total dropped: " << dropped_spans_count_ << std::endl;
+ }
+ queue_.pop_front();
+ }
+
+ int batch_size = 100;
+ if (auto* config = RuntimeConfig::global()) {
+ batch_size = config->telemetry_otlp_send_batch_size();
+ }
+ LOG(INFO, "Telemetry") << "Accumulating span to batch (size " << (queue_.size() + 1) << "/" << batch_size << ")..." << std::endl;
+
+ queue_.push_back({std::move(span_details), std::move(endpoint), std::move(headers), std::move(protocol), std::chrono::steady_clock::now()});
+ cv_.notify_one();
+ }
+};
+
+static TelemetryQueue& get_queue() {
+ static TelemetryQueue queue;
+ return queue;
+}
+
+void shutdown() {
+ g_metrics_worker.stop();
+ get_queue().shutdown();
+}
+
+void flush() {
+ g_metrics_worker.drain();
+ get_queue().flush();
+}
+
+static std::string generate_hex_id(size_t num_bytes) {
+ thread_local std::mt19937 gen(std::random_device{}());
+ std::uniform_int_distribution dist(0, 255);
+ std::stringstream ss;
+ for (size_t i = 0; i < num_bytes; ++i) {
+ ss << std::hex << std::setw(2) << std::setfill('0') << dist(gen);
+ }
+ return ss.str();
+}
+
+static uint64_t get_unix_nano() {
+ auto now = std::chrono::system_clock::now();
+ return std::chrono::duration_cast(now.time_since_epoch()).count();
+}
+
+static std::string truncate_string(const std::string& str, size_t max_len) {
+ if (str.length() <= max_len) {
+ return str;
+ }
+ if (max_len <= 15) {
+ return str.substr(0, max_len);
+ }
+ return str.substr(0, max_len - 15) + "... [TRUNCATED]";
+}
+
+InferenceSpan::InferenceSpan(const std::string& span_kind, const std::string& name, const std::string& model_name, const nlohmann::json& request_json)
+ : span_kind_(span_kind), name_(name), model_name_(model_name), start_time_(std::chrono::steady_clock::now()) {
+ trace_id_ = generate_hex_id(16);
+ span_id_ = generate_hex_id(8);
+
+ size_t max_len = 4096;
+ if (auto* config = RuntimeConfig::global()) {
+ max_len = static_cast(config->telemetry_max_attribute_length());
+ }
+
+ if (request_json.contains("user") && request_json["user"].is_string()) {
+ user_id_ = truncate_string(request_json["user"].get(), max_len);
+ }
+ if (request_json.contains("session_id") && request_json["session_id"].is_string()) {
+ session_id_ = truncate_string(request_json["session_id"].get(), max_len);
+ }
+
+ if (span_kind_ == "LLM") {
+ if (request_json.contains("messages") && request_json["messages"].is_array()) {
+ request_dump_ = truncate_string(request_json["messages"].dump(), max_len);
+ for (const auto& msg : request_json["messages"]) {
+ if (msg.is_object()) {
+ Message message;
+ if (msg.contains("role") && msg["role"].is_string()) {
+ message.role = msg["role"].get();
+ }
+ if (msg.contains("content") && msg["content"].is_string()) {
+ message.content = truncate_string(msg["content"].get(), max_len);
+ } else if (msg.contains("content")) {
+ message.content = truncate_string(msg["content"].dump(), max_len);
+ }
+ input_messages_.push_back(message);
+ }
+ }
+ } else if (request_json.contains("prompt") && request_json["prompt"].is_string()) {
+ std::string prompt_str = request_json["prompt"].get();
+ request_dump_ = truncate_string(prompt_str, max_len);
+ input_messages_.push_back({"user", request_dump_});
+ } else {
+ request_dump_ = truncate_string(request_json.dump(), max_len);
+ }
+ } else if (span_kind_ == "EMBEDDING") {
+ if (request_json.contains("input")) {
+ if (request_json["input"].is_string()) {
+ request_dump_ = truncate_string(request_json["input"].get(), max_len);
+ } else {
+ request_dump_ = truncate_string(request_json["input"].dump(), max_len);
+ }
+ } else {
+ request_dump_ = truncate_string(request_json.dump(), max_len);
+ }
+ } else {
+ request_dump_ = truncate_string(request_json.dump(), max_len);
+ }
+}
+
+InferenceSpan::~InferenceSpan() {
+ if (!ended_) {
+ end_with_error("Span destroyed before explicit completion");
+ }
+}
+
+void InferenceSpan::set_attribute(const std::string& key, const nlohmann::json& value) {
+ custom_attributes_[key] = value;
+}
+
+static void get_telemetry_semantics(bool& has_openinference, bool& has_otel_genai) {
+ std::vector semantics = {"openinference"};
+ if (auto* config = RuntimeConfig::global()) {
+ semantics = config->telemetry_otlp_semantics();
+ }
+ has_openinference = std::find(semantics.begin(), semantics.end(), "openinference") != semantics.end();
+ has_otel_genai = std::find(semantics.begin(), semantics.end(), "otel_genai") != semantics.end();
+}
+
+nlohmann::json InferenceSpan::build_common_attributes(bool has_openinference, bool has_otel_genai, bool hide_inputs) {
+ nlohmann::json attributes = nlohmann::json::array();
+ std::string final_input = hide_inputs ? "[REDACTED]" : request_dump_;
+
+ if (has_openinference) {
+ attributes.push_back({{"key", "openinference.span.kind"}, {"value", {{"stringValue", span_kind_}}}});
+
+ if (span_kind_ == "LLM") {
+ attributes.push_back({{"key", "llm.model_name"}, {"value", {{"stringValue", model_name_}}}});
+ } else if (span_kind_ == "EMBEDDING") {
+ attributes.push_back({{"key", "embedding.model_name"}, {"value", {{"stringValue", model_name_}}}});
+ } else if (span_kind_ == "RERANKER") {
+ attributes.push_back({{"key", "reranker.model_name"}, {"value", {{"stringValue", model_name_}}}});
+ }
+
+ attributes.push_back({{"key", "input.value"}, {"value", {{"stringValue", final_input}}}});
+
+ if (!user_id_.empty()) {
+ attributes.push_back({{"key", "openinference.user.id"}, {"value", {{"stringValue", user_id_}}}});
+ }
+ if (!session_id_.empty()) {
+ attributes.push_back({{"key", "openinference.session.id"}, {"value", {{"stringValue", session_id_}}}});
+ }
+
+ if (span_kind_ == "LLM") {
+ for (size_t i = 0; i < input_messages_.size(); ++i) {
+ std::string role_key = "llm.input_messages." + std::to_string(i) + ".message.role";
+ std::string content_key = "llm.input_messages." + std::to_string(i) + ".message.content";
+ attributes.push_back({{"key", role_key}, {"value", {{"stringValue", input_messages_[i].role}}}});
+ attributes.push_back({{"key", content_key}, {"value", {{"stringValue", hide_inputs ? "[REDACTED]" : input_messages_[i].content}}}});
+ }
+ }
+ }
+
+ if (has_otel_genai) {
+ std::string op_name = "chat";
+ if (span_kind_ == "EMBEDDING") op_name = "embeddings";
+ else if (span_kind_ == "RERANKER") op_name = "rerank";
+ else if (name_ != "chat.completions") op_name = "completion";
+
+ attributes.push_back({{"key", "gen_ai.operation.name"}, {"value", {{"stringValue", op_name}}}});
+ attributes.push_back({{"key", "gen_ai.request.model"}, {"value", {{"stringValue", model_name_}}}});
+ attributes.push_back({{"key", "gen_ai.provider.name"}, {"value", {{"stringValue", "lemonade"}}}});
+ attributes.push_back({{"key", "gen_ai.system"}, {"value", {{"stringValue", "lemonade"}}}});
+
+ if (!session_id_.empty()) {
+ attributes.push_back({{"key", "gen_ai.conversation.id"}, {"value", {{"stringValue", session_id_}}}});
+ }
+
+ if (span_kind_ == "LLM") {
+ for (size_t i = 0; i < input_messages_.size(); ++i) {
+ std::string role_key = "gen_ai.input.messages." + std::to_string(i) + ".role";
+ std::string content_key = "gen_ai.input.messages." + std::to_string(i) + ".content";
+ attributes.push_back({{"key", role_key}, {"value", {{"stringValue", input_messages_[i].role}}}});
+ attributes.push_back({{"key", content_key}, {"value", {{"stringValue", hide_inputs ? "[REDACTED]" : input_messages_[i].content}}}});
+ }
+ }
+ }
+
+ for (const auto& [k, v] : custom_attributes_) {
+ if (v.is_string()) {
+ attributes.push_back({{"key", k}, {"value", {{"stringValue", v.get()}}}});
+ } else if (v.is_number_integer()) {
+ attributes.push_back({{"key", k}, {"value", {{"intValue", v.get()}}}});
+ } else if (v.is_boolean()) {
+ attributes.push_back({{"key", k}, {"value", {{"boolValue", v.get()}}}});
+ } else if (v.is_number_float()) {
+ attributes.push_back({{"key", k}, {"value", {{"doubleValue", v.get()}}}});
+ } else {
+ attributes.push_back({{"key", k}, {"value", {{"stringValue", v.dump()}}}});
+ }
+ }
+
+ return attributes;
+}
+
+void InferenceSpan::end_with_success(const nlohmann::json& usage_or_timings, const std::string& complete_output) {
+ if (ended_) return;
+ ended_ = true;
+
+ auto end_time = get_unix_nano();
+ uint64_t start_nano = end_time - std::chrono::duration_cast(
+ std::chrono::steady_clock::now() - start_time_).count();
+
+ bool hide_inputs = false;
+ bool hide_outputs = false;
+ bool hide_thinking = false;
+ if (auto* config = RuntimeConfig::global()) {
+ hide_inputs = config->telemetry_hide_inputs();
+ hide_outputs = config->telemetry_hide_outputs();
+ hide_thinking = config->telemetry_hide_thinking();
+ }
+
+ std::string final_output;
+ if (hide_outputs) {
+ final_output = "[REDACTED]";
+ } else {
+ final_output = complete_output;
+ if (hide_thinking) {
+ final_output = strip_thinking(final_output);
+ } else {
+ final_output = standardize_thinking(final_output);
+ }
+ }
+
+ bool has_openinference = false;
+ bool has_otel_genai = false;
+ get_telemetry_semantics(has_openinference, has_otel_genai);
+
+ nlohmann::json attributes = build_common_attributes(has_openinference, has_otel_genai, hide_inputs);
+
+ int prompt_tokens = -1;
+ int completion_tokens = -1;
+ int total_tokens = -1;
+
+ std::string prefix = "llm";
+ if (span_kind_ == "EMBEDDING") prefix = "embedding";
+ else if (span_kind_ == "RERANKER") prefix = "reranker";
+
+ if (usage_or_timings.contains("prompt_tokens") && usage_or_timings["prompt_tokens"].is_number()) {
+ prompt_tokens = usage_or_timings["prompt_tokens"].get();
+ }
+ if (usage_or_timings.contains("completion_tokens") && usage_or_timings["completion_tokens"].is_number()) {
+ completion_tokens = usage_or_timings["completion_tokens"].get();
+ }
+ if (usage_or_timings.contains("total_tokens") && usage_or_timings["total_tokens"].is_number()) {
+ total_tokens = usage_or_timings["total_tokens"].get();
+ } else if (prompt_tokens >= 0 && completion_tokens >= 0) {
+ total_tokens = prompt_tokens + completion_tokens;
+ }
+
+ if (has_openinference) {
+ attributes.push_back({{"key", "output.value"}, {"value", {{"stringValue", final_output}}}});
+
+ if (span_kind_ == "LLM" && !final_output.empty()) {
+ attributes.push_back({{"key", "llm.output_messages.0.message.role"}, {"value", {{"stringValue", "assistant"}}}});
+ attributes.push_back({{"key", "llm.output_messages.0.message.content"}, {"value", {{"stringValue", final_output}}}});
+ }
+
+ if (prompt_tokens >= 0) {
+ attributes.push_back({{"key", prefix + ".usage.prompt_tokens"}, {"value", {{"intValue", prompt_tokens}}}});
+ }
+ if (completion_tokens >= 0) {
+ attributes.push_back({{"key", prefix + ".usage.completion_tokens"}, {"value", {{"intValue", completion_tokens}}}});
+ }
+ if (total_tokens >= 0) {
+ attributes.push_back({{"key", prefix + ".usage.total_tokens"}, {"value", {{"intValue", total_tokens}}}});
+ }
+
+ if (prompt_tokens >= 0) {
+ attributes.push_back({{"key", "llm.token_count.prompt"}, {"value", {{"intValue", prompt_tokens}}}});
+ }
+ if (completion_tokens >= 0) {
+ attributes.push_back({{"key", "llm.token_count.completion"}, {"value", {{"intValue", completion_tokens}}}});
+ }
+ if (total_tokens >= 0) {
+ attributes.push_back({{"key", "llm.token_count.total"}, {"value", {{"intValue", total_tokens}}}});
+ }
+ }
+
+ if (has_otel_genai) {
+ if (prompt_tokens >= 0) {
+ attributes.push_back({{"key", "gen_ai.usage.input_tokens"}, {"value", {{"intValue", prompt_tokens}}}});
+ }
+ if (completion_tokens >= 0) {
+ attributes.push_back({{"key", "gen_ai.usage.output_tokens"}, {"value", {{"intValue", completion_tokens}}}});
+ }
+ if (span_kind_ == "LLM" && !final_output.empty()) {
+ attributes.push_back({{"key", "gen_ai.output.messages.0.role"}, {"value", {{"stringValue", "assistant"}}}});
+ attributes.push_back({{"key", "gen_ai.output.messages.0.content"}, {"value", {{"stringValue", final_output}}}});
+ }
+ }
+
+ nlohmann::json span_json = {
+ {"traceId", trace_id_},
+ {"spanId", span_id_},
+ {"name", name_},
+ {"kind", 2},
+ {"startTimeUnixNano", std::to_string(start_nano)},
+ {"endTimeUnixNano", std::to_string(end_time)},
+ {"attributes", attributes},
+ {"status", {{"code", 1}}}
+ };
+
+ submit_span(span_json);
+}
+
+void InferenceSpan::end_with_error(const std::string& error_message) {
+ if (ended_) return;
+ ended_ = true;
+
+ auto end_time = get_unix_nano();
+ uint64_t start_nano = end_time - std::chrono::duration_cast(
+ std::chrono::steady_clock::now() - start_time_).count();
+
+ bool hide_inputs = false;
+ if (auto* config = RuntimeConfig::global()) {
+ hide_inputs = config->telemetry_hide_inputs();
+ }
+
+ bool has_openinference = false;
+ bool has_otel_genai = false;
+ get_telemetry_semantics(has_openinference, has_otel_genai);
+
+ nlohmann::json attributes = build_common_attributes(has_openinference, has_otel_genai, hide_inputs);
+
+ nlohmann::json span_json = {
+ {"traceId", trace_id_},
+ {"spanId", span_id_},
+ {"name", name_},
+ {"kind", 2},
+ {"startTimeUnixNano", std::to_string(start_nano)},
+ {"endTimeUnixNano", std::to_string(end_time)},
+ {"attributes", attributes},
+ {"status", {{"code", 2}, {"message", error_message}}}
+ };
+
+ submit_span(span_json);
+}
+
+void InferenceSpan::cancel() {
+ ended_ = true;
+}
+
+static bool is_valid_header_token(const std::string& str) {
+ for (char c : str) {
+ if (c == '\r' || c == '\n' || c == '\0') {
+ return false;
+ }
+ }
+ return true;
+}
+
+static std::string to_lowercase(std::string str) {
+ std::transform(str.begin(), str.end(), str.begin(), [](unsigned char c) {
+ return std::tolower(c);
+ });
+ return str;
+}
+
+void InferenceSpan::submit_span(const nlohmann::json& span_details) {
+ auto* config = RuntimeConfig::global();
+ if (!config || !config->telemetry_enabled()) return;
+
+ std::string endpoint = config->telemetry_otlp_endpoint();
+ auto raw_config_headers = config->telemetry_otlp_headers();
+ std::string protocol = config->telemetry_otlp_protocol();
+
+ std::map headers;
+
+ auto sanitize_and_insert = [&headers](std::string k, std::string v) {
+ size_t start = 0;
+ while (start < k.size() && std::isspace(static_cast(k[start]))) {
+ start++;
+ }
+ size_t end = k.size();
+ while (end > start && std::isspace(static_cast(k[end - 1]))) {
+ end--;
+ }
+ k = k.substr(start, end - start);
+
+ start = 0;
+ while (start < v.size() && std::isspace(static_cast(v[start]))) {
+ start++;
+ }
+ end = v.size();
+ while (end > start && std::isspace(static_cast(v[end - 1]))) {
+ end--;
+ }
+ v = v.substr(start, end - start);
+
+ if (!k.empty() && is_valid_header_token(k) && is_valid_header_token(v)) {
+ std::string k_lower = to_lowercase(k);
+ if (k_lower != "content-type" && k_lower != "content-length") {
+ headers[k] = v;
+ } else {
+ LOG(WARNING, "Telemetry") << "Disallowed overriding well-known OTLP header: " << k << std::endl;
+ }
+ } else if (!k.empty()) {
+ LOG(WARNING, "Telemetry") << "Rejected invalid OTLP header key or value containing CR, LF, or NUL." << std::endl;
+ }
+ };
+
+ for (const auto& [k, v] : raw_config_headers) {
+ sanitize_and_insert(k, v);
+ }
+
+ if (const char* env_headers = std::getenv("OTEL_EXPORTER_OTLP_HEADERS")) {
+ std::string env_str(env_headers);
+ std::stringstream ss(env_str);
+ std::string item;
+ while (std::getline(ss, item, ',')) {
+ size_t eq = item.find('=');
+ if (eq != std::string::npos) {
+ std::string k = item.substr(0, eq);
+ std::string v = item.substr(eq + 1);
+ sanitize_and_insert(k, v);
+ }
+ }
+ }
+
+ get_queue().push(span_details, std::move(endpoint), std::move(headers), std::move(protocol));
+}
+
+std::shared_ptr TelemetryTracker::start_span(const std::string& span_kind, const std::string& name, const std::string& model_name, const nlohmann::json& request_json) {
+ auto* config = RuntimeConfig::global();
+ if (config && config->telemetry_enabled()) {
+ return std::make_shared(span_kind, name, model_name, request_json);
+ }
+ return nullptr;
+}
+
+void end_llm_span_async(
+ std::shared_ptr span,
+ const std::string& metrics_url,
+ std::function(const std::string&)> parser,
+ const nlohmann::json& usage_payload,
+ const std::string& text_output) {
+
+ if (!span) return;
+
+ if (metrics_url.empty() || !parser) {
+ span->end_with_success(usage_payload, text_output);
+ return;
+ }
+
+ if (!g_metrics_worker.enqueue(span, metrics_url, parser, usage_payload, text_output)) {
+ LOG(WARNING, "Telemetry") << "MetricsWorker queue full. Dropping optional metrics and completing span immediately." << std::endl;
+ span->end_with_success(usage_payload, text_output);
+ }
+}
+
+} // namespace lemon::telemetry
diff --git a/src/cpp/server/telemetry.h b/src/cpp/server/telemetry.h
new file mode 100644
index 000000000..b88ca7a06
--- /dev/null
+++ b/src/cpp/server/telemetry.h
@@ -0,0 +1,62 @@
+#pragma once
+#include
+#include