From 60fedb7b9399dea5185be2f57fc99bf241ed1536 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Wed, 25 Feb 2026 09:51:20 +1100 Subject: [PATCH 1/6] polling: Add polling service handler and Acknowledgement model ADR003 --- .../polling/domain/services/__init__.py | 12 -- .../domain/services/polling_result_handler.py | 71 ++++++++++ src/julee/core/__init__.py | 0 src/julee/core/entities/__init__.py | 0 src/julee/core/entities/acknowledgement.py | 123 ++++++++++++++++++ 5 files changed, 194 insertions(+), 12 deletions(-) create mode 100644 src/julee/contrib/polling/domain/services/polling_result_handler.py create mode 100644 src/julee/core/__init__.py create mode 100644 src/julee/core/entities/__init__.py create mode 100644 src/julee/core/entities/acknowledgement.py diff --git a/src/julee/contrib/polling/domain/services/__init__.py b/src/julee/contrib/polling/domain/services/__init__.py index 71ef6401..e69de29b 100644 --- a/src/julee/contrib/polling/domain/services/__init__.py +++ b/src/julee/contrib/polling/domain/services/__init__.py @@ -1,12 +0,0 @@ -""" -Polling domain services. - -This module contains the service protocols for the polling contrib module. - -No re-exports to avoid import chains that pull non-deterministic code -into Temporal workflows. Import directly from specific modules: - -- from julee.contrib.polling.domain.services.poller import PollerService -""" - -__all__ = [] diff --git a/src/julee/contrib/polling/domain/services/polling_result_handler.py b/src/julee/contrib/polling/domain/services/polling_result_handler.py new file mode 100644 index 00000000..e7f6e8d1 --- /dev/null +++ b/src/julee/contrib/polling/domain/services/polling_result_handler.py @@ -0,0 +1,71 @@ +""" +PollingResultHandler protocol for cross-bounded-context polling orchestration. + +This module defines the PollingResultHandler protocol that enables cross-BC +coordination when new data is detected during polling operations. 
Following +ADR 003, this handler accepts domain-relevant arguments and allows the +solution provider to decide what happens with newly detected data. + +The polling system recognizes the condition (new data detected) and hands off +to the handler without knowing what the handler does - this is the +"green-dotted-egg-handler" principle. +""" + +from typing import Protocol, runtime_checkable + +from julee.core.entities.acknowledgement import Acknowledgement + + +@runtime_checkable +class PollingResultHandler(Protocol): + """ + Handler for new data detected during polling operations. + + This protocol enables cross-bounded-context orchestration by allowing + polling systems to hand off newly detected data to solution-specific + processing without knowing what that processing entails. + + Handlers may implement any orchestration pattern: + - Start Temporal workflows + - Queue messages + - Trigger use cases directly + - Log and notify + - Complex multi-step processing + + The handler signature uses primitives (str, bytes) since this is a + cross-BC interface and bounded contexts don't share domain types. + """ + + async def handle_new_data( + self, + endpoint_id: str, + previous_data: bytes | None, + new_data: bytes, + content_hash: str, + ) -> Acknowledgement: + """ + Handle newly detected data from a polling operation. + + This method is called when the polling system detects that data at + an endpoint has changed. The handler decides what to do with this + information - whether to start processing workflows, queue work, + send notifications, or any other domain-specific action. 
+ + Args: + endpoint_id: Unique identifier for the polled endpoint + previous_data: Previous content (None if this is the first polling run) + new_data: New content that was detected as different from previous + content_hash: SHA256 hash of the new content for deduplication/tracking + + Returns: + Acknowledgement indicating handler response: + - wilco: Handler will process the new data + - unable: Handler cannot process (resource constraints, invalid state, etc.) + - roger: Handler acknowledges but makes no processing commitment + + Raises: + Exception: Handlers may raise exceptions for critical failures, + but should prefer returning Acknowledgement.unable() with + error details to avoid failing the polling workflow. + """ + ... diff --git a/src/julee/core/__init__.py b/src/julee/core/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/julee/core/entities/__init__.py b/src/julee/core/entities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/julee/core/entities/acknowledgement.py b/src/julee/core/entities/acknowledgement.py new file mode 100644 index 00000000..449dfe4b --- /dev/null +++ b/src/julee/core/entities/acknowledgement.py @@ -0,0 +1,123 @@ +""" +Acknowledgement entity for handler service responses. + +This module implements the Acknowledgement entity as specified in ADR 003, +providing radio communication semantics (wilco/unable/roger) for handler +service responses. +""" + +from pydantic import BaseModel, Field + + +class Acknowledgement(BaseModel): + """ + Acknowledgement response from handler services using radio communication semantics. + + Following radio communication conventions: + - **Wilco** ("will comply"): Handler accepts and will process + - **Unable**: Handler cannot comply (resource constraints, invalid state, etc.) 
+ - **Roger** ("received"): Handler acknowledges receipt but makes no commitment + about whether it will act - the wilco/unable distinction is not provided + """ + + will_comply: bool | None = Field( + default=None, + description="None = roger (no commitment), True = wilco (will process), False = unable (cannot process)", + ) + info: list[str] = Field( + default_factory=list, + description="Informational messages about handler processing", + ) + + @classmethod + def wilco( + cls, + info: list[str] | None = None, + ) -> "Acknowledgement": + """ + Will comply - handler accepts and will process. + + Args: + info: Optional informational messages + + Returns: + Acknowledgement with will_comply=True + """ + return cls( + will_comply=True, + info=info or [], + ) + + @classmethod + def unable( + cls, + info: list[str] | None = None, + ) -> "Acknowledgement": + """ + Unable to comply - handler cannot process. + + Args: + info: Optional informational messages explaining why handler cannot comply + + Returns: + Acknowledgement with will_comply=False + """ + return cls( + will_comply=False, + info=info or [], + ) + + @classmethod + def roger( + cls, + info: list[str] | None = None, + ) -> "Acknowledgement": + """ + Received - acknowledged, no commitment about action. + + Handler acknowledges receipt but makes no commitment about whether + it will act. This is appropriate when the handler logs or records + the request but doesn't guarantee processing. 
+ + Args: + info: Optional informational messages + + Returns: + Acknowledgement with will_comply=None + """ + return cls( + will_comply=None, + info=info or [], + ) + + @property + def is_wilco(self) -> bool: + """True if handler will comply (will_comply=True).""" + return self.will_comply is True + + @property + def is_unable(self) -> bool: + """True if handler is unable to comply (will_comply=False).""" + return self.will_comply is False + + @property + def is_roger(self) -> bool: + """True if handler acknowledged with no commitment (will_comply=None).""" + return self.will_comply is None + + @property + def has_info(self) -> bool: + """True if acknowledgement contains any informational messages.""" + return len(self.info) > 0 + + def __str__(self) -> str: + """String representation showing acknowledgement type and message counts.""" + ack_type = "WILCO" if self.is_wilco else "UNABLE" if self.is_unable else "ROGER" + + if self.info: + return f"{ack_type} ({len(self.info)} info)" + return ack_type + + def __repr__(self) -> str: + """Detailed representation for debugging.""" + return f"Acknowledgement(will_comply={self.will_comply}, info={len(self.info)})" From 2b32515b3ec31f4cbd73c0904f60698d3f484ab9 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Wed, 25 Feb 2026 10:18:49 +1100 Subject: [PATCH 2/6] polling: Update polling pipeline to use handler. 
--- .../contrib/polling/apps/worker/pipelines.py | 127 +++++++----------- 1 file changed, 46 insertions(+), 81 deletions(-) diff --git a/src/julee/contrib/polling/apps/worker/pipelines.py b/src/julee/contrib/polling/apps/worker/pipelines.py index 8375af1e..5c893548 100644 --- a/src/julee/contrib/polling/apps/worker/pipelines.py +++ b/src/julee/contrib/polling/apps/worker/pipelines.py @@ -11,9 +11,11 @@ from typing import Any from temporalio import workflow -from temporalio.workflow import ParentClosePolicy from julee.contrib.polling.domain.models.polling_config import PollingConfig +from julee.contrib.polling.domain.services.polling_result_handler import ( + PollingResultHandler, +) from julee.contrib.polling.infrastructure.temporal.proxies import ( WorkflowPollerServiceProxy, ) @@ -29,17 +31,18 @@ class NewDataDetectionPipeline: This workflow: 1. Polls an endpoint using the configured polling service 2. Compares result with previous completion to detect changes - 3. Triggers downstream processing when new data is detected + 3. Hands off to result handler when new data is detected 4. Returns completion result for next scheduled execution The workflow uses Temporal's schedule last completion result feature to automatically receive the previous execution's result for comparison. """ - def __init__(self) -> None: + def __init__(self, result_handler: PollingResultHandler) -> None: self.current_step = "initialized" self.endpoint_id: str | None = None self.has_new_data: bool = False + self._result_handler = result_handler @workflow.query def get_current_step(self) -> str: @@ -56,73 +59,22 @@ def get_has_new_data(self) -> bool: """Query method to check if new data was detected.""" return self.has_new_data - async def trigger_downstream_pipeline( - self, - downstream_pipeline: str, - previous_data: bytes | None, - new_data: bytes, - ) -> bool: - """ - Trigger downstream pipeline workflow. 
- - Args: - downstream_pipeline: Name of the downstream workflow to trigger - previous_data: Previous content (None if first run) - new_data: New content that was detected - - Returns: - True if successfully triggered, False otherwise - """ - try: - # Start child workflow for downstream processing with abandon policy - await workflow.start_child_workflow( - downstream_pipeline, # This would be the workflow class name - args=[previous_data, new_data], - id=f"downstream-{self.endpoint_id}-{workflow.info().workflow_id}", - task_queue="downstream-processing-queue", - parent_close_policy=ParentClosePolicy.ABANDON, - ) - - workflow.logger.info( - "Downstream pipeline triggered successfully", - extra={ - "endpoint_id": self.endpoint_id, - "downstream_pipeline": downstream_pipeline, - }, - ) - return True - - except Exception as e: - workflow.logger.error( - "Failed to trigger downstream pipeline", - extra={ - "endpoint_id": self.endpoint_id, - "downstream_pipeline": downstream_pipeline, - "error": str(e), - "error_type": type(e).__name__, - }, - ) - # Don't fail the polling workflow if downstream trigger fails - return False - @workflow.run async def run( self, config: PollingConfig | dict[str, Any], - downstream_pipeline: str | None = None, ) -> dict[str, Any]: """ Execute the new data detection workflow. Args: config: Configuration for the polling operation (PollingConfig or dict from schedule) - downstream_pipeline: Optional pipeline to trigger when new data detected Returns: Completion result containing polling result and detection metadata Raises: - RuntimeError: If polling or downstream processing fails after retries + RuntimeError: If polling fails after retries """ # Convert dict to PollingConfig if needed (for schedule compatibility) # Temporal schedules serialize arguments as dicts, not Pydantic models @@ -186,21 +138,20 @@ async def run( f"previous_hash: {previous_hash[:8] if previous_hash else 'None'}..." 
) - # Step 3: Trigger downstream processing if new data detected - downstream_triggered = False - if has_new_data and downstream_pipeline: - self.current_step = "triggering_downstream" + # Step 3: Hand off to result handler if new data detected + handler_acknowledgement = None + if has_new_data: + self.current_step = "handling_new_data" workflow.logger.info( - "Triggering downstream pipeline", + "Handing off new data to result handler", extra={ "endpoint_id": self.endpoint_id, - "downstream_pipeline": downstream_pipeline, "content_length": len(current_content), }, ) - # Get previous data for comparison + # Get previous data for handler previous_data = None if previous_completion and "polling_result" in previous_completion: prev_content_str = previous_completion["polling_result"].get( @@ -211,7 +162,7 @@ async def run( previous_data = prev_content_str.encode("utf-8") except (UnicodeDecodeError, AttributeError) as e: workflow.logger.error( - "Failed to decode previous content for downstream pipeline", + "Failed to decode previous content for handler", extra={ "endpoint_id": self.endpoint_id, "error": str(e), @@ -221,24 +172,38 @@ async def run( raise RuntimeError( f"Previous content is corrupted or invalid: {e}" ) - elif previous_hash: - # We have previous run but no content - this is an error - workflow.logger.error( - "Previous content not available for downstream pipeline but previous hash exists", - extra={ - "endpoint_id": self.endpoint_id, - "previous_hash": previous_hash, - }, - ) - raise RuntimeError( - "Previous content is missing from completion result but is required for downstream pipeline" + + try: + handler_acknowledgement = ( + await self._result_handler.handle_new_data( + endpoint_id=self.endpoint_id, + previous_data=previous_data, + new_data=current_content, + content_hash=current_hash, ) + ) - downstream_triggered = await self.trigger_downstream_pipeline( - downstream_pipeline, - previous_data, - current_content, - ) + # Log handler response + 
workflow.logger.info( + f"Handler response: {handler_acknowledgement}", + extra={ + "endpoint_id": self.endpoint_id, + "handler_response": str(handler_acknowledgement), + "handler_info": handler_acknowledgement.info, + }, + ) + + except Exception as e: + workflow.logger.error( + "Handler failed to process new data", + extra={ + "endpoint_id": self.endpoint_id, + "error": str(e), + "error_type": type(e).__name__, + }, + ) + # Don't fail the polling workflow if handler fails + # handler_acknowledgement remains None to indicate handler exception self.current_step = "completed" @@ -256,7 +221,7 @@ async def run( "previous_hash": previous_hash, "current_hash": current_hash, }, - "downstream_triggered": downstream_triggered, + "handler_acknowledgement": handler_acknowledgement, "endpoint_id": self.endpoint_id, "completed_at": workflow.now().isoformat(), } @@ -266,7 +231,7 @@ async def run( extra={ "endpoint_id": self.endpoint_id, "has_new_data": has_new_data, - "downstream_triggered": downstream_triggered, + "handler_acknowledgement": handler_acknowledgement, }, ) From d4b75b9d8065efc37de1d3eb728b4e6ec27369bc Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Wed, 25 Feb 2026 10:48:03 +1100 Subject: [PATCH 3/6] polling: Bump julee version. 
--- pyproject.toml | 2 +- src/julee/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 239bd3cb..aa7f7090 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "julee" -version = "0.1.9" +version = "0.1.10" description = "Julee - Clean architecture for accountable and transparent digital supply chains" readme = "README.md" requires-python = ">=3.11" diff --git a/src/julee/__init__.py b/src/julee/__init__.py index ea2600f2..fa7ae3c9 100644 --- a/src/julee/__init__.py +++ b/src/julee/__init__.py @@ -1,3 +1,3 @@ """Julee - Clean architecture for accountable and transparent digital supply chains.""" -__version__ = "0.1.9" +__version__ = "0.1.10" From d08436c6938c4d31c3663f75b0b7526423cc2931 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Thu, 26 Feb 2026 15:46:52 +1100 Subject: [PATCH 4/6] polling: extract PollDataUseCase and replace handler injection with get_handler() NewDataDetectionPipeline previously required result_handler via __init__, which Temporal cannot satisfy when instantiating workflow classes. Extract polling, change-detection, and handler coordination to PollDataUseCase. Subclasses override get_handler() to supply a handler without constructor injection. PollingManager.start_polling gains a workflow_name parameter (replacing the unused downstream_pipeline arg) to enable subclass-based routing. 
--- .../contrib/polling/apps/worker/pipelines.py | 162 ++++-------------- .../infrastructure/temporal/manager.py | 16 +- .../contrib/polling/use_cases/__init__.py | 0 .../contrib/polling/use_cases/poll_data.py | 133 ++++++++++++++ 4 files changed, 173 insertions(+), 138 deletions(-) create mode 100644 src/julee/contrib/polling/use_cases/__init__.py create mode 100644 src/julee/contrib/polling/use_cases/poll_data.py diff --git a/src/julee/contrib/polling/apps/worker/pipelines.py b/src/julee/contrib/polling/apps/worker/pipelines.py index 5c893548..1d0016f2 100644 --- a/src/julee/contrib/polling/apps/worker/pipelines.py +++ b/src/julee/contrib/polling/apps/worker/pipelines.py @@ -6,7 +6,6 @@ and reliable execution for endpoint polling and change detection. """ -import hashlib import logging from typing import Any @@ -19,6 +18,7 @@ from julee.contrib.polling.infrastructure.temporal.proxies import ( WorkflowPollerServiceProxy, ) +from julee.contrib.polling.use_cases.poll_data import PollDataRequest, PollDataUseCase logger = logging.getLogger(__name__) @@ -36,13 +36,24 @@ class NewDataDetectionPipeline: The workflow uses Temporal's schedule last completion result feature to automatically receive the previous execution's result for comparison. + + Subclasses override get_handler() to supply the appropriate handler + for each polling use case (credential, product, etc.). """ - def __init__(self, result_handler: PollingResultHandler) -> None: + def __init__(self) -> None: self.current_step = "initialized" self.endpoint_id: str | None = None self.has_new_data: bool = False - self._result_handler = result_handler + + def get_handler(self) -> PollingResultHandler | None: + """ + Return the PollingResultHandler for this pipeline. + + Subclasses override this method to provide a handler. + The base implementation returns None (detect only, no handoff). 
+ """ + return None @workflow.query def get_current_step(self) -> str: @@ -68,7 +79,8 @@ async def run( Execute the new data detection workflow. Args: - config: Configuration for the polling operation (PollingConfig or dict from schedule) + config: Configuration for the polling operation (PollingConfig or dict + from Temporal schedule serialisation) Returns: Completion result containing polling result and detection metadata @@ -76,14 +88,12 @@ async def run( Raises: RuntimeError: If polling fails after retries """ - # Convert dict to PollingConfig if needed (for schedule compatibility) - # Temporal schedules serialize arguments as dicts, not Pydantic models + # Convert dict to PollingConfig if needed (Temporal schedules serialise args as dicts) if isinstance(config, dict): config = PollingConfig.model_validate(config) self.endpoint_id = config.endpoint_identifier - # Fetch previous completion result from Temporal previous_completion = workflow.get_last_completion_result() workflow.logger.info( @@ -100,142 +110,33 @@ async def run( self.current_step = "polling_endpoint" try: - # Step 1: Poll the endpoint - polling_service = WorkflowPollerServiceProxy() - polling_result = await polling_service.poll_endpoint(config) - - # Extract the timestamp from when polling actually happened - polled_at = polling_result.polled_at.isoformat() - - workflow.logger.debug( - "Polling completed", - extra={ - "endpoint_id": self.endpoint_id, - "polling_success": polling_result.success, - "content_length": len(polling_result.content), - }, + request = PollDataRequest( + config=config, + previous_completion=previous_completion, ) - - self.current_step = "detecting_changes" - - # Step 2: Detect new data using hash comparison - current_content = polling_result.content - current_hash = hashlib.sha256(current_content).hexdigest() - - previous_hash = None - if previous_completion and "polling_result" in previous_completion: - previous_hash = previous_completion["polling_result"].get( - 
"content_hash" - ) - - has_new_data = previous_hash != current_hash - self.has_new_data = has_new_data - - workflow.logger.info( - f"DEBUG: Change detection - has_new_data: {has_new_data}, " - f"is_first_run: {previous_hash is None}, " - f"current_hash: {current_hash[:8]}..., " - f"previous_hash: {previous_hash[:8] if previous_hash else 'None'}..." + use_case = PollDataUseCase( + poller=WorkflowPollerServiceProxy(), + handler=self.get_handler(), ) + result = await use_case.execute(request) - # Step 3: Hand off to result handler if new data detected - handler_acknowledgement = None - if has_new_data: - self.current_step = "handling_new_data" - - workflow.logger.info( - "Handing off new data to result handler", - extra={ - "endpoint_id": self.endpoint_id, - "content_length": len(current_content), - }, - ) - - # Get previous data for handler - previous_data = None - if previous_completion and "polling_result" in previous_completion: - prev_content_str = previous_completion["polling_result"].get( - "content" - ) - if prev_content_str: - try: - previous_data = prev_content_str.encode("utf-8") - except (UnicodeDecodeError, AttributeError) as e: - workflow.logger.error( - "Failed to decode previous content for handler", - extra={ - "endpoint_id": self.endpoint_id, - "error": str(e), - "error_type": type(e).__name__, - }, - ) - raise RuntimeError( - f"Previous content is corrupted or invalid: {e}" - ) - - try: - handler_acknowledgement = ( - await self._result_handler.handle_new_data( - endpoint_id=self.endpoint_id, - previous_data=previous_data, - new_data=current_content, - content_hash=current_hash, - ) - ) - - # Log handler response - workflow.logger.info( - f"Handler response: {handler_acknowledgement}", - extra={ - "endpoint_id": self.endpoint_id, - "handler_response": str(handler_acknowledgement), - "handler_info": handler_acknowledgement.info, - }, - ) - - except Exception as e: - workflow.logger.error( - "Handler failed to process new data", - extra={ - 
"endpoint_id": self.endpoint_id, - "error": str(e), - "error_type": type(e).__name__, - }, - ) - # Don't fail the polling workflow if handler fails - # handler_acknowledgement remains None to indicate handler exception - + self.endpoint_id = result.get("endpoint_id", self.endpoint_id) + self.has_new_data = result.get("detection_result", {}).get( + "has_new_data", False + ) self.current_step = "completed" - # Step 4: Return completion result for next scheduled execution - completion_result = { - "polling_result": { - "success": polling_result.success, - "content_hash": current_hash, - "content": current_content.decode("utf-8", errors="ignore"), - "polled_at": polled_at, - "content_length": len(current_content), - }, - "detection_result": { - "has_new_data": has_new_data, - "previous_hash": previous_hash, - "current_hash": current_hash, - }, - "handler_acknowledgement": handler_acknowledgement, - "endpoint_id": self.endpoint_id, - "completed_at": workflow.now().isoformat(), - } + result["completed_at"] = workflow.now().isoformat() workflow.logger.info( "New data detection pipeline completed successfully", extra={ "endpoint_id": self.endpoint_id, - "has_new_data": has_new_data, - "handler_acknowledgement": handler_acknowledgement, + "has_new_data": self.has_new_data, }, ) - return completion_result + return result except Exception as e: self.current_step = "failed" @@ -251,5 +152,4 @@ async def run( exc_info=True, ) - # Re-raise to let Temporal handle retry logic raise diff --git a/src/julee/contrib/polling/infrastructure/temporal/manager.py b/src/julee/contrib/polling/infrastructure/temporal/manager.py index 2f82d77d..efb0ac5b 100644 --- a/src/julee/contrib/polling/infrastructure/temporal/manager.py +++ b/src/julee/contrib/polling/infrastructure/temporal/manager.py @@ -78,7 +78,7 @@ async def start_polling( endpoint_id: str, config: PollingConfig, interval_seconds: int, - downstream_pipeline: str | None = None, + workflow_name: str = "NewDataDetectionPipeline", ) -> 
str: """ Start polling an HTTP endpoint at regular intervals. @@ -87,7 +87,9 @@ async def start_polling( endpoint_id: Unique identifier for this polling operation config: Configuration for the polling operation interval_seconds: How often to poll (in seconds) - downstream_pipeline: Optional pipeline to trigger when new data detected + workflow_name: Name of the Temporal workflow to schedule. + Defaults to "NewDataDetectionPipeline"; override + to use a subclass registered under a different name. Returns: Schedule ID that was created @@ -113,8 +115,8 @@ async def start_polling( schedule = Schedule( action=ScheduleActionStartWorkflow( - "NewDataDetectionPipeline", - args=[config, downstream_pipeline], + workflow_name, + args=[config], id=f"{schedule_id}-{{.timestamp}}", task_queue=self._task_queue, ), @@ -156,7 +158,7 @@ async def update_schedule_callback( "schedule_id": schedule_id, "config": config, "interval_seconds": interval_seconds, - "downstream_pipeline": downstream_pipeline, + "workflow_name": workflow_name, } return schedule_id @@ -209,7 +211,7 @@ async def list_active_polling(self) -> list[dict[str, Any]]: "interval_seconds": poll_info["interval_seconds"], "endpoint_identifier": poll_info["config"].endpoint_identifier, "polling_protocol": poll_info["config"].polling_protocol.value, - "downstream_pipeline": poll_info.get("downstream_pipeline"), + "workflow_name": poll_info.get("workflow_name"), } ) @@ -246,7 +248,7 @@ async def get_polling_status(self, endpoint_id: str) -> dict[str, Any] | None: "schedule_id": schedule_id, "interval_seconds": poll_info["interval_seconds"], "is_paused": schedule_description.schedule.state.paused, - "downstream_pipeline": poll_info.get("downstream_pipeline"), + "workflow_name": poll_info.get("workflow_name"), } async def pause_polling(self, endpoint_id: str) -> bool: diff --git a/src/julee/contrib/polling/use_cases/__init__.py b/src/julee/contrib/polling/use_cases/__init__.py new file mode 100644 index 00000000..e69de29b diff 
--git a/src/julee/contrib/polling/use_cases/poll_data.py b/src/julee/contrib/polling/use_cases/poll_data.py new file mode 100644 index 00000000..2f4b8a4e --- /dev/null +++ b/src/julee/contrib/polling/use_cases/poll_data.py @@ -0,0 +1,133 @@ +""" +PollDataUseCase — generic polling and new-data detection. + +This module contains the pure business logic for polling an endpoint +and detecting whether its content has changed since the last run. +It has no knowledge of Temporal, workflows, or application infrastructure. +""" + +import hashlib +import logging + +from pydantic import BaseModel + +from julee.contrib.polling.domain.models.polling_config import PollingConfig +from julee.contrib.polling.domain.services.poller import PollerService +from julee.contrib.polling.domain.services.polling_result_handler import ( + PollingResultHandler, +) + +logger = logging.getLogger(__name__) + + +class PollDataRequest(BaseModel): + """Input for PollDataUseCase.""" + + config: PollingConfig + previous_completion: dict | None = None + + +class PollDataUseCase: + """ + Use case for polling an endpoint and detecting new data. + + Responsibilities: + 1. Poll the endpoint via the injected PollerService + 2. Compute the SHA-256 hash of the response content + 3. Compare with the previous run's hash (from previous_completion) + 4. If content has changed, delegate to the optional PollingResultHandler + 5. Return a completion result dict in the same shape that + NewDataDetectionPipeline returns, so Temporal schedule + last-completion-result works unchanged. + """ + + def __init__( + self, + poller: PollerService, + handler: PollingResultHandler | None = None, + ) -> None: + self._poller = poller + self._handler = handler + + async def execute(self, request: PollDataRequest) -> dict: + """ + Execute the poll-and-detect use case. + + Args: + request: PollDataRequest containing polling config and + the previous run's completion result (may be None + for the first run). 
+ + Returns: + Completion result dict with keys: + polling_result, detection_result, handler_acknowledgement, + endpoint_id + """ + config = request.config + endpoint_id = config.endpoint_identifier + + # Step 1: Poll the endpoint + polling_result = await self._poller.poll_endpoint(config) + polled_at = polling_result.polled_at.isoformat() + + # Step 2: Hash current content + current_content = polling_result.content + current_hash = hashlib.sha256(current_content).hexdigest() + + # Step 3: Extract previous hash + previous_hash: str | None = None + if request.previous_completion and "polling_result" in request.previous_completion: + previous_hash = request.previous_completion["polling_result"].get( + "content_hash" + ) + + # Step 4: Detect change + has_new_data = previous_hash != current_hash + + # Step 5: Invoke handler if new data detected + handler_acknowledgement = None + if has_new_data and self._handler is not None: + previous_data: bytes | None = None + if request.previous_completion and "polling_result" in request.previous_completion: + prev_content_str = request.previous_completion["polling_result"].get( + "content" + ) + if prev_content_str: + previous_data = prev_content_str.encode("utf-8") + + try: + handler_acknowledgement = await self._handler.handle_new_data( + endpoint_id, + previous_data, + current_content, + current_hash, + ) + except Exception as e: + logger.error( + "Handler raised an exception; continuing without ack", + extra={ + "endpoint_id": endpoint_id, + "error": str(e), + "error_type": type(e).__name__, + }, + exc_info=True, + ) + # handler_acknowledgement stays None to signal the exception + + # Step 6: Return completion result + return { + "polling_result": { + "success": polling_result.success, + "content_hash": current_hash, + "content": current_content.decode("utf-8", errors="ignore"), + "polled_at": polled_at, + "content_length": len(current_content), + }, + "detection_result": { + "has_new_data": has_new_data, + "previous_hash": 
previous_hash, + "current_hash": current_hash, + }, + "handler_acknowledgement": handler_acknowledgement, + "endpoint_id": endpoint_id, + } From 7a6895b7cad95530a27c9f7026794eee1c47c6c5 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Thu, 26 Feb 2026 15:50:32 +1100 Subject: [PATCH 5/6] Lint --- src/julee/contrib/polling/use_cases/poll_data.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/julee/contrib/polling/use_cases/poll_data.py b/src/julee/contrib/polling/use_cases/poll_data.py index 2f4b8a4e..18c206e6 100644 --- a/src/julee/contrib/polling/use_cases/poll_data.py +++ b/src/julee/contrib/polling/use_cases/poll_data.py @@ -76,7 +76,10 @@ async def execute(self, request: PollDataRequest) -> dict: # Step 3: Extract previous hash previous_hash: str | None = None - if request.previous_completion and "polling_result" in request.previous_completion: + if ( + request.previous_completion + and "polling_result" in request.previous_completion + ): previous_hash = request.previous_completion["polling_result"].get( "content_hash" ) @@ -88,7 +91,10 @@ async def execute(self, request: PollDataRequest) -> dict: handler_acknowledgement = None if has_new_data and self._handler is not None: previous_data: bytes | None = None - if request.previous_completion and "polling_result" in request.previous_completion: + if ( + request.previous_completion + and "polling_result" in request.previous_completion + ): prev_content_str = request.previous_completion["polling_result"].get( "content" ) From 8b485e5de3853446e51046fe32dff1d80e4f3250 Mon Sep 17 00:00:00 2001 From: Michael Nelson Date: Mon, 2 Mar 2026 15:22:30 +1100 Subject: [PATCH 6/6] polling: require polling data analyzer separate from handler. 
--- .../contrib/polling/apps/worker/pipelines.py | 27 ++++++---- .../domain/services/new_data_analyzer.py | 50 +++++++++++++++++++ .../domain/services/polling_result_handler.py | 26 ++++++---- .../contrib/polling/use_cases/poll_data.py | 42 ++++++++-------- 4 files changed, 103 insertions(+), 42 deletions(-) create mode 100644 src/julee/contrib/polling/domain/services/new_data_analyzer.py diff --git a/src/julee/contrib/polling/apps/worker/pipelines.py b/src/julee/contrib/polling/apps/worker/pipelines.py index 1d0016f2..25ca0551 100644 --- a/src/julee/contrib/polling/apps/worker/pipelines.py +++ b/src/julee/contrib/polling/apps/worker/pipelines.py @@ -7,11 +7,13 @@ """ import logging +from abc import abstractmethod from typing import Any from temporalio import workflow from julee.contrib.polling.domain.models.polling_config import PollingConfig +from julee.contrib.polling.domain.services.new_data_analyzer import NewDataAnalyzer from julee.contrib.polling.domain.services.polling_result_handler import ( PollingResultHandler, ) @@ -31,14 +33,15 @@ class NewDataDetectionPipeline: This workflow: 1. Polls an endpoint using the configured polling service 2. Compares result with previous completion to detect changes - 3. Hands off to result handler when new data is detected - 4. Returns completion result for next scheduled execution + 3. Runs the analyzer to identify new item IDs when new data is detected + 4. Hands off to result handler when new data is detected + 5. Returns completion result for next scheduled execution The workflow uses Temporal's schedule last completion result feature to automatically receive the previous execution's result for comparison. - Subclasses override get_handler() to supply the appropriate handler - for each polling use case (credential, product, etc.). + Subclasses must implement get_handler() and get_analyzer() to supply the + appropriate objects for each polling use case (credential, product, etc.).
""" def __init__(self) -> None: @@ -46,14 +49,15 @@ def __init__(self) -> None: self.endpoint_id: str | None = None self.has_new_data: bool = False - def get_handler(self) -> PollingResultHandler | None: - """ - Return the PollingResultHandler for this pipeline. + @abstractmethod + def get_handler(self) -> PollingResultHandler: + """Return the PollingResultHandler for this pipeline.""" + ... - Subclasses override this method to provide a handler. - The base implementation returns None (detect only, no handoff). - """ - return None + @abstractmethod + def get_analyzer(self) -> NewDataAnalyzer: + """Return the NewDataAnalyzer for this pipeline.""" + ... @workflow.query def get_current_step(self) -> str: @@ -117,6 +121,7 @@ async def run( use_case = PollDataUseCase( poller=WorkflowPollerServiceProxy(), handler=self.get_handler(), + analyzer=self.get_analyzer(), ) result = await use_case.execute(request) diff --git a/src/julee/contrib/polling/domain/services/new_data_analyzer.py b/src/julee/contrib/polling/domain/services/new_data_analyzer.py new file mode 100644 index 00000000..9a482480 --- /dev/null +++ b/src/julee/contrib/polling/domain/services/new_data_analyzer.py @@ -0,0 +1,50 @@ +""" +NewDataAnalyzer protocol for converting raw polling bytes into item IDs. + +This protocol is the counterpart to PollingResultHandler. Where the handler +decides what to *do* when new items are found, the analyzer decides *what* +items are new by comparing previous and current polling payloads. + +Separating analysis from handling keeps each concern to a single class and +allows the NewDataDetectionPipeline to complete data→domain translation +before any application-level dispatch occurs. +""" + +from typing import Protocol, runtime_checkable + + +@runtime_checkable +class NewDataAnalyzer(Protocol): + """ + Converts raw polling bytes into a list of new item identifiers. 
+ + The analyzer is responsible for: + - Parsing the raw bytes from the polling response + - Comparing with the previous response (if any) + - Returning the IDs of items that are new or changed + + The returned IDs are opaque strings from the polling system's perspective. + Their meaning is defined by the bounded context that provides the analyzer. + """ + + async def identify_new_items( + self, + previous_data: bytes | None, + new_data: bytes, + ) -> list[str]: + """ + Identify items that are new or changed since the previous poll. + + Args: + previous_data: Previous polling response content. + None if this is the first polling run. + new_data: Current polling response content. + + Returns: + List of item identifier strings that are new or changed. + Empty list if nothing is new. + + Raises: + ValueError: If the data cannot be parsed or is in an unexpected format. + """ + ... diff --git a/src/julee/contrib/polling/domain/services/polling_result_handler.py b/src/julee/contrib/polling/domain/services/polling_result_handler.py index e7f6e8d1..20d970a0 100644 --- a/src/julee/contrib/polling/domain/services/polling_result_handler.py +++ b/src/julee/contrib/polling/domain/services/polling_result_handler.py @@ -9,6 +9,10 @@ The polling system recognizes the condition (new data detected) and hands off to the handler without knowing what the handler does - this is the "green-dotted-egg-handler" principle. + +By the time handle_new_data() is called, the NewDataDetectionPipeline has +already translated raw bytes into item IDs via the NewDataAnalyzer. Handlers +therefore work with structured identifiers, not raw content. """ from typing import Protocol, runtime_checkable @@ -22,7 +26,7 @@ class PollingResultHandler(Protocol): Handler for new data detected during polling operations. 
This protocol enables cross-bounded-context orchestration by allowing - polling systems to hand off newly detected data to solution-specific + polling systems to hand off newly detected item IDs to solution-specific processing without knowing what that processing entails. Handlers may implement any orchestration pattern: @@ -32,29 +36,29 @@ class PollingResultHandler(Protocol): - Log and notify - Complex multi-step processing - The handler signature uses primitives (str, bytes) since this is a - cross-BC interface and bounded contexts don't share domain types. + The handler receives item IDs (strings) rather than raw bytes because the + NewDataDetectionPipeline runs a NewDataAnalyzer before calling the handler. + This keeps use-case logic out of handlers and makes handlers pure dispatchers. """ async def handle_new_data( self, endpoint_id: str, - previous_data: bytes | None, - new_data: bytes, + new_item_ids: list[str], content_hash: str, ) -> Acknowledgement: """ - Handle newly detected data from a polling operation. + Handle newly detected items from a polling operation. This method is called when the polling system detects that data at - an endpoint has changed. The handler decides what to do with this - information - whether to start processing workflows, queue work, - send notifications, or any other domain-specific action. + an endpoint has changed and the analyzer has identified the new items. + The handler decides what to do with the item IDs — whether to start + processing workflows, queue work, send notifications, or any other + domain-specific action. 
Args: endpoint_id: Unique identifier for the polled endpoint - previous_data: Previous content (None if this is the first polling run) - new_data: New content that was detected as different from previous + new_item_ids: List of item IDs identified as new or changed by the analyzer content_hash: SHA256 hash of the new content for deduplication/tracking Returns: diff --git a/src/julee/contrib/polling/use_cases/poll_data.py b/src/julee/contrib/polling/use_cases/poll_data.py index 18c206e6..7af9445a 100644 --- a/src/julee/contrib/polling/use_cases/poll_data.py +++ b/src/julee/contrib/polling/use_cases/poll_data.py @@ -12,6 +12,7 @@ from pydantic import BaseModel from julee.contrib.polling.domain.models.polling_config import PollingConfig +from julee.contrib.polling.domain.services.new_data_analyzer import NewDataAnalyzer from julee.contrib.polling.domain.services.poller import PollerService from julee.contrib.polling.domain.services.polling_result_handler import ( PollingResultHandler, @@ -35,19 +36,23 @@ class PollDataUseCase: 1. Poll the endpoint via the injected PollerService 2. Compute the SHA-256 hash of the response content 3. Compare with the previous run's hash (from previous_completion) - 4. If content has changed, delegate to the optional PollingResultHandler - 5. Return a completion result dict in the same shape that + 4. If content has changed, identify new item IDs via the NewDataAnalyzer + 5. Delegate to the PollingResultHandler with the item IDs + 6. Return a completion result dict in the same shape that NewDataDetectionPipeline returns, so Temporal schedule last-completion-result works unchanged.
+ """ def __init__( self, poller: PollerService, - handler: PollingResultHandler | None = None, + handler: PollingResultHandler, + analyzer: NewDataAnalyzer, ) -> None: self._poller = poller self._handler = handler + self._analyzer = analyzer async def execute(self, request: PollDataRequest) -> dict: """ @@ -74,8 +79,9 @@ async def execute(self, request: PollDataRequest) -> dict: current_content = polling_result.content current_hash = hashlib.sha256(current_content).hexdigest() - # Step 3: Extract previous hash + # Step 3: Extract previous hash and content previous_hash: str | None = None + previous_data: bytes | None = None if ( request.previous_completion and "polling_result" in request.previous_completion @@ -83,34 +89,30 @@ async def execute(self, request: PollDataRequest) -> dict: previous_hash = request.previous_completion["polling_result"].get( "content_hash" ) + prev_content_str = request.previous_completion["polling_result"].get( + "content" + ) + if prev_content_str: + previous_data = prev_content_str.encode("utf-8") # Step 4: Detect change has_new_data = previous_hash != current_hash - # Step 5: Invoke handler if new data detected + # Step 5: Analyze and invoke handler if new data detected handler_acknowledgement = None - if has_new_data and self._handler is not None: - previous_data: bytes | None = None - if ( - request.previous_completion - and "polling_result" in request.previous_completion - ): - prev_content_str = request.previous_completion["polling_result"].get( - "content" - ) - if prev_content_str: - previous_data = prev_content_str.encode("utf-8") - + if has_new_data: try: + item_ids = await self._analyzer.identify_new_items( + previous_data, current_content + ) handler_acknowledgement = await self._handler.handle_new_data( endpoint_id, - previous_data, - current_content, + item_ids, current_hash, ) except Exception as e: logger.error( - "Handler raised an exception; continuing without ack", + "Analyzer or handler raised an exception; 
continuing without ack", extra={ "endpoint_id": endpoint_id, "error": str(e),