Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 184 additions & 35 deletions benchmark/inference_engine_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,33 @@
import subprocess
import time
import requests
import base64
import mimetypes
from pathlib import Path
import httpx
from openai import OpenAI, AsyncOpenAI
from benchmark.utils import _start_log_tailer

class InferenceEngineClient:
"""
Wrapper for an OpenAI‐compatible server.
Wrapper for an OpenAI‐compatible server.
launch() will call your existing launch_engine.sh script (which runs Docker in the foreground),
then poll /v1/completions every second until it returns 200.
"""

def __init__(self, base_url="http://localhost:23333/v1", api_key="none"):
from openai import OpenAI
def __init__(self, base_url="http://127.0.0.1:23333/v1", api_key="none"):
self.base_url = base_url
self.client = OpenAI(api_key=api_key, base_url=base_url, timeout=httpx.Timeout(60.0))
self._launcher_proc = None
self.model = None
self.NAME = "bench360_inference_engine"



def launch(self, backend: str, model: str, timeout: float = 500.0, dump_server_output: bool = False, script_path: str = None):
"""
1) Starts your existing launch_engine.sh in a Popen (non-blocking).
2) Polls `http://127.0.0.1:23333/v1/models` every 2 seconds
until the desired model shows up (or timeout).

:param backend: One of {tgi, vllm, mii, sglang, lmdeploy}
:param model: HF model ID or local path
:param timeout: Max seconds to wait for the model to appear before raising.
Expand All @@ -48,17 +49,17 @@ def launch(self, backend: str, model: str, timeout: float = 500.0, dump_server_o
]
self._launcher_proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE, # ⟵ was DEVNULL
stderr=subprocess.STDOUT, # merge both streams
stdout=subprocess.PIPE, # ⟵ was DEVNULL
stderr=subprocess.STDOUT, # merge both streams
text=True,
bufsize=1 # line-buffered
bufsize=1 # line-buffered
)

# Start a daemon thread to read the launcher output
_start_log_tailer(self, max_lines=10, dump_server_output=dump_server_output)

# 2) Poll /v1/models until our model_id appears or timeout.
list_url = "http://127.0.0.1:23333/v1/models"
list_url = self.base_url + "/models"
start_time = time.time()

while True:
Expand Down Expand Up @@ -90,7 +91,86 @@ def launch(self, backend: str, model: str, timeout: float = 500.0, dump_server_o
)
time.sleep(2.0)

def _format_messages(self, messages, images=None):
"""Helper to standardize message formatting for both sync and async calls."""
if isinstance(messages, str):
if images:
if isinstance(images, str):
images = [images]
content_list = [{"type": "text", "text": messages}]
for img in images:
processed_uri = self._prepare_image_input(img)
content_list.append({
"type": "image_url",
"image_url": {"url": processed_uri}
})
return [{"role": "user", "content": content_list}]
else:
return [{"role": "user", "content": messages}]
return messages

def chat_completion(
self,
messages,
images: str | list[str] | None = None,
model: str | None = None,
temperature: float = 0.1,
max_tokens: int = 64,
top_p: float = 0.9,
stream: bool = False,
):
"""
Send a request to the chat.completions endpoint.
"""
model_to_use = model or self.model
formatted_messages = self._format_messages(messages, images)

resp = self.client.chat.completions.create(
model=model_to_use,
messages=formatted_messages,
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
stream=stream,
)

if stream:
return resp

return resp.choices[0].message.content

async def async_chat_completion(
self,
messages,
images: str | list[str] | None = None,
model: str | None = None,
temperature: float = 0.1,
max_tokens: int = 64,
top_p: float = 0.9,
):
"""
Send an asynchronous request to the chat.completions endpoint using AsyncOpenAI.
"""
model_to_use = self.model
formatted_messages = self._format_messages(messages, images)

# Initialize the async client locally to bind to the current event loop safely
async_client = AsyncOpenAI(
api_key=self.client.api_key,
base_url=self.base_url,
timeout=httpx.Timeout(60.0)
)

resp = await async_client.chat.completions.create(
model=model_to_use,
messages=formatted_messages,
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
)

await async_client.close()
return resp.choices[0].message.content

def completion(
self,
Expand All @@ -104,7 +184,6 @@ def completion(
"""
Send one or more prompts. :param prompt: string or list[str]
"""
model = model
is_batch = isinstance(prompt, (list, tuple))

resp = self.client.completions.create(
Expand All @@ -124,19 +203,16 @@ def completion(

def warmup(self, num_iters: int = 3):
"""
Send a few small dummy requests to load the model into memory and
JIT any kernels so that subsequent inference calls are faster.
:param model: HF model ID or local path; defaults to self.default_model
:param num_iters: Number of warmup calls to make
Send a few small dummy requests using the chat endpoint to load the model into memory
and JIT any kernels so that subsequent inference calls are faster.
"""

dummy_prompt = "Warmup"
messages = [{"role": "user", "content": "Warmup request."}]

for _ in range(num_iters):
try:
_ = self.client.completions.create(
_ = self.client.chat.completions.create(
model=self.model,
prompt=dummy_prompt,
messages=messages,
max_tokens=1,
temperature=0.1,
)
Expand All @@ -147,32 +223,40 @@ def warmup(self, num_iters: int = 3):

def measure_ttft(self) -> float:
"""
Issue a 1‐token streaming request and measure the time until the first chunk arrives.
Issue a streaming chat request and measure the time until the first chunk arrives.
"""
import time
prompt = (
"Artificial intelligence is a rapidly evolving field with applications in "
"healthcare, finance, education, and more. One of the most transformative "
"technologies is"
)
messages = [{
"role": "user",
"content": (
"Artificial intelligence is a rapidly evolving field with applications in "
"healthcare, finance, education, and more. One of the most transformative "
"technologies is"
)
}]

start = time.time()
stream_resp = self.client.completions.create(
stream_resp = self.client.chat.completions.create(
model=self.model,
prompt=prompt,
messages=messages,
max_tokens=1,
temperature=0.1,
stream=True,
)

first_token_time = None
for chunk in stream_resp:
delta = getattr(chunk.choices[0], "delta", None)
if delta and delta.get("text", "").strip() != "":
# In chat.completions, the text chunk is stored in delta.content
delta = chunk.choices[0].delta
content = getattr(delta, "content", None)

# If content exists and is not an empty string, we caught the first token
if content:
first_token_time = time.time()
break

if first_token_time is None:
# Fallback if the stream ends without yielding valid text
first_token_time = time.time()

return first_token_time - start
Expand Down Expand Up @@ -213,13 +297,78 @@ def close(self):
except Exception:
pass

def _prepare_image_input(self, image_input: str) -> str:
"""
Helper method to detect if the input is a local file path.
If it is, it reads the file and returns a Base64 data URI.
Otherwise, it assumes it's already a URL or data URI and returns it as-is.
"""
# Check if the string is a valid path to an existing local file
if os.path.isfile(image_input):
# Guess the mime type (e.g., image/jpeg, image/png) based on extension
mime_type, _ = mimetypes.guess_type(image_input)
mime_type = mime_type or "image/jpeg" # Fallback

with open(image_input, "rb") as f:
encoded_string = base64.b64encode(f.read()).decode("utf-8")

return f"data:{mime_type};base64,{encoded_string}"

# If it's not a local file, assume it's a web URL or pre-encoded Base64 string
return image_input


if __name__ == "__main__":
### ttft

# --- Generate Local Dummy Images for Offline Testing ---
# 1x1 Red Pixel
img1_path = "test_red.png"
img1_b64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8BQDwAEhQGAhKmMIQAAAABJRU5ErkJggg=="
if not os.path.exists(img1_path):
with open(img1_path, "wb") as f:
f.write(base64.b64decode(img1_b64))

# 1x1 Blue Pixel
img2_path = "test_blue.png"
img2_b64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNkYPhfDwAChwGA60e6kgAAAABJRU5ErkJggg=="
if not os.path.exists(img2_path):
with open(img2_path, "wb") as f:
f.write(base64.b64decode(img2_b64))

# Instantiate the client directly
client = InferenceEngineClient()
client.launch(backend="tgi", model="Qwen/Qwen2.5-7B-Instruct")
client.warmup()
ttft = client.measure_ttft()
print(f"TTFT: {ttft:.3f} seconds")
client.close()

try:
# Launching with the actual multimodal model
client.launch(backend="sglang", model="Qwen/Qwen3-VL-2B-Instruct")
client.warmup()

# 1. Benchmark TTFT
ttft = client.measure_ttft()
print(f"TTFT: {ttft:.3f} seconds")

# 2. Test Case: Single Image (Local Path -> Base64)
print("\n--- Testing Single Image (Fully Offline) ---")
single_resp = client.chat_completion(
messages="What color is the main subject in this image?",
images=img1_path # The helper will auto-detect and base64-encode this local file
)
print("Response:\n", single_resp)

# 3. Test Case: Multiple Images (Local Paths -> Base64)
print("\n--- Testing Multiple Images (Fully Offline) ---")
multi_resp = client.chat_completion(
messages="Are these two images identical? Explain the differences.",
images=[img1_path, img2_path] # Both local files will be base64-encoded automatically
)
print("Response:\n", multi_resp)

finally:
# Ensures Docker container and subprocesses are always shut down cleanly
client.close()

# Cleanup temporary local dummy files
if os.path.exists(img1_path):
os.remove(img1_path)
if os.path.exists(img2_path):
os.remove(img2_path)
Loading