From 805df301e131a83bca4617302a47e5246a8a06e8 Mon Sep 17 00:00:00 2001 From: Andreas Bauer Date: Fri, 12 Jun 2026 09:11:12 +0200 Subject: [PATCH 1/3] Loosen the restrictions on the closure passed to `WorkflowContext/timeout(for:body:)` --- .../Workflow/InternalWorkflowContext.swift | 83 ++++++++++--------- .../Worker/Workflow/WorkflowContext.swift | 2 +- 2 files changed, 45 insertions(+), 40 deletions(-) diff --git a/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift b/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift index d28f6dc..c008c49 100644 --- a/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift +++ b/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift @@ -155,55 +155,60 @@ struct InternalWorkflowContext: Sendable { func timeout( for duration: Duration, - body: @Sendable @escaping () async throws(Failure) -> Return + body: () async throws(Failure) -> Return ) async throws(Failure) -> Return { - try await withTaskGroup(of: TimeoutResult.self) { group in - group.addTask { - do { - try await self.sleep(for: duration) - return .sleepReturned - } catch { - return .sleepThrew + try await withoutActuallyEscaping(body) { escapingBody async throws(Failure) in + try await withTaskGroup(of: TimeoutResult.self) { group in + group.addTask { + do { + try await self.sleep(for: duration) + return .sleepReturned + } catch { + return .sleepThrew + } } - } - group.addTask { - do { - return .bodyReturned(try await body()) - } catch { - // TODO: Investigate why this requires a force cast with the compiler folks - return .bodyThrew(error as! Failure) + + // `addTask(_:)` will enqueue on the serial workflow executor, therefore, this is actually not sending anything + // and we can pass this as nonisolated(unsafe) + nonisolated(unsafe) let unsafeEscapingBody = escapingBody + group.addTask { + do throws(Failure) { + return .bodyReturned(try await unsafeEscapingBody()) + } catch { + return .bodyThrew(error) + } } - } - // This force unwrap is safe since we have two guaranteed child tasks - // If the method below - let result = await group.next()! - switch result { - case .sleepReturned, .sleepThrew: - // We either timed out or our parent task got cancelled - // so now we have to cancel the body child task and wait for its result - group.cancelAll() - let nextResult = await group.next()! - switch nextResult { + // This force unwrap is safe since we have two guaranteed child tasks + // If the method below + let result = await group.next()! + switch result { case .sleepReturned, .sleepThrew: - fatalError("The sleep child task already returned") + // We either timed out or our parent task got cancelled + // so now we have to cancel the body child task and wait for its result + group.cancelAll() + let nextResult = await group.next()! + switch nextResult { + case .sleepReturned, .sleepThrew: + fatalError("The sleep child task already returned") + case .bodyReturned(let value): + return Result.success(value) + case .bodyThrew(let error): + return Result.failure(error) + } case .bodyReturned(let value): + // We can cancel the sleep now and ignore any error from it + group.cancelAll() + _ = await group.next() return Result.success(value) case .bodyThrew(let error): + // We can cancel the sleep now and ignore any error from it + group.cancelAll() + _ = await group.next() return Result.failure(error) } - case .bodyReturned(let value): - // We can cancel the sleep now and ignore any error from it - group.cancelAll() - _ = await group.next() - return Result.success(value) - case .bodyThrew(let error): - // We can cancel the sleep now and ignore any error from it - group.cancelAll() - _ = await group.next() - return Result.failure(error) - } - }.get() + }.get() + } } func withCancellationShield(_ operation: sending @escaping () async throws -> Result) async throws -> Result { diff --git a/Sources/Temporal/Worker/Workflow/WorkflowContext.swift b/Sources/Temporal/Worker/Workflow/WorkflowContext.swift index a79b679..3efaf2f 100644 --- a/Sources/Temporal/Worker/Workflow/WorkflowContext.swift +++ b/Sources/Temporal/Worker/Workflow/WorkflowContext.swift @@ -334,7 +334,7 @@ public struct WorkflowContext: @unchecked Sendable /// - Returns: The result of the closure. public func timeout( for duration: Duration, - body: @Sendable @escaping () async throws(Failure) -> Return + body: () async throws(Failure) -> Return ) async throws(Failure) -> Return { try await self.internalContext.timeout(for: duration, body: body) } From ab4b697c87768035fcfbdcbd90cdf68c81d6f232 Mon Sep 17 00:00:00 2001 From: Andreas Bauer Date: Fri, 12 Jun 2026 09:17:39 +0200 Subject: [PATCH 2/3] Loosen the restrictions on the closure passed to `WorkflowContext/condition(_:)` --- .../Workflow/InternalWorkflowContext.swift | 2 +- .../Worker/Workflow/WorkflowContext.swift | 4 ++-- .../WorkflowStateMachineStorage.swift | 24 ++++++++++--------- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift b/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift index c008c49..c829820 100644 --- a/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift +++ b/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift @@ -215,7 +215,7 @@ struct InternalWorkflowContext: Sendable { try await self.stateMachine.withCancellationShield(operation) } - func condition(_ condition: @escaping () -> Bool) async throws { + func condition(_ condition: () -> Bool) async throws { try await self.stateMachine.condition(condition) } diff --git a/Sources/Temporal/Worker/Workflow/WorkflowContext.swift b/Sources/Temporal/Worker/Workflow/WorkflowContext.swift index 3efaf2f..08fcda0 100644 --- a/Sources/Temporal/Worker/Workflow/WorkflowContext.swift +++ b/Sources/Temporal/Worker/Workflow/WorkflowContext.swift @@ -369,7 +369,7 @@ public struct WorkflowContext: @unchecked Sendable /// - Parameter condition: A closure that receives the workflow state and returns `true` /// when the condition is satisfied. /// - Throws: A `CanceledError` if the waiting was cancelled. - public func condition(_ condition: @escaping (Workflow) -> Bool) async throws { + public func condition(_ condition: (Workflow) -> Bool) async throws { try await self.internalContext.condition { [stateBox] in stateBox.withValue { condition($0) } } @@ -389,7 +389,7 @@ public struct WorkflowContext: @unchecked Sendable /// /// - Parameter condition: A closure that returns `true` when the condition is satisfied. /// - Throws: A `CanceledError` if the waiting was cancelled. - public func condition(_ condition: @escaping () -> Bool) async throws { + public func condition(_ condition: () -> Bool) async throws { try await self.internalContext.condition(condition) } diff --git a/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift b/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift index 3480d3f..178805d 100644 --- a/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift +++ b/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift @@ -141,20 +141,22 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable { try await Task(executorPreference: self.executor, operation: operation).value } - func condition(_ condition: @escaping () -> Bool) async throws { - let id = self.stateMachine.condition(condition) - try await withTaskCancellationHandler { - try await withCheckedThrowingContinuation { continuation in - let continuation = self.stateMachine.storeConditionContinuation(id: id, continuation: continuation) + func condition(_ condition: () -> Bool) async throws { + try await withoutActuallyEscaping(condition) { escapingCondition in + let id = self.stateMachine.condition(escapingCondition) + try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { continuation in + let continuation = self.stateMachine.storeConditionContinuation(id: id, continuation: continuation) - // We use the same error message, we cannot predict which resume comes first due to an expected race condition, - // where the onCancel removes the condition but this resume is scheduled first. + // We use the same error message, we cannot predict which resume comes first due to an expected race condition, + // where the onCancel removes the condition but this resume is scheduled first. + continuation?.resume(throwing: CanceledError(message: "Wait condition cancelled")) + } + } onCancel: { + // We should ensure this during runtime by giving the instance a separate executor which we can assert on. + let continuation = self.stateMachine.cancelConditionContinuation(id: id) continuation?.resume(throwing: CanceledError(message: "Wait condition cancelled")) } - } onCancel: { - // We should ensure this during runtime by giving the instance a separate executor which we can assert on. - let continuation = self.stateMachine.cancelConditionContinuation(id: id) - continuation?.resume(throwing: CanceledError(message: "Wait condition cancelled")) } } From b9caebf78d727694aeecd187403b6eb298ad3150 Mon Sep 17 00:00:00 2001 From: Andreas Bauer Date: Fri, 12 Jun 2026 09:17:58 +0200 Subject: [PATCH 3/3] Loosen the restrictions on the closure passed to `WorkflowContext/withCancellationShield(_:)` --- .../Temporal/Worker/Workflow/InternalWorkflowContext.swift | 2 +- Sources/Temporal/Worker/Workflow/WorkflowContext.swift | 2 +- .../Worker/Workflow/WorkflowStateMachineStorage.swift | 7 +++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift b/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift index c829820..8b4a826 100644 --- a/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift +++ b/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift @@ -211,7 +211,7 @@ struct InternalWorkflowContext: Sendable { } } - func withCancellationShield(_ operation: sending @escaping () async throws -> Result) async throws -> Result { + func withCancellationShield(_ operation: () async throws -> Result) async throws -> Result { try await self.stateMachine.withCancellationShield(operation) } diff --git a/Sources/Temporal/Worker/Workflow/WorkflowContext.swift b/Sources/Temporal/Worker/Workflow/WorkflowContext.swift index 08fcda0..053ea79 100644 --- a/Sources/Temporal/Worker/Workflow/WorkflowContext.swift +++ b/Sources/Temporal/Worker/Workflow/WorkflowContext.swift @@ -445,7 +445,7 @@ public struct WorkflowContext: @unchecked Sendable /// For example, you can use this to execute an activity as part of a cleanup operation when your workflow is getting canceled. /// /// - Parameter operation: The operation that should be executed. - public func withCancellationShield(_ operation: sending @escaping () async throws -> Result) async throws -> Result { + public func withCancellationShield(_ operation: () async throws -> Result) async throws -> Result { try await self.internalContext.withCancellationShield(operation) } diff --git a/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift b/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift index 178805d..c9082cf 100644 --- a/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift +++ b/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift @@ -137,8 +137,11 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable { return self.stateMachine.allHandlersFinished() } - func withCancellationShield(_ operation: sending @escaping () async throws -> Result) async throws -> Result { - try await Task(executorPreference: self.executor, operation: operation).value + func withCancellationShield(_ operation: () async throws -> Result) async throws -> Result { + try await withoutActuallyEscaping(operation) { escapingOperation in + nonisolated(unsafe) let unsafeEscapingOperation = escapingOperation + return try await Task(executorPreference: self.executor, operation: unsafeEscapingOperation).value + } } func condition(_ condition: () -> Bool) async throws {