|
| 1 | +"""ADR-043 §9/§11 — ServiceQualification: 8-category exposure enum + structured result. |
| 2 | +
|
| 3 | +Maps a NatProfile to the canonical exposure_mode string and a ServiceQualification |
| 4 | +object that the directory can store as `nodes.exposure_mode` and consumers can use |
| 5 | +for routing decisions (prefer direct vs relay vs tunnel). |
| 6 | +
|
| 7 | +Usage:: |
| 8 | +
|
| 9 | + from iicp_client import IicpNode, qualify_service |
| 10 | + profile = await detect_nat(...) |
| 11 | + sq = qualify_service(profile) |
| 12 | + print(sq.exposure_mode) # e.g. "ipv4_public_direct" |
| 13 | +
|
| 14 | +Or standalone (runs detection internally):: |
| 15 | +
|
| 16 | + sq = await qualify_service_async(bind_host="0.0.0.0", bind_port=8020) |
| 17 | +""" |
| 18 | + |
| 19 | +from __future__ import annotations |
| 20 | + |
| 21 | +from dataclasses import dataclass, field |
| 22 | +from typing import TYPE_CHECKING |
| 23 | + |
| 24 | +if TYPE_CHECKING: |
| 25 | + from iicp_client.nat_detection import NatProfile |
| 26 | + |
| 27 | +# ── 8-category exposure enum (ADR-043 §9) ──────────────────────────────────── |
| 28 | + |
| 29 | +EXPOSURE_MODES = frozenset({ |
| 30 | + "outbound_only", |
| 31 | + "ipv4_public_direct", |
| 32 | + "ipv4_cgnat_blocked", |
| 33 | + "ipv6_direct_firewall_required", |
| 34 | + "ipv6_direct_pinhole_available", |
| 35 | + "relay_required", |
| 36 | + "tunnel_required", |
| 37 | + "dual_stack_available", |
| 38 | +}) |
| 39 | + |
| 40 | + |
| 41 | +@dataclass |
| 42 | +class Ipv4Qualification: |
| 43 | + public_ip: str | None = None |
| 44 | + cgnat: bool = False |
| 45 | + upnp_mapped: bool = False |
| 46 | + |
| 47 | + |
| 48 | +@dataclass |
| 49 | +class Ipv6Qualification: |
| 50 | + routable: bool = False |
| 51 | + pinhole_ok: bool = False |
| 52 | + address: str | None = None |
| 53 | + |
| 54 | + |
| 55 | +@dataclass |
| 56 | +class ExposureQualification: |
| 57 | + public_endpoint: str | None = None |
| 58 | + transport_endpoint: str | None = None |
| 59 | + |
| 60 | + |
| 61 | +@dataclass |
| 62 | +class ServiceQualification: |
| 63 | + """ADR-043 §11 — structured result of service qualification. |
| 64 | +
|
| 65 | + ``exposure_mode`` is the canonical 8-category enum used for directory |
| 66 | + routing decisions and the ``nodes.exposure_mode`` column. |
| 67 | + """ |
| 68 | + |
| 69 | + exposure_mode: str |
| 70 | + ipv4: Ipv4Qualification = field(default_factory=Ipv4Qualification) |
| 71 | + ipv6: Ipv6Qualification = field(default_factory=Ipv6Qualification) |
| 72 | + exposure: ExposureQualification = field(default_factory=ExposureQualification) |
| 73 | + recommendation: str = "" |
| 74 | + |
| 75 | + def to_dict(self) -> dict: |
| 76 | + return { |
| 77 | + "exposure_mode": self.exposure_mode, |
| 78 | + "ipv4": { |
| 79 | + "public_ip": self.ipv4.public_ip, |
| 80 | + "cgnat": self.ipv4.cgnat, |
| 81 | + "upnp_mapped": self.ipv4.upnp_mapped, |
| 82 | + }, |
| 83 | + "ipv6": { |
| 84 | + "routable": self.ipv6.routable, |
| 85 | + "pinhole_ok": self.ipv6.pinhole_ok, |
| 86 | + "address": self.ipv6.address, |
| 87 | + }, |
| 88 | + "exposure": { |
| 89 | + "public_endpoint": self.exposure.public_endpoint, |
| 90 | + "transport_endpoint": self.exposure.transport_endpoint, |
| 91 | + }, |
| 92 | + "recommendation": self.recommendation, |
| 93 | + } |
| 94 | + |
| 95 | + |
| 96 | +# ── Core mapping (NatProfile → ServiceQualification) ───────────────────────── |
| 97 | + |
| 98 | +def qualify_service(profile: "NatProfile") -> ServiceQualification: |
| 99 | + """Map a NatProfile to an ADR-043 ServiceQualification. |
| 100 | +
|
| 101 | + Derives the 8-category ``exposure_mode`` from tier/transport_method/detection_log |
| 102 | + and populates the structured sub-objects for directory storage. |
| 103 | +
|
| 104 | + Synchronous — pass an already-awaited NatProfile. For a fully autonomous |
| 105 | + detection + qualification flow use ``qualify_service_async()``. |
| 106 | + """ |
| 107 | + ipv4_q = Ipv4Qualification() |
| 108 | + ipv6_q = Ipv6Qualification() |
| 109 | + exposure_q = ExposureQualification( |
| 110 | + public_endpoint=profile.public_endpoint, |
| 111 | + transport_endpoint=profile.transport_endpoint, |
| 112 | + ) |
| 113 | + |
| 114 | + # Populate IPv4 sub-object |
| 115 | + if profile.public_endpoint: |
| 116 | + import re |
| 117 | + m = re.search(r"https?://([^:/]+)", profile.public_endpoint) |
| 118 | + if m: |
| 119 | + ipv4_q.public_ip = m.group(1) |
| 120 | + ipv4_q.cgnat = any( |
| 121 | + kw in log.lower() for log in profile.detection_log |
| 122 | + for kw in ("cgnat", "ds-lite", "carrier-grade") |
| 123 | + ) |
| 124 | + ipv4_q.upnp_mapped = profile.transport_method in ("upnp_mapped",) |
| 125 | + |
| 126 | + # Populate IPv6 sub-object |
| 127 | + if profile.ipv6: |
| 128 | + ipv6_q.routable = bool(profile.ipv6.routable_address) |
| 129 | + ipv6_q.pinhole_ok = getattr(profile.ipv6, "pinhole_ok", False) |
| 130 | + ipv6_q.address = profile.ipv6.routable_address |
| 131 | + |
| 132 | + exposure_mode = _derive_exposure_mode(profile, ipv4_q, ipv6_q) |
| 133 | + recommendation = _build_recommendation(exposure_mode, profile) |
| 134 | + |
| 135 | + return ServiceQualification( |
| 136 | + exposure_mode=exposure_mode, |
| 137 | + ipv4=ipv4_q, |
| 138 | + ipv6=ipv6_q, |
| 139 | + exposure=exposure_q, |
| 140 | + recommendation=recommendation, |
| 141 | + ) |
| 142 | + |
| 143 | + |
| 144 | +def _derive_exposure_mode( |
| 145 | + profile: "NatProfile", |
| 146 | + ipv4_q: Ipv4Qualification, |
| 147 | + ipv6_q: Ipv6Qualification, |
| 148 | +) -> str: |
| 149 | + ipv6_available = ipv6_q.routable |
| 150 | + |
| 151 | + if profile.tier == 3: |
| 152 | + return "relay_required" |
| 153 | + |
| 154 | + if profile.tier == 2 or profile.transport_method == "external_tunnel": |
| 155 | + return "tunnel_required" |
| 156 | + |
| 157 | + if profile.tier == 4 or profile.public_endpoint is None: |
| 158 | + return "ipv4_cgnat_blocked" if ipv4_q.cgnat else "outbound_only" |
| 159 | + |
| 160 | + # tier 0 or 1 — some form of direct/mapped reachability |
| 161 | + ipv4_reachable = profile.public_endpoint is not None |
| 162 | + |
| 163 | + if ipv4_reachable and ipv6_available and ipv6_q.pinhole_ok: |
| 164 | + return "dual_stack_available" |
| 165 | + |
| 166 | + if not ipv4_reachable and ipv6_available: |
| 167 | + return "ipv6_direct_pinhole_available" if ipv6_q.pinhole_ok else "ipv6_direct_firewall_required" |
| 168 | + |
| 169 | + if ipv4_reachable: |
| 170 | + return "ipv4_public_direct" |
| 171 | + |
| 172 | + return "outbound_only" |
| 173 | + |
| 174 | + |
| 175 | +def _build_recommendation(mode: str, profile: "NatProfile") -> str: |
| 176 | + messages = { |
| 177 | + "ipv4_public_direct": "Direct IPv4 connection available. No additional setup needed.", |
| 178 | + "dual_stack_available": "Dual-stack (IPv4 + IPv6) available. Consumers can reach you on either path.", |
| 179 | + "ipv6_direct_pinhole_available": "IPv6 direct connection available with firewall pinhole. IPv4 unreachable.", |
| 180 | + "ipv6_direct_firewall_required": "IPv6 address routable but firewall is blocking. Open the relevant port.", |
| 181 | + "relay_required": "Behind CGNAT or strict firewall — use relay mode (iicp-node --relay-worker-endpoint).", |
| 182 | + "tunnel_required": "External tunnel detected (ngrok/Tailscale). Advertise the tunnel URL as public endpoint.", |
| 183 | + "ipv4_cgnat_blocked": "Carrier-grade NAT detected. Relay mode is the recommended path.", |
| 184 | + "outbound_only": "No inbound connectivity detected. Set --public-endpoint manually or use relay mode.", |
| 185 | + } |
| 186 | + base = messages.get(mode, "Unknown exposure mode.") |
| 187 | + guidance = profile.operator_guidance |
| 188 | + return f"{base} {guidance}".strip() if guidance else base |
| 189 | + |
| 190 | + |
| 191 | +# ── Async convenience wrapper ───────────────────────────────────────────────── |
| 192 | + |
| 193 | +async def qualify_service_async( |
| 194 | + bind_host: str = "0.0.0.0", |
| 195 | + bind_port: int = 8020, |
| 196 | + transport_port: int = 9484, |
| 197 | + *, |
| 198 | + detect_v6: bool = True, |
| 199 | + timeout_s: float = 15.0, |
| 200 | +) -> ServiceQualification: |
| 201 | + """Run NAT detection and return a ServiceQualification in one call. |
| 202 | +
|
| 203 | + Equivalent to ``detect_nat(...)`` followed by ``qualify_service(profile)``. |
| 204 | + """ |
| 205 | + from iicp_client.nat_detection import detect_nat |
| 206 | + profile = await detect_nat( |
| 207 | + bind_host=bind_host, |
| 208 | + bind_port=bind_port, |
| 209 | + transport_port=transport_port, |
| 210 | + detect_v6=detect_v6, |
| 211 | + timeout_s=timeout_s, |
| 212 | + ) |
| 213 | + return qualify_service(profile) |
0 commit comments