Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 69 additions & 10 deletions lib/crewai/src/crewai/llms/providers/openai/completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -1570,8 +1570,10 @@ def _prepare_completion_params(
params["reasoning_effort"] = self.reasoning_effort

if self.response_format is not None:
if isinstance(self.response_format, type) and issubclass(
self.response_format, BaseModel
if (
isinstance(self.response_format, type)
and issubclass(self.response_format, BaseModel)
and self.supports_native_structured_output()
):
params["response_format"] = generate_model_description(
self.response_format
Expand Down Expand Up @@ -1636,7 +1638,7 @@ def _handle_completion(
) -> str | Any:
"""Handle non-streaming chat completion."""
try:
if response_model:
if response_model and self.supports_native_structured_output():
parse_params = {
k: v for k, v in params.items() if k != "response_format"
}
Expand Down Expand Up @@ -1722,10 +1724,14 @@ def _handle_completion(

content = message.content or ""

if self.response_format and isinstance(self.response_format, type):
# When native structured output was skipped (e.g. a provider that
# rejects json_schema), validate the plain completion against the
# requested model client-side so a parsed object is still returned.
structured_format = response_model or self.response_format
if structured_format is not None and isinstance(structured_format, type):
try:
structured_result = self._validate_structured_output(
content, self.response_format
content, structured_format
)
self._emit_call_completed_event(
response=structured_result,
Expand Down Expand Up @@ -1799,7 +1805,8 @@ def _finalize_streaming_response(
from_agent: Any | None = None,
finish_reason: str | None = None,
response_id: str | None = None,
) -> str | list[dict[str, Any]]:
response_model: type[BaseModel] | None = None,
) -> str | list[dict[str, Any]] | BaseModel:
Comment thread
coderabbitai[bot] marked this conversation as resolved.
"""Finalize a streaming response with usage tracking, tool call handling, and events.

Args:
Expand All @@ -1813,6 +1820,10 @@ def _finalize_streaming_response(
finish_reason: Raw provider finish reason (e.g. "stop", "length",
"tool_calls") extracted from the last streaming chunk.
response_id: Raw provider response id from any chunk.
response_model: When set and the stream produced text (no tool
calls), validate the accumulated response against this model
and return the parsed object. Used by the fallback path for
providers that don't support native json_schema streaming.

Returns:
Tool calls list when tools were invoked without available_functions,
Expand Down Expand Up @@ -1880,6 +1891,35 @@ def _finalize_streaming_response(

full_response = self._apply_stop_words(full_response)

# Fallback structured-output validation: when native json_schema
# streaming was skipped (e.g. DeepSeek), parse the accumulated text
# into the requested model so the call still returns a parsed object,
# matching the async streaming path and the non-streaming fallback.
# Honor both a per-call response_model and a configured response_format.
structured_format = response_model or self.response_format
if (
structured_format is not None
and isinstance(structured_format, type)
and full_response
):
try:
structured_result = self._validate_structured_output(
full_response, structured_format
)
self._emit_call_completed_event(
response=structured_result,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
usage=usage_data,
finish_reason=finish_reason,
response_id=response_id,
)
return structured_result
except ValueError as e:
logging.warning(f"Structured output validation failed: {e}")

self._emit_call_completed_event(
response=full_response,
call_type=LLMCallType.LLM_CALL,
Expand All @@ -1905,7 +1945,7 @@ def _handle_streaming_completion(
full_response = ""
tool_calls: dict[int, dict[str, Any]] = {}

if response_model:
if response_model and self.supports_native_structured_output():
Comment thread
coderabbitai[bot] marked this conversation as resolved.
parse_params = {
k: v
for k, v in params.items()
Expand Down Expand Up @@ -2040,6 +2080,7 @@ def _handle_streaming_completion(
from_agent=from_agent,
finish_reason=stream_finish_reason,
response_id=stream_response_id,
response_model=response_model,
)
if isinstance(result, str):
return self._invoke_after_llm_call_hooks(
Expand All @@ -2057,7 +2098,7 @@ async def _ahandle_completion(
) -> str | Any:
"""Handle non-streaming async chat completion."""
try:
if response_model:
if response_model and self.supports_native_structured_output():
parse_params = {
k: v for k, v in params.items() if k != "response_format"
}
Expand Down Expand Up @@ -2149,10 +2190,14 @@ async def _ahandle_completion(

content = message.content or ""

if self.response_format and isinstance(self.response_format, type):
# When native structured output was skipped (e.g. a provider that
# rejects json_schema), validate the plain completion against the
# requested model client-side so a parsed object is still returned.
structured_format = response_model or self.response_format
if structured_format is not None and isinstance(structured_format, type):
try:
structured_result = self._validate_structured_output(
content, self.response_format
content, structured_format
)
self._emit_call_completed_event(
response=structured_result,
Expand Down Expand Up @@ -2381,6 +2426,20 @@ def supports_function_calling(self) -> bool:
"""Check if the model supports function calling."""
return not self.is_o1_model

def supports_native_structured_output(self) -> bool:
    """Report whether ``json_schema`` structured outputs can be sent natively.

    A Pydantic ``response_format`` and OpenAI's
    ``beta.chat.completions.parse`` / ``.stream`` helpers all transmit a
    ``response_format`` of type ``json_schema``. Certain OpenAI-compatible
    endpoints refuse that type (DeepSeek replies "This response_format type
    is unavailable now"), so subclasses can override this hook to request a
    plain completion that is validated client-side instead.

    Returns:
        ``True`` in this base (OpenAI) implementation; subclasses may
        return ``False``.
    """
    return True

def supports_stop_words(self) -> bool:
"""Check if the model supports stop words."""
model_lower = self.model.lower() if self.model else ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class ProviderConfig:
default_headers: HTTP headers to include in all requests.
api_key_required: Whether an API key is required for this provider.
default_api_key: Default API key to use if none is provided and not required.
supports_json_schema: Whether the endpoint accepts OpenAI's
``json_schema`` structured-output ``response_format``. Some
OpenAI-compatible endpoints (e.g. DeepSeek) reject it.
"""

base_url: str
Expand All @@ -40,6 +43,7 @@ class ProviderConfig:
default_headers: dict[str, str] = field(default_factory=dict)
api_key_required: bool = True
default_api_key: str | None = None
supports_json_schema: bool = True


OPENAI_COMPATIBLE_PROVIDERS: dict[str, ProviderConfig] = {
Expand All @@ -55,6 +59,9 @@ class ProviderConfig:
api_key_env="DEEPSEEK_API_KEY",
base_url_env="DEEPSEEK_BASE_URL",
api_key_required=True,
# DeepSeek rejects OpenAI's json_schema response_format with
# "This response_format type is unavailable now" (#5990).
supports_json_schema=False,
),
"ollama": ProviderConfig(
base_url="http://localhost:11434/v1",
Expand Down Expand Up @@ -261,3 +268,16 @@ def supports_function_calling(self) -> bool:
Whether the model supports function calling.
"""
return super().supports_function_calling()

def supports_native_structured_output(self) -> bool:
    """Report whether the configured provider accepts ``json_schema`` (#5990).

    OpenAI's ``json_schema`` ``response_format`` is rejected by some
    OpenAI-compatible endpoints (DeepSeek, for example). When this returns
    ``False`` the structured output is obtained from a plain completion and
    validated client-side.

    Returns:
        The ``supports_json_schema`` flag of the provider's registry entry,
        or ``True`` when the provider has no entry in the registry.
    """
    provider_config = OPENAI_COMPATIBLE_PROVIDERS.get(self.provider)
    if not provider_config:
        # Providers missing from the registry are treated as json_schema capable.
        return True
    return provider_config.supports_json_schema
154 changes: 153 additions & 1 deletion lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Tests for OpenAI-compatible providers."""

import os
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest
from pydantic import BaseModel

from crewai.llm import LLM
from crewai.llms.providers.openai.completion import OpenAICompletion
from crewai.llms.providers.openai_compatible.completion import (
OPENAI_COMPATIBLE_PROVIDERS,
OpenAICompatibleCompletion,
Expand Down Expand Up @@ -36,6 +38,7 @@ def test_provider_config_defaults(self):
assert config.default_headers == {}
assert config.api_key_required is True
assert config.default_api_key is None
assert config.supports_json_schema is True


class TestProviderRegistry:
Expand All @@ -56,6 +59,8 @@ def test_deepseek_config(self):
assert config.base_url == "https://api.deepseek.com/v1"
assert config.api_key_env == "DEEPSEEK_API_KEY"
assert config.api_key_required is True
# DeepSeek rejects OpenAI's json_schema response_format (#5990)
assert config.supports_json_schema is False

def test_ollama_config(self):
"""Test Ollama provider configuration."""
Expand Down Expand Up @@ -307,3 +312,150 @@ def test_acall_method_exists(self):
completion = OpenAICompatibleCompletion(model="llama3", provider="ollama")
assert hasattr(completion, "acall")
assert callable(completion.acall)


class _Answer(BaseModel):
    # Minimal one-field schema that the structured-output tests request
    # as their response model / response format.
    value: int


def _stream_chunk(content: str | None = None, finish: str | None = None) -> MagicMock:
"""Build a minimal OpenAI streaming chunk for the regular (non-native) path."""
chunk = MagicMock()
chunk.id = "id"
chunk.usage = None # not a usage chunk
choice = MagicMock()
delta = MagicMock()
delta.content = content
delta.tool_calls = None
choice.delta = delta
choice.finish_reason = finish
chunk.choices = [choice]
return chunk


class TestStructuredOutputFallback:
    """Verify that structured output degrades gracefully on OpenAI-compatible
    endpoints that refuse OpenAI's json_schema response_format (#5990).
    """

    @staticmethod
    def _deepseek_env():
        """Return a patcher that puts a dummy DeepSeek API key in the environment."""
        return patch.dict(os.environ, {"DEEPSEEK_API_KEY": "test-key"})

    @staticmethod
    def _user_messages():
        """Return a fresh single-message user prompt for each call."""
        return [{"role": "user", "content": "hi"}]

    @staticmethod
    def _streaming_client(first_piece, last_piece):
        """Return a mock client whose plain ``create`` call yields two chunks.

        The second chunk carries the ``"stop"`` finish reason.
        """
        sdk_client = MagicMock()
        sdk_client.chat.completions.create.return_value = [
            _stream_chunk(content=first_piece),
            _stream_chunk(content=last_piece, finish="stop"),
        ]
        return sdk_client

    def test_deepseek_does_not_support_native_structured_output(self):
        """The DeepSeek provider reports no native json_schema support."""
        with self._deepseek_env():
            llm = OpenAICompatibleCompletion(model="deepseek-chat", provider="deepseek")
            assert llm.supports_native_structured_output() is False

    def test_openrouter_supports_native_structured_output(self):
        """The OpenRouter provider keeps native json_schema support."""
        with patch.dict(os.environ, {"OPENROUTER_API_KEY": "test-key"}):
            llm = OpenAICompatibleCompletion(model="some-model", provider="openrouter")
            assert llm.supports_native_structured_output() is True

    def test_openai_supports_native_structured_output(self):
        """The base OpenAI completion reports native json_schema support."""
        assert OpenAICompletion(model="gpt-4o").supports_native_structured_output()

    def test_deepseek_omits_json_schema_response_format(self):
        """A Pydantic response_format must not be sent as json_schema to DeepSeek."""
        with self._deepseek_env():
            llm = OpenAICompatibleCompletion(
                model="deepseek-chat", provider="deepseek", response_format=_Answer
            )
            prepared = llm._prepare_completion_params(self._user_messages())
            assert "response_format" not in prepared

    def test_openai_keeps_json_schema_response_format(self):
        """OpenAI still receives the json_schema response_format (no regression)."""
        llm = OpenAICompletion(model="gpt-4o", response_format=_Answer)
        prepared = llm._prepare_completion_params(self._user_messages())
        assert prepared.get("response_format", {}).get("type") == "json_schema"

    def test_deepseek_completion_skips_native_parse_and_validates_client_side(self):
        """DeepSeek + response_model must issue a plain completion (no
        json_schema parse) and still hand back the model, validated
        client-side.
        """
        with self._deepseek_env():
            llm = OpenAICompatibleCompletion(model="deepseek-chat", provider="deepseek")

            reply = MagicMock()
            reply.content = '{"value": 42}'
            reply.tool_calls = None
            completion = MagicMock()
            completion.choices = [MagicMock(message=reply)]
            sdk_client = MagicMock()
            sdk_client.chat.completions.create.return_value = completion

            with (
                patch.object(llm, "_get_sync_client", return_value=sdk_client),
                patch.object(llm, "_extract_openai_token_usage", return_value={}),
                patch.object(
                    llm, "_extract_chat_finish_reason_and_id", return_value=("stop", "id")
                ),
                patch.object(llm, "_emit_call_completed_event"),
            ):
                parsed = llm._handle_completion(
                    {"messages": self._user_messages()},
                    response_model=_Answer,
                )

            # The native parse helper must stay untouched; only the plain
            # completion endpoint is hit, exactly once.
            sdk_client.beta.chat.completions.parse.assert_not_called()
            sdk_client.chat.completions.create.assert_called_once()
            assert isinstance(parsed, _Answer)
            assert parsed.value == 42

    def test_deepseek_streaming_skips_native_stream_and_validates_client_side(self):
        """DeepSeek + streaming + response_model must issue a plain streaming
        completion (no json_schema beta.stream) and still parse the
        accumulated text into the requested model, like the non-streaming
        fallback.
        """
        with self._deepseek_env():
            llm = OpenAICompatibleCompletion(model="deepseek-chat", provider="deepseek")
            sdk_client = self._streaming_client('{"value": ', "42}")

            with (
                patch.object(llm, "_get_sync_client", return_value=sdk_client),
                patch.object(llm, "_emit_stream_chunk_event"),
                patch.object(llm, "_emit_call_completed_event"),
            ):
                parsed = llm._handle_streaming_completion(
                    {"messages": self._user_messages()},
                    response_model=_Answer,
                )

            # The native json_schema streaming path must be skipped entirely.
            sdk_client.beta.chat.completions.stream.assert_not_called()
            sdk_client.chat.completions.create.assert_called_once()
            # ...and the accumulated text is still parsed into the model.
            assert isinstance(parsed, _Answer)
            assert parsed.value == 42

    def test_deepseek_streaming_validates_configured_response_format(self):
        """stream=True with a configured response_format (rather than a
        per-call response_model) must also be parsed into the model on the
        fallback path, like the non-streaming behavior.
        """
        with self._deepseek_env():
            llm = OpenAICompatibleCompletion(
                model="deepseek-chat", provider="deepseek", response_format=_Answer
            )
            sdk_client = self._streaming_client('{"value": ', "7}")

            with (
                patch.object(llm, "_get_sync_client", return_value=sdk_client),
                patch.object(llm, "_emit_stream_chunk_event"),
                patch.object(llm, "_emit_call_completed_event"),
            ):
                parsed = llm._handle_streaming_completion(
                    {"messages": self._user_messages()},
                )

            sdk_client.beta.chat.completions.stream.assert_not_called()
            assert isinstance(parsed, _Answer)
            assert parsed.value == 7