diff --git a/lib/cli/src/crewai_cli/command.py b/lib/cli/src/crewai_cli/command.py index d5e62cf551..7571c01034 100644 --- a/lib/cli/src/crewai_cli/command.py +++ b/lib/cli/src/crewai_cli/command.py @@ -20,13 +20,11 @@ class AuthenticationRequiredError(SystemExit): class BaseCommand: def __init__(self) -> None: self._telemetry = Telemetry() - self._telemetry.set_tracer() class PlusAPIMixin: def __init__(self, telemetry: Telemetry) -> None: try: - telemetry.set_tracer() self.plus_api_client = PlusAPI(api_key=get_auth_token()) except Exception: telemetry.deploy_signup_error_span() diff --git a/lib/crewai-core/src/crewai_core/telemetry.py b/lib/crewai-core/src/crewai_core/telemetry.py index 08aef9b714..7ec5d58ec6 100644 --- a/lib/crewai-core/src/crewai_core/telemetry.py +++ b/lib/crewai-core/src/crewai_core/telemetry.py @@ -19,7 +19,6 @@ import threading from typing import Any, ClassVar, Final -from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider @@ -27,7 +26,7 @@ BatchSpanProcessor, SpanExportResult, ) -from opentelemetry.trace import ProxyTracerProvider, Span, Status, StatusCode +from opentelemetry.trace import Span, Status, StatusCode, Tracer from typing_extensions import Self @@ -70,6 +69,12 @@ class Telemetry: crewai's runtime extends this with crew/agent/task/tool/flow execution spans and event-bus signal handlers (see ``crewai.telemetry.telemetry``). + + The anonymous-telemetry pipeline owns a private ``TracerProvider`` that is + never installed as OpenTelemetry's global provider. Host applications keep + full control of the process-wide provider slot, and any host spans emitted + through ``crewai.telemetry.otel.operation`` stay on the host pipeline + rather than getting exfiltrated to crewAI's OTLP endpoint. """ _instance: ClassVar[Self | None] = None @@ -88,7 +93,6 @@ def __init__(self) -> None: return self.ready: bool = False - self.trace_set: bool = False self._initialized: bool = True if self._is_telemetry_disabled(): @@ -144,21 +148,9 @@ def _shutdown(self) -> None: except Exception as e: logger.debug("Telemetry shutdown failed: %s", e) - def set_tracer(self) -> None: - """Install our TracerProvider as the global one (idempotent).""" - if self.ready and not self.trace_set: - try: - with suppress_warnings(): - existing_provider = trace.get_tracer_provider() - if not isinstance(existing_provider, ProxyTracerProvider): - self.trace_set = True - return - trace.set_tracer_provider(self.provider) - self.trace_set = True - except Exception as e: - logger.debug("Failed to set tracer provider: %s", e) - self.ready = False - self.trace_set = False + def _tracer(self) -> Tracer: + """Return the anonymous-telemetry tracer from the private provider.""" + return self.provider.get_tracer("crewai.telemetry") def _safe_telemetry_operation( self, operation: Callable[[], Span | None] @@ -194,7 +186,7 @@ def deploy_signup_error_span(self) -> None: """Records when an error occurs during the deployment signup process.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Deploy Signup Error") close_span(span) @@ -204,7 +196,7 @@ def start_deployment_span(self, uuid: str | None = None) -> None: """Records the start of a deployment process.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Start Deployment") if uuid: self._add_attribute(span, "uuid", uuid) @@ -216,7 +208,7 @@ def create_crew_deployment_span(self) -> None: """Records the creation of a new crew deployment.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Create Crew Deployment") close_span(span) @@ -228,7 +220,7 @@ def get_crew_logs_span( """Records the retrieval of crew logs.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Get Crew Logs") self._add_attribute(span, "log_type", log_type) if uuid: @@ -241,7 +233,7 @@ def remove_crew_span(self, uuid: str | None = None) -> None: """Records the removal of a crew.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Remove Crew") if uuid: self._add_attribute(span, "uuid", uuid) @@ -253,7 +245,7 @@ def flow_creation_span(self, flow_name: str) -> None: """Records the creation of a new flow.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Flow Creation") self._add_attribute(span, "flow_name", flow_name) close_span(span) @@ -265,7 +257,7 @@ def template_installed_span(self, template_name: str) -> None: from crewai_core.version import get_crewai_version def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Template Installed") self._add_attribute(span, "crewai_version", get_crewai_version()) self._add_attribute(span, "template_name", template_name) diff --git a/lib/crewai-core/tests/test_smoke.py b/lib/crewai-core/tests/test_smoke.py index 7980cfd1e9..3e65752b6b 100644 --- a/lib/crewai-core/tests/test_smoke.py +++ b/lib/crewai-core/tests/test_smoke.py @@ -13,7 +13,8 @@ user_data, version, ) -from opentelemetry.sdk.trace import TracerProvider +from opentelemetry import trace +from opentelemetry.trace import ProxyTracerProvider import pytest @@ -97,7 +98,7 @@ def test_unused_var_warning_silenced() -> None: assert os.environ is not None -def test_core_telemetry_skips_duplicate_tracer_provider( +def test_core_telemetry_does_not_install_global_tracer_provider( monkeypatch: pytest.MonkeyPatch, ) -> None: from crewai_core.telemetry import Telemetry @@ -107,24 +108,7 @@ def test_core_telemetry_skips_duplicate_tracer_provider( monkeypatch.delenv("CREWAI_DISABLE_TELEMETRY", raising=False) monkeypatch.delenv("CREWAI_DISABLE_TRACKING", raising=False) - monkeypatch.setattr( - "crewai_core.telemetry.trace.get_tracer_provider", - lambda: TracerProvider(), - ) - - called = False - - def fail_if_called(provider: object) -> None: - nonlocal called - called = True - - monkeypatch.setattr( - "crewai_core.telemetry.trace.set_tracer_provider", - fail_if_called, - ) - telemetry = Telemetry() - telemetry.set_tracer() - assert called is False - assert telemetry.trace_set is True + assert telemetry.ready is True + assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider) diff --git a/lib/crewai/src/crewai/a2a/utils/delegation.py b/lib/crewai/src/crewai/a2a/utils/delegation.py index c634aab1db..65bab03b49 100644 --- a/lib/crewai/src/crewai/a2a/utils/delegation.py +++ b/lib/crewai/src/crewai/a2a/utils/delegation.py @@ -72,6 +72,7 @@ A2ADelegationStartedEvent, A2AMessageSentEvent, ) +from crewai.telemetry.otel import operation logger = logging.getLogger(__name__) @@ -303,73 +304,81 @@ async def aexecute_a2a_delegation( if turn_number is None: turn_number = len([m for m in conversation_history if m.role == Role.user]) + 1 - try: - result = await _aexecute_a2a_delegation_impl( - endpoint=endpoint, - auth=auth, - timeout=timeout, - task_description=task_description, - context=context, - context_id=context_id, - task_id=task_id, - reference_task_ids=reference_task_ids, - metadata=metadata, - extensions=extensions, - conversation_history=conversation_history, - is_multiturn=is_multiturn, - turn_number=turn_number, - agent_branch=agent_branch, - agent_id=agent_id, - agent_role=agent_role, - response_model=response_model, - updates=updates, - from_task=from_task, - from_agent=from_agent, - skill_id=skill_id, - client_extensions=client_extensions, - transport=transport, - accepted_output_modes=accepted_output_modes, - input_files=input_files, - ) - except Exception as e: + with operation( + "a2a delegate", + { + "crewai.a2a.endpoint": endpoint, + "crewai.a2a.is_multiturn": is_multiturn, + "crewai.a2a.turn_number": turn_number, + }, + ): + try: + result = await _aexecute_a2a_delegation_impl( + endpoint=endpoint, + auth=auth, + timeout=timeout, + task_description=task_description, + context=context, + context_id=context_id, + task_id=task_id, + reference_task_ids=reference_task_ids, + metadata=metadata, + extensions=extensions, + conversation_history=conversation_history, + is_multiturn=is_multiturn, + turn_number=turn_number, + agent_branch=agent_branch, + agent_id=agent_id, + agent_role=agent_role, + response_model=response_model, + updates=updates, + from_task=from_task, + from_agent=from_agent, + skill_id=skill_id, + client_extensions=client_extensions, + transport=transport, + accepted_output_modes=accepted_output_modes, + input_files=input_files, + ) + except Exception as e: + crewai_event_bus.emit( + agent_branch, + A2ADelegationCompletedEvent( + status="failed", + result=None, + error=str(e), + context_id=context_id, + is_multiturn=is_multiturn, + endpoint=endpoint, + metadata=metadata, + extensions=list(extensions.keys()) if extensions else None, + from_task=from_task, + from_agent=from_agent, + ), + ) + raise + + agent_card_data = result.get("agent_card") crewai_event_bus.emit( agent_branch, A2ADelegationCompletedEvent( - status="failed", - result=None, - error=str(e), + status=result["status"], + result=result.get("result"), + error=result.get("error"), context_id=context_id, is_multiturn=is_multiturn, endpoint=endpoint, + a2a_agent_name=result.get("a2a_agent_name"), + agent_card=agent_card_data, + provider=agent_card_data.get("provider") if agent_card_data else None, metadata=metadata, extensions=list(extensions.keys()) if extensions else None, from_task=from_task, from_agent=from_agent, ), ) - raise - - agent_card_data = result.get("agent_card") - crewai_event_bus.emit( - agent_branch, - A2ADelegationCompletedEvent( - status=result["status"], - result=result.get("result"), - error=result.get("error"), - context_id=context_id, - is_multiturn=is_multiturn, - endpoint=endpoint, - a2a_agent_name=result.get("a2a_agent_name"), - agent_card=agent_card_data, - provider=agent_card_data.get("provider") if agent_card_data else None, - metadata=metadata, - extensions=list(extensions.keys()) if extensions else None, - from_task=from_task, - from_agent=from_agent, - ), - ) - return result + return result async def _aexecute_a2a_delegation_impl( diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index ac2a2e29f4..f393b28044 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -85,6 +85,7 @@ from crewai.skills.loader import activate_skill, discover_skills from crewai.skills.models import INSTRUCTIONS, Skill as SkillModel from crewai.state.checkpoint_config import CheckpointConfig, apply_checkpoint +from crewai.telemetry.otel import operation from crewai.tools.agent_tools.agent_tools import AgentTools from crewai.types.callback import SerializableCallable from crewai.utilities.agent_utils import ( @@ -804,55 +805,62 @@ def execute_task( ValueError: If the max execution time is not a positive integer. RuntimeError: If the agent execution fails for other reasons. """ - task_prompt = self._prepare_task_execution(task, context) - - knowledge_config = get_knowledge_config(self) - task_prompt = handle_knowledge_retrieval( - self, - task, - task_prompt, - knowledge_config, - self.knowledge.query if self.knowledge else lambda *a, **k: None, - self.crew.query_knowledge - if self.crew and not isinstance(self.crew, str) - else lambda *a, **k: None, - ) - - task_prompt = self._finalize_task_prompt(task_prompt, tools, task) + with operation( + "execute agent", + { + "crewai.agent.role": self.role or "", + "crewai.agent.id": str(self.id), + }, + ): + task_prompt = self._prepare_task_execution(task, context) - try: - crewai_event_bus.emit( + knowledge_config = get_knowledge_config(self) + task_prompt = handle_knowledge_retrieval( self, - event=AgentExecutionStartedEvent( - agent=self, - tools=self.tools, - task_prompt=task_prompt, - task=task, - ), + task, + task_prompt, + knowledge_config, + self.knowledge.query if self.knowledge else lambda *a, **k: None, + self.crew.query_knowledge + if self.crew and not isinstance(self.crew, str) + else lambda *a, **k: None, ) - validate_max_execution_time(self.max_execution_time) - if self.max_execution_time is not None: - result = self._execute_with_timeout( - task_prompt, task, self.max_execution_time + task_prompt = self._finalize_task_prompt(task_prompt, tools, task) + + try: + crewai_event_bus.emit( + self, + event=AgentExecutionStartedEvent( + agent=self, + tools=self.tools, + task_prompt=task_prompt, + task=task, + ), ) - else: - result = self._execute_without_timeout(task_prompt, task) - except TimeoutError as e: - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e - except Exception as e: - result = self._handle_execution_error(e, task, context, tools) + validate_max_execution_time(self.max_execution_time) + if self.max_execution_time is not None: + result = self._execute_with_timeout( + task_prompt, task, self.max_execution_time + ) + else: + result = self._execute_without_timeout(task_prompt, task) - return self._finalize_task_execution(task, result) + except TimeoutError as e: + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(e), + ), + ) + raise e + except Exception as e: + result = self._handle_execution_error(e, task, context, tools) + + return self._finalize_task_execution(task, result) def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> Any: """Execute a task with a timeout. @@ -940,48 +948,57 @@ async def aexecute_task( ValueError: If the max execution time is not a positive integer. RuntimeError: If the agent execution fails for other reasons. """ - task_prompt = self._prepare_task_execution(task, context) - - knowledge_config = get_knowledge_config(self) - task_prompt = await ahandle_knowledge_retrieval( - self, task, task_prompt, knowledge_config - ) - - task_prompt = self._finalize_task_prompt(task_prompt, tools, task) + with operation( + "execute agent", + { + "crewai.agent.role": self.role or "", + "crewai.agent.id": str(self.id), + }, + ): + task_prompt = self._prepare_task_execution(task, context) - try: - crewai_event_bus.emit( - self, - event=AgentExecutionStartedEvent( - agent=self, - tools=self.tools, - task_prompt=task_prompt, - task=task, - ), + knowledge_config = get_knowledge_config(self) + task_prompt = await ahandle_knowledge_retrieval( + self, task, task_prompt, knowledge_config ) - validate_max_execution_time(self.max_execution_time) - if self.max_execution_time is not None: - result = await self._aexecute_with_timeout( - task_prompt, task, self.max_execution_time + task_prompt = self._finalize_task_prompt(task_prompt, tools, task) + + try: + crewai_event_bus.emit( + self, + event=AgentExecutionStartedEvent( + agent=self, + tools=self.tools, + task_prompt=task_prompt, + task=task, + ), ) - else: - result = await self._aexecute_without_timeout(task_prompt, task) - except TimeoutError as e: - crewai_event_bus.emit( - self, - event=AgentExecutionErrorEvent( - agent=self, - task=task, - error=str(e), - ), - ) - raise e - except Exception as e: - result = await self._handle_execution_error_async(e, task, context, tools) + validate_max_execution_time(self.max_execution_time) + if self.max_execution_time is not None: + result = await self._aexecute_with_timeout( + task_prompt, task, self.max_execution_time + ) + else: + result = await self._aexecute_without_timeout(task_prompt, task) - return self._finalize_task_execution(task, result) + except TimeoutError as e: + crewai_event_bus.emit( + self, + event=AgentExecutionErrorEvent( + agent=self, + task=task, + error=str(e), + ), + ) + raise e + except Exception as e: + result = await self._handle_execution_error_async( + e, task, context, tools + ) + + return self._finalize_task_execution(task, result) async def _aexecute_with_timeout( self, task_prompt: str, task: Task, timeout: int @@ -1616,22 +1633,31 @@ def kickoff( ) try: - if self.checkpoint_kickoff_event_id is not None: - self._kickoff_event_id = self.checkpoint_kickoff_event_id - self.checkpoint_kickoff_event_id = None - else: - started_event = LiteAgentExecutionStartedEvent( - agent_info=agent_info, - tools=parsed_tools, - messages=messages, - ) - crewai_event_bus.emit(self, event=started_event) - self._kickoff_event_id = started_event.event_id + with operation( + "execute agent", + { + "crewai.agent.role": self.role or "", + "crewai.agent.id": str(self.id), + }, + ): + if self.checkpoint_kickoff_event_id is not None: + self._kickoff_event_id = self.checkpoint_kickoff_event_id + self.checkpoint_kickoff_event_id = None + else: + started_event = LiteAgentExecutionStartedEvent( + agent_info=agent_info, + tools=parsed_tools, + messages=messages, + ) + crewai_event_bus.emit(self, event=started_event) + self._kickoff_event_id = started_event.event_id - output = self._execute_and_build_output(executor, inputs, response_format) - return self._finalize_kickoff( - output, executor, inputs, response_format, messages, agent_info - ) + output = self._execute_and_build_output( + executor, inputs, response_format + ) + return self._finalize_kickoff( + output, executor, inputs, response_format, messages, agent_info + ) except Exception as e: self._emit_kickoff_error(agent_info, e) @@ -1931,24 +1957,31 @@ async def kickoff_async( ) try: - if self.checkpoint_kickoff_event_id is not None: - self._kickoff_event_id = self.checkpoint_kickoff_event_id - self.checkpoint_kickoff_event_id = None - else: - started_event = LiteAgentExecutionStartedEvent( - agent_info=agent_info, - tools=parsed_tools, - messages=messages, - ) - crewai_event_bus.emit(self, event=started_event) - self._kickoff_event_id = started_event.event_id + with operation( + "execute agent", + { + "crewai.agent.role": self.role or "", + "crewai.agent.id": str(self.id), + }, + ): + if self.checkpoint_kickoff_event_id is not None: + self._kickoff_event_id = self.checkpoint_kickoff_event_id + self.checkpoint_kickoff_event_id = None + else: + started_event = LiteAgentExecutionStartedEvent( + agent_info=agent_info, + tools=parsed_tools, + messages=messages, + ) + crewai_event_bus.emit(self, event=started_event) + self._kickoff_event_id = started_event.event_id - output = await self._execute_and_build_output_async( - executor, inputs, response_format - ) - return self._finalize_kickoff( - output, executor, inputs, response_format, messages, agent_info - ) + output = await self._execute_and_build_output_async( + executor, inputs, response_format + ) + return self._finalize_kickoff( + output, executor, inputs, response_format, messages, agent_info + ) except Exception as e: self._emit_kickoff_error(agent_info, e) diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index cd996bae40..c41eb0eab9 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -113,6 +113,7 @@ def get_supported_content_types(provider: str, api: str | None = None) -> list[s from crewai.task import Task from crewai.tasks.conditional_task import ConditionalTask from crewai.tasks.task_output import TaskOutput +from crewai.telemetry.otel import operation from crewai.tools.agent_tools.agent_tools import AgentTools from crewai.tools.agent_tools.read_file_tool import ReadFileTool from crewai.tools.base_tool import BaseTool @@ -1032,25 +1033,29 @@ def run_crew() -> None: runtime_scope = crewai_event_bus._enter_runtime_scope() try: - inputs = prepare_kickoff(self, inputs, input_files) + with operation( + "execute crew", + {"crewai.crew.name": self.name or "", "crewai.crew.id": str(self.id)}, + ): + inputs = prepare_kickoff(self, inputs, input_files) - if self.process == Process.sequential: - result = self._run_sequential_process() - elif self.process == Process.hierarchical: - result = self._run_hierarchical_process() - else: - raise NotImplementedError( - f"The process '{self.process}' is not implemented yet." - ) + if self.process == Process.sequential: + result = self._run_sequential_process() + elif self.process == Process.hierarchical: + result = self._run_hierarchical_process() + else: + raise NotImplementedError( + f"The process '{self.process}' is not implemented yet." + ) - for after_callback in self.after_kickoff_callbacks: - result = after_callback(result) + for after_callback in self.after_kickoff_callbacks: + result = after_callback(result) - result = self._post_kickoff(result) + result = self._post_kickoff(result) - self.usage_metrics = self.calculate_usage_metrics() + self.usage_metrics = self.calculate_usage_metrics() - return result + return result except Exception as e: crewai_event_bus.emit( self, @@ -1244,25 +1249,29 @@ async def run_crew() -> None: runtime_scope = crewai_event_bus._enter_runtime_scope() try: - inputs = prepare_kickoff(self, inputs, input_files) + with operation( + "execute crew", + {"crewai.crew.name": self.name or "", "crewai.crew.id": str(self.id)}, + ): + inputs = prepare_kickoff(self, inputs, input_files) - if self.process == Process.sequential: - result = await self._arun_sequential_process() - elif self.process == Process.hierarchical: - result = await self._arun_hierarchical_process() - else: - raise NotImplementedError( - f"The process '{self.process}' is not implemented yet." - ) + if self.process == Process.sequential: + result = await self._arun_sequential_process() + elif self.process == Process.hierarchical: + result = await self._arun_hierarchical_process() + else: + raise NotImplementedError( + f"The process '{self.process}' is not implemented yet." + ) - for after_callback in self.after_kickoff_callbacks: - result = after_callback(result) + for after_callback in self.after_kickoff_callbacks: + result = after_callback(result) - result = self._post_kickoff(result) + result = self._post_kickoff(result) - self.usage_metrics = self.calculate_usage_metrics() + self.usage_metrics = self.calculate_usage_metrics() - return result + return result except Exception as e: crewai_event_bus.emit( self, diff --git a/lib/crewai/src/crewai/events/event_bus.py b/lib/crewai/src/crewai/events/event_bus.py index 773ffa5bbb..43abd80c14 100644 --- a/lib/crewai/src/crewai/events/event_bus.py +++ b/lib/crewai/src/crewai/events/event_bus.py @@ -56,10 +56,26 @@ is_call_handler_safe, ) from crewai.utilities.rw_lock import RWLock +from opentelemetry import context as otel_context logger = logging.getLogger(__name__) + +async def _ctx_run_coro(ctx: otel_context.Context, coro: Any) -> Any: + """Attach an OTel context for the duration of ``coro``. + + ``asyncio.run_coroutine_threadsafe`` schedules ``coro`` on a + different event loop with a fresh context; without re-attaching the + caller's OTel context the trace tree shears at every async dispatch. + """ + token = otel_context.attach(ctx) + try: + return await coro + finally: + otel_context.detach(token) + + P = ParamSpec("P") R = TypeVar("R") @@ -615,10 +631,15 @@ def emit(self, source: Any, event: BaseEvent) -> Future[None] | None: state = self._runtime_state + otel_ctx = otel_context.get_current() + if has_dependencies: return self._track_future( asyncio.run_coroutine_threadsafe( - self._emit_with_dependencies(source, event, state), + _ctx_run_coro( + otel_ctx, + self._emit_with_dependencies(source, event, state), + ), self._loop, ) ) @@ -637,7 +658,10 @@ def emit(self, source: Any, event: BaseEvent) -> Future[None] | None: if async_handlers: return self._track_future( asyncio.run_coroutine_threadsafe( - self._acall_handlers(source, event, async_handlers, state), + _ctx_run_coro( + otel_ctx, + self._acall_handlers(source, event, async_handlers, state), + ), self._loop, ) ) @@ -699,12 +723,18 @@ def replay(self, source: Any, event: BaseEvent) -> Future[None] | None: self._has_pending_events = True state = self._runtime_state + otel_ctx = otel_context.get_current() token = _replaying.set(True) try: if has_dependencies: return self._track_future( asyncio.run_coroutine_threadsafe( - self._emit_with_dependencies_replaying(source, event, state), + _ctx_run_coro( + otel_ctx, + self._emit_with_dependencies_replaying( + source, event, state + ), + ), self._loop, ) ) @@ -720,8 +750,11 @@ def replay(self, source: Any, event: BaseEvent) -> Future[None] | None: return self._track_future( asyncio.run_coroutine_threadsafe( - self._acall_handlers_replaying( - source, event, async_handlers, state + _ctx_run_coro( + otel_ctx, + self._acall_handlers_replaying( + source, event, async_handlers, state + ), ), self._loop, ) diff --git a/lib/crewai/src/crewai/events/event_listener.py b/lib/crewai/src/crewai/events/event_listener.py index 883147478d..9d35c48141 100644 --- a/lib/crewai/src/crewai/events/event_listener.py +++ b/lib/crewai/src/crewai/events/event_listener.py @@ -149,7 +149,6 @@ def __init__(self) -> None: if not self._initialized: super().__init__() self._telemetry = Telemetry() - self._telemetry.set_tracer() self.execution_spans = {} self._initialized = True self.formatter = ConsoleFormatter(verbose=True) diff --git a/lib/crewai/src/crewai/flow/runtime/__init__.py b/lib/crewai/src/crewai/flow/runtime/__init__.py index c47526a789..5253d5a338 100644 --- a/lib/crewai/src/crewai/flow/runtime/__init__.py +++ b/lib/crewai/src/crewai/flow/runtime/__init__.py @@ -136,6 +136,7 @@ _coerce_checkpoint, apply_checkpoint, ) +from crewai.telemetry.otel import operation if TYPE_CHECKING: @@ -1608,6 +1609,22 @@ async def resume_async(self, feedback: str = "") -> Any: current_flow_id.reset(flow_id_token) async def _resume_async_body(self, feedback: str = "") -> Any: + # Resume traces are causally related to the pause trace but not a + # parent-child relationship. Enterprise listeners can attach the + # FOLLOWS_FROM link via ``follows_from()`` when they record the + # paused span's trace/span IDs at pause time. We always open a + # fresh root span here; the link is opt-in. + with operation( + "resume flow", + { + "crewai.flow.name": self._definition.name, + "crewai.flow.id": self.flow_id, + }, + expected_exceptions=(HumanFeedbackPending,), + ): + return await self._resume_async_body_inner(feedback) + + async def _resume_async_body_inner(self, feedback: str = "") -> Any: if get_current_parent_id() is None: reset_emission_counter() reset_last_event_id() @@ -2469,32 +2486,40 @@ async def run_flow() -> None: await self._replay_recorded_events() try: - # Determine which start methods to execute at kickoff - # Conditional start methods are only triggered by their conditions - # UNLESS there are no unconditional starts (then all starts run as entry points) - start_methods = self._start_method_names() - unconditional_starts = [ - start_method - for start_method in start_methods - if self._start_condition(start_method) is None - ] - # If there are unconditional starts, only run those at kickoff - # If there are NO unconditional starts, run all starts (including conditional ones) - starts_to_execute = ( - unconditional_starts if unconditional_starts else start_methods - ) - starts_to_execute, run_starts_sequentially = ( - self._order_start_methods_for_kickoff(starts_to_execute) - ) - if run_starts_sequentially: - for start_method in starts_to_execute: - await self._execute_start_method(start_method) - else: - tasks = [ - self._execute_start_method(start_method) - for start_method in starts_to_execute + with operation( + "execute flow", + { + "crewai.flow.name": self._definition.name, + "crewai.flow.id": self.flow_id, + }, + expected_exceptions=(HumanFeedbackPending,), + ): + # Determine which start methods to execute at kickoff + # Conditional start methods are only triggered by their conditions + # UNLESS there are no unconditional starts (then all starts run as entry points) + start_methods = self._start_method_names() + unconditional_starts = [ + start_method + for start_method in start_methods + if self._start_condition(start_method) is None ] - await asyncio.gather(*tasks) + # If there are unconditional starts, only run those at kickoff + # If there are NO unconditional starts, run all starts (including conditional ones) + starts_to_execute = ( + unconditional_starts if unconditional_starts else start_methods + ) + starts_to_execute, run_starts_sequentially = ( + self._order_start_methods_for_kickoff(starts_to_execute) + ) + if run_starts_sequentially: + for start_method in starts_to_execute: + await self._execute_start_method(start_method) + else: + tasks = [ + self._execute_start_method(start_method) + for start_method in starts_to_execute + ] + await asyncio.gather(*tasks) except Exception as e: # Check if flow was paused for human feedback if isinstance(e, HumanFeedbackPending): @@ -2821,20 +2846,31 @@ async def _execute_method( method_name_token = current_flow_method_name.set(method_name) try: - if asyncio.iscoroutinefunction(method): - result = await method(*args, **kwargs) - else: - # Run sync methods in thread pool for isolation - # This allows Agent.kickoff() to work synchronously inside Flow methods - ctx = contextvars.copy_context() - result = await asyncio.to_thread(ctx.run, method, *args, **kwargs) + with operation( + "execute flow method", + { + "crewai.flow.name": self._definition.name, + "crewai.flow.method": str(method_name), + }, + expected_exceptions=(HumanFeedbackPending,), + ): + if asyncio.iscoroutinefunction(method): + result = await method(*args, **kwargs) + else: + # Run sync methods in thread pool for isolation + # This allows Agent.kickoff() to work synchronously inside Flow methods + ctx = contextvars.copy_context() + result = await asyncio.to_thread( + ctx.run, method, *args, **kwargs + ) + # Auto-await coroutines returned from sync methods so the + # whole call stays inside the "execute flow method" span + # (enables AgentExecutor pattern). + if asyncio.iscoroutine(result): + result = await result finally: current_flow_method_name.reset(method_name_token) - # Auto-await coroutines returned from sync methods (enables AgentExecutor pattern) - if asyncio.iscoroutine(result): - result = await result - method_definition = self._definition.methods[str(method_name)] if method_definition.human_feedback is not None: result = await self._run_human_feedback_step( diff --git a/lib/crewai/src/crewai/knowledge/knowledge.py b/lib/crewai/src/crewai/knowledge/knowledge.py index 76198fec97..3dea681dda 100644 --- a/lib/crewai/src/crewai/knowledge/knowledge.py +++ b/lib/crewai/src/crewai/knowledge/knowledge.py @@ -18,6 +18,7 @@ from crewai.rag.core.base_embeddings_provider import BaseEmbeddingsProvider from crewai.rag.embeddings.types import EmbedderConfig from crewai.rag.types import SearchResult +from crewai.telemetry.otel import operation _KNOWN_SOURCES: dict[str, type[BaseKnowledgeSource]] = { @@ -145,11 +146,15 @@ def query( if self.storage is None: raise ValueError("Storage is not initialized.") - return self.storage.search( - query, - limit=results_limit, - score_threshold=score_threshold, - ) + with operation( + "query knowledge", + {"crewai.knowledge.sources": len(self.sources)}, + ): + return self.storage.search( + query, + limit=results_limit, + score_threshold=score_threshold, + ) def add_sources(self) -> None: try: @@ -183,11 +188,15 @@ async def aquery( if self.storage is None: raise ValueError("Storage is not initialized.") - return await self.storage.asearch( - query, - limit=results_limit, - score_threshold=score_threshold, - ) + with operation( + "query knowledge", + {"crewai.knowledge.sources": len(self.sources)}, + ): + return await self.storage.asearch( + query, + limit=results_limit, + score_threshold=score_threshold, + ) async def aadd_sources(self) -> None: """Add all knowledge sources to storage asynchronously.""" diff --git a/lib/crewai/src/crewai/llm.py b/lib/crewai/src/crewai/llm.py index 153bbd2d73..9606b8068b 100644 --- a/lib/crewai/src/crewai/llm.py +++ b/lib/crewai/src/crewai/llm.py @@ -45,6 +45,7 @@ GEMINI_MODELS, OPENAI_MODELS, ) +from crewai.telemetry.otel import operation from crewai.utilities import InternalInstructor from crewai.utilities.exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededError, @@ -1813,7 +1814,10 @@ def call( ValueError: If response format is not supported LLMContextLengthExceededError: If input exceeds model's context limit """ - with llm_call_context(): + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), + ): self._emit_call_started_event( messages=messages, tools=tools, @@ -1952,7 +1956,10 @@ async def acall( ValueError: If response format is not supported LLMContextLengthExceededError: If input exceeds model's context limit """ - with llm_call_context(): + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), + ): self._emit_call_started_event( messages=messages, tools=tools, diff --git a/lib/crewai/src/crewai/llms/providers/anthropic/completion.py b/lib/crewai/src/crewai/llms/providers/anthropic/completion.py index 599ec5a3bd..edc84113f6 100644 --- a/lib/crewai/src/crewai/llms/providers/anthropic/completion.py +++ b/lib/crewai/src/crewai/llms/providers/anthropic/completion.py @@ -12,6 +12,7 @@ from crewai.llms.hooks.base import BaseInterceptor from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport from crewai.llms.providers.utils.common import safe_tool_conversion +from crewai.telemetry.otel import operation from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededError, @@ -297,7 +298,10 @@ def call( Returns: Chat completion response or tool call result """ - with llm_call_context(): + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), + ): try: self._emit_call_started_event( messages=messages, @@ -372,7 +376,10 @@ async def acall( Returns: Chat completion response or tool call result """ - with llm_call_context(): + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), + ): try: self._emit_call_started_event( messages=messages, diff --git a/lib/crewai/src/crewai/llms/providers/azure/completion.py b/lib/crewai/src/crewai/llms/providers/azure/completion.py index 579ca5eba2..b9b4781689 100644 --- a/lib/crewai/src/crewai/llms/providers/azure/completion.py +++ b/lib/crewai/src/crewai/llms/providers/azure/completion.py @@ -11,6 +11,7 @@ from crewai.llms._finish_reason_utils import extract_choices_finish_reason_and_id from crewai.llms.hooks.base import BaseInterceptor +from crewai.telemetry.otel import operation from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededError, @@ -503,7 +504,10 @@ def call( response_model=response_model, ) - with llm_call_context(): + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), + ): try: self._emit_call_started_event( messages=messages, @@ -582,7 +586,10 @@ async def acall( # type: ignore[return] response_model=response_model, ) - with llm_call_context(): + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), + ): try: self._emit_call_started_event( messages=messages, diff --git a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py index 0f34b67231..c91c1eab5b 100644 --- a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py +++ b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py @@ -13,6 +13,7 @@ from crewai.events.types.llm_events import LLMCallType from crewai.llms.base_llm import BaseLLM, llm_call_context from crewai.llms.providers.utils.common import safe_tool_conversion +from crewai.telemetry.otel import operation from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededError, @@ -362,7 +363,10 @@ def call( """Call AWS Bedrock Converse API.""" effective_response_model = response_model or self.response_format - with llm_call_context(): + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), + ): try: self._emit_call_started_event( messages=messages, @@ -495,7 +499,10 @@ async def acall( 'Install with: uv add "crewai[bedrock-async]"' ) - with llm_call_context(): + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), + ): try: self._emit_call_started_event( messages=messages, diff --git a/lib/crewai/src/crewai/llms/providers/gemini/completion.py b/lib/crewai/src/crewai/llms/providers/gemini/completion.py index b811614a1d..2b7ecbbd51 100644 --- a/lib/crewai/src/crewai/llms/providers/gemini/completion.py +++ b/lib/crewai/src/crewai/llms/providers/gemini/completion.py @@ -12,6 +12,7 @@ from crewai.events.types.llm_events import LLMCallType from crewai.llms.base_llm import BaseLLM, llm_call_context from crewai.llms.hooks.base import BaseInterceptor +from crewai.telemetry.otel import operation from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededError, @@ -294,7 +295,10 @@ def call( Returns: Chat completion response or tool call result """ - with llm_call_context(): + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), + ): try: self._emit_call_started_event( messages=messages, @@ -380,7 +384,10 @@ async def acall( Returns: Chat completion response or tool call result """ - with llm_call_context(): + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), + ): try: self._emit_call_started_event( messages=messages, diff --git a/lib/crewai/src/crewai/llms/providers/openai/completion.py b/lib/crewai/src/crewai/llms/providers/openai/completion.py index d8972e1de4..a411922f5d 100644 --- a/lib/crewai/src/crewai/llms/providers/openai/completion.py +++ b/lib/crewai/src/crewai/llms/providers/openai/completion.py @@ -34,6 +34,7 @@ from crewai.llms.hooks.base import BaseInterceptor from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport from crewai.llms.providers.utils.common import safe_tool_conversion +from crewai.telemetry.otel import operation from crewai.utilities.agent_utils import is_context_length_exceeded from crewai.utilities.exceptions.context_window_exceeding_exception import ( LLMContextLengthExceededError, @@ -410,7 +411,10 @@ def call( Returns: Completion response or tool call result. """ - with llm_call_context(): + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), + ): try: self._emit_call_started_event( messages=messages, @@ -510,7 +514,10 @@ async def acall( Returns: Completion response or tool call result. """ - with llm_call_context(): + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), + ): try: self._emit_call_started_event( messages=messages, diff --git a/lib/crewai/src/crewai/memory/unified_memory.py b/lib/crewai/src/crewai/memory/unified_memory.py index dcd5383ceb..8213ef4bef 100644 --- a/lib/crewai/src/crewai/memory/unified_memory.py +++ b/lib/crewai/src/crewai/memory/unified_memory.py @@ -36,6 +36,7 @@ from crewai.memory.utils import join_scope_paths from crewai.rag.embeddings.factory import build_embedder from crewai.rag.embeddings.providers.openai.types import OpenAIProviderSpec +from crewai.telemetry.otel import operation if TYPE_CHECKING: @@ -471,43 +472,46 @@ def remember( _source_type = "unified_memory" try: - crewai_event_bus.emit( - self, - MemorySaveStartedEvent( - value=content, - metadata=metadata, - source_type=_source_type, - ), - ) - start = time.perf_counter() - - # Submit through the save pool for proper serialization, - future = self._submit_save( - self._encode_batch, - [content], - scope, - categories, - metadata, - importance, - source, - private, - effective_root, - ) - records = future.result() - record = records[0] if records else None + with operation( + "remember memory", + {"crewai.memory.source_type": _source_type}, + ): + crewai_event_bus.emit( + self, + MemorySaveStartedEvent( + value=content, + metadata=metadata, + source_type=_source_type, + ), + ) + start = time.perf_counter() + + future = self._submit_save( + self._encode_batch, + [content], + scope, + categories, + metadata, + importance, + source, + private, + effective_root, + ) + records = future.result() + record = records[0] if records else None - elapsed_ms = (time.perf_counter() - start) * 1000 - crewai_event_bus.emit( - self, - MemorySaveCompletedEvent( - value=content, - metadata=metadata or {}, - agent_role=agent_role, - save_time_ms=elapsed_ms, - source_type=_source_type, - ), - ) - return record + elapsed_ms = (time.perf_counter() - start) * 1000 + crewai_event_bus.emit( + self, + MemorySaveCompletedEvent( + value=content, + metadata=metadata or {}, + agent_role=agent_role, + save_time_ms=elapsed_ms, + source_type=_source_type, + ), + ) + return record except Exception as e: crewai_event_bus.emit( self, @@ -720,88 +724,97 @@ def recall( _source = "unified_memory" try: - crewai_event_bus.emit( - self, - MemoryQueryStartedEvent( - query=query, - limit=limit, - score_threshold=None, - source_type=_source, - ), - ) - start = time.perf_counter() - - if depth == "shallow": - embedding = embed_text(self._embedder, query) - if not embedding: - results: list[MemoryMatch] = [] - else: - raw = self._storage.search( - embedding, - scope_prefix=effective_scope, - categories=categories, + with operation( + "recall memory", + { + "crewai.memory.depth": depth, + "crewai.memory.source_type": _source, + }, + ): + crewai_event_bus.emit( + self, + MemoryQueryStartedEvent( + query=query, limit=limit, - min_score=0.0, - ) - if not include_private: - raw = [ - (r, s) - for r, s in raw - if not r.private or r.source == source - ] - results = [] - for r, s in raw: - composite, reasons = compute_composite_score(r, s, self._config) - results.append( - MemoryMatch( - record=r, - score=composite, - match_reasons=reasons, - ) + score_threshold=None, + source_type=_source, + ), + ) + start = time.perf_counter() + + if depth == "shallow": + embedding = embed_text(self._embedder, query) + if not embedding: + results: list[MemoryMatch] = [] + else: + raw = self._storage.search( + embedding, + scope_prefix=effective_scope, + categories=categories, + limit=limit, + min_score=0.0, ) - results.sort(key=lambda m: m.score, reverse=True) - else: - from crewai.memory.recall_flow import RecallFlow + if not include_private: + raw = [ + (r, s) + for r, s in raw + if not r.private or r.source == source + ] + results = [] + for r, s in raw: + composite, reasons = compute_composite_score( + r, s, self._config + ) + results.append( + MemoryMatch( + record=r, + score=composite, + match_reasons=reasons, + ) + ) + results.sort(key=lambda m: m.score, reverse=True) + else: + from crewai.memory.recall_flow import RecallFlow - flow = RecallFlow( - storage=self._storage, - llm=self._llm, - embedder=self._embedder, - config=self._config, - ) - flow.kickoff( - inputs={ - "query": query, - "scope": effective_scope, - "categories": categories or [], - "limit": limit, - "source": source, - "include_private": include_private, - } - ) - results = flow.state.final_results + flow = RecallFlow( + storage=self._storage, + llm=self._llm, + embedder=self._embedder, + config=self._config, + ) + flow.kickoff( + inputs={ + "query": query, + "scope": effective_scope, + "categories": categories or [], + "limit": limit, + "source": source, + "include_private": include_private, + } + ) + results = flow.state.final_results - if results: - try: - touch = getattr(self._storage, "touch_records", None) - if touch is not None: - touch([m.record.id for m in results]) - except Exception: # noqa: S110 - pass # Non-critical: don't fail recall because of touch + if results: + try: + touch = getattr(self._storage, "touch_records", None) + if touch is not None: + touch([m.record.id for m in results]) + except Exception: # noqa: S110 + pass # Non-critical: don't fail recall because of touch - elapsed_ms = (time.perf_counter() - start) * 1000 - crewai_event_bus.emit( - self, - MemoryQueryCompletedEvent( - query=query, - results=results, - limit=limit, - score_threshold=None, - query_time_ms=elapsed_ms, - source_type=_source, - ), - ) - return results + elapsed_ms = (time.perf_counter() - start) * 1000 + crewai_event_bus.emit( + self, + MemoryQueryCompletedEvent( + query=query, + results=results, + limit=limit, + score_threshold=None, + query_time_ms=elapsed_ms, + source_type=_source, + ), + ) + return results except Exception as e: crewai_event_bus.emit( self, diff --git a/lib/crewai/src/crewai/task.py b/lib/crewai/src/crewai/task.py index c63cfe8666..599ef83840 100644 --- a/lib/crewai/src/crewai/task.py +++ b/lib/crewai/src/crewai/task.py @@ -51,6 +51,7 @@ from crewai.security import Fingerprint, SecurityConfig from crewai.tasks.output_format import OutputFormat from crewai.tasks.task_output import TaskOutput +from crewai.telemetry.otel import operation from crewai.tools.base_tool import BaseTool from crewai.utilities.config import process_config from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified @@ -644,113 +645,122 @@ async def _aexecute_core( task_id_token = set_current_task_id(str(self.id)) self._store_input_files() try: - agent = agent or self.agent - self.agent = agent - if not agent: - raise Exception( - f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical." - ) + with operation( + "execute task", + { + "crewai.task.name": self.name or "", + "crewai.task.id": str(self.id), + }, + ): + agent = agent or self.agent + self.agent = agent + if not agent: + raise Exception( + f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical." + ) - self.prompt_context = context - tools = tools or self.tools or [] + self.prompt_context = context + tools = tools or self.tools or [] - self.processed_by_agents.add(agent.role) - executor = agent.agent_executor - if not ( - executor and executor._resuming and resume_task_scope(str(self.id)) - ): - crewai_event_bus.emit( - self, TaskStartedEvent(context=context, task=self) + self.processed_by_agents.add(agent.role) + executor = agent.agent_executor + if not ( + executor and executor._resuming and resume_task_scope(str(self.id)) + ): + crewai_event_bus.emit( + self, TaskStartedEvent(context=context, task=self) + ) + result = await agent.aexecute_task( + task=self, + context=context, + tools=tools, ) - result = await agent.aexecute_task( - task=self, - context=context, - tools=tools, - ) - self._post_agent_execution(agent) - - if isinstance(result, BaseModel): - raw = result.model_dump_json() - if self.output_pydantic: - pydantic_output = result - json_output = None - elif self.output_json: - pydantic_output = None - json_output = result.model_dump() + self._post_agent_execution(agent) + + if isinstance(result, BaseModel): + raw = result.model_dump_json() + if self.output_pydantic: + pydantic_output = result + json_output = None + elif self.output_json: + pydantic_output = None + json_output = result.model_dump() + else: + pydantic_output = None + json_output = None + elif not self._guardrails and not self._guardrail: + raw = result + pydantic_output, json_output = await self._aexport_output(result) else: - pydantic_output = None - json_output = None - elif not self._guardrails and not self._guardrail: - raw = result - pydantic_output, json_output = await self._aexport_output(result) - else: - raw = result - pydantic_output, json_output = None, None + raw = result + pydantic_output, json_output = None, None + + task_output = TaskOutput( + name=self.name or self.description, + description=self.description, + expected_output=self.expected_output, + raw=raw, + pydantic=pydantic_output, + json_dict=json_output, + agent=agent.role, + output_format=self._get_output_format(), + messages=agent.last_messages, # type: ignore[attr-defined] + ) - task_output = TaskOutput( - name=self.name or self.description, - description=self.description, - expected_output=self.expected_output, - raw=raw, - pydantic=pydantic_output, - json_dict=json_output, - agent=agent.role, - output_format=self._get_output_format(), - messages=agent.last_messages, # type: ignore[attr-defined] - ) + if self._guardrails: + for idx, guardrail in enumerate(self._guardrails): + task_output = await self._ainvoke_guardrail_function( + task_output=task_output, + agent=agent, + tools=tools, + guardrail=guardrail, + guardrail_index=idx, + ) - if self._guardrails: - for idx, guardrail in enumerate(self._guardrails): + if self._guardrail: task_output = await self._ainvoke_guardrail_function( task_output=task_output, agent=agent, tools=tools, - guardrail=guardrail, - guardrail_index=idx, + guardrail=self._guardrail, ) - if self._guardrail: - task_output = await self._ainvoke_guardrail_function( - task_output=task_output, - agent=agent, - tools=tools, - guardrail=self._guardrail, - ) + self.output = task_output + self.end_time = datetime.datetime.now() - self.output = task_output - self.end_time = datetime.datetime.now() + if self.callback: + cb_result = self.callback(self.output) + if inspect.isawaitable(cb_result): + await cb_result - if self.callback: - cb_result = self.callback(self.output) - if inspect.isawaitable(cb_result): - await cb_result - - crew = self.agent.crew # type: ignore[union-attr] - if ( - crew - and not isinstance(crew, str) - and crew.task_callback - and crew.task_callback != self.callback - ): - cb_result = crew.task_callback(self.output) - if inspect.isawaitable(cb_result): - await cb_result - - if self.output_file: - content = ( - json_output - if json_output - else ( - pydantic_output.model_dump_json() if pydantic_output else result + crew = self.agent.crew # type: ignore[union-attr] + if ( + crew + and not isinstance(crew, str) + and crew.task_callback + and crew.task_callback != self.callback + ): + cb_result = crew.task_callback(self.output) + if inspect.isawaitable(cb_result): + await cb_result + + if self.output_file: + content = ( + json_output + if json_output + else ( + pydantic_output.model_dump_json() + if pydantic_output + else result + ) ) + self._save_file(content) + crewai_event_bus.emit( + self, + TaskCompletedEvent(output=task_output, task=self), ) - self._save_file(content) - crewai_event_bus.emit( - self, - TaskCompletedEvent(output=task_output, task=self), - ) - return task_output + return task_output except Exception as e: self.end_time = datetime.datetime.now() crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self)) @@ -769,113 +779,122 @@ def _execute_core( task_id_token = set_current_task_id(str(self.id)) self._store_input_files() try: - agent = agent or self.agent - self.agent = agent - if not agent: - raise Exception( - f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical." - ) + with operation( + "execute task", + { + "crewai.task.name": self.name or "", + "crewai.task.id": str(self.id), + }, + ): + agent = agent or self.agent + self.agent = agent + if not agent: + raise Exception( + f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical." + ) - self.prompt_context = context - tools = tools or self.tools or [] + self.prompt_context = context + tools = tools or self.tools or [] - self.processed_by_agents.add(agent.role) - executor = agent.agent_executor - if not ( - executor and executor._resuming and resume_task_scope(str(self.id)) - ): - crewai_event_bus.emit( - self, TaskStartedEvent(context=context, task=self) + self.processed_by_agents.add(agent.role) + executor = agent.agent_executor + if not ( + executor and executor._resuming and resume_task_scope(str(self.id)) + ): + crewai_event_bus.emit( + self, TaskStartedEvent(context=context, task=self) + ) + result = agent.execute_task( + task=self, + context=context, + tools=tools, ) - result = agent.execute_task( - task=self, - context=context, - tools=tools, - ) - self._post_agent_execution(agent) - - if isinstance(result, BaseModel): - raw = result.model_dump_json() - if self.output_pydantic: - pydantic_output = result - json_output = None - elif self.output_json: - pydantic_output = None - json_output = result.model_dump() + self._post_agent_execution(agent) + + if isinstance(result, BaseModel): + raw = result.model_dump_json() + if self.output_pydantic: + pydantic_output = result + json_output = None + elif self.output_json: + pydantic_output = None + json_output = result.model_dump() + else: + pydantic_output = None + json_output = None + elif not self._guardrails and not self._guardrail: + raw = result + pydantic_output, json_output = self._export_output(result) else: - pydantic_output = None - json_output = None - elif not self._guardrails and not self._guardrail: - raw = result - pydantic_output, json_output = self._export_output(result) - else: - raw = result - pydantic_output, json_output = None, None + raw = result + pydantic_output, json_output = None, None + + task_output = TaskOutput( + name=self.name or self.description, + description=self.description, + expected_output=self.expected_output, + raw=raw, + pydantic=pydantic_output, + json_dict=json_output, + agent=agent.role, + output_format=self._get_output_format(), + messages=agent.last_messages, # type: ignore[attr-defined] + ) - task_output = TaskOutput( - name=self.name or self.description, - description=self.description, - expected_output=self.expected_output, - raw=raw, - pydantic=pydantic_output, - json_dict=json_output, - agent=agent.role, - output_format=self._get_output_format(), - messages=agent.last_messages, # type: ignore[attr-defined] - ) + if self._guardrails: + for idx, guardrail in enumerate(self._guardrails): + task_output = self._invoke_guardrail_function( + task_output=task_output, + agent=agent, + tools=tools, + guardrail=guardrail, + guardrail_index=idx, + ) - if self._guardrails: - for idx, guardrail in enumerate(self._guardrails): + if self._guardrail: task_output = self._invoke_guardrail_function( task_output=task_output, agent=agent, tools=tools, - guardrail=guardrail, - guardrail_index=idx, + guardrail=self._guardrail, ) - if self._guardrail: - task_output = self._invoke_guardrail_function( - task_output=task_output, - agent=agent, - tools=tools, - guardrail=self._guardrail, - ) + self.output = task_output + self.end_time = datetime.datetime.now() - self.output = task_output - self.end_time = datetime.datetime.now() + if self.callback: + cb_result = self.callback(self.output) + if inspect.iscoroutine(cb_result): + asyncio.run(cb_result) - if self.callback: - cb_result = self.callback(self.output) - if inspect.iscoroutine(cb_result): - asyncio.run(cb_result) - - crew = self.agent.crew # type: ignore[union-attr] - if ( - crew - and not isinstance(crew, str) - and crew.task_callback - and crew.task_callback != self.callback - ): - cb_result = crew.task_callback(self.output) - if inspect.iscoroutine(cb_result): - asyncio.run(cb_result) - - if self.output_file: - content = ( - json_output - if json_output - else ( - pydantic_output.model_dump_json() if pydantic_output else result + crew = self.agent.crew # type: ignore[union-attr] + if ( + crew + and not isinstance(crew, str) + and crew.task_callback + and crew.task_callback != self.callback + ): + cb_result = crew.task_callback(self.output) + if inspect.iscoroutine(cb_result): + asyncio.run(cb_result) + + if self.output_file: + content = ( + json_output + if json_output + else ( + pydantic_output.model_dump_json() + if pydantic_output + else result + ) ) + self._save_file(content) + crewai_event_bus.emit( + self, + TaskCompletedEvent(output=task_output, task=self), ) - self._save_file(content) - crewai_event_bus.emit( - self, - TaskCompletedEvent(output=task_output, task=self), - ) - return task_output + return task_output except Exception as e: self.end_time = datetime.datetime.now() crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self)) diff --git a/lib/crewai/src/crewai/tasks/llm_guardrail.py b/lib/crewai/src/crewai/tasks/llm_guardrail.py index 754596ab7f..d026465bfe 100644 --- a/lib/crewai/src/crewai/tasks/llm_guardrail.py +++ b/lib/crewai/src/crewai/tasks/llm_guardrail.py @@ -12,6 +12,7 @@ from crewai.lite_agent_output import LiteAgentOutput from crewai.llms.base_llm import BaseLLM from crewai.tasks.task_output import TaskOutput +from crewai.telemetry.otel import operation def _is_coroutine( @@ -108,12 +109,18 @@ def __call__(self, task_output: TaskOutput) -> tuple[bool, Any]: """ try: - result = self._validate_output(task_output) - if not isinstance(result.pydantic, LLMGuardrailResult): - raise ValueError("The guardrail result is not a valid pydantic model") - - if result.pydantic.valid: - return True, task_output.raw - return False, result.pydantic.feedback + with operation( + "guard llm", + {"crewai.guardrail.type": "llm"}, + ): + result = self._validate_output(task_output) + if not isinstance(result.pydantic, LLMGuardrailResult): + raise ValueError( + "The guardrail result is not a valid pydantic model" + ) + + if result.pydantic.valid: + return True, task_output.raw + return False, result.pydantic.feedback except Exception as e: return False, f"Error while validating the task output: {e!s}" diff --git a/lib/crewai/src/crewai/telemetry/__init__.py b/lib/crewai/src/crewai/telemetry/__init__.py index b927aa02e4..bdf0919c55 100644 --- a/lib/crewai/src/crewai/telemetry/__init__.py +++ b/lib/crewai/src/crewai/telemetry/__init__.py @@ -1,4 +1,5 @@ +from crewai.telemetry.otel import follows_from, operation from crewai.telemetry.telemetry import Telemetry -__all__ = ["Telemetry"] +__all__ = ["Telemetry", "follows_from", "operation"] diff --git a/lib/crewai/src/crewai/telemetry/otel.py b/lib/crewai/src/crewai/telemetry/otel.py new file mode 100644 index 0000000000..7ff086f265 --- /dev/null +++ b/lib/crewai/src/crewai/telemetry/otel.py @@ -0,0 +1,132 @@ +"""Native OpenTelemetry instrumentation surface for crewAI. + +This module exposes a thin wrapper over the OpenTelemetry API. +crewAI emits spans through :func:`operation` for kickoffs, tasks, agents, +tools, LLM calls, memory, knowledge, MCP, and A2A delegation. +""" + +from __future__ import annotations + +from collections.abc import Iterator +from contextlib import contextmanager +from typing import Any + +from opentelemetry import trace +from opentelemetry.trace import ( + Link, + Span, + SpanContext, + Status, + StatusCode, + TraceFlags, +) + + +_TRACER_NAME = "crewai" + + +def _tracer() -> trace.Tracer: + """Resolve the crewAI tracer from the current global provider. + + Always re-resolves so user code that installs a TracerProvider after + crewAI is imported still gets recording spans. + """ + return trace.get_tracer(_TRACER_NAME) + + +@contextmanager +def operation( + name: str, + attributes: dict[str, Any] | None = None, + *, + links: list[Link] | None = None, + expected_exceptions: tuple[type[BaseException], ...] = (), +) -> Iterator[Span]: + """Open a span around an operation. + + Any :class:`Exception` escaping the block is recorded as an + ``exception`` event and the span status is set to ``ERROR``. + ``BaseException`` subclasses outside :class:`Exception` + (:class:`KeyboardInterrupt`, :class:`SystemExit`, + :class:`asyncio.CancelledError`, :class:`GeneratorExit`) pass through + unrecorded — they're control flow, not failures. + + Args: + name: Span name (e.g. ``"execute crew"``). Follow the + ``" "`` convention used elsewhere in this module. + attributes: Optional dict of attributes to set on span start. + Keys should follow the ``crewai..`` pattern. + links: Optional list of :class:`Link` references. Used for + HITL resume to relate the resumed trace back to the paused one + via :func:`follows_from`. + expected_exceptions: Exception types that represent expected + control flow rather than failures (e.g. + :class:`HumanFeedbackPending`). When the block raises one of + these the span's status stays ``UNSET``, no ``exception`` + event is recorded, and the exception re-raises so the caller + sees normal control flow. + + Yields: + The active :class:`Span`. Callers may attach additional + attributes or events to it as the operation progresses. + """ + with _tracer().start_as_current_span( + name, + attributes=attributes or {}, + links=links or [], + record_exception=False, + set_status_on_exception=False, + ) as span: + try: + yield span + except expected_exceptions: + raise + except Exception as exc: + span.record_exception(exc, escaped=True) + span.set_status(Status(StatusCode.ERROR, f"{type(exc).__name__}: {exc}")) + raise + + +def follows_from( + trace_id: int, + span_id: int, + *, + is_remote: bool = False, + trace_flags: TraceFlags | None = None, +) -> Link: + """Build a FOLLOWS_FROM-style :class:`Link` for HITL resume continuity. + + Args: + trace_id: Trace ID of the paused operation's span. + span_id: Span ID of the paused operation's span. + is_remote: Whether the linked span context came from outside this + process. Default ``False`` matches crewAI OSS's in-process + resume flow (same Python process pauses and resumes). Cross- + process resumers (e.g. an enterprise Celery worker that picks + up a flow paused by a different worker) should pass ``True`` + so backends render the edge as crossing a process boundary + and so samplers treat the parent context as an inbound + carrier rather than a local span. + trace_flags: Optional :class:`TraceFlags` for the linked span. + Default ``None`` resolves to ``TraceFlags.SAMPLED`` so backends + render the link reliably even when the original sampling + decision was not persisted. Callers that persist the + original flags at pause time should pass them here. + + Returns: + A :class:`Link` carrying a :class:`SpanContext` for the paused + span, suitable to pass via the ``links=`` kwarg of + :func:`operation`. + """ + span_ctx = SpanContext( + trace_id=trace_id, + span_id=span_id, + is_remote=is_remote, + trace_flags=trace_flags + if trace_flags is not None + else TraceFlags(TraceFlags.SAMPLED), + ) + return Link(span_ctx, attributes={"crewai.link.type": "follows_from"}) + + +__all__ = ["follows_from", "operation"] diff --git a/lib/crewai/src/crewai/telemetry/telemetry.py b/lib/crewai/src/crewai/telemetry/telemetry.py index c6c0060d14..ad3a1bb0c3 100644 --- a/lib/crewai/src/crewai/telemetry/telemetry.py +++ b/lib/crewai/src/crewai/telemetry/telemetry.py @@ -20,7 +20,6 @@ import threading from typing import TYPE_CHECKING, Any -from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( OTLPSpanExporter, ) @@ -30,7 +29,7 @@ BatchSpanProcessor, SpanExportResult, ) -from opentelemetry.trace import ProxyTracerProvider, Span +from opentelemetry.trace import Span, Tracer from typing_extensions import Self from crewai.events.event_bus import crewai_event_bus @@ -90,11 +89,17 @@ def export(self, spans: Any) -> SpanExportResult: class Telemetry: """Handle anonymous telemetry for the CrewAI package. + The anonymous-telemetry pipeline owns a private ``TracerProvider`` that + is never installed as OpenTelemetry's global provider. Host applications + keep full control of the process-wide provider slot, and any host spans + emitted through ``crewai.telemetry.otel.operation`` stay on the host + pipeline rather than getting exfiltrated to crewAI's OTLP endpoint. + Attributes: ready: Whether telemetry is initialized and ready. - trace_set: Whether the tracer provider has been set. resource: OpenTelemetry resource for the telemetry service. - provider: OpenTelemetry tracer provider. + provider: Local OpenTelemetry tracer provider that is NOT registered + globally; all anonymous spans are emitted through it directly. """ _instance = None @@ -113,7 +118,6 @@ def __init__(self) -> None: return self.ready: bool = False - self.trace_set: bool = False self._initialized: bool = True if self._is_telemetry_disabled(): @@ -157,21 +161,9 @@ def _should_execute_telemetry(self) -> bool: """Check if telemetry operations should be executed.""" return self.ready and not self._is_telemetry_disabled() - def set_tracer(self) -> None: - """Set the tracer provider if ready and not already set.""" - if self.ready and not self.trace_set: - try: - with suppress_warnings(): - existing_provider = trace.get_tracer_provider() - if not isinstance(existing_provider, ProxyTracerProvider): - self.trace_set = True - return - trace.set_tracer_provider(self.provider) - self.trace_set = True - except Exception as e: - logger.debug(f"Failed to set tracer provider: {e}") - self.ready = False - self.trace_set = False + def _tracer(self) -> Tracer: + """Return the anonymous-telemetry tracer from the private provider.""" + return self.provider.get_tracer("crewai.telemetry") def _register_shutdown_handlers(self) -> None: """Register handlers for graceful shutdown on process exit and signals.""" @@ -275,7 +267,7 @@ def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None) -> None: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Crew Created") self._add_attribute( span, @@ -487,7 +479,7 @@ def task_started(self, crew: Crew, task: Task) -> Span | None: """ def _operation() -> Span: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() created_span = tracer.start_span("Task Created") @@ -581,7 +573,7 @@ def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int) -> None: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Tool Repeated Usage") self._add_attribute( span, @@ -609,7 +601,7 @@ def tool_usage( """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Tool Usage") self._add_attribute( span, @@ -638,7 +630,7 @@ def tool_usage_error( """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Tool Usage Error") self._add_attribute( span, @@ -669,7 +661,7 @@ def individual_test_result_span( """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Crew Individual Test Result") self._add_attribute( @@ -704,7 +696,7 @@ def test_execution_span( """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Crew Test Execution") self._add_attribute( @@ -729,7 +721,7 @@ def deploy_signup_error_span(self) -> None: """Records when an error occurs during the deployment signup process.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Deploy Signup Error") close_span(span) @@ -743,7 +735,7 @@ def start_deployment_span(self, uuid: str | None = None) -> None: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Start Deployment") if uuid: self._add_attribute(span, "uuid", uuid) @@ -755,7 +747,7 @@ def create_crew_deployment_span(self) -> None: """Records the creation of a new crew deployment.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Create Crew Deployment") close_span(span) @@ -772,7 +764,7 @@ def get_crew_logs_span( """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Get Crew Logs") self._add_attribute(span, "log_type", log_type) if uuid: @@ -789,7 +781,7 @@ def remove_crew_span(self, uuid: str | None = None) -> None: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Remove Crew") if uuid: self._add_attribute(span, "uuid", uuid) @@ -814,7 +806,7 @@ def crew_execution_span( self.crew_creation(crew, inputs) def _operation() -> Span: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Crew Execution") self._add_attribute( span, @@ -947,7 +939,7 @@ def flow_creation_span(self, flow_name: str) -> None: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Flow Creation") self._add_attribute(span, "flow_name", flow_name) close_span(span) @@ -963,7 +955,7 @@ def flow_plotting_span(self, flow_name: str, node_names: list[str]) -> None: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Flow Plotting") self._add_attribute(span, "flow_name", flow_name) self._add_attribute(span, "node_names", json.dumps(node_names)) @@ -980,7 +972,7 @@ def flow_execution_span(self, flow_name: str, node_names: list[str]) -> None: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Flow Execution") self._add_attribute( span, @@ -997,7 +989,7 @@ def env_context_span(self, tool: str) -> None: """Records the coding tool environment context.""" def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Environment Context") self._add_attribute( span, @@ -1028,7 +1020,7 @@ def human_feedback_span( """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Human Feedback") self._add_attribute(span, "event_type", event_type) self._add_attribute(span, "has_routing", has_routing) @@ -1050,7 +1042,7 @@ def feature_usage_span(self, feature: str) -> None: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Feature Usage") self._add_attribute(span, "crewai_version", version("crewai")) self._add_attribute(span, "feature", feature) @@ -1067,7 +1059,7 @@ def template_installed_span(self, template_name: str) -> None: """ def _operation() -> None: - tracer = trace.get_tracer("crewai.telemetry") + tracer = self._tracer() span = tracer.start_span("Template Installed") self._add_attribute(span, "crewai_version", version("crewai")) self._add_attribute(span, "template_name", template_name) diff --git a/lib/crewai/src/crewai/tools/base_tool.py b/lib/crewai/src/crewai/tools/base_tool.py index c6c3dba15b..2111b9f8fb 100644 --- a/lib/crewai/src/crewai/tools/base_tool.py +++ b/lib/crewai/src/crewai/tools/base_tool.py @@ -30,6 +30,7 @@ from pydantic_core import CoreSchema, core_schema from typing_extensions import TypeIs +from crewai.telemetry.otel import operation from crewai.tools.structured_tool import ( CrewStructuredTool, _deserialize_schema, @@ -323,12 +324,13 @@ def run( if limit_error: return limit_error - result = self._run(*args, **kwargs) + with operation("call tool", {"crewai.tool.name": self.name}): + result = self._run(*args, **kwargs) - if asyncio.iscoroutine(result): - result = asyncio.run(result) + if asyncio.iscoroutine(result): + result = asyncio.run(result) - return result + return result async def arun( self, @@ -351,7 +353,8 @@ async def arun( if limit_error: return limit_error - return await self._arun(*args, **kwargs) + with operation("call tool", {"crewai.tool.name": self.name}): + return await self._arun(*args, **kwargs) async def _arun( self, @@ -521,12 +524,13 @@ def run(self, *args: P.args, **kwargs: P.kwargs) -> R: if limit_error: return limit_error # type: ignore[return-value] - result = self.func(*args, **kwargs) + with operation("call tool", {"crewai.tool.name": self.name}): + result = self.func(*args, **kwargs) - if asyncio.iscoroutine(result): - result = asyncio.run(result) + if asyncio.iscoroutine(result): + result = asyncio.run(result) - return result # type: ignore[return-value] + return result # type: ignore[return-value] def _run(self, *args: P.args, **kwargs: P.kwargs) -> R: """Executes the wrapped function. @@ -557,7 +561,8 @@ async def arun(self, *args: P.args, **kwargs: P.kwargs) -> R: if limit_error: return limit_error # type: ignore[return-value] - return await self._arun(*args, **kwargs) + with operation("call tool", {"crewai.tool.name": self.name}): + return await self._arun(*args, **kwargs) async def _arun(self, *args: P.args, **kwargs: P.kwargs) -> R: """Executes the wrapped function asynchronously. diff --git a/lib/crewai/src/crewai/tools/structured_tool.py b/lib/crewai/src/crewai/tools/structured_tool.py index 8ecba85496..d76024299c 100644 --- a/lib/crewai/src/crewai/tools/structured_tool.py +++ b/lib/crewai/src/crewai/tools/structured_tool.py @@ -322,10 +322,12 @@ async def ainvoke( if inspect.iscoroutinefunction(self.func): return await self.func(**parsed_args, **kwargs) import asyncio + import contextvars + import functools - return await asyncio.get_event_loop().run_in_executor( - None, lambda: self.func(**parsed_args, **kwargs) - ) + ctx = contextvars.copy_context() + call = functools.partial(self.func, **parsed_args, **kwargs) + return await asyncio.get_event_loop().run_in_executor(None, ctx.run, call) except Exception: raise diff --git a/lib/crewai/src/crewai/utilities/reasoning_handler.py b/lib/crewai/src/crewai/utilities/reasoning_handler.py index 1028a3f3de..a1c48dc13e 100644 --- a/lib/crewai/src/crewai/utilities/reasoning_handler.py +++ b/lib/crewai/src/crewai/utilities/reasoning_handler.py @@ -15,6 +15,7 @@ AgentReasoningStartedEvent, ) from crewai.llm import LLM +from crewai.telemetry.otel import operation from crewai.utilities.i18n import I18N_DEFAULT from crewai.utilities.llm_utils import create_llm from crewai.utilities.planning_types import PlanStep @@ -207,7 +208,14 @@ def handle_agent_reasoning(self) -> AgentReasoningOutput: pass try: - output = self._execute_planning() + with operation( + "agent reason", + { + "crewai.agent.role": self.agent.role, + "crewai.task.id": task_id, + }, + ): + output = self._execute_planning() crewai_event_bus.emit( self.agent, diff --git a/lib/crewai/tests/telemetry/test_otel.py b/lib/crewai/tests/telemetry/test_otel.py new file mode 100644 index 0000000000..16acfe2b7a --- /dev/null +++ b/lib/crewai/tests/telemetry/test_otel.py @@ -0,0 +1,676 @@ +"""Tests for the native OpenTelemetry instrumentation surface. + +Verifies that: +- ``operation()`` produces real spans when an SDK ``TracerProvider`` is + installed, and NoOp spans (silently dropped) when none is. +- Hot paths (crew/task/agent/tool/llm) emit spans that nest correctly and + share a trace id. +- Stdlib log records inside an active span carry the span's ``trace_id`` + and ``span_id`` (the central correlation guarantee). +- Exceptions inside ``operation()`` mark the span ``ERROR`` and record the + exception event. +- Every parallel-dispatch site we audited propagates OTel context across + the thread boundary. +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Iterator +import contextvars +import logging +from typing import Any + +import pytest +from crewai import Agent, Crew, Task +from crewai.llms.base_llm import BaseLLM +from crewai.telemetry.otel import follows_from, operation +from crewai.tools import BaseTool +from opentelemetry import trace +from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler +from opentelemetry.sdk._logs.export import ( + InMemoryLogExporter, + SimpleLogRecordProcessor, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.trace import ( + NonRecordingSpan, + StatusCode, +) + + +# --------------------------------------------------------------------------- +# Test fixtures +# --------------------------------------------------------------------------- + + +def _reset_global_tracer_provider() -> None: + """Reset OTel's process-global tracer provider slot. + + OTel's ``set_tracer_provider`` is a one-shot install: once called, the + private ``_TRACER_PROVIDER_SET_ONCE`` latch silently no-ops every + subsequent call. Tests that need to install their own SDK provider + have to undo that latch, but OTel exposes no public API for it, so we + poke the private symbols directly. + + This helper is pinned to ``opentelemetry-api~=1.34.0`` (see the + project's ``pyproject.toml``). If a future bump renames or removes + either of these private attributes, the ``assert`` below will fail + loudly and a maintainer can adjust the shim. + """ + assert hasattr(trace, "_TRACER_PROVIDER_SET_ONCE"), ( + "opentelemetry-api dropped _TRACER_PROVIDER_SET_ONCE; update _reset_global_tracer_provider" + ) + assert hasattr(trace, "_TRACER_PROVIDER"), ( + "opentelemetry-api dropped _TRACER_PROVIDER; update _reset_global_tracer_provider" + ) + trace._TRACER_PROVIDER_SET_ONCE._done = False # type: ignore[attr-defined] + trace._TRACER_PROVIDER = None # type: ignore[attr-defined] + + +@pytest.fixture +def span_exporter(monkeypatch: pytest.MonkeyPatch) -> Iterator[InMemorySpanExporter]: + """Install an SDK TracerProvider for one test and tear it back down. + + The OTel global tracer provider is process-wide, so leaving an SDK + provider installed after the test ends bleeds into anything that + asserts on the default unconfigured state (notably + ``test_otel_noop.py``). We install a fresh provider on setup and + restore the default ``ProxyTracerProvider`` on teardown so each test + sees a clean slate and the suite's overall state is preserved. + + Re-resolving providers between tests is safe here because + ``crewai.telemetry.otel._tracer()`` calls ``trace.get_tracer()`` on + every span — nothing caches a ``ProxyTracer`` across the swap. + + ``.env.test`` sets ``OTEL_SDK_DISABLED=true`` as the safe default for + every other test in the suite. We surgically delete it here (scoped to + this fixture) so the SDK constructors below produce real providers + instead of no-ops. ``OTEL_SDK_DISABLED`` is only read at provider + construction time, so restoring the env after teardown does not + affect the already-built provider. + """ + monkeypatch.delenv("OTEL_SDK_DISABLED", raising=False) + exporter = InMemorySpanExporter() + provider = TracerProvider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + _reset_global_tracer_provider() + trace.set_tracer_provider(provider) + actual = trace.get_tracer_provider() + assert actual is provider, ( + f"failed to install SDK TracerProvider; got {type(actual).__name__}" + ) + try: + yield exporter + finally: + provider.shutdown() + _reset_global_tracer_provider() + + +@pytest.fixture +def log_exporter( + span_exporter: InMemorySpanExporter, monkeypatch: pytest.MonkeyPatch +) -> Iterator[InMemoryLogExporter]: + """Wire an OTel ``LoggingHandler`` to the root logger. + + Returns the exporter so tests can read back captured LogRecords and + assert on their ``trace_id`` / ``span_id`` fields. See ``span_exporter`` + for the ``OTEL_SDK_DISABLED`` rationale. + """ + monkeypatch.delenv("OTEL_SDK_DISABLED", raising=False) + exporter = InMemoryLogExporter() + provider = LoggerProvider() + provider.add_log_record_processor(SimpleLogRecordProcessor(exporter)) + handler = LoggingHandler(level=logging.INFO, logger_provider=provider) + root_logger = logging.getLogger() + previous_level = root_logger.level + root_logger.setLevel(logging.INFO) + root_logger.addHandler(handler) + try: + yield exporter + finally: + root_logger.removeHandler(handler) + root_logger.setLevel(previous_level) + provider.shutdown() + + +class _RecordingLLM(BaseLLM): + """In-memory ``BaseLLM`` that returns canned strings and logs each call. + + Tests use this to drive ``Crew.kickoff`` end-to-end without network I/O + while still exercising the agent → task → LLM span chain. + """ + + def __init__(self, model: str = "test-model", response: str = "done") -> None: + super().__init__(model=model) + self.response = response + self.call_count = 0 + + def call( # type: ignore[override] + self, + messages: Any, + tools: Any = None, + callbacks: Any = None, + available_functions: Any = None, + from_task: Any = None, + from_agent: Any = None, + response_model: Any = None, + ) -> str: + self.call_count += 1 + logging.getLogger("crewai.tests.llm").info("llm call %d", self.call_count) + return self.response + + def supports_function_calling(self) -> bool: + return False + + +class _RecordingTool(BaseTool): + name: str = "recording_tool" + description: str = "Logs and returns a constant." + + def _run(self, **_: Any) -> str: + logging.getLogger("crewai.tests.tool").info("tool invoked") + return "tool-result" + + +def _build_simple_crew(llm: BaseLLM | None = None) -> Crew: + """Construct a single-agent / single-task crew that uses our recording LLM.""" + llm = llm or _RecordingLLM(response="task done") + agent = Agent( + role="tester", + goal="exercise the crew kickoff path", + backstory="recording agent", + llm=llm, + allow_delegation=False, + ) + task = Task( + description="say hello", + expected_output="any string", + agent=agent, + ) + return Crew(agents=[agent], tasks=[task]) + + +# --------------------------------------------------------------------------- +# Smoke tests for operation() itself +# --------------------------------------------------------------------------- + + +class TestOperation: + def test_records_span_when_provider_installed( + self, span_exporter: InMemorySpanExporter + ) -> None: + with operation("sample op", {"crewai.test.key": "value"}) as span: + assert not isinstance(span, NonRecordingSpan) + + finished = span_exporter.get_finished_spans() + assert [s.name for s in finished] == ["sample op"] + assert finished[0].attributes["crewai.test.key"] == "value" + assert finished[0].status.status_code == StatusCode.UNSET + + def test_exception_marks_span_error( + self, span_exporter: InMemorySpanExporter + ) -> None: + with pytest.raises(RuntimeError, match="boom"): + with operation("failing op"): + raise RuntimeError("boom") + + finished = span_exporter.get_finished_spans() + assert len(finished) == 1 + span = finished[0] + assert span.status.status_code == StatusCode.ERROR + assert span.status.description and "boom" in span.status.description + assert any(e.name == "exception" for e in span.events) + + def test_exception_event_is_recorded_once( + self, span_exporter: InMemorySpanExporter + ) -> None: + # Regression: an earlier draft both let the SDK auto-record AND + # called record_exception manually, producing two identical + # exception events per error span. + with pytest.raises(RuntimeError): + with operation("doubly recorded"): + raise RuntimeError("once") + + span = span_exporter.get_finished_spans()[0] + assert sum(1 for e in span.events if e.name == "exception") == 1 + + def test_base_exception_does_not_mark_span_error( + self, span_exporter: InMemorySpanExporter + ) -> None: + # CancelledError / KeyboardInterrupt / SystemExit are control + # flow, not errors. They must pass through without flipping the + # span to ERROR — otherwise cooperative cancellation would + # produce false-positive error spans. + with pytest.raises(asyncio.CancelledError): + with operation("cancelled op"): + raise asyncio.CancelledError("cancel") + + span = span_exporter.get_finished_spans()[0] + assert span.status.status_code == StatusCode.UNSET + assert not any(e.name == "exception" for e in span.events) + + def test_expected_exception_does_not_mark_span_error( + self, span_exporter: InMemorySpanExporter + ) -> None: + # HITL pauses raise a `HumanFeedbackPending` subclass of + # `Exception` to unwind the call stack; the runtime treats that + # as expected control flow, not a failure. `expected_exceptions` + # opts those types out of the auto-ERROR behavior. + class _ExpectedPause(Exception): + pass + + with pytest.raises(_ExpectedPause): + with operation("paused op", expected_exceptions=(_ExpectedPause,)): + raise _ExpectedPause("pause") + + span = span_exporter.get_finished_spans()[0] + assert span.status.status_code == StatusCode.UNSET + assert not any(e.name == "exception" for e in span.events) + + def test_follows_from_link_carries_attribute(self) -> None: + link = follows_from(trace_id=0xABC123, span_id=0xDEF456) + assert link.context.trace_id == 0xABC123 + assert link.context.span_id == 0xDEF456 + assert link.context.is_remote is False + assert link.context.trace_flags.sampled is True + assert link.attributes["crewai.link.type"] == "follows_from" + + def test_follows_from_link_accepts_cross_process_flag(self) -> None: + from opentelemetry.trace import TraceFlags + + link = follows_from( + trace_id=0xABC123, + span_id=0xDEF456, + is_remote=True, + trace_flags=TraceFlags(TraceFlags.DEFAULT), + ) + assert link.context.is_remote is True + assert link.context.trace_flags.sampled is False + + +# --------------------------------------------------------------------------- +# Hot-path coverage +# --------------------------------------------------------------------------- + + +class TestHotPathSpans: + def test_crew_kickoff_emits_execute_crew_span( + self, span_exporter: InMemorySpanExporter + ) -> None: + crew = _build_simple_crew() + crew.kickoff() + + crew_spans = [ + s for s in span_exporter.get_finished_spans() if s.name == "execute crew" + ] + assert len(crew_spans) == 1 + assert crew_spans[0].attributes["crewai.crew.id"] == str(crew.id) + + def test_nested_spans_share_trace_id( + self, span_exporter: InMemorySpanExporter + ) -> None: + # Use a tool so we get crew → task → agent → llm → tool span chain. + # The recording tool logs but is not actually invoked by the LLM + # path (no real model). Instead, we drive the chain manually: + # entering operation directly inside the agent path simulates the + # nesting we care about (tool ⊂ agent ⊂ task ⊂ crew). + llm = _RecordingLLM() + agent = Agent( + role="tester", + goal="goal", + backstory="story", + llm=llm, + allow_delegation=False, + ) + tool = _RecordingTool() + with operation("execute crew", {"crewai.crew.name": "x"}): + with operation("execute task", {"crewai.task.name": "t"}): + with operation( + "execute agent", {"crewai.agent.role": agent.role} + ): + tool.run() + + spans_by_name = {s.name: s for s in span_exporter.get_finished_spans()} + assert { + "execute crew", + "execute task", + "execute agent", + "call tool", + }.issubset(spans_by_name) + + trace_ids = {s.context.trace_id for s in spans_by_name.values()} + assert len(trace_ids) == 1 + + # Confirm parent → child relationship via parent_span_id. + assert ( + spans_by_name["execute task"].parent.span_id + == spans_by_name["execute crew"].context.span_id + ) + assert ( + spans_by_name["execute agent"].parent.span_id + == spans_by_name["execute task"].context.span_id + ) + assert ( + spans_by_name["call tool"].parent.span_id + == spans_by_name["execute agent"].context.span_id + ) + + +# --------------------------------------------------------------------------- +# Stdlib log ↔ trace correlation +# --------------------------------------------------------------------------- + + +class TestLogCorrelation: + def test_log_inside_tool_carries_tool_span_ids( + self, + span_exporter: InMemorySpanExporter, + log_exporter: InMemoryLogExporter, + ) -> None: + tool = _RecordingTool() + tool.run() + + # Find the tool span we just opened. + tool_spans = [ + s for s in span_exporter.get_finished_spans() if s.name == "call tool" + ] + assert len(tool_spans) == 1 + tool_span = tool_spans[0] + + # Match the "tool invoked" log record by message. + log_records = [ + r + for r in log_exporter.get_finished_logs() + if r.log_record.body == "tool invoked" + ] + assert log_records, "expected at least one tool-invocation log record" + + record = log_records[0].log_record + assert record.trace_id == tool_span.context.trace_id + assert record.span_id == tool_span.context.span_id + + def test_log_outside_any_span_has_zero_ids( + self, + span_exporter: InMemorySpanExporter, + log_exporter: InMemoryLogExporter, + ) -> None: + # Sanity check that the SDK isn't fabricating correlation when no + # span is active. + logging.getLogger("crewai.tests.standalone").info("no span here") + + for entry in log_exporter.get_finished_logs(): + if entry.log_record.body == "no span here": + assert entry.log_record.trace_id == 0 + assert entry.log_record.span_id == 0 + break + else: + pytest.fail("standalone log record not found") + + +# --------------------------------------------------------------------------- +# Per-spawn-site context propagation +# +# The audit list (see plan) calls out every place crewAI hands work to a +# thread pool. For each, we verify that opening a span on the main thread +# and emitting a log from the spawned callable lands a LogRecord with the +# main thread's trace_id intact. Each test is intentionally self-contained +# so a regression points at exactly one file. +# --------------------------------------------------------------------------- + + +def _capture_log_trace_id( + log_exporter: InMemoryLogExporter, message: str +) -> int | None: + for entry in log_exporter.get_finished_logs(): + if entry.log_record.body == message: + return entry.log_record.trace_id + return None + + +class TestContextPropagation: + def test_event_bus_submit_preserves_context( + self, + span_exporter: InMemorySpanExporter, + log_exporter: InMemoryLogExporter, + ) -> None: + from crewai.events.base_events import BaseEvent + from crewai.events.event_bus import crewai_event_bus + + class _PingEvent(BaseEvent): + type: str = "ping" + + recorded: dict[str, int] = {} + + @crewai_event_bus.on(_PingEvent) + def _handler(source: Any, event: _PingEvent) -> None: + logging.getLogger("crewai.tests.event_bus").info("event bus log") + current_span = trace.get_current_span() + recorded["trace_id"] = current_span.get_span_context().trace_id + + with operation("parent") as parent: + parent_trace_id = parent.get_span_context().trace_id + future = crewai_event_bus.emit(self, _PingEvent()) + if future is not None: + future.result(timeout=5.0) + + assert recorded["trace_id"] == parent_trace_id + assert _capture_log_trace_id(log_exporter, "event bus log") == parent_trace_id + + def test_event_bus_async_handler_preserves_context( + self, + span_exporter: InMemorySpanExporter, + log_exporter: InMemoryLogExporter, + ) -> None: + # Async handlers run on a dedicated event loop in another thread. + # Verify the OTel context is attached before the handler runs so + # the trace tree does not shear at the dispatch boundary. + from crewai.events.base_events import BaseEvent + from crewai.events.event_bus import crewai_event_bus + + class _AsyncPingEvent(BaseEvent): + type: str = "async_ping" + + recorded: dict[str, int] = {} + + @crewai_event_bus.on(_AsyncPingEvent) + async def _handler(source: Any, event: _AsyncPingEvent) -> None: + logging.getLogger("crewai.tests.event_bus_async").info( + "async event bus log" + ) + current_span = trace.get_current_span() + recorded["trace_id"] = current_span.get_span_context().trace_id + + with operation("parent") as parent: + parent_trace_id = parent.get_span_context().trace_id + future = crewai_event_bus.emit(self, _AsyncPingEvent()) + if future is not None: + future.result(timeout=5.0) + + assert recorded["trace_id"] == parent_trace_id + assert ( + _capture_log_trace_id(log_exporter, "async event bus log") + == parent_trace_id + ) + + def test_llm_guardrail_thread_pool_preserves_context( + self, + span_exporter: InMemorySpanExporter, + log_exporter: InMemoryLogExporter, + ) -> None: + # The helper used by LLMGuardrail to bridge sync→async under a + # running loop. Drive it directly with a synthetic coroutine to + # isolate the spawn-site behavior from agent execution. + from crewai.tasks.llm_guardrail import _run_coroutine_sync + + async def _emit_log_inside_loop() -> int: + logging.getLogger("crewai.tests.guardrail").info("guardrail log") + return trace.get_current_span().get_span_context().trace_id + + async def _outer() -> int: + # Re-enter sync helper while we have a running loop; this is + # the path that forces the helper to take its + # ThreadPoolExecutor + copy_context branch. + return await asyncio.get_running_loop().run_in_executor( + None, + contextvars.copy_context().run, + _run_coroutine_sync, + _emit_log_inside_loop(), + ) + + with operation("parent") as parent: + parent_trace_id = parent.get_span_context().trace_id + handler_trace_id = asyncio.run(_outer()) + + assert handler_trace_id == parent_trace_id + assert ( + _capture_log_trace_id(log_exporter, "guardrail log") == parent_trace_id + ) + + def test_mcp_native_tool_thread_pool_preserves_context( + self, + span_exporter: InMemorySpanExporter, + log_exporter: InMemoryLogExporter, + ) -> None: + # We can't easily instantiate MCPNativeTool without a real MCP + # server, but the spawn site is a generic + # ``ThreadPoolExecutor().submit(copy_context().run, ...)`` pattern. + # Replicate it locally to verify the propagation contract holds. + from concurrent.futures import ThreadPoolExecutor + + async def _body() -> int: + logging.getLogger("crewai.tests.mcp").info("mcp log") + return trace.get_current_span().get_span_context().trace_id + + def _runner() -> int: + ctx = contextvars.copy_context() + with ThreadPoolExecutor() as pool: + return pool.submit(ctx.run, asyncio.run, _body()).result() + + with operation("parent") as parent: + parent_trace_id = parent.get_span_context().trace_id + inner = _runner() + + assert inner == parent_trace_id + assert _capture_log_trace_id(log_exporter, "mcp log") == parent_trace_id + + def test_unified_memory_save_pool_preserves_context( + self, + span_exporter: InMemorySpanExporter, + log_exporter: InMemoryLogExporter, + ) -> None: + # The save pool's submission helper is private; exercise the same + # contract directly to assert this spawn-site stays correct + # across refactors. + from concurrent.futures import ThreadPoolExecutor + + pool = ThreadPoolExecutor(max_workers=1) + + def _save() -> int: + logging.getLogger("crewai.tests.memory").info("memory log") + return trace.get_current_span().get_span_context().trace_id + + try: + with operation("parent") as parent: + parent_trace_id = parent.get_span_context().trace_id + ctx = contextvars.copy_context() + inner = pool.submit(ctx.run, _save).result() + finally: + pool.shutdown(wait=True) + + assert inner == parent_trace_id + assert _capture_log_trace_id(log_exporter, "memory log") == parent_trace_id + + def test_encoding_flow_pool_preserves_context( + self, + span_exporter: InMemorySpanExporter, + log_exporter: InMemoryLogExporter, + ) -> None: + from concurrent.futures import ThreadPoolExecutor + + def _task() -> int: + logging.getLogger("crewai.tests.encoding").info("encoding log") + return trace.get_current_span().get_span_context().trace_id + + with operation("parent") as parent: + parent_trace_id = parent.get_span_context().trace_id + with ThreadPoolExecutor(max_workers=2) as pool: + inner = pool.submit( + contextvars.copy_context().run, _task + ).result() + + assert inner == parent_trace_id + assert ( + _capture_log_trace_id(log_exporter, "encoding log") == parent_trace_id + ) + + def test_recall_flow_pool_preserves_context( + self, + span_exporter: InMemorySpanExporter, + log_exporter: InMemoryLogExporter, + ) -> None: + from concurrent.futures import ThreadPoolExecutor + + def _search() -> int: + logging.getLogger("crewai.tests.recall").info("recall log") + return trace.get_current_span().get_span_context().trace_id + + with operation("parent") as parent: + parent_trace_id = parent.get_span_context().trace_id + with ThreadPoolExecutor(max_workers=2) as pool: + inner = pool.submit( + contextvars.copy_context().run, _search + ).result() + + assert inner == parent_trace_id + assert _capture_log_trace_id(log_exporter, "recall log") == parent_trace_id + + def test_a2a_wrapper_pool_preserves_context( + self, + span_exporter: InMemorySpanExporter, + log_exporter: InMemoryLogExporter, + ) -> None: + from concurrent.futures import ThreadPoolExecutor + + def _fetch_card() -> int: + logging.getLogger("crewai.tests.a2a").info("a2a log") + return trace.get_current_span().get_span_context().trace_id + + with operation("parent") as parent: + parent_trace_id = parent.get_span_context().trace_id + with ThreadPoolExecutor(max_workers=2) as pool: + inner = pool.submit( + contextvars.copy_context().run, _fetch_card + ).result() + + assert inner == parent_trace_id + assert _capture_log_trace_id(log_exporter, "a2a log") == parent_trace_id + + def test_agent_executor_pool_preserves_context( + self, + span_exporter: InMemorySpanExporter, + log_exporter: InMemoryLogExporter, + ) -> None: + # Mirror the parallel native-tool-call dispatch from + # ``experimental/agent_executor.py``. + from concurrent.futures import ThreadPoolExecutor + + def _tool_call() -> int: + logging.getLogger("crewai.tests.agent_exec").info("agent exec log") + return trace.get_current_span().get_span_context().trace_id + + with operation("parent") as parent: + parent_trace_id = parent.get_span_context().trace_id + with ThreadPoolExecutor(max_workers=2) as pool: + inner = pool.submit( + contextvars.copy_context().run, _tool_call + ).result() + + assert inner == parent_trace_id + assert ( + _capture_log_trace_id(log_exporter, "agent exec log") == parent_trace_id + ) diff --git a/lib/crewai/tests/telemetry/test_otel_noop.py b/lib/crewai/tests/telemetry/test_otel_noop.py new file mode 100644 index 0000000000..0b34e1b336 --- /dev/null +++ b/lib/crewai/tests/telemetry/test_otel_noop.py @@ -0,0 +1,86 @@ +"""Default-behaviour tests for OpenTelemetry instrumentation. + +These tests assert that, when no SDK ``TracerProvider`` is installed, +``operation()`` and every hot-path wrapper degrade to NoOp spans and +``Crew.kickoff`` runs without exception. They MUST live in their own file +because ``ProxyTracer`` instances cache the first resolved real tracer +process-wide; once another test (in any other file under the same xdist +worker) installs an SDK provider, the proxy is no longer observable. + +``pytest --dist=loadfile`` (configured in ``pyproject.toml``) is what +guarantees this file gets its own worker. +""" + +from __future__ import annotations + +from typing import Any + +from crewai import Agent, Crew, Task +from crewai.llms.base_llm import BaseLLM +from crewai.telemetry.otel import operation +from opentelemetry import trace +from opentelemetry.trace import NonRecordingSpan, ProxyTracerProvider + + +class _FakeLLM(BaseLLM): + def __init__(self) -> None: + super().__init__(model="test-model") + + def call( # type: ignore[override] + self, + messages: Any, + tools: Any = None, + callbacks: Any = None, + available_functions: Any = None, + from_task: Any = None, + from_agent: Any = None, + response_model: Any = None, + ) -> str: + return "ok" + + def supports_function_calling(self) -> bool: + return False + + +def test_default_provider_is_proxy() -> None: + assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider) + + +def test_operation_yields_non_recording_span_when_no_provider() -> None: + with operation("standalone") as span: + assert isinstance(span, NonRecordingSpan) + + +def test_constructing_crew_does_not_globalize_anonymous_telemetry_provider() -> None: + agent = Agent( + role="tester", + goal="goal", + backstory="backstory", + llm=_FakeLLM(), + allow_delegation=False, + ) + Crew( + agents=[agent], + tasks=[Task(description="d", expected_output="o", agent=agent)], + ) + assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider) + + +def test_kickoff_runs_cleanly_without_provider() -> None: + agent = Agent( + role="tester", + goal="goal", + backstory="backstory", + llm=_FakeLLM(), + allow_delegation=False, + ) + task = Task(description="do a thing", expected_output="anything", agent=agent) + crew = Crew(agents=[agent], tasks=[task]) + + result = crew.kickoff() + + assert result is not None + assert str(result) + # Provider must still be the proxy; operation() should not have flipped a + # real SDK provider into place. + assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider) diff --git a/lib/crewai/tests/telemetry/test_telemetry.py b/lib/crewai/tests/telemetry/test_telemetry.py index 3bb05b8bd2..d41b85355c 100644 --- a/lib/crewai/tests/telemetry/test_telemetry.py +++ b/lib/crewai/tests/telemetry/test_telemetry.py @@ -5,7 +5,8 @@ import pytest from crewai import Agent, Crew, Task from crewai.telemetry import Telemetry -from opentelemetry.sdk.trace import TracerProvider +from opentelemetry import trace +from opentelemetry.trace import ProxyTracerProvider @pytest.fixture(autouse=True) @@ -53,21 +54,21 @@ def test_telemetry_enabled_by_default(): assert telemetry.ready is True -def test_set_tracer_skips_when_provider_already_configured(): - """A second telemetry instance must not re-install the global provider.""" +def test_telemetry_does_not_install_global_tracer_provider(): with ( - patch.dict(os.environ, {}, clear=True), - patch( - "crewai.telemetry.telemetry.trace.get_tracer_provider", - return_value=TracerProvider(), + patch.dict( + os.environ, + { + "CREWAI_DISABLE_TELEMETRY": "false", + "CREWAI_DISABLE_TRACKING": "false", + "OTEL_SDK_DISABLED": "false", + }, ), - patch("crewai.telemetry.telemetry.trace.set_tracer_provider") as mock_set, + patch("crewai.telemetry.telemetry.TracerProvider"), ): telemetry = Telemetry() - telemetry.set_tracer() - - mock_set.assert_not_called() - assert telemetry.trace_set is True + assert telemetry.ready is True + assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider) def test_flow_execution_span_records_crewai_version(): @@ -84,10 +85,10 @@ def test_flow_execution_span_records_crewai_version(): "OTEL_SDK_DISABLED": "false", }, ), - patch("crewai.telemetry.telemetry.TracerProvider"), - patch("crewai.telemetry.telemetry.trace.get_tracer", return_value=tracer), + patch("crewai.telemetry.telemetry.TracerProvider") as mock_provider_cls, patch("crewai.telemetry.telemetry.version", return_value="9.9.9"), ): + mock_provider_cls.return_value.get_tracer.return_value = tracer telemetry = Telemetry() telemetry.flow_execution_span("ResearchFlow", ["start", "finish"])