diff --git a/src/EfficientDynamoDb/DynamoDbContext/DynamoDbContext,BatchWriteItem.cs b/src/EfficientDynamoDb/DynamoDbContext/DynamoDbContext,BatchWriteItem.cs index 6be1951e..6502bbaa 100644 --- a/src/EfficientDynamoDb/DynamoDbContext/DynamoDbContext,BatchWriteItem.cs +++ b/src/EfficientDynamoDb/DynamoDbContext/DynamoDbContext,BatchWriteItem.cs @@ -1,6 +1,7 @@ using System.Threading; using System.Threading.Tasks; using EfficientDynamoDb.Exceptions; +using EfficientDynamoDb.Internal.Extensions; using EfficientDynamoDb.Internal.Operations.BatchWriteItem; using EfficientDynamoDb.Operations.BatchWriteItem; using EfficientDynamoDb.Operations.Query; @@ -20,7 +21,7 @@ internal async Task BatchWriteItemAsync(BuilderNode node, CancellationToken canc using var httpContent = new BatchWriteItemHighLevelHttpContent(this, node, Config.TableNamePrefix); using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false); - var documentResult = await DynamoDbLowLevelContext.ReadDocumentAsync(response, BatchWriteItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); + var documentResult = await response.ReadDocumentAsync(BatchWriteItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); var attempt = 0; while (documentResult != null) @@ -36,7 +37,7 @@ internal async Task BatchWriteItemAsync(BuilderNode node, CancellationToken canc using var unprocessedHttpContent = new BatchWriteItemHttpContent(new BatchWriteItemRequest{RequestItems = unprocessedItems}, null); using var unprocessedResponse = await Api.SendAsync(unprocessedHttpContent, cancellationToken).ConfigureAwait(false); - documentResult = await DynamoDbLowLevelContext.ReadDocumentAsync(unprocessedResponse, BatchWriteItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); + documentResult = await unprocessedResponse.ReadDocumentAsync(BatchWriteItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); } } } diff --git a/src/EfficientDynamoDb/DynamoDbContext/DynamoDbContext.cs b/src/EfficientDynamoDb/DynamoDbContext/DynamoDbContext.cs index 5531d626..deea6979 100644 --- a/src/EfficientDynamoDb/DynamoDbContext/DynamoDbContext.cs +++ b/src/EfficientDynamoDb/DynamoDbContext/DynamoDbContext.cs @@ -9,7 +9,6 @@ using EfficientDynamoDb.Internal.Converters.Json; using EfficientDynamoDb.Internal.Extensions; using EfficientDynamoDb.Internal.Reader; -using static EfficientDynamoDb.DynamoDbLowLevelContext; namespace EfficientDynamoDb { @@ -46,7 +45,7 @@ private async ValueTask ReadAsync(HttpResponseMessage response { await using var responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); - var expectedCrc = GetExpectedCrc(response); + var expectedCrc = response.GetExpectedCrc(); var classInfo = Config.Metadata.GetOrAddClassInfo(typeof(TResult), typeof(JsonObjectDdbConverter)); var result = await EntityDdbJsonReader.ReadAsync(responseStream, classInfo, Config.Metadata, expectedCrc.HasValue, cancellationToken: cancellationToken).ConfigureAwait(false); diff --git a/src/EfficientDynamoDb/DynamoDbContext/DynamoDbLowLevelContext.cs b/src/EfficientDynamoDb/DynamoDbContext/DynamoDbLowLevelContext.cs index 062dee4b..1d302476 100644 --- a/src/EfficientDynamoDb/DynamoDbContext/DynamoDbLowLevelContext.cs +++ b/src/EfficientDynamoDb/DynamoDbContext/DynamoDbLowLevelContext.cs @@ -4,7 +4,6 @@ using System.Threading; using System.Threading.Tasks; using EfficientDynamoDb.DocumentModel; -using EfficientDynamoDb.Exceptions; using EfficientDynamoDb.Internal; using EfficientDynamoDb.Internal.Extensions; using EfficientDynamoDb.Internal.Operations.BatchGetItem; @@ -18,7 +17,6 @@ using EfficientDynamoDb.Internal.Operations.TransactGetItems; using EfficientDynamoDb.Internal.Operations.TransactWriteItems; using EfficientDynamoDb.Internal.Operations.UpdateItem; -using EfficientDynamoDb.Internal.Reader; using EfficientDynamoDb.Operations.BatchGetItem; using EfficientDynamoDb.Operations.BatchWriteItem; using EfficientDynamoDb.Operations.DeleteItem; @@ -57,7 +55,7 @@ public async Task BatchGetItemAsync(BatchGetItemRequest re using var httpContent = new BatchGetItemHttpContent(request, Config.TableNamePrefix); using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false); - var result = await ReadDocumentAsync(response, BatchGetItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); + var result = await response.ReadDocumentAsync(BatchGetItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); return BatchGetItemResponseParser.Parse(result!); } @@ -67,7 +65,7 @@ public async Task BatchWriteItemAsync(BatchWriteItemRequ using var httpContent = new BatchWriteItemHttpContent(request, Config.TableNamePrefix); using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false); - var result = await ReadDocumentAsync(response, BatchWriteItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); + var result = await response.ReadDocumentAsync(BatchWriteItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); return BatchWriteItemResponseParser.Parse(result!); } @@ -77,7 +75,7 @@ public async Task QueryAsync(QueryRequest request, CancellationTo using var httpContent = new QueryHttpContent(request, Config.TableNamePrefix); using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false); - var result = await ReadDocumentAsync(response, QueryParsingOptions.Instance, cancellationToken).ConfigureAwait(false); + var result = await response.ReadDocumentAsync(QueryParsingOptions.Instance, cancellationToken).ConfigureAwait(false); return QueryResponseParser.Parse(result!); } @@ -87,7 +85,7 @@ public async Task ScanAsync(ScanRequest request, CancellationToken using var httpContent = new ScanHttpContent(request, Config.TableNamePrefix); using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false); - var result = await ReadDocumentAsync(response, QueryParsingOptions.Instance, cancellationToken).ConfigureAwait(false); + var result = await response.ReadDocumentAsync(QueryParsingOptions.Instance, cancellationToken).ConfigureAwait(false); return ScanResponseParser.Parse(result!); } @@ -97,7 +95,7 @@ public async Task TransactGetItemsAsync(TransactGetIte using var httpContent = new TransactGetItemsHttpContent(request, Config.TableNamePrefix); using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false); - var result = await ReadDocumentAsync(response, TransactGetItemsParsingOptions.Instance, cancellationToken).ConfigureAwait(false); + var result = await response.ReadDocumentAsync(TransactGetItemsParsingOptions.Instance, cancellationToken).ConfigureAwait(false); return TransactGetItemsResponseParser.Parse(result!); } @@ -107,7 +105,7 @@ public async Task PutItemAsync(PutItemRequest request, Cancella using var httpContent = new PutItemHttpContent(request, Config.TableNamePrefix); using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false); - var result = await ReadDocumentAsync(response, PutItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); + var result = await response.ReadDocumentAsync(PutItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); return PutItemResponseParser.Parse(result); } @@ -117,7 +115,7 @@ public async Task UpdateItemAsync(UpdateItemRequest request, using var httpContent = await BuildHttpContentAsync(request).ConfigureAwait(false); using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false); - var result = await ReadDocumentAsync(response, UpdateItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); + var result = await response.ReadDocumentAsync(UpdateItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); return UpdateItemResponseParser.Parse(result); } @@ -131,7 +129,7 @@ public async Task DeleteItemAsync(DeleteItemRequest request, using var httpContent = new DeleteItemHttpContent(request, pkName, skName, Config.TableNamePrefix); using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false); - var result = await ReadDocumentAsync(response, PutItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); + var result = await response.ReadDocumentAsync(PutItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); return DeleteItemResponseParser.Parse(result); } @@ -141,7 +139,7 @@ public async Task TransactWriteItemsAsync(TransactWr using var httpContent = new TransactWriteItemsHttpContent(request, Config.TableNamePrefix); using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false); - var result = await ReadDocumentAsync(response, TransactWriteItemsParsingOptions.Instance, cancellationToken).ConfigureAwait(false); + var result = await response.ReadDocumentAsync(TransactWriteItemsParsingOptions.Instance, cancellationToken).ConfigureAwait(false); return TransactWriteItemsResponseParser.Parse(result); } @@ -153,7 +151,7 @@ public async Task TransactWriteItemsAsync(TransactWr private async ValueTask GetItemInternalAsync(HttpContent httpContent, CancellationToken cancellationToken = default) { using var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false); - var result = await ReadDocumentAsync(response, GetItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); + var result = await response.ReadDocumentAsync(GetItemParsingOptions.Instance, cancellationToken).ConfigureAwait(false); // TODO: Consider removing root dictionary return GetItemResponseParser.Parse(result!); @@ -197,29 +195,5 @@ private async ValueTask BuildHttpContentAsync(UpdateItemRequest req keySchema.FirstOrDefault(x => x.KeyType == KeyType.Range)?.AttributeName); } } - - internal static async ValueTask ReadDocumentAsync(HttpResponseMessage response, IParsingOptions options, CancellationToken cancellationToken = default) - { - await using var responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); - - var expectedCrc = GetExpectedCrc(response); - var result = await DdbJsonReader.ReadAsync(responseStream, options, expectedCrc.HasValue, cancellationToken).ConfigureAwait(false); - - if (expectedCrc.HasValue && expectedCrc.Value != result.Crc) - throw new ChecksumMismatchException(); - - return result.Value; - } - - internal static uint? GetExpectedCrc(HttpResponseMessage response) - { - if (!response.Content.Headers.ContentLength.HasValue) - return null; - - if (response.Headers.TryGetValues("x-amz-crc32", out var crcHeaderValues) && uint.TryParse(crcHeaderValues.FirstOrDefault(), out var expectedCrc)) - return expectedCrc; - - return null; - } } } \ No newline at end of file diff --git a/src/EfficientDynamoDb/DynamoDbStreamContext/DynamoDbStreamContext.cs b/src/EfficientDynamoDb/DynamoDbStreamContext/DynamoDbStreamContext.cs new file mode 100644 index 00000000..4c0ef4ff --- /dev/null +++ b/src/EfficientDynamoDb/DynamoDbStreamContext/DynamoDbStreamContext.cs @@ -0,0 +1,22 @@ +using EfficientDynamoDb.Internal; +using EfficientDynamoDb.Internal.Constants; + +namespace EfficientDynamoDb +{ + public interface IDynamoDbStreamsContext + { + IDynamoDbStreamsLowLevelContext LowLevel { get; } + } + + public class DynamoDbStreamsContext : IDynamoDbStreamsContext + { + private readonly DynamoDbStreamsLowLevelContext _lowLevel; + + public DynamoDbStreamsContext(DynamoDbContextConfig config) + { + _lowLevel = new DynamoDbStreamsLowLevelContext(config, new HttpApi(config, ServiceNames.DynamoDbStreams)); + } + + public IDynamoDbStreamsLowLevelContext LowLevel => _lowLevel; + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/DynamoDbStreamContext/DynamoDbStreamsLowLevelContext.cs b/src/EfficientDynamoDb/DynamoDbStreamContext/DynamoDbStreamsLowLevelContext.cs new file mode 100644 index 00000000..f63bca19 --- /dev/null +++ b/src/EfficientDynamoDb/DynamoDbStreamContext/DynamoDbStreamsLowLevelContext.cs @@ -0,0 +1,56 @@ +using System.Threading; +using System.Threading.Tasks; +using EfficientDynamoDb.Internal; +using EfficientDynamoDb.Internal.Extensions; +using EfficientDynamoDb.Internal.Operations.Streams; +using EfficientDynamoDb.Operations; + +namespace EfficientDynamoDb +{ + public class DynamoDbStreamsLowLevelContext : IDynamoDbStreamsLowLevelContext + { + internal DynamoDbContextConfig Config { get; } + + internal HttpApi Api { get; } + + internal DynamoDbStreamsLowLevelContext(DynamoDbContextConfig config, HttpApi api) + { + Config = config; + Api = api; + } + + public async Task ListStreamsAsync(ListStreamsRequest request, CancellationToken cancellationToken = default) + { + using var httpContent = new ListStreamsHttpContent(request, Config.TableNamePrefix); + + var response = await Api.SendAsync(httpContent, cancellationToken).ConfigureAwait(false); + return response; + } + + public async Task GetShardIteratorAsync(GetShardIteratorRequest request, CancellationToken cancellationToken = default) + { + using var httpContext = new GetShardIteratorHttpContent(request); + + var response = await Api.SendAsync(httpContext, cancellationToken).ConfigureAwait(false); + return response; + } + + public async Task DescribeStreamAsync(DescribeStreamRequest request, CancellationToken cancellationToken = default) + { + using var httpContext = new DescribeStreamHttpContent(request); + + var response = await Api.SendAsync(httpContext, cancellationToken).ConfigureAwait(false); + return response; + } + + public async Task GetRecordsAsync(GetRecordsRequest request, CancellationToken cancellationToken = default) + { + using var httpContext = new GetRecordsHttpContent(request); + + var response = await Api.SendAsync(httpContext, cancellationToken).ConfigureAwait(false); + var result = await response.ReadDocumentAsync(GetRecordsParsingOptions.Instance, cancellationToken).ConfigureAwait(false); + + return GetRecordsResponseParser.Parse(result!); + } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/DynamoDbStreamContext/IDynamoDbStreamsLowLevelContext.cs b/src/EfficientDynamoDb/DynamoDbStreamContext/IDynamoDbStreamsLowLevelContext.cs new file mode 100644 index 00000000..3344c2d8 --- /dev/null +++ b/src/EfficientDynamoDb/DynamoDbStreamContext/IDynamoDbStreamsLowLevelContext.cs @@ -0,0 +1,17 @@ +using System.Threading; +using System.Threading.Tasks; +using EfficientDynamoDb.Operations; + +namespace EfficientDynamoDb +{ + public interface IDynamoDbStreamsLowLevelContext + { + Task ListStreamsAsync(ListStreamsRequest request, CancellationToken cancellationToken = default); + + Task GetShardIteratorAsync(GetShardIteratorRequest request, CancellationToken cancellationToken = default); + + Task DescribeStreamAsync(DescribeStreamRequest request, CancellationToken cancellationToken = default); + + Task GetRecordsAsync(GetRecordsRequest request, CancellationToken cancellationToken = default); + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/EfficientDynamoDb.csproj.DotSettings b/src/EfficientDynamoDb/EfficientDynamoDb.csproj.DotSettings index 60f0c18c..eff4c376 100644 --- a/src/EfficientDynamoDb/EfficientDynamoDb.csproj.DotSettings +++ b/src/EfficientDynamoDb/EfficientDynamoDb.csproj.DotSettings @@ -2,8 +2,23 @@ True True True + True + True + False + True + True + True + True True True True True - True \ No newline at end of file + True + True + True + True + True + True + True + True + True \ No newline at end of file diff --git a/src/EfficientDynamoDb/Internal/Extensions/HttpResponseMessageExtensions.cs b/src/EfficientDynamoDb/Internal/Extensions/HttpResponseMessageExtensions.cs new file mode 100644 index 00000000..58146c63 --- /dev/null +++ b/src/EfficientDynamoDb/Internal/Extensions/HttpResponseMessageExtensions.cs @@ -0,0 +1,37 @@ +using System.Linq; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using EfficientDynamoDb.DocumentModel; +using EfficientDynamoDb.Exceptions; +using EfficientDynamoDb.Internal.Reader; + +namespace EfficientDynamoDb.Internal.Extensions +{ + internal static class HttpResponseMessageExtensions + { + internal static async ValueTask ReadDocumentAsync(this HttpResponseMessage response, IParsingOptions options, CancellationToken cancellationToken = default) + { + await using var responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false); + + var expectedCrc = response.GetExpectedCrc(); + var result = await DdbJsonReader.ReadAsync(responseStream, options, expectedCrc.HasValue, cancellationToken).ConfigureAwait(false); + + if (expectedCrc.HasValue && expectedCrc.Value != result.Crc) + throw new ChecksumMismatchException(); + + return result.Value; + } + + internal static uint? GetExpectedCrc(this HttpResponseMessage response) + { + if (!response.Content.Headers.ContentLength.HasValue) + return null; + + if (response.Headers.TryGetValues("x-amz-crc32", out var crcHeaderValues) && uint.TryParse(crcHeaderValues.FirstOrDefault(), out var expectedCrc)) + return expectedCrc; + + return null; + } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Internal/Extensions/Utf8JsonWriterExtensions.cs b/src/EfficientDynamoDb/Internal/Extensions/Utf8JsonWriterExtensions.cs index 59996d0b..a6bc641d 100644 --- a/src/EfficientDynamoDb/Internal/Extensions/Utf8JsonWriterExtensions.cs +++ b/src/EfficientDynamoDb/Internal/Extensions/Utf8JsonWriterExtensions.cs @@ -28,6 +28,20 @@ public static void WriteReturnConsumedCapacity(this Utf8JsonWriter writer, Retur }); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void WriteEnum(this Utf8JsonWriter writer, ReadOnlySpan label, T value) where T : struct, Enum + { + var enumString = value.ToString(); + + Span buffer = stackalloc char[enumString.Length * 2]; // Allocate enough space to account for new underscores + var sb = new NoAllocStringBuilder(in buffer, true); + + enumString.ToUpperSnakeCase(ref sb); + + writer.WriteString(label, sb.GetBuffer()); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] public static void WriteAttributesDictionary(this Utf8JsonWriter writer, IReadOnlyDictionary attributesDictionary) { diff --git a/src/EfficientDynamoDb/Internal/Operations/Shared/RecordsParser.cs b/src/EfficientDynamoDb/Internal/Operations/Shared/RecordsParser.cs new file mode 100644 index 00000000..8ab5dad3 --- /dev/null +++ b/src/EfficientDynamoDb/Internal/Operations/Shared/RecordsParser.cs @@ -0,0 +1,58 @@ +using System; +using System.Runtime.CompilerServices; +using EfficientDynamoDb.DocumentModel; +using EfficientDynamoDb.Internal.Extensions; +using EfficientDynamoDb.Internal.TypeParsers; +using EfficientDynamoDb.Operations; +using EfficientDynamoDb.Operations.DescribeTable.Models.Enums; + +namespace EfficientDynamoDb.Internal.Operations.Shared +{ + internal static class RecordsParser + { + internal static Record[] ParseRecords(Document response) + { + if (!response.TryGetValue("Records", out var recordsAttribute)) + return Array.Empty(); + + var records = recordsAttribute.AsListAttribute().Items; + var result = new Record[records.Count]; + for (var i = 0; i < records.Count; i++) + { + result[i] = ParseRecord(records[i].AsDocument()); + } + + return result; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static Record ParseRecord(Document record) => new Record + { + AwsRegion = record["awsRegion"].AsString(), + DynamoDb = ParseStreamRecord(record["dynamodb"].AsDocument()), + EventId = record["eventID"].AsString(), + EventName = EnumParser.ParseUpperSnakeCase(record["eventName"].AsString()), + EventSource = record["eventSource"].AsString(), + EventVersion = record["eventVersion"].AsString(), + }; + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static StreamRecord ParseStreamRecord(Document streamRecord) + { + var result = new StreamRecord + { + ApproximateCreationDateTime = streamRecord["ApproximateCreationDateTime"].AsNumberAttribute().ToDouble().FromUnixSeconds(), + Keys = streamRecord["Keys"].AsDocument(), + SequenceNumber = streamRecord["SequenceNumber"].AsString(), + SizeBytes = streamRecord["SizeBytes"].AsNumberAttribute().ToInt(), + StreamViewType = EnumParser.ParseUpperSnakeCase(streamRecord["StreamViewType"].AsString()) + }; + if (result.StreamViewType == StreamViewType.NewImage || result.StreamViewType == StreamViewType.NewAndOldImages) + result.NewImage = streamRecord["NewImage"].AsDocument(); + if (result.StreamViewType == StreamViewType.OldImage || result.StreamViewType == StreamViewType.NewAndOldImages) + result.OldImage = streamRecord["OldImage"].AsDocument(); + + return result; + } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Internal/Operations/Streams/DescribeStream/DescribeStreamHttpContent.cs b/src/EfficientDynamoDb/Internal/Operations/Streams/DescribeStream/DescribeStreamHttpContent.cs new file mode 100644 index 00000000..715b878a --- /dev/null +++ b/src/EfficientDynamoDb/Internal/Operations/Streams/DescribeStream/DescribeStreamHttpContent.cs @@ -0,0 +1,33 @@ +using System.Threading.Tasks; +using EfficientDynamoDb.Converters; +using EfficientDynamoDb.Internal.Operations.Shared; +using EfficientDynamoDb.Operations; + +namespace EfficientDynamoDb.Internal.Operations.Streams +{ + internal class DescribeStreamHttpContent : DynamoDbHttpContent + { + private readonly DescribeStreamRequest _request; + + public DescribeStreamHttpContent(DescribeStreamRequest request) : base("DynamoDBStreams_20120810.DescribeStream") + { + _request = request; + } + + protected override ValueTask WriteDataAsync(DdbWriter ddbWriter) + { + var writer = ddbWriter.JsonWriter; + + writer.WriteStartObject(); + + writer.WriteString("StreamArn", _request.StreamArn); + if (!string.IsNullOrEmpty(_request.ExclusiveStartShardId)) + writer.WriteString("ExclusiveStartShardId", _request.ExclusiveStartShardId); + if (_request.Limit > 0) + writer.WriteNumber("Limit", _request.Limit); + + writer.WriteEndObject(); + return default; + } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Internal/Operations/Streams/GetRecords/GetRecordsHttpContent.cs b/src/EfficientDynamoDb/Internal/Operations/Streams/GetRecords/GetRecordsHttpContent.cs new file mode 100644 index 00000000..bc78163e --- /dev/null +++ b/src/EfficientDynamoDb/Internal/Operations/Streams/GetRecords/GetRecordsHttpContent.cs @@ -0,0 +1,30 @@ +using System.Threading.Tasks; +using EfficientDynamoDb.Converters; +using EfficientDynamoDb.Internal.Operations.Shared; +using EfficientDynamoDb.Operations; + +namespace EfficientDynamoDb.Internal.Operations.Streams +{ + internal sealed class GetRecordsHttpContent : DynamoDbHttpContent + { + private readonly GetRecordsRequest _request; + + public GetRecordsHttpContent(GetRecordsRequest request) : base("DynamoDBStreams_20120810.GetRecords") + { + _request = request; + } + + protected override ValueTask WriteDataAsync(DdbWriter ddbWriter) + { + var writer = ddbWriter.JsonWriter; + writer.WriteStartObject(); + + writer.WriteString("ShardIterator", _request.ShardIterator); + if (_request.Limit > 0) + writer.WriteNumber("Limit", _request.Limit); + + writer.WriteEndObject(); + return default; + } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Internal/Operations/Streams/GetRecords/GetRecordsParsingOptions.cs b/src/EfficientDynamoDb/Internal/Operations/Streams/GetRecords/GetRecordsParsingOptions.cs new file mode 100644 index 00000000..9d32ce19 --- /dev/null +++ b/src/EfficientDynamoDb/Internal/Operations/Streams/GetRecords/GetRecordsParsingOptions.cs @@ -0,0 +1,39 @@ +using System.Text.Json; +using EfficientDynamoDb.Internal.Reader; +using EfficientDynamoDb.Internal.Reader.Metadata; + +namespace EfficientDynamoDb.Internal.Operations.Streams +{ + internal sealed class GetRecordsParsingOptions : IParsingOptions + { + public static readonly GetRecordsParsingOptions Instance = new GetRecordsParsingOptions(); + + public JsonObjectMetadata? Metadata { get; } = new JsonObjectMetadata(new DictionaryFieldsMetadata + { + {"Records", new JsonObjectMetadata(new DictionaryFieldsMetadata + { + {"dynamodb", new JsonObjectMetadata(new DictionaryFieldsMetadata + { + { "Keys", new JsonObjectMetadata(true, false) }, + { "NewImage", new JsonObjectMetadata(true, false) }, + { "OldImage", new JsonObjectMetadata(true, false) } + }, false, false) + } + }, false, false, true) + } + }, false, false); + + public bool HasNumberCallback => false; + + public void StartParsing(ref DdbReadStack state) + { + if(GlobalDynamoDbConfig.InternAttributeNames) + state.KeysCache = new KeysCache(DdbReadStack.DefaultKeysCacheSize, DdbReadStack.MaxKeysCacheSize); + } + + public void OnNumber(ref Utf8JsonReader reader, ref DdbReadStack state) + { + + } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Internal/Operations/Streams/GetRecords/GetRecordsResponseParser.cs b/src/EfficientDynamoDb/Internal/Operations/Streams/GetRecords/GetRecordsResponseParser.cs new file mode 100644 index 00000000..ff95fa79 --- /dev/null +++ b/src/EfficientDynamoDb/Internal/Operations/Streams/GetRecords/GetRecordsResponseParser.cs @@ -0,0 +1,15 @@ +using EfficientDynamoDb.DocumentModel; +using EfficientDynamoDb.Internal.Operations.Shared; +using EfficientDynamoDb.Operations; + +namespace EfficientDynamoDb.Internal.Operations.Streams +{ + internal static class GetRecordsResponseParser + { + public static GetRecordsResponse Parse(Document response) => new GetRecordsResponse(ParseNextShardIterator(response), + RecordsParser.ParseRecords(response)); + + private static string? ParseNextShardIterator(Document response) => + response.TryGetValue("NextShardIterator", out var nextShardIterator) ? nextShardIterator.AsString() : null; + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Internal/Operations/Streams/GetShardIterator/GetShardIteratorHttpContent.cs b/src/EfficientDynamoDb/Internal/Operations/Streams/GetShardIterator/GetShardIteratorHttpContent.cs new file mode 100644 index 00000000..78cebf99 --- /dev/null +++ b/src/EfficientDynamoDb/Internal/Operations/Streams/GetShardIterator/GetShardIteratorHttpContent.cs @@ -0,0 +1,33 @@ +using System.Threading.Tasks; +using EfficientDynamoDb.Converters; +using EfficientDynamoDb.Internal.Extensions; +using EfficientDynamoDb.Internal.Operations.Shared; +using EfficientDynamoDb.Operations; + +namespace EfficientDynamoDb.Internal.Operations.Streams +{ + internal class GetShardIteratorHttpContent : DynamoDbHttpContent + { + private readonly GetShardIteratorRequest _request; + + public GetShardIteratorHttpContent(GetShardIteratorRequest request) : base("DynamoDBStreams_20120810.GetShardIterator") + { + _request = request; + } + + protected override ValueTask WriteDataAsync(DdbWriter ddbWriter) + { + var writer = ddbWriter.JsonWriter; + writer.WriteStartObject(); + + writer.WriteString("StreamArn", _request.StreamArn); + writer.WriteString("ShardId", _request.ShardId); + writer.WriteEnum("ShardIteratorType", _request.ShardIteratorType); + if (!string.IsNullOrEmpty(_request.SequenceNumber)) + writer.WriteString("SequenceNumber", _request.SequenceNumber); + + writer.WriteEndObject(); + return default; + } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Internal/Operations/Streams/ListStreams/ListStreamsHttpContent.cs b/src/EfficientDynamoDb/Internal/Operations/Streams/ListStreams/ListStreamsHttpContent.cs new file mode 100644 index 00000000..39082ba9 --- /dev/null +++ b/src/EfficientDynamoDb/Internal/Operations/Streams/ListStreams/ListStreamsHttpContent.cs @@ -0,0 +1,39 @@ +using System.Threading.Tasks; +using EfficientDynamoDb.Converters; +using EfficientDynamoDb.Internal.Extensions; +using EfficientDynamoDb.Internal.Operations.Shared; +using EfficientDynamoDb.Operations; + +namespace EfficientDynamoDb.Internal.Operations.Streams +{ + internal class ListStreamsHttpContent : DynamoDbHttpContent + { + private readonly ListStreamsRequest _request; + private readonly string? _tablePrefix; + + public ListStreamsHttpContent(ListStreamsRequest request, string? tablePrefix) : base("DynamoDBStreams_20120810.ListStreams") + { + _request = request; + _tablePrefix = tablePrefix; + } + + protected override ValueTask WriteDataAsync(DdbWriter ddbWriter) + { + var writer = ddbWriter.JsonWriter; + writer.WriteStartObject(); + + if (_request.ExclusiveStartStreamArn != null) + writer.WriteString("ExclusiveStartStreamArn", _request.ExclusiveStartStreamArn); + + if (_request.Limit > 0) + writer.WriteNumber("Limit", _request.Limit); + + if (_request.TableName != null) + writer.WriteTableName(_tablePrefix, _request.TableName); + + writer.WriteEndObject(); + + return default; + } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Internal/TypeParsers/EnumParser.cs b/src/EfficientDynamoDb/Internal/TypeParsers/EnumParser.cs index d56e0242..d6a3e58d 100644 --- a/src/EfficientDynamoDb/Internal/TypeParsers/EnumParser.cs +++ b/src/EfficientDynamoDb/Internal/TypeParsers/EnumParser.cs @@ -9,8 +9,8 @@ public static class EnumParser [MethodImpl(MethodImplOptions.AggressiveInlining)] public static bool TryParseCaseInsensitive(string? value, out TEnum result) where TEnum : struct, Enum => Enum.TryParse(value, out result) || Enum.TryParse(value, true, out result); - - public static bool TryParseUpperSnakeCase(string? value, out TEnum result) where TEnum : struct, Enum + + internal static bool TryParseUpperSnakeCase(string? value, out TEnum result) where TEnum : struct, Enum { if (value == null) { @@ -20,7 +20,7 @@ public static bool TryParseUpperSnakeCase(string? value, out TEnum result Span buffer = stackalloc char[value.Length]; var sb = new NoAllocStringBuilder(in buffer, true); - + var isNextUpper = false; foreach (var c in value) { @@ -34,8 +34,13 @@ public static bool TryParseUpperSnakeCase(string? value, out TEnum result sb.Append(nextChar); isNextUpper = false; } - + return Enum.TryParse(sb.ToString(), true, out result); } + + internal static TEnum ParseUpperSnakeCase(string? value) where TEnum : struct, Enum + { + return TryParseUpperSnakeCase(value, out TEnum result) ? result : default; + } } } \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/DescribeStream/DescribeStreamRequest.cs b/src/EfficientDynamoDb/Operations/DescribeStream/DescribeStreamRequest.cs new file mode 100644 index 00000000..de5f1bb8 --- /dev/null +++ b/src/EfficientDynamoDb/Operations/DescribeStream/DescribeStreamRequest.cs @@ -0,0 +1,25 @@ +namespace EfficientDynamoDb.Operations +{ + public class DescribeStreamRequest + { + /// + /// The Amazon Resource Name (ARN) for the stream. + /// + /// Required + /// + /// + public string StreamArn { get; set; } = null!; + + /// + /// The shard ID of the first item that this operation will evaluate. + /// Use the value that was returned for in the previous operation. + /// + public string? ExclusiveStartShardId { get; set; } + + /// + /// The maximum number of shard objects to return. + /// The upper limit is 100. + /// + public int Limit { get; set; } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/DescribeStream/DescribeStreamResponse.cs b/src/EfficientDynamoDb/Operations/DescribeStream/DescribeStreamResponse.cs new file mode 100644 index 00000000..53c4e48c --- /dev/null +++ b/src/EfficientDynamoDb/Operations/DescribeStream/DescribeStreamResponse.cs @@ -0,0 +1,16 @@ +namespace EfficientDynamoDb.Operations +{ + public class DescribeStreamResponse + { + /// + /// A complete description of the stream, including its creation date and time, the DynamoDB table associated with the stream, + /// the shard IDs within the stream, and the beginning and ending sequence numbers of stream records within the shards. + /// + public StreamDescription StreamDescription { get; } + + public DescribeStreamResponse(StreamDescription streamDescription) + { + StreamDescription = streamDescription; + } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/DescribeStream/SequenceNumberRange.cs b/src/EfficientDynamoDb/Operations/DescribeStream/SequenceNumberRange.cs new file mode 100644 index 00000000..37f81347 --- /dev/null +++ b/src/EfficientDynamoDb/Operations/DescribeStream/SequenceNumberRange.cs @@ -0,0 +1,23 @@ +namespace EfficientDynamoDb.Operations +{ + public class SequenceNumberRange + { + /// + /// The first sequence number for the stream records contained within a shard. + /// String contains numeric characters only. + /// + public string StartingSequenceNumber { get; } + + /// + /// The last sequence number for the stream records contained within a shard. + /// String contains numeric characters only. + /// + public string EndingSequenceNumber { get; } + + public SequenceNumberRange(string startingSequenceNumber, string endingSequenceNumber) + { + StartingSequenceNumber = startingSequenceNumber; + EndingSequenceNumber = endingSequenceNumber; + } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/DescribeStream/Shard.cs b/src/EfficientDynamoDb/Operations/DescribeStream/Shard.cs new file mode 100644 index 00000000..2d6eec7e --- /dev/null +++ b/src/EfficientDynamoDb/Operations/DescribeStream/Shard.cs @@ -0,0 +1,27 @@ +namespace EfficientDynamoDb.Operations +{ + public class Shard + { + /// + /// The shard ID of the current shard's parent. + /// + public string ParentShardId { get; } + + /// + /// The range of possible sequence numbers for the shard. + /// + public SequenceNumberRange SequenceNumberRange { get; } + + /// + /// The system-generated identifier for this shard. + /// + public string ShardId { get; } + + public Shard(string parentShardId, SequenceNumberRange sequenceNumberRange, string shardId) + { + ParentShardId = parentShardId; + SequenceNumberRange = sequenceNumberRange; + ShardId = shardId; + } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/DescribeStream/StreamDescription.cs b/src/EfficientDynamoDb/Operations/DescribeStream/StreamDescription.cs new file mode 100644 index 00000000..c592beac --- /dev/null +++ b/src/EfficientDynamoDb/Operations/DescribeStream/StreamDescription.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using EfficientDynamoDb.Operations.DescribeTable.Models; +using EfficientDynamoDb.Operations.DescribeTable.Models.Enums; + +namespace EfficientDynamoDb.Operations +{ + public class StreamDescription + { + /// + /// The date and time when the request to create this stream was issued. + /// + public DateTime CreationRequestDateTime { get; set; } + + /// + /// The key attribute(s) of the stream's DynamoDB table. + /// + public IReadOnlyList KeySchema { get; set; } = Array.Empty(); + + /// + /// + /// The shard ID of the item where the operation stopped, inclusive of the previous result set. + /// Use this value to start a new operation, excluding this value in the new request. + /// + /// + /// If is empty, then the "last page" of results has been processed and there is currently no more data to be retrieved. + /// + /// + /// If is not empty, it does not necessarily mean that there is more data in the result set. + /// The only way to know when you have reached the end of the result set is when LastEvaluatedShardId is empty. + /// + /// + public string? LastEvaluatedShardId { get; set; } + + /// + /// The shards that comprise the stream. + /// + public IReadOnlyList Shards { get; set; } = Array.Empty(); + + /// + /// The Amazon Resource Name (ARN) for the stream. + /// + public string StreamArn { get; set; } = null!; + + /// + /// A timestamp, in ISO 8601 format, for this stream. + /// + public string StreamLabel { get; set; } = null!; + + /// + /// Indicates the current status of the stream. + /// + public StreamStatus StreamStatus { get; set; } + + /// + /// Indicates the format of the records within this stream. + /// + public StreamViewType StreamViewType { get; set; } + + /// + /// The DynamoDB table with which the stream is associated. + /// + public string TableName { get; set; } = null!; + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/DescribeStream/StreamStatus.cs b/src/EfficientDynamoDb/Operations/DescribeStream/StreamStatus.cs new file mode 100644 index 00000000..71d867f2 --- /dev/null +++ b/src/EfficientDynamoDb/Operations/DescribeStream/StreamStatus.cs @@ -0,0 +1,26 @@ +namespace EfficientDynamoDb.Operations +{ + /// + /// Indicates the current status of the stream + /// + public enum StreamStatus + { + Undefined = 0, + /// + /// Streams is currently being enabled on the DynamoDB table. + /// + Enabling = 10, + /// + /// The stream is enabled. + /// + Enabled = 20, + /// + /// Streams is currently being disabled on the DynamoDB table. + /// + Disabling = 30, + /// + /// The stream is disabled. + /// + Disabled = 40, + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/DescribeTable/Models/Enums/StreamViewType.cs b/src/EfficientDynamoDb/Operations/DescribeTable/Models/Enums/StreamViewType.cs index 694e0a90..bbf05350 100644 --- a/src/EfficientDynamoDb/Operations/DescribeTable/Models/Enums/StreamViewType.cs +++ b/src/EfficientDynamoDb/Operations/DescribeTable/Models/Enums/StreamViewType.cs @@ -1,11 +1,26 @@ namespace EfficientDynamoDb.Operations.DescribeTable.Models.Enums { + /// + /// Indicates the format of the records within the stream. + /// public enum StreamViewType { Undefined = 0, + /// + /// Only the key attributes of items that were modified in the DynamoDB table. + /// KeysOnly = 10, + /// + /// Entire items from the table, as they appeared after they were modified. + /// NewImage = 20, + /// + /// Entire items from the table, as they appeared before they were modified. + /// OldImage = 30, + /// + /// Both the new and the old images of the items from the table. + /// NewAndOldImages = 40 } } \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/GetRecords/EventName.cs b/src/EfficientDynamoDb/Operations/GetRecords/EventName.cs new file mode 100644 index 00000000..c855f04b --- /dev/null +++ b/src/EfficientDynamoDb/Operations/GetRecords/EventName.cs @@ -0,0 +1,22 @@ +namespace EfficientDynamoDb.Operations +{ + /// + /// The type of data modification that was performed on the DynamoDB table. + /// + public enum EventName + { + Undefined = 0, + /// + /// A new item was added to the table. + /// + Insert = 10, + /// + /// One or more of an existing item's attributes were modified. + /// + Modify = 20, + /// + /// The item was deleted from the table. + /// + Remove = 30 + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/GetRecords/GetRecordsRequest.cs b/src/EfficientDynamoDb/Operations/GetRecords/GetRecordsRequest.cs new file mode 100644 index 00000000..3358c01b --- /dev/null +++ b/src/EfficientDynamoDb/Operations/GetRecords/GetRecordsRequest.cs @@ -0,0 +1,21 @@ +namespace EfficientDynamoDb.Operations +{ + public class GetRecordsRequest + { + /// + /// The maximum number of records to return from the shard. The upper limit is 1000. + /// + public int Limit { get; set; } = -1; + + /// + /// + /// A shard iterator that was retrieved from previous or . + /// This iterator can be used to access the stream records in this shard. + /// + /// + /// Required + /// + /// + public string ShardIterator { get; set; } = null!; + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/GetRecords/GetRecordsResponse.cs b/src/EfficientDynamoDb/Operations/GetRecords/GetRecordsResponse.cs new file mode 100644 index 00000000..d4060acf --- /dev/null +++ b/src/EfficientDynamoDb/Operations/GetRecords/GetRecordsResponse.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; + +namespace EfficientDynamoDb.Operations +{ + public class GetRecordsResponse + { + /// + /// The next position in the shard from which to start sequentially reading stream records. + /// If set to null, the shard has been closed and the requested iterator will not return any more data. + /// + public string? NextShardIterator { get; } + + /// + /// The stream records from the shard, which were retrieved using the shard iterator. + /// + public IReadOnlyList Records { get; } + + public GetRecordsResponse(string? nextShardIterator, IReadOnlyList? records) + { + NextShardIterator = nextShardIterator; + Records = records ?? Array.Empty(); + } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/GetRecords/Record.cs b/src/EfficientDynamoDb/Operations/GetRecords/Record.cs new file mode 100644 index 00000000..503540ae --- /dev/null +++ b/src/EfficientDynamoDb/Operations/GetRecords/Record.cs @@ -0,0 +1,67 @@ +using System.Text.Json.Serialization; + +namespace EfficientDynamoDb.Operations +{ + /// + /// A description of a unique event within a stream. + /// + public class Record + { + /// + /// The region in which the GetRecords request was received. + /// + [JsonPropertyName("awsRegion")] + public string AwsRegion { get; set; } = ""; + + /// + /// The main body of the stream record, containing all of the DynamoDB-specific fields. + /// + [JsonPropertyName("dynamodb")] + public StreamRecord DynamoDb { get; set; } = null!; + + /// + /// A globally unique identifier for the event that was recorded in this stream record. + /// + [JsonPropertyName("eventID")] + public string EventId { get; set; } = ""; + + /// + /// The type of data modification that was performed on the DynamoDB table. + /// + [JsonPropertyName("eventName")] + public EventName EventName { get; set; } + + /// + /// The AWS service from which the stream record originated. For DynamoDB Streams, this is "aws:dynamodb". + /// + [JsonPropertyName("eventSource")] + public string EventSource { get; set; } = ""; + + /// + /// + /// The version number of the stream record format. This number is updated whenever the structure of is modified. + /// + /// + /// Client applications must not assume that eventVersion will remain at a particular value, as this number is subject to change at any time. + /// In general, eventVersion will only increase as the low-level DynamoDB Streams API evolves. + /// + /// + [JsonPropertyName("eventVersion")] + public string EventVersion { get; set; } = ""; + + /// + /// Items that are deleted by the Time to Live process after expiration have the following fields: + /// + /// + /// set to "Service" + /// + /// + /// set to "dynamodb.amazonaws.com" + /// + /// + /// The field is null for all other records. + /// + [JsonPropertyName("userIdentity")] + public UserIdentity? UserIdentity { get; set; } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/GetRecords/StreamRecord.cs b/src/EfficientDynamoDb/Operations/GetRecords/StreamRecord.cs new file mode 100644 index 00000000..f2170af7 --- /dev/null +++ b/src/EfficientDynamoDb/Operations/GetRecords/StreamRecord.cs @@ -0,0 +1,49 @@ +using System; +using EfficientDynamoDb.DocumentModel; +using EfficientDynamoDb.Operations.DescribeTable.Models.Enums; + +namespace EfficientDynamoDb.Operations +{ + /// + /// A description of a single data modification that was performed on an item in a DynamoDB table. + /// + public class StreamRecord + { + /// + /// The approximate date and time when the stream record was created, rounded to the nearest second. + /// + public DateTime ApproximateCreationDateTime { get; set; } + + /// + /// The primary key attribute(s) for the DynamoDB item that was modified. + /// + public Document Keys { get; set; } = null!; + + /// + /// The item in the DynamoDB table as it appeared after it was modified. + /// This property is present only if is set to or + /// + public Document? NewImage { get; set; } + + /// + /// The item in the DynamoDB table as it appeared before it was modified. + /// This property is present only if is set to or + /// + public Document? OldImage { get; set; } + + /// + /// The sequence number of the stream record. + /// + public string SequenceNumber { get; set; } = ""; + + /// + /// The size of the stream record, in bytes. + /// + public int SizeBytes { get; set; } + + /// + /// The type of data from the modified DynamoDB item that was captured in this stream record. + /// + public StreamViewType StreamViewType { get; set; } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/GetRecords/UserIdentity.cs b/src/EfficientDynamoDb/Operations/GetRecords/UserIdentity.cs new file mode 100644 index 00000000..1d5c4e13 --- /dev/null +++ b/src/EfficientDynamoDb/Operations/GetRecords/UserIdentity.cs @@ -0,0 +1,20 @@ +namespace EfficientDynamoDb.Operations +{ + /// + /// Contains details about the type of identity that made the request. + /// + public class UserIdentity + { + /// + /// A unique identifier for the entity that made the call. + /// For Time To Live, the principalId is "dynamodb.amazonaws.com". + /// + public string PrincipalId { get; set; } = ""; + + /// + /// The type of the identity. + /// For Time To Live, the type is "Service". + /// + public string Type { get; set; } = ""; + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/GetShardIterator/GetShardIteratorRequest.cs b/src/EfficientDynamoDb/Operations/GetShardIterator/GetShardIteratorRequest.cs new file mode 100644 index 00000000..d5c70eca --- /dev/null +++ b/src/EfficientDynamoDb/Operations/GetShardIterator/GetShardIteratorRequest.cs @@ -0,0 +1,37 @@ +namespace EfficientDynamoDb.Operations +{ + public class GetShardIteratorRequest + { + /// + /// + /// The identifier of the shard. + /// The iterator will be returned for this shard ID. + /// + /// + /// Required + /// + /// + public string ShardId { get; set; } = null!; + + /// + /// The sequence number of a stream record in the shard from which to start reading. + /// + public string? SequenceNumber { get; set; } + + /// + /// The Amazon Resource Name (ARN) for the stream. + /// + /// Required + /// + /// + public string StreamArn { get; set; } = null!; + + /// + /// Determines how the shard iterator is used to start reading stream records from the shard. + /// + /// Required + /// + /// + public ShardIteratorType ShardIteratorType { get; set; } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/GetShardIterator/GetShardIteratorResponse.cs b/src/EfficientDynamoDb/Operations/GetShardIterator/GetShardIteratorResponse.cs new file mode 100644 index 00000000..54a114d7 --- /dev/null +++ b/src/EfficientDynamoDb/Operations/GetShardIterator/GetShardIteratorResponse.cs @@ -0,0 +1,11 @@ +namespace EfficientDynamoDb.Operations +{ + public class GetShardIteratorResponse + { + /// + /// The position in the shard from which to start reading stream records sequentially. + /// A shard iterator specifies this position using the sequence number of a stream record in a shard. + /// + public string ShardIterator { get; set; } = null!; + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/GetShardIterator/ShardIteratorType.cs b/src/EfficientDynamoDb/Operations/GetShardIterator/ShardIteratorType.cs new file mode 100644 index 00000000..063b5e9c --- /dev/null +++ b/src/EfficientDynamoDb/Operations/GetShardIterator/ShardIteratorType.cs @@ -0,0 +1,28 @@ +namespace EfficientDynamoDb.Operations +{ + /// + /// Determines how the shard iterator is used to start reading stream records from the shard. + /// + public enum ShardIteratorType + { + Undefined = 0, + /// + /// Start reading exactly from the position denoted by a specific sequence number + /// + AtSequenceNumber = 10, + /// + /// Start reading right after the position denoted by a specific sequence number + /// + AfterSequenceNumber = 20, + /// + /// Start reading at the last (untrimmed) stream record, which is the oldest record in the shard. + /// In DynamoDB Streams, there is a 24 hour limit on data retention. + /// Stream records whose age exceeds this limit are subject to removal (trimming) from the stream. + /// + TrimHorizon = 30, + /// + /// Start reading just after the most recent stream record in the shard, so that you always read the most recent data in the shard. + /// + Latest = 40 + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/ListStreams/ListStreamsRequest.cs b/src/EfficientDynamoDb/Operations/ListStreams/ListStreamsRequest.cs new file mode 100644 index 00000000..6183f1d9 --- /dev/null +++ b/src/EfficientDynamoDb/Operations/ListStreams/ListStreamsRequest.cs @@ -0,0 +1,23 @@ +namespace EfficientDynamoDb.Operations +{ + public class ListStreamsRequest + { + /// + /// The ARN (Amazon Resource Name) of the first item that this operation will evaluate. + /// Use the value that was returned for in the previous operation. + /// + public string? ExclusiveStartStreamArn { get; set; } + + /// + /// The maximum number of streams to return. + /// Values less or equal to zero are interpreted as no limit. + /// The upper limit is 100. + /// + public int Limit { get; set; } = -1; + + /// + /// If this parameter is provided, then only the streams associated with this table name are returned. + /// + public string? TableName { get; set; } + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/ListStreams/ListStreamsResponse.cs b/src/EfficientDynamoDb/Operations/ListStreams/ListStreamsResponse.cs new file mode 100644 index 00000000..ca6cd4d2 --- /dev/null +++ b/src/EfficientDynamoDb/Operations/ListStreams/ListStreamsResponse.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; + +namespace EfficientDynamoDb.Operations +{ + public class ListStreamsResponse + { + /// + /// The stream ARN of the item where the operation stopped, inclusive of the previous result set. + /// Use this value to start a new operation, excluding this value in the new request. + /// + /// + /// If is empty, then the "last page" of results has been processed and there is no more data to be retrieved. + /// If is not empty, it does not necessarily mean that there is more data in the result set. + /// The only way to know when you have reached the end of the result set is when is empty. + /// + public string? LastEvaluatedStreamArn { get; set; } + + /// + /// A list of stream descriptors. + /// + public IReadOnlyList Streams { get; set; } = Array.Empty(); + } +} \ No newline at end of file diff --git a/src/EfficientDynamoDb/Operations/ListStreams/StreamInfo.cs b/src/EfficientDynamoDb/Operations/ListStreams/StreamInfo.cs new file mode 100644 index 00000000..0fc635d7 --- /dev/null +++ b/src/EfficientDynamoDb/Operations/ListStreams/StreamInfo.cs @@ -0,0 +1,20 @@ +namespace EfficientDynamoDb.Operations +{ + public class StreamInfo + { + /// + /// The Amazon Resource Name (ARN) for the stream. + /// + public string StreamArn { get; set; } = ""; + + /// + /// A timestamp, in ISO 8601 format, for this stream. + /// + public string StreamLabel { get; set; } = ""; + + /// + /// The DynamoDB table with which the stream is associated. + /// + public string TableName { get; set; } = ""; + } +} \ No newline at end of file