From b0e2d68be9d577defb8f31cd2d5e0f4fe0fb6124 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Wed, 28 May 2025 13:42:19 -0700 Subject: [PATCH 1/3] Overlay iceberg-parquet-1.8.1 for uint type handling --- .../data/parquet/BaseParquetReaders.java | 438 ++++++++++++++++++ .../data/parquet/BaseParquetWriter.java | 264 +++++++++++ .../iceberg/parquet/MessageTypeToType.java | 259 +++++++++++ .../iceberg/parquet/ParquetConversions.java | 125 +++++ pom.xml | 1 + 5 files changed, 1087 insertions(+) create mode 100644 ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java create mode 100644 ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java create mode 100644 ice/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java create mode 100644 ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java new file mode 100644 index 00000000..70e6b3ff --- /dev/null +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.parquet; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.parquet.ParquetSchemaUtil; +import org.apache.iceberg.parquet.ParquetValueReader; +import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type.TypeID; +import org.apache.iceberg.types.Types; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.EnumLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.JsonLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +/** + * @deprecated since 1.8.0, will be made package-private in 1.9.0 + */ +@Deprecated +public abstract class BaseParquetReaders { + protected BaseParquetReaders() {} + + protected ParquetValueReader createReader(Schema expectedSchema, MessageType fileSchema) { + return createReader(expectedSchema, fileSchema, ImmutableMap.of()); + } + + @SuppressWarnings("unchecked") + protected ParquetValueReader createReader( + Schema expectedSchema, MessageType fileSchema, Map idToConstant) { + if (ParquetSchemaUtil.hasIds(fileSchema)) { + return (ParquetValueReader) + TypeWithSchemaVisitor.visit( + expectedSchema.asStruct(), fileSchema, new ReadBuilder(fileSchema, idToConstant)); + } else { + return (ParquetValueReader) + TypeWithSchemaVisitor.visit( + expectedSchema.asStruct(), + fileSchema, + new FallbackReadBuilder(fileSchema, idToConstant)); + } + } + + protected abstract ParquetValueReader createStructReader( + List types, List> fieldReaders, Types.StructType structType); + + protected ParquetValueReader fixedReader(ColumnDescriptor desc) { + return new GenericParquetReaders.FixedReader(desc); + } + + protected ParquetValueReader dateReader(ColumnDescriptor desc) { + return new GenericParquetReaders.DateReader(desc); + } + + protected ParquetValueReader timeReader(ColumnDescriptor desc) { + LogicalTypeAnnotation time = desc.getPrimitiveType().getLogicalTypeAnnotation(); + Preconditions.checkArgument( + time instanceof TimeLogicalTypeAnnotation, "Invalid time logical type: " + time); + + LogicalTypeAnnotation.TimeUnit unit = ((TimeLogicalTypeAnnotation) time).getUnit(); + switch (unit) { + case MICROS: + return new GenericParquetReaders.TimeReader(desc); + case MILLIS: + return new GenericParquetReaders.TimeMillisReader(desc); + default: + throw new UnsupportedOperationException("Unsupported unit for time: " + unit); + } + } + + protected ParquetValueReader timestampReader(ColumnDescriptor desc, boolean isAdjustedToUTC) { + if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { + return new GenericParquetReaders.TimestampInt96Reader(desc); + } + + LogicalTypeAnnotation timestamp = desc.getPrimitiveType().getLogicalTypeAnnotation(); + Preconditions.checkArgument( + timestamp instanceof TimestampLogicalTypeAnnotation, + "Invalid timestamp logical type: " + timestamp); + + LogicalTypeAnnotation.TimeUnit unit = ((TimestampLogicalTypeAnnotation) timestamp).getUnit(); + switch (unit) { + case MICROS: + return isAdjustedToUTC + ? new GenericParquetReaders.TimestamptzReader(desc) + : new GenericParquetReaders.TimestampReader(desc); + case MILLIS: + return isAdjustedToUTC + ? new GenericParquetReaders.TimestamptzMillisReader(desc) + : new GenericParquetReaders.TimestampMillisReader(desc); + default: + throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit); + } + } + + protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { + return value; + } + + private class FallbackReadBuilder extends ReadBuilder { + private FallbackReadBuilder(MessageType type, Map idToConstant) { + super(type, idToConstant); + } + + @Override + public ParquetValueReader message( + Types.StructType expected, MessageType message, List> fieldReaders) { + // the top level matches by ID, but the remaining IDs are missing + return super.struct(expected, message, fieldReaders); + } + + @Override + public ParquetValueReader struct( + Types.StructType expected, GroupType struct, List> fieldReaders) { + // the expected struct is ignored because nested fields are never found when the + List> newFields = + Lists.newArrayListWithExpectedSize(fieldReaders.size()); + List types = Lists.newArrayListWithExpectedSize(fieldReaders.size()); + List fields = struct.getFields(); + for (int i = 0; i < fields.size(); i += 1) { + ParquetValueReader fieldReader = fieldReaders.get(i); + if (fieldReader != null) { + Type fieldType = fields.get(i); + int fieldD = type().getMaxDefinitionLevel(path(fieldType.getName())) - 1; + newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReader)); + types.add(fieldType); + } + } + + return createStructReader(types, newFields, expected); + } + } + + private class LogicalTypeReadBuilder + implements LogicalTypeAnnotationVisitor> { + + private final ColumnDescriptor desc; + private final org.apache.iceberg.types.Type.PrimitiveType expected; + + LogicalTypeReadBuilder( + ColumnDescriptor desc, org.apache.iceberg.types.Type.PrimitiveType expected) { + this.desc = desc; + this.expected = expected; + } + + @Override + public Optional> visit(StringLogicalTypeAnnotation stringLogicalType) { + return Optional.of(ParquetValueReaders.strings(desc)); + } + + @Override + public Optional> visit(EnumLogicalTypeAnnotation enumLogicalType) { + return Optional.of(ParquetValueReaders.strings(desc)); + } + + @Override + public Optional> visit(DecimalLogicalTypeAnnotation decimalLogicalType) { + return Optional.of(ParquetValueReaders.bigDecimals(desc)); + } + + @Override + public Optional> visit(DateLogicalTypeAnnotation dateLogicalType) { + return Optional.of(dateReader(desc)); + } + + @Override + public Optional> visit(TimeLogicalTypeAnnotation timeLogicalType) { + return Optional.of(timeReader(desc)); + } + + @Override + public Optional> visit( + TimestampLogicalTypeAnnotation timestampLogicalType) { + return Optional.of( + timestampReader(desc, ((Types.TimestampType) expected).shouldAdjustToUTC())); + } + + @Override + public Optional> visit(IntLogicalTypeAnnotation intLogicalType) { + if (intLogicalType.getBitWidth() == 64) { + Preconditions.checkArgument( + intLogicalType.isSigned(), "Cannot read UINT64 as a long value"); + + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + if (expected.typeId() == TypeID.LONG) { + return Optional.of(new ParquetValueReaders.IntAsLongReader(desc)); + } + + Preconditions.checkArgument( + intLogicalType.isSigned() || intLogicalType.getBitWidth() < 32, + "Cannot read UINT32 as an int value"); + + return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); + } + + @Override + public Optional> visit(JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(ParquetValueReaders.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { + return Optional.of(ParquetValueReaders.byteBuffers(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) { + return Optional.of(ParquetValueReaders.uuids(desc)); + } + } + + private class ReadBuilder extends TypeWithSchemaVisitor> { + private final MessageType type; + private final Map idToConstant; + + private ReadBuilder(MessageType type, Map idToConstant) { + this.type = type; + this.idToConstant = idToConstant; + } + + @Override + public ParquetValueReader message( + Types.StructType expected, MessageType message, List> fieldReaders) { + return struct(expected, message.asGroupType(), fieldReaders); + } + + @Override + public ParquetValueReader struct( + Types.StructType expected, GroupType struct, List> fieldReaders) { + // match the expected struct's order + Map> readersById = Maps.newHashMap(); + Map typesById = Maps.newHashMap(); + Map maxDefinitionLevelsById = Maps.newHashMap(); + List fields = struct.getFields(); + for (int i = 0; i < fields.size(); i += 1) { + ParquetValueReader fieldReader = fieldReaders.get(i); + if (fieldReader != null) { + Type fieldType = fields.get(i); + int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; + int id = fieldType.getId().intValue(); + readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReader)); + typesById.put(id, fieldType); + if (idToConstant.containsKey(id)) { + maxDefinitionLevelsById.put(id, fieldD); + } + } + } + + List expectedFields = + expected != null ? expected.fields() : ImmutableList.of(); + List> reorderedFields = + Lists.newArrayListWithExpectedSize(expectedFields.size()); + List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); + // Defaulting to parent max definition level + int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); + for (Types.NestedField field : expectedFields) { + int id = field.fieldId(); + ParquetValueReader reader = readersById.get(id); + if (idToConstant.containsKey(id)) { + // containsKey is used because the constant may be null + int fieldMaxDefinitionLevel = + maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); + reorderedFields.add( + ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel)); + types.add(null); + } else if (id == MetadataColumns.ROW_POSITION.fieldId()) { + reorderedFields.add(ParquetValueReaders.position()); + types.add(null); + } else if (id == MetadataColumns.IS_DELETED.fieldId()) { + reorderedFields.add(ParquetValueReaders.constant(false)); + types.add(null); + } else if (reader != null) { + reorderedFields.add(reader); + types.add(typesById.get(id)); + } else if (field.initialDefault() != null) { + reorderedFields.add( + ParquetValueReaders.constant( + convertConstant(field.type(), field.initialDefault()), + maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); + types.add(typesById.get(id)); + } else if (field.isOptional()) { + reorderedFields.add(ParquetValueReaders.nulls()); + types.add(null); + } else { + throw new IllegalArgumentException( + String.format("Missing required field: %s", field.name())); + } + } + + return createStructReader(types, reorderedFields, expected); + } + + @Override + public ParquetValueReader list( + Types.ListType expectedList, GroupType array, ParquetValueReader elementReader) { + if (expectedList == null) { + return null; + } + + String[] repeatedPath = currentPath(); + + int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1; + int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1; + + Type elementType = ParquetSchemaUtil.determineListElementType(array); + int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1; + + return new ParquetValueReaders.ListReader<>( + repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader)); + } + + @Override + public ParquetValueReader map( + Types.MapType expectedMap, + GroupType map, + ParquetValueReader keyReader, + ParquetValueReader valueReader) { + if (expectedMap == null) { + return null; + } + + GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); + String[] repeatedPath = currentPath(); + + int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1; + int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1; + + Type keyType = repeatedKeyValue.getType(0); + int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1; + Type valueType = repeatedKeyValue.getType(1); + int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1; + + return new ParquetValueReaders.MapReader<>( + repeatedD, + repeatedR, + ParquetValueReaders.option(keyType, keyD, keyReader), + ParquetValueReaders.option(valueType, valueD, valueReader)); + } + + @Override + @SuppressWarnings("checkstyle:CyclomaticComplexity") + public ParquetValueReader primitive( + org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) { + if (expected == null) { + return null; + } + + ColumnDescriptor desc = type.getColumnDescription(currentPath()); + + if (primitive.getLogicalTypeAnnotation() != null) { + return primitive + .getLogicalTypeAnnotation() + .accept(new LogicalTypeReadBuilder(desc, expected)) + .orElseThrow( + () -> + new UnsupportedOperationException( + "Unsupported logical type: " + primitive.getLogicalTypeAnnotation())); + } + + switch (primitive.getPrimitiveTypeName()) { + case FIXED_LEN_BYTE_ARRAY: + return fixedReader(desc); + case BINARY: + if (expected.typeId() == TypeID.STRING) { + return ParquetValueReaders.strings(desc); + } else { + return ParquetValueReaders.byteBuffers(desc); + } + case INT32: + if (expected.typeId() == TypeID.LONG) { + return ParquetValueReaders.intsAsLongs(desc); + } else { + return ParquetValueReaders.unboxed(desc); + } + case FLOAT: + if (expected.typeId() == TypeID.DOUBLE) { + return ParquetValueReaders.floatsAsDoubles(desc); + } else { + return ParquetValueReaders.unboxed(desc); + } + case BOOLEAN: + case INT64: + case DOUBLE: + return ParquetValueReaders.unboxed(desc); + case INT96: + // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards + // compatibility we try to read INT96 as timestamps. + return timestampReader(desc, true); + default: + throw new UnsupportedOperationException("Unsupported type: " + primitive); + } + } + + MessageType type() { + return type; + } + } +} diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java new file mode 100644 index 00000000..1bfa90b2 --- /dev/null +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.parquet; + +import java.util.List; +import java.util.Optional; +import org.apache.iceberg.parquet.ParquetTypeVisitor; +import org.apache.iceberg.parquet.ParquetValueWriter; +import org.apache.iceberg.parquet.ParquetValueWriters; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +/** + * @deprecated since 1.8.0, will be made package-private in 1.9.0 + */ +@Deprecated +public abstract class BaseParquetWriter { + + @SuppressWarnings("unchecked") + protected ParquetValueWriter createWriter(MessageType type) { + return (ParquetValueWriter) ParquetTypeVisitor.visit(type, new WriteBuilder(type)); + } + + protected abstract ParquetValueWriters.StructWriter createStructWriter( + List> writers); + + protected ParquetValueWriter fixedWriter(ColumnDescriptor desc) { + return new GenericParquetWriter.FixedWriter(desc); + } + + protected ParquetValueWriter dateWriter(ColumnDescriptor desc) { + return new GenericParquetWriter.DateWriter(desc); + } + + protected ParquetValueWriter timeWriter(ColumnDescriptor desc) { + return new GenericParquetWriter.TimeWriter(desc); + } + + protected ParquetValueWriter timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) { + if (isAdjustedToUTC) { + return new GenericParquetWriter.TimestamptzWriter(desc); + } else { + return new GenericParquetWriter.TimestampWriter(desc); + } + } + + private class WriteBuilder extends ParquetTypeVisitor> { + private final MessageType type; + + private WriteBuilder(MessageType type) { + this.type = type; + } + + @Override + public ParquetValueWriter message( + MessageType message, List> fieldWriters) { + + return struct(message.asGroupType(), fieldWriters); + } + + @Override + public ParquetValueWriter struct( + GroupType struct, List> fieldWriters) { + List fields = struct.getFields(); + List> writers = Lists.newArrayListWithExpectedSize(fieldWriters.size()); + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = struct.getType(i); + int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())); + writers.add(ParquetValueWriters.option(fieldType, fieldD, fieldWriters.get(i))); + } + + return createStructWriter(writers); + } + + @Override + public ParquetValueWriter list(GroupType array, ParquetValueWriter elementWriter) { + GroupType repeated = array.getFields().get(0).asGroupType(); + String[] repeatedPath = currentPath(); + + int repeatedD = type.getMaxDefinitionLevel(repeatedPath); + int repeatedR = type.getMaxRepetitionLevel(repeatedPath); + + Type elementType = repeated.getType(0); + int elementD = type.getMaxDefinitionLevel(path(elementType.getName())); + + return ParquetValueWriters.collections( + repeatedD, repeatedR, ParquetValueWriters.option(elementType, elementD, elementWriter)); + } + + @Override + public ParquetValueWriter map( + GroupType map, ParquetValueWriter keyWriter, ParquetValueWriter valueWriter) { + GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); + String[] repeatedPath = currentPath(); + + int repeatedD = type.getMaxDefinitionLevel(repeatedPath); + int repeatedR = type.getMaxRepetitionLevel(repeatedPath); + + Type keyType = repeatedKeyValue.getType(0); + int keyD = type.getMaxDefinitionLevel(path(keyType.getName())); + Type valueType = repeatedKeyValue.getType(1); + int valueD = type.getMaxDefinitionLevel(path(valueType.getName())); + + return ParquetValueWriters.maps( + repeatedD, + repeatedR, + ParquetValueWriters.option(keyType, keyD, keyWriter), + ParquetValueWriters.option(valueType, valueD, valueWriter)); + } + + @Override + public ParquetValueWriter primitive(PrimitiveType primitive) { + ColumnDescriptor desc = type.getColumnDescription(currentPath()); + LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); + if (logicalType != null) { + Optional> writer = + logicalType.accept(new LogicalTypeWriterVisitor(desc)); + if (writer.isPresent()) { + return writer.get(); + } + } + + switch (primitive.getPrimitiveTypeName()) { + case FIXED_LEN_BYTE_ARRAY: + return fixedWriter(desc); + case BINARY: + return ParquetValueWriters.byteBuffers(desc); + case BOOLEAN: + return ParquetValueWriters.booleans(desc); + case INT32: + return ParquetValueWriters.ints(desc); + case INT64: + return ParquetValueWriters.longs(desc); + case FLOAT: + return ParquetValueWriters.floats(desc); + case DOUBLE: + return ParquetValueWriters.doubles(desc); + default: + throw new UnsupportedOperationException("Unsupported type: " + primitive); + } + } + } + + private class LogicalTypeWriterVisitor + implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor> { + private final ColumnDescriptor desc; + + private LogicalTypeWriterVisitor(ColumnDescriptor desc) { + this.desc = desc; + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { + switch (desc.getPrimitiveType().getPrimitiveTypeName()) { + case INT32: + return Optional.of( + ParquetValueWriters.decimalAsInteger( + desc, decimalType.getPrecision(), decimalType.getScale())); + case INT64: + return Optional.of( + ParquetValueWriters.decimalAsLong( + desc, decimalType.getPrecision(), decimalType.getScale())); + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return Optional.of( + ParquetValueWriters.decimalAsFixed( + desc, decimalType.getPrecision(), decimalType.getScale())); + } + return Optional.empty(); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) { + return Optional.of(dateWriter(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) { + Preconditions.checkArgument( + LogicalTypeAnnotation.TimeUnit.MICROS.equals(timeType.getUnit()), + "Cannot write time in %s, only MICROS is supported", + timeType.getUnit()); + return Optional.of(timeWriter(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { + Preconditions.checkArgument( + LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()), + "Cannot write timestamp in %s, only MICROS is supported", + timestampType.getUnit()); + return Optional.of(timestampWriter(desc, timestampType.isAdjustedToUTC())); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { + Preconditions.checkArgument( + intType.isSigned() || intType.getBitWidth() < 64, + "Cannot read uint64: not a supported Java type"); + if (intType.getBitWidth() < 64) { + return Optional.of(ParquetValueWriters.ints(desc)); + } else { + return Optional.of(ParquetValueWriters.longs(desc)); + } + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { + return Optional.of(ParquetValueWriters.strings(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) { + return Optional.of(ParquetValueWriters.byteBuffers(desc)); + } + + @Override + public Optional> visit( + LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) { + return Optional.of(ParquetValueWriters.uuids(desc)); + } + } +} diff --git a/ice/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java b/ice/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java new file mode 100644 index 00000000..26ef6e46 --- /dev/null +++ b/ice/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.TimestampType; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type.Repetition; + +/** + * A visitor that converts a {@link MessageType} to a {@link Type} in Iceberg. + * + *

Fields we could not determine IDs for will be pruned. + */ +class MessageTypeToType extends ParquetTypeVisitor { + private static final Joiner DOT = Joiner.on("."); + + private final Map aliasToId = Maps.newHashMap(); + private final Function nameToIdFunc; + + MessageTypeToType(Function nameToIdFunc) { + this.nameToIdFunc = nameToIdFunc; + } + + public Map getAliases() { + return aliasToId; + } + + @Override + public Type message(MessageType message, List fields) { + Type struct = struct(message, fields); + return struct != null ? struct : Types.StructType.of(Lists.newArrayList()); + } + + @Override + public Type struct(GroupType struct, List fieldTypes) { + List parquetFields = struct.getFields(); + List fields = Lists.newArrayListWithExpectedSize(fieldTypes.size()); + + for (int i = 0; i < parquetFields.size(); i += 1) { + org.apache.parquet.schema.Type field = parquetFields.get(i); + + Preconditions.checkArgument( + !field.isRepetition(Repetition.REPEATED), + "Fields cannot have repetition REPEATED: %s", + field); + + Integer fieldId = getId(field); + Type fieldType = fieldTypes.get(i); + + // keep the field if it has an id and it was not pruned (i.e. its type is not null) + if (fieldId != null && fieldType != null) { + addAlias(field.getName(), fieldId); + + if (parquetFields.get(i).isRepetition(Repetition.OPTIONAL)) { + fields.add(optional(fieldId, field.getName(), fieldType)); + } else { + fields.add(required(fieldId, field.getName(), fieldType)); + } + } + } + + return fields.isEmpty() ? null : Types.StructType.of(fields); + } + + @Override + public Type list(GroupType array, Type elementType) { + org.apache.parquet.schema.Type element = ParquetSchemaUtil.determineListElementType(array); + + Integer elementFieldId = getId(element); + + // keep the list if its element has an id and it was not pruned (i.e. its type is not null) + if (elementFieldId != null && elementType != null) { + addAlias(element.getName(), elementFieldId); + + if (element.isRepetition(Repetition.OPTIONAL)) { + return Types.ListType.ofOptional(elementFieldId, elementType); + } else { + return Types.ListType.ofRequired(elementFieldId, elementType); + } + } + + return null; + } + + @Override + public Type map(GroupType map, Type keyType, Type valueType) { + GroupType keyValue = map.getType(0).asGroupType(); + org.apache.parquet.schema.Type key = keyValue.getType(0); + org.apache.parquet.schema.Type value = keyValue.getType(1); + + Preconditions.checkArgument( + !value.isRepetition(Repetition.REPEATED), + "Values cannot have repetition REPEATED: %s", + value); + + Integer keyFieldId = getId(key); + Integer valueFieldId = getId(value); + + // keep the map if its key and values have ids and were not pruned (i.e. their types are not + // null) + if (keyFieldId != null && valueFieldId != null && keyType != null && valueType != null) { + addAlias(key.getName(), keyFieldId); + addAlias(value.getName(), valueFieldId); + + // check only values as keys are required by the spec + if (value.isRepetition(Repetition.OPTIONAL)) { + return Types.MapType.ofOptional(keyFieldId, valueFieldId, keyType, valueType); + } else { + return Types.MapType.ofRequired(keyFieldId, valueFieldId, keyType, valueType); + } + } + + return null; + } + + @Override + public Type primitive(PrimitiveType primitive) { + // first, use the logical type annotation, if present + LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation(); + if (logicalType != null) { + Optional converted = logicalType.accept(ParquetLogicalTypeVisitor.get()); + if (converted.isPresent()) { + return converted.get(); + } + } + + // last, use the primitive type + switch (primitive.getPrimitiveTypeName()) { + case BOOLEAN: + return Types.BooleanType.get(); + case INT32: + return Types.IntegerType.get(); + case INT64: + return Types.LongType.get(); + case FLOAT: + return Types.FloatType.get(); + case DOUBLE: + return Types.DoubleType.get(); + case FIXED_LEN_BYTE_ARRAY: + return Types.FixedType.ofLength(primitive.getTypeLength()); + case INT96: + return Types.TimestampType.withZone(); + case BINARY: + return Types.BinaryType.get(); + } + + throw new UnsupportedOperationException("Cannot convert unknown primitive type: " + primitive); + } + + private static class ParquetLogicalTypeVisitor + implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor { + private static final ParquetLogicalTypeVisitor INSTANCE = new ParquetLogicalTypeVisitor(); + + private static ParquetLogicalTypeVisitor get() { + return INSTANCE; + } + + @Override + public Optional visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) { + return Optional.of(Types.StringType.get()); + } + + @Override + public Optional visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) { + return Optional.of(Types.StringType.get()); + } + + @Override + public Optional visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) { + return Optional.of(Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale())); + } + + @Override + public Optional visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) { + return Optional.of(Types.DateType.get()); + } + + @Override + public Optional visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) { + return Optional.of(Types.TimeType.get()); + } + + @Override + public Optional visit( + LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) { + return Optional.of( + timestampType.isAdjustedToUTC() ? TimestampType.withZone() : TimestampType.withoutZone()); + } + + @Override + public Optional visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { + Preconditions.checkArgument( + intType.isSigned() || intType.getBitWidth() < 64, + "Cannot use uint64: not a supported Java type"); + if (intType.getBitWidth() < 32) { + return Optional.of(Types.IntegerType.get()); + } else if (intType.getBitWidth() == 32 && intType.isSigned()) { + return Optional.of(Types.IntegerType.get()); + } else { + return Optional.of(Types.LongType.get()); + } + } + + @Override + public Optional visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonType) { + return Optional.of(Types.StringType.get()); + } + + @Override + public Optional visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) { + return Optional.of(Types.BinaryType.get()); + } + } + + private void addAlias(String name, int fieldId) { + aliasToId.put(DOT.join(path(name)), fieldId); + } + + private Integer getId(org.apache.parquet.schema.Type type) { + org.apache.parquet.schema.Type.ID id = type.getId(); + if (id != null) { + return id.intValue(); + } else { + return nameToIdFunc.apply(path(type.getName())); + } + } +} diff --git a/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java new file mode 100644 index 00000000..0f9878d2 --- /dev/null +++ b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.parquet; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import java.util.function.Function; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.types.Type; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; + +class ParquetConversions { + private ParquetConversions() {} + + @SuppressWarnings("unchecked") + static Literal fromParquetPrimitive(Type type, PrimitiveType parquetType, Object value) { + switch (type.typeId()) { + case BOOLEAN: + return (Literal) Literal.of((Boolean) value); + case INTEGER: + case DATE: + return (Literal) Literal.of((Integer) value); + case LONG: + case TIME: + case TIMESTAMP: + return (Literal) Literal.of((Long) value); + case FLOAT: + return (Literal) Literal.of((Float) value); + case DOUBLE: + return (Literal) Literal.of((Double) value); + case STRING: + Function stringConversion = converterFromParquet(parquetType); + return (Literal) Literal.of((CharSequence) stringConversion.apply(value)); + case UUID: + Function uuidConversion = converterFromParquet(parquetType); + return (Literal) Literal.of((UUID) uuidConversion.apply(value)); + case FIXED: + case BINARY: + Function binaryConversion = converterFromParquet(parquetType); + return (Literal) Literal.of((ByteBuffer) binaryConversion.apply(value)); + case DECIMAL: + Function decimalConversion = converterFromParquet(parquetType); + return (Literal) Literal.of((BigDecimal) decimalConversion.apply(value)); + default: + throw new IllegalArgumentException("Unsupported primitive type: " + type); + } + } + + static Function converterFromParquet( + PrimitiveType parquetType, Type icebergType) { + Function fromParquet = converterFromParquet(parquetType); + if (icebergType != null) { + if (icebergType.typeId() == Type.TypeID.LONG + && parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32) { + return value -> ((Integer) fromParquet.apply(value)).longValue(); + } else if (icebergType.typeId() == Type.TypeID.DOUBLE + && parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FLOAT) { + return value -> ((Float) fromParquet.apply(value)).doubleValue(); + } + } + + return fromParquet; + } + + static Function converterFromParquet(PrimitiveType type) { + if (type.getOriginalType() != null) { + switch (type.getOriginalType()) { + case UTF8: + // decode to CharSequence to avoid copying into a new String + return binary -> StandardCharsets.UTF_8.decode(((Binary) binary).toByteBuffer()); + case DECIMAL: + DecimalLogicalTypeAnnotation decimal = + (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation(); + int scale = decimal.getScale(); + switch (type.getPrimitiveTypeName()) { + case INT32: + case INT64: + return num -> BigDecimal.valueOf(((Number) num).longValue(), scale); + case FIXED_LEN_BYTE_ARRAY: + case BINARY: + return bin -> new BigDecimal(new BigInteger(((Binary) bin).getBytes()), scale); + default: + throw new IllegalArgumentException( + "Unsupported primitive type for decimal: " + type.getPrimitiveTypeName()); + } + default: + } + } + + switch (type.getPrimitiveTypeName()) { + case FIXED_LEN_BYTE_ARRAY: + case BINARY: + return binary -> ByteBuffer.wrap(((Binary) binary).getBytes()); + case INT96: + return binary -> + ParquetUtil.extractTimestampInt96( + ByteBuffer.wrap(((Binary) binary).getBytes()).order(ByteOrder.LITTLE_ENDIAN)); + default: + } + + return obj -> obj; + } +} diff --git a/pom.xml b/pom.xml index 9c0d5cea..cf06f5d0 100644 --- a/pom.xml +++ b/pom.xml @@ -192,6 +192,7 @@ src/main/java/**/Route.java src/main/java/**/iceberg/io/internal/*.java src/main/java/**/iceberg/io/SchemeFileIO.java + src/main/java/org/apache/iceberg/**/*.java true From 818a164b4f47adf4c2e4ad989d2db43d840aca63 Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Wed, 28 May 2025 13:51:08 -0700 Subject: [PATCH 2/3] Fix uint32 handling Caused by: java.lang.ClassCastException: class java.lang.Integer cannot be cast to class java.lang.Long (java.lang.Integer and java.lang.Long are in module java.base of loader 'bootstrap') at org.apache.iceberg.parquet.ParquetConversions.fromParquetPrimitive(ParquetConversions.java:48) at org.apache.iceberg.parquet.ParquetUtil.footerMetrics(ParquetUtil.java:145) at org.apache.iceberg.parquet.ParquetUtil.footerMetrics(ParquetUtil.java:89) --- .../java/org/apache/iceberg/parquet/ParquetConversions.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java index 0f9878d2..ed5ee35b 100644 --- a/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java +++ b/ice/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java @@ -45,6 +45,9 @@ static Literal fromParquetPrimitive(Type type, PrimitiveType parquetType, case LONG: case TIME: case TIMESTAMP: + if (parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32) { // uint32 + return (Literal) Literal.of(Long.valueOf((Integer) value)); + } return (Literal) Literal.of((Long) value); case FLOAT: return (Literal) Literal.of((Float) value); From aaee83ed2b7151ae62d67c3330308f793540c28a Mon Sep 17 00:00:00 2001 From: Stanley Shyiko Date: Wed, 28 May 2025 13:59:31 -0700 Subject: [PATCH 3/3] Convert uint64 to int64 (lossy) Caused by: java.lang.IllegalArgumentException: Cannot use uint64: not a supported Java type at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:143) at org.apache.iceberg.parquet.MessageTypeToType$ParquetLogicalTypeVisitor.visit(MessageTypeToType.java:224) at org.apache.parquet.schema.LogicalTypeAnnotation$IntLogicalTypeAnnotation.accept(LogicalTypeAnnotation.java:800) at org.apache.iceberg.parquet.MessageTypeToType.primitive(MessageTypeToType.java:153) at org.apache.iceberg.parquet.MessageTypeToType.primitive(MessageTypeToType.java:46) at org.apache.iceberg.parquet.ParquetTypeVisitor.visit(ParquetTypeVisitor.java:39) at org.apache.iceberg.parquet.ParquetTypeVisitor.visitFields(ParquetTypeVisitor.java:182) at org.apache.iceberg.parquet.ParquetTypeVisitor.visit(ParquetTypeVisitor.java:36) at org.apache.iceberg.parquet.ParquetSchemaUtil.convertInternal(ParquetSchemaUtil.java:74) at org.apache.iceberg.parquet.ParquetSchemaUtil.convert(ParquetSchemaUtil.java:57) --- .../apache/iceberg/data/parquet/BaseParquetReaders.java | 7 ++++--- .../apache/iceberg/data/parquet/BaseParquetWriter.java | 8 +++++--- .../org/apache/iceberg/parquet/MessageTypeToType.java | 8 +++++--- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 70e6b3ff..7f237b72 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -214,9 +214,10 @@ public Optional> visit( @Override public Optional> visit(IntLogicalTypeAnnotation intLogicalType) { if (intLogicalType.getBitWidth() == 64) { - Preconditions.checkArgument( - intLogicalType.isSigned(), "Cannot read UINT64 as a long value"); - + /* + Preconditions.checkArgument( + intLogicalType.isSigned(), "Cannot read UINT64 as a long value"); + */ return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); } diff --git a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java index 1bfa90b2..8199f698 100644 --- a/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java +++ b/ice/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java @@ -233,9 +233,11 @@ public Optional> visit( @Override public Optional> visit( LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { - Preconditions.checkArgument( - intType.isSigned() || intType.getBitWidth() < 64, - "Cannot read uint64: not a supported Java type"); + /* + Preconditions.checkArgument( + intType.isSigned() || intType.getBitWidth() < 64, + "Cannot read uint64: not a supported Java type"); + */ if (intType.getBitWidth() < 64) { return Optional.of(ParquetValueWriters.ints(desc)); } else { diff --git a/ice/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java b/ice/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java index 26ef6e46..305ed9a4 100644 --- a/ice/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java +++ b/ice/src/main/java/org/apache/iceberg/parquet/MessageTypeToType.java @@ -221,9 +221,11 @@ public Optional visit( @Override public Optional visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) { - Preconditions.checkArgument( - intType.isSigned() || intType.getBitWidth() < 64, - "Cannot use uint64: not a supported Java type"); + /* + Preconditions.checkArgument( + intType.isSigned() || intType.getBitWidth() < 64, + "Cannot use uint64: not a supported Java type"); + */ if (intType.getBitWidth() < 32) { return Optional.of(Types.IntegerType.get()); } else if (intType.getBitWidth() == 32 && intType.isSigned()) {