diff --git a/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift b/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift index d28f6dc..8b4a826 100644 --- a/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift +++ b/Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift @@ -155,62 +155,67 @@ struct InternalWorkflowContext: Sendable { func timeout( for duration: Duration, - body: @Sendable @escaping () async throws(Failure) -> Return + body: () async throws(Failure) -> Return ) async throws(Failure) -> Return { - try await withTaskGroup(of: TimeoutResult.self) { group in - group.addTask { - do { - try await self.sleep(for: duration) - return .sleepReturned - } catch { - return .sleepThrew + try await withoutActuallyEscaping(body) { escapingBody async throws(Failure) in + try await withTaskGroup(of: TimeoutResult.self) { group in + group.addTask { + do { + try await self.sleep(for: duration) + return .sleepReturned + } catch { + return .sleepThrew + } } - } - group.addTask { - do { - return .bodyReturned(try await body()) - } catch { - // TODO: Investigate why this requires a force cast with the compiler folks - return .bodyThrew(error as! Failure) + + // `addTask(_:)` will enqueue on the serial workflow executor, therefore, this is actually not sending anything + // and we can pass this as nonisolated(unsafe) + nonisolated(unsafe) let unsafeEscapingBody = escapingBody + group.addTask { + do throws(Failure) { + return .bodyReturned(try await unsafeEscapingBody()) + } catch { + return .bodyThrew(error) + } } - } - // This force unwrap is safe since we have two guaranteed child tasks - // If the method below - let result = await group.next()! - switch result { - case .sleepReturned, .sleepThrew: - // We either timed out or our parent task got cancelled - // so now we have to cancel the body child task and wait for its result - group.cancelAll() - let nextResult = await group.next()! - switch nextResult { + // This force unwrap is safe since we have two guaranteed child tasks + // If the method below + let result = await group.next()! + switch result { case .sleepReturned, .sleepThrew: - fatalError("The sleep child task already returned") + // We either timed out or our parent task got cancelled + // so now we have to cancel the body child task and wait for its result + group.cancelAll() + let nextResult = await group.next()! + switch nextResult { + case .sleepReturned, .sleepThrew: + fatalError("The sleep child task already returned") + case .bodyReturned(let value): + return Result.success(value) + case .bodyThrew(let error): + return Result.failure(error) + } case .bodyReturned(let value): + // We can cancel the sleep now and ignore any error from it + group.cancelAll() + _ = await group.next() return Result.success(value) case .bodyThrew(let error): + // We can cancel the sleep now and ignore any error from it + group.cancelAll() + _ = await group.next() return Result.failure(error) } - case .bodyReturned(let value): - // We can cancel the sleep now and ignore any error from it - group.cancelAll() - _ = await group.next() - return Result.success(value) - case .bodyThrew(let error): - // We can cancel the sleep now and ignore any error from it - group.cancelAll() - _ = await group.next() - return Result.failure(error) - } - }.get() + }.get() + } } - func withCancellationShield(_ operation: sending @escaping () async throws -> Result) async throws -> Result { + func withCancellationShield(_ operation: () async throws -> Result) async throws -> Result { try await self.stateMachine.withCancellationShield(operation) } - func condition(_ condition: @escaping () -> Bool) async throws { + func condition(_ condition: () -> Bool) async throws { try await self.stateMachine.condition(condition) } diff --git a/Sources/Temporal/Worker/Workflow/WorkflowContext.swift b/Sources/Temporal/Worker/Workflow/WorkflowContext.swift index a79b679..053ea79 100644 --- a/Sources/Temporal/Worker/Workflow/WorkflowContext.swift +++ b/Sources/Temporal/Worker/Workflow/WorkflowContext.swift @@ -334,7 +334,7 @@ public struct WorkflowContext: @unchecked Sendable /// - Returns: The result of the closure. public func timeout( for duration: Duration, - body: @Sendable @escaping () async throws(Failure) -> Return + body: () async throws(Failure) -> Return ) async throws(Failure) -> Return { try await self.internalContext.timeout(for: duration, body: body) } @@ -369,7 +369,7 @@ public struct WorkflowContext: @unchecked Sendable /// - Parameter condition: A closure that receives the workflow state and returns `true` /// when the condition is satisfied. /// - Throws: A `CanceledError` if the waiting was cancelled. - public func condition(_ condition: @escaping (Workflow) -> Bool) async throws { + public func condition(_ condition: (Workflow) -> Bool) async throws { try await self.internalContext.condition { [stateBox] in stateBox.withValue { condition($0) } } @@ -389,7 +389,7 @@ public struct WorkflowContext: @unchecked Sendable /// /// - Parameter condition: A closure that returns `true` when the condition is satisfied. /// - Throws: A `CanceledError` if the waiting was cancelled. - public func condition(_ condition: @escaping () -> Bool) async throws { + public func condition(_ condition: () -> Bool) async throws { try await self.internalContext.condition(condition) } @@ -445,7 +445,7 @@ public struct WorkflowContext: @unchecked Sendable /// For example, you can use this to execute an activity as part of a cleanup operation when your workflow is getting canceled. /// /// - Parameter operation: The operation that should be executed. - public func withCancellationShield(_ operation: sending @escaping () async throws -> Result) async throws -> Result { + public func withCancellationShield(_ operation: () async throws -> Result) async throws -> Result { try await self.internalContext.withCancellationShield(operation) } diff --git a/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift b/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift index 3480d3f..c9082cf 100644 --- a/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift +++ b/Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift @@ -137,24 +137,29 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable { return self.stateMachine.allHandlersFinished() } - func withCancellationShield(_ operation: sending @escaping () async throws -> Result) async throws -> Result { - try await Task(executorPreference: self.executor, operation: operation).value + func withCancellationShield(_ operation: () async throws -> Result) async throws -> Result { + try await withoutActuallyEscaping(operation) { escapingOperation in + nonisolated(unsafe) let unsafeEscapingOperation = escapingOperation + return try await Task(executorPreference: self.executor, operation: unsafeEscapingOperation).value + } } - func condition(_ condition: @escaping () -> Bool) async throws { - let id = self.stateMachine.condition(condition) - try await withTaskCancellationHandler { - try await withCheckedThrowingContinuation { continuation in - let continuation = self.stateMachine.storeConditionContinuation(id: id, continuation: continuation) + func condition(_ condition: () -> Bool) async throws { + try await withoutActuallyEscaping(condition) { escapingCondition in + let id = self.stateMachine.condition(escapingCondition) + try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { continuation in + let continuation = self.stateMachine.storeConditionContinuation(id: id, continuation: continuation) - // We use the same error message, we cannot predict which resume comes first due to an expected race condition, - // where the onCancel removes the condition but this resume is scheduled first. + // We use the same error message, we cannot predict which resume comes first due to an expected race condition, + // where the onCancel removes the condition but this resume is scheduled first. + continuation?.resume(throwing: CanceledError(message: "Wait condition cancelled")) + } + } onCancel: { + // We should ensure this during runtime by giving the instance a separate executor which we can assert on. + let continuation = self.stateMachine.cancelConditionContinuation(id: id) continuation?.resume(throwing: CanceledError(message: "Wait condition cancelled")) } - } onCancel: { - // We should ensure this during runtime by giving the instance a separate executor which we can assert on. - let continuation = self.stateMachine.cancelConditionContinuation(id: id) - continuation?.resume(throwing: CanceledError(message: "Wait condition cancelled")) } }