Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "julee"
version = "0.1.9"
version = "0.1.10"
description = "Julee - Clean architecture for accountable and transparent digital supply chains"
readme = "README.md"
requires-python = ">=3.11"
Expand Down
2 changes: 1 addition & 1 deletion src/julee/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Julee - Clean architecture for accountable and transparent digital supply chains."""

__version__ = "0.1.9"
__version__ = "0.1.10"
212 changes: 41 additions & 171 deletions src/julee/contrib/polling/apps/worker/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@
and reliable execution for endpoint polling and change detection.
"""

import hashlib
import logging
from abc import abstractmethod
from typing import Any

from temporalio import workflow
from temporalio.workflow import ParentClosePolicy

from julee.contrib.polling.domain.models.polling_config import PollingConfig
from julee.contrib.polling.domain.services.new_data_analyzer import NewDataAnalyzer
from julee.contrib.polling.domain.services.polling_result_handler import (
PollingResultHandler,
)
from julee.contrib.polling.infrastructure.temporal.proxies import (
WorkflowPollerServiceProxy,
)
from julee.contrib.polling.use_cases.poll_data import PollDataRequest, PollDataUseCase

logger = logging.getLogger(__name__)

Expand All @@ -29,18 +33,32 @@ class NewDataDetectionPipeline:
This workflow:
1. Polls an endpoint using the configured polling service
2. Compares result with previous completion to detect changes
3. Triggers downstream processing when new data is detected
4. Returns completion result for next scheduled execution
3. Runs the analyzer to identify new item IDs (if provided)
4. Hands off to result handler when new data is detected
5. Returns completion result for next scheduled execution

The workflow uses Temporal's schedule last completion result feature
to automatically receive the previous execution's result for comparison.

Subclasses must implement get_handler() and get_analyzer() to supply the
appropriate objects for each polling use case (credential, product, etc.).
"""

def __init__(self) -> None:
self.current_step = "initialized"
self.endpoint_id: str | None = None
self.has_new_data: bool = False

@abstractmethod
def get_handler(self) -> PollingResultHandler:
    """Return the PollingResultHandler for this pipeline.

    Subclasses override this hook to supply the handler that ``run()``
    passes to ``PollDataUseCase``.

    NOTE(review): ``@abstractmethod`` is only enforced at instantiation
    time when the class uses ``ABCMeta``; the enclosing class does not
    appear to derive from ``ABC`` -- confirm. Until then the default body
    fails loudly instead of silently returning ``None``.

    Returns:
        The handler object for this polling use case.

    Raises:
        NotImplementedError: If the subclass has not overridden this hook.
    """
    raise NotImplementedError(
        f"{type(self).__name__} must implement get_handler()"
    )

@abstractmethod
def get_analyzer(self) -> NewDataAnalyzer:
    """Return the NewDataAnalyzer for this pipeline.

    Subclasses override this hook to supply the analyzer that ``run()``
    passes to ``PollDataUseCase``.

    NOTE(review): ``@abstractmethod`` is only enforced at instantiation
    time when the class uses ``ABCMeta``; the enclosing class does not
    appear to derive from ``ABC`` -- confirm. Until then the default body
    fails loudly instead of silently returning ``None``.

    Returns:
        The analyzer object for this polling use case.

    Raises:
        NotImplementedError: If the subclass has not overridden this hook.
    """
    raise NotImplementedError(
        f"{type(self).__name__} must implement get_analyzer()"
    )

@workflow.query
def get_current_step(self) -> str:
"""Query method to get the current workflow step."""
Expand All @@ -56,82 +74,30 @@ def get_has_new_data(self) -> bool:
"""Query method to check if new data was detected."""
return self.has_new_data

async def trigger_downstream_pipeline(
    self,
    downstream_pipeline: str,
    previous_data: bytes | None,
    new_data: bytes,
) -> bool:
    """
    Start the downstream workflow as an abandoned child workflow.

    This is a best-effort hand-off: any exception raised while starting the
    child is logged and reported through the return value, never re-raised,
    so a downstream failure does not fail the polling workflow itself.

    Args:
        downstream_pipeline: Name of the downstream workflow to start
        previous_data: Content from the previous run (None on the first run)
        new_data: Newly detected content

    Returns:
        True when the child workflow was started, False when starting failed
    """
    triggered = False
    try:
        # The child id combines the endpoint with this workflow's own id.
        child_id = f"downstream-{self.endpoint_id}-{workflow.info().workflow_id}"
        await workflow.start_child_workflow(
            downstream_pipeline,
            args=[previous_data, new_data],
            id=child_id,
            task_queue="downstream-processing-queue",
            # ABANDON: the child is not tied to this parent's closure.
            parent_close_policy=ParentClosePolicy.ABANDON,
        )
        workflow.logger.info(
            "Downstream pipeline triggered successfully",
            extra=dict(
                endpoint_id=self.endpoint_id,
                downstream_pipeline=downstream_pipeline,
            ),
        )
        triggered = True
    except Exception as exc:
        # Deliberately swallowed: report the failure via the return value.
        workflow.logger.error(
            "Failed to trigger downstream pipeline",
            extra=dict(
                endpoint_id=self.endpoint_id,
                downstream_pipeline=downstream_pipeline,
                error=str(exc),
                error_type=type(exc).__name__,
            ),
        )
    return triggered

@workflow.run
async def run(
self,
config: PollingConfig | dict[str, Any],
downstream_pipeline: str | None = None,
) -> dict[str, Any]:
"""
Execute the new data detection workflow.

Args:
config: Configuration for the polling operation (PollingConfig or dict from schedule)
downstream_pipeline: Optional pipeline to trigger when new data detected
config: Configuration for the polling operation (PollingConfig or dict
from Temporal schedule serialisation)

Returns:
Completion result containing polling result and detection metadata

Raises:
RuntimeError: If polling or downstream processing fails after retries
RuntimeError: If polling fails after retries
"""
# Convert dict to PollingConfig if needed (for schedule compatibility)
# Temporal schedules serialize arguments as dicts, not Pydantic models
# Convert dict to PollingConfig if needed (Temporal schedules serialise args as dicts)
if isinstance(config, dict):
config = PollingConfig.model_validate(config)

self.endpoint_id = config.endpoint_identifier

# Fetch previous completion result from Temporal
previous_completion = workflow.get_last_completion_result()

workflow.logger.info(
Expand All @@ -148,129 +114,34 @@ async def run(
self.current_step = "polling_endpoint"

try:
# Step 1: Poll the endpoint
polling_service = WorkflowPollerServiceProxy()
polling_result = await polling_service.poll_endpoint(config)

# Extract the timestamp from when polling actually happened
polled_at = polling_result.polled_at.isoformat()

workflow.logger.debug(
"Polling completed",
extra={
"endpoint_id": self.endpoint_id,
"polling_success": polling_result.success,
"content_length": len(polling_result.content),
},
request = PollDataRequest(
config=config,
previous_completion=previous_completion,
)

self.current_step = "detecting_changes"

# Step 2: Detect new data using hash comparison
current_content = polling_result.content
current_hash = hashlib.sha256(current_content).hexdigest()

previous_hash = None
if previous_completion and "polling_result" in previous_completion:
previous_hash = previous_completion["polling_result"].get(
"content_hash"
)

has_new_data = previous_hash != current_hash
self.has_new_data = has_new_data

workflow.logger.info(
f"DEBUG: Change detection - has_new_data: {has_new_data}, "
f"is_first_run: {previous_hash is None}, "
f"current_hash: {current_hash[:8]}..., "
f"previous_hash: {previous_hash[:8] if previous_hash else 'None'}..."
use_case = PollDataUseCase(
poller=WorkflowPollerServiceProxy(),
handler=self.get_handler(),
analyzer=self.get_analyzer(),
)
result = await use_case.execute(request)

# Step 3: Trigger downstream processing if new data detected
downstream_triggered = False
if has_new_data and downstream_pipeline:
self.current_step = "triggering_downstream"

workflow.logger.info(
"Triggering downstream pipeline",
extra={
"endpoint_id": self.endpoint_id,
"downstream_pipeline": downstream_pipeline,
"content_length": len(current_content),
},
)

# Get previous data for comparison
previous_data = None
if previous_completion and "polling_result" in previous_completion:
prev_content_str = previous_completion["polling_result"].get(
"content"
)
if prev_content_str:
try:
previous_data = prev_content_str.encode("utf-8")
except (UnicodeDecodeError, AttributeError) as e:
workflow.logger.error(
"Failed to decode previous content for downstream pipeline",
extra={
"endpoint_id": self.endpoint_id,
"error": str(e),
"error_type": type(e).__name__,
},
)
raise RuntimeError(
f"Previous content is corrupted or invalid: {e}"
)
elif previous_hash:
# We have previous run but no content - this is an error
workflow.logger.error(
"Previous content not available for downstream pipeline but previous hash exists",
extra={
"endpoint_id": self.endpoint_id,
"previous_hash": previous_hash,
},
)
raise RuntimeError(
"Previous content is missing from completion result but is required for downstream pipeline"
)

downstream_triggered = await self.trigger_downstream_pipeline(
downstream_pipeline,
previous_data,
current_content,
)

self.endpoint_id = result.get("endpoint_id", self.endpoint_id)
self.has_new_data = result.get("detection_result", {}).get(
"has_new_data", False
)
self.current_step = "completed"

# Step 4: Return completion result for next scheduled execution
completion_result = {
"polling_result": {
"success": polling_result.success,
"content_hash": current_hash,
"content": current_content.decode("utf-8", errors="ignore"),
"polled_at": polled_at,
"content_length": len(current_content),
},
"detection_result": {
"has_new_data": has_new_data,
"previous_hash": previous_hash,
"current_hash": current_hash,
},
"downstream_triggered": downstream_triggered,
"endpoint_id": self.endpoint_id,
"completed_at": workflow.now().isoformat(),
}
result["completed_at"] = workflow.now().isoformat()

workflow.logger.info(
"New data detection pipeline completed successfully",
extra={
"endpoint_id": self.endpoint_id,
"has_new_data": has_new_data,
"downstream_triggered": downstream_triggered,
"has_new_data": self.has_new_data,
},
)

return completion_result
return result

except Exception as e:
self.current_step = "failed"
Expand All @@ -286,5 +157,4 @@ async def run(
exc_info=True,
)

# Re-raise to let Temporal handle retry logic
raise
12 changes: 0 additions & 12 deletions src/julee/contrib/polling/domain/services/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +0,0 @@
"""
Polling domain services.

This module contains the service protocols for the polling contrib module.

No re-exports to avoid import chains that pull non-deterministic code
into Temporal workflows. Import directly from specific modules:

- from julee.contrib.polling.domain.services.poller import PollerService
"""

__all__ = []
50 changes: 50 additions & 0 deletions src/julee/contrib/polling/domain/services/new_data_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""
NewDataAnalyzer protocol for converting raw polling bytes into item IDs.

This protocol is the counterpart to PollingResultHandler. Where the handler
decides what to *do* when new items are found, the analyzer decides *what*
items are new by comparing previous and current polling payloads.

Separating analysis from handling keeps each concern to a single class and
allows the NewDataDetectionPipeline to complete data→domain translation
before any application-level dispatch occurs.
"""

from typing import Protocol, runtime_checkable


@runtime_checkable
class NewDataAnalyzer(Protocol):
"""
Converts raw polling bytes into a list of new item identifiers.

The analyzer is responsible for:
- Parsing the raw bytes from the polling response
- Comparing with the previous response (if any)
- Returning the IDs of items that are new or changed

The returned IDs are opaque strings from the polling system's perspective.
Their meaning is defined by the bounded context that provides the analyzer.
"""

async def identify_new_items(
self,
previous_data: bytes | None,
new_data: bytes,
) -> list[str]:
"""
Identify items that are new or changed since the previous poll.

Args:
previous_data: Previous polling response content.
None if this is the first polling run.
new_data: Current polling response content.

Returns:
List of item identifier strings that are new or changed.
Empty list if nothing is new.

Raises:
ValueError: If the data cannot be parsed or is in an unexpected format.
"""
...
Loading