diff --git a/.github/workflows/run-itk-nightly.yaml b/.github/workflows/run-itk-nightly.yaml index 0b93c16a..57304bdc 100644 --- a/.github/workflows/run-itk-nightly.yaml +++ b/.github/workflows/run-itk-nightly.yaml @@ -16,7 +16,7 @@ on: # PR workflow's `branches: ["main", "epic"]` configuration. Drop the # `epic` entry once this is merged to `main` and the cron takes over. push: - branches: ["main", "epic"] + branches: ["main", "epic/**"] paths: - 'src/**' - 'itk/**' diff --git a/.github/workflows/run-itk.yaml b/.github/workflows/run-itk.yaml index 5a3d85b8..27f465bd 100644 --- a/.github/workflows/run-itk.yaml +++ b/.github/workflows/run-itk.yaml @@ -2,7 +2,7 @@ name: Run ITK on: push: - branches: ["main", "epic"] + branches: ["main", "epic/**"] pull_request: paths: - 'src/**' diff --git a/src/server/events/execution_event_queue.ts b/src/server/events/execution_event_queue.ts index cdb718ca..3694dd41 100644 --- a/src/server/events/execution_event_queue.ts +++ b/src/server/events/execution_event_queue.ts @@ -1,5 +1,5 @@ import { ExecutionEventBus, AgentExecutionEvent } from './execution_event_bus.js'; -import { TERMINAL_STATE_LIST } from '../utils.js'; +import { INTERRUPTED_STATE_LIST, TERMINAL_STATE_LIST } from '../utils.js'; /** * An async queue that subscribes to an ExecutionEventBus for events @@ -39,11 +39,21 @@ export class ExecutionEventQueue { if (this.eventQueue.length > 0) { const event = this.eventQueue.shift()!; yield event; + // A consumer's event loop terminates on: + // * a Message (the only event a stateless agent ever produces); + // * a terminal Task status (COMPLETED / FAILED / CANCELED / + // REJECTED) — the task can never produce further events; + // * an interrupted Task status (INPUT_REQUIRED / AUTH_REQUIRED) + // — the executor has returned and is awaiting a fresh + // message, so blocking consumers must stop iterating; the + // underlying bus stays alive in `DefaultRequestHandler` so + // resubscribers and follow-up sends can still attach. if ( event.kind === 'message' || (event.kind === 'statusUpdate' && event.data.status && - TERMINAL_STATE_LIST.includes(event.data.status.state)) + (TERMINAL_STATE_LIST.includes(event.data.status.state) || + INTERRUPTED_STATE_LIST.includes(event.data.status.state))) ) { this.handleFinished(); break; diff --git a/src/server/express/json_rpc_handler.ts b/src/server/express/json_rpc_handler.ts index e4cdbb1f..2af812c3 100644 --- a/src/server/express/json_rpc_handler.ts +++ b/src/server/express/json_rpc_handler.ts @@ -10,11 +10,11 @@ import { JSONRPCResponse } from '../transports/jsonrpc/jsonrpc_transport_handler import { A2ARequestHandler } from '../request_handler/a2a_request_handler.js'; import { JsonRpcTransportHandler } from '../transports/jsonrpc/jsonrpc_transport_handler.js'; import { ServerCallContext } from '../context.js'; -import { A2A_VERSION_HEADER, HTTP_EXTENSION_HEADER } from '../../constants.js'; +import { A2A_VERSION_HEADER, HTTP_EXTENSION_HEADER, JSON_CONTENT_TYPE } from '../../constants.js'; import { UserBuilder } from './common.js'; import { SSE_HEADERS, formatSSEEvent, formatSSEErrorEvent } from '../../sse_utils.js'; import { Extensions } from '../../extensions.js'; -import { RequestMalformedError } from '../../errors.js'; +import { ContentTypeNotSupportedError, RequestMalformedError } from '../../errors.js'; import { validateVersion } from '../version.js'; import { LegacyJsonRpcTransportHandler, isLegacyJsonRpcMethod } from '../../compat/v0_3/index.js'; import { LEGACY_HTTP_EXTENSION_HEADER } from '../../compat/v0_3/constants.js'; @@ -79,6 +79,11 @@ export function jsonRpcHandler(options: JsonRpcHandlerOptions): RequestHandler { const router = express.Router(); + // §9.1 JSON-RPC requests MUST use `application/json`. Reject any + // other content type with `ContentTypeNotSupportedError` (-32005) + // rather than letting `express.json()` ignore the body and the + // dispatcher fall back to a misleading `InvalidParamsError` (-32602). + router.use(contentTypeGuard); router.use(express.json(), jsonErrorHandler); router.post('/', async (req: Request, res: Response) => { @@ -131,17 +136,38 @@ export function jsonRpcHandler(options: JsonRpcHandlerOptions): RequestHandler { if (typeof (rpcResponseOrStream as AsyncGenerator)?.[Symbol.asyncIterator] === 'function') { const stream = rpcResponseOrStream as AsyncGenerator; - // Set SSE headers using shared utility + // Peek the first event BEFORE flushing SSE headers so that an + // early failure (e.g. `resubscribe` on a terminal task, which + // throws `UnsupportedOperationError`) surfaces as a proper + // JSON-RPC error response with the right HTTP status rather + // than a 200 SSE stream carrying a single error event — the + // latter looks like a successful subscription to most clients. + const iterator = stream[Symbol.asyncIterator](); + let firstResult: IteratorResult; + try { + firstResult = await iterator.next(); + } catch (streamError) { + console.error(`Pre-stream error for request ${req.body?.id}:`, streamError); + const errorResponse: JSONRPCErrorResponse = { + jsonrpc: '2.0', + id: req.body?.id || null, + error: mapToError(streamError), + }; + res.status(200).json(errorResponse); + return; + } + + // First event succeeded — now switch to SSE. Object.entries(SSE_HEADERS).forEach(([key, value]) => { res.setHeader(key, value); }); - res.flushHeaders(); try { - for await (const event of stream) { - // Each event from the stream is already a JSONRPCResult - // Use shared formatSSEEvent utility + if (!firstResult.done) { + res.write(formatSSEEvent(firstResult.value)); + } + for await (const event of { [Symbol.asyncIterator]: () => iterator }) { res.write(formatSSEEvent(event)); } } catch (streamError) { @@ -189,6 +215,38 @@ export function jsonRpcHandler(options: JsonRpcHandlerOptions): RequestHandler { return router; } +/** + * Express middleware that rejects requests whose Content-Type is not + * `application/json` (§9.1) with `ContentTypeNotSupportedError` + * (-32005). Bodyless requests (GET, OPTIONS, …) and requests without a + * Content-Type header pass through; only POSTs that explicitly declare + * a non-JSON content type are rejected. + */ +const contentTypeGuard: RequestHandler = (req, res, next) => { + const rawContentType = req.header('content-type'); + if (!rawContentType) { + next(); + return; + } + // Strip charset and other params before comparing. + const mediaType = rawContentType.split(';', 1)[0].trim().toLowerCase(); + if (mediaType === JSON_CONTENT_TYPE) { + next(); + return; + } + const errorResponse: JSONRPCErrorResponse = { + jsonrpc: '2.0', + id: null, + error: JsonRpcTransportHandler.mapToJSONRPCError( + new ContentTypeNotSupportedError( + `Unsupported Content-Type "${rawContentType}"; expected application/json.` + ) + ), + }; + // Per §5.4 ContentTypeNotSupportedError maps to HTTP 400. + res.status(400).json(errorResponse); +}; + export const jsonErrorHandler: ErrorRequestHandler = ( err: unknown, _req: Request, diff --git a/src/server/express/rest_handler.ts b/src/server/express/rest_handler.ts index 43c3493b..53b13570 100644 --- a/src/server/express/rest_handler.ts +++ b/src/server/express/rest_handler.ts @@ -37,7 +37,7 @@ import { TaskPushNotificationConfig, } from '../../types/pb/a2a.js'; import { ToProto } from '../../types/converters/to_proto.js'; -import { RequestMalformedError } from '../../errors.js'; +import { ContentTypeNotSupportedError, RequestMalformedError } from '../../errors.js'; /** * Options for configuring the HTTP+JSON/REST handler. @@ -86,6 +86,31 @@ const restErrorHandler: ErrorRequestHandler = ( next(err); }; +/** + * Express middleware that rejects body-bearing REST requests whose + * Content-Type is neither `application/json` nor `application/a2a+json`, + * surfacing a `ContentTypeNotSupportedError` mapped to HTTP 400 per §5.4. + */ +const restContentTypeGuard: RequestHandler = (req, res, next) => { + const rawContentType = req.header('content-type'); + // Only enforce on requests that actually carry a body. GET / DELETE + // without a Content-Type header are routine; CORS preflights live on + // OPTIONS and must keep working. + if (!rawContentType) { + next(); + return; + } + const mediaType = rawContentType.split(';', 1)[0].trim().toLowerCase(); + if (mediaType === JSON_CONTENT_TYPE || mediaType === A2A_CONTENT_TYPE) { + next(); + return; + } + const error = new ContentTypeNotSupportedError( + `Unsupported Content-Type "${rawContentType}"; expected application/json or application/a2a+json.` + ); + res.status(HTTP_STATUS.BAD_REQUEST).json(toHTTPError(error, HTTP_STATUS.BAD_REQUEST)); +}; + // Route patterns removed - using explicit route definitions instead /** @@ -145,6 +170,12 @@ export function restHandler(options: RestHandlerOptions): RequestHandler { res.setHeader('Content-Type', A2A_CONTENT_TYPE); next(); }, + // A body-bearing request (POST / PUT / DELETE) with an unsupported + // Content-Type must surface as `ContentTypeNotSupportedError` + // mapped to HTTP 400 (§5.4), not as a generic 400 once + // `express.json()` silently skips parsing. Bodyless requests + // (GET / OPTIONS / HEAD) pass through. + restContentTypeGuard, express.json({ type: [JSON_CONTENT_TYPE, A2A_CONTENT_TYPE], strict: false }), restErrorHandler ); diff --git a/src/server/request_handler/default_request_handler.ts b/src/server/request_handler/default_request_handler.ts index 66267907..ecdd0cb6 100644 --- a/src/server/request_handler/default_request_handler.ts +++ b/src/server/request_handler/default_request_handler.ts @@ -56,7 +56,7 @@ import { PushNotificationSender } from '../push_notification/push_notification_s import { DefaultPushNotificationSender } from '../push_notification/default_push_notification_sender.js'; import { ServerCallContext } from '../context.js'; import { DEFAULT_PAGE_SIZE } from '../../constants.js'; -import { TERMINAL_STATE_LIST, isTask, StreamPattern } from '../utils.js'; +import { INTERRUPTED_STATE_LIST, TERMINAL_STATE_LIST, isTask, StreamPattern } from '../utils.js'; import { AgentCardSignatureGenerator } from '../../signature.js'; import { extractErrorMessage } from '../../errors.js'; @@ -323,6 +323,13 @@ export class DefaultRequestHandler implements A2ARequestHandler { requestContext: RequestContext, finalMessageForAgent: Message ): void { + // Subscribe a lightweight listener that tracks the last task state + // published on the bus. We can't rely on `resultManager.getCurrentTask()` + // in the `.finally` block because the consumer loop that drains the + // queue into `ResultManager` runs in a separate microtask: by the + // time `.finally()` runs, the consumer has not necessarily processed + // the events yet. + const stateTracker = trackLatestTaskState(eventBus); this.agentExecutor .execute(requestContext, eventBus) .catch((err: unknown) => { @@ -381,11 +388,34 @@ export class DefaultRequestHandler implements A2ARequestHandler { ); }) .finally(() => { - eventBus.finished(); - this.eventBusManager.cleanupByTaskId(taskId); + // Closes the bus for terminal tasks; kept alive for + // INPUT_REQUIRED / AUTH_REQUIRED so follow-up sends and + // resubscribers can still attach. + this._settleBus(taskId, eventBus, stateTracker()); }); } + /** + * Settles the event bus once the executor returns. + * + * Terminal states (and the bare-Message pattern in §3.1.2) close the + * bus immediately. Interrupted states (INPUT_REQUIRED / AUTH_REQUIRED) + * keep it alive so a follow-up `message/send` can resume the same + * execution via `createOrGetByTaskId`, and `tasks/resubscribe` can + * attach in the meantime (§3.4.3). + */ + private _settleBus( + taskId: string, + eventBus: ExecutionEventBus, + lastState: TaskState | undefined + ): void { + if (lastState !== undefined && INTERRUPTED_STATE_LIST.includes(lastState)) { + return; + } + eventBus.finished(); + this.eventBusManager.cleanupByTaskId(taskId); + } + /** * Streaming variant of {@link _runExecutor}. * @@ -403,6 +433,9 @@ export class DefaultRequestHandler implements A2ARequestHandler { resultManager: ResultManager ): void { const finalMessageForAgent = requestContext.userMessage; + // See `_runExecutor` for why we snoop the bus directly instead of + // re-reading state from `resultManager` in the `.finally` block. + const stateTracker = trackLatestTaskState(eventBus); this.agentExecutor .execute(requestContext, eventBus) .catch((err: unknown) => { @@ -451,8 +484,10 @@ export class DefaultRequestHandler implements A2ARequestHandler { eventBus.publish(AgentEvent.statusUpdate(errorTaskStatus)); }) .finally(() => { - eventBus.finished(); - this.eventBusManager.cleanupByTaskId(taskId); + // Closes the bus for terminal tasks; kept alive for + // INPUT_REQUIRED / AUTH_REQUIRED so follow-up sends and + // resubscribers can still attach. + this._settleBus(taskId, eventBus, stateTracker()); }); } @@ -1008,3 +1043,37 @@ export class DefaultRequestHandler implements A2ARequestHandler { } export type ExtendedAgentCardProvider = (context: ServerCallContext) => Promise; + +/** + * Subscribes a lightweight listener on `bus` that records the most + * recent task state published — either via a `Task` event or a + * `TaskStatusUpdateEvent`. Returns a thunk that the caller invokes in + * the executor's `.finally` block to detach the listener and read the + * last seen state. + * + * Used by `_runExecutor` / `_runStreamExecutor` to decide whether to + * tear down the bus after `execute()` returns. Reading state from the + * `ResultManager` directly is unsafe at that point: the consumer loop + * that drains the bus into the `ResultManager` runs in a separate + * microtask, so the `ResultManager`'s view of the task may still lag + * the publish call. + */ +function trackLatestTaskState(bus: ExecutionEventBus): () => TaskState | undefined { + let lastState: TaskState | undefined; + const listener = (event: AgentExecutionEvent) => { + if (event.kind === 'task' && event.data.status?.state !== undefined) { + lastState = event.data.status.state; + } else if (event.kind === 'statusUpdate' && event.data.status?.state !== undefined) { + lastState = event.data.status.state; + } + }; + bus.on('event', listener); + // Detach the listener when the caller reads the state in `.finally()`. + // Without this, every executor turn on a long-lived bus (kept alive + // for INPUT_REQUIRED / AUTH_REQUIRED) would accumulate another + // listener on the same bus. + return () => { + bus.off('event', listener); + return lastState; + }; +} diff --git a/src/server/utils.ts b/src/server/utils.ts index 856a2cb7..d45ddef4 100644 --- a/src/server/utils.ts +++ b/src/server/utils.ts @@ -10,6 +10,20 @@ const TERMINAL_STATE_LIST: TaskState[] = [ ]; export { TERMINAL_STATE_LIST }; +/** + * Non-terminal states that pause the executor and require a follow-up + * message before progress can resume. The agent's `execute()` is + * expected to return after publishing one of these states, but the + * underlying task remains live so a client can resubscribe — or send a + * fresh message with the same `taskId` — to continue the flow. See + * §3.4.3 (Input Required State) of the A2A specification. + */ +const INTERRUPTED_STATE_LIST: TaskState[] = [ + TaskState.TASK_STATE_INPUT_REQUIRED, + TaskState.TASK_STATE_AUTH_REQUIRED, +]; +export { INTERRUPTED_STATE_LIST }; + /** * Generates a timestamp in ISO 8601 format. * @returns The current timestamp as a string. diff --git a/test/server/express/express_app.spec.ts b/test/server/express/express_app.spec.ts index 4797e8fa..6412a0ad 100644 --- a/test/server/express/express_app.spec.ts +++ b/test/server/express/express_app.spec.ts @@ -262,14 +262,18 @@ describe('A2AExpressApp', () => { .send(requestBody) .expect(200); - // Assert SSE headers and error event content - assert.include(response.headers['content-type'], 'text/event-stream'); - assert.equal(response.headers['cache-control'], 'no-cache'); - assert.equal(response.headers['connection'], 'keep-alive'); + // When the streaming generator throws BEFORE yielding its first + // event (e.g. resubscribe on a terminal task) the handler must + // surface a plain JSON-RPC error response rather than a 200 SSE + // stream carrying a single error event: the latter looks like a + // successful subscription to most clients. + assert.include(response.headers['content-type'], 'application/json'); + assert.notInclude(response.headers['content-type'], 'text/event-stream'); - const responseText = response.text; - assert.include(responseText, 'event: error'); - assert.include(responseText, 'Immediate streaming error'); + assert.equal(response.body.jsonrpc, '2.0'); + assert.equal(response.body.id, 'immediate-stream-error-test'); + assert.exists(response.body.error); + assert.include(response.body.error.message, 'Immediate streaming error'); }); it('should handle general processing error', async () => {