Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "julee"
version = "0.1.15"
version = "0.1.16"
description = "Julee - Clean architecture for accountable and transparent digital supply chains"
readme = "README.md"
requires-python = ">=3.11"
Expand Down Expand Up @@ -46,6 +46,7 @@ dependencies = [
"six>=1.16.0",
"jsonschema>=4.0.0",
"jsonpointer>=3.0.0",
"httpx>=0.27.0",
# Code introspection (doctrine tests, CLI)
"griffe>=1.0.0,<2",
"inflect>=7.5.0",
Expand Down
2 changes: 1 addition & 1 deletion src/julee/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Julee - Clean architecture for accountable and transparent digital supply chains."""

__version__ = "0.1.15"
__version__ = "0.1.16"
35 changes: 35 additions & 0 deletions src/julee/contrib/ceap/_schema_ref.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
Shared helpers for resolving JSON Schema $ref values.

A bare $ref schema is a dict of exactly {"$ref": "url#/fragment"}.
Resolution means fetching the URL and, if a fragment is present,
navigating to the target sub-schema and bundling the parent $defs so
that internal $ref values within the sub-schema remain valid.
"""

from typing import Any

import jsonpointer # type: ignore


def extract_schema_from_fetched(
    full_schema: dict[str, Any], fragment: str
) -> dict[str, Any]:
    """Return the sub-schema identified by *fragment* from *full_schema*.

    If *fragment* is empty the full schema is returned as-is (the very same
    object, not a copy). Otherwise the fragment is treated as a JSON Pointer
    (RFC 6901); the target object is shallow-copied and the parent's
    definition containers are merged in so that internal ``$ref`` values
    within the sub-schema continue to resolve correctly.

    Both ``$defs`` (2019-09 and later) and ``definitions`` (Draft 7 and
    earlier) are bundled, because a sub-schema may reference either
    container. When the sub-schema and the parent define the same entry, the
    sub-schema's own entry wins.

    NOTE(review): *fragment* is passed to ``jsonpointer`` verbatim. If callers
    hand over a raw URI fragment it may still be percent-encoded
    (RFC 6901 section 6) -- confirm the caller decodes it first.

    Raises:
        ValueError: if the fragment resolves to something other than a JSON
            object.
        jsonpointer.JsonPointerException: if the fragment is not a valid
            pointer or does not exist in *full_schema*.
    """
    if not fragment:
        return full_schema

    target = jsonpointer.resolve_pointer(full_schema, fragment)
    if not isinstance(target, dict):
        raise ValueError(f"$ref fragment '{fragment}' did not resolve to a JSON object")

    # Shallow copy so the fetched document itself is never mutated.
    result = dict(target)
    for keyword in ("$defs", "definitions"):
        parent_defs = full_schema.get(keyword)
        # Skip absent, empty or malformed (non-mapping) containers.
        if isinstance(parent_defs, dict) and parent_defs:
            # Parent entries first, so the sub-schema's own entries override.
            result[keyword] = {**parent_defs, **result.get(keyword, {})}
    return result
2 changes: 2 additions & 0 deletions src/julee/contrib/ceap/apps/worker/extract_assemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
WorkflowDocumentRepositoryProxy,
WorkflowKnowledgeServiceConfigRepositoryProxy,
WorkflowKnowledgeServiceQueryRepositoryProxy,
WorkflowRemoteSchemaRepositoryProxy,
)
from julee.services.temporal.proxies import (
WorkflowKnowledgeServiceProxy,
Expand Down Expand Up @@ -127,6 +128,7 @@ async def run(self, document_id: str, assembly_specification_id: str) -> Assembl
knowledge_service_query_repo=knowledge_service_query_repo,
knowledge_service_config_repo=knowledge_service_config_repo,
knowledge_service=knowledge_service,
remote_schema_repo=WorkflowRemoteSchemaRepositoryProxy(), # type: ignore[abstract]
clock_service=clock_service,
execution_service=execution_service,
)
Expand Down
60 changes: 60 additions & 0 deletions src/julee/contrib/ceap/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
Shared pytest fixtures for CEAP tests.
"""

import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

import pytest


@pytest.fixture
def schema_server():
    """Start a minimal HTTP server that serves registered JSON schemas.

    Binds to a random free port on localhost. Schemas are registered
    via ``server.register(path, schema_dict)``, which returns the full URL.
    Unregistered paths answer with a bare 404.

    On teardown the serve loop is stopped, the listening socket is closed
    and the background thread is joined, so no socket or thread outlives
    the test.

    Usage::

        def test_something(schema_server):
            url = schema_server.register("/my.json", {"type": "object", ...})
            # use url in test
    """
    registry: dict[str, bytes] = {}

    class _Handler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            body = registry.get(self.path)
            if body is None:
                self.send_response(404)
                self.end_headers()
                return
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format: str, *args: object) -> None:
            pass  # suppress test output

    # Port 0 asks the OS for any free port; read the real one back afterwards.
    server = HTTPServer(("127.0.0.1", 0), _Handler)
    host, port = server.server_address

    # Started before the try block: shutdown() deadlocks unless
    # serve_forever() is actually running in another thread.
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()

    class _Server:
        base_url = f"http://{host}:{port}"

        def register(self, path: str, schema: dict) -> str:
            """Register schema at path; return its absolute URL."""
            registry[path] = json.dumps(schema).encode()
            return f"{self.base_url}{path}"

    try:
        yield _Server()
    finally:
        server.shutdown()  # stop the serve_forever() loop
        server.server_close()  # release the listening socket
        thread.join(timeout=5)
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,16 @@ def jsonschema_must_be_valid(cls, v: dict[str, Any]) -> dict[str, Any]:
if not isinstance(v, dict):
raise ValueError("JSON Schema must be a dictionary")

# Basic validation that it looks like a JSON schema
if len(v) == 1 and "$ref" in v:
# Bare $ref — accept as-is. Resolution and schema validation
# happen at assembly time via RemoteSchemaRepository, not here.
if not isinstance(v["$ref"], str) or not v["$ref"].strip():
raise ValueError("$ref value must be a non-empty string")
return v

if "type" not in v:
raise ValueError("JSON Schema must have a 'type' field")

# Validate that it's a proper JSON Schema using jsonschema library
try:
jsonschema.Draft7Validator.check_schema(v)
except jsonschema.SchemaError as e:
Expand All @@ -138,26 +143,37 @@ def knowledge_service_queries_must_be_valid(
if not jsonschema_value:
raise ValueError("Cannot validate schema pointers without jsonschema field")

is_ref_schema = (
isinstance(jsonschema_value, dict)
and len(jsonschema_value) == 1
and "$ref" in jsonschema_value
)

cleaned_queries = {}
for schema_pointer, query_id in v.items():
# Validate schema pointer keys are strings
if not isinstance(schema_pointer, str):
raise ValueError("Schema pointer keys must be strings")

# Validate JSON Pointer format and that it exists in the schema
# Validate JSON Pointer format; existence against the resolved
# schema is only possible for inline schemas (not bare $refs —
# those are resolved at assembly time via RemoteSchemaRepository).
try:
if schema_pointer == "":
# Empty string is valid - refers to root of schema
pass
elif is_ref_schema:
# Format validation only — can't check existence without
# fetching the remote schema
jsonpointer.JsonPointer(schema_pointer)
else:
# Use jsonpointer to validate format and existence
ptr = jsonpointer.JsonPointer(schema_pointer)
ptr.resolve(jsonschema_value)
except jsonpointer.JsonPointerException as e:
raise ValueError(f"Invalid JSON Pointer '{schema_pointer}': {e}")
except (KeyError, IndexError, TypeError):
raise ValueError(
f"JSON Pointer '{schema_pointer}' does not exist in " f"schema"
f"JSON Pointer '{schema_pointer}' does not exist in schema"
)

# Validate query ID values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@

from .factories import AssemblyFactory

# Guaranteed-unresolvable URL for negative tests (.invalid TLD per RFC 2606)
_UNRESOLVABLE_URL = "http://schema.invalid/test.json"

pytestmark = pytest.mark.unit


Expand Down Expand Up @@ -493,3 +496,118 @@ def test_version_validation(self, version: str, expected_success: bool) -> None:
else:
with pytest.raises((ValueError, ValidationError)):
AssemblyFactory.build(version=version)


class TestAssemblyRefSchemaValidation:
    """Tests for AssemblySpecification with a bare $ref jsonschema value.

    A bare $ref is a jsonschema dict of exactly ``{"$ref": "<url>"}``. These
    tests pin that such a value is stored unchanged at construction time and
    that knowledge_service_queries pointer keys are only format-checked for
    it.
    """

    def test_ref_schema_accepted_and_stored_unchanged(self, schema_server) -> None:
        """A $ref pointing at a valid schema is accepted; the ref is stored
        as-is rather than replaced with the resolved content."""
        # NOTE(review): the model appears to accept bare $refs without
        # fetching them, so the served schema is presumably never requested
        # here -- confirm.
        url = schema_server.register(
            "/schema.json",
            {"type": "object", "properties": {"name": {"type": "string"}}},
        )
        spec = AssemblySpecification(
            assembly_specification_id="ref-test",
            name="Ref Test",
            applicability="Testing $ref support",
            jsonschema={"$ref": url},
        )
        assert spec.jsonschema == {"$ref": url}

    def test_ref_with_fragment_is_accepted(self, schema_server) -> None:
        """A $ref with a JSON Pointer fragment is accepted and the original
        $ref value, fragment included, is stored unchanged."""
        url = schema_server.register(
            "/full.json",
            {
                "$defs": {
                    "Item": {
                        "type": "object",
                        "properties": {"code": {"type": "string"}},
                    }
                }
            },
        )
        ref = f"{url}#/$defs/Item"
        spec = AssemblySpecification(
            assembly_specification_id="fragment-test",
            name="Fragment Test",
            applicability="Testing fragment $ref support",
            jsonschema={"$ref": ref},
        )
        assert spec.jsonschema == {"$ref": ref}

    def test_ref_to_unresolvable_url_is_accepted(self) -> None:
        """A $ref pointing at an unresolvable URL is accepted as-is.

        Resolution is deferred to assembly time via RemoteSchemaRepository;
        the domain model does not fetch remote schemas during construction.
        """
        spec = AssemblySpecification(
            assembly_specification_id="bad-ref-test",
            name="Bad Ref Test",
            applicability="Testing invalid $ref",
            jsonschema={"$ref": _UNRESOLVABLE_URL},
        )
        assert spec.jsonschema == {"$ref": _UNRESOLVABLE_URL}

    def test_ref_survives_serialisation_roundtrip(self, schema_server) -> None:
        """The $ref value is preserved through model_dump_json and
        re-instantiation."""
        url = schema_server.register(
            "/roundtrip.json",
            {"type": "object", "properties": {"x": {"type": "integer"}}},
        )
        original = AssemblySpecification(
            assembly_specification_id="roundtrip-test",
            name="Roundtrip Test",
            applicability="Testing serialisation roundtrip",
            jsonschema={"$ref": url},
        )
        # Dump to JSON text and parse it back to plain data before rebuilding.
        data = json.loads(original.model_dump_json())
        restored = AssemblySpecification(**data)
        assert restored.jsonschema == {"$ref": url}

    def test_knowledge_service_queries_format_validated_for_ref_schema(self) -> None:
        """JSON Pointer keys in knowledge_service_queries are format-validated
        for bare $ref schemas; existence against the resolved schema is deferred
        to assembly time via RemoteSchemaRepository."""
        spec = AssemblySpecification(
            assembly_specification_id="ksq-ref-test",
            name="KSQ Ref Test",
            applicability="Testing pointer validation against $ref",
            jsonschema={"$ref": _UNRESOLVABLE_URL},
            knowledge_service_queries={"/properties/sku": "extract-sku"},
        )
        assert spec.knowledge_service_queries == {"/properties/sku": "extract-sku"}

    def test_knowledge_service_queries_pointer_absent_from_ref_is_accepted(
        self,
    ) -> None:
        """A JSON Pointer that would not exist in the resolved $ref schema is
        still accepted at construction time; existence checking is deferred to
        assembly time when the remote schema can actually be fetched."""
        spec = AssemblySpecification(
            assembly_specification_id="deferred-pointer-test",
            name="Deferred Pointer Test",
            applicability="Testing pointer deferral for $ref schemas",
            jsonschema={"$ref": _UNRESOLVABLE_URL},
            knowledge_service_queries={"/properties/nonexistent": "query-1"},
        )
        assert spec.knowledge_service_queries == {"/properties/nonexistent": "query-1"}

    def test_knowledge_service_queries_rejects_malformed_pointer_for_ref_schema(
        self,
    ) -> None:
        """A malformed JSON Pointer (not starting with /) is rejected even for
        bare $ref schemas, since format validation still applies."""
        with pytest.raises(ValidationError):
            AssemblySpecification(
                assembly_specification_id="bad-format-test",
                name="Bad Format Test",
                applicability="Testing malformed pointer rejection",
                jsonschema={"$ref": _UNRESOLVABLE_URL},
                knowledge_service_queries={"not-a-pointer": "query-1"},
            )
2 changes: 2 additions & 0 deletions src/julee/contrib/ceap/domain/repositories/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .knowledge_service_config import KnowledgeServiceConfigRepository
from .knowledge_service_query import KnowledgeServiceQueryRepository
from .policy import PolicyRepository
from .remote_schema import RemoteSchemaRepository

__all__ = [
"DocumentRepository",
Expand All @@ -21,4 +22,5 @@
"KnowledgeServiceQueryRepository",
"PolicyRepository",
"DocumentPolicyValidationRepository",
"RemoteSchemaRepository",
]
8 changes: 8 additions & 0 deletions src/julee/contrib/ceap/domain/repositories/remote_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class RemoteSchemaRepository(Protocol):
    """Structural interface for anything that can fetch a JSON document by URL.

    Declared as a ``Protocol``, so implementations need not inherit from it;
    providing a matching ``fetch`` coroutine is enough. ``@runtime_checkable``
    allows ``isinstance`` checks, which only verify that a ``fetch``
    attribute exists, not its signature.
    """

    async def fetch(self, url: str) -> dict[str, Any]:
        """Fetch and return the JSON document at url.

        The document is returned as a dictionary. How the document is
        retrieved and which errors are raised on failure are left to the
        implementation.
        """
        ...
Loading
Loading