Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions kartograf/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
from pathlib import Path
import ipaddress
import math
import os
import shutil
from types import SimpleNamespace
import pandas as pd

from kartograf.timed import timed
from kartograf.util import get_root_network
from kartograf.util import get_root_network, get_threads


class BaseNetworkIndex:
Expand Down Expand Up @@ -163,7 +162,7 @@ def pick_chunk_size(n_rows: int, workers: int | None = None,
min_chunk: int = 5,
max_chunk: int = 200_000) -> int:
if workers is None:
workers = os.cpu_count() or 4
workers = get_threads()
chunk = math.ceil(n_rows / workers)
return max(min_chunk, min(max_chunk, chunk))

Expand Down
57 changes: 20 additions & 37 deletions kartograf/rpki/fetch.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import subprocess
import sys

from concurrent.futures import ThreadPoolExecutor, as_completed
import json
from threading import Lock
from pathlib import Path
import requests
from tqdm import tqdm

from kartograf.timed import timed
from kartograf.util import (
calculate_sha256,
calculate_sha256_directory,
get_threads,
)

TAL_URLS = {
Expand Down Expand Up @@ -99,62 +97,47 @@ def fetch_rpki_db(context):

@timed
def validate_rpki_db(context):
files = [path for path in Path(context.data_dir_rpki_cache).rglob('*')
if path.is_file() and ((path.suffix == ".roa")
or (path.name == ".roa"))]

print(f"{len(files)} raw RKPI ROA files found.")
print("Validating RPKI ROAs")
rpki_raw_file = 'rpki_raw.json'
result_path = Path(context.out_dir_rpki) / rpki_raw_file

tal_options = [item for path in data_tals(context) for item in ('-t', path)]
threads = get_threads()

debug_file_lock = Lock()

if context.debug_log:
with open(context.debug_log, 'a') as logs:
logs.write("\n\n=== RPKI Validation ===\n")

def process_files_batch(batch):
def process_rpki_cache():
result = subprocess.run(["rpki-client",
"-j",
"-n",
"-d",
context.data_dir_rpki_cache,
f"-p {threads}",
"-P",
context.epoch,
] + tal_options +
["-f"] + batch, # -f has to be last
] + tal_options + [context.out_dir_rpki],
capture_output=True,
check=False)

if result.stderr and context.debug_log:
stderr_output = result.stderr.decode()
if context.debug_log:
with debug_file_lock:
with open(context.debug_log, 'a') as logs:
logs.write(stderr_output)
if result.stderr:
std_err = result.stderr.decode()
logs.write(std_err)
if result.stdout:
logs.write("\n== RPKI Validation Summary ==\n")
std_output = result.stdout.decode()
for line in std_output:
logs.write(line)
return result.stdout

total = len(files)
batch_size = 250
batches = []
for i in range(0, total, batch_size):
batch = [str(f) for f in files[i:i + batch_size]]
batches.append(batch)

total_batches = len(batches)
results = []
with ThreadPoolExecutor() as executor:
futures = [executor.submit(process_files_batch, batch) for batch in batches]
for future in tqdm(as_completed(futures), total=total_batches):
result = future.result()
if result:
normalized = result.replace(b"\n}\n{\n\t", b"\n},\n{\n").decode('utf-8').strip()
results.append(normalized)
results_json = json.loads("[" + ",".join(results) + "]")
s = sorted(results_json, key=lambda result: result["hash_id"])

with open(result_path, 'w') as f:
json.dump(s, f)

print(f"{len(results_json)} RKPI ROAs validated\nSaved to: {result_path.name}\nFile hash: {calculate_sha256(result_path)}")
process_rpki_cache()
default_out_dir = Path(context.out_dir_rpki) / "json"
default_out_dir.rename(result_path)

print(f"RKPI ROAs validated and saved to {result_path}, file hash: {calculate_sha256(result_path)}")
107 changes: 32 additions & 75 deletions kartograf/rpki/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,87 +20,47 @@ def parse_rpki(context):

dups_count = 0
out_count = 0
invalids = 0
incompletes = 0
not_roas = 0

with open(raw_input, "r") as dump:
data = json.loads(dump.read())
print(f'Parsing {len(data)} ROAs')

for roa in data:
# Sometimes ROAs are incomplete and we have to skip them
key_list = [
'type',
'validation',
'aki',
'ski',
'vrps',
'valid_until'
]
if not all(key in roa for key in key_list):
incompletes += 1
continue

# We are only interested in ROAs
if roa['type'] != "roa":

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they're all ROAs, iiuc

not_roas += 1
print(f'Parsing {data["metadata"]["roas"]} ROAs')

for roa in data["roas"]:
asn = roa['asn']
expiry = roa['expires']
prefix = parse_pfx(roa['prefix'])

# Bogon prefixes and ASNs are excluded since they can not
# be used for routing.
if not prefix or is_bogon_pfx(prefix) or is_bogon_asn(asn):
if context.debug_log:
with open(context.debug_log, 'a') as logs:
logs.write(f"RPKI: parser encountered an invalid entry: {prefix} with ASN {asn}\n")
continue

# We are only interested in valid ROAs
if roa['validation'] != "OK":

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only valid ROAs are output

invalids += 1
if context.max_encode and is_out_of_encoding_range(asn, context.max_encode):
continue

valid_until = roa['valid_until']
valid_since = roa['valid_since']

for vrp in roa['vrps']:
asn = vrp['asid']
prefix = parse_pfx(vrp['prefix'])
if not prefix:
if context.debug_log:
with open(context.debug_log, 'a') as logs:
logs.write(f"Could not parse prefix from line: {vrp['prefix']}")
continue
# Bogon prefixes and ASNs are excluded since they can not
# be used for routing.
if is_bogon_pfx(prefix) or is_bogon_asn(asn):
if context.debug_log:
with open(context.debug_log, 'a') as logs:
logs.write(f"RPKI: parser encountered an invalid IP network: {prefix}\n")
continue

if context.max_encode and is_out_of_encoding_range(asn, context.max_encode):
continue

# Multiple ROAs for the same prefix are possible and we need
# to decide if we update the entry or not
if output_cache.get(prefix):
dups_count += 1
# If the new ASN is from a ROA that is valid for longer,
# we override the old entry with it
[old_asn, old_valid_until, old_valid_since] = output_cache[prefix]
if int(valid_until) > int(old_valid_until):
output_cache[prefix] = [asn, valid_until, valid_since]
# If the entries have the same validity period, we need to
# choose a different tie breaker
if int(valid_until) == int(old_valid_until):
# Prefer the ROA that was announced last
if int(valid_since) > int(old_valid_since):
output_cache[prefix] = [asn, valid_until, valid_since]
# If the ROAs were also announced at the same time, we
# fall back to using the lower ASN just to be
# deterministic
if int(valid_since) == int(old_valid_since):
if int(asn) < int(old_asn):
output_cache[prefix] = [asn, valid_until, valid_since]
else:
# No duplicate, add to cache
output_cache[prefix] = [asn, valid_until, valid_since]
# Multiple ROAs for the same prefix are possible and we need
# to decide if we update the entry or not
if output_cache.get(prefix):
dups_count += 1
# If the new ASN is from a ROA that is valid for longer,
# we override the old entry with it
[old_asn, old_expiry] = output_cache[prefix]
if expiry > old_expiry:
output_cache[prefix] = [asn, expiry]
# If the entries have the same validity period, we need to
# choose a different tie breaker
if expiry == old_expiry:
if int(asn) < int(old_asn):
output_cache[prefix] = [asn, expiry]
else:
# No duplicate, add to cache
output_cache[prefix] = [asn, expiry]

with open(rpki_res, "w") as asmap:
for prefix, [asn, _, _] in output_cache.items():
for prefix, [asn, _] in output_cache.items():
line_out = f"{prefix} AS{asn}"

asmap.write(line_out + '\n')
Expand All @@ -110,6 +70,3 @@ def parse_rpki(context):

print(f'Result entries written: {out_count}')
print(f'Duplicates found: {dups_count}')
print(f'Invalids found: {invalids}')
print(f'Incompletes: {incompletes}')
print(f'Non-ROA files: {not_roas}')
8 changes: 8 additions & 0 deletions kartograf/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,11 @@ def get_root_network(pfx):
if root_net:
return int(root_net, 16)
return None


def get_threads():
"""
Returns the count of logical CPUs on the host machine.
If None are found (for some reason), defaults to 4.
"""
return os.cpu_count() or 4
14 changes: 7 additions & 7 deletions tests/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ def load_rpki_csv_to_json(context, fixtures_path):
with open(csv_path, 'r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
vrps = [{"prefix": row["prefix"], "asid": row["asid"], "maxlen": row["maxlen"]}]
del row["prefix"]
del row["asid"]
del row["maxlen"]
row["vrps"] = vrps
rpki_data.append(row)
vrps = {"prefix": row["prefix"], "asn": row["asid"], "maxlen": row["maxlen"], "expires": row["valid_until"]}
# del row["prefix"]
# del row["asid"]
# del row["maxlen"]
# row["vrps"] = vrps
rpki_data.append(vrps)

output_path = Path(context.out_dir_rpki) / 'rpki_raw.json'
with open(output_path, 'w') as jsonfile:
json.dump(rpki_data, jsonfile, indent=2)
json.dump({"metadata": {"roas": 24}, "roas": rpki_data}, jsonfile, indent=2)

def load_irr_fixtures(context, fixtures_path):
for file in irr_fixtures():
Expand Down
68 changes: 2 additions & 66 deletions tests/test_rpki_parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
import os

from kartograf.rpki.parse import parse_rpki
Expand Down Expand Up @@ -36,57 +35,9 @@ def test_roa_validations(tmp_path, capsys):

captured = capsys.readouterr()

assert len(final_lines) == 10, "Should have found 10 valid ROAs"
assert "Result entries written: 10" in captured.out
assert len(final_lines) == 12, "Should have found 12 valid ROAs"
assert "Result entries written: 12" in captured.out
assert "Duplicates found: 5" in captured.out
assert "Invalids found: 1" in captured.out
assert "Incompletes: 0" in captured.out
assert "Non-ROA files: 1" in captured.out

def test_roa_incompletes(tmp_path, capsys):
'''
Test that the ROA file has missing entries.
The data is mocked here and written to a json file.
'''
epoch = "111111112"
context = create_test_context(tmp_path, epoch)
test_data = [
{
"type": "roa",
"validation": "OK",
"ski": "some-ski",
"vrps": [{"prefix": "192.0.2.0/24", "asid": "64496", "maxlen": "24"}],
"valid_until": "1234567890",
"valid_since": "1234567880"
},
{
"type": "roa",
"validation": "OK",
"ski": "some-ski",
"vrps": [{"prefix": "198.51.100.0/24", "asid": "64497", "maxlen": "24"}],
"valid_until": "1234567890",
"valid_since": "1234567880"
}
]

# Write test data to rpki_raw.json
with open(os.path.join(context.out_dir_rpki, "rpki_raw.json"), "w") as f:
json.dump(test_data, f)

parse_rpki(context)

# Check that rpki_final.txt was created
final_path = os.path.join(context.out_dir_rpki, "rpki_final.txt")
assert os.path.exists(final_path), "rpki_final.txt should exist"

# Count entries in final output
with open(final_path, "r") as f:
final_lines = f.readlines()

assert len(final_lines) == 0, "No rows should be written"
captured = capsys.readouterr()
assert "Incompletes: 2" in captured.out


def test_roa_valid_until_fallback(tmp_path):
'''Test ROA selection falls back to later valid_until'''
Expand All @@ -103,21 +54,6 @@ def test_roa_valid_until_fallback(tmp_path):
assert not any("101.0.1.0/24 AS11101" in e for e in entries), "ROA with earlier valid_until should not be selected"


def test_roa_valid_since_fallback(tmp_path):
'''Test ROA selection falls back to later valid_since when valid_until matches'''
epoch = "111111111"
context = create_test_context(tmp_path, epoch)
setup_test_data(context)
parse_rpki(context)

final_path = os.path.join(context.out_dir_rpki, "rpki_final.txt")
with open(final_path, "r") as f:
entries = [line.strip() for line in f.readlines()]

assert "102.0.100.0/24 AS11104" in entries, "ROA with later valid_since should be selected"
assert not any("102.0.100.0/24 AS11103" in e for e in entries), "ROA with earlier valid_since should not be selected"


def test_roa_asn_fallback(tmp_path):
'''Test ROA selection falls back to lower ASN when timestamps match'''
epoch = "111111111"
Expand Down
Loading