From 1b929f3d07b48dcb00e5b9d16cbbfc5ebe6d6469 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Thu, 9 Apr 2026 09:33:47 +1000 Subject: [PATCH 01/11] ceap: support $ref schemas in AssemblySpecification and resolve at query time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit UNTP and similar standards publish their JSON schemas at stable URLs. Embedding copies leads to version drift and large JSON fixtures. Allow AssemblySpecification.jsonschema to be a bare {'$ref': 'url#/ptr'} value. The ref is validated synchronously during model construction (to catch bad URLs early) but the resolved schema is not stored — it is fetched fresh each time a query is assembled so that patch updates to the published schema are picked up automatically. ExtractAssembleDataUseCase._assemble_iteration now resolves the schema once per iteration (async, via httpx) before building the PointableJSONSchema and passes the resolved dict to _validate_assembled_data. --- pyproject.toml | 3 +- .../assembly_specification.py | 55 +++++++++++++++- .../ceap/use_cases/extract_assemble_data.py | 66 +++++++++++++------ uv.lock | 4 +- 4 files changed, 103 insertions(+), 25 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index dc25da6e..5e89a32e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "julee" -version = "0.1.15" +version = "0.1.16" description = "Julee - Clean architecture for accountable and transparent digital supply chains" readme = "README.md" requires-python = ">=3.11" @@ -46,6 +46,7 @@ dependencies = [ "six>=1.16.0", "jsonschema>=4.0.0", "jsonpointer>=3.0.0", + "httpx>=0.27.0", # Code introspection (doctrine tests, CLI) "griffe>=1.0.0,<2", "inflect>=7.5.0", diff --git a/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py b/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py index 9db0a555..56accad9 100644 --- a/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py +++ b/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py @@ -18,6 +18,7 @@ from enum import Enum from typing import Any +import httpx import jsonpointer # type: ignore import jsonschema from pydantic import Field, field_validator @@ -25,6 +26,31 @@ from julee.core.entities.entity import Entity +def _fetch_and_resolve_ref(ref: str) -> dict[str, Any]: + """Fetch an external $ref URL and return the resolved schema dict. + + Handles an optional JSON Pointer fragment (e.g. #/$defs/Product) by + extracting the target sub-schema and bundling parent $defs so internal + $ref values remain valid. + """ + url, _, fragment = ref.partition("#") + response = httpx.get(url) + response.raise_for_status() + full_schema = response.json() + + if not fragment: + return full_schema + + target = jsonpointer.resolve_pointer(full_schema, fragment) + if not isinstance(target, dict): + raise ValueError(f"$ref fragment '{fragment}' did not resolve to a JSON object") + result = dict(target) + parent_defs = full_schema.get("$defs", {}) + if parent_defs: + result["$defs"] = {**parent_defs, **result.get("$defs", {})} + return result + + class AssemblySpecificationStatus(str, Enum): """Status of an assembly specification configuration.""" @@ -113,11 +139,24 @@ def jsonschema_must_be_valid(cls, v: dict[str, Any]) -> dict[str, Any]: if not isinstance(v, dict): raise ValueError("JSON Schema must be a dictionary") - # Basic validation that it looks like a JSON schema + if len(v) == 1 and "$ref" in v: + # Resolve the ref to validate it produces a valid schema, + # but store the original $ref unchanged. + try: + resolved = _fetch_and_resolve_ref(v["$ref"]) + except Exception as e: + raise ValueError(f"Could not resolve $ref '{v['$ref']}': {e}") + if "type" not in resolved: + raise ValueError("Resolved $ref schema must have a 'type' field") + try: + jsonschema.Draft7Validator.check_schema(resolved) + except jsonschema.SchemaError as e: + raise ValueError(f"Invalid JSON Schema at $ref: {e.message}") + return v + if "type" not in v: raise ValueError("JSON Schema must have a 'type' field") - # Validate that it's a proper JSON Schema using jsonschema library try: jsonschema.Draft7Validator.check_schema(v) except jsonschema.SchemaError as e: @@ -138,6 +177,18 @@ def knowledge_service_queries_must_be_valid( if not jsonschema_value: raise ValueError("Cannot validate schema pointers without jsonschema field") + # If jsonschema is a bare $ref, resolve it to validate pointers against + # the actual schema content + if ( + isinstance(jsonschema_value, dict) + and len(jsonschema_value) == 1 + and "$ref" in jsonschema_value + ): + try: + jsonschema_value = _fetch_and_resolve_ref(jsonschema_value["$ref"]) + except Exception as e: + raise ValueError(f"Could not resolve $ref for pointer validation: {e}") + cleaned_queries = {} for schema_pointer, query_id in v.items(): # Validate schema pointer keys are strings diff --git a/src/julee/contrib/ceap/use_cases/extract_assemble_data.py b/src/julee/contrib/ceap/use_cases/extract_assemble_data.py index e8049806..84eed900 100644 --- a/src/julee/contrib/ceap/use_cases/extract_assemble_data.py +++ b/src/julee/contrib/ceap/use_cases/extract_assemble_data.py @@ -10,8 +10,11 @@ import hashlib import json import logging +from collections.abc import Mapping from typing import Any +import httpx +import jsonpointer import jsonschema import multihash from pydantic import BaseModel @@ -42,6 +45,36 @@ logger = logging.getLogger(__name__) +async def _resolve_jsonschema(schema: Mapping[str, Any]) -> dict[str, Any]: + """Fetch and resolve a bare $ref schema; return inline schemas unchanged. + + If the schema is exactly {"$ref": "url#/fragment"}, fetches the URL afresh, + navigates to the fragment, and bundles the parent $defs so internal $ref + values remain valid. Otherwise returns the schema as-is. Re-fetching on + every query ensures the latest published version of the schema is used. + """ + if not (len(schema) == 1 and "$ref" in schema): + return dict(schema) + + url, _, fragment = schema["$ref"].partition("#") + async with httpx.AsyncClient() as client: + response = await client.get(url) + response.raise_for_status() + full_schema = response.json() + + if not fragment: + return full_schema + + target = jsonpointer.resolve_pointer(full_schema, fragment) + if not isinstance(target, dict): + raise ValueError(f"$ref fragment '{fragment}' did not resolve to a JSON object") + result = dict(target) + parent_defs = full_schema.get("$defs", {}) + if parent_defs: + result["$defs"] = {**parent_defs, **result.get("$defs", {})} + return result + + class ExtractAssembleDataRequest(BaseModel): document_id: str assembly_specification_id: str @@ -414,6 +447,12 @@ async def _assemble_iteration( # Initialize the result data structure assembled_data: dict[str, Any] = {} + # Resolve $ref schemas afresh on every query so any published patch + # to the external schema is picked up automatically. + resolved_jsonschema = await _resolve_jsonschema( + assembly_specification.jsonschema + ) + # Process each knowledge service query # TODO: This is where we may want to fan-out/fan-in to do these # in parallel. @@ -422,7 +461,7 @@ async def _assemble_iteration( query_id, ) in assembly_specification.knowledge_service_queries.items(): # Use PointableJSONSchema to generate complete schema for pointer target - pointable_schema = PointableJSONSchema(assembly_specification.jsonschema) + pointable_schema = PointableJSONSchema(resolved_jsonschema) output_schema = pointable_schema.schema_for_pointer(schema_pointer) # Get the query configuration @@ -464,7 +503,7 @@ async def _assemble_iteration( ) # Validate the assembled data against the JSON schema - self._validate_assembled_data(assembled_data, assembly_specification) + self._validate_assembled_data(assembled_data, resolved_jsonschema) # Create the assembled document assembled_document_id = await self._create_assembled_document( @@ -597,26 +636,16 @@ async def _create_assembled_document( def _validate_assembled_data( self, assembled_data: dict[str, Any], - assembly_specification: AssemblySpecification, + resolved_jsonschema: dict[str, Any], ) -> None: """Validate that the assembled data conforms to the JSON schema.""" try: - jsonschema.validate(assembled_data, assembly_specification.jsonschema) - logger.debug( - "Assembled data validation passed", - extra={ - "assembly_specification_id": ( - assembly_specification.assembly_specification_id - ), - }, - ) + jsonschema.validate(assembled_data, resolved_jsonschema) + logger.debug("Assembled data validation passed") except jsonschema.ValidationError as e: logger.error( "Assembled data validation failed", extra={ - "assembly_specification_id": ( - assembly_specification.assembly_specification_id - ), "validation_error": str(e), "error_path": (list(e.absolute_path) if e.absolute_path else []), "schema_path": (list(e.schema_path) if e.schema_path else []), @@ -628,12 +657,7 @@ def _validate_assembled_data( except jsonschema.SchemaError as e: logger.error( "JSON schema is invalid", - extra={ - "assembly_specification_id": ( - assembly_specification.assembly_specification_id - ), - "schema_error": str(e), - }, + extra={"schema_error": str(e)}, ) raise ValueError( f"Invalid JSON schema in assembly specification: {e.message}" diff --git a/uv.lock b/uv.lock index ed85a610..dad93f22 100644 --- a/uv.lock +++ b/uv.lock @@ -933,7 +933,7 @@ wheels = [ [[package]] name = "julee" -version = "0.1.14" +version = "0.1.16" source = { editable = "." } dependencies = [ { name = "anthropic" }, @@ -941,6 +941,7 @@ dependencies = [ { name = "fastapi" }, { name = "fastapi-pagination" }, { name = "griffe" }, + { name = "httpx" }, { name = "inflect" }, { name = "jinja2" }, { name = "jsonpointer" }, @@ -1001,6 +1002,7 @@ requires-dist = [ { name = "fastapi-pagination", specifier = ">=0.12.0" }, { name = "furo", marker = "extra == 'docs'", specifier = ">=2023.9.10" }, { name = "griffe", specifier = ">=1.0.0,<2" }, + { name = "httpx", specifier = ">=0.27.0" }, { name = "hypothesis", marker = "extra == 'dev'", specifier = ">=6.0.0" }, { name = "inflect", specifier = ">=7.5.0" }, { name = "jinja2", specifier = ">=3.0.0" }, From 64ac948d6156abfa9bded3420c78e431ca9b4450 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Thu, 9 Apr 2026 10:38:13 +1000 Subject: [PATCH 02/11] ceap: test $ref schema resolution with a live HTTP server Cover the new $ref code paths: - TestAssemblyRefSchemaValidation verifies that a bare $ref is accepted, stored unchanged, validated against the resolved schema content (including JSON Pointer checks against the resolved shape), and survives a serialisation roundtrip. - TestResolveJsonSchema verifies the async _resolve_jsonschema helper: inline schemas pass through unchanged; a bare $ref is fetched and returned; a fragment ref extracts the sub-schema and bundles parent $defs. A shared conftest.py fixture starts a minimal stdlib HTTP server (random free port, no test dependencies) used by both test classes. --- src/julee/contrib/ceap/conftest.py | 60 ++++++++++ .../tests/test_assembly_specification.py | 108 ++++++++++++++++++ .../tests/test_extract_assemble_data.py | 52 +++++++++ 3 files changed, 220 insertions(+) create mode 100644 src/julee/contrib/ceap/conftest.py diff --git a/src/julee/contrib/ceap/conftest.py b/src/julee/contrib/ceap/conftest.py new file mode 100644 index 00000000..c2c85997 --- /dev/null +++ b/src/julee/contrib/ceap/conftest.py @@ -0,0 +1,60 @@ +""" +Shared pytest fixtures for CEAP tests. +""" + +import json +import threading +from http.server import BaseHTTPRequestHandler, HTTPServer + +import pytest + + +@pytest.fixture +def schema_server(): + """Start a minimal HTTP server that serves registered JSON schemas. + + Binds to a random free port on localhost. Schemas are registered + via ``server.register(path, schema_dict)``, which returns the full URL. + + Usage:: + + def test_something(schema_server): + url = schema_server.register("/my.json", {"type": "object", ...}) + # use url in test + """ + registry: dict[str, bytes] = {} + + class _Handler(BaseHTTPRequestHandler): + def do_GET(self) -> None: + body = registry.get(self.path) + if body is not None: + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + else: + self.send_response(404) + self.end_headers() + + def log_message(self, format: str, *args: object) -> None: + pass # suppress test output + + server = HTTPServer(("127.0.0.1", 0), _Handler) + host, port = server.server_address + + thread = threading.Thread(target=server.serve_forever) + thread.daemon = True + thread.start() + + class _Server: + base_url = f"http://{host}:{port}" + + def register(self, path: str, schema: dict) -> str: + """Register schema at path; return its absolute URL.""" + registry[path] = json.dumps(schema).encode() + return f"{self.base_url}{path}" + + yield _Server() + + server.shutdown() diff --git a/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py b/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py index b52dad1f..0ba0202a 100644 --- a/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py +++ b/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py @@ -31,6 +31,9 @@ from .factories import AssemblyFactory +# Guaranteed-unresolvable URL for negative tests (.invalid TLD per RFC 2606) +_UNRESOLVABLE_URL = "http://schema.invalid/test.json" + pytestmark = pytest.mark.unit @@ -493,3 +496,108 @@ def test_version_validation(self, version: str, expected_success: bool) -> None: else: with pytest.raises((ValueError, ValidationError)): AssemblyFactory.build(version=version) + + +class TestAssemblyRefSchemaValidation: + """Tests for AssemblySpecification with a bare $ref jsonschema value.""" + + def test_ref_schema_accepted_and_stored_unchanged(self, schema_server) -> None: + """A $ref pointing at a valid schema is accepted; the ref is stored + as-is rather than replaced with the resolved content.""" + url = schema_server.register( + "/schema.json", + {"type": "object", "properties": {"name": {"type": "string"}}}, + ) + spec = AssemblySpecification( + assembly_specification_id="ref-test", + name="Ref Test", + applicability="Testing $ref support", + jsonschema={"$ref": url}, + ) + assert spec.jsonschema == {"$ref": url} + + def test_ref_with_fragment_is_accepted(self, schema_server) -> None: + """A $ref with a JSON Pointer fragment is resolved to validate the + target sub-schema, but the original $ref value is stored unchanged.""" + url = schema_server.register( + "/full.json", + { + "$defs": { + "Item": { + "type": "object", + "properties": {"code": {"type": "string"}}, + } + } + }, + ) + ref = f"{url}#/$defs/Item" + spec = AssemblySpecification( + assembly_specification_id="fragment-test", + name="Fragment Test", + applicability="Testing fragment $ref support", + jsonschema={"$ref": ref}, + ) + assert spec.jsonschema == {"$ref": ref} + + def test_ref_to_unresolvable_url_raises(self) -> None: + """A $ref that cannot be fetched raises a ValidationError.""" + with pytest.raises(ValidationError, match="Could not resolve"): + AssemblySpecification( + assembly_specification_id="bad-ref-test", + name="Bad Ref Test", + applicability="Testing invalid $ref", + jsonschema={"$ref": _UNRESOLVABLE_URL}, + ) + + def test_ref_survives_serialisation_roundtrip(self, schema_server) -> None: + """The $ref value is preserved through model_dump_json and + re-instantiation.""" + url = schema_server.register( + "/roundtrip.json", + {"type": "object", "properties": {"x": {"type": "integer"}}}, + ) + original = AssemblySpecification( + assembly_specification_id="roundtrip-test", + name="Roundtrip Test", + applicability="Testing serialisation roundtrip", + jsonschema={"$ref": url}, + ) + data = json.loads(original.model_dump_json()) + restored = AssemblySpecification(**data) + assert restored.jsonschema == {"$ref": url} + + def test_knowledge_service_queries_validated_against_resolved_ref( + self, schema_server + ) -> None: + """JSON Pointer keys in knowledge_service_queries are checked against + the resolved $ref content, so valid pointers are accepted.""" + url = schema_server.register( + "/product.json", + {"type": "object", "properties": {"sku": {"type": "string"}}}, + ) + spec = AssemblySpecification( + assembly_specification_id="ksq-ref-test", + name="KSQ Ref Test", + applicability="Testing pointer validation against $ref", + jsonschema={"$ref": url}, + knowledge_service_queries={"/properties/sku": "extract-sku"}, + ) + assert spec.knowledge_service_queries == {"/properties/sku": "extract-sku"} + + def test_knowledge_service_queries_rejects_pointer_absent_from_resolved_ref( + self, schema_server + ) -> None: + """A JSON Pointer that does not exist in the resolved $ref schema is + rejected, even though the bare $ref dict has no such path.""" + url = schema_server.register( + "/product.json", + {"type": "object", "properties": {"sku": {"type": "string"}}}, + ) + with pytest.raises(ValidationError, match="nonexistent"): + AssemblySpecification( + assembly_specification_id="bad-pointer-test", + name="Bad Pointer Test", + applicability="Testing pointer rejection against $ref", + jsonschema={"$ref": url}, + knowledge_service_queries={"/properties/nonexistent": "query-1"}, + ) diff --git a/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py b/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py index 4fadb4f1..e8b8a8e0 100644 --- a/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py +++ b/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py @@ -26,6 +26,7 @@ ) from julee.contrib.ceap.domain.models.knowledge_service_config import ServiceApi from julee.contrib.ceap.use_cases import ExtractAssembleDataUseCase +from julee.contrib.ceap.use_cases.extract_assemble_data import _resolve_jsonschema from julee.repositories.memory import ( MemoryAssemblyRepository, MemoryAssemblySpecificationRepository, @@ -680,3 +681,54 @@ async def test_assembly_fails_with_invalid_json_schema( document_id="doc-123", assembly_specification_id="spec-123", ) + + +class TestResolveJsonSchema: + """Tests for the _resolve_jsonschema async helper.""" + + @pytest.mark.asyncio + async def test_inline_schema_returned_unchanged(self) -> None: + """An inline schema dict is returned as-is without any HTTP calls.""" + schema = { + "type": "object", + "properties": {"x": {"type": "string"}}, + } + result = await _resolve_jsonschema(schema) + assert result == schema + + @pytest.mark.asyncio + async def test_ref_schema_fetched_and_resolved(self, schema_server) -> None: + """A bare $ref is fetched over HTTP and the resolved schema is returned.""" + served = {"type": "object", "properties": {"y": {"type": "integer"}}} + url = schema_server.register("/schema.json", served) + result = await _resolve_jsonschema({"$ref": url}) + assert result == served + + @pytest.mark.asyncio + async def test_ref_with_fragment_extracts_sub_schema( + self, schema_server + ) -> None: + """A $ref with a fragment extracts the target sub-schema and bundles + the parent $defs so internal $refs remain valid.""" + full_schema = { + "$defs": { + "Address": { + "type": "object", + "properties": {"street": {"type": "string"}}, + }, + "Person": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "address": {"$ref": "#/$defs/Address"}, + }, + }, + } + } + url = schema_server.register("/people.json", full_schema) + result = await _resolve_jsonschema({"$ref": f"{url}#/$defs/Person"}) + + assert result["type"] == "object" + assert "name" in result["properties"] + # Parent $defs must be bundled so the internal #/$defs/Address ref works + assert "Address" in result["$defs"] From e3a13c855cca9cf19effbac6f09faf163ee8f112 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Thu, 9 Apr 2026 10:41:44 +1000 Subject: [PATCH 03/11] ceap: extract shared fragment resolution into _schema_ref module _fetch_and_resolve_ref and _resolve_jsonschema both duplicated the same fragment-extraction and $defs-bundling logic. Extract it into extract_schema_from_fetched() in a shared ceap._schema_ref module so the two callers only differ in how they perform the HTTP fetch (sync vs async). --- src/julee/contrib/ceap/_schema_ref.py | 35 +++++++++++++++++++ .../assembly_specification.py | 22 ++---------- .../ceap/use_cases/extract_assemble_data.py | 21 +++-------- .../tests/test_extract_assemble_data.py | 4 +-- 4 files changed, 44 insertions(+), 38 deletions(-) create mode 100644 src/julee/contrib/ceap/_schema_ref.py diff --git a/src/julee/contrib/ceap/_schema_ref.py b/src/julee/contrib/ceap/_schema_ref.py new file mode 100644 index 00000000..a4031f62 --- /dev/null +++ b/src/julee/contrib/ceap/_schema_ref.py @@ -0,0 +1,35 @@ +""" +Shared helpers for resolving JSON Schema $ref values. + +A bare $ref schema is a dict of exactly {"$ref": "url#/fragment"}. +Resolution means fetching the URL and, if a fragment is present, +navigating to the target sub-schema and bundling the parent $defs so +that internal $ref values within the sub-schema remain valid. +""" + +from typing import Any + +import jsonpointer # type: ignore + + +def extract_schema_from_fetched( + full_schema: dict[str, Any], fragment: str +) -> dict[str, Any]: + """Return the sub-schema identified by *fragment* from *full_schema*. + + If *fragment* is empty the full schema is returned as-is. Otherwise the + fragment is treated as a JSON Pointer; the target object is extracted and + the parent ``$defs`` are merged in so that any internal ``$ref`` values + within the sub-schema continue to resolve correctly. + """ + if not fragment: + return full_schema + + target = jsonpointer.resolve_pointer(full_schema, fragment) + if not isinstance(target, dict): + raise ValueError(f"$ref fragment '{fragment}' did not resolve to a JSON object") + result = dict(target) + parent_defs = full_schema.get("$defs", {}) + if parent_defs: + result["$defs"] = {**parent_defs, **result.get("$defs", {})} + return result diff --git a/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py b/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py index 56accad9..495ca54d 100644 --- a/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py +++ b/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py @@ -23,32 +23,16 @@ import jsonschema from pydantic import Field, field_validator +from julee.contrib.ceap._schema_ref import extract_schema_from_fetched from julee.core.entities.entity import Entity def _fetch_and_resolve_ref(ref: str) -> dict[str, Any]: - """Fetch an external $ref URL and return the resolved schema dict. - - Handles an optional JSON Pointer fragment (e.g. #/$defs/Product) by - extracting the target sub-schema and bundling parent $defs so internal - $ref values remain valid. - """ + """Fetch an external $ref URL and return the resolved schema dict.""" url, _, fragment = ref.partition("#") response = httpx.get(url) response.raise_for_status() - full_schema = response.json() - - if not fragment: - return full_schema - - target = jsonpointer.resolve_pointer(full_schema, fragment) - if not isinstance(target, dict): - raise ValueError(f"$ref fragment '{fragment}' did not resolve to a JSON object") - result = dict(target) - parent_defs = full_schema.get("$defs", {}) - if parent_defs: - result["$defs"] = {**parent_defs, **result.get("$defs", {})} - return result + return extract_schema_from_fetched(response.json(), fragment) class AssemblySpecificationStatus(str, Enum): diff --git a/src/julee/contrib/ceap/use_cases/extract_assemble_data.py b/src/julee/contrib/ceap/use_cases/extract_assemble_data.py index 84eed900..db07bce2 100644 --- a/src/julee/contrib/ceap/use_cases/extract_assemble_data.py +++ b/src/julee/contrib/ceap/use_cases/extract_assemble_data.py @@ -14,11 +14,11 @@ from typing import Any import httpx -import jsonpointer import jsonschema import multihash from pydantic import BaseModel +from julee.contrib.ceap._schema_ref import extract_schema_from_fetched from julee.contrib.ceap.domain.models import ( Assembly, AssemblySpecification, @@ -48,10 +48,9 @@ async def _resolve_jsonschema(schema: Mapping[str, Any]) -> dict[str, Any]: """Fetch and resolve a bare $ref schema; return inline schemas unchanged. - If the schema is exactly {"$ref": "url#/fragment"}, fetches the URL afresh, - navigates to the fragment, and bundles the parent $defs so internal $ref - values remain valid. Otherwise returns the schema as-is. Re-fetching on - every query ensures the latest published version of the schema is used. + If the schema is exactly {"$ref": "url#/fragment"}, fetches the URL afresh + and delegates fragment extraction to extract_schema_from_fetched. + Re-fetching on every query ensures the latest published version is used. """ if not (len(schema) == 1 and "$ref" in schema): return dict(schema) @@ -62,17 +61,7 @@ async def _resolve_jsonschema(schema: Mapping[str, Any]) -> dict[str, Any]: response.raise_for_status() full_schema = response.json() - if not fragment: - return full_schema - - target = jsonpointer.resolve_pointer(full_schema, fragment) - if not isinstance(target, dict): - raise ValueError(f"$ref fragment '{fragment}' did not resolve to a JSON object") - result = dict(target) - parent_defs = full_schema.get("$defs", {}) - if parent_defs: - result["$defs"] = {**parent_defs, **result.get("$defs", {})} - return result + return extract_schema_from_fetched(full_schema, fragment) class ExtractAssembleDataRequest(BaseModel): diff --git a/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py b/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py index e8b8a8e0..e782b194 100644 --- a/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py +++ b/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py @@ -705,9 +705,7 @@ async def test_ref_schema_fetched_and_resolved(self, schema_server) -> None: assert result == served @pytest.mark.asyncio - async def test_ref_with_fragment_extracts_sub_schema( - self, schema_server - ) -> None: + async def test_ref_with_fragment_extracts_sub_schema(self, schema_server) -> None: """A $ref with a fragment extracts the target sub-schema and bundles the parent $defs so internal $refs remain valid.""" full_schema = { From 6c9a38dda66fe1cbfcf8e809a4d285600b038d55 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Thu, 9 Apr 2026 10:46:29 +1000 Subject: [PATCH 04/11] julee: bump version to 0.1.16 --- src/julee/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/julee/__init__.py b/src/julee/__init__.py index 453398d2..a591dc6d 100644 --- a/src/julee/__init__.py +++ b/src/julee/__init__.py @@ -1,3 +1,3 @@ """Julee - Clean architecture for accountable and transparent digital supply chains.""" -__version__ = "0.1.15" +__version__ = "0.1.16" From 82ce33ff0dde298b81f2f903a045d606de304672 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Thu, 9 Apr 2026 12:13:32 +1000 Subject: [PATCH 05/11] ceap: route $ref schema fetching through a Temporal activity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Direct httpx calls in ExtractAssembleDataUseCase violated Temporal's determinism requirement (non-deterministic I/O in workflow code breaks replay) and Clean Architecture (HTTP is infrastructure, not use case logic). The sandbox ban on httpx at import time was the signal, not a technicality to work around. Introduces RemoteSchemaRepository — a single fetch(url) protocol — run through the standard three-layer pattern: HttpRemoteSchemaRepository (Layer 1), TemporalHttpRemoteSchemaRepository (Layer 2 activity), WorkflowRemoteSchemaRepositoryProxy (Layer 3 proxy). The use case _resolve_jsonschema becomes an instance method calling the injected repo, keeping all I/O behind the activity boundary. --- .../ceap/apps/worker/extract_assemble.py | 2 + .../ceap/domain/repositories/__init__.py | 2 + .../ceap/domain/repositories/remote_schema.py | 8 ++++ .../ceap/use_cases/extract_assemble_data.py | 42 +++++++++---------- .../tests/test_extract_assemble_data.py | 37 +++++++++++++--- src/julee/repositories/http/__init__.py | 0 src/julee/repositories/http/schema.py | 11 +++++ src/julee/repositories/memory/__init__.py | 2 + .../repositories/memory/remote_schema.py | 14 +++++++ src/julee/repositories/temporal/activities.py | 10 +++++ .../repositories/temporal/activity_names.py | 2 + src/julee/repositories/temporal/proxies.py | 18 ++++++++ 12 files changed, 122 insertions(+), 26 deletions(-) create mode 100644 src/julee/contrib/ceap/domain/repositories/remote_schema.py create mode 100644 src/julee/repositories/http/__init__.py create mode 100644 src/julee/repositories/http/schema.py create mode 100644 src/julee/repositories/memory/remote_schema.py diff --git a/src/julee/contrib/ceap/apps/worker/extract_assemble.py b/src/julee/contrib/ceap/apps/worker/extract_assemble.py index 33fec438..f7b6588b 100644 --- a/src/julee/contrib/ceap/apps/worker/extract_assemble.py +++ b/src/julee/contrib/ceap/apps/worker/extract_assemble.py @@ -22,6 +22,7 @@ WorkflowDocumentRepositoryProxy, WorkflowKnowledgeServiceConfigRepositoryProxy, WorkflowKnowledgeServiceQueryRepositoryProxy, + WorkflowRemoteSchemaRepositoryProxy, ) from julee.services.temporal.proxies import ( WorkflowKnowledgeServiceProxy, @@ -127,6 +128,7 @@ async def run(self, document_id: str, assembly_specification_id: str) -> Assembl knowledge_service_query_repo=knowledge_service_query_repo, knowledge_service_config_repo=knowledge_service_config_repo, knowledge_service=knowledge_service, + remote_schema_repo=WorkflowRemoteSchemaRepositoryProxy(), # type: ignore[abstract] clock_service=clock_service, execution_service=execution_service, ) diff --git a/src/julee/contrib/ceap/domain/repositories/__init__.py b/src/julee/contrib/ceap/domain/repositories/__init__.py index b14fdceb..b82677e8 100644 --- a/src/julee/contrib/ceap/domain/repositories/__init__.py +++ b/src/julee/contrib/ceap/domain/repositories/__init__.py @@ -12,6 +12,7 @@ from .knowledge_service_config import KnowledgeServiceConfigRepository from .knowledge_service_query import KnowledgeServiceQueryRepository from .policy import PolicyRepository +from .remote_schema import RemoteSchemaRepository __all__ = [ "DocumentRepository", @@ -21,4 +22,5 @@ "KnowledgeServiceQueryRepository", "PolicyRepository", "DocumentPolicyValidationRepository", + "RemoteSchemaRepository", ] diff --git a/src/julee/contrib/ceap/domain/repositories/remote_schema.py b/src/julee/contrib/ceap/domain/repositories/remote_schema.py new file mode 100644 index 00000000..ba9765e6 --- /dev/null +++ b/src/julee/contrib/ceap/domain/repositories/remote_schema.py @@ -0,0 +1,8 @@ +from typing import Any, Protocol, runtime_checkable + + +@runtime_checkable +class RemoteSchemaRepository(Protocol): + async def fetch(self, url: str) -> dict[str, Any]: + """Fetch and return the JSON document at url.""" + ... diff --git a/src/julee/contrib/ceap/use_cases/extract_assemble_data.py b/src/julee/contrib/ceap/use_cases/extract_assemble_data.py index db07bce2..beb0cea5 100644 --- a/src/julee/contrib/ceap/use_cases/extract_assemble_data.py +++ b/src/julee/contrib/ceap/use_cases/extract_assemble_data.py @@ -13,7 +13,6 @@ from collections.abc import Mapping from typing import Any -import httpx import jsonschema import multihash from pydantic import BaseModel @@ -33,6 +32,7 @@ DocumentRepository, KnowledgeServiceConfigRepository, KnowledgeServiceQueryRepository, + RemoteSchemaRepository, ) from julee.core.services import ClockService, ExecutionService, SystemClockService from julee.core.services.execution import DefaultExecutionService @@ -45,25 +45,6 @@ logger = logging.getLogger(__name__) -async def _resolve_jsonschema(schema: Mapping[str, Any]) -> dict[str, Any]: - """Fetch and resolve a bare $ref schema; return inline schemas unchanged. - - If the schema is exactly {"$ref": "url#/fragment"}, fetches the URL afresh - and delegates fragment extraction to extract_schema_from_fetched. - Re-fetching on every query ensures the latest published version is used. - """ - if not (len(schema) == 1 and "$ref" in schema): - return dict(schema) - - url, _, fragment = schema["$ref"].partition("#") - async with httpx.AsyncClient() as client: - response = await client.get(url) - response.raise_for_status() - full_schema = response.json() - - return extract_schema_from_fetched(full_schema, fragment) - - class ExtractAssembleDataRequest(BaseModel): document_id: str assembly_specification_id: str @@ -109,6 +90,7 @@ def __init__( knowledge_service_query_repo: KnowledgeServiceQueryRepository, knowledge_service_config_repo: KnowledgeServiceConfigRepository, knowledge_service: KnowledgeService, + remote_schema_repo: RemoteSchemaRepository, clock_service: ClockService | None = None, execution_service: ExecutionService | None = None, ) -> None: @@ -149,6 +131,10 @@ def __init__( DocumentRepository, # type: ignore[type-abstract] ) self.knowledge_service = knowledge_service + self.remote_schema_repo = ensure_repository_protocol( + remote_schema_repo, + RemoteSchemaRepository, # type: ignore[type-abstract] + ) self._clock_service: ClockService = clock_service or SystemClockService() self._execution_service: ExecutionService = ( execution_service or DefaultExecutionService() @@ -401,6 +387,20 @@ async def _retrieve_all_queries( queries[query_id] = query return queries + async def _resolve_jsonschema(self, schema: Mapping[str, Any]) -> dict[str, Any]: + """Fetch and resolve a bare $ref schema; return inline schemas unchanged. + + If the schema is exactly {"$ref": "url#/fragment"}, fetches the URL via + the injected remote_schema_repo (a Temporal activity in workflow context) + and delegates fragment extraction to extract_schema_from_fetched. + Re-fetching on every query ensures the latest published version is used. + """ + if not (len(schema) == 1 and "$ref" in schema): + return dict(schema) + url, _, fragment = schema["$ref"].partition("#") + full_schema = await self.remote_schema_repo.fetch(url) + return extract_schema_from_fetched(full_schema, fragment) + @try_use_case_step("assembly_iteration") async def _assemble_iteration( self, @@ -438,7 +438,7 @@ async def _assemble_iteration( # Resolve $ref schemas afresh on every query so any published patch # to the external schema is picked up automatically. - resolved_jsonschema = await _resolve_jsonschema( + resolved_jsonschema = await self._resolve_jsonschema( assembly_specification.jsonschema ) diff --git a/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py b/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py index e782b194..cef43697 100644 --- a/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py +++ b/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py @@ -26,13 +26,14 @@ ) from julee.contrib.ceap.domain.models.knowledge_service_config import ServiceApi from julee.contrib.ceap.use_cases import ExtractAssembleDataUseCase -from julee.contrib.ceap.use_cases.extract_assemble_data import _resolve_jsonschema +from julee.repositories.http.schema import HttpRemoteSchemaRepository from julee.repositories.memory import ( MemoryAssemblyRepository, MemoryAssemblySpecificationRepository, MemoryDocumentRepository, MemoryKnowledgeServiceConfigRepository, MemoryKnowledgeServiceQueryRepository, + MemoryRemoteSchemaRepository, ) from julee.services.knowledge_service import QueryResult from julee.services.knowledge_service.memory import ( @@ -125,6 +126,11 @@ def configured_knowledge_service(self) -> MemoryKnowledgeService: ) return memory_service + @pytest.fixture + def remote_schema_repo(self) -> MemoryRemoteSchemaRepository: + """Create a memory RemoteSchemaRepository for testing.""" + return MemoryRemoteSchemaRepository() + @pytest.fixture def use_case( self, @@ -134,6 +140,7 @@ def use_case( knowledge_service_query_repo: MemoryKnowledgeServiceQueryRepository, knowledge_service_config_repo: MemoryKnowledgeServiceConfigRepository, knowledge_service: MemoryKnowledgeService, + remote_schema_repo: MemoryRemoteSchemaRepository, ) -> ExtractAssembleDataUseCase: """Create ExtractAssembleDataUseCase with memory repository dependencies.""" @@ -144,6 +151,7 @@ def use_case( knowledge_service_query_repo=knowledge_service_query_repo, knowledge_service_config_repo=knowledge_service_config_repo, knowledge_service=knowledge_service, + remote_schema_repo=remote_schema_repo, ) @pytest.fixture @@ -155,6 +163,7 @@ def configured_use_case( knowledge_service_query_repo: MemoryKnowledgeServiceQueryRepository, knowledge_service_config_repo: MemoryKnowledgeServiceConfigRepository, configured_knowledge_service: MemoryKnowledgeService, + remote_schema_repo: MemoryRemoteSchemaRepository, ) -> ExtractAssembleDataUseCase: """Create ExtractAssembleDataUseCase with configured knowledge service for full workflow tests.""" @@ -165,6 +174,7 @@ def configured_use_case( knowledge_service_query_repo=knowledge_service_query_repo, knowledge_service_config_repo=knowledge_service_config_repo, knowledge_service=configured_knowledge_service, + remote_schema_repo=remote_schema_repo, ) @pytest.mark.asyncio @@ -670,6 +680,7 @@ async def test_assembly_fails_with_invalid_json_schema( knowledge_service_query_repo=knowledge_service_query_repo, knowledge_service_config_repo=knowledge_service_config_repo, knowledge_service=memory_service, + remote_schema_repo=MemoryRemoteSchemaRepository(), ) # Act & Assert @@ -684,7 +695,18 @@ async def test_assembly_fails_with_invalid_json_schema( class TestResolveJsonSchema: - """Tests for the _resolve_jsonschema async helper.""" + """Tests for ExtractAssembleDataUseCase._resolve_jsonschema.""" + + def _make_use_case(self, remote_schema_repo) -> ExtractAssembleDataUseCase: + return ExtractAssembleDataUseCase( + document_repo=MemoryDocumentRepository(), + assembly_repo=MemoryAssemblyRepository(), + assembly_specification_repo=MemoryAssemblySpecificationRepository(), + knowledge_service_query_repo=MemoryKnowledgeServiceQueryRepository(), + knowledge_service_config_repo=MemoryKnowledgeServiceConfigRepository(), + knowledge_service=AsyncMock(), + remote_schema_repo=remote_schema_repo, + ) @pytest.mark.asyncio async def test_inline_schema_returned_unchanged(self) -> None: @@ -693,7 +715,8 @@ async def test_inline_schema_returned_unchanged(self) -> None: "type": "object", "properties": {"x": {"type": "string"}}, } - result = await _resolve_jsonschema(schema) + use_case = self._make_use_case(MemoryRemoteSchemaRepository()) + result = await use_case._resolve_jsonschema(schema) assert result == schema @pytest.mark.asyncio @@ -701,13 +724,16 @@ async def test_ref_schema_fetched_and_resolved(self, schema_server) -> None: """A bare $ref is fetched over HTTP and the resolved schema is returned.""" served = {"type": "object", "properties": {"y": {"type": "integer"}}} url = schema_server.register("/schema.json", served) - result = await _resolve_jsonschema({"$ref": url}) + use_case = self._make_use_case(HttpRemoteSchemaRepository()) + result = await use_case._resolve_jsonschema({"$ref": url}) assert result == served @pytest.mark.asyncio async def test_ref_with_fragment_extracts_sub_schema(self, schema_server) -> None: """A $ref with a fragment extracts the target sub-schema and bundles the parent $defs so internal $refs remain valid.""" + from julee.repositories.http.schema import HttpRemoteSchemaRepository + full_schema = { "$defs": { "Address": { @@ -724,7 +750,8 @@ async def test_ref_with_fragment_extracts_sub_schema(self, schema_server) -> Non } } url = schema_server.register("/people.json", full_schema) - result = await _resolve_jsonschema({"$ref": f"{url}#/$defs/Person"}) + use_case = self._make_use_case(HttpRemoteSchemaRepository()) + result = await use_case._resolve_jsonschema({"$ref": f"{url}#/$defs/Person"}) assert result["type"] == "object" assert "name" in result["properties"] diff --git a/src/julee/repositories/http/__init__.py b/src/julee/repositories/http/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/julee/repositories/http/schema.py b/src/julee/repositories/http/schema.py new file mode 100644 index 00000000..ac6a1b83 --- /dev/null +++ b/src/julee/repositories/http/schema.py @@ -0,0 +1,11 @@ +from typing import Any + +import httpx + + +class HttpRemoteSchemaRepository: + async def fetch(self, url: str) -> dict[str, Any]: + async with httpx.AsyncClient() as client: + response = await client.get(url) + response.raise_for_status() + return response.json() diff --git a/src/julee/repositories/memory/__init__.py b/src/julee/repositories/memory/__init__.py index 99f5d28a..f30675f0 100644 --- a/src/julee/repositories/memory/__init__.py +++ b/src/julee/repositories/memory/__init__.py @@ -19,6 +19,7 @@ from .knowledge_service_config import MemoryKnowledgeServiceConfigRepository from .knowledge_service_query import MemoryKnowledgeServiceQueryRepository from .policy import MemoryPolicyRepository +from .remote_schema import MemoryRemoteSchemaRepository __all__ = [ "MemoryAssemblyRepository", @@ -28,4 +29,5 @@ "MemoryKnowledgeServiceConfigRepository", "MemoryKnowledgeServiceQueryRepository", "MemoryPolicyRepository", + "MemoryRemoteSchemaRepository", ] diff --git a/src/julee/repositories/memory/remote_schema.py b/src/julee/repositories/memory/remote_schema.py new file mode 100644 index 00000000..a7c56c7d --- /dev/null +++ b/src/julee/repositories/memory/remote_schema.py @@ -0,0 +1,14 @@ +from typing import Any + + +class MemoryRemoteSchemaRepository: + def __init__(self, schemas: dict[str, dict] | None = None) -> None: + self._schemas: dict[str, dict] = schemas or {} + + def register(self, url: str, schema: dict) -> None: + self._schemas[url] = schema + + async def fetch(self, url: str) -> dict[str, Any]: + if url not in self._schemas: + raise ValueError(f"No schema registered for URL: {url}") + return dict(self._schemas[url]) diff --git a/src/julee/repositories/temporal/activities.py b/src/julee/repositories/temporal/activities.py index bdfe65da..180bed79 100644 --- a/src/julee/repositories/temporal/activities.py +++ b/src/julee/repositories/temporal/activities.py @@ -27,6 +27,7 @@ from julee.repositories.minio.policy import ( MinioPolicyRepository, ) +from julee.repositories.http.schema import HttpRemoteSchemaRepository # Import activity name bases from shared module from julee.repositories.temporal.activity_names import ( @@ -37,6 +38,7 @@ KNOWLEDGE_SERVICE_CONFIG_ACTIVITY_BASE, KNOWLEDGE_SERVICE_QUERY_ACTIVITY_BASE, POLICY_ACTIVITY_BASE, + REMOTE_SCHEMA_ACTIVITY_BASE, ) from julee.util.temporal.decorators import temporal_activity_registration @@ -98,6 +100,13 @@ class TemporalMinioDocumentPolicyValidationRepository( pass +@temporal_activity_registration(REMOTE_SCHEMA_ACTIVITY_BASE) +class TemporalHttpRemoteSchemaRepository(HttpRemoteSchemaRepository): + """Temporal activity wrapper for HttpRemoteSchemaRepository.""" + + pass + + # Export the temporal repository classes for use in worker.py __all__ = [ "TemporalMinioAssemblyRepository", @@ -111,4 +120,5 @@ class TemporalMinioDocumentPolicyValidationRepository( "DOCUMENT_ACTIVITY_BASE", "KNOWLEDGE_SERVICE_CONFIG_ACTIVITY_BASE", "KNOWLEDGE_SERVICE_QUERY_ACTIVITY_BASE", + "TemporalHttpRemoteSchemaRepository", ] diff --git a/src/julee/repositories/temporal/activity_names.py b/src/julee/repositories/temporal/activity_names.py index 3f5ec1f2..ba80f076 100644 --- a/src/julee/repositories/temporal/activity_names.py +++ b/src/julee/repositories/temporal/activity_names.py @@ -20,6 +20,7 @@ KNOWLEDGE_SERVICE_QUERY_ACTIVITY_BASE = "julee.knowledge_service_query_repo.minio" POLICY_ACTIVITY_BASE = "julee.policy_repo.minio" DOCUMENT_POLICY_VALIDATION_ACTIVITY_BASE = "julee.document_policy_validation_repo.minio" +REMOTE_SCHEMA_ACTIVITY_BASE = "julee.remote_schema_repo.http" # Export all constants @@ -31,4 +32,5 @@ "KNOWLEDGE_SERVICE_QUERY_ACTIVITY_BASE", "POLICY_ACTIVITY_BASE", "DOCUMENT_POLICY_VALIDATION_ACTIVITY_BASE", + "REMOTE_SCHEMA_ACTIVITY_BASE", ] diff --git a/src/julee/repositories/temporal/proxies.py b/src/julee/repositories/temporal/proxies.py index 333c2431..fda266f8 100644 --- a/src/julee/repositories/temporal/proxies.py +++ b/src/julee/repositories/temporal/proxies.py @@ -26,6 +26,7 @@ KnowledgeServiceQueryRepository, ) from julee.contrib.ceap.domain.repositories.policy import PolicyRepository +from julee.contrib.ceap.domain.repositories.remote_schema import RemoteSchemaRepository # Import activity name bases from shared module from julee.repositories.temporal.activity_names import ( @@ -36,6 +37,7 @@ KNOWLEDGE_SERVICE_CONFIG_ACTIVITY_BASE, KNOWLEDGE_SERVICE_QUERY_ACTIVITY_BASE, POLICY_ACTIVITY_BASE, + REMOTE_SCHEMA_ACTIVITY_BASE, ) from julee.util.temporal.decorators import temporal_workflow_proxy @@ -149,6 +151,21 @@ class WorkflowDocumentPolicyValidationRepositoryProxy( pass +@temporal_workflow_proxy( + activity_base=REMOTE_SCHEMA_ACTIVITY_BASE, + default_timeout_seconds=30, + retry_methods=["fetch"], +) +class WorkflowRemoteSchemaRepositoryProxy(RemoteSchemaRepository): + """ + Workflow implementation of RemoteSchemaRepository that calls activities. + All methods are automatically generated by the @temporal_workflow_proxy + decorator. + """ + + pass + + # Export the workflow proxy classes __all__ = [ "WorkflowAssemblyRepositoryProxy", @@ -156,4 +173,5 @@ class WorkflowDocumentPolicyValidationRepositoryProxy( "WorkflowDocumentRepositoryProxy", "WorkflowKnowledgeServiceConfigRepositoryProxy", "WorkflowKnowledgeServiceQueryRepositoryProxy", + "WorkflowRemoteSchemaRepositoryProxy", ] From bc71a7e2454c9ba992258cb877576e9547eee6e5 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Thu, 9 Apr 2026 12:15:53 +1000 Subject: [PATCH 06/11] ceap: sort imports in temporal activities --- src/julee/repositories/temporal/activities.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/julee/repositories/temporal/activities.py b/src/julee/repositories/temporal/activities.py index 180bed79..a17a5c60 100644 --- a/src/julee/repositories/temporal/activities.py +++ b/src/julee/repositories/temporal/activities.py @@ -10,6 +10,7 @@ - Each repository type gets its own activity prefix """ +from julee.repositories.http.schema import HttpRemoteSchemaRepository from julee.repositories.minio.assembly import MinioAssemblyRepository from julee.repositories.minio.assembly_specification import ( MinioAssemblySpecificationRepository, @@ -27,7 +28,6 @@ from julee.repositories.minio.policy import ( MinioPolicyRepository, ) -from julee.repositories.http.schema import HttpRemoteSchemaRepository # Import activity name bases from shared module from julee.repositories.temporal.activity_names import ( From 170a4b9d1ca9379d406d29f709aaaca2bf176dc7 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Thu, 9 Apr 2026 12:18:33 +1000 Subject: [PATCH 07/11] ceap: defer httpx import in AssemblySpecification validator --- .../models/assembly_specification/assembly_specification.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py b/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py index 495ca54d..c4be7429 100644 --- a/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py +++ b/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py @@ -18,7 +18,6 @@ from enum import Enum from typing import Any -import httpx import jsonpointer # type: ignore import jsonschema from pydantic import Field, field_validator @@ -29,6 +28,8 @@ def _fetch_and_resolve_ref(ref: str) -> dict[str, Any]: """Fetch an external $ref URL and return the resolved schema dict.""" + import httpx + url, _, fragment = ref.partition("#") response = httpx.get(url) response.raise_for_status() From fa83c6b954cbf6cd9c9797c518bff1562b39e4a8 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Thu, 9 Apr 2026 13:25:09 +1000 Subject: [PATCH 08/11] assembly_spec: defer $ref resolution to assembly time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Domain model validators cannot make HTTP calls — they run at object construction and deserialization time, including inside Temporal workflow replay where networking is banned by the sandbox. Bare \$ref schemas are now accepted as-is in the jsonschema validator; existence-checking of JSON Pointer keys against the resolved schema is skipped when the schema is a bare \$ref. Both deferral cases have explicit comments explaining when resolution actually happens (RemoteSchemaRepository at assembly time). --- .../assembly_specification.py | 48 ++++++------------- 1 file changed, 14 insertions(+), 34 deletions(-) diff --git a/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py b/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py index c4be7429..4cb255cf 100644 --- a/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py +++ b/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py @@ -22,20 +22,9 @@ import jsonschema from pydantic import Field, field_validator -from julee.contrib.ceap._schema_ref import extract_schema_from_fetched from julee.core.entities.entity import Entity -def _fetch_and_resolve_ref(ref: str) -> dict[str, Any]: - """Fetch an external $ref URL and return the resolved schema dict.""" - import httpx - - url, _, fragment = ref.partition("#") - response = httpx.get(url) - response.raise_for_status() - return extract_schema_from_fetched(response.json(), fragment) - - class AssemblySpecificationStatus(str, Enum): """Status of an assembly specification configuration.""" @@ -125,18 +114,10 @@ def jsonschema_must_be_valid(cls, v: dict[str, Any]) -> dict[str, Any]: raise ValueError("JSON Schema must be a dictionary") if len(v) == 1 and "$ref" in v: - # Resolve the ref to validate it produces a valid schema, - # but store the original $ref unchanged. - try: - resolved = _fetch_and_resolve_ref(v["$ref"]) - except Exception as e: - raise ValueError(f"Could not resolve $ref '{v['$ref']}': {e}") - if "type" not in resolved: - raise ValueError("Resolved $ref schema must have a 'type' field") - try: - jsonschema.Draft7Validator.check_schema(resolved) - except jsonschema.SchemaError as e: - raise ValueError(f"Invalid JSON Schema at $ref: {e.message}") + # Bare $ref — accept as-is. Resolution and schema validation + # happen at assembly time via RemoteSchemaRepository, not here. + if not isinstance(v["$ref"], str) or not v["$ref"].strip(): + raise ValueError("$ref value must be a non-empty string") return v if "type" not in v: @@ -162,17 +143,11 @@ def knowledge_service_queries_must_be_valid( if not jsonschema_value: raise ValueError("Cannot validate schema pointers without jsonschema field") - # If jsonschema is a bare $ref, resolve it to validate pointers against - # the actual schema content - if ( + is_ref_schema = ( isinstance(jsonschema_value, dict) and len(jsonschema_value) == 1 and "$ref" in jsonschema_value - ): - try: - jsonschema_value = _fetch_and_resolve_ref(jsonschema_value["$ref"]) - except Exception as e: - raise ValueError(f"Could not resolve $ref for pointer validation: {e}") + ) cleaned_queries = {} for schema_pointer, query_id in v.items(): @@ -180,20 +155,25 @@ def knowledge_service_queries_must_be_valid( if not isinstance(schema_pointer, str): raise ValueError("Schema pointer keys must be strings") - # Validate JSON Pointer format and that it exists in the schema + # Validate JSON Pointer format; existence against the resolved + # schema is only possible for inline schemas (not bare $refs — + # those are resolved at assembly time via RemoteSchemaRepository). try: if schema_pointer == "": # Empty string is valid - refers to root of schema pass + elif is_ref_schema: + # Format validation only — can't check existence without + # fetching the remote schema + jsonpointer.JsonPointer(schema_pointer) else: - # Use jsonpointer to validate format and existence ptr = jsonpointer.JsonPointer(schema_pointer) ptr.resolve(jsonschema_value) except jsonpointer.JsonPointerException as e: raise ValueError(f"Invalid JSON Pointer '{schema_pointer}': {e}") except (KeyError, IndexError, TypeError): raise ValueError( - f"JSON Pointer '{schema_pointer}' does not exist in " f"schema" + f"JSON Pointer '{schema_pointer}' does not exist in schema" ) # Validate query ID values From 454d1f31c9c3f44217519e404fdb87159a5c7579 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Thu, 9 Apr 2026 13:36:37 +1000 Subject: [PATCH 09/11] assembly_spec: update tests to reflect deferred validation Tests that expected ValidationError on unresolvable URLs or missing pointers in remote schemas were written against the old eager-fetch behaviour. Now that the domain model defers resolution to assembly time, those cases are accepted at construction; update tests accordingly and add an explicit test that malformed pointer format is still rejected. --- .../tests/test_assembly_specification.py | 74 ++++++++++--------- 1 file changed, 41 insertions(+), 33 deletions(-) diff --git a/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py b/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py index 0ba0202a..5a96942f 100644 --- a/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py +++ b/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py @@ -539,15 +539,19 @@ def test_ref_with_fragment_is_accepted(self, schema_server) -> None: ) assert spec.jsonschema == {"$ref": ref} - def test_ref_to_unresolvable_url_raises(self) -> None: - """A $ref that cannot be fetched raises a ValidationError.""" - with pytest.raises(ValidationError, match="Could not resolve"): - AssemblySpecification( - assembly_specification_id="bad-ref-test", - name="Bad Ref Test", - applicability="Testing invalid $ref", - jsonschema={"$ref": _UNRESOLVABLE_URL}, - ) + def test_ref_to_unresolvable_url_is_accepted(self) -> None: + """A $ref pointing at an unresolvable URL is accepted as-is. + + Resolution is deferred to assembly time via RemoteSchemaRepository; + the domain model does not fetch remote schemas during construction. + """ + spec = AssemblySpecification( + assembly_specification_id="bad-ref-test", + name="Bad Ref Test", + applicability="Testing invalid $ref", + jsonschema={"$ref": _UNRESOLVABLE_URL}, + ) + assert spec.jsonschema == {"$ref": _UNRESOLVABLE_URL} def test_ref_survives_serialisation_roundtrip(self, schema_server) -> None: """The $ref value is preserved through model_dump_json and @@ -566,38 +570,42 @@ def test_ref_survives_serialisation_roundtrip(self, schema_server) -> None: restored = AssemblySpecification(**data) assert restored.jsonschema == {"$ref": url} - def test_knowledge_service_queries_validated_against_resolved_ref( - self, schema_server - ) -> None: - """JSON Pointer keys in knowledge_service_queries are checked against - the resolved $ref content, so valid pointers are accepted.""" - url = schema_server.register( - "/product.json", - {"type": "object", "properties": {"sku": {"type": "string"}}}, - ) + def test_knowledge_service_queries_format_validated_for_ref_schema(self) -> None: + """JSON Pointer keys in knowledge_service_queries are format-validated + for bare $ref schemas; existence against the resolved schema is deferred + to assembly time via RemoteSchemaRepository.""" spec = AssemblySpecification( assembly_specification_id="ksq-ref-test", name="KSQ Ref Test", applicability="Testing pointer validation against $ref", - jsonschema={"$ref": url}, + jsonschema={"$ref": _UNRESOLVABLE_URL}, knowledge_service_queries={"/properties/sku": "extract-sku"}, ) assert spec.knowledge_service_queries == {"/properties/sku": "extract-sku"} - def test_knowledge_service_queries_rejects_pointer_absent_from_resolved_ref( - self, schema_server - ) -> None: - """A JSON Pointer that does not exist in the resolved $ref schema is - rejected, even though the bare $ref dict has no such path.""" - url = schema_server.register( - "/product.json", - {"type": "object", "properties": {"sku": {"type": "string"}}}, + def test_knowledge_service_queries_pointer_absent_from_ref_is_accepted(self) -> None: + """A JSON Pointer that would not exist in the resolved $ref schema is + still accepted at construction time; existence checking is deferred to + assembly time when the remote schema can actually be fetched.""" + spec = AssemblySpecification( + assembly_specification_id="deferred-pointer-test", + name="Deferred Pointer Test", + applicability="Testing pointer deferral for $ref schemas", + jsonschema={"$ref": _UNRESOLVABLE_URL}, + knowledge_service_queries={"/properties/nonexistent": "query-1"}, ) - with pytest.raises(ValidationError, match="nonexistent"): + assert spec.knowledge_service_queries == {"/properties/nonexistent": "query-1"} + + def test_knowledge_service_queries_rejects_malformed_pointer_for_ref_schema( + self, + ) -> None: + """A malformed JSON Pointer (not starting with /) is rejected even for + bare $ref schemas, since format validation still applies.""" + with pytest.raises(ValidationError): AssemblySpecification( - assembly_specification_id="bad-pointer-test", - name="Bad Pointer Test", - applicability="Testing pointer rejection against $ref", - jsonschema={"$ref": url}, - knowledge_service_queries={"/properties/nonexistent": "query-1"}, + assembly_specification_id="bad-format-test", + name="Bad Format Test", + applicability="Testing malformed pointer rejection", + jsonschema={"$ref": _UNRESOLVABLE_URL}, + knowledge_service_queries={"not-a-pointer": "query-1"}, ) From 09b18486b8f5e49818bdc95734d45e2ef5505ace Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Thu, 9 Apr 2026 13:53:57 +1000 Subject: [PATCH 10/11] remote_schema: explicitly inherit from RemoteSchemaRepository temporal_activity_registration discovers methods to wrap by scanning for Protocol classes in the MRO. Without explicit inheritance, the Protocol never appears in the MRO and fetch is never registered as an activity, causing NotFoundError at runtime. --- src/julee/repositories/http/schema.py | 4 +++- src/julee/repositories/memory/remote_schema.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/julee/repositories/http/schema.py b/src/julee/repositories/http/schema.py index ac6a1b83..fc2a9394 100644 --- a/src/julee/repositories/http/schema.py +++ b/src/julee/repositories/http/schema.py @@ -2,8 +2,10 @@ import httpx +from julee.contrib.ceap.domain.repositories.remote_schema import RemoteSchemaRepository -class HttpRemoteSchemaRepository: + +class HttpRemoteSchemaRepository(RemoteSchemaRepository): async def fetch(self, url: str) -> dict[str, Any]: async with httpx.AsyncClient() as client: response = await client.get(url) diff --git a/src/julee/repositories/memory/remote_schema.py b/src/julee/repositories/memory/remote_schema.py index a7c56c7d..9e380163 100644 --- a/src/julee/repositories/memory/remote_schema.py +++ b/src/julee/repositories/memory/remote_schema.py @@ -1,7 +1,9 @@ from typing import Any +from julee.contrib.ceap.domain.repositories.remote_schema import RemoteSchemaRepository -class MemoryRemoteSchemaRepository: + +class MemoryRemoteSchemaRepository(RemoteSchemaRepository): def __init__(self, schemas: dict[str, dict] | None = None) -> None: self._schemas: dict[str, dict] = schemas or {} From 841b69e95dd453fe57adc0e92543309404695d56 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Thu, 9 Apr 2026 14:00:02 +1000 Subject: [PATCH 11/11] lint: format test_assembly_specification --- .../tests/test_assembly_specification.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py b/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py index 5a96942f..057517df 100644 --- a/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py +++ b/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py @@ -583,7 +583,9 @@ def test_knowledge_service_queries_format_validated_for_ref_schema(self) -> None ) assert spec.knowledge_service_queries == {"/properties/sku": "extract-sku"} - def test_knowledge_service_queries_pointer_absent_from_ref_is_accepted(self) -> None: + def test_knowledge_service_queries_pointer_absent_from_ref_is_accepted( + self, + ) -> None: """A JSON Pointer that would not exist in the resolved $ref schema is still accepted at construction time; existence checking is deferred to assembly time when the remote schema can actually be fetched."""