Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions lib/cli/src/crewai_cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ class AuthenticationRequiredError(SystemExit):
class BaseCommand:
def __init__(self) -> None:
self._telemetry = Telemetry()
self._telemetry.set_tracer()


class PlusAPIMixin:
def __init__(self, telemetry: Telemetry) -> None:
try:
telemetry.set_tracer()
self.plus_api_client = PlusAPI(api_key=get_auth_token())
except Exception:
telemetry.deploy_signup_error_span()
Expand Down
42 changes: 17 additions & 25 deletions lib/crewai-core/src/crewai_core/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
import threading
from typing import Any, ClassVar, Final

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
SpanExportResult,
)
from opentelemetry.trace import ProxyTracerProvider, Span, Status, StatusCode
from opentelemetry.trace import Span, Status, StatusCode, Tracer
from typing_extensions import Self


Expand Down Expand Up @@ -70,6 +69,12 @@ class Telemetry:

crewai's runtime extends this with crew/agent/task/tool/flow execution spans
and event-bus signal handlers (see ``crewai.telemetry.telemetry``).

The anonymous-telemetry pipeline owns a private ``TracerProvider`` that is
never installed as OpenTelemetry's global provider. Host applications keep
full control of the process-wide provider slot, and any host spans emitted
through ``crewai.telemetry.otel.operation`` stay on the host pipeline
rather than getting exfiltrated to crewAI's OTLP endpoint.
"""

_instance: ClassVar[Self | None] = None
Expand All @@ -88,7 +93,6 @@ def __init__(self) -> None:
return

self.ready: bool = False
self.trace_set: bool = False
self._initialized: bool = True

if self._is_telemetry_disabled():
Expand Down Expand Up @@ -144,21 +148,9 @@ def _shutdown(self) -> None:
except Exception as e:
logger.debug("Telemetry shutdown failed: %s", e)

def set_tracer(self) -> None:
"""Install our TracerProvider as the global one (idempotent)."""
if self.ready and not self.trace_set:
try:
with suppress_warnings():
existing_provider = trace.get_tracer_provider()
if not isinstance(existing_provider, ProxyTracerProvider):
self.trace_set = True
return
trace.set_tracer_provider(self.provider)
self.trace_set = True
except Exception as e:
logger.debug("Failed to set tracer provider: %s", e)
self.ready = False
self.trace_set = False
def _tracer(self) -> Tracer:
"""Return the anonymous-telemetry tracer from the private provider."""
return self.provider.get_tracer("crewai.telemetry")

def _safe_telemetry_operation(
self, operation: Callable[[], Span | None]
Expand Down Expand Up @@ -194,7 +186,7 @@ def deploy_signup_error_span(self) -> None:
"""Records when an error occurs during the deployment signup process."""

def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
tracer = self._tracer()
span = tracer.start_span("Deploy Signup Error")
close_span(span)

Expand All @@ -204,7 +196,7 @@ def start_deployment_span(self, uuid: str | None = None) -> None:
"""Records the start of a deployment process."""

def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
tracer = self._tracer()
span = tracer.start_span("Start Deployment")
if uuid:
self._add_attribute(span, "uuid", uuid)
Expand All @@ -216,7 +208,7 @@ def create_crew_deployment_span(self) -> None:
"""Records the creation of a new crew deployment."""

def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
tracer = self._tracer()
span = tracer.start_span("Create Crew Deployment")
close_span(span)

Expand All @@ -228,7 +220,7 @@ def get_crew_logs_span(
"""Records the retrieval of crew logs."""

def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
tracer = self._tracer()
span = tracer.start_span("Get Crew Logs")
self._add_attribute(span, "log_type", log_type)
if uuid:
Expand All @@ -241,7 +233,7 @@ def remove_crew_span(self, uuid: str | None = None) -> None:
"""Records the removal of a crew."""

def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
tracer = self._tracer()
span = tracer.start_span("Remove Crew")
if uuid:
self._add_attribute(span, "uuid", uuid)
Expand All @@ -253,7 +245,7 @@ def flow_creation_span(self, flow_name: str) -> None:
"""Records the creation of a new flow."""

def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
tracer = self._tracer()
span = tracer.start_span("Flow Creation")
self._add_attribute(span, "flow_name", flow_name)
close_span(span)
Expand All @@ -265,7 +257,7 @@ def template_installed_span(self, template_name: str) -> None:
from crewai_core.version import get_crewai_version

def _operation() -> None:
tracer = trace.get_tracer("crewai.telemetry")
tracer = self._tracer()
span = tracer.start_span("Template Installed")
self._add_attribute(span, "crewai_version", get_crewai_version())
self._add_attribute(span, "template_name", template_name)
Expand Down
26 changes: 5 additions & 21 deletions lib/crewai-core/tests/test_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
user_data,
version,
)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry import trace
from opentelemetry.trace import ProxyTracerProvider
import pytest


Expand Down Expand Up @@ -97,7 +98,7 @@ def test_unused_var_warning_silenced() -> None:
assert os.environ is not None


def test_core_telemetry_skips_duplicate_tracer_provider(
def test_core_telemetry_does_not_install_global_tracer_provider(
monkeypatch: pytest.MonkeyPatch,
) -> None:
from crewai_core.telemetry import Telemetry
Expand All @@ -107,24 +108,7 @@ def test_core_telemetry_skips_duplicate_tracer_provider(
monkeypatch.delenv("CREWAI_DISABLE_TELEMETRY", raising=False)
monkeypatch.delenv("CREWAI_DISABLE_TRACKING", raising=False)

monkeypatch.setattr(
"crewai_core.telemetry.trace.get_tracer_provider",
lambda: TracerProvider(),
)

called = False

def fail_if_called(provider: object) -> None:
nonlocal called
called = True

monkeypatch.setattr(
"crewai_core.telemetry.trace.set_tracer_provider",
fail_if_called,
)

telemetry = Telemetry()
telemetry.set_tracer()

assert called is False
assert telemetry.trace_set is True
assert telemetry.ready is True
assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider)
117 changes: 63 additions & 54 deletions lib/crewai/src/crewai/a2a/utils/delegation.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
A2ADelegationStartedEvent,
A2AMessageSentEvent,
)
from crewai.telemetry.otel import operation


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -303,73 +304,81 @@ async def aexecute_a2a_delegation(
if turn_number is None:
turn_number = len([m for m in conversation_history if m.role == Role.user]) + 1

try:
result = await _aexecute_a2a_delegation_impl(
endpoint=endpoint,
auth=auth,
timeout=timeout,
task_description=task_description,
context=context,
context_id=context_id,
task_id=task_id,
reference_task_ids=reference_task_ids,
metadata=metadata,
extensions=extensions,
conversation_history=conversation_history,
is_multiturn=is_multiturn,
turn_number=turn_number,
agent_branch=agent_branch,
agent_id=agent_id,
agent_role=agent_role,
response_model=response_model,
updates=updates,
from_task=from_task,
from_agent=from_agent,
skill_id=skill_id,
client_extensions=client_extensions,
transport=transport,
accepted_output_modes=accepted_output_modes,
input_files=input_files,
)
except Exception as e:
with operation(
"a2a delegate",
{
"crewai.a2a.endpoint": endpoint,
"crewai.a2a.is_multiturn": is_multiturn,
"crewai.a2a.turn_number": turn_number,
},
):
try:
result = await _aexecute_a2a_delegation_impl(
endpoint=endpoint,
auth=auth,
timeout=timeout,
task_description=task_description,
context=context,
context_id=context_id,
task_id=task_id,
reference_task_ids=reference_task_ids,
metadata=metadata,
extensions=extensions,
conversation_history=conversation_history,
is_multiturn=is_multiturn,
turn_number=turn_number,
agent_branch=agent_branch,
agent_id=agent_id,
agent_role=agent_role,
response_model=response_model,
updates=updates,
from_task=from_task,
from_agent=from_agent,
skill_id=skill_id,
client_extensions=client_extensions,
transport=transport,
accepted_output_modes=accepted_output_modes,
input_files=input_files,
)
except Exception as e:
crewai_event_bus.emit(
agent_branch,
A2ADelegationCompletedEvent(
status="failed",
result=None,
error=str(e),
context_id=context_id,
is_multiturn=is_multiturn,
endpoint=endpoint,
metadata=metadata,
extensions=list(extensions.keys()) if extensions else None,
from_task=from_task,
from_agent=from_agent,
),
)
raise

agent_card_data = result.get("agent_card")
crewai_event_bus.emit(
agent_branch,
A2ADelegationCompletedEvent(
status="failed",
result=None,
error=str(e),
status=result["status"],
result=result.get("result"),
error=result.get("error"),
context_id=context_id,
is_multiturn=is_multiturn,
endpoint=endpoint,
a2a_agent_name=result.get("a2a_agent_name"),
agent_card=agent_card_data,
provider=agent_card_data.get("provider") if agent_card_data else None,
metadata=metadata,
extensions=list(extensions.keys()) if extensions else None,
from_task=from_task,
from_agent=from_agent,
),
)
raise

agent_card_data = result.get("agent_card")
crewai_event_bus.emit(
agent_branch,
A2ADelegationCompletedEvent(
status=result["status"],
result=result.get("result"),
error=result.get("error"),
context_id=context_id,
is_multiturn=is_multiturn,
endpoint=endpoint,
a2a_agent_name=result.get("a2a_agent_name"),
agent_card=agent_card_data,
provider=agent_card_data.get("provider") if agent_card_data else None,
metadata=metadata,
extensions=list(extensions.keys()) if extensions else None,
from_task=from_task,
from_agent=from_agent,
),
)

return result
return result


async def _aexecute_a2a_delegation_impl(
Expand Down
Loading
Loading