Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions docs/developer-guide-core.md
Original file line number Diff line number Diff line change
Expand Up @@ -826,10 +826,11 @@ WorkflowValidator.validate(workflow); // throws IllegalStateException on violati

### `WorkflowValidator` checks

| Check | Error example |
|------------------------------|---------------------------------------------------------------------------------|
| `writes` name not in schema | `Node 'write' writes 'draft' which is not declared in state schema` |
| Prompt `{var}` not in schema | `Node 'write' prompt references '{tone}' which is not declared in state schema` |
| Check | Error example |
|---------------------------------|---------------------------------------------------------------------------------|
| Transition target doesn't exist | `Node 'write' has transition to 'revieww' which does not exist in the workflow` |
| `writes` name not in schema | `Node 'write' writes 'draft' which is not declared in state schema` |
| Prompt `{var}` not in schema | `Node 'write' prompt references '{tone}' which is not declared in state schema` |

Validation is a no-op when no schema is declared. Legacy workflows always pass through unchanged.

Expand Down Expand Up @@ -1412,7 +1413,7 @@ Environment variables matching `*_API_KEY`, `*_KEY`, `*_SECRET`, or `*_TOKEN` pa
| `workflow/state/VarType.java` | Variable type enum: STRING, NUMBER, BOOLEAN, LIST_STRING |
| `workflow/transition/ApprovalTransition.java` | Boolean approval routing via the `approved` engine variable |
| `workflow/validation/SubWorkflowGraphValidator.java` | Load-time cycle + dangling-reference detector for sub-workflow graphs |
| `workflow/validation/WorkflowValidator.java` | Load-time validator for `writes` and prompt `{variable}` references |
| `workflow/validation/WorkflowValidator.java` | Load-time validator for transition targets, `writes`, and prompt `{variable}` references |
| `rubric/RubricEngine.java` | Quality evaluation engine |
| `rubric/model/Rubric.java` | Rubric definition model |
| `tool/ToolDefinition.java` | Protocol-agnostic tool descriptor |
Expand Down
13 changes: 10 additions & 3 deletions docs/developer-guide-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,12 @@ io.hensu.server/
│ ├── ExecutionEventResource # Execution monitoring SSE
│ ├── McpGatewayResource # MCP split-pipe SSE/POST
│ ├── ExecutionStartRequest # Request DTO for POST /executions
│ └── ResumeRequest # Request DTO for POST /executions/{id}/resume
│ ├── ResumeRequest # Request DTO for POST /executions/{id}/resume
│ ├── ResumeResponse # Response DTO for POST /executions/{id}/resume
│ ├── PushWorkflowResponse # Response DTO for POST /workflows
│ ├── WorkflowSummary # Response DTO for GET /workflows list entries
│ ├── GatewayStatusResponse # Response DTO for GET /mcp/status
│ └── ClientStatusResponse # Response DTO for GET /mcp/clients/{id}/status
├── validation/ # Input validation (Bean Validation)
│ ├── InputValidator # Shared validation predicates (safe-ID, dangerous chars, size)
Expand Down Expand Up @@ -350,10 +355,12 @@ io.hensu.server/
│ ├── WorkflowExecutionService # Start/resume orchestration
│ ├── ExecutionQueryService # Read-side: status, plan, output, paused list
│ ├── ExecutionStateService # Snapshot load/save with split-brain guard
│ ├── ExecutionResultHandler # Shared ExecutionResult → snapshot + SSE dispatch
│ ├── WorkflowContextUtil # Filters internal (_-prefixed) keys from context
│ ├── ExecutionHeartbeatJob # Periodic heartbeat emission (@Scheduled)
│ ├── WorkflowRecoveryJob # Orphaned execution sweeper (@Scheduled)
│ ├── ExecutionStartResult / ExecutionOutput / ExecutionSummary / PlanInfo / ResumeDecision # DTOs
│ ├── ExecutionStatus # Enum: COMPLETED / PAUSED / RUNNING
│ ├── ExecutionStartResult / ExecutionOutput / ExecutionSummary / PlanInfo # DTOs
│ ├── ExecutionStatus # DTO for execution status (with correlationId)
│ └── {Execution,Workflow}{NotFound,Execution}Exception # Domain exceptions
├── streaming/ # Execution event streaming
Expand Down
11 changes: 7 additions & 4 deletions docs/unified-architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ Zero-dependency Java library. Contains:
- `WorkflowRepository` / `WorkflowStateRepository` — Tenant-scoped storage interfaces with in-memory defaults
- `HensuState` / `HensuSnapshot` / `ExecutionHistory` — Mutable runtime state, immutable checkpoints, execution trace
- Workflow model, Node types (including `SubWorkflowNode`), Transition rules
- `WorkflowValidator` (`workflow/validation`) — validates transition targets exist unconditionally; validates `writes` and prompt `{variable}` refs against schema when declared
- `SubWorkflowGraphValidator` (`workflow/validation`) — cycle + dangling-ref detection across the sub-workflow reference graph, single-DFS with `globallyVisited`; `SubWorkflowNodeExecutor` enforces `MAX_DEPTH = 16` and propagates `_tenant_id` into the child context

### hensu-dsl (Kotlin DSL)
Expand Down Expand Up @@ -587,9 +588,11 @@ flowchart LR
### 4. State Schema Validation

Workflows optionally declare a `WorkflowStateSchema` — a typed registry of domain variables
(`writes` declarations) and their expected types. At load time, `WorkflowValidator` verifies
the schema against all node `writes` declarations and prompt template bindings (e.g., `{orderId}`),
preventing runtime binding failures before execution begins.
(`writes` declarations) and their expected types. At load time, `WorkflowValidator` performs
two categories of checks: **structural** — every transition target must reference an existing
node in the workflow — and **schema** — all node `writes` declarations and prompt template
bindings (e.g., `{orderId}`) must be declared in the schema. Structural validation runs
unconditionally; schema validation is a no-op when no schema is declared.

Three **engine variables** (`score`, `approved`, `recommendation`) are predefined in
`WorkflowStateSchema.ENGINE_VARIABLES` — they must never appear in user `state { }` declarations
Expand Down Expand Up @@ -801,6 +804,6 @@ The unified architecture provides:
16. **API Separation** — Workflow definitions and executions are distinct REST resources
17. **GraalVM-First Design** — No-reflection core; explicit wiring enables static analysis
18. **Three-Layer Testing** — Unit (Mockito), Integration (inmem + stubs), Persistence (Testcontainers)
19. **State Schema Validation** — `WorkflowStateSchema` + `WorkflowValidator` enforce typed variable declarations and prompt bindings at load time
19. **Workflow Validation** — `WorkflowValidator` enforces transition target existence unconditionally; `WorkflowStateSchema` adds typed variable declarations and prompt binding checks at load time
20. **Execution Observability** — `ExecutionEventBroadcaster` fans out engine events to SSE subscribers; `ScopedValue` routes events across virtual threads without `ThreadLocal`
21. **CLI Daemon** — `DaemonServer` keeps the JVM and Kotlin compiler warm; `OutputRingBuffer` allows detach/re-attach without losing execution output
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@
import io.hensu.core.execution.action.CommandRegistry.CommandDefinition;
import io.hensu.core.template.SimpleTemplateResolver;
import io.hensu.core.template.TemplateResolver;
import io.hensu.core.util.ShellEscaper;
import jakarta.enterprise.context.ApplicationScoped;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

Expand Down Expand Up @@ -53,7 +58,7 @@ public class CLIActionExecutor implements ActionExecutor {

private final TemplateResolver templateResolver = new SimpleTemplateResolver();
private final Map<String, ActionHandler> handlers = new ConcurrentHashMap<>();
private CommandRegistry commandRegistry;
private volatile CommandRegistry commandRegistry;

public CLIActionExecutor() {
this.commandRegistry = new CommandRegistry();
Expand Down Expand Up @@ -128,7 +133,6 @@ private ActionResult executeSend(Action.Send send, Map<String, Object> context)
private ActionResult executeCommand(Action.Execute exec, Map<String, Object> context) {
String commandId = exec.getCommandId();

// Resolve command from registry
if (!commandRegistry.hasCommand(commandId)) {
String msg =
"Command not found in registry: '"
Expand All @@ -141,7 +145,8 @@ private ActionResult executeCommand(Action.Execute exec, Map<String, Object> con
}

CommandDefinition cmdDef = commandRegistry.getCommand(commandId);
String command = templateResolver.resolve(cmdDef.command(), context);
Map<String, Object> shellSafeContext = shellEscapeContext(context);
String command = templateResolver.resolve(cmdDef.command(), shellSafeContext);

logger.info("Executing command [" + commandId + "]: " + command);

Expand All @@ -153,26 +158,36 @@ private ActionResult executeCommand(Action.Execute exec, Map<String, Object> con

Process process = pb.start();

StringBuilder output = new StringBuilder();
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(process.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
output.append(line).append("\n");
logger.info("[CMD] " + line);
}
}
// Drain output asynchronously to prevent pipe-buffer deadlock with timeout
Future<String> outputFuture =
PROCESS_IO_EXECUTOR.submit(
() -> {
StringBuilder output = new StringBuilder();
try (BufferedReader reader =
new BufferedReader(
new InputStreamReader(
process.getInputStream(),
StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
output.append(line).append("\n");
logger.info("[CMD] " + line);
}
}
return output.toString().trim();
});

boolean finished = process.waitFor(cmdDef.timeoutMs(), TimeUnit.MILLISECONDS);
if (!finished) {
process.destroyForcibly();
outputFuture.cancel(true);
return ActionResult.failure("Command timed out after " + cmdDef.timeoutMs() + "ms");
}

String output = outputFuture.get(5, TimeUnit.SECONDS);
int exitCode = process.exitValue();
if (exitCode == 0) {
return ActionResult.success(
"Command completed successfully", output.toString().trim());
return ActionResult.success("Command completed successfully", output);
} else {
return ActionResult.failure("Command failed with exit code: " + exitCode);
}
Expand All @@ -183,6 +198,22 @@ private ActionResult executeCommand(Action.Execute exec, Map<String, Object> con
}
}

private static final ExecutorService PROCESS_IO_EXECUTOR =
Executors.newVirtualThreadPerTaskExecutor();

private Map<String, Object> shellEscapeContext(Map<String, Object> context) {
Map<String, Object> escaped = new HashMap<>(context.size());
for (Map.Entry<String, Object> entry : context.entrySet()) {
Object value = entry.getValue();
if (value instanceof String s) {
escaped.put(entry.getKey(), ShellEscaper.escape(s));
} else if (value != null) {
escaped.put(entry.getKey(), ShellEscaper.escape(value.toString()));
}
}
return escaped;
}

/// Resolves template variables in payload string values.
private Map<String, Object> resolvePayload(
Map<String, Object> payload, Map<String, Object> context) {
Expand Down
Loading
Loading