Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 46 additions & 41 deletions Sources/Temporal/Worker/Workflow/InternalWorkflowContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -155,62 +155,67 @@ struct InternalWorkflowContext: Sendable {

func timeout<Return: Sendable, Failure: Error>(
for duration: Duration,
body: @Sendable @escaping () async throws(Failure) -> Return
body: () async throws(Failure) -> Return
) async throws(Failure) -> Return {
try await withTaskGroup(of: TimeoutResult<Return, Failure>.self) { group in
group.addTask {
do {
try await self.sleep(for: duration)
return .sleepReturned
} catch {
return .sleepThrew
try await withoutActuallyEscaping(body) { escapingBody async throws(Failure) in
try await withTaskGroup(of: TimeoutResult<Return, Failure>.self) { group in
group.addTask {
do {
try await self.sleep(for: duration)
return .sleepReturned
} catch {
return .sleepThrew
}
}
}
group.addTask {
do {
return .bodyReturned(try await body())
} catch {
// TODO: Investigate why this requires a force cast with the compiler folks
return .bodyThrew(error as! Failure)

// `addTask(_:)` will enqueue on the serial workflow executor, therefore, this is actually not sending anything
// and we can pass this as nonisolated(unsafe)
nonisolated(unsafe) let unsafeEscapingBody = escapingBody
group.addTask {
do throws(Failure) {
return .bodyReturned(try await unsafeEscapingBody())
} catch {
return .bodyThrew(error)
}
}
}

// This force unwrap is safe since we have two guaranteed child tasks
// If the method below
let result = await group.next()!
switch result {
case .sleepReturned, .sleepThrew:
// We either timed out or our parent task got cancelled
// so now we have to cancel the body child task and wait for its result
group.cancelAll()
let nextResult = await group.next()!
switch nextResult {
// This force unwrap is safe since we have two guaranteed child tasks
// If the method below
let result = await group.next()!
switch result {
case .sleepReturned, .sleepThrew:
fatalError("The sleep child task already returned")
// We either timed out or our parent task got cancelled
// so now we have to cancel the body child task and wait for its result
group.cancelAll()
let nextResult = await group.next()!
switch nextResult {
case .sleepReturned, .sleepThrew:
fatalError("The sleep child task already returned")
case .bodyReturned(let value):
return Result<Return, Failure>.success(value)
case .bodyThrew(let error):
return Result<Return, Failure>.failure(error)
}
case .bodyReturned(let value):
// We can cancel the sleep now and ignore any error from it
group.cancelAll()
_ = await group.next()
return Result<Return, Failure>.success(value)
case .bodyThrew(let error):
// We can cancel the sleep now and ignore any error from it
group.cancelAll()
_ = await group.next()
return Result<Return, Failure>.failure(error)
}
case .bodyReturned(let value):
// We can cancel the sleep now and ignore any error from it
group.cancelAll()
_ = await group.next()
return Result<Return, Failure>.success(value)
case .bodyThrew(let error):
// We can cancel the sleep now and ignore any error from it
group.cancelAll()
_ = await group.next()
return Result<Return, Failure>.failure(error)
}
}.get()
}.get()
}
}

func withCancellationShield<Result: Sendable>(_ operation: sending @escaping () async throws -> Result) async throws -> Result {
func withCancellationShield<Result: Sendable>(_ operation: () async throws -> Result) async throws -> Result {
try await self.stateMachine.withCancellationShield(operation)
}

func condition(_ condition: @escaping () -> Bool) async throws {
func condition(_ condition: () -> Bool) async throws {
try await self.stateMachine.condition(condition)
}

Expand Down
8 changes: 4 additions & 4 deletions Sources/Temporal/Worker/Workflow/WorkflowContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public struct WorkflowContext<Workflow: WorkflowDefinition>: @unchecked Sendable
/// - Returns: The result of the closure.
public func timeout<Return: Sendable, Failure: Error>(
for duration: Duration,
body: @Sendable @escaping () async throws(Failure) -> Return
body: () async throws(Failure) -> Return
) async throws(Failure) -> Return {
try await self.internalContext.timeout(for: duration, body: body)
}
Expand Down Expand Up @@ -369,7 +369,7 @@ public struct WorkflowContext<Workflow: WorkflowDefinition>: @unchecked Sendable
/// - Parameter condition: A closure that receives the workflow state and returns `true`
/// when the condition is satisfied.
/// - Throws: A `CanceledError` if the waiting was cancelled.
public func condition(_ condition: @escaping (Workflow) -> Bool) async throws {
public func condition(_ condition: (Workflow) -> Bool) async throws {
try await self.internalContext.condition { [stateBox] in
stateBox.withValue { condition($0) }
}
Expand All @@ -389,7 +389,7 @@ public struct WorkflowContext<Workflow: WorkflowDefinition>: @unchecked Sendable
///
/// - Parameter condition: A closure that returns `true` when the condition is satisfied.
/// - Throws: A `CanceledError` if the waiting was cancelled.
public func condition(_ condition: @escaping () -> Bool) async throws {
public func condition(_ condition: () -> Bool) async throws {
try await self.internalContext.condition(condition)
}

Expand Down Expand Up @@ -445,7 +445,7 @@ public struct WorkflowContext<Workflow: WorkflowDefinition>: @unchecked Sendable
/// For example, you can use this to execute an activity as part of a cleanup operation when your workflow is getting canceled.
///
/// - Parameter operation: The operation that should be executed.
public func withCancellationShield<Result: Sendable>(_ operation: sending @escaping () async throws -> Result) async throws -> Result {
public func withCancellationShield<Result: Sendable>(_ operation: () async throws -> Result) async throws -> Result {
try await self.internalContext.withCancellationShield(operation)
}

Expand Down
31 changes: 18 additions & 13 deletions Sources/Temporal/Worker/Workflow/WorkflowStateMachineStorage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -137,24 +137,29 @@ package final class WorkflowStateMachineStorage: @unchecked Sendable {
return self.stateMachine.allHandlersFinished()
}

func withCancellationShield<Result: Sendable>(_ operation: sending @escaping () async throws -> Result) async throws -> Result {
try await Task(executorPreference: self.executor, operation: operation).value
func withCancellationShield<Result: Sendable>(_ operation: () async throws -> Result) async throws -> Result {
try await withoutActuallyEscaping(operation) { escapingOperation in
nonisolated(unsafe) let unsafeEscapingOperation = escapingOperation
return try await Task(executorPreference: self.executor, operation: unsafeEscapingOperation).value
}
}

func condition(_ condition: @escaping () -> Bool) async throws {
let id = self.stateMachine.condition(condition)
try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { continuation in
let continuation = self.stateMachine.storeConditionContinuation(id: id, continuation: continuation)
func condition(_ condition: () -> Bool) async throws {
try await withoutActuallyEscaping(condition) { escapingCondition in
let id = self.stateMachine.condition(escapingCondition)
try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { continuation in
let continuation = self.stateMachine.storeConditionContinuation(id: id, continuation: continuation)

// We use the same error message, we cannot predict which resume comes first due to an expected race condition,
// where the onCancel removes the condition but this resume is scheduled first.
// We use the same error message, we cannot predict which resume comes first due to an expected race condition,
// where the onCancel removes the condition but this resume is scheduled first.
continuation?.resume(throwing: CanceledError(message: "Wait condition cancelled"))
}
} onCancel: {
// We should ensure this during runtime by giving the instance a separate executor which we can assert on.
let continuation = self.stateMachine.cancelConditionContinuation(id: id)
continuation?.resume(throwing: CanceledError(message: "Wait condition cancelled"))
}
} onCancel: {
// We should ensure this during runtime by giving the instance a separate executor which we can assert on.
let continuation = self.stateMachine.cancelConditionContinuation(id: id)
continuation?.resume(throwing: CanceledError(message: "Wait condition cancelled"))
}
}

Expand Down
Loading