From c99fa80fc3b57f444d27e5b76ebf635615bfb47c Mon Sep 17 00:00:00 2001 From: HumphreySun98 Date: Tue, 16 Jun 2026 11:25:59 -0500 Subject: [PATCH 1/3] fix(llm): fall back from json_schema on OpenAI-compatible providers that reject it DeepSeek and some other OpenAI-compatible endpoints reject OpenAI's json_schema structured-output response_format with "This response_format type is unavailable now". The native OpenAI provider unconditionally sent it via beta.chat.completions.parse/stream and via a Pydantic response_format, breaking any crew using structured output against those endpoints. Add a supports_native_structured_output() capability (True for OpenAI) and a per-provider supports_json_schema flag on the OpenAI-compatible config (DeepSeek = False). When unsupported, skip the native json_schema paths and fall back to a plain completion; the non-streaming path then validates the result against the requested model client-side, and the Task converter reconciles it otherwise. OpenAI behavior is unchanged. Fixes #5990 Co-Authored-By: Claude Opus 4.8 (1M context) --- .../llms/providers/openai/completion.py | 42 +++++++--- .../providers/openai_compatible/completion.py | 20 +++++ .../test_openai_compatible.py | 78 ++++++++++++++++++- 3 files changed, 130 insertions(+), 10 deletions(-) diff --git a/lib/crewai/src/crewai/llms/providers/openai/completion.py b/lib/crewai/src/crewai/llms/providers/openai/completion.py index d8972e1de4..173a263b6a 100644 --- a/lib/crewai/src/crewai/llms/providers/openai/completion.py +++ b/lib/crewai/src/crewai/llms/providers/openai/completion.py @@ -1570,8 +1570,10 @@ def _prepare_completion_params( params["reasoning_effort"] = self.reasoning_effort if self.response_format is not None: - if isinstance(self.response_format, type) and issubclass( - self.response_format, BaseModel + if ( + isinstance(self.response_format, type) + and issubclass(self.response_format, BaseModel) + and self.supports_native_structured_output() ): params["response_format"] = generate_model_description( self.response_format @@ -1636,7 +1638,7 @@ def _handle_completion( ) -> str | Any: """Handle non-streaming chat completion.""" try: - if response_model: + if response_model and self.supports_native_structured_output(): parse_params = { k: v for k, v in params.items() if k != "response_format" } @@ -1722,10 +1724,14 @@ def _handle_completion( content = message.content or "" - if self.response_format and isinstance(self.response_format, type): + # When native structured output was skipped (e.g. a provider that + # rejects json_schema), validate the plain completion against the + # requested model client-side so a parsed object is still returned. + structured_format = response_model or self.response_format + if structured_format is not None and isinstance(structured_format, type): try: structured_result = self._validate_structured_output( - content, self.response_format + content, structured_format ) self._emit_call_completed_event( response=structured_result, @@ -1905,7 +1911,7 @@ def _handle_streaming_completion( full_response = "" tool_calls: dict[int, dict[str, Any]] = {} - if response_model: + if response_model and self.supports_native_structured_output(): parse_params = { k: v for k, v in params.items() @@ -2057,7 +2063,7 @@ async def _ahandle_completion( ) -> str | Any: """Handle non-streaming async chat completion.""" try: - if response_model: + if response_model and self.supports_native_structured_output(): parse_params = { k: v for k, v in params.items() if k != "response_format" } @@ -2149,10 +2155,14 @@ async def _ahandle_completion( content = message.content or "" - if self.response_format and isinstance(self.response_format, type): + # When native structured output was skipped (e.g. a provider that + # rejects json_schema), validate the plain completion against the + # requested model client-side so a parsed object is still returned. + structured_format = response_model or self.response_format + if structured_format is not None and isinstance(structured_format, type): try: structured_result = self._validate_structured_output( - content, self.response_format + content, structured_format ) self._emit_call_completed_event( response=structured_result, @@ -2381,6 +2391,20 @@ def supports_function_calling(self) -> bool: """Check if the model supports function calling.""" return not self.is_o1_model + def supports_native_structured_output(self) -> bool: + """Whether the endpoint accepts OpenAI's json_schema structured outputs. + + OpenAI's ``beta.chat.completions.parse`` / ``.stream`` and a Pydantic + ``response_format`` send a ``response_format`` of type ``json_schema``. + Some OpenAI-compatible endpoints reject it (e.g. DeepSeek: "This + response_format type is unavailable now"). Subclasses override this to + fall back to a plain completion plus client-side validation. + + Returns: + ``True`` for OpenAI; subclasses may return ``False``. + """ + return True + def supports_stop_words(self) -> bool: """Check if the model supports stop words.""" model_lower = self.model.lower() if self.model else "" diff --git a/lib/crewai/src/crewai/llms/providers/openai_compatible/completion.py b/lib/crewai/src/crewai/llms/providers/openai_compatible/completion.py index da4cfd03db..a9ca7700f1 100644 --- a/lib/crewai/src/crewai/llms/providers/openai_compatible/completion.py +++ b/lib/crewai/src/crewai/llms/providers/openai_compatible/completion.py @@ -32,6 +32,9 @@ class ProviderConfig: default_headers: HTTP headers to include in all requests. api_key_required: Whether an API key is required for this provider. default_api_key: Default API key to use if none is provided and not required. + supports_json_schema: Whether the endpoint accepts OpenAI's + ``json_schema`` structured-output ``response_format``. Some + OpenAI-compatible endpoints (e.g. DeepSeek) reject it. """ base_url: str @@ -40,6 +43,7 @@ class ProviderConfig: default_headers: dict[str, str] = field(default_factory=dict) api_key_required: bool = True default_api_key: str | None = None + supports_json_schema: bool = True OPENAI_COMPATIBLE_PROVIDERS: dict[str, ProviderConfig] = { @@ -55,6 +59,9 @@ class ProviderConfig: api_key_env="DEEPSEEK_API_KEY", base_url_env="DEEPSEEK_BASE_URL", api_key_required=True, + # DeepSeek rejects OpenAI's json_schema response_format with + # "This response_format type is unavailable now" (#5990). + supports_json_schema=False, ), "ollama": ProviderConfig( base_url="http://localhost:11434/v1", @@ -261,3 +268,16 @@ def supports_function_calling(self) -> bool: Whether the model supports function calling. """ return super().supports_function_calling() + + def supports_native_structured_output(self) -> bool: + """Honor each provider's json_schema support (#5990). + + Some OpenAI-compatible endpoints (e.g. DeepSeek) reject OpenAI's + ``json_schema`` ``response_format``. For those, structured output + falls back to a plain completion and client-side validation. + + Returns: + Whether the configured provider accepts json_schema response_format. + """ + config = OPENAI_COMPATIBLE_PROVIDERS.get(self.provider) + return config.supports_json_schema if config else True diff --git a/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py b/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py index ce856a5334..c4ec6a894f 100644 --- a/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py +++ b/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py @@ -1,11 +1,13 @@ """Tests for OpenAI-compatible providers.""" import os -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest +from pydantic import BaseModel from crewai.llm import LLM +from crewai.llms.providers.openai.completion import OpenAICompletion from crewai.llms.providers.openai_compatible.completion import ( OPENAI_COMPATIBLE_PROVIDERS, OpenAICompatibleCompletion, @@ -36,6 +38,7 @@ def test_provider_config_defaults(self): assert config.default_headers == {} assert config.api_key_required is True assert config.default_api_key is None + assert config.supports_json_schema is True class TestProviderRegistry: @@ -56,6 +59,8 @@ def test_deepseek_config(self): assert config.base_url == "https://api.deepseek.com/v1" assert config.api_key_env == "DEEPSEEK_API_KEY" assert config.api_key_required is True + # DeepSeek rejects OpenAI's json_schema response_format (#5990) + assert config.supports_json_schema is False def test_ollama_config(self): """Test Ollama provider configuration.""" @@ -307,3 +312,74 @@ def test_acall_method_exists(self): completion = OpenAICompatibleCompletion(model="llama3", provider="ollama") assert hasattr(completion, "acall") assert callable(completion.acall) + + +class _Answer(BaseModel): + value: int + + +class TestStructuredOutputFallback: + """Structured output must degrade gracefully on OpenAI-compatible + endpoints that reject OpenAI's json_schema response_format (#5990). + """ + + def test_deepseek_does_not_support_native_structured_output(self): + with patch.dict(os.environ, {"DEEPSEEK_API_KEY": "test-key"}): + llm = OpenAICompatibleCompletion(model="deepseek-chat", provider="deepseek") + assert llm.supports_native_structured_output() is False + + def test_openrouter_supports_native_structured_output(self): + with patch.dict(os.environ, {"OPENROUTER_API_KEY": "test-key"}): + llm = OpenAICompatibleCompletion(model="some-model", provider="openrouter") + assert llm.supports_native_structured_output() is True + + def test_openai_supports_native_structured_output(self): + assert OpenAICompletion(model="gpt-4o").supports_native_structured_output() + + def test_deepseek_omits_json_schema_response_format(self): + """A Pydantic response_format must not be sent as json_schema to DeepSeek.""" + with patch.dict(os.environ, {"DEEPSEEK_API_KEY": "test-key"}): + llm = OpenAICompatibleCompletion( + model="deepseek-chat", provider="deepseek", response_format=_Answer + ) + params = llm._prepare_completion_params([{"role": "user", "content": "hi"}]) + assert "response_format" not in params + + def test_openai_keeps_json_schema_response_format(self): + """OpenAI still receives the json_schema response_format (no regression).""" + llm = OpenAICompletion(model="gpt-4o", response_format=_Answer) + params = llm._prepare_completion_params([{"role": "user", "content": "hi"}]) + assert params.get("response_format", {}).get("type") == "json_schema" + + def test_deepseek_completion_skips_native_parse_and_validates_client_side(self): + """DeepSeek + response_model must use a plain completion (no json_schema + parse) and still return the validated model via client-side parsing. + """ + with patch.dict(os.environ, {"DEEPSEEK_API_KEY": "test-key"}): + llm = OpenAICompatibleCompletion(model="deepseek-chat", provider="deepseek") + + client = MagicMock() + message = MagicMock() + message.content = '{"value": 42}' + message.tool_calls = None + response = MagicMock() + response.choices = [MagicMock(message=message)] + client.chat.completions.create.return_value = response + + with ( + patch.object(llm, "_get_sync_client", return_value=client), + patch.object(llm, "_extract_openai_token_usage", return_value={}), + patch.object( + llm, "_extract_chat_finish_reason_and_id", return_value=("stop", "id") + ), + patch.object(llm, "_emit_call_completed_event"), + ): + result = llm._handle_completion( + {"messages": [{"role": "user", "content": "hi"}]}, + response_model=_Answer, + ) + + client.beta.chat.completions.parse.assert_not_called() + client.chat.completions.create.assert_called_once() + assert isinstance(result, _Answer) + assert result.value == 42 From 164057175ead5173d90f19fe6a2e88308ceaa931 Mon Sep 17 00:00:00 2001 From: HumphreySun98 Date: Wed, 24 Jun 2026 12:04:29 -0500 Subject: [PATCH 2/3] fix(llm): validate response_model in sync streaming fallback When native json_schema streaming is skipped for a provider that rejects it (e.g. DeepSeek), the sync streaming path fell through to a plain stream and returned the raw text, while the async streaming and non-streaming paths returned the parsed model. Validate the accumulated text against response_model in _finalize_streaming_response so all paths behave consistently. Adds a sync-streaming fallback regression test. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../llms/providers/openai/completion.py | 31 ++++++++++++- .../test_openai_compatible.py | 45 +++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/lib/crewai/src/crewai/llms/providers/openai/completion.py b/lib/crewai/src/crewai/llms/providers/openai/completion.py index 173a263b6a..3087629509 100644 --- a/lib/crewai/src/crewai/llms/providers/openai/completion.py +++ b/lib/crewai/src/crewai/llms/providers/openai/completion.py @@ -1805,7 +1805,8 @@ def _finalize_streaming_response( from_agent: Any | None = None, finish_reason: str | None = None, response_id: str | None = None, - ) -> str | list[dict[str, Any]]: + response_model: type[BaseModel] | None = None, + ) -> str | list[dict[str, Any]] | BaseModel: """Finalize a streaming response with usage tracking, tool call handling, and events. Args: @@ -1819,6 +1820,10 @@ def _finalize_streaming_response( finish_reason: Raw provider finish reason (e.g. "stop", "length", "tool_calls") extracted from the last streaming chunk. response_id: Raw provider response id from any chunk. + response_model: When set and the stream produced text (no tool + calls), validate the accumulated response against this model + and return the parsed object. Used by the fallback path for + providers that don't support native json_schema streaming. Returns: Tool calls list when tools were invoked without available_functions, @@ -1886,6 +1891,29 @@ def _finalize_streaming_response( full_response = self._apply_stop_words(full_response) + # Fallback structured-output validation: when native json_schema + # streaming was skipped (e.g. DeepSeek), parse the accumulated text + # into the requested model so the call still returns a parsed object, + # matching the async streaming path and the non-streaming fallback. + if response_model and full_response: + try: + structured_result = self._validate_structured_output( + full_response, response_model + ) + self._emit_call_completed_event( + response=structured_result, + call_type=LLMCallType.LLM_CALL, + from_task=from_task, + from_agent=from_agent, + messages=params["messages"], + usage=usage_data, + finish_reason=finish_reason, + response_id=response_id, + ) + return structured_result + except ValueError as e: + logging.warning(f"Structured output validation failed: {e}") + self._emit_call_completed_event( response=full_response, call_type=LLMCallType.LLM_CALL, @@ -2046,6 +2074,7 @@ def _handle_streaming_completion( from_agent=from_agent, finish_reason=stream_finish_reason, response_id=stream_response_id, + response_model=response_model, ) if isinstance(result, str): return self._invoke_after_llm_call_hooks( diff --git a/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py b/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py index c4ec6a894f..a191b0c5f8 100644 --- a/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py +++ b/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py @@ -383,3 +383,48 @@ def test_deepseek_completion_skips_native_parse_and_validates_client_side(self): client.chat.completions.create.assert_called_once() assert isinstance(result, _Answer) assert result.value == 42 + + def test_deepseek_streaming_skips_native_stream_and_validates_client_side(self): + """DeepSeek + streaming + response_model must use a plain streaming + completion (no json_schema beta.stream) and still parse the accumulated + text into the requested model, matching the non-streaming fallback. + """ + + def _chunk(content: str | None = None, finish: str | None = None) -> MagicMock: + chunk = MagicMock() + chunk.id = "id" + chunk.usage = None # not a usage chunk + choice = MagicMock() + delta = MagicMock() + delta.content = content + delta.tool_calls = None + choice.delta = delta + choice.finish_reason = finish + chunk.choices = [choice] + return chunk + + with patch.dict(os.environ, {"DEEPSEEK_API_KEY": "test-key"}): + llm = OpenAICompatibleCompletion(model="deepseek-chat", provider="deepseek") + + client = MagicMock() + client.chat.completions.create.return_value = [ + _chunk(content='{"value": '), + _chunk(content="42}", finish="stop"), + ] + + with ( + patch.object(llm, "_get_sync_client", return_value=client), + patch.object(llm, "_emit_stream_chunk_event"), + patch.object(llm, "_emit_call_completed_event"), + ): + result = llm._handle_streaming_completion( + {"messages": [{"role": "user", "content": "hi"}]}, + response_model=_Answer, + ) + + # The native json_schema streaming path must be skipped entirely. + client.beta.chat.completions.stream.assert_not_called() + client.chat.completions.create.assert_called_once() + # ...and the accumulated text is still parsed into the model. + assert isinstance(result, _Answer) + assert result.value == 42 From cb13afe7aba0093758bab50c7475e35591d6d1dd Mon Sep 17 00:00:00 2001 From: HumphreySun98 Date: Wed, 24 Jun 2026 12:17:40 -0500 Subject: [PATCH 3/3] fix(llm): honor configured response_format in streaming fallback The streaming fallback validated only against a per-call response_model, so stream=True with a configured self.response_format on a provider that rejects native json_schema returned raw text instead of the parsed model. Validate against `response_model or self.response_format`, matching the non-streaming fallback. Adds a regression test for the configured-response_format case. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../llms/providers/openai/completion.py | 10 ++- .../test_openai_compatible.py | 61 ++++++++++++++----- 2 files changed, 54 insertions(+), 17 deletions(-) diff --git a/lib/crewai/src/crewai/llms/providers/openai/completion.py b/lib/crewai/src/crewai/llms/providers/openai/completion.py index 3087629509..c44cba1c47 100644 --- a/lib/crewai/src/crewai/llms/providers/openai/completion.py +++ b/lib/crewai/src/crewai/llms/providers/openai/completion.py @@ -1895,10 +1895,16 @@ def _finalize_streaming_response( # streaming was skipped (e.g. DeepSeek), parse the accumulated text # into the requested model so the call still returns a parsed object, # matching the async streaming path and the non-streaming fallback. - if response_model and full_response: + # Honor both a per-call response_model and a configured response_format. + structured_format = response_model or self.response_format + if ( + structured_format is not None + and isinstance(structured_format, type) + and full_response + ): try: structured_result = self._validate_structured_output( - full_response, response_model + full_response, structured_format ) self._emit_call_completed_event( response=structured_result, diff --git a/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py b/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py index a191b0c5f8..725173fe07 100644 --- a/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py +++ b/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py @@ -318,6 +318,21 @@ class _Answer(BaseModel): value: int +def _stream_chunk(content: str | None = None, finish: str | None = None) -> MagicMock: + """Build a minimal OpenAI streaming chunk for the regular (non-native) path.""" + chunk = MagicMock() + chunk.id = "id" + chunk.usage = None # not a usage chunk + choice = MagicMock() + delta = MagicMock() + delta.content = content + delta.tool_calls = None + choice.delta = delta + choice.finish_reason = finish + chunk.choices = [choice] + return chunk + + class TestStructuredOutputFallback: """Structured output must degrade gracefully on OpenAI-compatible endpoints that reject OpenAI's json_schema response_format (#5990). @@ -390,26 +405,13 @@ def test_deepseek_streaming_skips_native_stream_and_validates_client_side(self): text into the requested model, matching the non-streaming fallback. """ - def _chunk(content: str | None = None, finish: str | None = None) -> MagicMock: - chunk = MagicMock() - chunk.id = "id" - chunk.usage = None # not a usage chunk - choice = MagicMock() - delta = MagicMock() - delta.content = content - delta.tool_calls = None - choice.delta = delta - choice.finish_reason = finish - chunk.choices = [choice] - return chunk - with patch.dict(os.environ, {"DEEPSEEK_API_KEY": "test-key"}): llm = OpenAICompatibleCompletion(model="deepseek-chat", provider="deepseek") client = MagicMock() client.chat.completions.create.return_value = [ - _chunk(content='{"value": '), - _chunk(content="42}", finish="stop"), + _stream_chunk(content='{"value": '), + _stream_chunk(content="42}", finish="stop"), ] with ( @@ -428,3 +430,32 @@ def _chunk(content: str | None = None, finish: str | None = None) -> MagicMock: # ...and the accumulated text is still parsed into the model. assert isinstance(result, _Answer) assert result.value == 42 + + def test_deepseek_streaming_validates_configured_response_format(self): + """stream=True with a configured response_format (not a per-call + response_model) must also be parsed into the model on the fallback + path, matching the non-streaming behavior. + """ + with patch.dict(os.environ, {"DEEPSEEK_API_KEY": "test-key"}): + llm = OpenAICompatibleCompletion( + model="deepseek-chat", provider="deepseek", response_format=_Answer + ) + + client = MagicMock() + client.chat.completions.create.return_value = [ + _stream_chunk(content='{"value": '), + _stream_chunk(content="7}", finish="stop"), + ] + + with ( + patch.object(llm, "_get_sync_client", return_value=client), + patch.object(llm, "_emit_stream_chunk_event"), + patch.object(llm, "_emit_call_completed_event"), + ): + result = llm._handle_streaming_completion( + {"messages": [{"role": "user", "content": "hi"}]}, + ) + + client.beta.chat.completions.stream.assert_not_called() + assert isinstance(result, _Answer) + assert result.value == 7