Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 60 additions & 12 deletions tempest2/src/main/kotlin/app/cash/tempest2/Model.kt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,24 @@ data class TransactionWriteSet(
val idempotencyToken: String?
) {

/**
* The save/put/delete/check operations in the order the caller added them via the [Builder].
*
* DynamoDB aligns the `List<CancellationReason>` on a `TransactionCanceledException` positionally
* with the items in the submitted request, so preserving the caller's insertion order across
* operation kinds lets callers map a cancellation reason back to the operation that triggered it.
*
* When a [TransactionWriteSet] is constructed directly rather than through the [Builder], this
* defaults to save → put → delete → check order, matching the historical request ordering.
*/
var operations: List<WriteOperation> = buildList {
itemsToSave.forEach { add(WriteOperation.Save(it)) }
itemsToPut.forEach { add(WriteOperation.Put(it)) }
keysToDelete.forEach { add(WriteOperation.Delete(it)) }
keysToCheck.forEach { add(WriteOperation.Check(it)) }
}
internal set

val sizeDynamoDbTable
get() = itemsToSave.size + itemsToPut.size + keysToDelete.size + keysToCheck.size

Expand All @@ -101,6 +119,7 @@ data class TransactionWriteSet(
private val keysToDelete = mutableSetOf<Any>()
private val keysToCheck = mutableSetOf<Any>()
private val writeExpressions = mutableMapOf<Any, Expression>()
private val operations = mutableListOf<WriteOperation>()
private var idempotencyToken: String? = null

val size
Expand All @@ -123,6 +142,7 @@ data class TransactionWriteSet(
require(added) {
"Duplicate items are not allowed"
}
operations.add(WriteOperation.Save(item))
if (expression != null) {
writeExpressions[item] = expression
}
Expand All @@ -143,6 +163,7 @@ data class TransactionWriteSet(
require(added) {
"Duplicate items are not allowed"
}
operations.add(WriteOperation.Put(item))
if (expression != null) {
writeExpressions[item] = expression
}
Expand All @@ -158,6 +179,7 @@ data class TransactionWriteSet(
"Duplicate items are not allowed: $key."
}
keysToDelete.add(key)
operations.add(WriteOperation.Delete(key))
if (expression != null) {
writeExpressions[key] = expression
}
Expand All @@ -172,6 +194,7 @@ data class TransactionWriteSet(
"Duplicate items are not allowed: $key."
}
keysToCheck.add(key)
operations.add(WriteOperation.Check(key))
if (expression != null) {
writeExpressions[key] = expression
}
Expand All @@ -184,17 +207,15 @@ data class TransactionWriteSet(
fun addAll(builder: Builder) {
check(builder.idempotencyToken == null) { "too many idempotency tokens" }

for (item in builder.itemsToSave) {
save(item)
}
for (item in builder.itemsToPut) {
put(item)
}
for (item in builder.keysToDelete) {
delete(item)
}
for (item in builder.keysToCheck) {
checkCondition(item)
// Replay in the source builder's insertion order so the merged set preserves ordering across
// operation kinds. Expressions are merged below rather than per-operation.
for (operation in builder.operations) {
when (operation) {
is WriteOperation.Save -> save(operation.item)
is WriteOperation.Put -> put(operation.item)
is WriteOperation.Delete -> delete(operation.key)
is WriteOperation.Check -> checkCondition(operation.key)
}
}

writeExpressions += builder.writeExpressions
Expand All @@ -208,11 +229,38 @@ data class TransactionWriteSet(
KeySet(keysToCheck),
writeExpressions.toMap(),
idempotencyToken
)
).also {
it.operations = operations.toList()
}
}
}
}

/**
* A single operation in a [TransactionWriteSet], retaining the order it was added to the
* [TransactionWriteSet.Builder].
*/
sealed class WriteOperation {
/** The item or key this operation acts on; the lookup key into [TransactionWriteSet.writeExpressions]. */
abstract val subject: Any

data class Save(val item: Any) : WriteOperation() {
override val subject get() = item
}

data class Put(val item: Any) : WriteOperation() {
override val subject get() = item
}

data class Delete(val key: Any) : WriteOperation() {
override val subject get() = key
}

data class Check(val key: Any) : WriteOperation() {
override val subject get() = key
}
}

/**
* A collection of keys or items across tables.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import app.cash.tempest2.KeySet
import app.cash.tempest2.LogicalDb
import app.cash.tempest2.LogicalTable
import app.cash.tempest2.TransactionWriteSet
import app.cash.tempest2.WriteOperation
import app.cash.tempest2.internal.DynamoDbLogicalDb.WriteRequest.Op.CLOBBER
import app.cash.tempest2.internal.DynamoDbLogicalDb.WriteRequest.Op.DELETE
import kotlinx.coroutines.flow.map
Expand Down Expand Up @@ -345,26 +346,26 @@ internal class DynamoDbLogicalDb(
private fun toTransactionWriteRequest(writeSet: TransactionWriteSet): TransactWriteItemsEnhancedRequest? {
return TransactWriteItemsEnhancedRequest.builder()
.apply {
for (itemToSave in writeSet.itemsToSave) {
addUpdateItem(itemToSave.encodeAsItem(), writeSet.writeExpressions[itemToSave])
}
for (itemToPut in writeSet.itemsToPut) {
val userExpression = writeSet.writeExpressions[itemToPut]
val encodedItem = itemToPut.encodeAsItem()
if (userExpression != null) {
// Manual versioning: merge version check with user expression
addPutItemWithManualVersioning(encodedItem, userExpression)
} else {
// Let SDK handle versioning automatically
addPutItem(encodedItem, null)
// Replay operations in the caller's insertion order so the request items line up
// positionally with DynamoDB's returned List<CancellationReason>.
for (operation in writeSet.operations) {
val userExpression = writeSet.writeExpressions[operation.subject]
when (operation) {
is WriteOperation.Save -> addUpdateItem(operation.item.encodeAsItem(), userExpression)
is WriteOperation.Put -> {
val encodedItem = operation.item.encodeAsItem()
if (userExpression != null) {
// Manual versioning: merge version check with user expression
addPutItemWithManualVersioning(encodedItem, userExpression)
} else {
// Let SDK handle versioning automatically
addPutItem(encodedItem, null)
}
}
is WriteOperation.Delete -> addDeleteItem(operation.key.encodeAsKey(), userExpression)
is WriteOperation.Check -> addConditionCheck(operation.key.encodeAsKey(), userExpression)
}
}
for (keyToDelete in writeSet.keysToDelete) {
addDeleteItem(keyToDelete.encodeAsKey(), writeSet.writeExpressions[keyToDelete])
}
for (keyToCheck in writeSet.keysToCheck) {
addConditionCheck(keyToCheck.encodeAsKey(), writeSet.writeExpressions[keyToCheck])
}
if (writeSet.idempotencyToken != null) {
clientRequestToken(writeSet.idempotencyToken)
}
Expand Down Expand Up @@ -439,24 +440,19 @@ internal class DynamoDbLogicalDb(
}

private fun TransactionWriteSet.describeOperations(): List<String> {
val descriptions = mutableListOf<String>()
for (itemToSave in itemsToSave) {
val rawItemKey = itemToSave.encodeAsItem().rawItemKey()
descriptions.add("Save item (non-key attributes omitted) $rawItemKey")
}
for (itemToPut in itemsToPut) {
val rawItemKey = itemToPut.encodeAsItem().rawItemKey()
descriptions.add("Put item (non-key attributes omitted) $rawItemKey")
}
for (keyToDelete in keysToDelete) {
val rawItemKey = keyToDelete.encodeAsKey().rawItemKey()
descriptions.add("Delete key $rawItemKey")
}
for (keyToCheck in keysToCheck) {
val rawItemKey = keyToCheck.encodeAsKey().rawItemKey()
descriptions.add("Check key $rawItemKey")
// Describe in insertion order so the message lines up with the returned cancellation reasons.
return operations.map { operation ->
when (operation) {
is WriteOperation.Save ->
"Save item (non-key attributes omitted) ${operation.item.encodeAsItem().rawItemKey()}"
is WriteOperation.Put ->
"Put item (non-key attributes omitted) ${operation.item.encodeAsItem().rawItemKey()}"
is WriteOperation.Delete ->
"Delete key ${operation.key.encodeAsKey().rawItemKey()}"
is WriteOperation.Check ->
"Check key ${operation.key.encodeAsKey().rawItemKey()}"
}
}
return descriptions.toList()
}

private fun <T> ReadBatch.Builder<T>.addGetItem(key: Key, consistentReads: Boolean) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,54 @@ class LogicalDbTransactionTest {
)
}

@Test
fun transactionWritePreservesOperationOrderAcrossKinds() {
val playlistInfoV1 = PlaylistInfo("PLAYLIST_1", "WFH Music", emptyList())
musicTable.playlistInfo.save(playlistInfoV1)
val albumTrack = AlbumTrack(
"ALBUM_1",
1,
"dreamin'",
Duration.parse("PT3M28S")
)
musicTable.albumTracks.save(albumTrack)

val playlistInfoV2 = playlistInfoV1.copy(
playlist_version = playlistInfoV1.playlist_version + 1
)

// Interleave a delete before a save. Historically Tempest grouped saves ahead of deletes when
// building the request, so the request order (and the positionally-aligned cancellation reasons
// and failure message) would not match the caller's insertion order.
val writeTransaction = TransactionWriteSet.Builder()
.delete(AlbumTrack.Key("ALBUM_1", 1))
.save(
playlistInfoV2,
ifPlaylistVersionIs(playlistInfoV1.playlist_version)
)
.build()

// The operations list reflects the caller's insertion order.
assertThat(writeTransaction.operations).containsExactly(
WriteOperation.Delete(AlbumTrack.Key("ALBUM_1", 1)),
WriteOperation.Save(playlistInfoV2)
)

// Introduce a race condition so the save's condition expression fails and the transaction is
// cancelled, surfacing the operation order in the failure message.
musicTable.playlistInfo.save(playlistInfoV2)

assertThatExceptionOfType(TransactionCanceledException::class.java)
.isThrownBy {
musicDb.transactionWrite(writeTransaction)
}
.withMessageContaining(
"Write transaction failed: [" +
"Delete key music_items[partition_key=AttributeValue(S=ALBUM_1),sort_key=AttributeValue(S=TRACK_0000000000000001)], " +
"Save item (non-key attributes omitted) music_items[partition_key=AttributeValue(S=PLAYLIST_1),sort_key=AttributeValue(S=INFO_)]]"
)
}

@Test
fun transactionWriteWithAutoGeneratedTimestamp() {
versionedAttributeDb.transactionWrite(
Expand Down
Loading