diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ab1c320..167cf2c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -36,11 +36,16 @@ jobs:
uses: actions/setup-dotnet@v5
with:
global-json-file: global.json
+ # The SDK comes from global.json; these runtime installs are required
+ # because projects target net8.0 and net9.0.
dotnet-version: |
8.0.x
9.0.x
- name: NuGet package cache
+ # actions/cache can fail on Windows before restore even runs; keep the
+ # optimization on Unix runners and let Windows restore normally.
+ if: runner.os != 'Windows'
uses: actions/cache@v5
with:
path: ~/.nuget/packages
@@ -80,9 +85,23 @@ jobs:
- uses: actions/setup-dotnet@v5
with:
global-json-file: global.json
+ # dotnet format loads all target frameworks, so install both runtimes
+ # even though global.json chooses the SDK.
+ dotnet-version: |
+ 8.0.x
+ 9.0.x
+
+ - name: Restore
+ run: dotnet restore IcebergSharp.slnx
+
+ - name: Verify whitespace formatting
+ run: dotnet format whitespace IcebergSharp.slnx --verify-no-changes --no-restore
+
+ - name: Verify code style
+ run: dotnet format style IcebergSharp.slnx --verify-no-changes --severity info --no-restore
- - name: Verify formatting
- run: dotnet format IcebergSharp.slnx --verify-no-changes --severity info
+ - name: Verify analyzer fixes
+ run: dotnet format analyzers IcebergSharp.slnx --verify-no-changes --severity info --no-restore
pack:
name: pack (dry run)
diff --git a/.gitignore b/.gitignore
index 455a0dd..3e522cf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,7 @@ publish/
*.suo
*.userosscache
*.sln.docstates
+*.lscache
# IDE
.vs/
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cda90bf..376f1fa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,5 +14,17 @@ the 0.x line may include breaking changes; they will always be called out under
- Phase 0 scaffolding: solution layout, CI, license, scope and roadmap.
- Phase 1 core types and metadata: `Schema`, `IcebergType` hierarchy,
`TableMetadata` JSON parser, and fixture-based round-trip tests.
+- Phase 2 Avro manifest reading: stream-based manifest-list and manifest readers,
+ Avro OCF `null` / `deflate` codecs, dynamic schema parsing, and Phase 2 smoke
+ coverage from metadata to manifests.
+
+### Changed
+- CI now runs `dotnet format whitespace`, `style`, and `analyzers` as separate
+ steps, each with `--verify-no-changes`.
+
+### Fixed
+- Hardened Avro decoding and schema parsing against truncated schema JSON,
+ oversized encoded lengths, invalid block headers, and invalid logical-type
+ annotations.
[Unreleased]: https://github.com/AndreaBozzo/IcebergSharp/commits/main
diff --git a/README.md b/README.md
index 3070eea..b78db55 100644
--- a/README.md
+++ b/README.md
@@ -31,8 +31,10 @@ doesn't expose Iceberg's metadata to them. There's no native client that gives a
no embedded query engine — just metadata and Arrow batches you can hand to
DuckDB.NET, ML.NET, or Power BI.
-> **Status:** Phase 0 — repository scaffolding only. No public API yet. See the
-> [roadmap](#roadmap) for what is coming and when.
+> **Status:** Phase 2 development. Core Iceberg metadata parsing and stream-based
+> Avro manifest / manifest-list readers are implemented and covered by unit
+> tests. Catalog, scan planning, file IO, and Parquet data reads are still on the
+> roadmap.
---
@@ -133,6 +135,13 @@ var lastWeek = table.NewScan()
See [docs/compatibility-matrix.md](docs/compatibility-matrix.md) for the up-to-date
matrix of supported catalogs, table-format versions, and storage backends.
+Current implemented surface:
+
+- `IcebergSharp.Core`: Iceberg v1/v2 table metadata, schemas, partition specs,
+ sort orders, snapshots, and manifest domain models.
+- `IcebergSharp.Avro`: stream-based Avro OCF readers for Iceberg manifest lists
+ and manifests, including `null` and `deflate` codecs.
+
Target servers for v1:
- Apache Polaris (reference implementation)
@@ -148,8 +157,8 @@ Target servers for v1:
| Phase | Weeks | Deliverable |
| --- | --- | --- |
| 0. Scaffolding | done | Repo, CI, license, solution layout |
-| 1. Core types & metadata | 1-2 | `Schema`, `TableMetadata`, JSON parser |
-| 2. Avro manifest reader | 3-4 | Custom mini Avro OCF reader for manifests |
+| 1. Core types & metadata | done | `Schema`, `TableMetadata`, JSON parser |
+| 2. Avro manifest reader | in progress | Custom mini Avro OCF reader for manifests |
| 3. REST catalog client | 5-6 | OAuth2 / Bearer / SigV4, dynamic discovery |
| 4. Scan planning & pruning | 7-8 | Partition + stats pruning, residual filters |
| 5. Parquet + schema evolution | 9-10 | Field-id resolution, add/drop/rename column |
@@ -180,9 +189,9 @@ column stats, and streams Parquet rows with field-id resolution.
## Contributing
-See [CONTRIBUTING.md](CONTRIBUTING.md). The project is in early development — the
-fastest way to help is to try the prerelease packages once Phase 1 ships and report
-incompatibilities against your specific catalog.
+See [CONTRIBUTING.md](CONTRIBUTING.md). The project is in early development; the
+fastest way to help right now is to try the metadata and manifest readers against
+real Iceberg tables and report incompatible schemas, codecs, or manifest shapes.
---
diff --git a/docs/compatibility-matrix.md b/docs/compatibility-matrix.md
index cf6a16d..e941ad0 100644
--- a/docs/compatibility-matrix.md
+++ b/docs/compatibility-matrix.md
@@ -1,6 +1,7 @@
# Compatibility matrix
-> Last updated: Phase 0 — entries marked _planned_ are targets, not validated yet.
+> Last updated: Phase 2 development. Entries marked _planned_ are targets, not
+> validated yet.
## Catalogs
@@ -18,8 +19,8 @@
| Spec version | Status |
| --- | --- |
-| v1 | 🟡 planned for Phase 1 |
-| v2 | 🟡 planned for Phase 1 (primary target) |
+| v1 | ✅ metadata JSON + Avro manifests covered by unit fixtures |
+| v2 | ✅ metadata JSON + Avro manifests covered by unit fixtures |
| v3 | 🟢 stretch goal — depends on spec stability |
## Storage backends
@@ -36,6 +37,7 @@
| Format | Read | Write |
| --- | --- | --- |
+| Iceberg manifest Avro OCF | ✅ `null` + `deflate` codecs | ⛔ out of scope for v1 |
| Parquet | 🟡 planned for Phase 5 | ⛔ out of scope for v1 |
| ORC | ⛔ out of scope for v1 | ⛔ out of scope for v1 |
| Avro (data files, not manifests) | ⛔ out of scope for v1 | ⛔ out of scope for v1 |
@@ -44,6 +46,9 @@
| Feature | Status |
| --- | --- |
+| Table metadata JSON parsing | ✅ validated with v1/v2 fixtures |
+| Manifest-list reading | ✅ stream-based Avro OCF reader |
+| Manifest reading | ✅ stream-based Avro OCF reader |
| Schema evolution (add / drop / rename / promote) | 🟡 planned for Phase 5 |
| Partition spec evolution | 🟡 planned for Phase 4 |
| Snapshot isolation / time travel | 🟡 planned for Phase 4 |
diff --git a/src/IcebergSharp.Avro/Internal/Codec/DeflateCodec.cs b/src/IcebergSharp.Avro/Internal/Codec/DeflateCodec.cs
new file mode 100644
index 0000000..b5917cb
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Codec/DeflateCodec.cs
@@ -0,0 +1,56 @@
+using System.Buffers;
+using System.IO.Compression;
+
+namespace IcebergSharp.Avro.Internal.Codec;
+
+/// <summary>
+/// Avro's deflate codec is raw DEFLATE (no zlib header). Wraps
+/// <see cref="DeflateStream"/> in decompress mode over the source bytes.
+/// </summary>
+internal sealed class DeflateCodec : IBlockCodec
+{
+ public static DeflateCodec Instance { get; } = new();
+ private DeflateCodec() { }
+
+ public string Name => "deflate";
+
+ public int Decode(ReadOnlySpan<byte> source, ref byte[] destination)
+ {
+ // DeflateStream reads from a Stream, not a ReadOnlySpan<byte>, so copy the
+ // source into a rented array and wrap it in a MemoryStream. Renting is cheap
+ // relative to the decompression itself.
+ var sourceArr = ArrayPool<byte>.Shared.Rent(source.Length);
+ try
+ {
+ source.CopyTo(sourceArr);
+ using var src = new MemoryStream(sourceArr, 0, source.Length, writable: false);
+ using var deflate = new DeflateStream(src, CompressionMode.Decompress, leaveOpen: false);
+
+ var totalWritten = 0;
+ while (true)
+ {
+ if (totalWritten >= destination.Length)
+ {
+ var grown = ArrayPool<byte>.Shared.Rent(Math.Max(destination.Length * 2, 64)); // floor covers a zero-length destination
+ Buffer.BlockCopy(destination, 0, grown, 0, totalWritten);
+ ArrayPool<byte>.Shared.Return(destination);
+ destination = grown;
+ }
+
+ var n = deflate.Read(destination, totalWritten, destination.Length - totalWritten);
+ if (n == 0)
+ {
+ break;
+ }
+
+ totalWritten += n;
+ }
+
+ return totalWritten;
+ }
+ finally
+ {
+ ArrayPool<byte>.Shared.Return(sourceArr);
+ }
+ }
+}
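A minimal sketch of the raw-DEFLATE behaviour that `DeflateCodec` relies on, separate from the patch itself. `RawDeflateSketch` and its method names are hypothetical and exist only for illustration. It assumes nothing beyond `System.IO.Compression.DeflateStream`, which reads and writes RFC 1951 streams without a zlib wrapper.

```csharp
using System;
using System.IO;
using System.IO.Compression;

// Hypothetical helper for illustration; not part of IcebergSharp.
public static class RawDeflateSketch
{
    // Compresses data as a raw DEFLATE stream (no zlib header or checksum).
    public static byte[] Compress(byte[] data)
    {
        using var output = new MemoryStream();
        // Dispose the DeflateStream before reading the output so the final block is flushed.
        using (var deflate = new DeflateStream(output, CompressionLevel.Optimal, leaveOpen: true))
        {
            deflate.Write(data, 0, data.Length);
        }

        return output.ToArray();
    }

    // Decompresses a raw DEFLATE stream into a new array.
    public static byte[] Decompress(byte[] compressed)
    {
        using var input = new MemoryStream(compressed, writable: false);
        using var deflate = new DeflateStream(input, CompressionMode.Decompress);
        using var output = new MemoryStream();
        deflate.CopyTo(output);
        return output.ToArray();
    }
}
```

Unlike this sketch, the codec in the patch decompresses into a pooled buffer that it grows on demand, which avoids the intermediate `MemoryStream` allocation per block.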
diff --git a/src/IcebergSharp.Avro/Internal/Codec/IBlockCodec.cs b/src/IcebergSharp.Avro/Internal/Codec/IBlockCodec.cs
new file mode 100644
index 0000000..66543f6
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Codec/IBlockCodec.cs
@@ -0,0 +1,14 @@
+namespace IcebergSharp.Avro.Internal.Codec;
+
+internal interface IBlockCodec
+{
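+ /// <summary>Codec name as it appears in the OCF header (<c>null</c>, <c>deflate</c>).</summary>
+ string Name { get; }
+
+ /// <summary>
+ /// Decodes <paramref name="source"/> into <paramref name="destination"/>, resizing
+ /// <paramref name="destination"/> via the array pool if it isn't large enough.
+ /// Returns the number of valid bytes in <paramref name="destination"/>.
+ /// </summary>
+ int Decode(ReadOnlySpan<byte> source, ref byte[] destination);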
+}
diff --git a/src/IcebergSharp.Avro/Internal/Codec/NullCodec.cs b/src/IcebergSharp.Avro/Internal/Codec/NullCodec.cs
new file mode 100644
index 0000000..4325f2d
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Codec/NullCodec.cs
@@ -0,0 +1,23 @@
+using System.Buffers;
+
+namespace IcebergSharp.Avro.Internal.Codec;
+
+internal sealed class NullCodec : IBlockCodec
+{
+ public static NullCodec Instance { get; } = new();
+ private NullCodec() { }
+
+ public string Name => "null";
+
+ public int Decode(ReadOnlySpan<byte> source, ref byte[] destination)
+ {
+ if (destination.Length < source.Length)
+ {
+ ArrayPool<byte>.Shared.Return(destination);
+ destination = ArrayPool<byte>.Shared.Rent(source.Length);
+ }
+
+ source.CopyTo(destination);
+ return source.Length;
+ }
+}
diff --git a/src/IcebergSharp.Avro/Internal/Decode/AvroToIcebergType.cs b/src/IcebergSharp.Avro/Internal/Decode/AvroToIcebergType.cs
new file mode 100644
index 0000000..7206c4e
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Decode/AvroToIcebergType.cs
@@ -0,0 +1,68 @@
+using IcebergSharp.Avro.Internal.Schema;
+using IcebergSharp.Types;
+
+namespace IcebergSharp.Avro.Internal.Decode;
+
+/// <summary>
+/// Maps an <see cref="AvroSchema"/> node back to the Iceberg type that produced
+/// it. Only used for partition columns: the manifest's partition record has a
+/// dynamic schema and Phase 4 wants <see cref="IcebergType"/>s on the boxed
+/// partition values.
+/// </summary>
+internal static class AvroToIcebergType
+{
+ public static IcebergType Resolve(AvroSchema schema)
+ {
+ // Peel off a nullable union to get to the carrier.
+ if (schema is AvroUnion u)
+ {
+ schema = u.NonNull;
+ }
+
+ return schema switch
+ {
+ AvroPrimitive p => ResolvePrimitive(p),
+ AvroFixed f => ResolveFixed(f),
+ _ => throw new NotSupportedException($"partition columns cannot have Avro schema kind {schema.GetType().Name}"),
+ };
+ }
+
+ private static IcebergType ResolvePrimitive(AvroPrimitive p)
+ {
+ return p.LogicalType switch
+ {
+ AvroLogicalType.Date => DateType.Instance,
+ AvroLogicalType.TimeMicros or AvroLogicalType.TimeMillis => TimeType.Instance,
+ AvroLogicalType.TimestampMicros => TimestampTzType.Instance,
+ AvroLogicalType.TimestampMillis => TimestampType.Instance,
+ AvroLogicalType.Uuid => UuidType.Instance,
+ AvroLogicalType.Decimal => new DecimalType(p.DecimalPrecision, p.DecimalScale),
+ _ => p.Kind switch
+ {
+ AvroPrimitiveKind.Boolean => BooleanType.Instance,
+ AvroPrimitiveKind.Int => IntType.Instance,
+ AvroPrimitiveKind.Long => LongType.Instance,
+ AvroPrimitiveKind.Float => FloatType.Instance,
+ AvroPrimitiveKind.Double => DoubleType.Instance,
+ AvroPrimitiveKind.Bytes => BinaryType.Instance,
+ AvroPrimitiveKind.String => StringType.Instance,
+ _ => throw new NotSupportedException($"unsupported Avro primitive {p.Kind} for partition column"),
+ },
+ };
+ }
+
+ private static IcebergType ResolveFixed(AvroFixed f)
+ {
+ if (f.LogicalType == AvroLogicalType.Decimal)
+ {
+ return new DecimalType(f.DecimalPrecision, f.DecimalScale);
+ }
+
+ if (f.LogicalType == AvroLogicalType.Uuid || f.Size == 16)
+ {
+ return UuidType.Instance;
+ }
+
+ return new FixedType(f.Size);
+ }
+}
diff --git a/src/IcebergSharp.Avro/Internal/Decode/BinaryDecoder.cs b/src/IcebergSharp.Avro/Internal/Decode/BinaryDecoder.cs
new file mode 100644
index 0000000..66a3175
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Decode/BinaryDecoder.cs
@@ -0,0 +1,185 @@
+using System.Buffers.Binary;
+using System.Text;
+using IcebergSharp.Avro.Internal.Errors;
+
+namespace IcebergSharp.Avro.Internal.Decode;
+
+/// <summary>
+/// Cursor over an in-memory Avro binary block. Read methods advance
+/// <see cref="Position"/>; all primitives are decoded directly out of the
+/// underlying span, with no allocation per read except where the caller asks for a
+/// materialised string/byte[].
+/// </summary>
+internal ref struct BinaryDecoder
+{
+ private readonly ReadOnlySpan<byte> _buffer;
+ private int _position;
+
+ public BinaryDecoder(ReadOnlySpan<byte> buffer)
+ {
+ _buffer = buffer;
+ _position = 0;
+ }
+
+ public readonly int Position => _position;
+ public readonly int Length => _buffer.Length;
+ public readonly bool EndOfBuffer => _position >= _buffer.Length;
+
+ public bool ReadBoolean()
+ {
+ EnsureBytes(1);
+ var b = _buffer[_position++];
+ if (b > 1)
+ {
+ throw new AvroFormatException($"boolean must be 0 or 1, got 0x{b:X2}");
+ }
+
+ return b == 1;
+ }
+
+ public int ReadInt()
+ {
+ var value = ReadLong();
+ if (value is < int.MinValue or > int.MaxValue)
+ {
+ throw new AvroFormatException($"int value {value} is outside Int32 range");
+ }
+
+ return (int)value;
+ }
+
+ public long ReadLong()
+ {
+ // Zig-zag varint, little-endian, 7 bits per byte. At most 10 bytes for a long.
+ long value = 0;
+ var shift = 0;
+ while (true)
+ {
+ if (_position >= _buffer.Length)
+ {
+ throw new AvroFormatException("truncated varint");
+ }
+
+ var b = _buffer[_position++];
+ value |= (long)(b & 0x7F) << shift;
+ if ((b & 0x80) == 0)
+ {
+ break;
+ }
+
+ shift += 7;
+ if (shift > 63)
+ {
+ throw new AvroFormatException("varint too long for long");
+ }
+ }
+
+ // Zig-zag decode.
+ return (long)((ulong)value >> 1) ^ -(value & 1L);
+ }
+
+ public float ReadFloat()
+ {
+ EnsureBytes(4);
+ var v = BinaryPrimitives.ReadSingleLittleEndian(_buffer.Slice(_position, 4));
+ _position += 4;
+ return v;
+ }
+
+ public double ReadDouble()
+ {
+ EnsureBytes(8);
+ var v = BinaryPrimitives.ReadDoubleLittleEndian(_buffer.Slice(_position, 8));
+ _position += 8;
+ return v;
+ }
+
+ /// <summary>Reads the next length-prefixed byte sequence and returns a span into the underlying buffer (no copy).</summary>
+ public ReadOnlySpan<byte> ReadBytesSpan()
+ {
+ var len = ReadLong();
+ if (len < 0)
+ {
+ throw new AvroFormatException($"negative bytes length {len}");
+ }
+
+ if (len > int.MaxValue)
+ {
+ throw new AvroFormatException($"bytes length {len} exceeds supported maximum {int.MaxValue}");
+ }
+
+ if (len == 0)
+ {
+ return [];
+ }
+
+ EnsureBytes((int)len);
+ ReadOnlySpan<byte> span = _buffer.Slice(_position, (int)len);
+ _position += (int)len;
+ return span;
+ }
+
+ public string ReadString()
+ {
+ ReadOnlySpan<byte> bytes = ReadBytesSpan();
+ return bytes.IsEmpty ? string.Empty : Encoding.UTF8.GetString(bytes);
+ }
+
+ public ReadOnlySpan<byte> ReadFixed(int size)
+ {
+ if (size < 0)
+ {
+ throw new AvroFormatException($"fixed size must be non-negative, got {size}");
+ }
+
+ EnsureBytes(size);
+ ReadOnlySpan<byte> span = _buffer.Slice(_position, size);
+ _position += size;
+ return span;
+ }
+
+ /// <summary>
+ /// Reads the next block header of an array or map: returns the count of
+ /// items in this block (positive) and skips the byte-size header if the
+ /// block was prefixed with a negative count. A return of 0 terminates the
+ /// container.
+ /// </summary>
+ public long ReadBlockCount()
+ {
+ var count = ReadLong();
+ if (count < 0)
+ {
+ if (count == long.MinValue)
+ {
+ throw new AvroFormatException("array/map block count is too small to negate");
+ }
+
+ // Negative count means: -count items follow, prefixed with their byte size.
+ var byteSize = ReadLong(); // validated but otherwise unused: records are read by element count.
+ if (byteSize < 0)
+ {
+ throw new AvroFormatException($"negative array/map block byte size {byteSize}");
+ }
+
+ return -count;
+ }
+
+ return count;
+ }
+
+ /// <summary>Reads a union branch index (zigzag long).</summary>
+ public int ReadUnionBranch() => ReadInt();
+
+ private readonly void EnsureBytes(int count)
+ {
+ if (count < 0)
+ {
+ throw new AvroFormatException($"byte count must be non-negative, got {count}");
+ }
+
+ if (count > _buffer.Length - _position)
+ {
+ throw new AvroFormatException($"truncated buffer (need {count} bytes, have {_buffer.Length - _position})");
+ }
+ }
+}
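The zig-zag varint encoding that `ReadLong` decodes can be sketched in isolation as follows. `ZigZagSketch` is a hypothetical name, and the `Encode` half is an assumption added only so the round trip is visible; the patch itself only reads.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of IcebergSharp.
public static class ZigZagSketch
{
    // Encodes a signed long as a zig-zag varint: 7 payload bits per byte,
    // least-significant group first, high bit set on every byte but the last.
    public static byte[] Encode(long value)
    {
        // Zig-zag maps small magnitudes of either sign to small unsigned values.
        ulong zz = (ulong)((value << 1) ^ (value >> 63));
        var bytes = new List<byte>();
        while ((zz & ~0x7FUL) != 0)
        {
            bytes.Add((byte)((zz & 0x7F) | 0x80));
            zz >>= 7;
        }

        bytes.Add((byte)zz);
        return bytes.ToArray();
    }

    // Decodes one zig-zag varint from the start of buffer and reports how many bytes it used.
    public static long Decode(ReadOnlySpan<byte> buffer, out int bytesRead)
    {
        ulong raw = 0;
        int shift = 0;
        for (int i = 0; i < buffer.Length; i++)
        {
            byte b = buffer[i];
            raw |= (ulong)(b & 0x7F) << shift;
            if ((b & 0x80) == 0)
            {
                bytesRead = i + 1;
                // Undo the zig-zag mapping: the low bit carries the sign.
                return (long)(raw >> 1) ^ -(long)(raw & 1);
            }

            shift += 7;
            if (shift > 63)
            {
                throw new FormatException("varint too long for long");
            }
        }

        throw new FormatException("truncated varint");
    }
}
```

For example, `Encode(64)` produces two bytes because the zig-zag value 128 no longer fits in seven bits, while `Encode(-1)` fits in one.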
diff --git a/src/IcebergSharp.Avro/Internal/Decode/IcebergFieldIds.cs b/src/IcebergSharp.Avro/Internal/Decode/IcebergFieldIds.cs
new file mode 100644
index 0000000..23405c0
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Decode/IcebergFieldIds.cs
@@ -0,0 +1,62 @@
+namespace IcebergSharp.Avro.Internal.Decode;
+
+/// <summary>
+/// Iceberg spec field-ids for manifest-list and manifest schemas. Keeping them
+/// as named constants makes the sink code read close to the spec text.
+/// </summary>
+/// <remarks>
+/// Source: Apache Iceberg specification, sections "Manifest Lists" and
+/// "Manifests". Field-ids are stable across spec versions, even where v1 and
+/// v2 use different field names; the id is the contract.
+/// </remarks>
+internal static class IcebergFieldIds
+{
+ // manifest_file (manifest list record)
+ public const int ManifestPath = 500;
+ public const int ManifestLength = 501;
+ public const int PartitionSpecId = 502;
+ public const int AddedSnapshotId = 503;
+ public const int AddedFilesCount = 504; // v1 int; v2 calls this added_data_files_count
+ public const int ExistingFilesCount = 505;
+ public const int DeletedFilesCount = 506;
+ public const int Partitions = 507;
+ public const int AddedRowsCount = 512;
+ public const int ExistingRowsCount = 513;
+ public const int DeletedRowsCount = 514;
+ public const int SequenceNumber = 515;
+ public const int MinSequenceNumber = 516;
+ public const int Content = 517; // manifest_file content (v2)
+ public const int KeyMetadataManifest = 519;
+
+ // field_summary (member of manifest_file.partitions array)
+ public const int FsContainsNull = 509;
+ public const int FsContainsNaN = 518;
+ public const int FsLowerBound = 510;
+ public const int FsUpperBound = 511;
+
+ // manifest_entry
+ public const int Status = 0;
+ public const int EntrySnapshotId = 1;
+ public const int DataFile = 2;
+ public const int EntrySequenceNumber = 3; // v2
+ public const int EntryFileSequenceNumber = 4; // v2
+
+ // data_file (member of manifest_entry)
+ public const int DfContent = 134; // v2
+ public const int DfFilePath = 100;
+ public const int DfFileFormat = 101;
+ public const int DfPartition = 102;
+ public const int DfRecordCount = 103;
+ public const int DfFileSizeInBytes = 104;
+ public const int DfBlockSizeInBytes = 105; // v1 only
+ public const int DfColumnSizes = 108;
+ public const int DfValueCounts = 109;
+ public const int DfNullValueCounts = 110;
+ public const int DfNanValueCounts = 137;
+ public const int DfLowerBounds = 125;
+ public const int DfUpperBounds = 128;
+ public const int DfKeyMetadata = 131;
+ public const int DfSplitOffsets = 132;
+ public const int DfEqualityIds = 135;
+ public const int DfSortOrderId = 140;
+}
diff --git a/src/IcebergSharp.Avro/Internal/Decode/ManifestEntrySink.cs b/src/IcebergSharp.Avro/Internal/Decode/ManifestEntrySink.cs
new file mode 100644
index 0000000..6110752
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Decode/ManifestEntrySink.cs
@@ -0,0 +1,419 @@
+using IcebergSharp.Avro.Internal.Errors;
+using IcebergSharp.Avro.Internal.Schema;
+using IcebergSharp.Types;
+
+namespace IcebergSharp.Avro.Internal.Decode;
+
+/// <summary>
+/// Decodes one <c>manifest_entry</c> Avro record into a <see cref="ManifestEntry"/>.
+/// Reuses a pre-resolved partition-record schema across rows so we don't walk
+/// the Avro tree per data file.
+/// </summary>
+internal sealed class ManifestEntrySink
+{
+ private readonly AvroRecord _entryRecord;
+ private readonly AvroRecord _dataFileRecord;
+ private readonly AvroRecord? _partitionRecord;
+ private readonly string[] _partitionNames;
+ private readonly IcebergType[] _partitionTypes;
+
+ public ManifestEntrySink(AvroRecord entryRecord)
+ {
+ _entryRecord = entryRecord;
+ _dataFileRecord = ResolveDataFileRecord(entryRecord);
+ _partitionRecord = ResolvePartitionRecord(_dataFileRecord);
+ if (_partitionRecord is not null)
+ {
+ (_partitionNames, _partitionTypes) = PartitionRecordSink.ResolveSchema(_partitionRecord);
+ }
+ else
+ {
+ _partitionNames = [];
+ _partitionTypes = [];
+ }
+ }
+
+ public ManifestEntry Decode(ref BinaryDecoder decoder)
+ {
+ ManifestEntryStatus status = ManifestEntryStatus.Existing;
+ long? snapshotId = null;
+ long? sequenceNumber = null;
+ long? fileSequenceNumber = null;
+ DataFile? dataFile = null;
+
+ foreach (AvroRecordField field in _entryRecord.Fields)
+ {
+ switch (field.FieldId)
+ {
+ case IcebergFieldIds.Status:
+ status = (ManifestEntryStatus)NullableReader.ReadIntRequiredOrZero(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.EntrySnapshotId:
+ snapshotId = NullableReader.ReadLongOrNull(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.EntrySequenceNumber:
+ sequenceNumber = NullableReader.ReadLongOrNull(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.EntryFileSequenceNumber:
+ fileSequenceNumber = NullableReader.ReadLongOrNull(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DataFile:
+ dataFile = DecodeDataFile(ref decoder, field.Schema);
+ break;
+ default:
+ ShapeDecoder.SkipValue(ref decoder, field.Schema);
+ break;
+ }
+ }
+
+ if (dataFile is null)
+ {
+ throw new AvroFormatException("manifest_entry is missing data_file (field-id 2)");
+ }
+
+ return new ManifestEntry(status, snapshotId, sequenceNumber, fileSequenceNumber, dataFile);
+ }
+
+ private DataFile DecodeDataFile(ref BinaryDecoder decoder, AvroSchema dfSchema)
+ {
+ if (NullableReader.PeelNullable(ref decoder, ref dfSchema))
+ {
+ throw new AvroFormatException("data_file is null");
+ }
+
+ if (dfSchema is not AvroRecord rec)
+ {
+ throw new AvroFormatException("data_file must be a record");
+ }
+
+ DataFileContent content = DataFileContent.Data;
+ string? filePath = null;
+ string? fileFormat = null;
+ PartitionValues partition = PartitionValues.Empty;
+ long recordCount = 0;
+ long fileSize = 0;
+ Dictionary<int, long>? columnSizes = null;
+ Dictionary<int, long>? valueCounts = null;
+ Dictionary<int, long>? nullValueCounts = null;
+ Dictionary<int, long>? nanValueCounts = null;
+ Dictionary<int, ReadOnlyMemory<byte>>? lowerBounds = null;
+ Dictionary<int, ReadOnlyMemory<byte>>? upperBounds = null;
+ byte[]? keyMetadata = null;
+ List<long>? splitOffsets = null;
+ List<int>? equalityIds = null;
+ int? sortOrderId = null;
+
+ foreach (AvroRecordField field in rec.Fields)
+ {
+ switch (field.FieldId)
+ {
+ case IcebergFieldIds.DfContent:
+ content = (DataFileContent)NullableReader.ReadIntRequiredOrZero(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfFilePath:
+ filePath = NullableReader.ReadStringRequired(ref decoder, field.Schema, "file_path");
+ break;
+ case IcebergFieldIds.DfFileFormat:
+ fileFormat = NullableReader.ReadStringRequired(ref decoder, field.Schema, "file_format");
+ break;
+ case IcebergFieldIds.DfPartition:
+ partition = DecodePartition(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfRecordCount:
+ recordCount = NullableReader.ReadLongRequiredOrZero(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfFileSizeInBytes:
+ fileSize = NullableReader.ReadLongRequiredOrZero(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfBlockSizeInBytes:
+ // v1 only; not surfaced in the domain model.
+ _ = NullableReader.ReadLongOrNull(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfColumnSizes:
+ columnSizes = DecodeIntLongMap(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfValueCounts:
+ valueCounts = DecodeIntLongMap(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfNullValueCounts:
+ nullValueCounts = DecodeIntLongMap(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfNanValueCounts:
+ nanValueCounts = DecodeIntLongMap(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfLowerBounds:
+ lowerBounds = DecodeIntBytesMap(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfUpperBounds:
+ upperBounds = DecodeIntBytesMap(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfKeyMetadata:
+ keyMetadata = NullableReader.ReadBytesOrNull(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfSplitOffsets:
+ splitOffsets = DecodeLongArray(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfEqualityIds:
+ equalityIds = DecodeIntArray(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DfSortOrderId:
+ sortOrderId = NullableReader.ReadIntOrNull(ref decoder, field.Schema);
+ break;
+ default:
+ ShapeDecoder.SkipValue(ref decoder, field.Schema);
+ break;
+ }
+ }
+
+ if (filePath is null || fileFormat is null)
+ {
+ throw new AvroFormatException("data_file record is missing file_path or file_format");
+ }
+
+ return new DataFile(
+ content,
+ filePath,
+ fileFormat,
+ partition,
+ recordCount,
+ fileSize,
+ columnSizes,
+ valueCounts,
+ nullValueCounts,
+ nanValueCounts,
+ lowerBounds,
+ upperBounds,
+ keyMetadata ?? [],
+ splitOffsets,
+ equalityIds,
+ sortOrderId);
+ }
+
+ private PartitionValues DecodePartition(ref BinaryDecoder decoder, AvroSchema schema)
+ {
+ if (NullableReader.PeelNullable(ref decoder, ref schema))
+ {
+ return PartitionValues.Empty;
+ }
+
+ if (schema is not AvroRecord rec)
+ {
+ throw new AvroFormatException($"partition field must be a record, got {schema.GetType().Name}");
+ }
+
+ if (_partitionRecord is null)
+ {
+ // Should not happen — we resolved it in the constructor — but recover defensively.
+ ShapeDecoder.SkipValue(ref decoder, schema);
+ return PartitionValues.Empty;
+ }
+
+ return PartitionRecordSink.Decode(ref decoder, rec, _partitionNames, _partitionTypes);
+ }
+
+ private static Dictionary<int, long>? DecodeIntLongMap(ref BinaryDecoder decoder, AvroSchema schema)
+ {
+ if (NullableReader.PeelNullable(ref decoder, ref schema))
+ {
+ return null;
+ }
+
+ // Iceberg encodes int-keyed maps as an Avro array of {key: int, value: T} records,
+ // because Avro's native map only supports string keys.
+ if (schema is not AvroArray arr || arr.Items is not AvroRecord entry)
+ {
+ throw new AvroFormatException($"expected int->long map (array), got {schema.GetType().Name}");
+ }
+
+ var result = new Dictionary<int, long>();
+ while (true)
+ {
+ var count = decoder.ReadBlockCount();
+ if (count == 0)
+ {
+ return result;
+ }
+
+ for (long i = 0; i < count; i++)
+ {
+ int? key = null;
+ long? value = null;
+ foreach (AvroRecordField f in entry.Fields)
+ {
+ if (f.Name == "key")
+ {
+ key = NullableReader.ReadIntRequiredOrZero(ref decoder, f.Schema);
+ }
+ else if (f.Name == "value")
+ {
+ value = NullableReader.ReadLongRequiredOrZero(ref decoder, f.Schema);
+ }
+ else
+ {
+ ShapeDecoder.SkipValue(ref decoder, f.Schema);
+ }
+ }
+
+ if (key is null || value is null)
+ {
+ throw new AvroFormatException("kv-record missing key or value");
+ }
+
+ result[key.Value] = value.Value;
+ }
+ }
+ }
+
+ private static Dictionary<int, ReadOnlyMemory<byte>>? DecodeIntBytesMap(ref BinaryDecoder decoder, AvroSchema schema)
+ {
+ if (NullableReader.PeelNullable(ref decoder, ref schema))
+ {
+ return null;
+ }
+
+ if (schema is not AvroArray arr || arr.Items is not AvroRecord entry)
+ {
+ throw new AvroFormatException($"expected int->bytes map (array), got {schema.GetType().Name}");
+ }
+
+ var result = new Dictionary<int, ReadOnlyMemory<byte>>();
+ while (true)
+ {
+ var count = decoder.ReadBlockCount();
+ if (count == 0)
+ {
+ return result;
+ }
+
+ for (long i = 0; i < count; i++)
+ {
+ int? key = null;
+ byte[]? value = null;
+ foreach (AvroRecordField f in entry.Fields)
+ {
+ if (f.Name == "key")
+ {
+ key = NullableReader.ReadIntRequiredOrZero(ref decoder, f.Schema);
+ }
+ else if (f.Name == "value")
+ {
+ value = NullableReader.ReadBytesOrNull(ref decoder, f.Schema);
+ }
+ else
+ {
+ ShapeDecoder.SkipValue(ref decoder, f.Schema);
+ }
+ }
+
+ if (key is null)
+ {
+ throw new AvroFormatException("kv-record missing key");
+ }
+
+ result[key.Value] = value is null ? ReadOnlyMemory<byte>.Empty : value;
+ }
+ }
+ }
+
+ private static List<long>? DecodeLongArray(ref BinaryDecoder decoder, AvroSchema schema)
+ {
+ if (NullableReader.PeelNullable(ref decoder, ref schema))
+ {
+ return null;
+ }
+
+ if (schema is not AvroArray arr)
+ {
+ throw new AvroFormatException($"expected array, got {schema.GetType().Name}");
+ }
+
+ var result = new List<long>();
+ while (true)
+ {
+ var count = decoder.ReadBlockCount();
+ if (count == 0)
+ {
+ return result;
+ }
+
+ for (long i = 0; i < count; i++)
+ {
+ AvroSchema itemSchema = arr.Items;
+ result.Add(NullableReader.ReadLongRequiredOrZero(ref decoder, itemSchema));
+ }
+ }
+ }
+
+ private static List<int>? DecodeIntArray(ref BinaryDecoder decoder, AvroSchema schema)
+ {
+ if (NullableReader.PeelNullable(ref decoder, ref schema))
+ {
+ return null;
+ }
+
+ if (schema is not AvroArray arr)
+ {
+ throw new AvroFormatException($"expected array, got {schema.GetType().Name}");
+ }
+
+ var result = new List<int>();
+ while (true)
+ {
+ var count = decoder.ReadBlockCount();
+ if (count == 0)
+ {
+ return result;
+ }
+
+ for (long i = 0; i < count; i++)
+ {
+ AvroSchema itemSchema = arr.Items;
+ result.Add(NullableReader.ReadIntRequiredOrZero(ref decoder, itemSchema));
+ }
+ }
+ }
+
+ private static AvroRecord ResolveDataFileRecord(AvroRecord entryRecord)
+ {
+ foreach (AvroRecordField f in entryRecord.Fields)
+ {
+ if (f.FieldId == IcebergFieldIds.DataFile)
+ {
+ AvroSchema s = f.Schema;
+ if (s is AvroUnion u)
+ {
+ s = u.NonNull;
+ }
+
+ if (s is AvroRecord r)
+ {
+ return r;
+ }
+ }
+ }
+
+ throw new AvroFormatException("manifest_entry has no data_file record (field-id 2)");
+ }
+
+ private static AvroRecord? ResolvePartitionRecord(AvroRecord dataFileRecord)
+ {
+ foreach (AvroRecordField f in dataFileRecord.Fields)
+ {
+ if (f.FieldId == IcebergFieldIds.DfPartition)
+ {
+ AvroSchema s = f.Schema;
+ if (s is AvroUnion u)
+ {
+ s = u.NonNull;
+ }
+
+ if (s is AvroRecord r)
+ {
+ return r;
+ }
+ }
+ }
+
+ return null;
+ }
+}
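As the comment in `DecodeIntLongMap` notes, Iceberg stores int-keyed maps as an Avro array of key/value records. The sketch below shows that wire layout being read from raw bytes. It is a simplification under stated assumptions: `IntLongMapSketch` is a hypothetical name, each record is assumed to hold exactly a required `int` key followed by a required `long` value, and there is no union peeling or field skipping as in the real sink.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of IcebergSharp.
public static class IntLongMapSketch
{
    // Reads one zig-zag varint at pos and advances pos past it.
    private static long ReadLong(ReadOnlySpan<byte> buf, ref int pos)
    {
        ulong raw = 0;
        int shift = 0;
        while (true)
        {
            if (pos >= buf.Length) throw new FormatException("truncated varint");
            byte b = buf[pos++];
            raw |= (ulong)(b & 0x7F) << shift;
            if ((b & 0x80) == 0) break;
            shift += 7;
            if (shift > 63) throw new FormatException("varint too long");
        }

        return (long)(raw >> 1) ^ -(long)(raw & 1);
    }

    // Decodes an Avro array of {key: int, value: long} records into a dictionary.
    public static Dictionary<int, long> Decode(ReadOnlySpan<byte> buf)
    {
        var result = new Dictionary<int, long>();
        int pos = 0;
        while (true)
        {
            long count = ReadLong(buf, ref pos);
            if (count == 0) return result;      // a zero count ends the array

            if (count < 0)
            {
                _ = ReadLong(buf, ref pos);     // block byte size, unused here
                count = -count;
            }

            for (long i = 0; i < count; i++)
            {
                int key = checked((int)ReadLong(buf, ref pos));
                long value = ReadLong(buf, ref pos);
                result[key] = value;
            }
        }
    }
}
```

A single block holding the pairs (1, 10) and (2, -1) is the byte sequence `04 02 14 04 01 00`: a block count of 2, then the two key/value pairs, then the zero terminator.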
diff --git a/src/IcebergSharp.Avro/Internal/Decode/ManifestFileSink.cs b/src/IcebergSharp.Avro/Internal/Decode/ManifestFileSink.cs
new file mode 100644
index 0000000..efc3429
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Decode/ManifestFileSink.cs
@@ -0,0 +1,189 @@
+using IcebergSharp.Avro.Internal.Errors;
+using IcebergSharp.Avro.Internal.Schema;
+
+namespace IcebergSharp.Avro.Internal.Decode;
+
+/// <summary>
+/// Decodes one <c>manifest_file</c> Avro record into a <see cref="ManifestFile"/>.
+/// Field dispatch is by Iceberg field-id so the same code handles v1
+/// (where rows-count fields are absent) and v2 (where they are required).
+/// </summary>
+internal static class ManifestFileSink
+{
+ public static ManifestFile Decode(ref BinaryDecoder decoder, AvroRecord record, int formatVersion)
+ {
+ string? manifestPath = null;
+ long manifestLength = 0;
+ var partitionSpecId = 0;
+ ManifestContent content = ManifestContent.Data;
+ long sequenceNumber = 0;
+ long minSequenceNumber = 0;
+ long addedSnapshotId = 0;
+ var addedFilesCount = 0;
+ var existingFilesCount = 0;
+ var deletedFilesCount = 0;
+ long addedRowsCount = 0;
+ long existingRowsCount = 0;
+ long deletedRowsCount = 0;
+ List? partitions = null;
+ byte[]? keyMetadata = null;
+
+ foreach (AvroRecordField field in record.Fields)
+ {
+ switch (field.FieldId)
+ {
+ case IcebergFieldIds.ManifestPath:
+ manifestPath = NullableReader.ReadStringRequired(ref decoder, field.Schema, "manifest_path");
+ break;
+ case IcebergFieldIds.ManifestLength:
+ manifestLength = NullableReader.ReadLongRequiredOrZero(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.PartitionSpecId:
+ partitionSpecId = NullableReader.ReadIntRequiredOrZero(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.Content:
+ content = (ManifestContent)NullableReader.ReadIntRequiredOrZero(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.SequenceNumber:
+ sequenceNumber = NullableReader.ReadLongRequiredOrZero(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.MinSequenceNumber:
+ minSequenceNumber = NullableReader.ReadLongRequiredOrZero(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.AddedSnapshotId:
+ addedSnapshotId = NullableReader.ReadLongRequiredOrZero(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.AddedFilesCount:
+ addedFilesCount = ReadIntCount(ref decoder, field.Schema, "added_data_files_count");
+ break;
+ case IcebergFieldIds.ExistingFilesCount:
+ existingFilesCount = ReadIntCount(ref decoder, field.Schema, "existing_data_files_count");
+ break;
+ case IcebergFieldIds.DeletedFilesCount:
+ deletedFilesCount = ReadIntCount(ref decoder, field.Schema, "deleted_data_files_count");
+ break;
+ case IcebergFieldIds.AddedRowsCount:
+ addedRowsCount = NullableReader.ReadLongRequiredOrZero(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.ExistingRowsCount:
+ existingRowsCount = NullableReader.ReadLongRequiredOrZero(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.DeletedRowsCount:
+ deletedRowsCount = NullableReader.ReadLongRequiredOrZero(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.Partitions:
+ partitions = DecodePartitions(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.KeyMetadataManifest:
+ keyMetadata = NullableReader.ReadBytesOrNull(ref decoder, field.Schema);
+ break;
+ default:
+ ShapeDecoder.SkipValue(ref decoder, field.Schema);
+ break;
+ }
+ }
+
+ if (manifestPath is null)
+ {
+ throw new AvroFormatException("manifest_file record is missing manifest_path (field-id 500)");
+ }
+
+ return new ManifestFile(
+ manifestPath,
+ manifestLength,
+ partitionSpecId,
+ content,
+ sequenceNumber,
+ minSequenceNumber,
+ addedSnapshotId,
+ addedFilesCount,
+ existingFilesCount,
+ deletedFilesCount,
+ addedRowsCount,
+ existingRowsCount,
+ deletedRowsCount,
+ partitions,
+ keyMetadata ?? [],
+ formatVersion);
+ }
+
+ private static List<FieldSummary> DecodePartitions(ref BinaryDecoder decoder, AvroSchema schema)
+ {
+ if (NullableReader.PeelNullable(ref decoder, ref schema))
+ {
+ return [];
+ }
+
+ if (schema is not AvroArray arr)
+ {
+ throw new AvroFormatException("partitions field must be array");
+ }
+
+ // Some writers wrap the field_summary record in [null, record] inside the array's items.
+ _ = arr.Items switch
+ {
+ AvroRecord => true,
+ AvroUnion { NonNull: AvroRecord } => true,
+ _ => throw new AvroFormatException("partitions array element must be a field_summary record"),
+ };
+
+ var result = new List<FieldSummary>();
+ while (true)
+ {
+ var count = decoder.ReadBlockCount();
+ if (count == 0)
+ {
+ return result;
+ }
+
+ for (long i = 0; i < count; i++)
+ {
+ AvroSchema itemSchema = arr.Items;
+ if (NullableReader.PeelNullable(ref decoder, ref itemSchema))
+ {
+ throw new AvroFormatException("null field_summary in partitions array");
+ }
+
+ result.Add(DecodeFieldSummary(ref decoder, (AvroRecord)itemSchema));
+ }
+ }
+ }
+
+ private static int ReadIntCount(ref BinaryDecoder decoder, AvroSchema schema, string fieldName)
+ {
+ var value = NullableReader.ReadLongRequiredOrZero(ref decoder, schema);
+ return NullableReader.ToInt32(value, fieldName);
+ }
+
+ private static FieldSummary DecodeFieldSummary(ref BinaryDecoder decoder, AvroRecord rec)
+ {
+ var containsNull = false;
+ bool? containsNaN = null;
+ byte[] lower = [];
+ byte[] upper = [];
+
+ foreach (AvroRecordField field in rec.Fields)
+ {
+ switch (field.FieldId)
+ {
+ case IcebergFieldIds.FsContainsNull:
+ containsNull = NullableReader.ReadBoolOrNull(ref decoder, field.Schema) ?? false;
+ break;
+ case IcebergFieldIds.FsContainsNaN:
+ containsNaN = NullableReader.ReadBoolOrNull(ref decoder, field.Schema);
+ break;
+ case IcebergFieldIds.FsLowerBound:
+ lower = NullableReader.ReadBytesOrNull(ref decoder, field.Schema) ?? [];
+ break;
+ case IcebergFieldIds.FsUpperBound:
+ upper = NullableReader.ReadBytesOrNull(ref decoder, field.Schema) ?? [];
+ break;
+ default:
+ ShapeDecoder.SkipValue(ref decoder, field.Schema);
+ break;
+ }
+ }
+
+ return new FieldSummary(containsNull, containsNaN, lower, upper);
+ }
+}
diff --git a/src/IcebergSharp.Avro/Internal/Decode/NullableReader.cs b/src/IcebergSharp.Avro/Internal/Decode/NullableReader.cs
new file mode 100644
index 0000000..4ace650
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Decode/NullableReader.cs
@@ -0,0 +1,130 @@
+using IcebergSharp.Avro.Internal.Errors;
+using IcebergSharp.Avro.Internal.Schema;
+
+namespace IcebergSharp.Avro.Internal.Decode;
+
+/// <summary>
+/// Helpers for reading possibly-nullable Avro values. Each method peels off a
+/// <c>[null, T]</c> union and dispatches on the carried type. When the encoded
+/// branch is null the helper returns a sentinel (<c>null</c>, <c>0</c>,
+/// <c>false</c>, or an empty array) so callers don't carry per-field null flags.
+/// </summary>
+internal static class NullableReader
+{
+ public static bool PeelNullable(ref BinaryDecoder decoder, ref AvroSchema schema)
+ {
+ if (schema is not AvroUnion u)
+ {
+ return false;
+ }
+
+ var branch = decoder.ReadUnionBranch();
+ if (branch == u.NullBranchIndex)
+ {
+ return true;
+ }
+
+ if (branch != u.NonNullBranchIndex)
+ {
+ throw new AvroFormatException($"unexpected union branch {branch}");
+ }
+
+ schema = u.NonNull;
+ return false;
+ }
+
+ public static string? ReadStringOrNull(ref BinaryDecoder decoder, AvroSchema schema)
+ {
+ if (PeelNullable(ref decoder, ref schema))
+ {
+ return null;
+ }
+
+ if (schema is AvroPrimitive { Kind: AvroPrimitiveKind.String })
+ {
+ return decoder.ReadString();
+ }
+
+ throw new AvroFormatException($"expected string schema, got {schema.GetType().Name}");
+ }
+
+ public static string ReadStringRequired(ref BinaryDecoder decoder, AvroSchema schema, string fieldName)
+ => ReadStringOrNull(ref decoder, schema)
+ ?? throw new AvroFormatException($"required string field '{fieldName}' is null");
+
+ public static long? ReadLongOrNull(ref BinaryDecoder decoder, AvroSchema schema)
+ {
+ if (PeelNullable(ref decoder, ref schema))
+ {
+ return null;
+ }
+
+ return schema switch
+ {
+ AvroPrimitive { Kind: AvroPrimitiveKind.Long } => decoder.ReadLong(),
+ AvroPrimitive { Kind: AvroPrimitiveKind.Int } => decoder.ReadInt(),
+ _ => throw new AvroFormatException($"expected long/int schema, got {schema.GetType().Name}"),
+ };
+ }
+
+ public static long ReadLongRequiredOrZero(ref BinaryDecoder decoder, AvroSchema schema)
+ => ReadLongOrNull(ref decoder, schema) ?? 0L;
+
+ public static int? ReadIntOrNull(ref BinaryDecoder decoder, AvroSchema schema)
+ {
+ if (PeelNullable(ref decoder, ref schema))
+ {
+ return null;
+ }
+
+ return schema switch
+ {
+ AvroPrimitive { Kind: AvroPrimitiveKind.Int } => decoder.ReadInt(),
+ AvroPrimitive { Kind: AvroPrimitiveKind.Long } => ToInt32(decoder.ReadLong(), "int-compatible long"),
+ _ => throw new AvroFormatException($"expected int schema, got {schema.GetType().Name}"),
+ };
+ }
+
+ public static int ReadIntRequiredOrZero(ref BinaryDecoder decoder, AvroSchema schema)
+ => ReadIntOrNull(ref decoder, schema) ?? 0;
+
+ public static int ToInt32(long value, string fieldName)
+ {
+ if (value is < int.MinValue or > int.MaxValue)
+ {
+ throw new AvroFormatException($"{fieldName} value {value} is outside Int32 range");
+ }
+
+ return (int)value;
+ }
+
+ public static bool? ReadBoolOrNull(ref BinaryDecoder decoder, AvroSchema schema)
+ {
+ if (PeelNullable(ref decoder, ref schema))
+ {
+ return null;
+ }
+
+ if (schema is AvroPrimitive { Kind: AvroPrimitiveKind.Boolean })
+ {
+ return decoder.ReadBoolean();
+ }
+
+ throw new AvroFormatException($"expected boolean schema, got {schema.GetType().Name}");
+ }
+
+ public static byte[]? ReadBytesOrNull(ref BinaryDecoder decoder, AvroSchema schema)
+ {
+ if (PeelNullable(ref decoder, ref schema))
+ {
+ return null;
+ }
+
+ return schema switch
+ {
+ AvroPrimitive { Kind: AvroPrimitiveKind.Bytes } => decoder.ReadBytesSpan().ToArray(),
+ AvroFixed f => decoder.ReadFixed(f.Size).ToArray(),
+ _ => throw new AvroFormatException($"expected bytes/fixed schema, got {schema.GetType().Name}"),
+ };
+ }
+}
diff --git a/src/IcebergSharp.Avro/Internal/Decode/PartitionRecordSink.cs b/src/IcebergSharp.Avro/Internal/Decode/PartitionRecordSink.cs
new file mode 100644
index 0000000..5dc1269
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Decode/PartitionRecordSink.cs
@@ -0,0 +1,39 @@
+using IcebergSharp.Avro.Internal.Schema;
+using IcebergSharp.Types;
+
+namespace IcebergSharp.Avro.Internal.Decode;
+
+/// <summary>
+/// Decodes a manifest's data_file.partition record into a
+/// <see cref="PartitionValues"/> tuple, using the Avro record's own schema
+/// (which the writer stamps with field-ids that match the
+/// partition-spec metadata key).
+/// </summary>
+internal static class PartitionRecordSink
+{
+ /// <summary>Resolves the "template" (names and types) from the partition-record schema.</summary>
+ public static (string[] Names, IcebergType[] Types) ResolveSchema(AvroRecord partitionRecord)
+ {
+ var names = new string[partitionRecord.Fields.Count];
+ var types = new IcebergType[partitionRecord.Fields.Count];
+ for (var i = 0; i < partitionRecord.Fields.Count; i++)
+ {
+ AvroRecordField f = partitionRecord.Fields[i];
+ names[i] = f.Name;
+ types[i] = AvroToIcebergType.Resolve(f.Schema);
+ }
+
+ return (names, types);
+ }
+
+ public static PartitionValues Decode(ref BinaryDecoder decoder, AvroRecord partitionRecord, string[] names, IcebergType[] types)
+ {
+ var values = new object?[partitionRecord.Fields.Count];
+ for (var i = 0; i < partitionRecord.Fields.Count; i++)
+ {
+ values[i] = ShapeDecoder.DecodeValue(ref decoder, partitionRecord.Fields[i].Schema);
+ }
+
+ return new PartitionValues(names, types, values);
+ }
+}
diff --git a/src/IcebergSharp.Avro/Internal/Decode/ShapeDecoder.cs b/src/IcebergSharp.Avro/Internal/Decode/ShapeDecoder.cs
new file mode 100644
index 0000000..c80e2a9
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Decode/ShapeDecoder.cs
@@ -0,0 +1,272 @@
+using System.Globalization;
+using IcebergSharp.Avro.Internal.Errors;
+using IcebergSharp.Avro.Internal.Schema;
+
+namespace IcebergSharp.Avro.Internal.Decode;
+
+/// <summary>
+/// Schema-aware traversal helpers over a <see cref="BinaryDecoder"/>. The
+/// manifest/manifest-list record decoders use these to skip values they do not
+/// care about and to materialise typed values for the partition record (whose
+/// shape is dynamic).
+/// </summary>
+internal static class ShapeDecoder
+{
+ /// <summary>
+ /// Consumes the next value of the given schema from the decoder, discarding
+ /// it. Used when a record field's field-id is not one the caller wires up.
+ /// </summary>
+ public static void SkipValue(ref BinaryDecoder decoder, AvroSchema schema)
+ {
+ switch (schema)
+ {
+ case AvroPrimitive p:
+ SkipPrimitive(ref decoder, p);
+ break;
+
+ case AvroFixed f:
+ _ = decoder.ReadFixed(f.Size);
+ break;
+
+ case AvroEnum:
+ _ = decoder.ReadInt();
+ break;
+
+ case AvroUnion u:
+ {
+ var branch = decoder.ReadUnionBranch();
+ if (branch == u.NullBranchIndex)
+ {
+ return;
+ }
+
+ if (branch != u.NonNullBranchIndex)
+ {
+ throw new AvroFormatException($"unexpected union branch {branch}");
+ }
+
+ SkipValue(ref decoder, u.NonNull);
+ break;
+ }
+
+ case AvroArray a:
+ {
+ while (true)
+ {
+ var count = decoder.ReadBlockCount();
+ if (count == 0)
+ {
+ break;
+ }
+
+ for (long i = 0; i < count; i++)
+ {
+ SkipValue(ref decoder, a.Items);
+ }
+ }
+
+ break;
+ }
+
+ case AvroMap m:
+ {
+ while (true)
+ {
+ var count = decoder.ReadBlockCount();
+ if (count == 0)
+ {
+ break;
+ }
+
+ for (long i = 0; i < count; i++)
+ {
+ _ = decoder.ReadString();
+ SkipValue(ref decoder, m.Values);
+ }
+ }
+
+ break;
+ }
+
+ case AvroRecord r:
+ {
+ foreach (AvroRecordField f in r.Fields)
+ {
+ SkipValue(ref decoder, f.Schema);
+ }
+
+ break;
+ }
+
+ default:
+ throw new AvroFormatException($"cannot skip unsupported schema kind {schema.GetType().Name}");
+ }
+ }
+
+ private static void SkipPrimitive(ref BinaryDecoder decoder, AvroPrimitive p)
+ {
+ switch (p.Kind)
+ {
+ case AvroPrimitiveKind.Null: break;
+ case AvroPrimitiveKind.Boolean: _ = decoder.ReadBoolean(); break;
+ case AvroPrimitiveKind.Int: _ = decoder.ReadInt(); break;
+ case AvroPrimitiveKind.Long: _ = decoder.ReadLong(); break;
+ case AvroPrimitiveKind.Float: _ = decoder.ReadFloat(); break;
+ case AvroPrimitiveKind.Double: _ = decoder.ReadDouble(); break;
+ case AvroPrimitiveKind.Bytes: _ = decoder.ReadBytesSpan(); break;
+ case AvroPrimitiveKind.String: _ = decoder.ReadBytesSpan(); break;
+ default: throw new AvroFormatException($"cannot skip primitive {p.Kind}");
+ }
+ }
+
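+ /// <summary>
+ /// Decodes a single value of the given schema as a CLR <see cref="object"/>.
+ /// Used for partition column values; honours the logical type to produce
+ /// <see cref="DateOnly"/>, <see cref="TimeSpan"/>, <see cref="DateTime"/>,
+ /// <see cref="Guid"/>, and <see cref="decimal"/> where appropriate.
+ /// </summary>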
+ public static object? DecodeValue(ref BinaryDecoder decoder, AvroSchema schema)
+ {
+ if (schema is AvroUnion u)
+ {
+ var branch = decoder.ReadUnionBranch();
+ if (branch == u.NullBranchIndex)
+ {
+ return null;
+ }
+
+ if (branch != u.NonNullBranchIndex)
+ {
+ throw new AvroFormatException($"unexpected union branch {branch}");
+ }
+
+ schema = u.NonNull;
+ }
+
+ return schema switch
+ {
+ AvroPrimitive p => DecodePrimitive(ref decoder, p),
+ AvroFixed f => DecodeFixed(ref decoder, f),
+ _ => throw new NotSupportedException($"unexpected partition-value schema kind {schema.GetType().Name}"),
+ };
+ }
+
+ private static object DecodePrimitive(ref BinaryDecoder decoder, AvroPrimitive p)
+ {
+ switch (p.LogicalType)
+ {
+ case AvroLogicalType.Date:
+ return DateOnly.FromDayNumber(_epochDayNumber + decoder.ReadInt());
+
+ case AvroLogicalType.TimeMicros:
+ {
+ var micros = decoder.ReadLong();
+ return TimeSpan.FromTicks(micros * 10);
+ }
+
+ case AvroLogicalType.TimeMillis:
+ {
+ var millis = decoder.ReadInt();
+ return TimeSpan.FromMilliseconds(millis);
+ }
+
+ case AvroLogicalType.TimestampMicros:
+ {
+ var micros = decoder.ReadLong();
+ return DateTime.UnixEpoch.AddTicks(micros * 10);
+ }
+
+ case AvroLogicalType.TimestampMillis:
+ {
+ var millis = decoder.ReadLong();
+ return DateTime.UnixEpoch.AddMilliseconds(millis);
+ }
+
+ case AvroLogicalType.Uuid:
+ {
+ var str = decoder.ReadString();
+ return Guid.Parse(str, CultureInfo.InvariantCulture);
+ }
+
+ case AvroLogicalType.Decimal:
+ {
+ // Iceberg encodes decimals as variable-length big-endian two's complement bytes.
+ ReadOnlySpan<byte> bytes = decoder.ReadBytesSpan();
+ return DecodeDecimal(bytes, p.DecimalScale);
+ }
+ }
+
+ return p.Kind switch
+ {
+ AvroPrimitiveKind.Boolean => decoder.ReadBoolean(),
+ AvroPrimitiveKind.Int => decoder.ReadInt(),
+ AvroPrimitiveKind.Long => decoder.ReadLong(),
+ AvroPrimitiveKind.Float => decoder.ReadFloat(),
+ AvroPrimitiveKind.Double => decoder.ReadDouble(),
+ AvroPrimitiveKind.Bytes => decoder.ReadBytesSpan().ToArray(),
+ AvroPrimitiveKind.String => decoder.ReadString(),
+ _ => throw new AvroFormatException($"cannot decode primitive {p.Kind}"),
+ };
+ }
+
+ private static object DecodeFixed(ref BinaryDecoder decoder, AvroFixed f)
+ {
+ ReadOnlySpan<byte> bytes = decoder.ReadFixed(f.Size);
+
+ return f.LogicalType switch
+ {
+ AvroLogicalType.Decimal => DecodeDecimal(bytes, f.DecimalScale),
+ AvroLogicalType.Uuid => DecodeUuidBytes(bytes),
+ _ => bytes.ToArray(),
+ };
+ }
+
+ private static decimal DecodeDecimal(ReadOnlySpan<byte> bytes, int scale)
+ {
+ if (bytes.IsEmpty)
+ {
+ return decimal.Zero;
+ }
+
+ // Two's complement big-endian → System.Numerics.BigInteger
+ // (BigInteger expects little-endian, signed).
+ Span<byte> le = stackalloc byte[bytes.Length];
+ for (var i = 0; i < bytes.Length; i++)
+ {
+ le[i] = bytes[bytes.Length - 1 - i];
+ }
+
+ var bi = new System.Numerics.BigInteger(le, isUnsigned: false, isBigEndian: false);
+ var pow = System.Numerics.BigInteger.Pow(10, scale);
+ var quotient = System.Numerics.BigInteger.DivRem(bi, pow, out System.Numerics.BigInteger remainder);
+ // Reconstruct the decimal value: quotient + remainder/10^scale. The BigInteger-to-decimal casts throw OverflowException outside System.Decimal's range.
+ var whole = (decimal)quotient;
+ if (remainder.IsZero)
+ {
+ return whole;
+ }
+
+ var frac = (decimal)remainder / (decimal)pow;
+ return whole + frac;
+ }
+
+ private static Guid DecodeUuidBytes(ReadOnlySpan<byte> bytes)
+ {
+ if (bytes.Length != 16)
+ {
+ throw new AvroFormatException($"UUID fixed must be 16 bytes, got {bytes.Length}");
+ }
+
+ // Iceberg uses RFC-4122 big-endian byte layout (same as Java UUID.toString()).
+ Span<byte> shuffled = stackalloc byte[16];
+ bytes.CopyTo(shuffled);
+ // System.Guid byte layout for the first three fields is little-endian; swap.
+ (shuffled[0], shuffled[3]) = (shuffled[3], shuffled[0]);
+ (shuffled[1], shuffled[2]) = (shuffled[2], shuffled[1]);
+ (shuffled[4], shuffled[5]) = (shuffled[5], shuffled[4]);
+ (shuffled[6], shuffled[7]) = (shuffled[7], shuffled[6]);
+ return new Guid(shuffled);
+ }
+
+ private static readonly int _epochDayNumber = new DateOnly(1970, 1, 1).DayNumber;
+}
diff --git a/src/IcebergSharp.Avro/Internal/Errors/AvroFormatException.cs b/src/IcebergSharp.Avro/Internal/Errors/AvroFormatException.cs
new file mode 100644
index 0000000..cc45303
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Errors/AvroFormatException.cs
@@ -0,0 +1,45 @@
+using System.Globalization;
+
+namespace IcebergSharp.Avro.Internal.Errors;
+
+/// <summary>
+/// Thrown when a stream does not conform to the slice of the Avro OCF spec
+/// IcebergSharp.Avro implements (manifests + manifest lists). Carries optional
+/// file-offset and contextual location to make failures locatable in real-world
+/// manifests that can be hundreds of records long.
+/// </summary>
+/// <remarks>
+/// We use a sibling of <see cref="InvalidDataException"/> rather than a subclass
+/// because <see cref="InvalidDataException"/> is sealed. Catch this if you want
+/// only Avro-format errors; otherwise <c>catch (Exception)</c> is fine.
+/// </remarks>
+internal sealed class AvroFormatException : Exception
+{
+ public long? FileOffset { get; }
+ public string? Location { get; }
+
+ public AvroFormatException(string message)
+ : base(message)
+ {
+ }
+
+ public AvroFormatException(string message, long fileOffset, string? location = null)
+ : base(BuildMessage(message, fileOffset, location))
+ {
+ FileOffset = fileOffset;
+ Location = location;
+ }
+
+ public AvroFormatException(string message, Exception inner)
+ : base(message, inner)
+ {
+ }
+
+ private static string BuildMessage(string message, long fileOffset, string? location)
+ {
+ var offsetStr = fileOffset.ToString(CultureInfo.InvariantCulture);
+ return location is null
+ ? $"{message} (offset {offsetStr})"
+ : $"{message} (offset {offsetStr}, {location})";
+ }
+}
diff --git a/src/IcebergSharp.Avro/Internal/Ocf/OcfHeader.cs b/src/IcebergSharp.Avro/Internal/Ocf/OcfHeader.cs
new file mode 100644
index 0000000..9ece3e8
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Ocf/OcfHeader.cs
@@ -0,0 +1,19 @@
+namespace IcebergSharp.Avro.Internal.Ocf;
+
+/// <summary>
+/// Parsed Avro Object Container File header: magic, codec name, schema JSON,
+/// sync bytes, and any extra metadata keys the writer attached (Iceberg uses
+/// these for partition-spec, schema, format-version).
+/// </summary>
+internal sealed class OcfHeader(string schemaJson, string codec, byte[] sync, IReadOnlyDictionary<string, byte[]> metadata)
+{
+ /// <summary>Avro OCF magic: <c>Obj\x01</c>.</summary>
+ public static ReadOnlySpan<byte> Magic => "Obj\x01"u8;
+
+ public const int SyncMarkerSize = 16;
+
+ public string SchemaJson { get; } = schemaJson;
+ public string Codec { get; } = codec;
+ public byte[] Sync { get; } = sync;
+ public IReadOnlyDictionary<string, byte[]> Metadata { get; } = metadata;
+}
diff --git a/src/IcebergSharp.Avro/Internal/Ocf/OcfReader.cs b/src/IcebergSharp.Avro/Internal/Ocf/OcfReader.cs
new file mode 100644
index 0000000..8a44405
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Ocf/OcfReader.cs
@@ -0,0 +1,366 @@
+using System.Buffers;
+using System.Text;
+using IcebergSharp.Avro.Internal.Codec;
+using IcebergSharp.Avro.Internal.Decode;
+using IcebergSharp.Avro.Internal.Errors;
+
+namespace IcebergSharp.Avro.Internal.Ocf;
+
+/// <summary>
+/// Async block iterator over an Avro Object Container File. Reads the header on
+/// <see cref="InitializeAsync"/>, then yields one decompressed block at a time
+/// via <see cref="ReadNextBlockAsync"/>. Buffers are pooled and returned on
+/// dispose; callers must consume <see cref="CurrentBlock"/> before requesting
+/// the next block.
+/// </summary>
+internal sealed class OcfReader : IAsyncDisposable
+{
+ private const int _maxHeaderBlockSize = 16 * 1024 * 1024;
+ private const int _maxHeaderEntryCount = 1024;
+ private const int _maxHeaderKeyLength = 4096;
+ private const int _maxHeaderValueLength = 16 * 1024 * 1024;
+
+ private readonly Stream _stream;
+ private readonly bool _leaveOpen;
+ private readonly int _maxBlockSize;
+ private readonly int _maxBlockRecordCount;
+ private readonly byte[] _singleByte = new byte[1];
+
+ private byte[] _compressedBuffer = [];
+ private byte[] _decompressedBuffer = [];
+ private int _currentBlockLength;
+ private int _currentRecordCount;
+ private long _streamOffset;
+ private OcfHeader? _header;
+ private IBlockCodec? _codec;
+ private bool _initialized;
+
+ public OcfReader(Stream stream, bool leaveOpen, int maxBlockSize, int maxBlockRecordCount = 1_000_000)
+ {
+ ArgumentNullException.ThrowIfNull(stream);
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxBlockSize);
+ ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maxBlockRecordCount);
+
+ _stream = stream;
+ _leaveOpen = leaveOpen;
+ _maxBlockSize = maxBlockSize;
+ _maxBlockRecordCount = maxBlockRecordCount;
+ }
+
+ public OcfHeader Header => _header
+ ?? throw new InvalidOperationException("call InitializeAsync first");
+
+ public ReadOnlySpan<byte> CurrentBlock => _decompressedBuffer.AsSpan(0, _currentBlockLength);
+
+ public int CurrentRecordCount => _currentRecordCount;
+
+ public async Task InitializeAsync(CancellationToken cancellationToken)
+ {
+ if (_initialized)
+ {
+ return;
+ }
+
+ await ReadMagicAsync(cancellationToken).ConfigureAwait(false);
+ IReadOnlyDictionary<string, byte[]> metadata = await ReadHeaderMetadataAsync(cancellationToken).ConfigureAwait(false);
+ var sync = await ReadExactAsync(OcfHeader.SyncMarkerSize, cancellationToken).ConfigureAwait(false);
+
+ if (!metadata.TryGetValue("avro.schema", out var schemaBytes))
+ {
+ throw new AvroFormatException("OCF header is missing 'avro.schema'", _streamOffset);
+ }
+
+ var codecName = "null";
+ if (metadata.TryGetValue("avro.codec", out var codecBytes))
+ {
+ codecName = Encoding.UTF8.GetString(codecBytes);
+ }
+
+ _codec = codecName switch
+ {
+ "null" => NullCodec.Instance,
+ "deflate" => DeflateCodec.Instance,
+ _ => throw new AvroFormatException($"unsupported codec '{codecName}'", _streamOffset),
+ };
+
+ var schemaJson = Encoding.UTF8.GetString(schemaBytes);
+ _header = new OcfHeader(schemaJson, codecName, sync, metadata);
+ _initialized = true;
+ }
+
+ public async ValueTask<bool> ReadNextBlockAsync(CancellationToken cancellationToken)
+ {
+ if (!_initialized)
+ {
+ throw new InvalidOperationException("call InitializeAsync first");
+ }
+
+ // Block header: long(record-count) long(byte-size). Read varints byte-by-byte
+ // since we don't know the encoded length up front.
+ var blockStartOffset = _streamOffset;
+ var recordCountResult = await TryReadVarLongAsync(cancellationToken).ConfigureAwait(false);
+ if (recordCountResult is null)
+ {
+ return false; // clean EOF before block header
+ }
+
+ var recordCount = recordCountResult.Value;
+ var byteSize = await ReadVarLongAsync(cancellationToken).ConfigureAwait(false);
+
+ if (recordCount <= 0)
+ {
+ throw new AvroFormatException($"invalid block record count {recordCount}", blockStartOffset);
+ }
+
+ if (recordCount > _maxBlockRecordCount)
+ {
+ throw new AvroFormatException($"block record count {recordCount} exceeds limit {_maxBlockRecordCount}", blockStartOffset);
+ }
+
+ if (byteSize <= 0 || byteSize > _maxBlockSize)
+ {
+ throw new AvroFormatException($"invalid block byte size {byteSize} (max {_maxBlockSize})", blockStartOffset);
+ }
+
+ await EnsureCompressedCapacityAsync((int)byteSize, cancellationToken).ConfigureAwait(false);
+ await ReadExactIntoAsync(_compressedBuffer.AsMemory(0, (int)byteSize), cancellationToken).ConfigureAwait(false);
+
+ // Decompress into the decompressed buffer (resized inside the codec if necessary).
+ if (_decompressedBuffer.Length == 0)
+ {
+ _decompressedBuffer = ArrayPool<byte>.Shared.Rent(Math.Max(4096, (int)byteSize));
+ }
+
+ _currentBlockLength = _codec!.Decode(_compressedBuffer.AsSpan(0, (int)byteSize), ref _decompressedBuffer);
+ if (_currentBlockLength > _maxBlockSize)
+ {
+ throw new AvroFormatException($"decompressed block size {_currentBlockLength} exceeds limit {_maxBlockSize}", blockStartOffset);
+ }
+
+ _currentRecordCount = (int)recordCount;
+
+ // Verify the trailing sync marker.
+ var sync = await ReadExactAsync(OcfHeader.SyncMarkerSize, cancellationToken).ConfigureAwait(false);
+ if (!sync.AsSpan().SequenceEqual(_header!.Sync))
+ {
+ throw new AvroFormatException("sync marker mismatch", _streamOffset);
+ }
+
+ return true;
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ if (_compressedBuffer.Length > 0)
+ {
+ ArrayPool<byte>.Shared.Return(_compressedBuffer);
+ _compressedBuffer = [];
+ }
+
+ if (_decompressedBuffer.Length > 0)
+ {
+ ArrayPool<byte>.Shared.Return(_decompressedBuffer);
+ _decompressedBuffer = [];
+ }
+
+ if (!_leaveOpen)
+ {
+ await _stream.DisposeAsync().ConfigureAwait(false);
+ }
+ }
+
+ private async ValueTask ReadMagicAsync(CancellationToken cancellationToken)
+ {
+ var magic = await ReadExactAsync(OcfHeader.Magic.Length, cancellationToken).ConfigureAwait(false);
+ if (!magic.AsSpan().SequenceEqual(OcfHeader.Magic))
+ {
+ throw new AvroFormatException(
+ $"invalid Avro OCF magic: 0x{Convert.ToHexString(magic)}; expected Obj 01", 0);
+ }
+ }
+
+ private async ValueTask<IReadOnlyDictionary<string, byte[]>> ReadHeaderMetadataAsync(CancellationToken cancellationToken)
+ {
+ // Read the entire metadata map into memory; it's tiny in practice, and each
+ // entry count, key length, and value length is bounded by the _maxHeader* limits.
+ var dict = new Dictionary<string, byte[]>(StringComparer.Ordinal);
+
+ while (true)
+ {
+ var count = await ReadVarLongAsync(cancellationToken).ConfigureAwait(false);
+ if (count == 0)
+ {
+ return dict;
+ }
+
+ long entries;
+ if (count < 0)
+ {
+ if (count == long.MinValue)
+ {
+ throw new AvroFormatException("OCF metadata block count is too small to negate", _streamOffset);
+ }
+
+ // Negative count: -count entries, followed by a non-negative byte-size header.
+ entries = -count;
+ var byteSize = await ReadVarLongAsync(cancellationToken).ConfigureAwait(false);
+ if (byteSize < 0 || byteSize > _maxHeaderBlockSize)
+ {
+ throw new AvroFormatException($"invalid OCF metadata block byte size {byteSize}", _streamOffset);
+ }
+ }
+ else
+ {
+ entries = count;
+ }
+
+ if (entries > _maxHeaderEntryCount)
+ {
+ throw new AvroFormatException($"OCF metadata entry count {entries} exceeds limit {_maxHeaderEntryCount}", _streamOffset);
+ }
+
+ for (long i = 0; i < entries; i++)
+ {
+ var key = await ReadVarStringAsync(_maxHeaderKeyLength, cancellationToken).ConfigureAwait(false);
+ var value = await ReadVarBytesAsync(_maxHeaderValueLength, cancellationToken).ConfigureAwait(false);
+ dict[key] = value;
+ }
+ }
+ }
+
+ private async ValueTask EnsureCompressedCapacityAsync(int size, CancellationToken cancellationToken)
+ {
+ if (_compressedBuffer.Length < size)
+ {
+ if (_compressedBuffer.Length > 0)
+ {
+ ArrayPool<byte>.Shared.Return(_compressedBuffer);
+ }
+
+ _compressedBuffer = ArrayPool<byte>.Shared.Rent(size);
+ }
+
+ await ValueTask.CompletedTask.ConfigureAwait(false);
+ _ = cancellationToken; // cancellation is checked on the actual stream reads
+ }
+
+ private async ValueTask<byte[]> ReadExactAsync(int count, CancellationToken cancellationToken)
+ {
+ var buffer = new byte[count];
+ await ReadExactIntoAsync(buffer, cancellationToken).ConfigureAwait(false);
+ return buffer;
+ }
+
+ private async ValueTask ReadExactIntoAsync(Memory<byte> destination, CancellationToken cancellationToken)
+ {
+ var total = 0;
+ while (total < destination.Length)
+ {
+ var n = await _stream.ReadAsync(destination[total..], cancellationToken).ConfigureAwait(false);
+ if (n == 0)
+ {
+ throw new AvroFormatException($"unexpected end of stream after {_streamOffset} bytes (need {destination.Length - total} more)", _streamOffset);
+ }
+
+ total += n;
+ _streamOffset += n;
+ }
+ }
+
+ private async ValueTask<long?> TryReadVarLongAsync(CancellationToken cancellationToken)
+ {
+ // Returns null on clean EOF before any byte is read; otherwise reads a full varint.
+ var n = await _stream.ReadAsync(_singleByte.AsMemory(0, 1), cancellationToken).ConfigureAwait(false);
+ if (n == 0)
+ {
+ return null;
+ }
+
+ _streamOffset += 1;
+
+ var firstByte = _singleByte[0];
+ long value = firstByte & 0x7F;
+ var shift = 7;
+ if ((firstByte & 0x80) != 0)
+ {
+ value |= await ReadRemainingVarLongAsync(shift, cancellationToken).ConfigureAwait(false);
+ }
+
+ return (long)((ulong)value >> 1) ^ -(value & 1L);
+ }
+
+ private async ValueTask<long> ReadVarLongAsync(CancellationToken cancellationToken)
+ {
+ var v = await TryReadVarLongAsync(cancellationToken).ConfigureAwait(false) ?? throw new AvroFormatException("unexpected EOF while reading varint", _streamOffset);
+ return v;
+ }
+
+ private async ValueTask<long> ReadRemainingVarLongAsync(int initialShift, CancellationToken cancellationToken)
+ {
+ long acc = 0;
+ var shift = initialShift;
+ while (true)
+ {
+ var n = await _stream.ReadAsync(_singleByte.AsMemory(0, 1), cancellationToken).ConfigureAwait(false);
+ if (n == 0)
+ {
+ throw new AvroFormatException("truncated varint", _streamOffset);
+ }
+
+ _streamOffset += 1;
+ var b = _singleByte[0];
+ acc |= (long)(b & 0x7F) << shift;
+ if ((b & 0x80) == 0)
+ {
+ return acc;
+ }
+
+ shift += 7;
+ if (shift > 63)
+ {
+ throw new AvroFormatException("varint too long", _streamOffset);
+ }
+ }
+ }
+
+ private async ValueTask<string> ReadVarStringAsync(int maxLength, CancellationToken cancellationToken)
+ {
+ var len = await ReadVarLongAsync(cancellationToken).ConfigureAwait(false);
+ if (len < 0)
+ {
+ throw new AvroFormatException($"negative string length {len}", _streamOffset);
+ }
+
+ if (len > maxLength)
+ {
+ throw new AvroFormatException($"string length {len} exceeds supported maximum {maxLength}", _streamOffset);
+ }
+
+ if (len == 0)
+ {
+ return string.Empty;
+ }
+
+ var buf = await ReadExactAsync((int)len, cancellationToken).ConfigureAwait(false);
+ return Encoding.UTF8.GetString(buf);
+ }
+
+ private async ValueTask<byte[]> ReadVarBytesAsync(int maxLength, CancellationToken cancellationToken)
+ {
+ var len = await ReadVarLongAsync(cancellationToken).ConfigureAwait(false);
+ if (len < 0)
+ {
+ throw new AvroFormatException($"negative bytes length {len}", _streamOffset);
+ }
+
+ if (len > maxLength)
+ {
+ throw new AvroFormatException($"bytes length {len} exceeds supported maximum {maxLength}", _streamOffset);
+ }
+
+ return len == 0 ? [] : await ReadExactAsync((int)len, cancellationToken).ConfigureAwait(false);
+ }
+
+ // BinaryDecoder over the current decompressed block for sinks to use.
+ public BinaryDecoder OpenBlockDecoder() => new(CurrentBlock);
+}
diff --git a/src/IcebergSharp.Avro/Internal/Schema/AvroSchema.cs b/src/IcebergSharp.Avro/Internal/Schema/AvroSchema.cs
new file mode 100644
index 0000000..f1e960e
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Schema/AvroSchema.cs
@@ -0,0 +1,107 @@
+using System.Collections.ObjectModel;
+
+namespace IcebergSharp.Avro.Internal.Schema;
+
+/// <summary>
+/// Avro primitive types Iceberg manifests actually use. <c>Null</c> only appears
+/// as a branch of a <c>[null, T]</c> union, not on its own.
+/// </summary>
+internal enum AvroPrimitiveKind
+{
+ Null,
+ Boolean,
+ Int,
+ Long,
+ Float,
+ Double,
+ Bytes,
+ String,
+}
+
+/// <summary>
+/// Avro "logical types" that overlay a physical primitive with semantic meaning.
+/// Only the ones Iceberg manifest schemas can carry on a partition column are
+/// modelled; everything else is ignored at the schema-parser layer.
+/// </summary>
+internal enum AvroLogicalType
+{
+ None,
+ Decimal,
+ Date,
+ TimeMillis,
+ TimeMicros,
+ TimestampMillis,
+ TimestampMicros,
+ Uuid,
+}
+
+internal abstract class AvroSchema
+{
+ public AvroLogicalType LogicalType { get; init; } = AvroLogicalType.None;
+
+ /// <summary>Decimal precision (when <see cref="LogicalType"/> is Decimal).</summary>
+ public int DecimalPrecision { get; init; }
+
+ /// <summary>Decimal scale (when <see cref="LogicalType"/> is Decimal).</summary>
+ public int DecimalScale { get; init; }
+}
+
+internal sealed class AvroPrimitive(AvroPrimitiveKind kind) : AvroSchema
+{
+ public AvroPrimitiveKind Kind { get; } = kind;
+}
+
+internal sealed class AvroFixed(string name, int size) : AvroSchema
+{
+ public string Name { get; } = name;
+ public int Size { get; } = size;
+}
+
+internal sealed class AvroEnum(string name, IReadOnlyList<string> symbols) : AvroSchema
+{
+ public string Name { get; } = name;
+ public IReadOnlyList<string> Symbols { get; } = new ReadOnlyCollection<string>([.. symbols]);
+}
+
+internal sealed class AvroArray(AvroSchema items) : AvroSchema
+{
+ public AvroSchema Items { get; } = items;
+ /// <summary>Iceberg field-id of the array element, when present.</summary>
+ public int? ElementId { get; init; }
+}
+
+internal sealed class AvroMap(AvroSchema values) : AvroSchema
+{
+ public AvroSchema Values { get; } = values;
+ public int? KeyId { get; init; }
+ public int? ValueId { get; init; }
+}
+
+/// <summary>
+/// Avro union. IcebergSharp.Avro only supports a two-branch union where exactly
+/// one branch is null, i.e. <c>[null, T]</c> or <c>[T, null]</c>. The
+/// parser rejects everything else.
+/// </summary>
+internal sealed class AvroUnion(AvroSchema nonNull, int nullBranchIndex) : AvroSchema
+{
+ public AvroSchema NonNull { get; } = nonNull;
+ /// <summary>Zero-based index of the null branch in the encoded union.</summary>
+ public int NullBranchIndex { get; } = nullBranchIndex;
+
+ /// <summary>Zero-based index of the non-null branch in the encoded union.</summary>
+ public int NonNullBranchIndex => NullBranchIndex == 0 ? 1 : 0;
+}
+
+internal sealed class AvroRecord(string name, IReadOnlyList<AvroRecordField> fields) : AvroSchema
+{
+ public string Name { get; } = name;
+ public IReadOnlyList<AvroRecordField> Fields { get; } = new ReadOnlyCollection<AvroRecordField>([.. fields]);
+}
+
+internal sealed class AvroRecordField(string name, AvroSchema schema, int? fieldId)
+{
+ public string Name { get; } = name;
+ public AvroSchema Schema { get; } = schema;
+ /// <summary>Iceberg <c>field-id</c> JSON property: the stable key the decoder routes on.</summary>
+ public int? FieldId { get; } = fieldId;
+}
diff --git a/src/IcebergSharp.Avro/Internal/Schema/AvroSchemaParser.cs b/src/IcebergSharp.Avro/Internal/Schema/AvroSchemaParser.cs
new file mode 100644
index 0000000..32dc8a9
--- /dev/null
+++ b/src/IcebergSharp.Avro/Internal/Schema/AvroSchemaParser.cs
@@ -0,0 +1,488 @@
+using System.Text;
+using System.Text.Json;
+using IcebergSharp.Avro.Internal.Errors;
+
+namespace IcebergSharp.Avro.Internal.Schema;
+
+/// <summary>
+/// Parses the JSON Avro schema embedded in an OCF header's <c>avro.schema</c>
+/// metadata key into the minimal internal model. Hand-rolled
+/// <see cref="Utf8JsonReader"/> walk: AOT-safe, no DTOs, no reflection.
+/// </summary>
+/// <remarks>
+/// Only the constructs Iceberg manifests use are supported. Named-type
+/// references across records, multi-branch unions other than <c>[null, T]</c>,
+/// defaults beyond null, and named-type registries are intentionally
+/// rejected. Aliases and unknown metadata are ignored; IcebergSharp.Avro is
+/// not a general-purpose Avro library.
+/// </remarks>
+internal static class AvroSchemaParser
+{
+ public static AvroSchema Parse(string json)
+ {
+ ArgumentNullException.ThrowIfNull(json);
+ return Parse(Encoding.UTF8.GetBytes(json));
+ }
+
+ public static AvroSchema Parse(ReadOnlySpan<byte> utf8Json)
+ {
+ var reader = new Utf8JsonReader(utf8Json);
+ if (!reader.Read())
+ {
+ throw new AvroFormatException("empty Avro schema document");
+ }
+
+ AvroSchema schema = ReadSchema(ref reader);
+ if (reader.Read())
+ {
+ throw new AvroFormatException($"unexpected trailing token {reader.TokenType} after Avro schema");
+ }
+
+ return schema;
+ }
+
+ private static AvroSchema ReadSchema(ref Utf8JsonReader reader)
+ {
+ return reader.TokenType switch
+ {
+ JsonTokenType.String => ReadStringSchema(ref reader),
+ JsonTokenType.StartObject => ReadObjectSchema(ref reader),
+ JsonTokenType.StartArray => ReadUnionSchema(ref reader),
+ _ => throw new AvroFormatException($"unexpected token {reader.TokenType} for Avro schema"),
+ };
+ }
+
+ private static AvroPrimitive ReadStringSchema(ref Utf8JsonReader reader)
+ {
+ var name = reader.GetString() ?? throw new AvroFormatException("null Avro type name");
+ return ParsePrimitiveName(name);
+ }
+
+ private static AvroPrimitive ParsePrimitiveName(string name)
+ {
+ return name switch
+ {
+ "null" => new AvroPrimitive(AvroPrimitiveKind.Null),
+ "boolean" => new AvroPrimitive(AvroPrimitiveKind.Boolean),
+ "int" => new AvroPrimitive(AvroPrimitiveKind.Int),
+ "long" => new AvroPrimitive(AvroPrimitiveKind.Long),
+ "float" => new AvroPrimitive(AvroPrimitiveKind.Float),
+ "double" => new AvroPrimitive(AvroPrimitiveKind.Double),
+ "bytes" => new AvroPrimitive(AvroPrimitiveKind.Bytes),
+ "string" => new AvroPrimitive(AvroPrimitiveKind.String),
+ _ => throw new AvroFormatException($"unsupported Avro type name '{name}' (named-type references are not supported)"),
+ };
+ }
+
+ private static AvroUnion ReadUnionSchema(ref Utf8JsonReader reader)
+ {
+ // Parse the branches in order; we only accept exactly two, exactly one
+ // of which is "null".
+ var branches = new List<AvroSchema>(2);
+ var branchIsNull = new List<bool>(2);
+ var sawEndArray = false;
+
+ while (reader.Read())
+ {
+ if (reader.TokenType == JsonTokenType.EndArray)
+ {
+ sawEndArray = true;
+ break;
+ }
+
+ AvroSchema branch = ReadSchema(ref reader);
+ var isNull = branch is AvroPrimitive { Kind: AvroPrimitiveKind.Null };
+ branches.Add(branch);
+ branchIsNull.Add(isNull);
+ }
+
+ if (!sawEndArray)
+ {
+ throw new AvroFormatException("unexpected end of stream while reading Avro union");
+ }
+
+ if (branches.Count != 2 || branchIsNull.Count(b => b) != 1)
+ {
+ throw new AvroFormatException($"only [null, T] unions are supported (got {branches.Count} branches)");
+ }
+
+ var nullIdx = branchIsNull[0] ? 0 : 1;
+ AvroSchema nonNull = branches[1 - nullIdx];
+ return new AvroUnion(nonNull, nullIdx);
+ }
+
+ private static AvroSchema ReadObjectSchema(ref Utf8JsonReader reader)
+ {
+ string? typeTag = null;
+ string? name = null;
+ int? size = null;
+ List<string>? symbols = null;
+ AvroSchema? items = null;
+ AvroSchema? values = null;
+ List<AvroRecordField>? fields = null;
+ string? logicalType = null;
+ int? precision = null;
+ int? scale = null;
+ int? elementId = null;
+ int? keyId = null;
+ int? valueId = null;
+ var sawEndObject = false;
+
+ while (reader.Read())
+ {
+ if (reader.TokenType == JsonTokenType.EndObject)
+ {
+ sawEndObject = true;
+ break;
+ }
+
+ if (reader.TokenType != JsonTokenType.PropertyName)
+ {
+ throw new AvroFormatException($"unexpected token {reader.TokenType} while reading Avro type object");
+ }
+
+ var prop = reader.GetString();
+ if (!reader.Read())
+ {
+ throw new AvroFormatException("unexpected end of stream while reading Avro type object property");
+ }
+
+ switch (prop)
+ {
+ case "type":
+ if (reader.TokenType == JsonTokenType.String)
+ {
+ typeTag = reader.GetString();
+ }
+ else if (reader.TokenType is JsonTokenType.StartObject or JsonTokenType.StartArray)
+ {
+ // {"type": {"type": "..."}} or {"type": [null, ...]} — a wrapped schema. Unwrap and return.
+ AvroSchema inner = ReadSchema(ref reader);
+ // Skip remaining properties of the outer object.
+ SkipRemainingProperties(ref reader);
+ return inner;
+ }
+ else
+ {
+ throw new AvroFormatException($"unexpected token {reader.TokenType} for 'type' property");
+ }
+
+ break;
+
+ case "name": name = reader.GetString(); break;
+ case "size": size = reader.GetInt32(); break;
+ case "symbols":
+ symbols = ReadStringArray(ref reader);
+ break;
+ case "items":
+ items = ReadSchema(ref reader);
+ break;
+ case "values":
+ values = ReadSchema(ref reader);
+ break;
+ case "fields":
+ fields = ReadRecordFields(ref reader);
+ break;
+ case "logicalType": logicalType = reader.GetString(); break;
+ case "precision": precision = reader.GetInt32(); break;
+ case "scale": scale = reader.GetInt32(); break;
+ case "element-id": elementId = reader.GetInt32(); break;
+ case "key-id": keyId = reader.GetInt32(); break;
+ case "value-id": valueId = reader.GetInt32(); break;
+ case "aliases":
+ // Aliases are not supported but we tolerate their presence by skipping.
+ reader.Skip();
+ break;
+ default:
+ reader.Skip();
+ break;
+ }
+ }
+
+ if (!sawEndObject)
+ {
+ throw new AvroFormatException("unexpected end of stream while reading Avro type object");
+ }
+
+ if (typeTag is null)
+ {
+ throw new AvroFormatException("Avro type object is missing 'type'");
+ }
+
+ return BuildFromTag(typeTag, name, size, symbols, items, values, fields, logicalType, precision, scale, elementId, keyId, valueId);
+ }
+
+ private static void SkipRemainingProperties(ref Utf8JsonReader reader)
+ {
+ while (reader.Read())
+ {
+ if (reader.TokenType == JsonTokenType.EndObject)
+ {
+ return;
+ }
+
+ if (reader.TokenType == JsonTokenType.PropertyName)
+ {
+ if (!reader.Read())
+ {
+ throw new AvroFormatException("unexpected end of stream while skipping Avro type object property");
+ }
+
+ reader.Skip();
+ }
+ }
+
+ throw new AvroFormatException("unexpected end of stream while skipping Avro type object");
+ }
+
+ private static AvroSchema BuildFromTag(
+ string typeTag,
+ string? name,
+ int? size,
+ List<string>? symbols,
+ AvroSchema? items,
+ AvroSchema? values,
+ List<AvroRecordField>? fields,
+ string? logicalType,
+ int? precision,
+ int? scale,
+ int? elementId,
+ int? keyId,
+ int? valueId)
+ {
+ switch (typeTag)
+ {
+ case "record":
+ if (name is null || fields is null)
+ {
+ throw new AvroFormatException("record schema requires 'name' and 'fields'");
+ }
+
+ return new AvroRecord(name, fields);
+
+ case "array":
+ if (items is null)
+ {
+ throw new AvroFormatException("array schema requires 'items'");
+ }
+
+ return new AvroArray(items) { ElementId = elementId };
+
+ case "map":
+ if (values is null)
+ {
+ throw new AvroFormatException("map schema requires 'values'");
+ }
+
+ return new AvroMap(values) { KeyId = keyId, ValueId = valueId };
+
+ case "fixed":
+ if (name is null || size is null)
+ {
+ throw new AvroFormatException("fixed schema requires 'name' and 'size'");
+ }
+
+ if (size <= 0)
+ {
+ throw new AvroFormatException($"fixed schema size must be positive, got {size}");
+ }
+
+ return ApplyLogicalType(new AvroFixed(name, size.Value), logicalType, precision, scale);
+
+ case "enum":
+ if (name is null || symbols is null)
+ {
+ throw new AvroFormatException("enum schema requires 'name' and 'symbols'");
+ }
+
+ return new AvroEnum(name, symbols);
+
+ default:
+ // Primitive type encoded as object (e.g. {"type": "long", "logicalType": "timestamp-micros"}).
+ AvroSchema prim = ParsePrimitiveName(typeTag);
+ return ApplyLogicalType(prim, logicalType, precision, scale);
+ }
+ }
+
+ private static AvroSchema ApplyLogicalType(AvroSchema schema, string? logicalType, int? precision, int? scale)
+ {
+ if (logicalType is null)
+ {
+ return schema;
+ }
+
+ AvroLogicalType kind = logicalType switch
+ {
+ "decimal" => AvroLogicalType.Decimal,
+ "date" => AvroLogicalType.Date,
+ "time-millis" => AvroLogicalType.TimeMillis,
+ "time-micros" => AvroLogicalType.TimeMicros,
+ "timestamp-millis" => AvroLogicalType.TimestampMillis,
+ "timestamp-micros" => AvroLogicalType.TimestampMicros,
+ "uuid" => AvroLogicalType.Uuid,
+ _ => AvroLogicalType.None, // unknown logical types degrade to the underlying primitive
+ };
+
+ if (kind == AvroLogicalType.None || !IsLogicalTypeCompatible(schema, kind))
+ {
+ return schema;
+ }
+
+ if (kind == AvroLogicalType.Decimal)
+ {
+ if (precision is null or <= 0)
+ {
+ throw new AvroFormatException("decimal logical type requires positive 'precision'");
+ }
+
+ if (scale is < 0 || scale > precision)
+ {
+ throw new AvroFormatException($"decimal logical type scale {scale} must be between 0 and precision {precision}");
+ }
+ }
+
+ return schema switch
+ {
+ AvroPrimitive p => new AvroPrimitive(p.Kind)
+ {
+ LogicalType = kind,
+ DecimalPrecision = precision ?? 0,
+ DecimalScale = scale ?? 0,
+ },
+ AvroFixed f => new AvroFixed(f.Name, f.Size)
+ {
+ LogicalType = kind,
+ DecimalPrecision = precision ?? 0,
+ DecimalScale = scale ?? 0,
+ },
+ _ => schema,
+ };
+ }
+
+ private static bool IsLogicalTypeCompatible(AvroSchema schema, AvroLogicalType logicalType)
+ {
+ return logicalType switch
+ {
+ AvroLogicalType.Decimal => schema is AvroPrimitive { Kind: AvroPrimitiveKind.Bytes } or AvroFixed,
+ AvroLogicalType.Date => schema is AvroPrimitive { Kind: AvroPrimitiveKind.Int },
+ AvroLogicalType.TimeMillis => schema is AvroPrimitive { Kind: AvroPrimitiveKind.Int },
+ AvroLogicalType.TimeMicros => schema is AvroPrimitive { Kind: AvroPrimitiveKind.Long },
+ AvroLogicalType.TimestampMillis => schema is AvroPrimitive { Kind: AvroPrimitiveKind.Long },
+ AvroLogicalType.TimestampMicros => schema is AvroPrimitive { Kind: AvroPrimitiveKind.Long },
+ AvroLogicalType.Uuid => schema is AvroPrimitive { Kind: AvroPrimitiveKind.String } or AvroFixed { Size: 16 },
+ _ => false,
+ };
+ }
+
+ private static List<string> ReadStringArray(ref Utf8JsonReader reader)
+ {
+ if (reader.TokenType != JsonTokenType.StartArray)
+ {
+ throw new AvroFormatException($"expected array, got {reader.TokenType}");
+ }
+
+ var list = new List<string>();
+ while (reader.Read())
+ {
+ if (reader.TokenType == JsonTokenType.EndArray)
+ {
+ return list;
+ }
+
+ if (reader.TokenType != JsonTokenType.String)
+ {
+ throw new AvroFormatException($"expected string in array, got {reader.TokenType}");
+ }
+
+ list.Add(reader.GetString()!);
+ }
+
+ throw new AvroFormatException("unexpected end of stream while reading string array");
+ }
+
+ private static List<AvroRecordField> ReadRecordFields(ref Utf8JsonReader reader)
+ {
+ if (reader.TokenType != JsonTokenType.StartArray)
+ {
+ throw new AvroFormatException($"expected array for 'fields', got {reader.TokenType}");
+ }
+
+ var list = new List<AvroRecordField>();
+ while (reader.Read())
+ {
+ if (reader.TokenType == JsonTokenType.EndArray)
+ {
+ return list;
+ }
+
+ if (reader.TokenType != JsonTokenType.StartObject)
+ {
+ throw new AvroFormatException($"expected field object, got {reader.TokenType}");
+ }
+
+ list.Add(ReadRecordField(ref reader));
+ }
+
+ throw new AvroFormatException("unexpected end of stream while reading record fields");
+ }
+
+ private static AvroRecordField ReadRecordField(ref Utf8JsonReader reader)
+ {
+ string? name = null;
+ AvroSchema? schema = null;
+ int? fieldId = null;
+
+ while (reader.Read())
+ {
+ if (reader.TokenType == JsonTokenType.EndObject)
+ {
+ if (name is null || schema is null)
+ {
+ throw new AvroFormatException("record field requires 'name' and 'type'");
+ }
+
+ return new AvroRecordField(name, schema, fieldId);
+ }
+
+ if (reader.TokenType != JsonTokenType.PropertyName)
+ {
+ throw new AvroFormatException($"unexpected token {reader.TokenType} while reading field");
+ }
+
+ var prop = reader.GetString();
+ if (!reader.Read())
+ {
+ throw new AvroFormatException("unexpected end of stream while reading record field property");
+ }
+
+ switch (prop)
+ {
+ case "name": name = reader.GetString(); break;
+ case "type": schema = ReadSchema(ref reader); break;
+ case "field-id": fieldId = reader.GetInt32(); break;
+ case "doc":
+ case "default":
+ case "order":
+ case "aliases":
+ case "adjust-to-utc":
+ reader.Skip();
+ break;
+ default:
+ // Tolerate unknown keys on fields (some writers attached "*-id" metadata before "field-id" was standard).
+ if (prop is not null && prop.EndsWith("-id", StringComparison.Ordinal) && reader.TokenType == JsonTokenType.Number)
+ {
+ // Check that any other "*-id" property holds an Int32, then discard it; "field-id" itself is handled above.
+ _ = reader.GetInt32();
+ }
+ else
+ {
+ reader.Skip();
+ }
+
+ break;
+ }
+ }
+
+ throw new AvroFormatException("unexpected end of stream while reading record field");
+ }
+}
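Note on the union rule enforced by `ReadUnionSchema`: exactly two branches, exactly one of them the string `"null"`. The following is a standalone sketch of that check written directly against `Utf8JsonReader`; `FindNullBranch` is a hypothetical name used only for illustration, and unlike the parser in this PR it skips the non-null branch instead of parsing it.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical usage: the null branch sits at index 1 in this union.
int idx = FindNullBranch("[\"long\", \"null\"]");
Console.WriteLine(idx);

// Returns the zero-based index of the "null" branch in a two-branch union,
// or -1 when the JSON is not a [null, T] / [T, null] union.
// Malformed JSON surfaces as a JsonException from the reader.
static int FindNullBranch(string unionJson)
{
    var reader = new Utf8JsonReader(Encoding.UTF8.GetBytes(unionJson));
    if (!reader.Read() || reader.TokenType != JsonTokenType.StartArray)
    {
        return -1;
    }

    int branchCount = 0, nullCount = 0, nullIndex = -1;
    while (reader.Read() && reader.TokenType != JsonTokenType.EndArray)
    {
        if (reader.TokenType == JsonTokenType.String && reader.ValueTextEquals("null"))
        {
            nullCount++;
            nullIndex = branchCount;
        }
        else
        {
            // Step over a nested object or array branch as one unit;
            // Skip() is a no-op on scalar tokens.
            reader.Skip();
        }

        branchCount++;
    }

    return branchCount == 2 && nullCount == 1 ? nullIndex : -1;
}
```

Recording the index rather than a boolean matters because the encoded union value is prefixed with the branch index, so `[T, null]` and `[null, T]` decode differently.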
diff --git a/src/IcebergSharp.Avro/ManifestListReadOptions.cs b/src/IcebergSharp.Avro/ManifestListReadOptions.cs
new file mode 100644
index 0000000..929e37a
--- /dev/null
+++ b/src/IcebergSharp.Avro/ManifestListReadOptions.cs
@@ -0,0 +1,13 @@
+namespace IcebergSharp.Avro;
+
+public sealed class ManifestListReadOptions
+{
+ /// <summary>If true, the caller's stream is not disposed when the iterator completes.</summary>
+ public bool LeaveOpen { get; init; }
+
+ /// <summary>Hard cap on the size of a single Avro data block, in bytes. Guards against malformed files.</summary>
+ public int MaxBlockSize { get; init; } = 64 * 1024 * 1024;
+
+ /// <summary>Hard cap on records declared by a single Avro data block. Guards against hostile block headers.</summary>
+ public int MaxBlockRecordCount { get; init; } = 1_000_000;
+}
diff --git a/src/IcebergSharp.Avro/ManifestListReader.cs b/src/IcebergSharp.Avro/ManifestListReader.cs
new file mode 100644
index 0000000..b1fbac0
--- /dev/null
+++ b/src/IcebergSharp.Avro/ManifestListReader.cs
@@ -0,0 +1,95 @@
+using System.Runtime.CompilerServices;
+using System.Text;
+using IcebergSharp.Avro.Internal.Decode;
+using IcebergSharp.Avro.Internal.Errors;
+using IcebergSharp.Avro.Internal.Ocf;
+using IcebergSharp.Avro.Internal.Schema;
+
+namespace IcebergSharp.Avro;
+
+/// <summary>
+/// Reads an Iceberg manifest-list file (an Avro OCF whose records describe
+/// <see cref="ManifestFile"/>s) into a streaming sequence.
+/// </summary>
+/// <remarks>
+/// The reader is async at the I/O layer but decodes each block synchronously;
+/// every yielded <see cref="ManifestFile"/> comes from an already-buffered
+/// decompressed block, so the inner loop has no suspension points.
+/// </remarks>
+public static class ManifestListReader
+{
+ public static IAsyncEnumerable<ManifestFile> ReadAsync(Stream stream, CancellationToken cancellationToken = default)
+ => ReadAsync(stream, options: null, cancellationToken);
+
+ public static async IAsyncEnumerable<ManifestFile> ReadAsync(
+ Stream stream,
+ ManifestListReadOptions? options,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(stream);
+ options ??= new ManifestListReadOptions();
+
+ await using var reader = new OcfReader(stream, options.LeaveOpen, options.MaxBlockSize, options.MaxBlockRecordCount);
+ await reader.InitializeAsync(cancellationToken).ConfigureAwait(false);
+
+ AvroSchema parsedSchema = AvroSchemaParser.Parse(reader.Header.SchemaJson);
+ if (parsedSchema is not AvroRecord record)
+ {
+ throw new AvroFormatException($"manifest list root schema must be a record, got {parsedSchema.GetType().Name}");
+ }
+
+ var formatVersion = DetectFormatVersion(reader.Header.Metadata, record);
+
+ while (await reader.ReadNextBlockAsync(cancellationToken).ConfigureAwait(false))
+ {
+ // Materialise the block sync; BinaryDecoder is a ref struct and can't
+ // cross a yield boundary.
+ List<ManifestFile> entries = DecodeBlock(reader, record, formatVersion);
+ foreach (ManifestFile m in entries)
+ {
+ yield return m;
+ }
+ }
+ }
+
+ private static List<ManifestFile> DecodeBlock(OcfReader reader, AvroRecord record, int formatVersion)
+ {
+ var result = new List<ManifestFile>(reader.CurrentRecordCount);
+ BinaryDecoder decoder = reader.OpenBlockDecoder();
+ for (var i = 0; i < reader.CurrentRecordCount; i++)
+ {
+ result.Add(ManifestFileSink.Decode(ref decoder, record, formatVersion));
+ }
+
+ if (!decoder.EndOfBuffer)
+ {
+ throw new AvroFormatException($"block has {decoder.Length - decoder.Position} unread bytes after {reader.CurrentRecordCount} records");
+ }
+
+ return result;
+ }
+
+ private static int DetectFormatVersion(IReadOnlyDictionary<string, byte[]> metadata, AvroRecord record)
+ {
+ if (metadata.TryGetValue("format-version", out var bytes))
+ {
+ var s = Encoding.UTF8.GetString(bytes);
+ if (int.TryParse(s, out var v) && v is >= 1 and <= 2)
+ {
+ return v;
+ }
+ }
+
+ // v2 manifest-list records carry sequence_number (515) and content (517);
+ // v1 records do not. Use that as the structural signal when metadata is silent.
+ foreach (AvroRecordField f in record.Fields)
+ {
+ if (f.FieldId == Internal.Decode.IcebergFieldIds.SequenceNumber)
+ {
+ return 2;
+ }
+ }
+
+ return 1;
+ }
+}
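Note on the `DecodeBlock` split used by both readers: as the comment in `ReadAsync` says, a ref struct such as `BinaryDecoder` cannot cross a `yield` boundary, so each block is decoded in a synchronous helper and only the finished list reaches the async iterator. The following is a reduced sketch of that pattern; it uses `ReadOnlySpan<byte>` as a stand-in for `BinaryDecoder`, and the names `DecodeBytes` and `ReadAllAsync` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical usage: two "blocks" are flattened into one async sequence.
await foreach (int value in ReadAllAsync(new[] { new byte[] { 1, 2 }, new byte[] { 3 } }))
{
    Console.WriteLine(value);
}

// Synchronous helper: the span (a ref struct) never leaves this method, so
// the async iterator below only ever holds the finished List<int>.
static List<int> DecodeBytes(byte[] block)
{
    ReadOnlySpan<byte> span = block;
    var decoded = new List<int>(span.Length);
    foreach (byte b in span)
    {
        decoded.Add(b);
    }

    return decoded;
}

static async IAsyncEnumerable<int> ReadAllAsync(IEnumerable<byte[]> blocks)
{
    foreach (byte[] block in blocks)
    {
        await Task.Yield(); // stand-in for the asynchronous block read
        foreach (int item in DecodeBytes(block))
        {
            yield return item;
        }
    }
}
```

The cost of this design is one list allocation per block, which `MaxBlockRecordCount` bounds in the readers above.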
diff --git a/src/IcebergSharp.Avro/ManifestReadOptions.cs b/src/IcebergSharp.Avro/ManifestReadOptions.cs
new file mode 100644
index 0000000..4f441b9
--- /dev/null
+++ b/src/IcebergSharp.Avro/ManifestReadOptions.cs
@@ -0,0 +1,13 @@
+namespace IcebergSharp.Avro;
+
+public sealed class ManifestReadOptions
+{
+ /// <summary>If true, the caller's stream is not disposed when the iterator completes.</summary>
+ public bool LeaveOpen { get; init; }
+
+ /// <summary>Hard cap on the size of a single Avro data block, in bytes. Guards against malformed files.</summary>
+ public int MaxBlockSize { get; init; } = 64 * 1024 * 1024;
+
+ /// <summary>Hard cap on records declared by a single Avro data block. Guards against hostile block headers.</summary>
+ public int MaxBlockRecordCount { get; init; } = 1_000_000;
+}
diff --git a/src/IcebergSharp.Avro/ManifestReader.cs b/src/IcebergSharp.Avro/ManifestReader.cs
new file mode 100644
index 0000000..34347fd
--- /dev/null
+++ b/src/IcebergSharp.Avro/ManifestReader.cs
@@ -0,0 +1,65 @@
+using System.Runtime.CompilerServices;
+using IcebergSharp.Avro.Internal.Decode;
+using IcebergSharp.Avro.Internal.Errors;
+using IcebergSharp.Avro.Internal.Ocf;
+using IcebergSharp.Avro.Internal.Schema;
+
+namespace IcebergSharp.Avro;
+
+/// <summary>
+/// Reads an Iceberg manifest file (an Avro OCF whose records describe
+/// <see cref="ManifestEntry"/>s) into a streaming sequence. Delete-content
+/// entries are emitted unfiltered; the caller inspects
+/// <see cref="DataFile.Content"/> and skips with a warning per the v1 read-only scope.
+/// </summary>
+public static class ManifestReader
+{
+ public static IAsyncEnumerable<ManifestEntry> ReadAsync(Stream stream, CancellationToken cancellationToken = default)
+ => ReadAsync(stream, options: null, cancellationToken);
+
+ public static async IAsyncEnumerable<ManifestEntry> ReadAsync(
+ Stream stream,
+ ManifestReadOptions? options,
+ [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ ArgumentNullException.ThrowIfNull(stream);
+ options ??= new ManifestReadOptions();
+
+ await using var reader = new OcfReader(stream, options.LeaveOpen, options.MaxBlockSize, options.MaxBlockRecordCount);
+ await reader.InitializeAsync(cancellationToken).ConfigureAwait(false);
+
+ AvroSchema parsedSchema = AvroSchemaParser.Parse(reader.Header.SchemaJson);
+ if (parsedSchema is not AvroRecord record)
+ {
+ throw new AvroFormatException($"manifest root schema must be a record, got {parsedSchema.GetType().Name}");
+ }
+
+ var sink = new ManifestEntrySink(record);
+
+ while (await reader.ReadNextBlockAsync(cancellationToken).ConfigureAwait(false))
+ {
+ List<ManifestEntry> entries = DecodeBlock(reader, sink);
+ foreach (ManifestEntry m in entries)
+ {
+ yield return m;
+ }
+ }
+ }
+
+ private static List<ManifestEntry> DecodeBlock(OcfReader reader, ManifestEntrySink sink)
+ {
+ var result = new List<ManifestEntry>(reader.CurrentRecordCount);
+ BinaryDecoder decoder = reader.OpenBlockDecoder();
+ for (var i = 0; i < reader.CurrentRecordCount; i++)
+ {
+ result.Add(sink.Decode(ref decoder));
+ }
+
+ if (!decoder.EndOfBuffer)
+ {
+ throw new AvroFormatException($"block has {decoder.Length - decoder.Position} unread bytes after {reader.CurrentRecordCount} records");
+ }
+
+ return result;
+ }
+}
diff --git a/src/IcebergSharp.Avro/Properties/AssemblyInfo.cs b/src/IcebergSharp.Avro/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..d350171
--- /dev/null
+++ b/src/IcebergSharp.Avro/Properties/AssemblyInfo.cs
@@ -0,0 +1,3 @@
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("IcebergSharp.Tests")]
diff --git a/src/IcebergSharp.Core/ManifestEntry.cs b/src/IcebergSharp.Core/ManifestEntry.cs
new file mode 100644
index 0000000..8b4d923
--- /dev/null
+++ b/src/IcebergSharp.Core/ManifestEntry.cs
@@ -0,0 +1,180 @@
+using System.Collections.ObjectModel;
+
+namespace IcebergSharp;
+
+/// <summary>
+/// Status of a <see cref="ManifestEntry"/> relative to the snapshot that wrote
+/// the manifest. Matches Iceberg spec field 0.
+/// </summary>
+public enum ManifestEntryStatus
+{
+ Existing = 0,
+ Added = 1,
+ Deleted = 2,
+}
+
+/// <summary>
+/// Distinguishes regular data files from delete files. v1 data files are
+/// always <see cref="Data"/>; v2 entries carry the flag explicitly per spec
+/// field 134. Phase 1 (read-only, COW-only) treats delete content as a signal
+/// to skip; see README.
+/// </summary>
+public enum DataFileContent
+{
+ Data = 0,
+ PositionDeletes = 1,
+ EqualityDeletes = 2,
+}
+
+/// <summary>
+/// Data file storage format. The raw spec string is kept on
+/// <see cref="DataFile.FileFormatRaw"/> so unknown formats (e.g. future
+/// additions to the Iceberg spec) survive a round-trip without forcing this
+/// enum to grow.
+/// </summary>
+public enum DataFileFormat
+{
+ Unknown = 0,
+ Parquet,
+ Orc,
+ Avro,
+}
+
+/// <summary>
+/// One data file recorded in a manifest. Carries the file's location, format,
+/// partition tuple, row count, byte size, and per-column statistics that the
+/// planner uses for stats pruning.
+/// </summary>
+public sealed class DataFile
+{
+ public DataFileContent Content { get; }
+ public string FilePath { get; }
+ public DataFileFormat FileFormat { get; }
+ public string FileFormatRaw { get; }
+ public PartitionValues Partition { get; }
+ public long RecordCount { get; }
+ public long FileSizeInBytes { get; }
+ public IReadOnlyDictionary<int, long> ColumnSizes { get; }
+ public IReadOnlyDictionary<int, long> ValueCounts { get; }
+ public IReadOnlyDictionary<int, long> NullValueCounts { get; }
+ public IReadOnlyDictionary<int, long> NanValueCounts { get; }
+ public IReadOnlyDictionary<int, ReadOnlyMemory<byte>> LowerBounds { get; }
+ public IReadOnlyDictionary<int, ReadOnlyMemory<byte>> UpperBounds { get; }
+ public ReadOnlyMemory<byte> KeyMetadata { get; }
+ public IReadOnlyList<long> SplitOffsets { get; }
+ public IReadOnlyList<int> EqualityIds { get; }
+ public int? SortOrderId { get; }
+
+ public DataFile(
+ DataFileContent content,
+ string filePath,
+ string fileFormatRaw,
+ PartitionValues partition,
+ long recordCount,
+ long fileSizeInBytes,
+ IReadOnlyDictionary<int, long>? columnSizes,
+ IReadOnlyDictionary<int, long>? valueCounts,
+ IReadOnlyDictionary<int, long>? nullValueCounts,
+ IReadOnlyDictionary<int, long>? nanValueCounts,
+ IReadOnlyDictionary<int, ReadOnlyMemory<byte>>? lowerBounds,
+ IReadOnlyDictionary<int, ReadOnlyMemory<byte>>? upperBounds,
+ ReadOnlySpan<byte> keyMetadata,
+ IReadOnlyList<long>? splitOffsets,
+ IReadOnlyList<int>? equalityIds,
+ int? sortOrderId)
+ {
+ ArgumentException.ThrowIfNullOrEmpty(filePath);
+ ArgumentException.ThrowIfNullOrEmpty(fileFormatRaw);
+ ArgumentNullException.ThrowIfNull(partition);
+
+ Content = content;
+ FilePath = filePath;
+ FileFormatRaw = fileFormatRaw;
+ FileFormat = ParseFormat(fileFormatRaw);
+ Partition = partition;
+ RecordCount = recordCount;
+ FileSizeInBytes = fileSizeInBytes;
+ ColumnSizes = CopyDict(columnSizes);
+ ValueCounts = CopyDict(valueCounts);
+ NullValueCounts = CopyDict(nullValueCounts);
+ NanValueCounts = CopyDict(nanValueCounts);
+ LowerBounds = CopyDict(lowerBounds);
+ UpperBounds = CopyDict(upperBounds);
+ KeyMetadata = keyMetadata.IsEmpty ? ReadOnlyMemory<byte>.Empty : keyMetadata.ToArray();
+ SplitOffsets = splitOffsets is null ? [] : new ReadOnlyCollection<long>([.. splitOffsets]);
+ EqualityIds = equalityIds is null ? [] : new ReadOnlyCollection<int>([.. equalityIds]);
+ SortOrderId = sortOrderId;
+ }
+
+ private static DataFileFormat ParseFormat(string raw)
+ {
+ // Iceberg writes the format as an upper-case enum symbol ("PARQUET"),
+ // but we accept any casing to be forgiving of catalog-side rewrites.
+ if (raw.Equals("PARQUET", StringComparison.OrdinalIgnoreCase))
+ {
+ return DataFileFormat.Parquet;
+ }
+
+ if (raw.Equals("ORC", StringComparison.OrdinalIgnoreCase))
+ {
+ return DataFileFormat.Orc;
+ }
+
+ if (raw.Equals("AVRO", StringComparison.OrdinalIgnoreCase))
+ {
+ return DataFileFormat.Avro;
+ }
+
+ return DataFileFormat.Unknown;
+ }
+
+ private static IReadOnlyDictionary<int, TValue> CopyDict<TValue>(IReadOnlyDictionary<int, TValue>? source)
+ {
+ if (source is null || source.Count == 0)
+ {
+ return EmptyReadOnlyDict<TValue>.Instance;
+ }
+
+ var copy = new Dictionary<int, TValue>(source.Count);
+ foreach (KeyValuePair<int, TValue> kv in source)
+ {
+ copy[kv.Key] = kv.Value;
+ }
+
+ return new ReadOnlyDictionary<int, TValue>(copy);
+ }
+
+ private static class EmptyReadOnlyDict<TValue>
+ {
+ public static readonly IReadOnlyDictionary<int, TValue> Instance = new ReadOnlyDictionary<int, TValue>(new Dictionary<int, TValue>(0));
+ }
+}
+
+/// <summary>
+/// One row of an Iceberg manifest: a <see cref="DataFile"/> plus the
+/// status/sequence/snapshot tagging that lets the planner decide whether the
+/// file is part of the snapshot being scanned.
+/// </summary>
+public sealed class ManifestEntry
+{
+ public ManifestEntryStatus Status { get; }
+ public long? SnapshotId { get; }
+ public long? SequenceNumber { get; }
+ public long? FileSequenceNumber { get; }
+ public DataFile DataFile { get; }
+
+ public ManifestEntry(
+ ManifestEntryStatus status,
+ long? snapshotId,
+ long? sequenceNumber,
+ long? fileSequenceNumber,
+ DataFile dataFile)
+ {
+ ArgumentNullException.ThrowIfNull(dataFile);
+ Status = status;
+ SnapshotId = snapshotId;
+ SequenceNumber = sequenceNumber;
+ FileSequenceNumber = fileSequenceNumber;
+ DataFile = dataFile;
+ }
+}
diff --git a/src/IcebergSharp.Core/ManifestFile.cs b/src/IcebergSharp.Core/ManifestFile.cs
new file mode 100644
index 0000000..885500a
--- /dev/null
+++ b/src/IcebergSharp.Core/ManifestFile.cs
@@ -0,0 +1,124 @@
+using System.Collections.ObjectModel;
+
+namespace IcebergSharp;
+
+/// <summary>
+/// Distinguishes a manifest that lists data files (the default) from one that
+/// lists delete files. v1 manifests are always <see cref="Data"/>; v2 manifests
+/// carry the flag explicitly per spec field 517.
+/// </summary>
+public enum ManifestContent
+{
+ Data = 0,
+ Deletes = 1,
+}
+
+/// <summary>
+/// Per-partition-column summary embedded in a manifest-list entry. The reader
+/// passes <see cref="LowerBound"/> / <see cref="UpperBound"/> through as raw
+/// bytes; the encoding is the Iceberg single-value byte format keyed by the
+/// partition column's type, which Phase 4 decodes against the table schema.
+/// </summary>
+public sealed class FieldSummary : IEquatable<FieldSummary>
+{
+ public bool ContainsNull { get; }
+ public bool? ContainsNaN { get; }
+ public ReadOnlyMemory<byte> LowerBound { get; }
+ public ReadOnlyMemory<byte> UpperBound { get; }
+
+ public FieldSummary(bool containsNull, bool? containsNaN, ReadOnlySpan<byte> lowerBound, ReadOnlySpan<byte> upperBound)
+ {
+ ContainsNull = containsNull;
+ ContainsNaN = containsNaN;
+ LowerBound = lowerBound.IsEmpty ? ReadOnlyMemory<byte>.Empty : lowerBound.ToArray();
+ UpperBound = upperBound.IsEmpty ? ReadOnlyMemory<byte>.Empty : upperBound.ToArray();
+ }
+
+ public bool Equals(FieldSummary? other)
+ => other is not null
+ && ContainsNull == other.ContainsNull
+ && ContainsNaN == other.ContainsNaN
+ && LowerBound.Span.SequenceEqual(other.LowerBound.Span)
+ && UpperBound.Span.SequenceEqual(other.UpperBound.Span);
+
+ public override bool Equals(object? obj) => Equals(obj as FieldSummary);
+
+ public override int GetHashCode()
+ {
+ var hc = new HashCode();
+ hc.Add(ContainsNull);
+ hc.Add(ContainsNaN);
+ hc.AddBytes(LowerBound.Span);
+ hc.AddBytes(UpperBound.Span);
+ return hc.ToHashCode();
+ }
+}
+
+/// <summary>
+/// One entry of a snapshot's manifest-list file. Points at an Iceberg manifest
+/// (an Avro file listing data or delete files) and carries the per-partition
+/// bounds the planner needs to prune at the manifest level before opening it.
+/// </summary>
+public sealed class ManifestFile
+{
+ public string ManifestPath { get; }
+ public long ManifestLength { get; }
+ public int PartitionSpecId { get; }
+ public ManifestContent Content { get; }
+ public long SequenceNumber { get; }
+ public long MinSequenceNumber { get; }
+ public long AddedSnapshotId { get; }
+ public int AddedFilesCount { get; }
+ public int ExistingFilesCount { get; }
+ public int DeletedFilesCount { get; }
+ public long AddedRowsCount { get; }
+ public long ExistingRowsCount { get; }
+ public long DeletedRowsCount { get; }
+ public IReadOnlyList<FieldSummary> Partitions { get; }
+ public ReadOnlyMemory<byte> KeyMetadata { get; }
+ public int FormatVersion { get; }
+
+ public ManifestFile(
+ string manifestPath,
+ long manifestLength,
+ int partitionSpecId,
+ ManifestContent content,
+ long sequenceNumber,
+ long minSequenceNumber,
+ long addedSnapshotId,
+ int addedFilesCount,
+ int existingFilesCount,
+ int deletedFilesCount,
+ long addedRowsCount,
+ long existingRowsCount,
+ long deletedRowsCount,
+ IReadOnlyList<FieldSummary>? partitions,
+ ReadOnlySpan<byte> keyMetadata,
+ int formatVersion)
+ {
+ ArgumentException.ThrowIfNullOrEmpty(manifestPath);
+ if (formatVersion is < 1 or > 2)
+ {
+ throw new ArgumentOutOfRangeException(nameof(formatVersion), formatVersion, "supported manifest format versions are 1 and 2");
+ }
+
+ ManifestPath = manifestPath;
+ ManifestLength = manifestLength;
+ PartitionSpecId = partitionSpecId;
+ Content = content;
+ SequenceNumber = sequenceNumber;
+ MinSequenceNumber = minSequenceNumber;
+ AddedSnapshotId = addedSnapshotId;
+ AddedFilesCount = addedFilesCount;
+ ExistingFilesCount = existingFilesCount;
+ DeletedFilesCount = deletedFilesCount;
+ AddedRowsCount = addedRowsCount;
+ ExistingRowsCount = existingRowsCount;
+ DeletedRowsCount = deletedRowsCount;
+ Partitions = partitions is null
+ ? []
+ : new ReadOnlyCollection<FieldSummary>([.. partitions]);
+ KeyMetadata = keyMetadata.IsEmpty ? ReadOnlyMemory<byte>.Empty : keyMetadata.ToArray();
+ FormatVersion = formatVersion;
+ }
+}
diff --git a/src/IcebergSharp.Core/PartitionValues.cs b/src/IcebergSharp.Core/PartitionValues.cs
new file mode 100644
index 0000000..da787ba
--- /dev/null
+++ b/src/IcebergSharp.Core/PartitionValues.cs
@@ -0,0 +1,72 @@
+using System.Collections.ObjectModel;
+using IcebergSharp.Types;
+
+namespace IcebergSharp;
+
+/// <summary>
+/// Decoded partition tuple for a single data file. The shape of the tuple is
+/// determined at write time by the manifest's partition-spec metadata; this
+/// type carries the resolved names and types alongside the boxed values so the
+/// planner can reason about them without re-reading the spec.
+/// </summary>
+/// <remarks>
+/// Values are <c>object?</c> because the partition spec is dynamic per
+/// manifest; Phase 4 compares them against boxed literals from the expression
+/// DSL. <c>null</c> is a legal partition value (the partition column is
+/// nullable for that file).
+/// </remarks>
+public sealed class PartitionValues
+{
+ private readonly object?[] _values;
+ private readonly Dictionary<string, int> _indexByName;
+
+ public IReadOnlyList<string> Names { get; }
+ public IReadOnlyList<IcebergType> Types { get; }
+
+ public PartitionValues(IReadOnlyList names, IReadOnlyList types, IReadOnlyList