Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions lib/cli/src/crewai_cli/crew_run_tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
(task header, plan checklist, activity timeline, streaming output).
"""

from collections.abc import Iterable
import json as _json
import re
import threading
Expand Down Expand Up @@ -46,6 +47,19 @@ def _is_save_to_memory_tool(tool_name: str | None) -> bool:
return (tool_name or "").replace(" ", "_").lower() == "save_to_memory"


def _is_streaming_output(value: Any) -> bool:
    """Return True when *value* is structurally a streaming output.

    A value qualifies when it is iterable and its class exposes both
    ``get_full_text`` and ``result``.

    The attributes are looked up on ``type(value)`` instead of on the
    instance. Looking on the class avoids evaluating an instance-level
    ``result`` property, which may raise before the stream has been consumed.

    Args:
        value: Any object returned from a conversation turn.

    Returns:
        True if *value* should be drained as a stream, False otherwise.
    """
    if not isinstance(value, Iterable):
        return False

    value_type = type(value)
    return hasattr(value_type, "get_full_text") and hasattr(value_type, "result")


def _truncate_log_text(value: Any, limit: int) -> str | None:
if value is None:
return None
Expand Down Expand Up @@ -836,14 +850,18 @@ def _run_conversation_turn_worker(self, message: str) -> None:
set_suppress_tracing_messages(True)
try:
result = self._flow.handle_turn(message)
if hasattr(result, "get_full_text") and hasattr(result, "result"):
for _chunk in result:
pass
result = result.result
result = self._consume_conversation_streaming_result(result)
self.call_from_thread(self._on_conversation_turn_done, result)
except Exception as e:
self.call_from_thread(self._on_conversation_turn_failed, str(e))

def _consume_conversation_streaming_result(self, result: Any) -> Any:
    """Drain a streaming turn result and hand back its final value.

    Values that ``_is_streaming_output`` does not recognise are returned
    untouched.

    Args:
        result: Whatever the flow returned for the turn.

    Returns:
        ``result.result`` after the stream has been fully iterated, or
        *result* itself when it is not a streaming output.
    """
    if _is_streaming_output(result):
        # The chunks themselves are discarded; ``.result`` is only read once
        # the iterator has been exhausted.
        for _ in result:
            pass
        return result.result
    return result

def _on_conversation_turn_done(self, result: Any) -> None:
with self._lock:
output = self._stringify_output(result)
Expand Down
24 changes: 24 additions & 0 deletions lib/cli/tests/test_crew_run_tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.types.streaming import FlowStreamingOutput, StreamChunk
from crewai_cli.command import AuthenticationRequiredError
from crewai_cli import run_crew
from crewai_cli.crew_run_tui import (
Expand Down Expand Up @@ -177,6 +178,29 @@ class RawResult:
assert isinstance(app._crew_result, RawResult)


def test_conversation_streaming_result_is_consumed_before_result_access() -> None:
    """The TUI helper must drain a streaming result before reading ``.result``."""
    streaming = FlowStreamingOutput()

    def chunks():
        yield StreamChunk(content="hello ")
        yield StreamChunk(content="world")
        # The final result is only set after every chunk has been produced.
        streaming._set_result("hello world")

    streaming._sync_iterator = chunks()

    # Precondition: reading the result of an unconsumed stream is an error.
    with pytest.raises(RuntimeError):
        streaming.result  # noqa: B018

    app = CrewRunApp(conversational=True)

    assert app._consume_conversation_streaming_result(streaming) == "hello world"
    assert streaming.get_full_text() == "hello world"


@pytest.mark.asyncio
async def test_conversation_input_submits_turn() -> None:
class FakeFlow:
Expand Down
47 changes: 46 additions & 1 deletion lib/crewai/src/crewai/experimental/conversational_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from __future__ import annotations

from collections.abc import Callable, Mapping, Sequence
from contextlib import contextmanager
from enum import Enum
import json
import logging
Expand Down Expand Up @@ -62,6 +63,21 @@
logger = logging.getLogger(__name__)


@contextmanager
def _streaming_conversation_llm(llm: BaseLLM, *, enabled: bool) -> Any:
    """Force ``llm.stream`` to True for the duration of the ``with`` body.

    When *enabled* is false the context manager does nothing. Otherwise the
    current ``llm.stream`` value is remembered, replaced with ``True``, and
    put back on exit even if the body raises.

    NOTE(review): the flag is changed on the instance that was passed in. If
    the same instance is used by overlapping turns, the save/restore pairs
    can interleave and leave ``stream`` in the wrong state. Confirm how
    callers share the LLM.
    """
    if enabled:
        original_stream = llm.stream
        llm.stream = True
        try:
            yield
        finally:
            llm.stream = original_stream
    else:
        yield
Comment on lines +66 to +78

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🌐 Web query:

Does the crewAI BaseLLM / LLM.call API accept a per-call streaming parameter, or is streaming controlled only via the instance-level llm.stream attribute?

💡 Result:

In crewAI, streaming is controlled at the instance level through the `stream` attribute of the LLM object, not through a per-call parameter of the `call()` or `acall()` API [1][2]. You enable streaming when initializing the LLM instance by setting `stream=True` [1][2]: `llm = LLM(model="openai/gpt-4o", stream=True)`. The `call()` and `acall()` methods do not accept a streaming flag [3]. Instead, when `stream` is set to `True` on the instance, crewAI automatically handles the streaming process by emitting `LLMStreamChunkEvent` objects via the `crewai_event_bus` for every token received [4][3][5][6]. To consume these streamed tokens, you must register an event handler with the event bus that listens for `LLMStreamChunkEvent` occurrences [4][5][2]. Top results: [1][3][5][2][6]

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the relevant files and symbols with line numbers.
git ls-files lib/crewai/src/crewai/experimental/conversational_mixin.py lib/crewai/src/crewai/llms/base_llm.py lib/crewai/src/crewai/llms/llm.py 2>/dev/null || true

printf '\n--- conversational_mixin.py outline ---\n'
ast-grep outline lib/crewai/src/crewai/experimental/conversational_mixin.py --view expanded

printf '\n--- base_llm.py outline ---\n'
ast-grep outline lib/crewai/src/crewai/llms/base_llm.py --view expanded

printf '\n--- search for stream usage and call signatures ---\n'
rg -n "def (call|acall)\(|stream\s*=|stream:" lib/crewai/src/crewai/llms lib/crewai/src/crewai/experimental -g '*.py'

Repository: crewAIInc/crewAI

Length of output: 15588


🏁 Script executed:

#!/bin/bash
set -euo pipefail

printf 'FILE: conversational_mixin.py\n'
sed -n '1,120p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\nFILE: base_llm.py\n'
sed -n '1,260p' lib/crewai/src/crewai/llms/base_llm.py | cat -n

printf '\nFILE: llm.py\n'
sed -n '1,260p' lib/crewai/src/crewai/llms/llm.py | cat -n

Repository: crewAIInc/crewAI

Length of output: 15468


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Map the relevant files first, then read only the needed sections.
printf '--- conversational_mixin.py outline ---\n'
ast-grep outline lib/crewai/src/crewai/experimental/conversational_mixin.py --view expanded

printf '\n--- conversational_mixin.py lines 1-220 ---\n'
sed -n '1,220p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\n--- conversational_mixin.py lines 220-820 ---\n'
sed -n '220,820p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\n--- base_llm.py outline ---\n'
ast-grep outline lib/crewai/src/crewai/llms/base_llm.py --view expanded

printf '\n--- base_llm.py lines 1-260 ---\n'
sed -n '1,260p' lib/crewai/src/crewai/llms/base_llm.py | cat -n

Repository: crewAIInc/crewAI

Length of output: 245


🏁 Script executed:

#!/bin/bash
set -euo pipefail

set +e
printf '%s\n' '--- conversational_mixin.py around _default_conversation_llm ---'
sed -n '1010,1050p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\n%s\n' '--- conversational_mixin.py around converse_turn ---'
sed -n '220,260p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\n%s\n' '--- base_llm.py around call/acall ---'
sed -n '280,360p' lib/crewai/src/crewai/llms/base_llm.py | cat -n

printf '\n%s\n' '--- base_llm.py around stream handling helper ---'
sed -n '480,550p' lib/crewai/src/crewai/llms/base_llm.py | cat -n

Repository: crewAIInc/crewAI

Length of output: 11071


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find where conversational_config / llm are assigned and used.
rg -n "conversational_config|_default_conversation_llm\(|llm =" lib/crewai/src/crewai/experimental/conversational_mixin.py lib/crewai/src/crewai/experimental -g '*.py'

# Inspect the ConversationConfig definition to see whether llm is mutable/shared by design.
rg -n "class ConversationConfig|llm:|router_llm:|config.llm" lib/crewai/src/crewai/experimental -g '*.py'

Repository: crewAIInc/crewAI

Length of output: 5604


🏁 Script executed:

#!/bin/bash
set -euo pipefail

printf '%s\n' '--- conversational_mixin.py around _default_conversation_llm ---'
sed -n '1010,1050p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\n%s\n' '--- conversational_mixin.py around converse_turn ---'
sed -n '220,260p' lib/crewai/src/crewai/experimental/conversational_mixin.py | cat -n

printf '\n%s\n' '--- base_llm.py around call/acall ---'
sed -n '280,360p' lib/crewai/src/crewai/llms/base_llm.py | cat -n

printf '\n%s\n' '--- base_llm.py around stream handling helper ---'
sed -n '480,550p' lib/crewai/src/crewai/llms/base_llm.py | cat -n

printf '\n%s\n' '--- conversational config and llm assignment/search ---'
rg -n "class ConversationConfig|conversational_config|_default_conversation_llm\(|config\.llm|llm =" lib/crewai/src/crewai/experimental -g '*.py'

Repository: crewAIInc/crewAI

Length of output: 13283


Avoid mutating llm.stream on a shared instance

config.llm comes from the class-level conversational_config, so overlapping turns/sessions can race on this save/restore and leave stream in the wrong state. call()/acall() don’t accept a per-call streaming flag, so this needs a per-turn LLM instance or another concurrency-safe streaming hook instead of mutating shared state.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/crewai/src/crewai/experimental/conversational_mixin.py` around lines 66 -
78, The _streaming_conversation_llm context manager currently mutates llm.stream
on a shared BaseLLM instance, which can race across overlapping turns and
restore the wrong state. Update the conversational flow so each turn uses a
per-call/per-session LLM instance or another concurrency-safe streaming
mechanism instead of modifying shared state in _streaming_conversation_llm, and
keep the streaming enablement scoped to call()/acall() usage in the
conversational mixin.



def _iter_condition_labels(condition: Any) -> set[str]:
if isinstance(condition, str):
return {condition}
Expand Down Expand Up @@ -146,6 +162,9 @@ def _collapse_to_outcome(
def _copy_and_serialize_state(self) -> dict[str, Any]:
pass

def _should_stream_llm_calls(self) -> bool:
pass

def kickoff(self, *args: Any, **kwargs: Any) -> Any:
pass

Expand Down Expand Up @@ -221,7 +240,12 @@ def converse_turn(self) -> str:
messages.append({"role": "system", "content": system_prompt})
messages.extend(self.conversation_messages)

response = self._coerce_llm(llm).call(messages=messages)
llm_instance = self._coerce_llm(llm)
with _streaming_conversation_llm(
llm_instance,
enabled=self._should_stream_llm_calls(),
):
response = llm_instance.call(messages=messages)
content = self._stringify_result(response)
self.append_assistant_message(content)
return content
Expand Down Expand Up @@ -703,6 +727,27 @@ def _should_apply_pending_kickoff_context(self) -> bool:
def _apply_pending_kickoff_context(self) -> None:
    """Apply the pending conversational turn.

    Thin delegation to ``_apply_pending_conversational_turn``; its return
    value is discarded.
    """
    self._apply_pending_conversational_turn()
    return None

def _capture_pending_kickoff_context(self) -> dict[str, Any] | None:
if not self._should_apply_pending_kickoff_context():
return None
return {
"user_message": self._pending_user_message,
"intents": self._pending_intents,
"intent_llm": self._pending_intent_llm,
}

def _restore_pending_kickoff_context(self, context: Any) -> None:
    """Reinstate the pending turn fields from a captured snapshot.

    Anything that is not a dict is ignored. A dict must contain the keys
    ``user_message``, ``intents`` and ``intent_llm``; a missing key raises
    ``KeyError``.
    """
    if isinstance(context, dict):
        self._pending_user_message = context["user_message"]
        self._pending_intents = context["intents"]
        self._pending_intent_llm = context["intent_llm"]

def _clear_pending_kickoff_context(self) -> None:
    """Reset the pending user message, intents and intent LLM to ``None``."""
    # Chained assignment binds targets left to right, matching the original
    # order of the three separate statements.
    self._pending_user_message = self._pending_intents = self._pending_intent_llm = None

def _order_start_methods_for_kickoff(
self,
start_methods: list[Any],
Expand Down
61 changes: 49 additions & 12 deletions lib/crewai/src/crewai/flow/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Sequence,
)
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import contextmanager
import contextvars
import copy
from datetime import datetime
Expand Down Expand Up @@ -460,6 +461,16 @@ def _should_apply_pending_kickoff_context(self) -> bool:
def _apply_pending_kickoff_context(self) -> None:
    """Hook for applying optional runtime-extension kickoff context.

    This implementation does nothing.
    """
    return None

def _capture_pending_kickoff_context(self) -> Any | None:
"""Capture optional pending kickoff context for deferred execution."""
return None

def _restore_pending_kickoff_context(self, context: Any) -> None:
    """Hook for restoring captured kickoff context during deferred execution.

    This implementation ignores *context* and does nothing.
    """
    return None

def _clear_pending_kickoff_context(self) -> None:
    """Hook for clearing pending kickoff context after deferred execution.

    This implementation does nothing.
    """
    return None

def _order_start_methods_for_kickoff(
self,
start_methods: list[FlowMethodName],
Expand All @@ -471,6 +482,19 @@ def _should_defer_trace_finalization(self) -> bool:
"""Whether this kickoff should defer final flow trace finalization."""
return bool(getattr(self, "defer_trace_finalization", False))

def _should_stream_llm_calls(self) -> bool:
    """Tell whether LLM calls made during the current flow run should stream.

    Returns ``self.stream`` when it is truthy, otherwise
    ``self._streaming_run_active``.
    """
    stream_flag = self.stream
    if stream_flag:
        return stream_flag
    return self._streaming_run_active

@contextmanager
def _streaming_run(self) -> Iterator[None]:
    """Flag this flow as inside a streaming run while the ``with`` body runs.

    The value ``_streaming_run_active`` had on entry is put back on exit,
    including when the body raises.
    """
    was_active = self._streaming_run_active
    self._streaming_run_active = True
    try:
        yield
    finally:
        self._streaming_run_active = was_active

@classmethod
def flow_definition(cls) -> FlowDefinition:
"""Return the static Flow Definition built from this Flow class."""
Expand Down Expand Up @@ -735,6 +759,7 @@ def _restore_from_checkpoint(self) -> None:
_usage_aggregation_handler: Callable[..., Any] | None = PrivateAttr(default=None)
_persist_backends: dict[int, FlowPersistence] = PrivateAttr(default_factory=dict)
_instance_persistence: bool = PrivateAttr(default=False)
_streaming_run_active: bool = PrivateAttr(default=False)

def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
class _FlowGeneric(cls): # type: ignore[valid-type,misc]
Expand Down Expand Up @@ -1872,6 +1897,7 @@ def kickoff(
return restored.kickoff(inputs=inputs, input_files=input_files)
if self.stream:
result_holder: list[Any] = []
pending_kickoff_context = self._capture_pending_kickoff_context()
current_task_info: TaskInfo = {
"index": 0,
"name": "",
Expand All @@ -1887,12 +1913,15 @@ def kickoff(

def run_flow() -> None:
try:
self.stream = False
result = self.kickoff(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
if pending_kickoff_context is not None:
self._restore_pending_kickoff_context(pending_kickoff_context)
with self._streaming_run():
self.stream = False
result = self.kickoff(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
result_holder.append(result)
except Exception as e:
# HumanFeedbackPending is expected control flow, not an error
Expand All @@ -1901,6 +1930,8 @@ def run_flow() -> None:
else:
signal_error(state, e)
finally:
if pending_kickoff_context is not None:
self._clear_pending_kickoff_context()
self.stream = True
signal_end(state)

Expand Down Expand Up @@ -1972,6 +2003,7 @@ async def kickoff_async(
return await restored.kickoff_async(inputs=inputs, input_files=input_files)
if self.stream:
result_holder: list[Any] = []
pending_kickoff_context = self._capture_pending_kickoff_context()
current_task_info: TaskInfo = {
"index": 0,
"name": "",
Expand All @@ -1987,12 +2019,15 @@ async def kickoff_async(

async def run_flow() -> None:
try:
self.stream = False
result = await self.kickoff_async(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
if pending_kickoff_context is not None:
self._restore_pending_kickoff_context(pending_kickoff_context)
with self._streaming_run():
self.stream = False
result = await self.kickoff_async(
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
result_holder.append(result)
except Exception as e:
# HumanFeedbackPending is expected control flow, not an error
Expand All @@ -2001,6 +2036,8 @@ async def run_flow() -> None:
else:
signal_error(state, e, is_async=True)
finally:
if pending_kickoff_context is not None:
self._clear_pending_kickoff_context()
self.stream = True
signal_end(state, is_async=True)

Expand Down
54 changes: 54 additions & 0 deletions lib/crewai/tests/test_flow_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,60 @@ def run_research(self) -> str:
assert flow.state.events[0].agent_name == "researcher"
assert flow.state.events[0].visibility == "public"

def test_builtin_converse_enables_llm_streaming_for_streaming_flow(self) -> None:
    """``converse_turn`` sees ``llm.stream`` as True only inside a streaming run."""
    observed_stream_flags: list[bool | None] = []
    llm = MagicMock()
    llm.stream = False

    def record_stream_flag(*args: Any, **kwargs: Any) -> str:
        # Capture the flag exactly as the LLM would see it at call time.
        observed_stream_flags.append(llm.stream)
        return "streamed reply"

    llm.call.side_effect = record_stream_flag

    @ConversationConfig(llm=llm)
    class StreamingFlow(ConversationalFlow):
        pass

    flow = StreamingFlow()
    flow.stream = False

    with flow._streaming_run():
        reply = flow.converse_turn()

    assert reply == "streamed reply"
    assert observed_stream_flags == [True]
    # After the run, the LLM flag and the flow's streaming state are back to
    # what they were before.
    assert llm.stream is False
    assert flow._should_stream_llm_calls() is False
    assert flow.state.messages[-1].content == "streamed reply"

def test_streaming_handle_turn_preserves_pending_user_message(self) -> None:
    """A streaming ``handle_turn`` still delivers the user message to the flow."""

    @ConversationConfig(llm="unused")
    class StreamingEchoFlow(ConversationalFlow):
        stream = True

        def route_turn(self, context: dict[str, Any]) -> str:
            return "echo"

        @listen("echo")
        def handle_echo(self) -> str:
            reply = f"heard: {self.state.current_user_message}"
            self.append_assistant_message(reply)
            return reply

    flow = StreamingEchoFlow()
    streamed = flow.handle_turn("hello streaming")
    # Drain the stream; the final result is read only afterwards.
    for _ in streamed:
        pass

    assert streamed.result == "heard: hello streaming"

    history = flow.state.messages
    assert [entry.role for entry in history] == [
        "user",
        "assistant",
    ]
    assert history[0].content == "hello streaming"
    assert history[1].content == "heard: hello streaming"

@conversational_graph_broken
def test_private_agent_results_stay_out_of_shared_history(self) -> None:
class PrivateFlow(ConversationalFlow):
Expand Down
Loading