diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/Model.kt b/tempest2/src/main/kotlin/app/cash/tempest2/Model.kt index f31183f18..7437fc19c 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/Model.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/Model.kt @@ -92,6 +92,24 @@ data class TransactionWriteSet( val idempotencyToken: String? ) { + /** + * The save/put/delete/check operations in the order the caller added them via the [Builder]. + * + * DynamoDB aligns the `List` on a `TransactionCanceledException` positionally + * with the items in the submitted request, so preserving the caller's insertion order across + * operation kinds lets callers map a cancellation reason back to the operation that triggered it. + * + * When a [TransactionWriteSet] is constructed directly rather than through the [Builder], this + * defaults to save → put → delete → check order, matching the historical request ordering. + */ + var operations: List = buildList { + itemsToSave.forEach { add(WriteOperation.Save(it)) } + itemsToPut.forEach { add(WriteOperation.Put(it)) } + keysToDelete.forEach { add(WriteOperation.Delete(it)) } + keysToCheck.forEach { add(WriteOperation.Check(it)) } + } + internal set + val sizeDynamoDbTable get() = itemsToSave.size + itemsToPut.size + keysToDelete.size + keysToCheck.size @@ -101,6 +119,7 @@ data class TransactionWriteSet( private val keysToDelete = mutableSetOf() private val keysToCheck = mutableSetOf() private val writeExpressions = mutableMapOf() + private val operations = mutableListOf() private var idempotencyToken: String? = null val size @@ -123,6 +142,7 @@ data class TransactionWriteSet( require(added) { "Duplicate items are not allowed" } + operations.add(WriteOperation.Save(item)) if (expression != null) { writeExpressions[item] = expression } @@ -143,6 +163,7 @@ data class TransactionWriteSet( require(added) { "Duplicate items are not allowed" } + operations.add(WriteOperation.Put(item)) if (expression != null) { writeExpressions[item] = expression } @@ -158,6 +179,7 @@ data class TransactionWriteSet( "Duplicate items are not allowed: $key." } keysToDelete.add(key) + operations.add(WriteOperation.Delete(key)) if (expression != null) { writeExpressions[key] = expression } @@ -172,6 +194,7 @@ data class TransactionWriteSet( "Duplicate items are not allowed: $key." } keysToCheck.add(key) + operations.add(WriteOperation.Check(key)) if (expression != null) { writeExpressions[key] = expression } @@ -184,17 +207,15 @@ data class TransactionWriteSet( fun addAll(builder: Builder) { check(builder.idempotencyToken == null) { "too many idempotency tokens" } - for (item in builder.itemsToSave) { - save(item) - } - for (item in builder.itemsToPut) { - put(item) - } - for (item in builder.keysToDelete) { - delete(item) - } - for (item in builder.keysToCheck) { - checkCondition(item) + // Replay in the source builder's insertion order so the merged set preserves ordering across + // operation kinds. Expressions are merged below rather than per-operation. + for (operation in builder.operations) { + when (operation) { + is WriteOperation.Save -> save(operation.item) + is WriteOperation.Put -> put(operation.item) + is WriteOperation.Delete -> delete(operation.key) + is WriteOperation.Check -> checkCondition(operation.key) + } } writeExpressions += builder.writeExpressions @@ -208,11 +229,38 @@ data class TransactionWriteSet( KeySet(keysToCheck), writeExpressions.toMap(), idempotencyToken - ) + ).also { + it.operations = operations.toList() + } } } } +/** + * A single operation in a [TransactionWriteSet], retaining the order it was added to the + * [TransactionWriteSet.Builder]. + */ +sealed class WriteOperation { + /** The item or key this operation acts on; the lookup key into [TransactionWriteSet.writeExpressions]. */ + abstract val subject: Any + + data class Save(val item: Any) : WriteOperation() { + override val subject get() = item + } + + data class Put(val item: Any) : WriteOperation() { + override val subject get() = item + } + + data class Delete(val key: Any) : WriteOperation() { + override val subject get() = key + } + + data class Check(val key: Any) : WriteOperation() { + override val subject get() = key + } +} + /** * A collection of keys or items across tables. */ diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt index a18212476..a95660d99 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt @@ -27,6 +27,7 @@ import app.cash.tempest2.KeySet import app.cash.tempest2.LogicalDb import app.cash.tempest2.LogicalTable import app.cash.tempest2.TransactionWriteSet +import app.cash.tempest2.WriteOperation import app.cash.tempest2.internal.DynamoDbLogicalDb.WriteRequest.Op.CLOBBER import app.cash.tempest2.internal.DynamoDbLogicalDb.WriteRequest.Op.DELETE import kotlinx.coroutines.flow.map @@ -345,26 +346,26 @@ internal class DynamoDbLogicalDb( private fun toTransactionWriteRequest(writeSet: TransactionWriteSet): TransactWriteItemsEnhancedRequest? { return TransactWriteItemsEnhancedRequest.builder() .apply { - for (itemToSave in writeSet.itemsToSave) { - addUpdateItem(itemToSave.encodeAsItem(), writeSet.writeExpressions[itemToSave]) - } - for (itemToPut in writeSet.itemsToPut) { - val userExpression = writeSet.writeExpressions[itemToPut] - val encodedItem = itemToPut.encodeAsItem() - if (userExpression != null) { - // Manual versioning: merge version check with user expression - addPutItemWithManualVersioning(encodedItem, userExpression) - } else { - // Let SDK handle versioning automatically - addPutItem(encodedItem, null) + // Replay operations in the caller's insertion order so the request items line up + // positionally with DynamoDB's returned List. + for (operation in writeSet.operations) { + val userExpression = writeSet.writeExpressions[operation.subject] + when (operation) { + is WriteOperation.Save -> addUpdateItem(operation.item.encodeAsItem(), userExpression) + is WriteOperation.Put -> { + val encodedItem = operation.item.encodeAsItem() + if (userExpression != null) { + // Manual versioning: merge version check with user expression + addPutItemWithManualVersioning(encodedItem, userExpression) + } else { + // Let SDK handle versioning automatically + addPutItem(encodedItem, null) + } + } + is WriteOperation.Delete -> addDeleteItem(operation.key.encodeAsKey(), userExpression) + is WriteOperation.Check -> addConditionCheck(operation.key.encodeAsKey(), userExpression) } } - for (keyToDelete in writeSet.keysToDelete) { - addDeleteItem(keyToDelete.encodeAsKey(), writeSet.writeExpressions[keyToDelete]) - } - for (keyToCheck in writeSet.keysToCheck) { - addConditionCheck(keyToCheck.encodeAsKey(), writeSet.writeExpressions[keyToCheck]) - } if (writeSet.idempotencyToken != null) { clientRequestToken(writeSet.idempotencyToken) } @@ -439,24 +440,19 @@ internal class DynamoDbLogicalDb( } private fun TransactionWriteSet.describeOperations(): List { - val descriptions = mutableListOf() - for (itemToSave in itemsToSave) { - val rawItemKey = itemToSave.encodeAsItem().rawItemKey() - descriptions.add("Save item (non-key attributes omitted) $rawItemKey") - } - for (itemToPut in itemsToPut) { - val rawItemKey = itemToPut.encodeAsItem().rawItemKey() - descriptions.add("Put item (non-key attributes omitted) $rawItemKey") - } - for (keyToDelete in keysToDelete) { - val rawItemKey = keyToDelete.encodeAsKey().rawItemKey() - descriptions.add("Delete key $rawItemKey") - } - for (keyToCheck in keysToCheck) { - val rawItemKey = keyToCheck.encodeAsKey().rawItemKey() - descriptions.add("Check key $rawItemKey") + // Describe in insertion order so the message lines up with the returned cancellation reasons. + return operations.map { operation -> + when (operation) { + is WriteOperation.Save -> + "Save item (non-key attributes omitted) ${operation.item.encodeAsItem().rawItemKey()}" + is WriteOperation.Put -> + "Put item (non-key attributes omitted) ${operation.item.encodeAsItem().rawItemKey()}" + is WriteOperation.Delete -> + "Delete key ${operation.key.encodeAsKey().rawItemKey()}" + is WriteOperation.Check -> + "Check key ${operation.key.encodeAsKey().rawItemKey()}" + } } - return descriptions.toList() } private fun ReadBatch.Builder.addGetItem(key: Key, consistentReads: Boolean) = diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/LogicalDbTransactionTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/LogicalDbTransactionTest.kt index 401a29ce1..132566cf9 100644 --- a/tempest2/src/test/kotlin/app/cash/tempest2/LogicalDbTransactionTest.kt +++ b/tempest2/src/test/kotlin/app/cash/tempest2/LogicalDbTransactionTest.kt @@ -290,6 +290,54 @@ class LogicalDbTransactionTest { ) } + @Test + fun transactionWritePreservesOperationOrderAcrossKinds() { + val playlistInfoV1 = PlaylistInfo("PLAYLIST_1", "WFH Music", emptyList()) + musicTable.playlistInfo.save(playlistInfoV1) + val albumTrack = AlbumTrack( + "ALBUM_1", + 1, + "dreamin'", + Duration.parse("PT3M28S") + ) + musicTable.albumTracks.save(albumTrack) + + val playlistInfoV2 = playlistInfoV1.copy( + playlist_version = playlistInfoV1.playlist_version + 1 + ) + + // Interleave a delete before a save. Historically Tempest grouped saves ahead of deletes when + // building the request, so the request order (and the positionally-aligned cancellation reasons + // and failure message) would not match the caller's insertion order. + val writeTransaction = TransactionWriteSet.Builder() + .delete(AlbumTrack.Key("ALBUM_1", 1)) + .save( + playlistInfoV2, + ifPlaylistVersionIs(playlistInfoV1.playlist_version) + ) + .build() + + // The operations list reflects the caller's insertion order. + assertThat(writeTransaction.operations).containsExactly( + WriteOperation.Delete(AlbumTrack.Key("ALBUM_1", 1)), + WriteOperation.Save(playlistInfoV2) + ) + + // Introduce a race condition so the save's condition expression fails and the transaction is + // cancelled, surfacing the operation order in the failure message. + musicTable.playlistInfo.save(playlistInfoV2) + + assertThatExceptionOfType(TransactionCanceledException::class.java) + .isThrownBy { + musicDb.transactionWrite(writeTransaction) + } + .withMessageContaining( + "Write transaction failed: [" + + "Delete key music_items[partition_key=AttributeValue(S=ALBUM_1),sort_key=AttributeValue(S=TRACK_0000000000000001)], " + + "Save item (non-key attributes omitted) music_items[partition_key=AttributeValue(S=PLAYLIST_1),sort_key=AttributeValue(S=INFO_)]]" + ) + } + @Test fun transactionWriteWithAutoGeneratedTimestamp() { versionedAttributeDb.transactionWrite(