Skip to content

Commit 0e9d8b7

Browse files
Add gossip interop tests for jvm-libp2p (#838)
* Add gossip interop tests for jvm-libp2p * Handle partial field * Add check for subnet blob message * Properly exit * seqno fix * Fix missing Identify protocol --------- Co-authored-by: Marco Munizaga <marco@marcopolo.io>
1 parent fbeebf2 commit 0e9d8b7

17 files changed

Lines changed: 1049 additions & 2 deletions

File tree

gossipsub-interop/Makefile

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@ all: binaries
44
binaries:
55
cd go-libp2p && go build -linkshared -o gossipsub-bin
66
cd rust-libp2p && cargo build
7+
cd jvm-libp2p && ./gradlew installDist
78

89
# Clean all generated shadow simulation files
910
clean:
1011
rm -rf shadow-outputs || true
12+
rm latest
1113
rm plots/* || true
1214

13-
test:
15+
test: test-partial-messages test-subnet-blob
16+
17+
test-partial-messages:
1418
# Testing partial messages
1519
@echo "Testing partial messages"
1620
@uv run run.py --node_count 32 --composition "rust-and-go" --scenario "partial-messages" && uv run checks/partial_messages.py latest --count 1
@@ -24,6 +28,22 @@ test:
2428
uv run run.py --node_count 8 --seed 2 --composition "rust-and-go" --scenario "partial-messages-fanout" && uv run checks/partial_messages.py latest/
2529
uv run run.py --node_count 8 --seed 3 --composition "rust-and-go" --scenario "partial-messages-fanout" && uv run checks/partial_messages.py latest/
2630

31+
test-subnet-blob:
32+
# Testing subnet blob scenario
33+
@echo "Testing subnet blob messages"
34+
@echo "Testing single implementations"
35+
uv run run.py --node_count 32 --composition "all-go" && uv run checks/subnet_blob_msg.py latest/
36+
uv run run.py --node_count 32 --composition "all-rust" && uv run checks/subnet_blob_msg.py latest/
37+
uv run run.py --node_count 32 --composition "all-jvm" && uv run checks/subnet_blob_msg.py latest/
38+
39+
@echo "Testing impl pairs"
40+
uv run run.py --node_count 32 --composition "rust-and-go" && uv run checks/subnet_blob_msg.py latest/
41+
uv run run.py --node_count 32 --composition "jvm-and-go" && uv run checks/subnet_blob_msg.py latest/
42+
uv run run.py --node_count 32 --composition "jvm-and-rust" && uv run checks/subnet_blob_msg.py latest/
43+
44+
@echo "Testing all"
45+
uv run run.py --node_count 32 --composition "all-three" && uv run checks/subnet_blob_msg.py latest/
46+
2747
test-go:
2848
# Testing partial messages
2949
@echo "Testing partial messages"
@@ -49,4 +69,4 @@ test-rust-only:
4969

5070

5171

52-
.PHONY: binaries all clean test
72+
.PHONY: binaries all clean test test-rust-only test-go test-subnet-blob test-partial-messages
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
#!/usr/bin/env python3
2+
"""Verify that every message in a subnet-blob-msg run was delivered to all nodes."""
3+
4+
from __future__ import annotations
5+
6+
import argparse
7+
import json
8+
import sys
9+
from collections import defaultdict
10+
from pathlib import Path
11+
12+
13+
def parse_args() -> argparse.Namespace:
14+
parser = argparse.ArgumentParser(
15+
description=(
16+
"Validate that every message in a subnet-blob-msg Shadow run "
17+
"was delivered to all nodes."
18+
)
19+
)
20+
parser.add_argument(
21+
"shadow_output",
22+
help="Path to the Shadow output directory (the one containing the hosts/ folder).",
23+
)
24+
parser.add_argument(
25+
"--min-reach",
26+
type=float,
27+
default=1.0,
28+
help="Minimum fraction of non-publisher nodes that must receive each message (default: 1.0).",
29+
)
30+
parser.add_argument(
31+
"--skip",
32+
type=int,
33+
default=4,
34+
help="Number of initial (warmup) messages to skip when checking reach (default: 4).",
35+
)
36+
return parser.parse_args()
37+
38+
39+
def iter_stdout_logs(hosts_dir: Path):
40+
"""Yield all stdout log files under the given hosts directory."""
41+
for stdout_file in sorted(hosts_dir.rglob("*.stdout")):
42+
if stdout_file.is_file():
43+
yield stdout_file
44+
45+
46+
def parse_logs(hosts_dir: Path):
47+
"""Parse all stdout logs and return per-message delivery sets and total node count.
48+
49+
Returns:
50+
(message_deliveries, node_count) where message_deliveries maps
51+
message_id -> set of node_ids that received it, ordered by first
52+
delivery time across nodes.
53+
"""
54+
# message_id -> set of node_ids
55+
deliveries: dict[str, set[str]] = defaultdict(set)
56+
# message_id -> earliest timestamp string (for ordering)
57+
first_seen: dict[str, str] = {}
58+
node_ids: set[str] = set()
59+
60+
for log_path in iter_stdout_logs(hosts_dir):
61+
node_name = log_path.parent.name # e.g. "node0"
62+
current_node_id: str | None = None
63+
64+
with log_path.open("r", encoding="utf-8", errors="replace") as fh:
65+
for line in fh:
66+
try:
67+
entry = json.loads(line)
68+
except (json.JSONDecodeError, ValueError):
69+
continue
70+
71+
msg = entry.get("msg")
72+
if msg == "PeerID":
73+
current_node_id = str(entry.get("node_id", node_name))
74+
node_ids.add(current_node_id)
75+
elif msg == "Received Message":
76+
mid = entry.get("id", "")
77+
if not mid:
78+
continue
79+
nid = current_node_id or node_name
80+
node_ids.add(nid)
81+
deliveries[mid].add(nid)
82+
ts = entry.get("time", "")
83+
if mid not in first_seen or ts < first_seen[mid]:
84+
first_seen[mid] = ts
85+
86+
# Order messages by first delivery time
87+
ordered_ids = sorted(deliveries.keys(), key=lambda m: first_seen.get(m, ""))
88+
return deliveries, ordered_ids, len(node_ids)
89+
90+
91+
def main() -> int:
92+
args = parse_args()
93+
base_dir = Path(args.shadow_output).expanduser().resolve()
94+
if not base_dir.exists():
95+
print(f"shadow output directory does not exist: {base_dir}", file=sys.stderr)
96+
return 1
97+
98+
hosts_dir = base_dir / "hosts"
99+
if not hosts_dir.is_dir():
100+
print(f"hosts directory not found under: {base_dir}", file=sys.stderr)
101+
return 1
102+
103+
deliveries, ordered_ids, node_count = parse_logs(hosts_dir)
104+
105+
if not ordered_ids:
106+
print("no messages found in logs", file=sys.stderr)
107+
return 1
108+
109+
if node_count == 0:
110+
print("no nodes found in logs", file=sys.stderr)
111+
return 1
112+
113+
# Skip warmup messages
114+
check_ids = ordered_ids[args.skip:]
115+
if not check_ids:
116+
print(
117+
f"no messages left after skipping {args.skip} warmup messages "
118+
f"(total messages: {len(ordered_ids)})",
119+
file=sys.stderr,
120+
)
121+
return 1
122+
123+
# For each message, one node is the publisher so max receivers = node_count - 1
124+
expected_receivers = node_count - 1
125+
failures: list[tuple[str, int, float]] = []
126+
127+
for mid in check_ids:
128+
receivers = len(deliveries[mid])
129+
# The publisher also appears in "Received Message" sometimes, so cap at node_count
130+
reach = min(receivers, expected_receivers) / expected_receivers if expected_receivers > 0 else 0.0
131+
if reach < args.min_reach:
132+
failures.append((mid, receivers, reach))
133+
134+
print(f"Nodes: {node_count}")
135+
print(f"Total messages: {len(ordered_ids)} (skipped {args.skip} warmup)")
136+
print(f"Checked messages: {len(check_ids)}")
137+
print(f"Required reach: {args.min_reach:.0%}")
138+
print()
139+
140+
for mid in check_ids:
141+
receivers = len(deliveries[mid])
142+
reach = min(receivers, expected_receivers) / expected_receivers if expected_receivers > 0 else 0.0
143+
status = "OK" if reach >= args.min_reach else "FAIL"
144+
print(f" [{status}] {mid}: {receivers}/{expected_receivers} nodes ({reach:.0%})")
145+
146+
print()
147+
if failures:
148+
print(
149+
f"FAILED: {len(failures)}/{len(check_ids)} messages did not reach "
150+
f"{args.min_reach:.0%} of nodes.",
151+
file=sys.stderr,
152+
)
153+
return 1
154+
155+
print(f"PASSED: all {len(check_ids)} messages reached {args.min_reach:.0%} of nodes.")
156+
return 0
157+
158+
159+
if __name__ == "__main__":
160+
sys.exit(main())

gossipsub-interop/experiment.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,42 @@ def composition(preset_name: str) -> List[Binary]:
358358
),
359359
Binary("go-libp2p/gossipsub-bin", percent_of_nodes=50),
360360
]
361+
case "all-jvm":
362+
return [
363+
Binary(
364+
"jvm-libp2p/build/install/jvm-libp2p-gossip/bin/jvm-libp2p-gossip",
365+
percent_of_nodes=100,
366+
)
367+
]
368+
case "jvm-and-go":
369+
return [
370+
Binary(
371+
"jvm-libp2p/build/install/jvm-libp2p-gossip/bin/jvm-libp2p-gossip",
372+
percent_of_nodes=50,
373+
),
374+
Binary("go-libp2p/gossipsub-bin", percent_of_nodes=50),
375+
]
376+
case "jvm-and-rust":
377+
return [
378+
Binary(
379+
"jvm-libp2p/build/install/jvm-libp2p-gossip/bin/jvm-libp2p-gossip",
380+
percent_of_nodes=50,
381+
),
382+
Binary(
383+
"rust-libp2p/target/debug/rust-libp2p-gossip", percent_of_nodes=50
384+
),
385+
]
386+
case "all-three":
387+
return [
388+
Binary("go-libp2p/gossipsub-bin", percent_of_nodes=34),
389+
Binary(
390+
"rust-libp2p/target/debug/rust-libp2p-gossip", percent_of_nodes=33
391+
),
392+
Binary(
393+
"jvm-libp2p/build/install/jvm-libp2p-gossip/bin/jvm-libp2p-gossip",
394+
percent_of_nodes=33,
395+
),
396+
]
361397
raise ValueError(f"Unknown preset name: {preset_name}")
362398

363399

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
build/
2+
.gradle/
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
plugins {
2+
kotlin("jvm") version "1.6.21"
3+
application
4+
}
5+
6+
repositories {
7+
mavenCentral()
8+
maven { url = uri("https://dl.cloudsmith.io/public/libp2p/jvm-libp2p/maven/") }
9+
maven { url = uri("https://jitpack.io") }
10+
maven { url = uri("https://artifacts.consensys.net/public/maven/maven/") }
11+
}
12+
13+
dependencies {
14+
implementation("io.libp2p:jvm-libp2p:1.2.2-RELEASE")
15+
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.3")
16+
17+
testImplementation(kotlin("test"))
18+
}
19+
20+
application {
21+
mainClass.set("gossipsub.interop.MainKt")
22+
}
23+
24+
tasks.test {
25+
useJUnitPlatform()
26+
}
60.2 KB
Binary file not shown.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
distributionBase=GRADLE_USER_HOME
2+
distributionPath=wrapper/dists
3+
distributionSha256Sum=31c55713e40233a8303827ceb42ca48a47267a0ad4bab9177123121e71524c26
4+
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-bin.zip
5+
networkTimeout=10000
6+
zipStoreBase=GRADLE_USER_HOME
7+
zipStorePath=wrapper/dists

0 commit comments

Comments
 (0)