diff --git a/pyproject.toml b/pyproject.toml index dc25da6e..5e89a32e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "julee" -version = "0.1.15" +version = "0.1.16" description = "Julee - Clean architecture for accountable and transparent digital supply chains" readme = "README.md" requires-python = ">=3.11" @@ -46,6 +46,7 @@ dependencies = [ "six>=1.16.0", "jsonschema>=4.0.0", "jsonpointer>=3.0.0", + "httpx>=0.27.0", # Code introspection (doctrine tests, CLI) "griffe>=1.0.0,<2", "inflect>=7.5.0", diff --git a/src/julee/__init__.py b/src/julee/__init__.py index 453398d2..a591dc6d 100644 --- a/src/julee/__init__.py +++ b/src/julee/__init__.py @@ -1,3 +1,3 @@ """Julee - Clean architecture for accountable and transparent digital supply chains.""" -__version__ = "0.1.15" +__version__ = "0.1.16" diff --git a/src/julee/contrib/ceap/_schema_ref.py b/src/julee/contrib/ceap/_schema_ref.py new file mode 100644 index 00000000..a4031f62 --- /dev/null +++ b/src/julee/contrib/ceap/_schema_ref.py @@ -0,0 +1,35 @@ +""" +Shared helpers for resolving JSON Schema $ref values. + +A bare $ref schema is a dict of exactly {"$ref": "url#/fragment"}. +Resolution means fetching the URL and, if a fragment is present, +navigating to the target sub-schema and bundling the parent $defs so +that internal $ref values within the sub-schema remain valid. +""" + +from typing import Any + +import jsonpointer # type: ignore + + +def extract_schema_from_fetched( + full_schema: dict[str, Any], fragment: str +) -> dict[str, Any]: + """Return the sub-schema identified by *fragment* from *full_schema*. + + If *fragment* is empty the full schema is returned as-is. Otherwise the + fragment is treated as a JSON Pointer; the target object is extracted and + the parent ``$defs`` are merged in so that any internal ``$ref`` values + within the sub-schema continue to resolve correctly. + """ + if not fragment: + return full_schema + + target = jsonpointer.resolve_pointer(full_schema, fragment) + if not isinstance(target, dict): + raise ValueError(f"$ref fragment '{fragment}' did not resolve to a JSON object") + result = dict(target) + parent_defs = full_schema.get("$defs", {}) + if parent_defs: + result["$defs"] = {**parent_defs, **result.get("$defs", {})} + return result diff --git a/src/julee/contrib/ceap/apps/worker/extract_assemble.py b/src/julee/contrib/ceap/apps/worker/extract_assemble.py index 33fec438..f7b6588b 100644 --- a/src/julee/contrib/ceap/apps/worker/extract_assemble.py +++ b/src/julee/contrib/ceap/apps/worker/extract_assemble.py @@ -22,6 +22,7 @@ WorkflowDocumentRepositoryProxy, WorkflowKnowledgeServiceConfigRepositoryProxy, WorkflowKnowledgeServiceQueryRepositoryProxy, + WorkflowRemoteSchemaRepositoryProxy, ) from julee.services.temporal.proxies import ( WorkflowKnowledgeServiceProxy, @@ -127,6 +128,7 @@ async def run(self, document_id: str, assembly_specification_id: str) -> Assembl knowledge_service_query_repo=knowledge_service_query_repo, knowledge_service_config_repo=knowledge_service_config_repo, knowledge_service=knowledge_service, + remote_schema_repo=WorkflowRemoteSchemaRepositoryProxy(), # type: ignore[abstract] clock_service=clock_service, execution_service=execution_service, ) diff --git a/src/julee/contrib/ceap/conftest.py b/src/julee/contrib/ceap/conftest.py new file mode 100644 index 00000000..c2c85997 --- /dev/null +++ b/src/julee/contrib/ceap/conftest.py @@ -0,0 +1,60 @@ +""" +Shared pytest fixtures for CEAP tests. +""" + +import json +import threading +from http.server import BaseHTTPRequestHandler, HTTPServer + +import pytest + + +@pytest.fixture +def schema_server(): + """Start a minimal HTTP server that serves registered JSON schemas. + + Binds to a random free port on localhost. Schemas are registered + via ``server.register(path, schema_dict)``, which returns the full URL. + + Usage:: + + def test_something(schema_server): + url = schema_server.register("/my.json", {"type": "object", ...}) + # use url in test + """ + registry: dict[str, bytes] = {} + + class _Handler(BaseHTTPRequestHandler): + def do_GET(self) -> None: + body = registry.get(self.path) + if body is not None: + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + else: + self.send_response(404) + self.end_headers() + + def log_message(self, format: str, *args: object) -> None: + pass # suppress test output + + server = HTTPServer(("127.0.0.1", 0), _Handler) + host, port = server.server_address + + thread = threading.Thread(target=server.serve_forever) + thread.daemon = True + thread.start() + + class _Server: + base_url = f"http://{host}:{port}" + + def register(self, path: str, schema: dict) -> str: + """Register schema at path; return its absolute URL.""" + registry[path] = json.dumps(schema).encode() + return f"{self.base_url}{path}" + + yield _Server() + + server.shutdown() diff --git a/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py b/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py index 9db0a555..4cb255cf 100644 --- a/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py +++ b/src/julee/contrib/ceap/domain/models/assembly_specification/assembly_specification.py @@ -113,11 +113,16 @@ def jsonschema_must_be_valid(cls, v: dict[str, Any]) -> dict[str, Any]: if not isinstance(v, dict): raise ValueError("JSON Schema must be a dictionary") - # Basic validation that it looks like a JSON schema + if len(v) == 1 and "$ref" in v: + # Bare $ref — accept as-is. Resolution and schema validation + # happen at assembly time via RemoteSchemaRepository, not here. + if not isinstance(v["$ref"], str) or not v["$ref"].strip(): + raise ValueError("$ref value must be a non-empty string") + return v + if "type" not in v: raise ValueError("JSON Schema must have a 'type' field") - # Validate that it's a proper JSON Schema using jsonschema library try: jsonschema.Draft7Validator.check_schema(v) except jsonschema.SchemaError as e: @@ -138,26 +143,37 @@ def knowledge_service_queries_must_be_valid( if not jsonschema_value: raise ValueError("Cannot validate schema pointers without jsonschema field") + is_ref_schema = ( + isinstance(jsonschema_value, dict) + and len(jsonschema_value) == 1 + and "$ref" in jsonschema_value + ) + cleaned_queries = {} for schema_pointer, query_id in v.items(): # Validate schema pointer keys are strings if not isinstance(schema_pointer, str): raise ValueError("Schema pointer keys must be strings") - # Validate JSON Pointer format and that it exists in the schema + # Validate JSON Pointer format; existence against the resolved + # schema is only possible for inline schemas (not bare $refs — + # those are resolved at assembly time via RemoteSchemaRepository). try: if schema_pointer == "": # Empty string is valid - refers to root of schema pass + elif is_ref_schema: + # Format validation only — can't check existence without + # fetching the remote schema + jsonpointer.JsonPointer(schema_pointer) else: - # Use jsonpointer to validate format and existence ptr = jsonpointer.JsonPointer(schema_pointer) ptr.resolve(jsonschema_value) except jsonpointer.JsonPointerException as e: raise ValueError(f"Invalid JSON Pointer '{schema_pointer}': {e}") except (KeyError, IndexError, TypeError): raise ValueError( - f"JSON Pointer '{schema_pointer}' does not exist in " f"schema" + f"JSON Pointer '{schema_pointer}' does not exist in schema" ) # Validate query ID values diff --git a/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py b/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py index b52dad1f..057517df 100644 --- a/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py +++ b/src/julee/contrib/ceap/domain/models/assembly_specification/tests/test_assembly_specification.py @@ -31,6 +31,9 @@ from .factories import AssemblyFactory +# Guaranteed-unresolvable URL for negative tests (.invalid TLD per RFC 2606) +_UNRESOLVABLE_URL = "http://schema.invalid/test.json" + pytestmark = pytest.mark.unit @@ -493,3 +496,118 @@ def test_version_validation(self, version: str, expected_success: bool) -> None: else: with pytest.raises((ValueError, ValidationError)): AssemblyFactory.build(version=version) + + +class TestAssemblyRefSchemaValidation: + """Tests for AssemblySpecification with a bare $ref jsonschema value.""" + + def test_ref_schema_accepted_and_stored_unchanged(self, schema_server) -> None: + """A $ref pointing at a valid schema is accepted; the ref is stored + as-is rather than replaced with the resolved content.""" + url = schema_server.register( + "/schema.json", + {"type": "object", "properties": {"name": {"type": "string"}}}, + ) + spec = AssemblySpecification( + assembly_specification_id="ref-test", + name="Ref Test", + applicability="Testing $ref support", + jsonschema={"$ref": url}, + ) + assert spec.jsonschema == {"$ref": url} + + def test_ref_with_fragment_is_accepted(self, schema_server) -> None: + """A $ref with a JSON Pointer fragment is resolved to validate the + target sub-schema, but the original $ref value is stored unchanged.""" + url = schema_server.register( + "/full.json", + { + "$defs": { + "Item": { + "type": "object", + "properties": {"code": {"type": "string"}}, + } + } + }, + ) + ref = f"{url}#/$defs/Item" + spec = AssemblySpecification( + assembly_specification_id="fragment-test", + name="Fragment Test", + applicability="Testing fragment $ref support", + jsonschema={"$ref": ref}, + ) + assert spec.jsonschema == {"$ref": ref} + + def test_ref_to_unresolvable_url_is_accepted(self) -> None: + """A $ref pointing at an unresolvable URL is accepted as-is. + + Resolution is deferred to assembly time via RemoteSchemaRepository; + the domain model does not fetch remote schemas during construction. + """ + spec = AssemblySpecification( + assembly_specification_id="bad-ref-test", + name="Bad Ref Test", + applicability="Testing invalid $ref", + jsonschema={"$ref": _UNRESOLVABLE_URL}, + ) + assert spec.jsonschema == {"$ref": _UNRESOLVABLE_URL} + + def test_ref_survives_serialisation_roundtrip(self, schema_server) -> None: + """The $ref value is preserved through model_dump_json and + re-instantiation.""" + url = schema_server.register( + "/roundtrip.json", + {"type": "object", "properties": {"x": {"type": "integer"}}}, + ) + original = AssemblySpecification( + assembly_specification_id="roundtrip-test", + name="Roundtrip Test", + applicability="Testing serialisation roundtrip", + jsonschema={"$ref": url}, + ) + data = json.loads(original.model_dump_json()) + restored = AssemblySpecification(**data) + assert restored.jsonschema == {"$ref": url} + + def test_knowledge_service_queries_format_validated_for_ref_schema(self) -> None: + """JSON Pointer keys in knowledge_service_queries are format-validated + for bare $ref schemas; existence against the resolved schema is deferred + to assembly time via RemoteSchemaRepository.""" + spec = AssemblySpecification( + assembly_specification_id="ksq-ref-test", + name="KSQ Ref Test", + applicability="Testing pointer validation against $ref", + jsonschema={"$ref": _UNRESOLVABLE_URL}, + knowledge_service_queries={"/properties/sku": "extract-sku"}, + ) + assert spec.knowledge_service_queries == {"/properties/sku": "extract-sku"} + + def test_knowledge_service_queries_pointer_absent_from_ref_is_accepted( + self, + ) -> None: + """A JSON Pointer that would not exist in the resolved $ref schema is + still accepted at construction time; existence checking is deferred to + assembly time when the remote schema can actually be fetched.""" + spec = AssemblySpecification( + assembly_specification_id="deferred-pointer-test", + name="Deferred Pointer Test", + applicability="Testing pointer deferral for $ref schemas", + jsonschema={"$ref": _UNRESOLVABLE_URL}, + knowledge_service_queries={"/properties/nonexistent": "query-1"}, + ) + assert spec.knowledge_service_queries == {"/properties/nonexistent": "query-1"} + + def test_knowledge_service_queries_rejects_malformed_pointer_for_ref_schema( + self, + ) -> None: + """A malformed JSON Pointer (not starting with /) is rejected even for + bare $ref schemas, since format validation still applies.""" + with pytest.raises(ValidationError): + AssemblySpecification( + assembly_specification_id="bad-format-test", + name="Bad Format Test", + applicability="Testing malformed pointer rejection", + jsonschema={"$ref": _UNRESOLVABLE_URL}, + knowledge_service_queries={"not-a-pointer": "query-1"}, + ) diff --git a/src/julee/contrib/ceap/domain/repositories/__init__.py b/src/julee/contrib/ceap/domain/repositories/__init__.py index b14fdceb..b82677e8 100644 --- a/src/julee/contrib/ceap/domain/repositories/__init__.py +++ b/src/julee/contrib/ceap/domain/repositories/__init__.py @@ -12,6 +12,7 @@ from .knowledge_service_config import KnowledgeServiceConfigRepository from .knowledge_service_query import KnowledgeServiceQueryRepository from .policy import PolicyRepository +from .remote_schema import RemoteSchemaRepository __all__ = [ "DocumentRepository", @@ -21,4 +22,5 @@ "KnowledgeServiceQueryRepository", "PolicyRepository", "DocumentPolicyValidationRepository", + "RemoteSchemaRepository", ] diff --git a/src/julee/contrib/ceap/domain/repositories/remote_schema.py b/src/julee/contrib/ceap/domain/repositories/remote_schema.py new file mode 100644 index 00000000..ba9765e6 --- /dev/null +++ b/src/julee/contrib/ceap/domain/repositories/remote_schema.py @@ -0,0 +1,8 @@ +from typing import Any, Protocol, runtime_checkable + + +@runtime_checkable +class RemoteSchemaRepository(Protocol): + async def fetch(self, url: str) -> dict[str, Any]: + """Fetch and return the JSON document at url.""" + ... diff --git a/src/julee/contrib/ceap/use_cases/extract_assemble_data.py b/src/julee/contrib/ceap/use_cases/extract_assemble_data.py index e8049806..beb0cea5 100644 --- a/src/julee/contrib/ceap/use_cases/extract_assemble_data.py +++ b/src/julee/contrib/ceap/use_cases/extract_assemble_data.py @@ -10,12 +10,14 @@ import hashlib import json import logging +from collections.abc import Mapping from typing import Any import jsonschema import multihash from pydantic import BaseModel +from julee.contrib.ceap._schema_ref import extract_schema_from_fetched from julee.contrib.ceap.domain.models import ( Assembly, AssemblySpecification, @@ -30,6 +32,7 @@ DocumentRepository, KnowledgeServiceConfigRepository, KnowledgeServiceQueryRepository, + RemoteSchemaRepository, ) from julee.core.services import ClockService, ExecutionService, SystemClockService from julee.core.services.execution import DefaultExecutionService @@ -87,6 +90,7 @@ def __init__( knowledge_service_query_repo: KnowledgeServiceQueryRepository, knowledge_service_config_repo: KnowledgeServiceConfigRepository, knowledge_service: KnowledgeService, + remote_schema_repo: RemoteSchemaRepository, clock_service: ClockService | None = None, execution_service: ExecutionService | None = None, ) -> None: @@ -127,6 +131,10 @@ def __init__( DocumentRepository, # type: ignore[type-abstract] ) self.knowledge_service = knowledge_service + self.remote_schema_repo = ensure_repository_protocol( + remote_schema_repo, + RemoteSchemaRepository, # type: ignore[type-abstract] + ) self._clock_service: ClockService = clock_service or SystemClockService() self._execution_service: ExecutionService = ( execution_service or DefaultExecutionService() @@ -379,6 +387,20 @@ async def _retrieve_all_queries( queries[query_id] = query return queries + async def _resolve_jsonschema(self, schema: Mapping[str, Any]) -> dict[str, Any]: + """Fetch and resolve a bare $ref schema; return inline schemas unchanged. + + If the schema is exactly {"$ref": "url#/fragment"}, fetches the URL via + the injected remote_schema_repo (a Temporal activity in workflow context) + and delegates fragment extraction to extract_schema_from_fetched. + Re-fetching on every query ensures the latest published version is used. + """ + if not (len(schema) == 1 and "$ref" in schema): + return dict(schema) + url, _, fragment = schema["$ref"].partition("#") + full_schema = await self.remote_schema_repo.fetch(url) + return extract_schema_from_fetched(full_schema, fragment) + @try_use_case_step("assembly_iteration") async def _assemble_iteration( self, @@ -414,6 +436,12 @@ async def _assemble_iteration( # Initialize the result data structure assembled_data: dict[str, Any] = {} + # Resolve $ref schemas afresh on every query so any published patch + # to the external schema is picked up automatically. + resolved_jsonschema = await self._resolve_jsonschema( + assembly_specification.jsonschema + ) + # Process each knowledge service query # TODO: This is where we may want to fan-out/fan-in to do these # in parallel. @@ -422,7 +450,7 @@ async def _assemble_iteration( query_id, ) in assembly_specification.knowledge_service_queries.items(): # Use PointableJSONSchema to generate complete schema for pointer target - pointable_schema = PointableJSONSchema(assembly_specification.jsonschema) + pointable_schema = PointableJSONSchema(resolved_jsonschema) output_schema = pointable_schema.schema_for_pointer(schema_pointer) # Get the query configuration @@ -464,7 +492,7 @@ async def _assemble_iteration( ) # Validate the assembled data against the JSON schema - self._validate_assembled_data(assembled_data, assembly_specification) + self._validate_assembled_data(assembled_data, resolved_jsonschema) # Create the assembled document assembled_document_id = await self._create_assembled_document( @@ -597,26 +625,16 @@ async def _create_assembled_document( def _validate_assembled_data( self, assembled_data: dict[str, Any], - assembly_specification: AssemblySpecification, + resolved_jsonschema: dict[str, Any], ) -> None: """Validate that the assembled data conforms to the JSON schema.""" try: - jsonschema.validate(assembled_data, assembly_specification.jsonschema) - logger.debug( - "Assembled data validation passed", - extra={ - "assembly_specification_id": ( - assembly_specification.assembly_specification_id - ), - }, - ) + jsonschema.validate(assembled_data, resolved_jsonschema) + logger.debug("Assembled data validation passed") except jsonschema.ValidationError as e: logger.error( "Assembled data validation failed", extra={ - "assembly_specification_id": ( - assembly_specification.assembly_specification_id - ), "validation_error": str(e), "error_path": (list(e.absolute_path) if e.absolute_path else []), "schema_path": (list(e.schema_path) if e.schema_path else []), @@ -628,12 +646,7 @@ def _validate_assembled_data( except jsonschema.SchemaError as e: logger.error( "JSON schema is invalid", - extra={ - "assembly_specification_id": ( - assembly_specification.assembly_specification_id - ), - "schema_error": str(e), - }, + extra={"schema_error": str(e)}, ) raise ValueError( f"Invalid JSON schema in assembly specification: {e.message}" diff --git a/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py b/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py index 4fadb4f1..cef43697 100644 --- a/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py +++ b/src/julee/contrib/ceap/use_cases/tests/test_extract_assemble_data.py @@ -26,12 +26,14 @@ ) from julee.contrib.ceap.domain.models.knowledge_service_config import ServiceApi from julee.contrib.ceap.use_cases import ExtractAssembleDataUseCase +from julee.repositories.http.schema import HttpRemoteSchemaRepository from julee.repositories.memory import ( MemoryAssemblyRepository, MemoryAssemblySpecificationRepository, MemoryDocumentRepository, MemoryKnowledgeServiceConfigRepository, MemoryKnowledgeServiceQueryRepository, + MemoryRemoteSchemaRepository, ) from julee.services.knowledge_service import QueryResult from julee.services.knowledge_service.memory import ( @@ -124,6 +126,11 @@ def configured_knowledge_service(self) -> MemoryKnowledgeService: ) return memory_service + @pytest.fixture + def remote_schema_repo(self) -> MemoryRemoteSchemaRepository: + """Create a memory RemoteSchemaRepository for testing.""" + return MemoryRemoteSchemaRepository() + @pytest.fixture def use_case( self, @@ -133,6 +140,7 @@ def use_case( knowledge_service_query_repo: MemoryKnowledgeServiceQueryRepository, knowledge_service_config_repo: MemoryKnowledgeServiceConfigRepository, knowledge_service: MemoryKnowledgeService, + remote_schema_repo: MemoryRemoteSchemaRepository, ) -> ExtractAssembleDataUseCase: """Create ExtractAssembleDataUseCase with memory repository dependencies.""" @@ -143,6 +151,7 @@ def use_case( knowledge_service_query_repo=knowledge_service_query_repo, knowledge_service_config_repo=knowledge_service_config_repo, knowledge_service=knowledge_service, + remote_schema_repo=remote_schema_repo, ) @pytest.fixture @@ -154,6 +163,7 @@ def configured_use_case( knowledge_service_query_repo: MemoryKnowledgeServiceQueryRepository, knowledge_service_config_repo: MemoryKnowledgeServiceConfigRepository, configured_knowledge_service: MemoryKnowledgeService, + remote_schema_repo: MemoryRemoteSchemaRepository, ) -> ExtractAssembleDataUseCase: """Create ExtractAssembleDataUseCase with configured knowledge service for full workflow tests.""" @@ -164,6 +174,7 @@ def configured_use_case( knowledge_service_query_repo=knowledge_service_query_repo, knowledge_service_config_repo=knowledge_service_config_repo, knowledge_service=configured_knowledge_service, + remote_schema_repo=remote_schema_repo, ) @pytest.mark.asyncio @@ -669,6 +680,7 @@ async def test_assembly_fails_with_invalid_json_schema( knowledge_service_query_repo=knowledge_service_query_repo, knowledge_service_config_repo=knowledge_service_config_repo, knowledge_service=memory_service, + remote_schema_repo=MemoryRemoteSchemaRepository(), ) # Act & Assert @@ -680,3 +692,68 @@ async def test_assembly_fails_with_invalid_json_schema( document_id="doc-123", assembly_specification_id="spec-123", ) + + +class TestResolveJsonSchema: + """Tests for ExtractAssembleDataUseCase._resolve_jsonschema.""" + + def _make_use_case(self, remote_schema_repo) -> ExtractAssembleDataUseCase: + return ExtractAssembleDataUseCase( + document_repo=MemoryDocumentRepository(), + assembly_repo=MemoryAssemblyRepository(), + assembly_specification_repo=MemoryAssemblySpecificationRepository(), + knowledge_service_query_repo=MemoryKnowledgeServiceQueryRepository(), + knowledge_service_config_repo=MemoryKnowledgeServiceConfigRepository(), + knowledge_service=AsyncMock(), + remote_schema_repo=remote_schema_repo, + ) + + @pytest.mark.asyncio + async def test_inline_schema_returned_unchanged(self) -> None: + """An inline schema dict is returned as-is without any HTTP calls.""" + schema = { + "type": "object", + "properties": {"x": {"type": "string"}}, + } + use_case = self._make_use_case(MemoryRemoteSchemaRepository()) + result = await use_case._resolve_jsonschema(schema) + assert result == schema + + @pytest.mark.asyncio + async def test_ref_schema_fetched_and_resolved(self, schema_server) -> None: + """A bare $ref is fetched over HTTP and the resolved schema is returned.""" + served = {"type": "object", "properties": {"y": {"type": "integer"}}} + url = schema_server.register("/schema.json", served) + use_case = self._make_use_case(HttpRemoteSchemaRepository()) + result = await use_case._resolve_jsonschema({"$ref": url}) + assert result == served + + @pytest.mark.asyncio + async def test_ref_with_fragment_extracts_sub_schema(self, schema_server) -> None: + """A $ref with a fragment extracts the target sub-schema and bundles + the parent $defs so internal $refs remain valid.""" + from julee.repositories.http.schema import HttpRemoteSchemaRepository + + full_schema = { + "$defs": { + "Address": { + "type": "object", + "properties": {"street": {"type": "string"}}, + }, + "Person": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "address": {"$ref": "#/$defs/Address"}, + }, + }, + } + } + url = schema_server.register("/people.json", full_schema) + use_case = self._make_use_case(HttpRemoteSchemaRepository()) + result = await use_case._resolve_jsonschema({"$ref": f"{url}#/$defs/Person"}) + + assert result["type"] == "object" + assert "name" in result["properties"] + # Parent $defs must be bundled so the internal #/$defs/Address ref works + assert "Address" in result["$defs"] diff --git a/src/julee/repositories/http/__init__.py b/src/julee/repositories/http/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/julee/repositories/http/schema.py b/src/julee/repositories/http/schema.py new file mode 100644 index 00000000..fc2a9394 --- /dev/null +++ b/src/julee/repositories/http/schema.py @@ -0,0 +1,13 @@ +from typing import Any + +import httpx + +from julee.contrib.ceap.domain.repositories.remote_schema import RemoteSchemaRepository + + +class HttpRemoteSchemaRepository(RemoteSchemaRepository): + async def fetch(self, url: str) -> dict[str, Any]: + async with httpx.AsyncClient() as client: + response = await client.get(url) + response.raise_for_status() + return response.json() diff --git a/src/julee/repositories/memory/__init__.py b/src/julee/repositories/memory/__init__.py index 99f5d28a..f30675f0 100644 --- a/src/julee/repositories/memory/__init__.py +++ b/src/julee/repositories/memory/__init__.py @@ -19,6 +19,7 @@ from .knowledge_service_config import MemoryKnowledgeServiceConfigRepository from .knowledge_service_query import MemoryKnowledgeServiceQueryRepository from .policy import MemoryPolicyRepository +from .remote_schema import MemoryRemoteSchemaRepository __all__ = [ "MemoryAssemblyRepository", @@ -28,4 +29,5 @@ "MemoryKnowledgeServiceConfigRepository", "MemoryKnowledgeServiceQueryRepository", "MemoryPolicyRepository", + "MemoryRemoteSchemaRepository", ] diff --git a/src/julee/repositories/memory/remote_schema.py b/src/julee/repositories/memory/remote_schema.py new file mode 100644 index 00000000..9e380163 --- /dev/null +++ b/src/julee/repositories/memory/remote_schema.py @@ -0,0 +1,16 @@ +from typing import Any + +from julee.contrib.ceap.domain.repositories.remote_schema import RemoteSchemaRepository + + +class MemoryRemoteSchemaRepository(RemoteSchemaRepository): + def __init__(self, schemas: dict[str, dict] | None = None) -> None: + self._schemas: dict[str, dict] = schemas or {} + + def register(self, url: str, schema: dict) -> None: + self._schemas[url] = schema + + async def fetch(self, url: str) -> dict[str, Any]: + if url not in self._schemas: + raise ValueError(f"No schema registered for URL: {url}") + return dict(self._schemas[url]) diff --git a/src/julee/repositories/temporal/activities.py b/src/julee/repositories/temporal/activities.py index bdfe65da..a17a5c60 100644 --- a/src/julee/repositories/temporal/activities.py +++ b/src/julee/repositories/temporal/activities.py @@ -10,6 +10,7 @@ - Each repository type gets its own activity prefix """ +from julee.repositories.http.schema import HttpRemoteSchemaRepository from julee.repositories.minio.assembly import MinioAssemblyRepository from julee.repositories.minio.assembly_specification import ( MinioAssemblySpecificationRepository, @@ -37,6 +38,7 @@ KNOWLEDGE_SERVICE_CONFIG_ACTIVITY_BASE, KNOWLEDGE_SERVICE_QUERY_ACTIVITY_BASE, POLICY_ACTIVITY_BASE, + REMOTE_SCHEMA_ACTIVITY_BASE, ) from julee.util.temporal.decorators import temporal_activity_registration @@ -98,6 +100,13 @@ class TemporalMinioDocumentPolicyValidationRepository( pass +@temporal_activity_registration(REMOTE_SCHEMA_ACTIVITY_BASE) +class TemporalHttpRemoteSchemaRepository(HttpRemoteSchemaRepository): + """Temporal activity wrapper for HttpRemoteSchemaRepository.""" + + pass + + # Export the temporal repository classes for use in worker.py __all__ = [ "TemporalMinioAssemblyRepository", @@ -111,4 +120,5 @@ class TemporalMinioDocumentPolicyValidationRepository( "DOCUMENT_ACTIVITY_BASE", "KNOWLEDGE_SERVICE_CONFIG_ACTIVITY_BASE", "KNOWLEDGE_SERVICE_QUERY_ACTIVITY_BASE", + "TemporalHttpRemoteSchemaRepository", ] diff --git a/src/julee/repositories/temporal/activity_names.py b/src/julee/repositories/temporal/activity_names.py index 3f5ec1f2..ba80f076 100644 --- a/src/julee/repositories/temporal/activity_names.py +++ b/src/julee/repositories/temporal/activity_names.py @@ -20,6 +20,7 @@ KNOWLEDGE_SERVICE_QUERY_ACTIVITY_BASE = "julee.knowledge_service_query_repo.minio" POLICY_ACTIVITY_BASE = "julee.policy_repo.minio" DOCUMENT_POLICY_VALIDATION_ACTIVITY_BASE = "julee.document_policy_validation_repo.minio" +REMOTE_SCHEMA_ACTIVITY_BASE = "julee.remote_schema_repo.http" # Export all constants @@ -31,4 +32,5 @@ "KNOWLEDGE_SERVICE_QUERY_ACTIVITY_BASE", "POLICY_ACTIVITY_BASE", "DOCUMENT_POLICY_VALIDATION_ACTIVITY_BASE", + "REMOTE_SCHEMA_ACTIVITY_BASE", ] diff --git a/src/julee/repositories/temporal/proxies.py b/src/julee/repositories/temporal/proxies.py index 333c2431..fda266f8 100644 --- a/src/julee/repositories/temporal/proxies.py +++ b/src/julee/repositories/temporal/proxies.py @@ -26,6 +26,7 @@ KnowledgeServiceQueryRepository, ) from julee.contrib.ceap.domain.repositories.policy import PolicyRepository +from julee.contrib.ceap.domain.repositories.remote_schema import RemoteSchemaRepository # Import activity name bases from shared module from julee.repositories.temporal.activity_names import ( @@ -36,6 +37,7 @@ KNOWLEDGE_SERVICE_CONFIG_ACTIVITY_BASE, KNOWLEDGE_SERVICE_QUERY_ACTIVITY_BASE, POLICY_ACTIVITY_BASE, + REMOTE_SCHEMA_ACTIVITY_BASE, ) from julee.util.temporal.decorators import temporal_workflow_proxy @@ -149,6 +151,21 @@ class WorkflowDocumentPolicyValidationRepositoryProxy( pass +@temporal_workflow_proxy( + activity_base=REMOTE_SCHEMA_ACTIVITY_BASE, + default_timeout_seconds=30, + retry_methods=["fetch"], +) +class WorkflowRemoteSchemaRepositoryProxy(RemoteSchemaRepository): + """ + Workflow implementation of RemoteSchemaRepository that calls activities. + All methods are automatically generated by the @temporal_workflow_proxy + decorator. + """ + + pass + + # Export the workflow proxy classes __all__ = [ "WorkflowAssemblyRepositoryProxy", @@ -156,4 +173,5 @@ class WorkflowDocumentPolicyValidationRepositoryProxy( "WorkflowDocumentRepositoryProxy", "WorkflowKnowledgeServiceConfigRepositoryProxy", "WorkflowKnowledgeServiceQueryRepositoryProxy", + "WorkflowRemoteSchemaRepositoryProxy", ] diff --git a/uv.lock b/uv.lock index ed85a610..dad93f22 100644 --- a/uv.lock +++ b/uv.lock @@ -933,7 +933,7 @@ wheels = [ [[package]] name = "julee" -version = "0.1.14" +version = "0.1.16" source = { editable = "." } dependencies = [ { name = "anthropic" }, @@ -941,6 +941,7 @@ dependencies = [ { name = "fastapi" }, { name = "fastapi-pagination" }, { name = "griffe" }, + { name = "httpx" }, { name = "inflect" }, { name = "jinja2" }, { name = "jsonpointer" }, @@ -1001,6 +1002,7 @@ requires-dist = [ { name = "fastapi-pagination", specifier = ">=0.12.0" }, { name = "furo", marker = "extra == 'docs'", specifier = ">=2023.9.10" }, { name = "griffe", specifier = ">=1.0.0,<2" }, + { name = "httpx", specifier = ">=0.27.0" }, { name = "hypothesis", marker = "extra == 'dev'", specifier = ">=6.0.0" }, { name = "inflect", specifier = ">=7.5.0" }, { name = "jinja2", specifier = ">=3.0.0" },