Skip to content

Java SDK: Serialize coordinator client send/receive for thread safety#69080

Open
FrankYang0529 wants to merge 1 commit into
apache:mainfrom
FrankYang0529:airflow-jdk-client-thread-safety
Open

Java SDK: Serialize coordinator client send/receive for thread safety#69080
FrankYang0529 wants to merge 1 commit into
apache:mainfrom
FrankYang0529:airflow-jdk-client-thread-safety

Conversation

@FrankYang0529

Copy link
Copy Markdown
Member

Guard the whole send/receive pair in communicateImpl with a Mutex so concurrent task threads are serialized over the single channel, and verify the response id echoes the request id to reject a mis-paired response.


Was generative AI tooling used to co-author this PR?
  • Yes - Claude Code with Opus 4.8

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

A Java task may call the SDK client from multiple threads. Every call goes through
one request/response channel to the supervisor. So it must be thread safe.

Signed-off-by: PoAn Yang <payang@apache.org>
var frame: IncomingFrame? = null
suspend fun communicateImpl(body: Any): Any =
commMutex.withLock {
val requestId = nextId.fetchAndAdd(1)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be outside of the lock since nextId handles concurrency itself?

Comment on lines +123 to +129
sendMessage(requestId, body)
processOnce(::handle)
val received = frame ?: throw ApiError("No response received")
if (received.id != requestId) {
throw ApiError("response id ${received.id} does not match request id $requestId")
}
received.body ?: Unit

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there’s a way to do though outside of a lock since we have the request ID to identify which response if for which. IIRC the Python implementation does this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants