From b2d30f30ca6cf8af5aacdf7fe43a575e25a249fb Mon Sep 17 00:00:00 2001 From: Roo Sczesnak Date: Thu, 11 Jun 2026 15:19:18 -0700 Subject: [PATCH] fix: preserve operation order in TransactionWriteSet TransactionWriteSet stored saves/puts/deletes/checks in separate sets and toTransactionWriteRequest emitted them grouped by kind (all saves, then puts, then deletes, then checks). DynamoDB returns a List that is positionally aligned with the submitted TransactWriteItems, so grouping by kind meant callers could not reliably map a cancellation reason back to the operation that triggered it once operations were interleaved across kinds. Track the caller's insertion order in TransactionWriteSet.Builder via a new WriteOperation sealed type, expose it as TransactionWriteSet.operations, and replay that order in both toTransactionWriteRequest and describeOperations. The field is populated in build() and kept out of the data class constructor to preserve the public equals/copy/destructuring signature; direct construction falls back to the historical save -> put -> delete -> check order. Co-Authored-By: Claude Opus 4.8 --- .../main/kotlin/app/cash/tempest2/Model.kt | 72 +++++++++++++++---- .../tempest2/internal/DynamoDbLogicalDb.kt | 66 ++++++++--------- .../cash/tempest2/LogicalDbTransactionTest.kt | 48 +++++++++++++ 3 files changed, 139 insertions(+), 47 deletions(-) diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/Model.kt b/tempest2/src/main/kotlin/app/cash/tempest2/Model.kt index f31183f18..7437fc19c 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/Model.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/Model.kt @@ -92,6 +92,24 @@ data class TransactionWriteSet( val idempotencyToken: String? ) { + /** + * The save/put/delete/check operations in the order the caller added them via the [Builder]. + * + * DynamoDB aligns the `List` on a `TransactionCanceledException` positionally + * with the items in the submitted request, so preserving the caller's insertion order across + * operation kinds lets callers map a cancellation reason back to the operation that triggered it. + * + * When a [TransactionWriteSet] is constructed directly rather than through the [Builder], this + * defaults to save → put → delete → check order, matching the historical request ordering. + */ + var operations: List = buildList { + itemsToSave.forEach { add(WriteOperation.Save(it)) } + itemsToPut.forEach { add(WriteOperation.Put(it)) } + keysToDelete.forEach { add(WriteOperation.Delete(it)) } + keysToCheck.forEach { add(WriteOperation.Check(it)) } + } + internal set + val sizeDynamoDbTable get() = itemsToSave.size + itemsToPut.size + keysToDelete.size + keysToCheck.size @@ -101,6 +119,7 @@ data class TransactionWriteSet( private val keysToDelete = mutableSetOf() private val keysToCheck = mutableSetOf() private val writeExpressions = mutableMapOf() + private val operations = mutableListOf() private var idempotencyToken: String? = null val size @@ -123,6 +142,7 @@ data class TransactionWriteSet( require(added) { "Duplicate items are not allowed" } + operations.add(WriteOperation.Save(item)) if (expression != null) { writeExpressions[item] = expression } @@ -143,6 +163,7 @@ data class TransactionWriteSet( require(added) { "Duplicate items are not allowed" } + operations.add(WriteOperation.Put(item)) if (expression != null) { writeExpressions[item] = expression } @@ -158,6 +179,7 @@ data class TransactionWriteSet( "Duplicate items are not allowed: $key." } keysToDelete.add(key) + operations.add(WriteOperation.Delete(key)) if (expression != null) { writeExpressions[key] = expression } @@ -172,6 +194,7 @@ data class TransactionWriteSet( "Duplicate items are not allowed: $key." } keysToCheck.add(key) + operations.add(WriteOperation.Check(key)) if (expression != null) { writeExpressions[key] = expression } @@ -184,17 +207,15 @@ data class TransactionWriteSet( fun addAll(builder: Builder) { check(builder.idempotencyToken == null) { "too many idempotency tokens" } - for (item in builder.itemsToSave) { - save(item) - } - for (item in builder.itemsToPut) { - put(item) - } - for (item in builder.keysToDelete) { - delete(item) - } - for (item in builder.keysToCheck) { - checkCondition(item) + // Replay in the source builder's insertion order so the merged set preserves ordering across + // operation kinds. Expressions are merged below rather than per-operation. + for (operation in builder.operations) { + when (operation) { + is WriteOperation.Save -> save(operation.item) + is WriteOperation.Put -> put(operation.item) + is WriteOperation.Delete -> delete(operation.key) + is WriteOperation.Check -> checkCondition(operation.key) + } } writeExpressions += builder.writeExpressions @@ -208,11 +229,38 @@ data class TransactionWriteSet( KeySet(keysToCheck), writeExpressions.toMap(), idempotencyToken - ) + ).also { + it.operations = operations.toList() + } } } } +/** + * A single operation in a [TransactionWriteSet], retaining the order it was added to the + * [TransactionWriteSet.Builder]. + */ +sealed class WriteOperation { + /** The item or key this operation acts on; the lookup key into [TransactionWriteSet.writeExpressions]. */ + abstract val subject: Any + + data class Save(val item: Any) : WriteOperation() { + override val subject get() = item + } + + data class Put(val item: Any) : WriteOperation() { + override val subject get() = item + } + + data class Delete(val key: Any) : WriteOperation() { + override val subject get() = key + } + + data class Check(val key: Any) : WriteOperation() { + override val subject get() = key + } +} + /** * A collection of keys or items across tables. */ diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt index a18212476..a95660d99 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt @@ -27,6 +27,7 @@ import app.cash.tempest2.KeySet import app.cash.tempest2.LogicalDb import app.cash.tempest2.LogicalTable import app.cash.tempest2.TransactionWriteSet +import app.cash.tempest2.WriteOperation import app.cash.tempest2.internal.DynamoDbLogicalDb.WriteRequest.Op.CLOBBER import app.cash.tempest2.internal.DynamoDbLogicalDb.WriteRequest.Op.DELETE import kotlinx.coroutines.flow.map @@ -345,26 +346,26 @@ internal class DynamoDbLogicalDb( private fun toTransactionWriteRequest(writeSet: TransactionWriteSet): TransactWriteItemsEnhancedRequest? { return TransactWriteItemsEnhancedRequest.builder() .apply { - for (itemToSave in writeSet.itemsToSave) { - addUpdateItem(itemToSave.encodeAsItem(), writeSet.writeExpressions[itemToSave]) - } - for (itemToPut in writeSet.itemsToPut) { - val userExpression = writeSet.writeExpressions[itemToPut] - val encodedItem = itemToPut.encodeAsItem() - if (userExpression != null) { - // Manual versioning: merge version check with user expression - addPutItemWithManualVersioning(encodedItem, userExpression) - } else { - // Let SDK handle versioning automatically - addPutItem(encodedItem, null) + // Replay operations in the caller's insertion order so the request items line up + // positionally with DynamoDB's returned List. + for (operation in writeSet.operations) { + val userExpression = writeSet.writeExpressions[operation.subject] + when (operation) { + is WriteOperation.Save -> addUpdateItem(operation.item.encodeAsItem(), userExpression) + is WriteOperation.Put -> { + val encodedItem = operation.item.encodeAsItem() + if (userExpression != null) { + // Manual versioning: merge version check with user expression + addPutItemWithManualVersioning(encodedItem, userExpression) + } else { + // Let SDK handle versioning automatically + addPutItem(encodedItem, null) + } + } + is WriteOperation.Delete -> addDeleteItem(operation.key.encodeAsKey(), userExpression) + is WriteOperation.Check -> addConditionCheck(operation.key.encodeAsKey(), userExpression) } } - for (keyToDelete in writeSet.keysToDelete) { - addDeleteItem(keyToDelete.encodeAsKey(), writeSet.writeExpressions[keyToDelete]) - } - for (keyToCheck in writeSet.keysToCheck) { - addConditionCheck(keyToCheck.encodeAsKey(), writeSet.writeExpressions[keyToCheck]) - } if (writeSet.idempotencyToken != null) { clientRequestToken(writeSet.idempotencyToken) } @@ -439,24 +440,19 @@ internal class DynamoDbLogicalDb( } private fun TransactionWriteSet.describeOperations(): List { - val descriptions = mutableListOf() - for (itemToSave in itemsToSave) { - val rawItemKey = itemToSave.encodeAsItem().rawItemKey() - descriptions.add("Save item (non-key attributes omitted) $rawItemKey") - } - for (itemToPut in itemsToPut) { - val rawItemKey = itemToPut.encodeAsItem().rawItemKey() - descriptions.add("Put item (non-key attributes omitted) $rawItemKey") - } - for (keyToDelete in keysToDelete) { - val rawItemKey = keyToDelete.encodeAsKey().rawItemKey() - descriptions.add("Delete key $rawItemKey") - } - for (keyToCheck in keysToCheck) { - val rawItemKey = keyToCheck.encodeAsKey().rawItemKey() - descriptions.add("Check key $rawItemKey") + // Describe in insertion order so the message lines up with the returned cancellation reasons. + return operations.map { operation -> + when (operation) { + is WriteOperation.Save -> + "Save item (non-key attributes omitted) ${operation.item.encodeAsItem().rawItemKey()}" + is WriteOperation.Put -> + "Put item (non-key attributes omitted) ${operation.item.encodeAsItem().rawItemKey()}" + is WriteOperation.Delete -> + "Delete key ${operation.key.encodeAsKey().rawItemKey()}" + is WriteOperation.Check -> + "Check key ${operation.key.encodeAsKey().rawItemKey()}" + } } - return descriptions.toList() } private fun ReadBatch.Builder.addGetItem(key: Key, consistentReads: Boolean) = diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/LogicalDbTransactionTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/LogicalDbTransactionTest.kt index 401a29ce1..132566cf9 100644 --- a/tempest2/src/test/kotlin/app/cash/tempest2/LogicalDbTransactionTest.kt +++ b/tempest2/src/test/kotlin/app/cash/tempest2/LogicalDbTransactionTest.kt @@ -290,6 +290,54 @@ class LogicalDbTransactionTest { ) } + @Test + fun transactionWritePreservesOperationOrderAcrossKinds() { + val playlistInfoV1 = PlaylistInfo("PLAYLIST_1", "WFH Music", emptyList()) + musicTable.playlistInfo.save(playlistInfoV1) + val albumTrack = AlbumTrack( + "ALBUM_1", + 1, + "dreamin'", + Duration.parse("PT3M28S") + ) + musicTable.albumTracks.save(albumTrack) + + val playlistInfoV2 = playlistInfoV1.copy( + playlist_version = playlistInfoV1.playlist_version + 1 + ) + + // Interleave a delete before a save. Historically Tempest grouped saves ahead of deletes when + // building the request, so the request order (and the positionally-aligned cancellation reasons + // and failure message) would not match the caller's insertion order. + val writeTransaction = TransactionWriteSet.Builder() + .delete(AlbumTrack.Key("ALBUM_1", 1)) + .save( + playlistInfoV2, + ifPlaylistVersionIs(playlistInfoV1.playlist_version) + ) + .build() + + // The operations list reflects the caller's insertion order. + assertThat(writeTransaction.operations).containsExactly( + WriteOperation.Delete(AlbumTrack.Key("ALBUM_1", 1)), + WriteOperation.Save(playlistInfoV2) + ) + + // Introduce a race condition so the save's condition expression fails and the transaction is + // cancelled, surfacing the operation order in the failure message. + musicTable.playlistInfo.save(playlistInfoV2) + + assertThatExceptionOfType(TransactionCanceledException::class.java) + .isThrownBy { + musicDb.transactionWrite(writeTransaction) + } + .withMessageContaining( + "Write transaction failed: [" + + "Delete key music_items[partition_key=AttributeValue(S=ALBUM_1),sort_key=AttributeValue(S=TRACK_0000000000000001)], " + + "Save item (non-key attributes omitted) music_items[partition_key=AttributeValue(S=PLAYLIST_1),sort_key=AttributeValue(S=INFO_)]]" + ) + } + @Test fun transactionWriteWithAutoGeneratedTimestamp() { versionedAttributeDb.transactionWrite(