Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,12 @@ class RealConnectionPool internal constructor(

references.removeAt(i)

// A leaked connection may still hold the abandoned response's unread bytes on its socket, so it
// must not be handed to a new exchange. Retire it here rather than relying on eviction: a single
// cleanup pass prunes every connection but evicts at most one, so any leaked connection that
// isn't the one evicted would otherwise stay eligible for reuse and corrupt the next exchange.
connection.noNewExchanges = true

// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
connection.idleAtNs = now - keepAliveDurationNs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,29 @@ class ConnectionPoolTest {
assertThat(c1.noNewExchanges).isTrue()
}

@Test fun prunedLeakedConnectionIsRetiredEvenWhenNotEvicted() {
val pool = factory.newConnectionPool()
val poolApi = ConnectionPool(pool)

// Two pooled connections to the same address, each with an allocation the caller leaks.
val c1 = factory.newConnection(pool, routeA1, 0L)
val c2 = factory.newConnection(pool, routeA1, 0L)
allocateAndLeakAllocation(poolApi, c1)
allocateAndLeakAllocation(poolApi, c2)

awaitGarbageCollection()

// A single cleanup pass prunes the leaked allocation on both connections but evicts at most one.
pool.closeConnections(100L)

// Any connection still in the pool must be retired. A leaked connection may hold the abandoned
// response's unread bytes, and reusing it would corrupt an unrelated exchange.
for (connection in listOf(c1, c2)) {
if (connection.socket().isClosed) continue // Evicted, so it can't be reused.
assertThat(connection.noNewExchanges).isTrue()
}
}

@Test fun interruptStopsThread() {
val taskRunnerThreads = mutableListOf<Thread>()
val taskRunner =
Expand Down