diff --git a/lib/crewai/src/crewai/llms/providers/openai/completion.py b/lib/crewai/src/crewai/llms/providers/openai/completion.py index d8972e1de4..c44cba1c47 100644 --- a/lib/crewai/src/crewai/llms/providers/openai/completion.py +++ b/lib/crewai/src/crewai/llms/providers/openai/completion.py @@ -1570,8 +1570,10 @@ def _prepare_completion_params( params["reasoning_effort"] = self.reasoning_effort if self.response_format is not None: - if isinstance(self.response_format, type) and issubclass( - self.response_format, BaseModel + if ( + isinstance(self.response_format, type) + and issubclass(self.response_format, BaseModel) + and self.supports_native_structured_output() ): params["response_format"] = generate_model_description( self.response_format @@ -1636,7 +1638,7 @@ def _handle_completion( ) -> str | Any: """Handle non-streaming chat completion.""" try: - if response_model: + if response_model and self.supports_native_structured_output(): parse_params = { k: v for k, v in params.items() if k != "response_format" } @@ -1722,10 +1724,14 @@ def _handle_completion( content = message.content or "" - if self.response_format and isinstance(self.response_format, type): + # When native structured output was skipped (e.g. a provider that + # rejects json_schema), validate the plain completion against the + # requested model client-side so a parsed object is still returned. + structured_format = response_model or self.response_format + if structured_format is not None and isinstance(structured_format, type): try: structured_result = self._validate_structured_output( - content, self.response_format + content, structured_format ) self._emit_call_completed_event( response=structured_result, @@ -1799,7 +1805,8 @@ def _finalize_streaming_response( from_agent: Any | None = None, finish_reason: str | None = None, response_id: str | None = None, - ) -> str | list[dict[str, Any]]: + response_model: type[BaseModel] | None = None, + ) -> str | list[dict[str, Any]] | BaseModel: """Finalize a streaming response with usage tracking, tool call handling, and events. Args: @@ -1813,6 +1820,10 @@ def _finalize_streaming_response( finish_reason: Raw provider finish reason (e.g. "stop", "length", "tool_calls") extracted from the last streaming chunk. response_id: Raw provider response id from any chunk. + response_model: When set and the stream produced text (no tool + calls), validate the accumulated response against this model + and return the parsed object. Used by the fallback path for + providers that don't support native json_schema streaming. Returns: Tool calls list when tools were invoked without available_functions, @@ -1880,6 +1891,35 @@ def _finalize_streaming_response( full_response = self._apply_stop_words(full_response) + # Fallback structured-output validation: when native json_schema + # streaming was skipped (e.g. DeepSeek), parse the accumulated text + # into the requested model so the call still returns a parsed object, + # matching the async streaming path and the non-streaming fallback. + # Honor both a per-call response_model and a configured response_format. + structured_format = response_model or self.response_format + if ( + structured_format is not None + and isinstance(structured_format, type) + and full_response + ): + try: + structured_result = self._validate_structured_output( + full_response, structured_format + ) + self._emit_call_completed_event( + response=structured_result, + call_type=LLMCallType.LLM_CALL, + from_task=from_task, + from_agent=from_agent, + messages=params["messages"], + usage=usage_data, + finish_reason=finish_reason, + response_id=response_id, + ) + return structured_result + except ValueError as e: + logging.warning(f"Structured output validation failed: {e}") + self._emit_call_completed_event( response=full_response, call_type=LLMCallType.LLM_CALL, @@ -1905,7 +1945,7 @@ def _handle_streaming_completion( full_response = "" tool_calls: dict[int, dict[str, Any]] = {} - if response_model: + if response_model and self.supports_native_structured_output(): parse_params = { k: v for k, v in params.items() @@ -2040,6 +2080,7 @@ def _handle_streaming_completion( from_agent=from_agent, finish_reason=stream_finish_reason, response_id=stream_response_id, + response_model=response_model, ) if isinstance(result, str): return self._invoke_after_llm_call_hooks( @@ -2057,7 +2098,7 @@ async def _ahandle_completion( ) -> str | Any: """Handle non-streaming async chat completion.""" try: - if response_model: + if response_model and self.supports_native_structured_output(): parse_params = { k: v for k, v in params.items() if k != "response_format" } @@ -2149,10 +2190,14 @@ async def _ahandle_completion( content = message.content or "" - if self.response_format and isinstance(self.response_format, type): + # When native structured output was skipped (e.g. a provider that + # rejects json_schema), validate the plain completion against the + # requested model client-side so a parsed object is still returned. + structured_format = response_model or self.response_format + if structured_format is not None and isinstance(structured_format, type): try: structured_result = self._validate_structured_output( - content, self.response_format + content, structured_format ) self._emit_call_completed_event( response=structured_result, @@ -2381,6 +2426,20 @@ def supports_function_calling(self) -> bool: """Check if the model supports function calling.""" return not self.is_o1_model + def supports_native_structured_output(self) -> bool: + """Whether the endpoint accepts OpenAI's json_schema structured outputs. + + OpenAI's ``beta.chat.completions.parse`` / ``.stream`` and a Pydantic + ``response_format`` send a ``response_format`` of type ``json_schema``. + Some OpenAI-compatible endpoints reject it (e.g. DeepSeek: "This + response_format type is unavailable now"). Subclasses override this to + fall back to a plain completion plus client-side validation. + + Returns: + ``True`` for OpenAI; subclasses may return ``False``. + """ + return True + def supports_stop_words(self) -> bool: """Check if the model supports stop words.""" model_lower = self.model.lower() if self.model else "" diff --git a/lib/crewai/src/crewai/llms/providers/openai_compatible/completion.py b/lib/crewai/src/crewai/llms/providers/openai_compatible/completion.py index da4cfd03db..a9ca7700f1 100644 --- a/lib/crewai/src/crewai/llms/providers/openai_compatible/completion.py +++ b/lib/crewai/src/crewai/llms/providers/openai_compatible/completion.py @@ -32,6 +32,9 @@ class ProviderConfig: default_headers: HTTP headers to include in all requests. api_key_required: Whether an API key is required for this provider. default_api_key: Default API key to use if none is provided and not required. + supports_json_schema: Whether the endpoint accepts OpenAI's + ``json_schema`` structured-output ``response_format``. Some + OpenAI-compatible endpoints (e.g. DeepSeek) reject it. """ base_url: str @@ -40,6 +43,7 @@ class ProviderConfig: default_headers: dict[str, str] = field(default_factory=dict) api_key_required: bool = True default_api_key: str | None = None + supports_json_schema: bool = True OPENAI_COMPATIBLE_PROVIDERS: dict[str, ProviderConfig] = { @@ -55,6 +59,9 @@ class ProviderConfig: api_key_env="DEEPSEEK_API_KEY", base_url_env="DEEPSEEK_BASE_URL", api_key_required=True, + # DeepSeek rejects OpenAI's json_schema response_format with + # "This response_format type is unavailable now" (#5990). + supports_json_schema=False, ), "ollama": ProviderConfig( base_url="http://localhost:11434/v1", @@ -261,3 +268,16 @@ def supports_function_calling(self) -> bool: Whether the model supports function calling. """ return super().supports_function_calling() + + def supports_native_structured_output(self) -> bool: + """Honor each provider's json_schema support (#5990). + + Some OpenAI-compatible endpoints (e.g. DeepSeek) reject OpenAI's + ``json_schema`` ``response_format``. For those, structured output + falls back to a plain completion and client-side validation. + + Returns: + Whether the configured provider accepts json_schema response_format. + """ + config = OPENAI_COMPATIBLE_PROVIDERS.get(self.provider) + return config.supports_json_schema if config else True diff --git a/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py b/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py index ce856a5334..725173fe07 100644 --- a/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py +++ b/lib/crewai/tests/llms/openai_compatible/test_openai_compatible.py @@ -1,11 +1,13 @@ """Tests for OpenAI-compatible providers.""" import os -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest +from pydantic import BaseModel from crewai.llm import LLM +from crewai.llms.providers.openai.completion import OpenAICompletion from crewai.llms.providers.openai_compatible.completion import ( OPENAI_COMPATIBLE_PROVIDERS, OpenAICompatibleCompletion, @@ -36,6 +38,7 @@ def test_provider_config_defaults(self): assert config.default_headers == {} assert config.api_key_required is True assert config.default_api_key is None + assert config.supports_json_schema is True class TestProviderRegistry: @@ -56,6 +59,8 @@ def test_deepseek_config(self): assert config.base_url == "https://api.deepseek.com/v1" assert config.api_key_env == "DEEPSEEK_API_KEY" assert config.api_key_required is True + # DeepSeek rejects OpenAI's json_schema response_format (#5990) + assert config.supports_json_schema is False def test_ollama_config(self): """Test Ollama provider configuration.""" @@ -307,3 +312,150 @@ def test_acall_method_exists(self): completion = OpenAICompatibleCompletion(model="llama3", provider="ollama") assert hasattr(completion, "acall") assert callable(completion.acall) + + +class _Answer(BaseModel): + value: int + + +def _stream_chunk(content: str | None = None, finish: str | None = None) -> MagicMock: + """Build a minimal OpenAI streaming chunk for the regular (non-native) path.""" + chunk = MagicMock() + chunk.id = "id" + chunk.usage = None # not a usage chunk + choice = MagicMock() + delta = MagicMock() + delta.content = content + delta.tool_calls = None + choice.delta = delta + choice.finish_reason = finish + chunk.choices = [choice] + return chunk + + +class TestStructuredOutputFallback: + """Structured output must degrade gracefully on OpenAI-compatible + endpoints that reject OpenAI's json_schema response_format (#5990). + """ + + def test_deepseek_does_not_support_native_structured_output(self): + with patch.dict(os.environ, {"DEEPSEEK_API_KEY": "test-key"}): + llm = OpenAICompatibleCompletion(model="deepseek-chat", provider="deepseek") + assert llm.supports_native_structured_output() is False + + def test_openrouter_supports_native_structured_output(self): + with patch.dict(os.environ, {"OPENROUTER_API_KEY": "test-key"}): + llm = OpenAICompatibleCompletion(model="some-model", provider="openrouter") + assert llm.supports_native_structured_output() is True + + def test_openai_supports_native_structured_output(self): + assert OpenAICompletion(model="gpt-4o").supports_native_structured_output() + + def test_deepseek_omits_json_schema_response_format(self): + """A Pydantic response_format must not be sent as json_schema to DeepSeek.""" + with patch.dict(os.environ, {"DEEPSEEK_API_KEY": "test-key"}): + llm = OpenAICompatibleCompletion( + model="deepseek-chat", provider="deepseek", response_format=_Answer + ) + params = llm._prepare_completion_params([{"role": "user", "content": "hi"}]) + assert "response_format" not in params + + def test_openai_keeps_json_schema_response_format(self): + """OpenAI still receives the json_schema response_format (no regression).""" + llm = OpenAICompletion(model="gpt-4o", response_format=_Answer) + params = llm._prepare_completion_params([{"role": "user", "content": "hi"}]) + assert params.get("response_format", {}).get("type") == "json_schema" + + def test_deepseek_completion_skips_native_parse_and_validates_client_side(self): + """DeepSeek + response_model must use a plain completion (no json_schema + parse) and still return the validated model via client-side parsing. + """ + with patch.dict(os.environ, {"DEEPSEEK_API_KEY": "test-key"}): + llm = OpenAICompatibleCompletion(model="deepseek-chat", provider="deepseek") + + client = MagicMock() + message = MagicMock() + message.content = '{"value": 42}' + message.tool_calls = None + response = MagicMock() + response.choices = [MagicMock(message=message)] + client.chat.completions.create.return_value = response + + with ( + patch.object(llm, "_get_sync_client", return_value=client), + patch.object(llm, "_extract_openai_token_usage", return_value={}), + patch.object( + llm, "_extract_chat_finish_reason_and_id", return_value=("stop", "id") + ), + patch.object(llm, "_emit_call_completed_event"), + ): + result = llm._handle_completion( + {"messages": [{"role": "user", "content": "hi"}]}, + response_model=_Answer, + ) + + client.beta.chat.completions.parse.assert_not_called() + client.chat.completions.create.assert_called_once() + assert isinstance(result, _Answer) + assert result.value == 42 + + def test_deepseek_streaming_skips_native_stream_and_validates_client_side(self): + """DeepSeek + streaming + response_model must use a plain streaming + completion (no json_schema beta.stream) and still parse the accumulated + text into the requested model, matching the non-streaming fallback. + """ + + with patch.dict(os.environ, {"DEEPSEEK_API_KEY": "test-key"}): + llm = OpenAICompatibleCompletion(model="deepseek-chat", provider="deepseek") + + client = MagicMock() + client.chat.completions.create.return_value = [ + _stream_chunk(content='{"value": '), + _stream_chunk(content="42}", finish="stop"), + ] + + with ( + patch.object(llm, "_get_sync_client", return_value=client), + patch.object(llm, "_emit_stream_chunk_event"), + patch.object(llm, "_emit_call_completed_event"), + ): + result = llm._handle_streaming_completion( + {"messages": [{"role": "user", "content": "hi"}]}, + response_model=_Answer, + ) + + # The native json_schema streaming path must be skipped entirely. + client.beta.chat.completions.stream.assert_not_called() + client.chat.completions.create.assert_called_once() + # ...and the accumulated text is still parsed into the model. + assert isinstance(result, _Answer) + assert result.value == 42 + + def test_deepseek_streaming_validates_configured_response_format(self): + """stream=True with a configured response_format (not a per-call + response_model) must also be parsed into the model on the fallback + path, matching the non-streaming behavior. + """ + with patch.dict(os.environ, {"DEEPSEEK_API_KEY": "test-key"}): + llm = OpenAICompatibleCompletion( + model="deepseek-chat", provider="deepseek", response_format=_Answer + ) + + client = MagicMock() + client.chat.completions.create.return_value = [ + _stream_chunk(content='{"value": '), + _stream_chunk(content="7}", finish="stop"), + ] + + with ( + patch.object(llm, "_get_sync_client", return_value=client), + patch.object(llm, "_emit_stream_chunk_event"), + patch.object(llm, "_emit_call_completed_event"), + ): + result = llm._handle_streaming_completion( + {"messages": [{"role": "user", "content": "hi"}]}, + ) + + client.beta.chat.completions.stream.assert_not_called() + assert isinstance(result, _Answer) + assert result.value == 7