From 56ea47d952f9ab09f43a336766f55516205a3e6b Mon Sep 17 00:00:00 2001 From: christopherkarani Date: Tue, 10 Mar 2026 09:14:05 +0300 Subject: [PATCH 1/2] Fix mission-critical concurrency and file-read safety gaps - synchronize OTLPHTTPServer mutable lifecycle state via queue-bound access for start/stop/port/deinit - serialize TerraTracedSession operations with explicit request gating and deterministic overlap errors - harden TraceFileReader against TOCTOU oversize bypass with same-handle bounded reads - add regression coverage for server lifecycle concurrency, traced-session overlap rejection, and reader max-size enforcement - document execution plan/review notes in tasks/todo.md and validate with targeted/full swift test plus swift build --- .../TerraTracedSession.swift | 143 ++++++++++++------ Sources/TerraTraceKit/OTLPHTTPServer.swift | 81 ++++++---- Sources/TerraTraceKit/TraceFileReader.swift | 26 ++-- .../TerraTracedSessionTests.swift | 26 +++- .../OTLPHTTPServerTests.swift | 26 ++++ Tests/TerraTraceKitTests/TraceKitTests.swift | 33 ++++ tasks/todo.md | 26 ++++ 7 files changed, 267 insertions(+), 94 deletions(-) diff --git a/Sources/TerraFoundationModels/TerraTracedSession.swift b/Sources/TerraFoundationModels/TerraTracedSession.swift index 2065ed4..09f16cb 100644 --- a/Sources/TerraFoundationModels/TerraTracedSession.swift +++ b/Sources/TerraFoundationModels/TerraTracedSession.swift @@ -5,8 +5,28 @@ import OpenTelemetryApi @available(macOS 26.0, iOS 26.0, *) public final class TerraTracedSession: @unchecked Sendable { + public enum SessionConcurrencyError: Error, Sendable, Equatable { + case concurrentOperationNotAllowed + } + + private actor RequestGate { + private var inFlight = false + + func enter() throws { + if inFlight { + throw SessionConcurrencyError.concurrentOperationNotAllowed + } + inFlight = true + } + + func leave() { + inFlight = false + } + } + private let session: LanguageModelSession public let modelIdentifier: String + private let requestGate = RequestGate() public init( model: SystemLanguageModel = .default, @@ -23,18 +43,21 @@ public final class TerraTracedSession: @unchecked Sendable { /// Respond to a prompt with auto-tracing. public func respond(to prompt: String, promptCapture: Terra.CaptureIntent = .default) async throws -> String { - let request = Terra.InferenceRequest( - model: modelIdentifier, - prompt: prompt, - promptCapture: promptCapture - ) - return try await Terra.withInferenceSpan(request) { scope in - scope.setAttributes([ - Terra.Keys.Terra.runtime: .string("foundation_models"), - Terra.Keys.Terra.autoInstrumented: .bool(true) - ]) - let response = try await session.respond(to: prompt) - return response.content + try await withExclusiveSessionAccess { + let request = Terra.InferenceRequest( + model: modelIdentifier, + prompt: prompt, + promptCapture: promptCapture + ) + let output = try await Terra.withInferenceSpan(request) { scope in + scope.setAttributes([ + Terra.Keys.Terra.runtime: .string("foundation_models"), + Terra.Keys.Terra.autoInstrumented: .bool(true) + ]) + let response = try await session.respond(to: prompt) + return response.content + } + return output } } @@ -44,18 +67,21 @@ public final class TerraTracedSession: @unchecked Sendable { generating type: T.Type, promptCapture: Terra.CaptureIntent = .default ) async throws -> T { - let request = Terra.InferenceRequest( - model: modelIdentifier, - prompt: prompt, - promptCapture: promptCapture - ) - return try await Terra.withInferenceSpan(request) { scope in - scope.setAttributes([ - Terra.Keys.Terra.runtime: .string("foundation_models"), - Terra.Keys.Terra.autoInstrumented: .bool(true), - "terra.foundation_models.response_type": .string(String(describing: T.self)) - ]) - return try await session.respond(to: prompt, generating: type).content + try await withExclusiveSessionAccess { + let request = Terra.InferenceRequest( + model: modelIdentifier, + prompt: prompt, + promptCapture: promptCapture + ) + let output: T = try await Terra.withInferenceSpan(request) { scope in + scope.setAttributes([ + Terra.Keys.Terra.runtime: .string("foundation_models"), + Terra.Keys.Terra.autoInstrumented: .bool(true), + "terra.foundation_models.response_type": .string(String(describing: T.self)) + ]) + return try await session.respond(to: prompt, generating: type).content + } + return output } } @@ -66,26 +92,33 @@ public final class TerraTracedSession: @unchecked Sendable { return AsyncThrowingStream { continuation in let task = Task { [weak self] in - let request = Terra.InferenceRequest( - model: modelIdentifier, - prompt: prompt, - promptCapture: promptCapture, - stream: true - ) + guard let self else { + continuation.finish(throwing: CancellationError()) + return + } + do { - try await Terra.withStreamingInferenceSpan(request) { streamScope in - streamScope.setAttributes([ - Terra.Keys.Terra.runtime: .string("foundation_models"), - Terra.Keys.Terra.autoInstrumented: .bool(true) - ]) - let stream = session.streamResponse(to: prompt) - for try await partial in stream { - try Task.checkCancellation() - streamScope.recordChunk() - if let explicitCount = self?.explicitOutputTokenCount(from: partial) { - streamScope.recordOutputTokenCount(explicitCount) + try await self.withExclusiveSessionAccess { + let request = Terra.InferenceRequest( + model: modelIdentifier, + prompt: prompt, + promptCapture: promptCapture, + stream: true + ) + try await Terra.withStreamingInferenceSpan(request) { streamScope in + streamScope.setAttributes([ + Terra.Keys.Terra.runtime: .string("foundation_models"), + Terra.Keys.Terra.autoInstrumented: .bool(true) + ]) + let stream = session.streamResponse(to: prompt) + for try await partial in stream { + try Task.checkCancellation() + streamScope.recordChunk() + if let explicitCount = self.explicitOutputTokenCount(from: partial) { + streamScope.recordOutputTokenCount(explicitCount) + } + continuation.yield(partial.content) } - continuation.yield(partial.content) } } continuation.finish() @@ -106,22 +139,34 @@ public final class TerraTracedSession: @unchecked Sendable { "tokensGenerated", ] - /// Tracks whether we've already probed for a token count field and found none. - private var tokenCountFieldChecked = false - private func explicitOutputTokenCount(from partial: Any) -> Int? { - // After first nil result, skip Mirror reflection entirely - if tokenCountFieldChecked { return nil } - for child in Mirror(reflecting: partial).children { guard let label = child.label, Self.supportedTokenCountNames.contains(label) else { continue } if let intValue = child.value as? Int, intValue >= 0 { return intValue } } - tokenCountFieldChecked = true return nil } + + private func withExclusiveSessionAccess(_ operation: () async throws -> T) async throws -> T { + try await requestGate.enter() + do { + let value = try await operation() + await requestGate.leave() + return value + } catch { + await requestGate.leave() + throw error + } + } + + func _holdExclusiveAccessForTesting(nanoseconds: UInt64) async throws { + _ = try await withExclusiveSessionAccess { + try await Task.sleep(nanoseconds: nanoseconds) + return () + } + } } #else diff --git a/Sources/TerraTraceKit/OTLPHTTPServer.swift b/Sources/TerraTraceKit/OTLPHTTPServer.swift index 0cdde2e..4b25b7e 100644 --- a/Sources/TerraTraceKit/OTLPHTTPServer.swift +++ b/Sources/TerraTraceKit/OTLPHTTPServer.swift @@ -43,13 +43,17 @@ public final class OTLPHTTPServer: @unchecked Sendable { private static let maxActiveConnections = 64 private let queue = DispatchQueue(label: "terra.trace.otlp.httpserver") + private let queueKey = DispatchSpecificKey() + private let queueIdentity: UInt8 = 1 private var listener: NWListener? private var activeConnections: [ObjectIdentifier: NWConnection] = [:] private var readTimeoutTimers: [ObjectIdentifier: DispatchSourceTimer] = [:] private var decodeTasks: [ObjectIdentifier: Task] = [:] public var port: UInt16 { - listener?.port?.rawValue ?? configuredPort + withQueueSync { + listener?.port?.rawValue ?? configuredPort + } } public init( @@ -66,55 +70,52 @@ public final class OTLPHTTPServer: @unchecked Sendable { self.traceStore = traceStore self.limits = limits self.onSpans = onSpans + queue.setSpecific(key: queueKey, value: queueIdentity) } public func start() throws { - guard listener == nil else { return } - - let parameters = NWParameters.tcp - let listener: NWListener - if configuredPort == 0 { - listener = try NWListener(using: parameters) - } else if let port = NWEndpoint.Port(rawValue: configuredPort) { - if shouldBindToHost(host) { - parameters.requiredLocalEndpoint = .hostPort(host: NWEndpoint.Host(host), port: port) + try withQueueSync { + guard listener == nil else { return } + + let parameters = NWParameters.tcp + let listener: NWListener + if configuredPort == 0 { listener = try NWListener(using: parameters) + } else if let port = NWEndpoint.Port(rawValue: configuredPort) { + if shouldBindToHost(host) { + parameters.requiredLocalEndpoint = .hostPort(host: NWEndpoint.Host(host), port: port) + listener = try NWListener(using: parameters) + } else { + listener = try NWListener(using: parameters, on: port) + } } else { - listener = try NWListener(using: parameters, on: port) + throw NSError(domain: "OTLPHTTPServer", code: 1, userInfo: [NSLocalizedDescriptionKey: "Invalid port"]) } - } else { - throw NSError(domain: "OTLPHTTPServer", code: 1, userInfo: [NSLocalizedDescriptionKey: "Invalid port"]) - } - listener.stateUpdateHandler = { [weak self] (state: NWListener.State) in - if case .failed = state { - self?.stop() + listener.stateUpdateHandler = { [weak self] (state: NWListener.State) in + if case .failed = state { + self?.stop() + } } - } - listener.newConnectionHandler = { [weak self] connection in - self?.handle(connection) - } + listener.newConnectionHandler = { [weak self] connection in + self?.handle(connection) + } - self.listener = listener - listener.start(queue: queue) + self.listener = listener + listener.start(queue: queue) + } } public func stop() { queue.async { - self.listener?.cancel() - self.listener = nil - for id in Array(self.activeConnections.keys) { - self.cleanupConnection(id: id) - } + self.stopLocked() } } deinit { - listener?.cancel() - listener = nil - for id in Array(activeConnections.keys) { - cleanupConnection(id: id) + withQueueSync { + self.stopLocked() } } @@ -410,6 +411,22 @@ public final class OTLPHTTPServer: @unchecked Sendable { return !lowered.isEmpty && lowered != "0.0.0.0" && lowered != "::" } + private func stopLocked() { + listener?.cancel() + listener = nil + for id in Array(activeConnections.keys) { + cleanupConnection(id: id) + } + } + + @discardableResult + private func withQueueSync(_ body: () throws -> T) rethrows -> T { + if DispatchQueue.getSpecific(key: queueKey) == queueIdentity { + return try body() + } + return try queue.sync(execute: body) + } + private func parseRequestHead(_ data: Data) -> Result { guard let headerString = String(data: data, encoding: .utf8) else { return .failure(.badRequest("Invalid header encoding")) diff --git a/Sources/TerraTraceKit/TraceFileReader.swift b/Sources/TerraTraceKit/TraceFileReader.swift index 079ac02..fc1c323 100644 --- a/Sources/TerraTraceKit/TraceFileReader.swift +++ b/Sources/TerraTraceKit/TraceFileReader.swift @@ -25,23 +25,25 @@ public struct TraceFileReader { } do { - let attributes = try fileManager.attributesOfItem(atPath: url.path) - if let sizeValue = attributes[.size] as? NSNumber { - let size = sizeValue.intValue - if size > maxFileSizeBytes { - throw TraceFileError.fileTooLarge(url, actualBytes: size, maxBytes: maxFileSizeBytes) - } + let handle = try FileHandle(forReadingFrom: url) + defer { try? handle.close() } + + let initialSize = try handle.seekToEnd() + if initialSize > UInt64(maxFileSizeBytes) { + let actualBytes = Int(initialSize > UInt64(Int.max) ? UInt64(Int.max) : initialSize) + throw TraceFileError.fileTooLarge(url, actualBytes: actualBytes, maxBytes: maxFileSizeBytes) + } + try handle.seek(toOffset: 0) + + let data = try handle.read(upToCount: maxFileSizeBytes + 1) ?? Data() + if data.count > maxFileSizeBytes { + throw TraceFileError.fileTooLarge(url, actualBytes: data.count, maxBytes: maxFileSizeBytes) } + return data } catch let fileError as TraceFileError { throw fileError } catch { throw TraceFileError.readFailed(url) } - - do { - return try Data(contentsOf: url) - } catch { - throw TraceFileError.readFailed(url) - } } } diff --git a/Tests/TerraFoundationModelsTests/TerraTracedSessionTests.swift b/Tests/TerraFoundationModelsTests/TerraTracedSessionTests.swift index 57a96dc..5f88825 100644 --- a/Tests/TerraFoundationModelsTests/TerraTracedSessionTests.swift +++ b/Tests/TerraFoundationModelsTests/TerraTracedSessionTests.swift @@ -1,7 +1,7 @@ import Testing #if canImport(FoundationModels) -import TerraFoundationModels +@testable import TerraFoundationModels import TerraCore @available(macOS 26.0, iOS 26.0, *) @@ -18,6 +18,30 @@ func tracedSessionInitializesWithCustomIdentifier() { #expect(session.modelIdentifier == "apple/custom-model") } +@available(macOS 26.0, iOS 26.0, *) +@Test("TerraTracedSession rejects concurrent in-flight operations") +func tracedSessionRejectsConcurrentOperations() async throws { + let session = TerraTracedSession() + + let holdingTask = Task { + try await session._holdExclusiveAccessForTesting(nanoseconds: 300_000_000) + } + defer { holdingTask.cancel() } + + try await Task.sleep(nanoseconds: 50_000_000) + + do { + try await session._holdExclusiveAccessForTesting(nanoseconds: 10_000_000) + Issue.record("Expected concurrentOperationNotAllowed error") + } catch let error as TerraTracedSession.SessionConcurrencyError { + #expect(error == .concurrentOperationNotAllowed) + } catch { + Issue.record("Unexpected error type: \(error)") + } + + try await holdingTask.value +} + #else // FoundationModels is not available on this platform or SDK. diff --git a/Tests/TerraTraceKitTests/OTLPHTTPServerTests.swift b/Tests/TerraTraceKitTests/OTLPHTTPServerTests.swift index 0431913..f6ab041 100644 --- a/Tests/TerraTraceKitTests/OTLPHTTPServerTests.swift +++ b/Tests/TerraTraceKitTests/OTLPHTTPServerTests.swift @@ -147,6 +147,32 @@ final class OTLPHTTPServerTests: XCTestCase { let snapshot = await store.snapshot(filter: nil) XCTAssertTrue(snapshot.allSpans.isEmpty) } + + func testOTLPHTTPServerConcurrentPortReadDuringLifecycleIsStable() async { + let store = TraceStore(maxSpans: 5) + let server = OTLPHTTPServer(host: "127.0.0.1", port: 0, traceStore: store) + defer { server.stop() } + + await withTaskGroup(of: Void.self) { group in + group.addTask { + for _ in 0..<200 { + _ = server.port + try? await Task.sleep(nanoseconds: 1_000_000) + } + } + + group.addTask { + for _ in 0..<50 { + try? server.start() + _ = server.port + server.stop() + try? await Task.sleep(nanoseconds: 2_000_000) + } + } + } + + XCTAssertTrue(true) + } } private func waitForBoundPort( diff --git a/Tests/TerraTraceKitTests/TraceKitTests.swift b/Tests/TerraTraceKitTests/TraceKitTests.swift index a8bb20e..8e4e12b 100644 --- a/Tests/TerraTraceKitTests/TraceKitTests.swift +++ b/Tests/TerraTraceKitTests/TraceKitTests.swift @@ -279,3 +279,36 @@ func loaderReportsOversizedFileFailures() throws { } } } + +@Test("TraceFileReader enforces max size using bounded handle reads") +func traceFileReaderRejectsOversizedFile() throws { + let dir = FileManager.default.temporaryDirectory + .appendingPathComponent(UUID().uuidString, isDirectory: true) + try FileManager.default.createDirectory(at: dir, withIntermediateDirectories: true) + defer { try? FileManager.default.removeItem(at: dir) } + + let fileURL = dir.appendingPathComponent("12345") + let oversized = Data(repeating: 0x42, count: 257) + try oversized.write(to: fileURL) + + let file = TraceFileReference( + url: fileURL, + fileName: "12345", + timestamp: Date(timeIntervalSince1970: 12345) + ) + let reader = TraceFileReader(maxFileSizeBytes: 256) + + do { + _ = try reader.read(file: file) + Issue.record("Expected fileTooLarge error") + } catch let error as TraceFileError { + if case let .fileTooLarge(_, actualBytes, maxBytes) = error { + #expect(actualBytes >= 257) + #expect(maxBytes == 256) + } else { + Issue.record("Expected fileTooLarge, got \(error)") + } + } catch { + Issue.record("Unexpected error type: \(error)") + } +} diff --git a/tasks/todo.md b/tasks/todo.md index 112773d..662a815 100644 --- a/tasks/todo.md +++ b/tasks/todo.md @@ -69,3 +69,29 @@ - Added trace file max-size guard in `TraceFileReader` with oversize failure test coverage. - Strengthened privacy defaults by making legacy SHA attributes opt-in (`emitLegacySHA256Attributes: false`), with updated redaction tests and README notes. - Validation: `swift test --filter TerraRedactionPolicyTests` and full `swift test` both pass. + +## Mission-Critical Audit Remediation (2026-03-10) + +- [x] Validate baseline (`swift test`) and capture candidate P1 issues. +- [x] Fix OTLPHTTPServer shared-state synchronization hazards. +- [x] Fix TerraTracedSession shared mutable state race in concurrent streams. +- [x] Harden TraceFileReader against TOCTOU oversize-read bypass. +- [x] Add/expand regression tests for each fix. +- [x] Run `swift test` and `swift build` to verify no regressions. +- [ ] Commit, push branch, and open PR with detailed notes. + +## Mission-Critical Review Notes (2026-03-10) + +- `OTLPHTTPServer` now synchronizes all listener/connection state reads and writes through its dedicated queue, including `port`, `start`, `stop`, and `deinit`. +- `TerraTracedSession` now enforces single in-flight session operations with a request gate and returns a deterministic concurrency error for overlap attempts. +- `TraceFileReader` now uses same-handle size check + bounded read (`max + 1`) to block TOCTOU oversized reads. +- Added regression tests: + - `testOTLPHTTPServerConcurrentPortReadDuringLifecycleIsStable` + - `traceFileReaderRejectsOversizedFile` + - `tracedSessionRejectsConcurrentOperations` +- Validation: + - `swift test --filter TerraTracedSessionTests` + - `swift test --filter OTLPHTTPServerTests` + - `swift test --filter traceFileReaderRejectsOversizedFile` + - full `swift test` + - `swift build` From 7468e29432270aee7c3d77c424c534a7d1d87835 Mon Sep 17 00:00:00 2001 From: christopherkarani Date: Tue, 10 Mar 2026 09:15:47 +0300 Subject: [PATCH 2/2] Document delivery status for mission-critical audit PR --- tasks/todo.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tasks/todo.md b/tasks/todo.md index 662a815..9c32ef2 100644 --- a/tasks/todo.md +++ b/tasks/todo.md @@ -78,7 +78,7 @@ - [x] Harden TraceFileReader against TOCTOU oversize-read bypass. - [x] Add/expand regression tests for each fix. - [x] Run `swift test` and `swift build` to verify no regressions. -- [ ] Commit, push branch, and open PR with detailed notes. +- [x] Commit, push branch, and open PR with detailed notes. ## Mission-Critical Review Notes (2026-03-10) @@ -95,3 +95,8 @@ - `swift test --filter traceFileReaderRejectsOversizedFile` - full `swift test` - `swift build` +- Delivery: + - Branch: `codex/mission-critical-audit-20260310` + - Commit: `56ea47d` + - PR: `https://github.com/christopherkarani/Terra/pull/16` + - Follow-up needed: GitHub API connectivity failed while applying post-create PR edits/labels.