Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run-itk-nightly.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ on:
# PR workflow's `branches: ["main", "epic"]` configuration. Drop the
# `epic` entry once this is merged to `main` and the cron takes over.
push:
branches: ["main", "epic"]
branches: ["main", "epic/**"]
paths:
- 'src/**'
- 'itk/**'
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/run-itk.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Run ITK

on:
push:
branches: ["main", "epic"]
branches: ["main", "epic/**"]
pull_request:
paths:
- 'src/**'
Expand Down
14 changes: 12 additions & 2 deletions src/server/events/execution_event_queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ExecutionEventBus, AgentExecutionEvent } from './execution_event_bus.js';
import { TERMINAL_STATE_LIST } from '../utils.js';
import { INTERRUPTED_STATE_LIST, TERMINAL_STATE_LIST } from '../utils.js';

/**
* An async queue that subscribes to an ExecutionEventBus for events
Expand Down Expand Up @@ -39,11 +39,21 @@ export class ExecutionEventQueue {
if (this.eventQueue.length > 0) {
const event = this.eventQueue.shift()!;
yield event;
// A consumer's event loop terminates on:
// * a Message (the only event a stateless agent ever produces);
// * a terminal Task status (COMPLETED / FAILED / CANCELED /
// REJECTED) — the task can never produce further events;
// * an interrupted Task status (INPUT_REQUIRED / AUTH_REQUIRED)
// — the executor has returned and is awaiting a fresh
// message, so blocking consumers must stop iterating; the
// underlying bus stays alive in `DefaultRequestHandler` so
// resubscribers and follow-up sends can still attach.
if (
event.kind === 'message' ||
(event.kind === 'statusUpdate' &&
event.data.status &&
TERMINAL_STATE_LIST.includes(event.data.status.state))
(TERMINAL_STATE_LIST.includes(event.data.status.state) ||
INTERRUPTED_STATE_LIST.includes(event.data.status.state)))
) {
this.handleFinished();
break;
Expand Down
72 changes: 65 additions & 7 deletions src/server/express/json_rpc_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import { JSONRPCResponse } from '../transports/jsonrpc/jsonrpc_transport_handler
import { A2ARequestHandler } from '../request_handler/a2a_request_handler.js';
import { JsonRpcTransportHandler } from '../transports/jsonrpc/jsonrpc_transport_handler.js';
import { ServerCallContext } from '../context.js';
import { A2A_VERSION_HEADER, HTTP_EXTENSION_HEADER } from '../../constants.js';
import { A2A_VERSION_HEADER, HTTP_EXTENSION_HEADER, JSON_CONTENT_TYPE } from '../../constants.js';
import { UserBuilder } from './common.js';
import { SSE_HEADERS, formatSSEEvent, formatSSEErrorEvent } from '../../sse_utils.js';
import { Extensions } from '../../extensions.js';
import { RequestMalformedError } from '../../errors.js';
import { ContentTypeNotSupportedError, RequestMalformedError } from '../../errors.js';
import { validateVersion } from '../version.js';
import { LegacyJsonRpcTransportHandler, isLegacyJsonRpcMethod } from '../../compat/v0_3/index.js';
import { LEGACY_HTTP_EXTENSION_HEADER } from '../../compat/v0_3/constants.js';
Expand Down Expand Up @@ -79,6 +79,11 @@ export function jsonRpcHandler(options: JsonRpcHandlerOptions): RequestHandler {

const router = express.Router();

// §9.1 JSON-RPC requests MUST use `application/json`. Reject any
// other content type with `ContentTypeNotSupportedError` (-32005)
// rather than letting `express.json()` ignore the body and the
// dispatcher fall back to a misleading `InvalidParamsError` (-32602).
router.use(contentTypeGuard);
router.use(express.json(), jsonErrorHandler);

router.post('/', async (req: Request, res: Response) => {
Expand Down Expand Up @@ -131,17 +136,38 @@ export function jsonRpcHandler(options: JsonRpcHandlerOptions): RequestHandler {
if (typeof (rpcResponseOrStream as AsyncGenerator)?.[Symbol.asyncIterator] === 'function') {
const stream = rpcResponseOrStream as AsyncGenerator<JSONRPCResponse, void, undefined>;

// Set SSE headers using shared utility
// Peek the first event BEFORE flushing SSE headers so that an
// early failure (e.g. `resubscribe` on a terminal task, which
// throws `UnsupportedOperationError`) surfaces as a proper
// JSON-RPC error response with the right HTTP status rather
// than a 200 SSE stream carrying a single error event — the
// latter looks like a successful subscription to most clients.
const iterator = stream[Symbol.asyncIterator]();
let firstResult: IteratorResult<JSONRPCResponse>;
try {
firstResult = await iterator.next();
} catch (streamError) {
console.error(`Pre-stream error for request ${req.body?.id}:`, streamError);
const errorResponse: JSONRPCErrorResponse = {
jsonrpc: '2.0',
id: req.body?.id || null,
error: mapToError(streamError),
};
res.status(200).json(errorResponse);
return;
}

// First event succeeded — now switch to SSE.
Object.entries(SSE_HEADERS).forEach(([key, value]) => {
res.setHeader(key, value);
});

res.flushHeaders();

try {
for await (const event of stream) {
// Each event from the stream is already a JSONRPCResult
// Use shared formatSSEEvent utility
if (!firstResult.done) {
res.write(formatSSEEvent(firstResult.value));
}
for await (const event of { [Symbol.asyncIterator]: () => iterator }) {
res.write(formatSSEEvent(event));
}
} catch (streamError) {
Expand Down Expand Up @@ -189,6 +215,38 @@ export function jsonRpcHandler(options: JsonRpcHandlerOptions): RequestHandler {
return router;
}

/**
* Express middleware that rejects requests whose Content-Type is not
* `application/json` (§9.1) with `ContentTypeNotSupportedError`
* (-32005). Bodyless requests (GET, OPTIONS, …) and requests without a
* Content-Type header pass through; only POSTs that explicitly declare
* a non-JSON content type are rejected.
*/
const contentTypeGuard: RequestHandler = (req, res, next) => {
const rawContentType = req.header('content-type');
if (!rawContentType) {
next();
return;
}
// Strip charset and other params before comparing.
const mediaType = rawContentType.split(';', 1)[0].trim().toLowerCase();
if (mediaType === JSON_CONTENT_TYPE) {
next();
return;
}
const errorResponse: JSONRPCErrorResponse = {
jsonrpc: '2.0',
id: null,
error: JsonRpcTransportHandler.mapToJSONRPCError(
new ContentTypeNotSupportedError(
`Unsupported Content-Type "${rawContentType}"; expected application/json.`
)
),
};
// Per §5.4 ContentTypeNotSupportedError maps to HTTP 400.
res.status(400).json(errorResponse);
};

export const jsonErrorHandler: ErrorRequestHandler = (
err: unknown,
_req: Request,
Expand Down
33 changes: 32 additions & 1 deletion src/server/express/rest_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import {
TaskPushNotificationConfig,
} from '../../types/pb/a2a.js';
import { ToProto } from '../../types/converters/to_proto.js';
import { RequestMalformedError } from '../../errors.js';
import { ContentTypeNotSupportedError, RequestMalformedError } from '../../errors.js';

/**
* Options for configuring the HTTP+JSON/REST handler.
Expand Down Expand Up @@ -86,6 +86,31 @@ const restErrorHandler: ErrorRequestHandler = (
next(err);
};

/**
* Express middleware that rejects body-bearing REST requests whose
* Content-Type is neither `application/json` nor `application/a2a+json`,
* surfacing a `ContentTypeNotSupportedError` mapped to HTTP 400 per §5.4.
*/
const restContentTypeGuard: RequestHandler = (req, res, next) => {
const rawContentType = req.header('content-type');
// Only enforce on requests that actually carry a body. GET / DELETE
// without a Content-Type header are routine; CORS preflights live on
// OPTIONS and must keep working.
if (!rawContentType) {
next();
return;
}
const mediaType = rawContentType.split(';', 1)[0].trim().toLowerCase();
if (mediaType === JSON_CONTENT_TYPE || mediaType === A2A_CONTENT_TYPE) {
next();
return;
}
const error = new ContentTypeNotSupportedError(
`Unsupported Content-Type "${rawContentType}"; expected application/json or application/a2a+json.`
);
res.status(HTTP_STATUS.BAD_REQUEST).json(toHTTPError(error, HTTP_STATUS.BAD_REQUEST));
};

// Route patterns removed - using explicit route definitions instead

/**
Expand Down Expand Up @@ -145,6 +170,12 @@ export function restHandler(options: RestHandlerOptions): RequestHandler {
res.setHeader('Content-Type', A2A_CONTENT_TYPE);
next();
},
// A body-bearing request (POST / PUT / DELETE) with an unsupported
// Content-Type must surface as `ContentTypeNotSupportedError`
// mapped to HTTP 400 (§5.4), not as a generic 400 once
// `express.json()` silently skips parsing. Bodyless requests
// (GET / OPTIONS / HEAD) pass through.
restContentTypeGuard,
express.json({ type: [JSON_CONTENT_TYPE, A2A_CONTENT_TYPE], strict: false }),
restErrorHandler
);
Expand Down
79 changes: 74 additions & 5 deletions src/server/request_handler/default_request_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import { PushNotificationSender } from '../push_notification/push_notification_s
import { DefaultPushNotificationSender } from '../push_notification/default_push_notification_sender.js';
import { ServerCallContext } from '../context.js';
import { DEFAULT_PAGE_SIZE } from '../../constants.js';
import { TERMINAL_STATE_LIST, isTask, StreamPattern } from '../utils.js';
import { INTERRUPTED_STATE_LIST, TERMINAL_STATE_LIST, isTask, StreamPattern } from '../utils.js';
import { AgentCardSignatureGenerator } from '../../signature.js';
import { extractErrorMessage } from '../../errors.js';

Expand Down Expand Up @@ -323,6 +323,13 @@ export class DefaultRequestHandler implements A2ARequestHandler {
requestContext: RequestContext,
finalMessageForAgent: Message
): void {
// Subscribe a lightweight listener that tracks the last task state
// published on the bus. We can't rely on `resultManager.getCurrentTask()`
// in the `.finally` block because the consumer loop that drains the
// queue into `ResultManager` runs in a separate microtask: by the
// time `.finally()` runs, the consumer has not necessarily processed
// the events yet.
const stateTracker = trackLatestTaskState(eventBus);
this.agentExecutor
.execute(requestContext, eventBus)
.catch((err: unknown) => {
Expand Down Expand Up @@ -381,11 +388,34 @@ export class DefaultRequestHandler implements A2ARequestHandler {
);
})
.finally(() => {
eventBus.finished();
this.eventBusManager.cleanupByTaskId(taskId);
// Closes the bus for terminal tasks; kept alive for
// INPUT_REQUIRED / AUTH_REQUIRED so follow-up sends and
// resubscribers can still attach.
this._settleBus(taskId, eventBus, stateTracker());
});
}

/**
* Settles the event bus once the executor returns.
*
* Terminal states (and the bare-Message pattern in §3.1.2) close the
* bus immediately. Interrupted states (INPUT_REQUIRED / AUTH_REQUIRED)
* keep it alive so a follow-up `message/send` can resume the same
* execution via `createOrGetByTaskId`, and `tasks/resubscribe` can
* attach in the meantime (§3.4.3).
*/
private _settleBus(
taskId: string,
eventBus: ExecutionEventBus,
lastState: TaskState | undefined
): void {
if (lastState !== undefined && INTERRUPTED_STATE_LIST.includes(lastState)) {
return;
}
eventBus.finished();
this.eventBusManager.cleanupByTaskId(taskId);
}

/**
* Streaming variant of {@link _runExecutor}.
*
Expand All @@ -403,6 +433,9 @@ export class DefaultRequestHandler implements A2ARequestHandler {
resultManager: ResultManager
): void {
const finalMessageForAgent = requestContext.userMessage;
// See `_runExecutor` for why we snoop the bus directly instead of
// re-reading state from `resultManager` in the `.finally` block.
const stateTracker = trackLatestTaskState(eventBus);
this.agentExecutor
.execute(requestContext, eventBus)
.catch((err: unknown) => {
Expand Down Expand Up @@ -451,8 +484,10 @@ export class DefaultRequestHandler implements A2ARequestHandler {
eventBus.publish(AgentEvent.statusUpdate(errorTaskStatus));
})
.finally(() => {
eventBus.finished();
this.eventBusManager.cleanupByTaskId(taskId);
// Closes the bus for terminal tasks; kept alive for
// INPUT_REQUIRED / AUTH_REQUIRED so follow-up sends and
// resubscribers can still attach.
this._settleBus(taskId, eventBus, stateTracker());
});
}

Expand Down Expand Up @@ -1008,3 +1043,37 @@ export class DefaultRequestHandler implements A2ARequestHandler {
}

export type ExtendedAgentCardProvider = (context: ServerCallContext) => Promise<AgentCard>;

/**
* Subscribes a lightweight listener on `bus` that records the most
* recent task state published — either via a `Task` event or a
* `TaskStatusUpdateEvent`. Returns a thunk that the caller invokes in
* the executor's `.finally` block to detach the listener and read the
* last seen state.
*
* Used by `_runExecutor` / `_runStreamExecutor` to decide whether to
* tear down the bus after `execute()` returns. Reading state from the
* `ResultManager` directly is unsafe at that point: the consumer loop
* that drains the bus into the `ResultManager` runs in a separate
* microtask, so the `ResultManager`'s view of the task may still lag
* the publish call.
*/
function trackLatestTaskState(bus: ExecutionEventBus): () => TaskState | undefined {
let lastState: TaskState | undefined;
const listener = (event: AgentExecutionEvent) => {
if (event.kind === 'task' && event.data.status?.state !== undefined) {
lastState = event.data.status.state;
} else if (event.kind === 'statusUpdate' && event.data.status?.state !== undefined) {
lastState = event.data.status.state;
}
};
bus.on('event', listener);
// Detach the listener when the caller reads the state in `.finally()`.
// Without this, every executor turn on a long-lived bus (kept alive
// for INPUT_REQUIRED / AUTH_REQUIRED) would accumulate another
// listener on the same bus.
return () => {
bus.off('event', listener);
return lastState;
};
}
14 changes: 14 additions & 0 deletions src/server/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ const TERMINAL_STATE_LIST: TaskState[] = [
];
export { TERMINAL_STATE_LIST };

/**
* Non-terminal states that pause the executor and require a follow-up
* message before progress can resume. The agent's `execute()` is
* expected to return after publishing one of these states, but the
* underlying task remains live so a client can resubscribe — or send a
* fresh message with the same `taskId` — to continue the flow. See
* §3.4.3 (Input Required State) of the A2A specification.
*/
const INTERRUPTED_STATE_LIST: TaskState[] = [
TaskState.TASK_STATE_INPUT_REQUIRED,
TaskState.TASK_STATE_AUTH_REQUIRED,
];
export { INTERRUPTED_STATE_LIST };

/**
* Generates a timestamp in ISO 8601 format.
* @returns The current timestamp as a string.
Expand Down
18 changes: 11 additions & 7 deletions test/server/express/express_app.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,18 @@ describe('A2AExpressApp', () => {
.send(requestBody)
.expect(200);

// Assert SSE headers and error event content
assert.include(response.headers['content-type'], 'text/event-stream');
assert.equal(response.headers['cache-control'], 'no-cache');
assert.equal(response.headers['connection'], 'keep-alive');
// When the streaming generator throws BEFORE yielding its first
// event (e.g. resubscribe on a terminal task) the handler must
// surface a plain JSON-RPC error response rather than a 200 SSE
// stream carrying a single error event: the latter looks like a
// successful subscription to most clients.
assert.include(response.headers['content-type'], 'application/json');
assert.notInclude(response.headers['content-type'], 'text/event-stream');

const responseText = response.text;
assert.include(responseText, 'event: error');
assert.include(responseText, 'Immediate streaming error');
assert.equal(response.body.jsonrpc, '2.0');
assert.equal(response.body.id, 'immediate-stream-error-test');
assert.exists(response.body.error);
assert.include(response.body.error.message, 'Immediate streaming error');
});

it('should handle general processing error', async () => {
Expand Down
Loading