From e5ebbce96a42b932f84cadbefe6e550dedd1371a Mon Sep 17 00:00:00 2001 From: Christoph Walser Date: Sun, 1 Mar 2026 20:55:50 +0000 Subject: [PATCH 1/6] renamed benchmark to model_benchmark because of name shadowing --- benchmark/{benchmark.py => model_benchmark.py} | 0 launch_benchmark.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename benchmark/{benchmark.py => model_benchmark.py} (100%) diff --git a/benchmark/benchmark.py b/benchmark/model_benchmark.py similarity index 100% rename from benchmark/benchmark.py rename to benchmark/model_benchmark.py diff --git a/launch_benchmark.py b/launch_benchmark.py index c9b1ddf..7bdd544 100755 --- a/launch_benchmark.py +++ b/launch_benchmark.py @@ -41,7 +41,7 @@ ) from rich.table import Table -from benchmark.benchmark import ModelBenchmark +from benchmark.model_benchmark import ModelBenchmark from benchmark.utils_multi import load_multi_cfg # ── Silence noisy logs up‑front ───────────────────────────────────────────── From c29823996f0ec88b53c40864d05d9645b2f29748 Mon Sep 17 00:00:00 2001 From: Christoph Walser Date: Sun, 1 Mar 2026 20:56:10 +0000 Subject: [PATCH 2/6] add newer chat completions api to inference engine client --- benchmark/inference_engine_client.py | 186 ++++++++++++++++++++++----- 1 file changed, 157 insertions(+), 29 deletions(-) diff --git a/benchmark/inference_engine_client.py b/benchmark/inference_engine_client.py index ea4a789..98b4831 100644 --- a/benchmark/inference_engine_client.py +++ b/benchmark/inference_engine_client.py @@ -2,32 +2,35 @@ import subprocess import time import requests +import base64 +import mimetypes from pathlib import Path import httpx from benchmark.utils import _start_log_tailer class InferenceEngineClient: """ - Wrapper for an OpenAI‐compatible server. + Wrapper for an OpenAI‐compatible server. launch() will call your existing launch_engine.sh script (which runs Docker in the foreground), then poll /v1/completions every second until it returns 200. """ - def __init__(self, base_url="http://localhost:23333/v1", api_key="none"): + def __init__(self, base_url="http://127.0.0.1:23333/v1", api_key="none"): from openai import OpenAI + self.base_url = base_url self.client = OpenAI(api_key=api_key, base_url=base_url, timeout=httpx.Timeout(60.0)) self._launcher_proc = None self.model = None self.NAME = "bench360_inference_engine" - + def launch(self, backend: str, model: str, timeout: float = 500.0, dump_server_output: bool = False, script_path: str = None): """ 1) Starts your existing launch_engine.sh in a Popen (non-blocking). 2) Polls `http://127.0.0.1:23333/v1/models` every 2 seconds until the desired model shows up (or timeout). - + :param backend: One of {tgi, vllm, mii, sglang, lmdeploy} :param model: HF model ID or local path :param timeout: Max seconds to wait for the model to appear before raising. @@ -58,7 +61,7 @@ def launch(self, backend: str, model: str, timeout: float = 500.0, dump_server_o _start_log_tailer(self, max_lines=10, dump_server_output=dump_server_output) # 2) Poll /v1/models until our model_id appears or timeout. - list_url = "http://127.0.0.1:23333/v1/models" + list_url = self.base_url + "/models" start_time = time.time() while True: @@ -90,7 +93,61 @@ def launch(self, backend: str, model: str, timeout: float = 500.0, dump_server_o ) time.sleep(2.0) + def chat_completion( + self, + messages, + images: str | list[str] | None = None, # Accepts a single path/URL or a list + model: str | None = None, + temperature: float = 0.1, + max_tokens: int = 64, + top_p: float = 0.9, + stream: bool = False, + ): + """ + Send a request to the newer chat.completions endpoint. + :param messages: A list of OpenAI message dicts, OR a simple string prompt. + :param images: Optional local file path(s), HTTP URL(s), or base64 data URI(s). + """ + model_to_use = model or self.model + + if isinstance(messages, str): + if images: + # 1) Standardize input into a list + if isinstance(images, str): + images = [images] + + # 2) Build the multimodal content list + content_list = [{"type": "text", "text": messages}] + + # 3) Process and append each image + for img in images: + processed_uri = self._prepare_image_input(img) + content_list.append({ + "type": "image_url", + "image_url": {"url": processed_uri} + }) + + formatted_messages = [{"role": "user", "content": content_list}] + else: + # Text-only simple prompt + formatted_messages = [{"role": "user", "content": messages}] + else: + # Assume it's already a well-formatted OpenAI message list + formatted_messages = messages + + resp = self.client.chat.completions.create( + model=model_to_use, + messages=formatted_messages, + temperature=temperature, + max_tokens=max_tokens, + top_p=top_p, + stream=stream, + ) + if stream: + return resp + + return resp.choices[0].message.content def completion( self, @@ -124,19 +181,16 @@ def completion( def warmup(self, num_iters: int = 3): """ - Send a few small dummy requests to load the model into memory and - JIT any kernels so that subsequent inference calls are faster. - :param model: HF model ID or local path; defaults to self.default_model - :param num_iters: Number of warmup calls to make + Send a few small dummy requests using the chat endpoint to load the model into memory + and JIT any kernels so that subsequent inference calls are faster. """ - - dummy_prompt = "Warmup" + messages = [{"role": "user", "content": "Warmup request."}] for _ in range(num_iters): try: - _ = self.client.completions.create( + _ = self.client.chat.completions.create( model=self.model, - prompt=dummy_prompt, + messages=messages, max_tokens=1, temperature=0.1, ) @@ -147,19 +201,22 @@ def warmup(self, num_iters: int = 3): def measure_ttft(self) -> float: """ - Issue a 1‐token streaming request and measure the time until the first chunk arrives. + Issue a streaming chat request and measure the time until the first chunk arrives. """ import time - prompt = ( - "Artificial intelligence is a rapidly evolving field with applications in " - "healthcare, finance, education, and more. One of the most transformative " - "technologies is" - ) + messages = [{ + "role": "user", + "content": ( + "Artificial intelligence is a rapidly evolving field with applications in " + "healthcare, finance, education, and more. One of the most transformative " + "technologies is" + ) + }] start = time.time() - stream_resp = self.client.completions.create( + stream_resp = self.client.chat.completions.create( model=self.model, - prompt=prompt, + messages=messages, max_tokens=1, temperature=0.1, stream=True, @@ -167,12 +224,17 @@ def measure_ttft(self) -> float: first_token_time = None for chunk in stream_resp: - delta = getattr(chunk.choices[0], "delta", None) - if delta and delta.get("text", "").strip() != "": + # In chat.completions, the text chunk is stored in delta.content + delta = chunk.choices[0].delta + content = getattr(delta, "content", None) + + # If content exists and is not an empty string, we caught the first token + if content: first_token_time = time.time() break if first_token_time is None: + # Fallback if the stream ends without yielding valid text first_token_time = time.time() return first_token_time - start @@ -213,13 +275,79 @@ def close(self): except Exception: pass + def _prepare_image_input(self, image_input: str) -> str: + """ + Helper method to detect if the input is a local file path. + If it is, it reads the file and returns a Base64 data URI. + Otherwise, it assumes it's already a URL or data URI and returns it as-is. + """ + # Check if the string is a valid path to an existing local file + if os.path.isfile(image_input): + # Guess the mime type (e.g., image/jpeg, image/png) based on extension + mime_type, _ = mimetypes.guess_type(image_input) + mime_type = mime_type or "image/jpeg" # Fallback + + with open(image_input, "rb") as f: + encoded_string = base64.b64encode(f.read()).decode("utf-8") + + return f"data:{mime_type};base64,{encoded_string}" + + # If it's not a local file, assume it's a web URL or pre-encoded Base64 string + return image_input + if __name__ == "__main__": - ### ttft + + # --- Generate Local Dummy Images for Offline Testing --- + # 1x1 Red Pixel + img1_path = "test_red.png" + img1_b64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8BQDwAEhQGAhKmMIQAAAABJRU5ErkJggg==" + if not os.path.exists(img1_path): + with open(img1_path, "wb") as f: + f.write(base64.b64decode(img1_b64)) + + # 1x1 Blue Pixel + img2_path = "test_blue.png" + img2_b64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNkYPhfDwAChwGA60e6kgAAAABJRU5ErkJggg==" + if not os.path.exists(img2_path): + with open(img2_path, "wb") as f: + f.write(base64.b64decode(img2_b64)) + + # Instantiate the client directly client = InferenceEngineClient() - client.launch(backend="tgi", model="Qwen/Qwen2.5-7B-Instruct") - client.warmup() - ttft = client.measure_ttft() - print(f"TTFT: {ttft:.3f} seconds") - client.close() \ No newline at end of file + + try: + # Launching with the actual multimodal model + client.launch(backend="sglang", model="Qwen/Qwen3-VL-2B-Instruct") + client.warmup() + + # 1. Benchmark TTFT + ttft = client.measure_ttft() + print(f"TTFT: {ttft:.3f} seconds") + + # 2. Test Case: Single Image (Local Path -> Base64) + print("\n--- Testing Single Image (Fully Offline) ---") + single_resp = client.chat_completion( + messages="What color is the main subject in this image?", + images=img1_path # The helper will auto-detect and base64-encode this local file + ) + print("Response:\n", single_resp) + + # 3. Test Case: Multiple Images (Local Paths -> Base64) + print("\n--- Testing Multiple Images (Fully Offline) ---") + multi_resp = client.chat_completion( + messages="Are these two images identical? Explain the differences.", + images=[img1_path, img2_path] # Both local files will be base64-encoded automatically + ) + print("Response:\n", multi_resp) + + finally: + # Ensures Docker container and subprocesses are always shut down cleanly + client.close() + + # Cleanup temporary local dummy files + if os.path.exists(img1_path): + os.remove(img1_path) + if os.path.exists(img2_path): + os.remove(img2_path) \ No newline at end of file From ddd506f57a6dad77bba4965d39386153c116da4a Mon Sep 17 00:00:00 2001 From: Christoph Walser Date: Sun, 1 Mar 2026 21:49:10 +0000 Subject: [PATCH 3/6] added api_type literal to base task --- benchmark/tasks/base_task.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/benchmark/tasks/base_task.py b/benchmark/tasks/base_task.py index 40f39b9..9851e96 100644 --- a/benchmark/tasks/base_task.py +++ b/benchmark/tasks/base_task.py @@ -1,9 +1,11 @@ from abc import ABC, abstractmethod +from typing import Literal class BaseTask(ABC): """ Abstract base class for all NLP tasks. """ + api_type: Literal["completion", "chat_completion"] = "completion" @abstractmethod def generate_prompts(self, num_examples: int) -> tuple[list[str], list[str]]: From 06f1a625d8a3ad8212ec93e6b96dfa63ad35e14e Mon Sep 17 00:00:00 2001 From: Christoph Walser Date: Sun, 1 Mar 2026 22:17:12 +0000 Subject: [PATCH 4/6] added logic for distinction in generate between completion and chat completion and laterals in base task --- benchmark/model_benchmark.py | 42 +++++++++++++++++++++++++++++++++--- benchmark/tasks/base_task.py | 8 +++---- 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/benchmark/model_benchmark.py b/benchmark/model_benchmark.py index e6333c1..1115d20 100755 --- a/benchmark/model_benchmark.py +++ b/benchmark/model_benchmark.py @@ -181,9 +181,38 @@ def get_model_size(self, repo_id: str, revision: str | None = None) -> float: print(f"[ERROR] Could not estimate model size for {repo_id}: {e}") return 0.0 - def generate(self, prompts): - return self.iec.completion(prompts, temperature=self.temperature, top_p=self.top_p, max_tokens=self.max_tokens) + api = getattr(self, "api_type", "completion") + + if api == "chat_completion": + results = [] + for p in prompts: + # If the prompt is a dict, unpack it to support images + if isinstance(p, dict): + res = self.iec.chat_completion( + messages=p.get("messages", ""), + images=p.get("images", None), + temperature=self.temperature, + top_p=self.top_p, + max_tokens=self.max_tokens + ) + else: + # Standard text-only chat completion + res = self.iec.chat_completion( + messages=p, + temperature=self.temperature, + top_p=self.top_p, + max_tokens=self.max_tokens + ) + results.append(res) + return results + else: + return self.iec.completion( + prompts, + temperature=self.temperature, + top_p=self.top_p, + max_tokens=self.max_tokens + ) def _generate_and_time(self, prompt): """ @@ -444,7 +473,8 @@ def _run_scenario( batch_size: Optional[int] = None, samples: Optional[int] = None, sample_interval: float = 0.1, - quality_metric: bool = True + quality_metric: bool = True, + api_type: Optional[str] = None, ): # 1) instantiate Task if task == "summarization": @@ -462,9 +492,15 @@ def _run_scenario( elif task == "mmlu": from benchmark.tasks.mmlu import MMLUTask task_ = MMLUTask() + elif task == "vision": + from benchmark.tasks.vision import SimpleVisionTask + task_ = SimpleVisionTask() else: raise ValueError(f"Task {task!r} not supported.") + # Capture the api_type: use the explicit override if provided, otherwise fallback to task default + self.api_type = api_type or getattr(task_, "api_type", "completion") + # 2) initialize & metadata meta_df = self._initialize(task=task, scenario=scenario) diff --git a/benchmark/tasks/base_task.py b/benchmark/tasks/base_task.py index 9851e96..37dd153 100644 --- a/benchmark/tasks/base_task.py +++ b/benchmark/tasks/base_task.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Literal +from typing import Literal, Any class BaseTask(ABC): """ @@ -8,9 +8,9 @@ class BaseTask(ABC): api_type: Literal["completion", "chat_completion"] = "completion" @abstractmethod - def generate_prompts(self, num_examples: int) -> tuple[list[str], list[str]]: + def generate_prompts(self, num_examples: int) -> tuple[list[str | dict[str, Any]], list[str]]: """ - Should return a list of prompts and reference answers. + Should return a list of prompts (strings or multimodal dicts) and reference answers. """ pass @@ -19,4 +19,4 @@ def quality_metrics(self, generated: str, reference: str) -> dict[str, float]: """ Should return a dictionary of task-specific evaluation metrics. """ - pass + pass \ No newline at end of file From b3731aa083a01a1215e9d0dfe2cdca42f1de2a70 Mon Sep 17 00:00:00 2001 From: Christoph Walser Date: Tue, 3 Mar 2026 22:01:54 +0000 Subject: [PATCH 5/6] added code for batch scenario and chat completions api --- benchmark/inference_engine_client.py | 17 +++++++++++++---- benchmark/model_benchmark.py | 19 ++++++++++++------- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/benchmark/inference_engine_client.py b/benchmark/inference_engine_client.py index 98b4831..ec62738 100644 --- a/benchmark/inference_engine_client.py +++ b/benchmark/inference_engine_client.py @@ -104,9 +104,19 @@ def chat_completion( stream: bool = False, ): """ - Send a request to the newer chat.completions endpoint. - :param messages: A list of OpenAI message dicts, OR a simple string prompt. - :param images: Optional local file path(s), HTTP URL(s), or base64 data URI(s). + Send a request to the chat.completions endpoint. + + Args: + messages (str | list[dict]): A list of OpenAI message dicts, or a simple string prompt. + images (str | list[str] | None, optional): Local file path(s), HTTP URL(s), or base64 data URI(s). Defaults to None. + model (str | None, optional): The model to use. If None, falls back to the instance's default model. Defaults to None. + temperature (float, optional): Sampling temperature. Defaults to 0.1. + max_tokens (int, optional): The maximum number of tokens to generate. Defaults to 64. + top_p (float, optional): Nucleus sampling parameter. Defaults to 0.9. + stream (bool, optional): Whether to stream the response. Defaults to False. + + Returns: + str | Any: The text content of the generated message if stream is False. Otherwise, returns the stream object. """ model_to_use = model or self.model @@ -161,7 +171,6 @@ def completion( """ Send one or more prompts. :param prompt: string or list[str] """ - model = model is_batch = isinstance(prompt, (list, tuple)) resp = self.client.completions.create( diff --git a/benchmark/model_benchmark.py b/benchmark/model_benchmark.py index 1115d20..66844d8 100755 --- a/benchmark/model_benchmark.py +++ b/benchmark/model_benchmark.py @@ -185,11 +185,10 @@ def generate(self, prompts): api = getattr(self, "api_type", "completion") if api == "chat_completion": - results = [] - for p in prompts: - # If the prompt is a dict, unpack it to support images + # Helper function for the thread pool to execute + def _call_chat(p): if isinstance(p, dict): - res = self.iec.chat_completion( + return self.iec.chat_completion( messages=p.get("messages", ""), images=p.get("images", None), temperature=self.temperature, @@ -197,16 +196,22 @@ def generate(self, prompts): max_tokens=self.max_tokens ) else: - # Standard text-only chat completion - res = self.iec.chat_completion( + return self.iec.chat_completion( messages=p, temperature=self.temperature, top_p=self.top_p, max_tokens=self.max_tokens ) - results.append(res) + + # Fire all requests in the batch concurrently + results = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=len(prompts)) as executor: + # executor.map keeps the results in the exact same order as the input prompts + results = list(executor.map(_call_chat, prompts)) return results + else: + # Standard completion API handles list of strings natively return self.iec.completion( prompts, temperature=self.temperature, From 08fcd73857f7d96caecec486725df6f2cf4afdf6 Mon Sep 17 00:00:00 2001 From: Christoph Walser Date: Wed, 4 Mar 2026 10:16:34 +0000 Subject: [PATCH 6/6] implemented asyncio and asyncopenai --- benchmark/inference_engine_client.py | 102 ++++---- benchmark/model_benchmark.py | 345 +++++++++++++-------------- 2 files changed, 227 insertions(+), 220 deletions(-) diff --git a/benchmark/inference_engine_client.py b/benchmark/inference_engine_client.py index ec62738..1372d25 100644 --- a/benchmark/inference_engine_client.py +++ b/benchmark/inference_engine_client.py @@ -6,6 +6,7 @@ import mimetypes from pathlib import Path import httpx +from openai import OpenAI, AsyncOpenAI from benchmark.utils import _start_log_tailer class InferenceEngineClient: @@ -16,15 +17,12 @@ class InferenceEngineClient: """ def __init__(self, base_url="http://127.0.0.1:23333/v1", api_key="none"): - from openai import OpenAI self.base_url = base_url self.client = OpenAI(api_key=api_key, base_url=base_url, timeout=httpx.Timeout(60.0)) self._launcher_proc = None self.model = None self.NAME = "bench360_inference_engine" - - def launch(self, backend: str, model: str, timeout: float = 500.0, dump_server_output: bool = False, script_path: str = None): """ 1) Starts your existing launch_engine.sh in a Popen (non-blocking). @@ -51,10 +49,10 @@ def launch(self, backend: str, model: str, timeout: float = 500.0, dump_server_o ] self._launcher_proc = subprocess.Popen( cmd, - stdout=subprocess.PIPE, # ⟵ was DEVNULL - stderr=subprocess.STDOUT, # merge both streams + stdout=subprocess.PIPE, # ⟵ was DEVNULL + stderr=subprocess.STDOUT, # merge both streams text=True, - bufsize=1 # line-buffered + bufsize=1 # line-buffered ) # Start a daemon thread to read the launcher output @@ -93,10 +91,28 @@ def launch(self, backend: str, model: str, timeout: float = 500.0, dump_server_o ) time.sleep(2.0) + def _format_messages(self, messages, images=None): + """Helper to standardize message formatting for both sync and async calls.""" + if isinstance(messages, str): + if images: + if isinstance(images, str): + images = [images] + content_list = [{"type": "text", "text": messages}] + for img in images: + processed_uri = self._prepare_image_input(img) + content_list.append({ + "type": "image_url", + "image_url": {"url": processed_uri} + }) + return [{"role": "user", "content": content_list}] + else: + return [{"role": "user", "content": messages}] + return messages + def chat_completion( self, messages, - images: str | list[str] | None = None, # Accepts a single path/URL or a list + images: str | list[str] | None = None, model: str | None = None, temperature: float = 0.1, max_tokens: int = 64, @@ -105,45 +121,9 @@ def chat_completion( ): """ Send a request to the chat.completions endpoint. - - Args: - messages (str | list[dict]): A list of OpenAI message dicts, or a simple string prompt. - images (str | list[str] | None, optional): Local file path(s), HTTP URL(s), or base64 data URI(s). Defaults to None. - model (str | None, optional): The model to use. If None, falls back to the instance's default model. Defaults to None. - temperature (float, optional): Sampling temperature. Defaults to 0.1. - max_tokens (int, optional): The maximum number of tokens to generate. Defaults to 64. - top_p (float, optional): Nucleus sampling parameter. Defaults to 0.9. - stream (bool, optional): Whether to stream the response. Defaults to False. - - Returns: - str | Any: The text content of the generated message if stream is False. Otherwise, returns the stream object. """ model_to_use = model or self.model - - if isinstance(messages, str): - if images: - # 1) Standardize input into a list - if isinstance(images, str): - images = [images] - - # 2) Build the multimodal content list - content_list = [{"type": "text", "text": messages}] - - # 3) Process and append each image - for img in images: - processed_uri = self._prepare_image_input(img) - content_list.append({ - "type": "image_url", - "image_url": {"url": processed_uri} - }) - - formatted_messages = [{"role": "user", "content": content_list}] - else: - # Text-only simple prompt - formatted_messages = [{"role": "user", "content": messages}] - else: - # Assume it's already a well-formatted OpenAI message list - formatted_messages = messages + formatted_messages = self._format_messages(messages, images) resp = self.client.chat.completions.create( model=model_to_use, @@ -159,6 +139,39 @@ def chat_completion( return resp.choices[0].message.content + async def async_chat_completion( + self, + messages, + images: str | list[str] | None = None, + model: str | None = None, + temperature: float = 0.1, + max_tokens: int = 64, + top_p: float = 0.9, + ): + """ + Send an asynchronous request to the chat.completions endpoint using AsyncOpenAI. + """ + model_to_use = self.model + formatted_messages = self._format_messages(messages, images) + + # Initialize the async client locally to bind to the current event loop safely + async_client = AsyncOpenAI( + api_key=self.client.api_key, + base_url=self.base_url, + timeout=httpx.Timeout(60.0) + ) + + resp = await async_client.chat.completions.create( + model=model_to_use, + messages=formatted_messages, + temperature=temperature, + max_tokens=max_tokens, + top_p=top_p, + ) + + await async_client.close() + return resp.choices[0].message.content + def completion( self, prompt, @@ -305,7 +318,6 @@ def _prepare_image_input(self, image_input: str) -> str: return image_input - if __name__ == "__main__": # --- Generate Local Dummy Images for Offline Testing --- diff --git a/benchmark/model_benchmark.py b/benchmark/model_benchmark.py index 66844d8..404e784 100755 --- a/benchmark/model_benchmark.py +++ b/benchmark/model_benchmark.py @@ -9,6 +9,7 @@ import random import multiprocessing as mp import concurrent.futures +import asyncio import psutil import subprocess import math @@ -41,18 +42,19 @@ LOOKUP_DIR = PROJECT_ROOT / "benchmark" / "lookup" RESULTS_DIR = PROJECT_ROOT / "results_benchmark" + class ModelBenchmark: def __init__( - self, - backend="tgi", - model_name="", - model_path=None, - temperature: float = 0.6, - top_p: float = 0.95, - max_tokens: int = 128, - verbose=False, - dump_server_output: bool = False, - script_path: str = None + self, + backend="tgi", + model_name="", + model_path=None, + temperature: float = 0.6, + top_p: float = 0.95, + max_tokens: int = 128, + verbose=False, + dump_server_output: bool = False, + script_path: str = None ): self.backend = backend self.model_path = model_path @@ -162,7 +164,7 @@ def get_model_size(self, repo_id: str, revision: str | None = None) -> float: with open(index_path, "r", encoding="utf-8") as f: total = json.load(f).get("metadata", {}).get("total_size") if isinstance(total, int) and total > 0: - return total / (1024**2) + return total / (1024 ** 2) except Exception as e: if self.verbose: print(f"[WARN] Could not retrieve index.json for {repo_id}: {e}") @@ -175,41 +177,42 @@ def get_model_size(self, repo_id: str, revision: str | None = None) -> float: if Path(file_info).suffix in weight_exts: hf_obj = self.api.head(repo_id, file_info, revision=revision, repo_type="model") size_sum += hf_obj.size - return round(size_sum / (1024**2), 2) + return round(size_sum / (1024 ** 2), 2) except Exception as e: if self.verbose: print(f"[ERROR] Could not estimate model size for {repo_id}: {e}") return 0.0 + async def _async_generate_batch(self, prompts): + """Fire concurrent async requests for a batch of prompts.""" + tasks = [] + for p in prompts: + # Extract messages and images if the prompt is a dictionary (multimodal) + if isinstance(p, dict): + msg = p.get("messages", p) + imgs = p.get("images", None) + else: + msg = p + imgs = None + + tasks.append( + self.iec.async_chat_completion( + messages=msg, + images=imgs, + temperature=self.temperature, + top_p=self.top_p, + max_tokens=self.max_tokens + ) + ) + # Run all requests in the current batch concurrently + return await asyncio.gather(*tasks) + def generate(self, prompts): api = getattr(self, "api_type", "completion") if api == "chat_completion": - # Helper function for the thread pool to execute - def _call_chat(p): - if isinstance(p, dict): - return self.iec.chat_completion( - messages=p.get("messages", ""), - images=p.get("images", None), - temperature=self.temperature, - top_p=self.top_p, - max_tokens=self.max_tokens - ) - else: - return self.iec.chat_completion( - messages=p, - temperature=self.temperature, - top_p=self.top_p, - max_tokens=self.max_tokens - ) - - # Fire all requests in the batch concurrently - results = [] - with concurrent.futures.ThreadPoolExecutor(max_workers=len(prompts)) as executor: - # executor.map keeps the results in the exact same order as the input prompts - results = list(executor.map(_call_chat, prompts)) - return results - + # Execute the async event loop for this batch + return asyncio.run(self._async_generate_batch(prompts)) else: # Standard completion API handles list of strings natively return self.iec.completion( @@ -228,13 +231,12 @@ def _generate_and_time(self, prompt): out = self.generate([prompt])[0] return out, t0 - @staticmethod def estimate_local_query_cost( - inf_time_sec: float, - power_watts: float = 0.0, - electricity_usd_per_kwh: float = 0.31, - csv_path: Path | str = LOOKUP_DIR / "nvidia_llm_gpus.csv" + inf_time_sec: float, + power_watts: float = 0.0, + electricity_usd_per_kwh: float = 0.31, + csv_path: Path | str = LOOKUP_DIR / "nvidia_llm_gpus.csv" ) -> dict: """ Estimate amortization + energy cost for a single LLM query. @@ -265,14 +267,15 @@ def _initialize(self, task: str, scenario: str): if getattr(self, "_already_launched", False): # clone cached base row to avoid mutating the original meta = self._meta_base.copy() - meta["task"] = task - meta["scenario"] = scenario + meta["task"] = task + meta["scenario"] = scenario # no cold-start fields change here return meta # ── First launch: cold start ────────────────────────────── t0 = time.time() - self.iec.launch(backend=self.backend, model=self.model_path, dump_server_output=self.dump_server_output, script_path=self.script_path) + self.iec.launch(backend=self.backend, model=self.model_path, dump_server_output=self.dump_server_output, + script_path=self.script_path) startup = time.time() - t0 # optional warm-up @@ -282,17 +285,17 @@ def _initialize(self, task: str, scenario: str): self.model_size = self.get_model_size(self.model_path) meta = { - "model_name": self.model_name, + "model_name": self.model_name, "model_size_mb": self.model_size, - "temperature": self.temperature, - "top_p": self.top_p, - "max_tokens": self.max_tokens, - "task": task, - "scenario": scenario, - "backend": self.backend, - "startup": round(startup, 4), - "ttft_sec": round(ttft, 4), - "coldstart": round(startup + ttft, 4) + "temperature": self.temperature, + "top_p": self.top_p, + "max_tokens": self.max_tokens, + "task": task, + "scenario": scenario, + "backend": self.backend, + "startup": round(startup, 4), + "ttft_sec": round(ttft, 4), + "coldstart": round(startup + ttft, 4) } # ── cache for later calls ───────────────────────────────── @@ -300,12 +303,11 @@ def _initialize(self, task: str, scenario: str): self._meta_base = pd.DataFrame([meta]) # store DataFrame copy return self._meta_base - def _batch_generator( - self, - prompts: list[str], - refs: list[str], - batch_size: int + self, + prompts: list[str], + refs: list[str], + batch_size: int ): """ Yield (batch_prompts, batch_refs) of size `batch_size`. @@ -317,18 +319,17 @@ def _batch_generator( batch_size = len(prompts) for ps, rs in zip( - chunker(prompts, batch_size), - chunker(refs, batch_size) + chunker(prompts, batch_size), + chunker(refs, batch_size) ): yield ps, rs - def _prepare_prompts_per_user( - self, - task_obj, - run_time: float, - num_users: int, - requests_per_user_per_min: float + self, + task_obj, + run_time: float, + num_users: int, + requests_per_user_per_min: float ): """ Generate per-user arrival times and collect prompts. @@ -345,10 +346,10 @@ def _prepare_prompts_per_user( return prompts, refs, sched_ts, user_ids def _generate_per_user_arrivals( - self, - num_users: int, - rps_per_user: float, - run_time: float + self, + num_users: int, + rps_per_user: float, + run_time: float ): """ Independently sample arrival times per user; merge sorted. @@ -366,15 +367,15 @@ def _generate_per_user_arrivals( return all_events def _user_producer( - self, - uid: int, - rps_user: float, - wall_start: float, - run_time: float, - prompts: list[str], - refs: list[str], - executor: concurrent.futures.Executor, - futures: list, # ← NEW: shared list + self, + uid: int, + rps_user: float, + wall_start: float, + run_time: float, + prompts: list[str], + refs: list[str], + executor: concurrent.futures.Executor, + futures: list, # ← NEW: shared list ): """ Submit a Poisson stream of requests for exactly `run_time` seconds. @@ -382,23 +383,23 @@ def _user_producer( Each submitted Future is appended to the shared `futures` list so the caller can later harvest results with concurrent.futures.as_completed(). """ - rng = numpy_rng # global RNG (seeded once) - deadline = wall_start + run_time - idx = 0 # round-robin over prompt pool + rng = numpy_rng # global RNG (seeded once) + deadline = wall_start + run_time + idx = 0 # round-robin over prompt pool while True: - gap = rng.exponential(1.0 / rps_user) # inter-arrival - target = time.time() + gap + gap = rng.exponential(1.0 / rps_user) # inter-arrival + target = time.time() + gap if target >= deadline: - break # stop after n seconds + break # stop after n seconds time.sleep(target - time.time()) prompt = prompts[idx % len(prompts)] - ref = refs[idx % len(refs)] - idx += 1 + ref = refs[idx % len(refs)] + idx += 1 scheduled_ts = target - wall_start - submit_ts = time.time() + submit_ts = time.time() future = executor.submit( self._run_single_request_capture, @@ -409,9 +410,7 @@ def _user_producer( uid, submit_ts, ) - futures.append(future) # ← track the Future - - + futures.append(future) # ← track the Future # ───────────────────────────────────────────────────────────── # Per-request capture (worker thread) @@ -420,10 +419,10 @@ def _run_single_request_capture( self, prompt: str, reference: str, - start_wall: float, # wall clock of experiment start (main thread) - scheduled_ts: float, # target arrival time relative to start_wall + start_wall: float, # wall clock of experiment start (main thread) + scheduled_ts: float, # target arrival time relative to start_wall user_id: int, - submit_time: float, # NEW – when main thread queued the task + submit_time: float, # NEW – when main thread queued the task ): """ All stamps are **client-side**. @@ -439,47 +438,45 @@ def _run_single_request_capture( """ # ── 1. thread has started ────────────────────────────────────────── - send_time = time.time() - queue_time = send_time - submit_time # Item #4 + send_time = time.time() + queue_time = send_time - submit_time # Item #4 # difference between intended arrival and dispatch to backend - wait_time = send_time - (start_wall + scheduled_ts) + wait_time = send_time - (start_wall + scheduled_ts) # ── 2. generate answer & measure ────────────────────────────────── raw_out, start_time = self._generate_and_time(prompt) - gen_time = time.time() - start_time - e2e_latency = wait_time + gen_time # Item #3 + gen_time = time.time() - start_time + e2e_latency = wait_time + gen_time # Item #3 # ── 3. return record ────────────────────────────────────────────── return { - "user_id": user_id, - "prompt": prompt, - "generated_raw": raw_out, - "reference": reference, - "submit_time": submit_time, - "send_time": send_time, - "start_time": start_time, - "generation_time": gen_time, - "scheduled_ts": scheduled_ts, - "queue_time": round(queue_time, 6), - "wait_time": round(wait_time, 6), - "e2e_latency": round(e2e_latency, 6), + "user_id": user_id, + "prompt": prompt, + "generated_raw": raw_out, + "reference": reference, + "submit_time": submit_time, + "send_time": send_time, + "start_time": start_time, + "generation_time": gen_time, + "scheduled_ts": scheduled_ts, + "queue_time": round(queue_time, 6), + "wait_time": round(wait_time, 6), + "e2e_latency": round(e2e_latency, 6), } - - def _run_scenario( - self, - task: str, - scenario: str = "server", - run_time: float = 600.0, - concurrent_users: int = 32, - requests_per_user_per_min: float = 60.0, - batch_size: Optional[int] = None, - samples: Optional[int] = None, - sample_interval: float = 0.1, - quality_metric: bool = True, - api_type: Optional[str] = None, + self, + task: str, + scenario: str = "server", + run_time: float = 600.0, + concurrent_users: int = 32, + requests_per_user_per_min: float = 60.0, + batch_size: Optional[int] = None, + samples: Optional[int] = None, + sample_interval: float = 0.1, + quality_metric: bool = True, + api_type: Optional[str] = None, ): # 1) instantiate Task if task == "summarization": @@ -522,7 +519,7 @@ def _run_scenario( elif scenario == "batch": meta_df = meta_df.assign( batch_size=batch_size or 0, - num_queries= samples or 0 + num_queries=samples or 0 ) elif scenario == "long_context": meta_df = meta_df.assign( @@ -535,7 +532,7 @@ def _run_scenario( # 4) prepare prompts if scenario == "server": - est_requests = math.ceil( # rough upper bound: + est_requests = math.ceil( # rough upper bound: run_time * concurrent_users * requests_per_user_per_min / 60 ) prompts, refs = task_.generate_prompts(num_examples=est_requests) @@ -545,14 +542,13 @@ def _run_scenario( elif scenario == "long_context": prompts, refs, lengths, crs = task_.generate_prompts(num_samples_per_level=samples or 0) - # ───────────────────────────────────────────────────────────── # 5) start metrics monitor (needs wall_start for hard_end) # ───────────────────────────────────────────────────────────── global_readings = {"memory": [], "power": [], "util": [], "cpu": [], "ram": []} - stop_evt = threading.Event() + stop_evt = threading.Event() - wall_start = time.time() # must precede the monitor + wall_start = time.time() # must precede the monitor # hard_end = wall_start + run_time # 300 s etc. mon = threading.Thread( @@ -568,19 +564,19 @@ def _run_scenario( # ───────────────────────────────────────────────────────────── # 6) executor + k user-producer threads # ───────────────────────────────────────────────────────────── - futures : list[concurrent.futures.Future] = [] - rps_user = requests_per_user_per_min / 60.0 - executor = concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) + futures: list[concurrent.futures.Future] = [] + rps_user = requests_per_user_per_min / 60.0 + executor = concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) producers = [] for uid in range(concurrent_users): th = threading.Thread( - target=self._user_producer, # <- unchanged helper + target=self._user_producer, # <- unchanged helper args=( uid, rps_user, wall_start, run_time, prompts, refs, - executor, # pass the pool - futures, # pass *shared* list to collect futures + executor, # pass the pool + futures, # pass *shared* list to collect futures ), daemon=True, ) @@ -592,20 +588,20 @@ def _run_scenario( # ───────────────────────────────────────────────────────────── time.sleep(run_time) for th in producers: - th.join() # guarantees no new tasks after the deadline + th.join() # guarantees no new tasks after the deadline # ───────────────────────────────────────────────────────────── # 8) drain the executor (let all queued futures finish) # ───────────────────────────────────────────────────────────── - executor.shutdown(wait=True) # waits until every submitted task is done + executor.shutdown(wait=True) # waits until every submitted task is done # collect finished records - + for fut in concurrent.futures.as_completed(futures): try: intermediate_records.append(fut.result()) except Exception: - pass # ignore cancelled / failed requests + pass # ignore cancelled / failed requests # ───────────────────────────────────────────────────────────── # 9) stop monitor exactly at hard_end @@ -613,7 +609,6 @@ def _run_scenario( stop_evt.set() mon.join() - elif scenario == "batch": for ps, rs in self._batch_generator(prompts, refs, batch_size): t0 = time.time() @@ -626,7 +621,7 @@ def _run_scenario( "reference": ref, "generation_time": gen_time / len(ps) if ps else 0, }) - + elif scenario == "long_context": for prompt, ref, length, crs in zip(prompts, refs, lengths, crs): t0 = time.time() @@ -640,8 +635,8 @@ def _run_scenario( gen_time = time.time() - t0 intermediate_records.append({ - "context_range": crs, # e.g. "3k" or "4k" - "length": length, # original context length in tokens + "context_range": crs, # e.g. "3k" or "4k" + "length": length, # original context length in tokens "prompt": prompt, "generated_raw": raw_out, "reference": ref, @@ -649,7 +644,6 @@ def _run_scenario( "successful": success }) - else: # single or other non-server, non-batch scenario for prompt, ref in zip(prompts, refs): t0 = time.time() @@ -747,8 +741,8 @@ def _run_scenario( for rec in intermediate_records: generated = clean_prediction([rec["generated_raw"]])[0] - nt = tok_cnt(generated) # number of tokens generated - ns = sent_cnt(generated, mode = scenario) # number of sentences generated + nt = tok_cnt(generated) # number of tokens generated + ns = sent_cnt(generated, mode=scenario) # number of sentences generated gen_time = rec["generation_time"] # 1) Average Token Latency (seconds per token) @@ -777,40 +771,41 @@ def _run_scenario( # If you’re in “server” mode, also include scheduling info: if scenario == "server": final_rec = { - "user_id": rec["user_id"], + "user_id": rec["user_id"], "scheduled_ts": rec["scheduled_ts"], - "submit_time": rec["submit_time"], - "send_time": rec["send_time"], - "start_time": rec["start_time"], - "queue_time": rec["queue_time"], - "wait_time": rec["wait_time"], - "e2e_latency": rec["e2e_latency"], + "submit_time": rec["submit_time"], + "send_time": rec["send_time"], + "start_time": rec["start_time"], + "queue_time": rec["queue_time"], + "wait_time": rec["wait_time"], + "e2e_latency": rec["e2e_latency"], } elif scenario == "long_context": final_rec = { - "context_range": rec["context_range"], # e.g. "3k" or "4k" - "length": rec["length"] + 160 + 10, # original context length in tokens + 153 for the prompt + 10 for the question - "successful": rec['successful'], # whether generation was successful + "context_range": rec["context_range"], # e.g. "3k" or "4k" + "length": rec["length"] + 160 + 10, + # original context length in tokens + 153 for the prompt + 10 for the question + "successful": rec['successful'], # whether generation was successful } else: final_rec = { } add_final_rec = { - - "prompt": rec["prompt"], - "generated_answer": generated, - "reference_answer": rec["reference"], - "generation_time": gen_time, - "tokens_generated": nt, + + "prompt": rec["prompt"], + "generated_answer": generated, + "reference_answer": rec["reference"], + "generation_time": gen_time, + "tokens_generated": nt, "sentences_generated": ns, - "ATL": round(ATL, 6), - "GL" : round(GL, 6), - "TPS": round(TPS, 2), - "SPS": round(SPS, 2), - "energy_per_token": round(energy_per_token, 6), # in J/token - "energy_per_sentence": round(energy_per_sentence, 6), # in J/sentence + "ATL": round(ATL, 6), + "GL": round(GL, 6), + "TPS": round(TPS, 2), + "SPS": round(SPS, 2), + "energy_per_token": round(energy_per_token, 6), # in J/token + "energy_per_sentence": round(energy_per_sentence, 6), # in J/sentence **quality } @@ -832,13 +827,12 @@ def _run_scenario( # 14) Return the run report and details DataFrame return run_report, details_df, readings_df - def run( self, *, scenario: str = "server", samples = None, - task: Optional[str] = None, + task: Optional[str] = None, batch_size: Optional[int] = None, run_time: Optional[float] = None, concurrent_users: int = 32, @@ -855,7 +849,8 @@ def run( # SERVER scenario: require run_time, disallow samples or batch_size if scenario == "server": if run_time is None or concurrent_users is None or requests_per_user_per_min is None: - raise ValueError("For 'server' scenario, run_time , concurrent_users, and requests_per_user_per_min must be specified.") + raise ValueError( + "For 'server' scenario, run_time , concurrent_users, and requests_per_user_per_min must be specified.") if samples is not None or batch_size is not None: raise ValueError( "For 'server' scenario, do not set 'samples' or 'batch_size'; " @@ -873,7 +868,7 @@ def run( elif scenario == "single": batch_size = 1 run_time = None # not used in single mode - + else: # LONG_CONTEXT scenario: samples is the number of queries per context if samples is None: @@ -891,4 +886,4 @@ def run( requests_per_user_per_min=requests_per_user_per_min, sample_interval=sample_interval, quality_metric=quality_metric - ) + ) \ No newline at end of file