Skip to content

Commit 1a97da8

Browse files
committed
test(telemetry): guard propagate=False Baggage behavior; add Jaeger OTLP integration script
- Extend verify_invoke_agent_baggage subprocess helper with --propagate-false - Fix empty-span attribute dict falsy check - Add Jaeger end-to-end script (subprocess workers per OTEL provider limitation)
1 parent 32a921a commit 1a97da8

3 files changed

Lines changed: 293 additions & 10 deletions

File tree

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
#!/usr/bin/env python3
2+
"""
3+
End-to-end check: OTLP export to Jaeger UI with propagate=True vs propagate=False.
4+
5+
Requires Jaeger all-in-one with OTLP gRPC on localhost:4317 and UI on localhost:16686, e.g.:
6+
7+
docker compose -f cluster-observability-verification/local-otel-jaeger-test/docker-compose.yml up -d
8+
9+
Then from cloud-sdk-python repo root:
10+
11+
PYTHONPATH=src python scripts/verify_invoke_agent_jaeger_integration.py
12+
13+
Each OTLP/Jaeger scenario runs in a **fresh Python subprocess** so TracerProvider can initialize
14+
(OpenTelemetry forbids overriding the provider in-process).
15+
16+
Exit code 0 only if both scenarios pass Jaeger assertions.
17+
"""
18+
19+
from __future__ import annotations
20+
21+
import argparse
22+
import json
23+
import os
24+
import subprocess
25+
import sys
26+
import time
27+
import urllib.error
28+
import urllib.parse
29+
import urllib.request
30+
from pathlib import Path
31+
from typing import Any, Optional
32+
33+
# Imports below are only needed in worker process (lazy in _worker_main)
34+
DEFAULT_OTLP = "http://localhost:4317"
35+
DEFAULT_UI = "http://localhost:16686"
36+
37+
38+
def _grpc_host_port(endpoint: str) -> str:
39+
u = endpoint.strip()
40+
if u.startswith("http://"):
41+
u = u[7:]
42+
elif u.startswith("https://"):
43+
u = u[8:]
44+
return u.split("/")[0]
45+
46+
47+
def _tags_dict(span: dict[str, Any]) -> dict[str, Any]:
48+
out: dict[str, Any] = {}
49+
for t in span.get("tags") or []:
50+
key = t.get("key")
51+
if key:
52+
out[key] = t.get("value")
53+
return out
54+
55+
56+
def _fetch_trace_for_service(ui_base: str, service: str, timeout_sec: float = 45.0) -> Optional[dict]:
57+
q = urllib.parse.urlencode({"service": service, "limit": "5"})
58+
url = f"{ui_base.rstrip('/')}/api/traces?{q}"
59+
deadline = time.monotonic() + timeout_sec
60+
last_err: Optional[Exception] = None
61+
while time.monotonic() < deadline:
62+
try:
63+
req = urllib.request.Request(url, headers={"Accept": "application/json"})
64+
with urllib.request.urlopen(req, timeout=5.0) as resp:
65+
data = json.loads(resp.read().decode())
66+
traces = data.get("data") or []
67+
if traces:
68+
return traces[0]
69+
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError, OSError) as e:
70+
last_err = e
71+
time.sleep(0.8)
72+
if last_err:
73+
print(f"WARN: Jaeger poll error: {last_err}", file=sys.stderr)
74+
return None
75+
76+
77+
def _worker_main() -> int:
78+
"""Single-process OTLP emit + Jaeger assert (import OTEL only here)."""
79+
from opentelemetry import trace
80+
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
81+
from opentelemetry.processor.baggage import ALLOW_ALL_BAGGAGE_KEYS, BaggageSpanProcessor
82+
from opentelemetry.sdk.resources import Resource
83+
from opentelemetry.sdk.trace import TracerProvider
84+
from opentelemetry.sdk.trace.export import BatchSpanProcessor
85+
from opentelemetry.trace import SpanKind
86+
87+
from sap_cloud_sdk.core.telemetry.tracer import invoke_agent_span
88+
89+
p = argparse.ArgumentParser()
90+
p.add_argument("--worker", action="store_true")
91+
p.add_argument("--propagate", type=lambda x: x.lower() == "true", required=True)
92+
p.add_argument("--run-id", required=True)
93+
p.add_argument("--otlp-endpoint", default=DEFAULT_OTLP)
94+
p.add_argument("--jaeger-ui", default=DEFAULT_UI)
95+
p.add_argument("--agent-name", default="currency-agent-local")
96+
p.add_argument("--agent-id", default="f268d3fd-096f-4ebc-b7ed-6fc03dfb3dde")
97+
args = p.parse_args()
98+
99+
propagate = args.propagate
100+
svc = f"sdk-prop-{'true' if propagate else 'false'}-{args.run_id}"
101+
102+
resource = Resource.create({"service.name": svc})
103+
exporter = OTLPSpanExporter(endpoint=_grpc_host_port(args.otlp_endpoint), insecure=True)
104+
provider = TracerProvider(resource=resource)
105+
provider.add_span_processor(BaggageSpanProcessor(ALLOW_ALL_BAGGAGE_KEYS))
106+
provider.add_span_processor(BatchSpanProcessor(exporter))
107+
trace.set_tracer_provider(provider)
108+
109+
external = trace.get_tracer("langchain")
110+
111+
with invoke_agent_span(
112+
provider="sap",
113+
agent_name=args.agent_name,
114+
agent_id=args.agent_id,
115+
kind=SpanKind.INTERNAL,
116+
propagate=propagate,
117+
):
118+
with external.start_as_current_span("invoke_agent"):
119+
pass
120+
121+
provider.shutdown()
122+
123+
trace_data = _fetch_trace_for_service(args.jaeger_ui, svc)
124+
if not trace_data:
125+
print(
126+
f"FAIL: No trace in Jaeger for service={svc!r}",
127+
file=sys.stderr,
128+
)
129+
return 1
130+
131+
inner: Optional[dict] = None
132+
for sp in trace_data.get("spans", []):
133+
if sp.get("operationName") == "invoke_agent":
134+
inner = sp
135+
break
136+
if not inner:
137+
print(f"FAIL: No span invoke_agent in trace for {svc}", file=sys.stderr)
138+
return 1
139+
140+
tags = _tags_dict(inner)
141+
name_tag = tags.get("gen_ai.agent.name")
142+
id_tag = tags.get("gen_ai.agent.id")
143+
144+
if propagate:
145+
if name_tag != args.agent_name or id_tag != args.agent_id:
146+
print(
147+
f"FAIL propagate=True: expected name={args.agent_name!r} id={args.agent_id!r}; "
148+
f"got name={name_tag!r} id={id_tag!r} tags={tags}",
149+
file=sys.stderr,
150+
)
151+
return 1
152+
print(f"PASS propagate=True: Jaeger shows gen_ai.agent.* on nested span ({svc})", flush=True)
153+
return 0
154+
155+
if name_tag is not None or id_tag is not None:
156+
print(
157+
f"FAIL propagate=False: expected no gen_ai.agent.* on nested span; "
158+
f"got name={name_tag!r} id={id_tag!r}",
159+
file=sys.stderr,
160+
)
161+
return 1
162+
print(f"PASS propagate=False: nested span has no gen_ai.agent.* ({svc})", flush=True)
163+
return 0
164+
165+
166+
def orchestrator_main() -> int:
167+
parser = argparse.ArgumentParser(description=__doc__)
168+
parser.add_argument("--otlp-endpoint", default=DEFAULT_OTLP)
169+
parser.add_argument("--jaeger-ui", default=DEFAULT_UI)
170+
parser.add_argument("--agent-name", default="currency-agent-local")
171+
parser.add_argument("--agent-id", default="f268d3fd-096f-4ebc-b7ed-6fc03dfb3dde")
172+
args = parser.parse_args()
173+
174+
run_id = str(int(time.time() * 1000) % 10_000_000)
175+
script_path = Path(__file__).resolve()
176+
repo_root = script_path.parents[1]
177+
178+
py = sys.executable
179+
env = os.environ.copy()
180+
src = str(repo_root / "src")
181+
env["PYTHONPATH"] = env.get("PYTHONPATH", "") + os.pathsep + src if env.get("PYTHONPATH") else src
182+
183+
print(
184+
f"Jaeger UI: {args.jaeger_ui} | OTLP: {args.otlp_endpoint}\n"
185+
"Ensure Jaeger all-in-one is running (OTLP :4317, UI :16686).\n",
186+
flush=True,
187+
)
188+
189+
base_cmd = [
190+
py,
191+
str(script_path),
192+
"--worker",
193+
"--run-id",
194+
run_id,
195+
"--otlp-endpoint",
196+
args.otlp_endpoint,
197+
"--jaeger-ui",
198+
args.jaeger_ui,
199+
"--agent-name",
200+
args.agent_name,
201+
"--agent-id",
202+
args.agent_id,
203+
]
204+
205+
for propagate, label in ((False, "disabled (propagate=False)"), (True, "enabled (propagate=True)")):
206+
cmd = base_cmd + ["--propagate", str(propagate).lower()]
207+
print(f"\n=== {label} ===", flush=True)
208+
r = subprocess.run(cmd, env=env, cwd=str(repo_root))
209+
if r.returncode != 0:
210+
print(
211+
"\nTip: start Jaeger:\n"
212+
" docker compose -f "
213+
"<repo>/generic/cluster-observability-verification/"
214+
"local-otel-jaeger-test/docker-compose.yml up -d\n",
215+
file=sys.stderr,
216+
)
217+
return r.returncode
218+
219+
print("\nAll Jaeger integration checks passed.", flush=True)
220+
return 0
221+
222+
223+
def main() -> int:
224+
if "--worker" in sys.argv:
225+
return _worker_main()
226+
return orchestrator_main()
227+
228+
229+
if __name__ == "__main__":
230+
raise SystemExit(main())

tests/core/unit/telemetry/test_tracer.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,30 @@ def test_invoke_agent_propagate_baggage_applies_to_external_tracer_span_subproce
895895
f"stderr={result.stderr}\nstdout={result.stdout}"
896896
)
897897

898+
def test_invoke_agent_propagate_false_skips_baggage_for_external_tracer_span_subprocess(
899+
self,
900+
):
901+
"""Baggage mirroring for agent identity runs only when propagate=True."""
902+
import os
903+
import subprocess
904+
import sys
905+
from pathlib import Path
906+
907+
root = Path(__file__).resolve().parents[4]
908+
script = root / "tests/core/unit/telemetry/verify_invoke_agent_baggage.py"
909+
env = os.environ.copy()
910+
env["PYTHONPATH"] = str(root / "src")
911+
result = subprocess.run(
912+
[sys.executable, str(script), "--propagate-false"],
913+
env=env,
914+
capture_output=True,
915+
text=True,
916+
timeout=60,
917+
)
918+
assert result.returncode == 0, (
919+
f"stderr={result.stderr}\nstdout={result.stdout}"
920+
)
921+
898922

899923
class TestPropagate:
900924
"""Test suite for propagate=True attribute propagation across nested spans."""

tests/core/unit/telemetry/verify_invoke_agent_baggage.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
#!/usr/bin/env python3
2-
"""Subprocess helper: invoke_agent_span(propagate=True) + BaggageSpanProcessor + external tracer.
2+
"""Subprocess helper: Baggage propagation on invoke_agent_span + external tracer.
33
44
Run from repo root with PYTHONPATH=src (see test_tracer.py).
5+
6+
Modes:
7+
default — propagate=True expects gen_ai.agent.* on nested span
8+
--propagate-false — propagate=False expects nested span WITHOUT those attrs from Baggage
59
"""
610

711
from __future__ import annotations
812

13+
import argparse
914
import sys
1015
from pathlib import Path
1116

@@ -26,6 +31,15 @@
2631

2732

2833
def main() -> int:
34+
parser = argparse.ArgumentParser()
35+
parser.add_argument(
36+
"--propagate-false",
37+
action="store_true",
38+
help="Verify Baggage is NOT applied to external span when propagate=False",
39+
)
40+
args = parser.parse_args()
41+
propagate = not args.propagate_false
42+
2943
exporter = InMemorySpanExporter()
3044
provider = TracerProvider()
3145
provider.add_span_processor(BaggageSpanProcessor(ALLOW_ALL_BAGGAGE_KEYS))
@@ -39,23 +53,38 @@ def main() -> int:
3953
agent_name="currency-agent-local",
4054
agent_id="f268d3fd-096f-4ebc-b7ed-6fc03dfb3dde",
4155
kind=SpanKind.INTERNAL,
42-
propagate=True,
56+
propagate=propagate,
4357
):
4458
with external_tracer.start_as_current_span("invoke_agent"):
4559
pass
4660

4761
spans = {s.name: dict(s.attributes or {}) for s in exporter.get_finished_spans()}
48-
inner = spans.get("invoke_agent")
49-
if not inner:
62+
if "invoke_agent" not in spans:
5063
print("FAIL: no inner invoke_agent span", spans, file=sys.stderr)
5164
return 1
52-
if inner.get("gen_ai.agent.name") != "currency-agent-local":
53-
print("FAIL: gen_ai.agent.name", inner, file=sys.stderr)
54-
return 1
55-
if inner.get("gen_ai.agent.id") != "f268d3fd-096f-4ebc-b7ed-6fc03dfb3dde":
56-
print("FAIL: gen_ai.agent.id", inner, file=sys.stderr)
65+
inner = spans["invoke_agent"]
66+
67+
if propagate:
68+
if inner.get("gen_ai.agent.name") != "currency-agent-local":
69+
print("FAIL: gen_ai.agent.name", inner, file=sys.stderr)
70+
return 1
71+
if inner.get("gen_ai.agent.id") != "f268d3fd-096f-4ebc-b7ed-6fc03dfb3dde":
72+
print("FAIL: gen_ai.agent.id", inner, file=sys.stderr)
73+
return 1
74+
print(
75+
"OK: external child span carries gen_ai.agent.name and gen_ai.agent.id via baggage"
76+
)
77+
return 0
78+
79+
# propagate=False: SDK must not inject agent identity into Baggage for this scope
80+
if inner.get("gen_ai.agent.name") is not None or inner.get("gen_ai.agent.id") is not None:
81+
print(
82+
"FAIL: propagate=False but nested span has gen_ai.agent.* (should not):",
83+
inner,
84+
file=sys.stderr,
85+
)
5786
return 1
58-
print("OK: external child span carries gen_ai.agent.name and gen_ai.agent.id via baggage")
87+
print("OK: propagate=False — nested span has no gen_ai.agent.* from Baggage injection")
5988
return 0
6089

6190

0 commit comments

Comments
 (0)