Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 93 additions & 37 deletions core/repo.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Git repository helpers for cloning, updating, and parsing the device-type library."""

import json
import os
import pickle
from glob import glob
Expand Down Expand Up @@ -28,22 +29,39 @@ def find_class(self, module, name):
)


_PICKLE_MAX_BYTES = 10 * 1024 * 1024 # 10 MiB — DTL pickles are typically <500 KiB
_INDEX_MAX_BYTES = 10 * 1024 * 1024 # 10 MiB — DTL index files are typically <1 MiB


def _vendor_slugs_from_pickle(
pickle_path: str, slugs_lower: list, slug_format, subdir_filter: "Optional[str]" = None
def _resolve_index_path(base_dir: str, stem: str) -> "Optional[str]":
"""Return the path to a DTL index file, preferring the JSON form over the legacy pickle.

Upstream DTL replaced ``tests/known-*.pickle`` with ``tests/known-*.json`` (GHSA-492p-5wp7-2w7c).
We prefer ``<stem>.json`` when present and fall back to ``<stem>.pickle`` so users pinned to an
older DTL checkout still get the slug fast path. Returns ``None`` when neither exists.
"""
json_path = os.path.join(base_dir, stem + ".json")
if os.path.exists(json_path):
return json_path
pickle_path = os.path.join(base_dir, stem + ".pickle")
if os.path.exists(pickle_path):
return pickle_path
return None


def _vendor_slugs_from_index(
index_path: "Optional[str]", slugs_lower: list, slug_format, subdir_filter: "Optional[str]" = None
) -> "Optional[set]":
"""Load a (model_name, vendor_dir) pickle and return the set of vendor slugs matching *slugs_lower*.
"""Load a (model_name, vendor_dir) index and return the set of vendor slugs matching *slugs_lower*.

*subdir_filter*, if given, requires the vendor_dir to contain that substring.
Returns ``None`` when the pickle is unavailable (missing or unreadable), so callers
*index_path* may point at a ``.json`` (current) or ``.pickle`` (legacy) file; the loader is
chosen by extension. *subdir_filter*, if given, requires the vendor_dir to contain that
substring. Returns ``None`` when the index is unavailable (missing or unreadable), so callers
can distinguish "no matches" (empty set) from "hint unavailable" (None).
"""
if not os.path.exists(pickle_path):
if index_path is None:
return None
try:
entries = _safe_pickle_load(pickle_path)
entries = _safe_index_load(index_path)
except Exception:
return None
result = set()
Expand All @@ -63,32 +81,67 @@ def _safe_abs_path(repo_root: str, relpath: str) -> "Optional[str]":
return abs_path if abs_path.startswith(os.path.normpath(repo_root) + os.sep) else None


def _safe_pickle_load(path: str):
"""Load a DTL upstream pickle using the restricted unpickler.
def _validate_index_data(data, source: str):
"""Validate that *data* is a set/list of (str, str) 2-element pairs.

Enforces a hard size cap before unpickling and validates the loaded object
is a set/list of (str, str) 2-tuples so malformed/oversized pickles cannot
cause resource exhaustion. Returns the loaded set on success or raises
``ValueError`` on shape violations (callers should catch and fall back).
Shared by the JSON and pickle loaders. JSON arrays decode to lists rather than tuples, so
both container shapes are accepted. Raises ``ValueError`` on any shape violation (callers
should catch and fall back to the normal glob path).
"""
size = os.path.getsize(path)
if size > _PICKLE_MAX_BYTES:
raise ValueError(f"Pickle file {path!r} is {size} bytes (limit {_PICKLE_MAX_BYTES}); refusing to load.")
with open(path, "rb") as fh:
data = _RestrictedUnpickler(fh).load()
if not isinstance(data, (set, list, frozenset)):
raise ValueError(f"Unexpected pickle root type {type(data).__name__!r}; expected set/list.")
raise ValueError(f"Unexpected {source} root type {type(data).__name__!r}; expected set/list.")
for item in data:
if (
not isinstance(item, tuple)
not isinstance(item, (tuple, list))
or len(item) != 2
or not isinstance(item[0], str)
or not isinstance(item[1], str)
):
raise ValueError(f"Unexpected item shape in pickle: {item!r}")
raise ValueError(f"Unexpected item shape in {source}: {item!r}")
return data


def _check_index_size(path: str):
"""Enforce the hard size cap before reading an index file, guarding against resource exhaustion."""
size = os.path.getsize(path)
if size > _INDEX_MAX_BYTES:
raise ValueError(f"Index file {path!r} is {size} bytes (limit {_INDEX_MAX_BYTES}); refusing to load.")


def _safe_json_load(path: str):
"""Load a DTL upstream ``known-*.json`` index file.

The current DTL format is a JSON array of ``[str, str]`` pairs. Enforces a hard size cap and
validates the decoded shape. JSON carries no code-execution risk, so no restricted loader is
needed. Returns the loaded list on success or raises ``ValueError`` on shape violations.
"""
_check_index_size(path)
with open(path, encoding="utf-8") as fh:
data = json.load(fh)
return _validate_index_data(data, "JSON index")


def _safe_pickle_load(path: str):
"""Load a legacy DTL ``known-*.pickle`` index using the restricted unpickler.

Enforces a hard size cap before unpickling and validates the loaded object is a set/list of
(str, str) 2-tuples so malformed/oversized pickles cannot cause resource exhaustion. Returns
the loaded set on success or raises ``ValueError`` on shape violations (callers should catch
and fall back).
"""
_check_index_size(path)
with open(path, "rb") as fh:
data = _RestrictedUnpickler(fh).load()
return _validate_index_data(data, "pickle")


def _safe_index_load(path: str):
"""Load a DTL slug index, dispatching on file extension (``.json`` current, ``.pickle`` legacy)."""
if path.endswith(".json"):
return _safe_json_load(path)
return _safe_pickle_load(path)


def validate_git_url(url):
"""Determine whether a Git remote URL is allowed (HTTPS, SSH, or file://).

Expand Down Expand Up @@ -484,16 +537,18 @@ def get_devices(self, base_path, vendors: list = None):
return files, discovered_vendors

def resolve_slug_files(self, slugs):
"""Use the upstream pickle indexes to resolve YAML file paths for slug/model matches.
"""Use the upstream slug indexes to resolve YAML file paths for slug/model matches.

The DTL repo ships three pickle files under ``tests/``:
The DTL repo ships three index files under ``tests/``. Upstream replaced the original
``known-*.pickle`` files with ``known-*.json`` (GHSA-492p-5wp7-2w7c); both carry the same
data shape, and we prefer JSON, falling back to pickle for older pinned checkouts:

* ``known-slugs.pickle`` — set of ``(manufacturer_prefixed_slug, relpath)`` for
* ``known-slugs.json`` — list of ``(manufacturer_prefixed_slug, relpath)`` for
device types. ``relpath`` is relative to the repo root, e.g.
``device-types/Nokia/7750-SR-7s.yaml``.
* ``known-modules.pickle`` — set of ``(model_name, vendor_dir)`` for module
* ``known-modules.json`` — list of ``(model_name, vendor_dir)`` for module
types. Only the vendor directory is stored, not the file name.
* ``known-racks.pickle`` — same format as known-modules.
* ``known-racks.json`` — same format as known-modules.

Matching uses a **case-insensitive substring** check identical to the runtime
:meth:`parse_files` filter so that partial slug/model searches work the same way.
Expand All @@ -502,32 +557,33 @@ def resolve_slug_files(self, slugs):
slugs (list[str]): User-supplied slug/model substrings (``--slugs``).

Returns:
dict or None: ``None`` when the device pickle is unavailable (caller falls back
dict or None: ``None`` when the device index is unavailable (caller falls back
to the normal glob path). Otherwise a dict with the keys:

``"device_files"``
``{vendor_slug: [abs_path, ...]}`` for devices resolved via pickle.
``{vendor_slug: [abs_path, ...]}`` for devices resolved via the index.
``"module_vendors"``
``{vendor_slug}`` — set of vendor slugs that may contain matching module
types, or ``None`` when the module pickle was unavailable (caller should
types, or ``None`` when the module index was unavailable (caller should
fall back to full glob+parse instead of skipping).
``"rack_vendors"``
``{vendor_slug}`` — same for rack types; ``None`` means unavailable.
"""
repo_root = self.get_absolute_path()
device_pickle = os.path.join(repo_root, "tests", "known-slugs.pickle")
module_pickle = os.path.join(repo_root, "tests", "known-modules.pickle")
rack_pickle = os.path.join(repo_root, "tests", "known-racks.pickle")
tests_dir = os.path.join(repo_root, "tests")
device_index = _resolve_index_path(tests_dir, "known-slugs")
module_index = _resolve_index_path(tests_dir, "known-modules")
rack_index = _resolve_index_path(tests_dir, "known-racks")

if not os.path.exists(device_pickle):
if device_index is None:
return None

slugs_lower = [s.casefold() for s in slugs]

# --- device types --------------------------------------------------
device_files = {} # vendor_slug -> [abs_path]
try:
known_slugs = _safe_pickle_load(device_pickle)
known_slugs = _safe_index_load(device_index)
except Exception:
return None

Expand All @@ -545,10 +601,10 @@ def resolve_slug_files(self, slugs):
device_files.setdefault(vendor_slug, []).append(abs_path)

# --- module types --------------------------------------------------
module_vendors = _vendor_slugs_from_pickle(module_pickle, slugs_lower, self.slug_format)
module_vendors = _vendor_slugs_from_index(module_index, slugs_lower, self.slug_format)

# --- rack types ----------------------------------------------------
rack_vendors = _vendor_slugs_from_pickle(rack_pickle, slugs_lower, self.slug_format, subdir_filter="rack-types")
rack_vendors = _vendor_slugs_from_index(rack_index, slugs_lower, self.slug_format, subdir_filter="rack-types")

return {
"device_files": device_files,
Expand Down
18 changes: 9 additions & 9 deletions nb-dt-import.py
Original file line number Diff line number Diff line change
Expand Up @@ -854,20 +854,20 @@ def _validate_argument_combinations(parser, args):


def _apply_slug_fast_path(dtl_repo, args, vendors_to_process, handle):
"""Use upstream pickle indexes to pre-resolve files and narrow the vendor list.
"""Use upstream slug indexes to pre-resolve files and narrow the vendor list.

When ``args.slugs`` is set and the DTL pickle files are present, resolves
exactly which device-type files match the requested slugs and restricts
``vendors_to_process`` to only those vendors. Returns a ``(vendors, resolved)``
pair where *resolved* is the dict returned by :meth:`DTLRepo.resolve_slug_files`
(or ``None`` when the pickle is absent/unavailable).
When ``args.slugs`` is set and the DTL index files (``known-*.json``, or legacy
``known-*.pickle``) are present, resolves exactly which device-type files match the
requested slugs and restricts ``vendors_to_process`` to only those vendors. Returns a
``(vendors, resolved)`` pair where *resolved* is the dict returned by
:meth:`DTLRepo.resolve_slug_files` (or ``None`` when the index is absent/unavailable).
"""
if not args.slugs:
return vendors_to_process, None

slug_resolved = dtl_repo.resolve_slug_files(args.slugs)
if slug_resolved is None:
handle.verbose_log("Slug pickle unavailable; falling back to full file scan.")
handle.verbose_log("Slug index unavailable; falling back to full file scan.")
return vendors_to_process, None

matched_vendor_slugs = (
Expand All @@ -876,11 +876,11 @@ def _apply_slug_fast_path(dtl_repo, args, vendors_to_process, handle):
| (slug_resolved["rack_vendors"] or set())
)
if not matched_vendor_slugs:
handle.verbose_log("Slug pickle returned no matches; falling back to full file scan.")
handle.verbose_log("Slug index returned no matches; falling back to full file scan.")
return vendors_to_process, None
narrowed = [v for v in vendors_to_process if v["slug"] in matched_vendor_slugs]
handle.verbose_log(
f"Slug pickle resolved {sum(len(f) for f in slug_resolved['device_files'].values())} "
f"Slug index resolved {sum(len(f) for f in slug_resolved['device_files'].values())} "
f"device file(s) across {len(matched_vendor_slugs)} vendor(s)."
)
return narrowed, slug_resolved
Expand Down
125 changes: 124 additions & 1 deletion tests/test_repo.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import pytest
from unittest.mock import MagicMock, call, mock_open, patch
from git import exc as git_exc
from core.repo import DTLRepo, _safe_pickle_load, validate_git_url, normalize_port_mappings
from core.repo import (
DTLRepo,
_resolve_index_path,
_safe_index_load,
_safe_json_load,
_safe_pickle_load,
validate_git_url,
normalize_port_mappings,
)


class TestValidateGitUrl:
Expand Down Expand Up @@ -1512,3 +1520,118 @@ def test_rack_pickle_only_uses_rack_type_entries(self, tmp_path):
result = repo.resolve_slug_files(["rack"])

assert result["rack_vendors"] == {"apc"}


class TestIndexLoaders:
"""Tests for the format-dispatching index loaders and path resolver."""

def test_resolve_prefers_json_over_pickle(self, tmp_path):
(tmp_path / "known-slugs.json").write_text("[]")
(tmp_path / "known-slugs.pickle").write_bytes(b"x")
assert _resolve_index_path(str(tmp_path), "known-slugs").endswith("known-slugs.json")

def test_resolve_falls_back_to_pickle(self, tmp_path):
(tmp_path / "known-slugs.pickle").write_bytes(b"x")
assert _resolve_index_path(str(tmp_path), "known-slugs").endswith("known-slugs.pickle")

def test_resolve_returns_none_when_absent(self, tmp_path):
assert _resolve_index_path(str(tmp_path), "known-slugs") is None

def test_safe_json_load_reads_pairs(self, tmp_path):
path = tmp_path / "known-slugs.json"
path.write_text('[["nokia-7750", "device-types/Nokia/7750.yaml"]]')
data = _safe_json_load(str(path))
assert data == [["nokia-7750", "device-types/Nokia/7750.yaml"]]

def test_safe_json_load_rejects_bad_shape(self, tmp_path):
path = tmp_path / "known-slugs.json"
path.write_text('[["only-one-element"]]')
with pytest.raises(ValueError, match="Unexpected item shape"):
_safe_json_load(str(path))

def test_safe_index_load_dispatches_on_extension(self, tmp_path):
import pickle

json_path = tmp_path / "a.json"
json_path.write_text('[["x", "y"]]')
pickle_path = tmp_path / "a.pickle"
pickle_path.write_bytes(pickle.dumps({("p", "q")}))
assert _safe_index_load(str(json_path)) == [["x", "y"]]
assert _safe_index_load(str(pickle_path)) == {("p", "q")}


class TestResolveSlugFilesJson:
"""Tests for the JSON slug fast path (current upstream format)."""

def _make_repo(self):
return TestResolveSlugFiles._make_repo(self)

def _write_json(self, tests_dir, stem, entries):
import json

(tests_dir / f"{stem}.json").write_text(json.dumps([list(e) for e in entries]))

def test_returns_device_files_for_matching_slug(self, tmp_path):
tests_dir = tmp_path / "tests"
tests_dir.mkdir()
self._write_json(tests_dir, "known-slugs", [("nokia-7750-sr-7s", "device-types/Nokia/7750-SR-7s.yaml")])
self._write_json(tests_dir, "known-modules", [])
self._write_json(tests_dir, "known-racks", [])

repo = self._make_repo()
repo.repo_path = str(tmp_path)
repo.cwd = ""

result = repo.resolve_slug_files(["nokia-7750-sr-7s"])

assert result is not None
expected_path = str(tmp_path / "device-types" / "Nokia" / "7750-SR-7s.yaml")
assert expected_path in result["device_files"]["nokia"]

def test_matching_module_json_adds_vendor_slug(self, tmp_path):
tests_dir = tmp_path / "tests"
tests_dir.mkdir()
self._write_json(tests_dir, "known-slugs", [])
self._write_json(tests_dir, "known-modules", [("7750 line card", "module-types/Nokia")])
self._write_json(tests_dir, "known-racks", [])

repo = self._make_repo()
repo.repo_path = str(tmp_path)
repo.cwd = ""

result = repo.resolve_slug_files(["7750"])

assert result["module_vendors"] == {"nokia"}

def test_json_preferred_over_pickle(self, tmp_path):
"""When both formats exist, JSON wins — pickle here would yield a different match."""
import pickle

tests_dir = tmp_path / "tests"
tests_dir.mkdir()
self._write_json(tests_dir, "known-slugs", [("nokia-7750-sr-7s", "device-types/Nokia/7750-SR-7s.yaml")])
(tests_dir / "known-slugs.pickle").write_bytes(
pickle.dumps({("cisco-catalyst-9200", "device-types/Cisco/Catalyst-9200.yaml")})
)
self._write_json(tests_dir, "known-modules", [])
self._write_json(tests_dir, "known-racks", [])

repo = self._make_repo()
repo.repo_path = str(tmp_path)
repo.cwd = ""

result = repo.resolve_slug_files(["nokia"])

assert "nokia" in result["device_files"]
assert "cisco" not in result["device_files"]

def test_corrupted_device_json_returns_none(self, tmp_path):
tests_dir = tmp_path / "tests"
tests_dir.mkdir()
(tests_dir / "known-slugs.json").write_text("{not valid json")

repo = self._make_repo()
repo.repo_path = str(tmp_path)
repo.cwd = ""

assert repo.resolve_slug_files(["nokia"]) is None