diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java index 4892696ab450..91a922063fe3 100644 --- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java +++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java @@ -395,6 +395,10 @@ public static Type find(Schema schema, Predicate predicate) { return visit(schema, new FindTypeVisitor(predicate)); } + public static Type find(Type type, Predicate predicate) { + return visit(type, new FindTypeVisitor(predicate)); + } + public static boolean isPromotionAllowed(Type from, Type.PrimitiveType to) { // Warning! Before changing this function, make sure that the type change doesn't introduce // compatibility problems in partitioning. diff --git a/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java b/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java index e26e7098cb22..854e41c6e7d8 100644 --- a/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java +++ b/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java @@ -105,6 +105,10 @@ public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) { return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch); } + public static OffsetDateTime timestamptzFromNanos(long nanosFromEpoch) { + return ChronoUnit.NANOS.addTo(EPOCH, nanosFromEpoch); + } + public static long microsFromTimestamptz(OffsetDateTime dateTime) { return ChronoUnit.MICROS.between(EPOCH, dateTime); } diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java index 5f3fc370e4c0..6b74333bc42f 100644 --- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java +++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java @@ -45,6 +45,9 @@ public static Object generatePrimitive(Type.PrimitiveType primitive, Random rand int choice = random.nextInt(20); switch (primitive.typeId()) { + case UNKNOWN: + return null; + case BOOLEAN: return choice < 10; @@ -126,6 +129,9 @@ public static Object generatePrimitive(Type.PrimitiveType primitive, Random rand case TIMESTAMP: return random.nextLong() % FIFTY_YEARS_IN_MICROS; + case TIMESTAMP_NANO: + return random.nextLong() % ABOUT_TEN_YEARS_IN_NANOS; + case STRING: return randomString(random); @@ -161,6 +167,8 @@ public static Object generateDictionaryEncodablePrimitive( Type.PrimitiveType primitive, Random random) { int value = random.nextInt(3); switch (primitive.typeId()) { + case UNKNOWN: + return null; case BOOLEAN: return true; // doesn't really matter for booleans since they are not dictionary encoded case INTEGER: @@ -201,6 +209,7 @@ public static Object generateDictionaryEncodablePrimitive( private static final long FIFTY_YEARS_IN_MICROS = (50L * (365 * 3 + 366) * 24 * 60 * 60 * 1_000_000) / 4; + private static final long ABOUT_TEN_YEARS_IN_NANOS = 10L * 365 * 24 * 60 * 60 * 1_000_000_000; private static final int ABOUT_380_YEARS_IN_DAYS = 380 * 365; private static final long ONE_DAY_IN_MICROS = 24 * 60 * 60 * 1_000_000L; private static final String CHARS = diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index cdc50959ac8a..e5aef3605ea5 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -154,7 +154,8 @@ public static Schema applyNameMapping(Schema fileSchema, NameMapping nameMapping public static boolean isTimestamptz(Schema schema) { LogicalType logicalType = schema.getLogicalType(); if (logicalType instanceof LogicalTypes.TimestampMillis - || logicalType instanceof LogicalTypes.TimestampMicros) { + || logicalType instanceof LogicalTypes.TimestampMicros + || logicalType instanceof LogicalTypes.TimestampNanos) { // timestamptz is adjusted to UTC Object value = schema.getObjectProp(ADJUST_TO_UTC_PROP); @@ -172,6 +173,10 @@ public static boolean isTimestamptz(Schema schema) { return false; } + public static boolean isOptional(Schema schema) { + return isOptionSchema(schema) || schema.getType() == Schema.Type.NULL; + } + public static boolean isOptionSchema(Schema schema) { if (schema.getType() == UNION && schema.getTypes().size() == 2) { if (schema.getTypes().get(0).getType() == Schema.Type.NULL) { @@ -184,12 +189,15 @@ public static boolean isOptionSchema(Schema schema) { } static Schema toOption(Schema schema) { - if (schema.getType() == UNION) { - Preconditions.checkArgument( - isOptionSchema(schema), "Union schemas are not supported: %s", schema); - return schema; - } else { - return Schema.createUnion(NULL, schema); + switch (schema.getType()) { + case UNION: + Preconditions.checkArgument( + isOptionSchema(schema), "Union schemas are not supported: %s", schema); + return schema; + case NULL: + return schema; + default: + return Schema.createUnion(NULL, schema); } } diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java index 126a6373d210..87917bd76417 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java @@ -23,6 +23,7 @@ import org.apache.avro.Schema; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; import org.apache.iceberg.util.Pair; /** @@ -103,16 +104,19 @@ private static T visitUnion( } } } else { - boolean encounteredNull = false; + boolean encounteredNullWithoutUnknown = false; for (int i = 0; i < types.size(); i++) { // For a union-type (a, b, NULL, c) and the corresponding struct type (tag, a, b, c), the // types match according to the following pattern: // Before NULL, branch type i in the union maps to struct field i + 1. // After NULL, branch type i in the union maps to struct field i. - int structFieldIndex = encounteredNull ? i : i + 1; + int structFieldIndex = encounteredNullWithoutUnknown ? i : i + 1; if (types.get(i).getType() == Schema.Type.NULL) { visit(visitor.nullType(), types.get(i), visitor); - encounteredNull = true; + Pair nameAndType = visitor.fieldNameAndType(type, structFieldIndex); + if (((Type) nameAndType.second()).typeId() != Type.TypeID.UNKNOWN) { + encounteredNullWithoutUnknown = true; + } } else { options.add( visit( diff --git a/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java index f8a2bc604656..3955bd83a97a 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java +++ b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java @@ -78,6 +78,9 @@ public ValueWriter primitive(Schema primitive) { case "timestamp-micros": return ValueWriters.longs(); + case "timestamp-nanos": + return ValueWriters.longs(); + case "decimal": LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java index bfdb65acf1c2..0ed342b458d5 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java @@ -181,7 +181,8 @@ public ValueReader primitive(Type partner, Schema primitive) { return (ValueReader) (decoder, ignored) -> longs.read(decoder, null) * 1000L; case "timestamp-micros": - // Spark uses the same representation + case "timestamp-nanos": + // both are handled in memory as long values, using the type to track units return ValueReaders.longs(); case "decimal": diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalReader.java b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java index e7bf0b7553d8..18ec485f083d 100644 --- a/core/src/main/java/org/apache/iceberg/avro/InternalReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java @@ -177,6 +177,8 @@ public ValueReader primitive(Pair partner, Schema primitive) { return (ValueReader) (decoder, ignored) -> longs.read(decoder, null) * 1000L; case "timestamp-micros": + case "timestamp-nanos": + // both are handled in memory as long values, using the type to track units return ValueReaders.longs(); case "decimal": diff --git a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java index 444acc670e51..2ffd658f447e 100644 --- a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java +++ b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java @@ -92,7 +92,7 @@ public Type record(Schema record, List names, List fieldTypes) { Type fieldType = fieldTypes.get(i); int fieldId = getId(field); - if (AvroSchemaUtil.isOptionSchema(field.schema())) { + if (AvroSchemaUtil.isOptional(field.schema())) { newFields.add(Types.NestedField.optional(fieldId, field.name(), fieldType, field.doc())); } else { newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType, field.doc())); @@ -105,7 +105,7 @@ public Type record(Schema record, List names, List fieldTypes) { @Override public Type union(Schema union, List options) { if (AvroSchemaUtil.isOptionSchema(union)) { - if (options.get(0) == null) { + if (union.getTypes().get(0).getType() == Schema.Type.NULL) { return options.get(1); } else { return options.get(0); @@ -165,7 +165,7 @@ public Type map(Schema map, Type valueType) { int keyId = getKeyId(map); int valueId = getValueId(map); - if (AvroSchemaUtil.isOptionSchema(valueSchema)) { + if (AvroSchemaUtil.isOptional(valueSchema)) { return Types.MapType.ofOptional(keyId, valueId, Types.StringType.get(), valueType); } else { return Types.MapType.ofRequired(keyId, valueId, Types.StringType.get(), valueType); @@ -177,34 +177,50 @@ public Type variant(Schema variant, Type metadataType, Type valueType) { return Types.VariantType.get(); } + public Type logicalType(Schema primitive, LogicalType logical) { + String name = logical.getName(); + if (logical instanceof LogicalTypes.Decimal) { + return Types.DecimalType.of( + ((LogicalTypes.Decimal) logical).getPrecision(), + ((LogicalTypes.Decimal) logical).getScale()); + + } else if (logical instanceof LogicalTypes.Date) { + return Types.DateType.get(); + + } else if (logical instanceof LogicalTypes.TimeMillis + || logical instanceof LogicalTypes.TimeMicros) { + return Types.TimeType.get(); + + } else if (logical instanceof LogicalTypes.TimestampMillis + || logical instanceof LogicalTypes.TimestampMicros) { + if (AvroSchemaUtil.isTimestamptz(primitive)) { + return Types.TimestampType.withZone(); + } else { + return Types.TimestampType.withoutZone(); + } + + } else if (logical instanceof LogicalTypes.TimestampNanos) { + if (AvroSchemaUtil.isTimestamptz(primitive)) { + return Types.TimestampNanoType.withZone(); + } else { + return Types.TimestampNanoType.withoutZone(); + } + + } else if (LogicalTypes.uuid().getName().equals(name)) { + return Types.UUIDType.get(); + } + + return null; + } + @Override public Type primitive(Schema primitive) { // first check supported logical types LogicalType logical = primitive.getLogicalType(); if (logical != null) { - String name = logical.getName(); - if (logical instanceof LogicalTypes.Decimal) { - return Types.DecimalType.of( - ((LogicalTypes.Decimal) logical).getPrecision(), - ((LogicalTypes.Decimal) logical).getScale()); - - } else if (logical instanceof LogicalTypes.Date) { - return Types.DateType.get(); - - } else if (logical instanceof LogicalTypes.TimeMillis - || logical instanceof LogicalTypes.TimeMicros) { - return Types.TimeType.get(); - - } else if (logical instanceof LogicalTypes.TimestampMillis - || logical instanceof LogicalTypes.TimestampMicros) { - if (AvroSchemaUtil.isTimestamptz(primitive)) { - return Types.TimestampType.withZone(); - } else { - return Types.TimestampType.withoutZone(); - } - - } else if (LogicalTypes.uuid().getName().equals(name)) { - return Types.UUIDType.get(); + Type result = logicalType(primitive, logical); + if (result != null) { + return result; } } @@ -227,7 +243,7 @@ public Type primitive(Schema primitive) { case BYTES: return Types.BinaryType.get(); case NULL: - return null; + return Types.UnknownType.get(); } throw new UnsupportedOperationException("Unsupported primitive type: " + primitive); diff --git a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java index 4fcbcef16fd4..4442695b7a44 100644 --- a/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java +++ b/core/src/main/java/org/apache/iceberg/avro/TypeToSchema.java @@ -32,6 +32,7 @@ import org.apache.iceberg.types.Types; abstract class TypeToSchema extends TypeUtil.SchemaVisitor { + private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL); private static final Schema BOOLEAN_SCHEMA = Schema.create(Schema.Type.BOOLEAN); private static final Schema INTEGER_SCHEMA = Schema.create(Schema.Type.INT); private static final Schema LONG_SCHEMA = Schema.create(Schema.Type.LONG); @@ -45,6 +46,10 @@ abstract class TypeToSchema extends TypeUtil.SchemaVisitor { LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); private static final Schema TIMESTAMPTZ_SCHEMA = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); + private static final Schema TIMESTAMP_NANO_SCHEMA = + LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG)); + private static final Schema TIMESTAMPTZ_NANO_SCHEMA = + LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG)); private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING); private static final Schema UUID_SCHEMA = LogicalTypes.uuid().addToSchema(Schema.createFixed("uuid_fixed", null, null, 16)); @@ -53,6 +58,8 @@ abstract class TypeToSchema extends TypeUtil.SchemaVisitor { static { TIMESTAMP_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false); TIMESTAMPTZ_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, true); + TIMESTAMP_NANO_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false); + TIMESTAMPTZ_NANO_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, true); } private final Deque fieldIds = Lists.newLinkedList(); @@ -206,6 +213,9 @@ public Schema variant(Types.VariantType variant) { public Schema primitive(Type.PrimitiveType primitive) { Schema primitiveSchema; switch (primitive.typeId()) { + case UNKNOWN: + primitiveSchema = NULL_SCHEMA; + break; case BOOLEAN: primitiveSchema = BOOLEAN_SCHEMA; break; @@ -234,6 +244,13 @@ public Schema primitive(Type.PrimitiveType primitive) { primitiveSchema = TIMESTAMP_SCHEMA; } break; + case TIMESTAMP_NANO: + if (((Types.TimestampNanoType) primitive).shouldAdjustToUTC()) { + primitiveSchema = TIMESTAMPTZ_NANO_SCHEMA; + } else { + primitiveSchema = TIMESTAMP_NANO_SCHEMA; + } + break; case STRING: primitiveSchema = STRING_SCHEMA; break; diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java index dbb1df055035..75929dfde4c4 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java @@ -140,6 +140,12 @@ public ValueReader primitive(Type.PrimitiveType ignored, Schema primitive) { } return GenericReaders.timestamps(); + case "timestamp-nanos": + if (AvroSchemaUtil.isTimestamptz(primitive)) { + return GenericReaders.timestamptzNanos(); + } + return GenericReaders.timestampNanos(); + case "decimal": return ValueReaders.decimal( ValueReaders.decimalBytesReader(primitive), diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java index 4d6973d3cfe3..46ac3e07ea15 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java @@ -118,6 +118,12 @@ public ValueWriter primitive(Schema primitive) { } return GenericWriters.timestamps(); + case "timestamp-nanos": + if (AvroSchemaUtil.isTimestamptz(primitive)) { + return GenericWriters.timestamptzNanos(); + } + return GenericWriters.timestampNanos(); + case "decimal": LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java index b07ab5d18681..d072a26820ab 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java @@ -53,6 +53,14 @@ static ValueReader timestamptz() { return TimestamptzReader.INSTANCE; } + static ValueReader timestampNanos() { + return TimestampNanoReader.INSTANCE; + } + + static ValueReader timestamptzNanos() { + return TimestamptzNanoReader.INSTANCE; + } + static ValueReader struct( List>> readPlan, StructType struct) { return new PlannedRecordReader(readPlan, struct); @@ -107,6 +115,28 @@ public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException { } } + private static class TimestampNanoReader implements ValueReader { + private static final TimestampNanoReader INSTANCE = new TimestampNanoReader(); + + private TimestampNanoReader() {} + + @Override + public LocalDateTime read(Decoder decoder, Object reuse) throws IOException { + return DateTimeUtil.timestampFromNanos(decoder.readLong()); + } + } + + private static class TimestamptzNanoReader implements ValueReader { + private static final TimestamptzNanoReader INSTANCE = new TimestamptzNanoReader(); + + private TimestamptzNanoReader() {} + + @Override + public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException { + return DateTimeUtil.timestamptzFromNanos(decoder.readLong()); + } + } + private static class PlannedRecordReader extends ValueReaders.PlannedStructReader { private final StructType structType; diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java index 1cea012e7a37..6ba5e7ded44b 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java @@ -31,6 +31,7 @@ import org.apache.iceberg.avro.ValueWriter; import org.apache.iceberg.avro.ValueWriters; import org.apache.iceberg.data.Record; +import org.apache.iceberg.util.DateTimeUtil; class GenericWriters { private GenericWriters() {} @@ -51,6 +52,14 @@ static ValueWriter timestamptz() { return TimestamptzWriter.INSTANCE; } + static ValueWriter timestampNanos() { + return TimestampNanoWriter.INSTANCE; + } + + static ValueWriter timestamptzNanos() { + return TimestamptzNanoWriter.INSTANCE; + } + static ValueWriter struct(List> writers) { return new GenericRecordWriter(writers); } @@ -87,7 +96,7 @@ private TimestampWriter() {} @Override public void write(LocalDateTime timestamp, Encoder encoder) throws IOException { - encoder.writeLong(ChronoUnit.MICROS.between(EPOCH, timestamp.atOffset(ZoneOffset.UTC))); + encoder.writeLong(DateTimeUtil.microsFromTimestamp(timestamp)); } } @@ -98,7 +107,29 @@ private TimestamptzWriter() {} @Override public void write(OffsetDateTime timestamptz, Encoder encoder) throws IOException { - encoder.writeLong(ChronoUnit.MICROS.between(EPOCH, timestamptz)); + encoder.writeLong(DateTimeUtil.microsFromTimestamptz(timestamptz)); + } + } + + private static class TimestampNanoWriter implements ValueWriter { + private static final TimestampNanoWriter INSTANCE = new TimestampNanoWriter(); + + private TimestampNanoWriter() {} + + @Override + public void write(LocalDateTime timestamp, Encoder encoder) throws IOException { + encoder.writeLong(DateTimeUtil.nanosFromTimestamp(timestamp)); + } + } + + private static class TimestamptzNanoWriter implements ValueWriter { + private static final TimestamptzNanoWriter INSTANCE = new TimestamptzNanoWriter(); + + private TimestamptzNanoWriter() {} + + @Override + public void write(OffsetDateTime timestamptz, Encoder encoder) throws IOException { + encoder.writeLong(DateTimeUtil.nanosFromTimestamptz(timestamptz)); } } diff --git a/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java index 64b3e943e270..3e401023464b 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java @@ -142,6 +142,12 @@ public ValueReader primitive(Type partner, Schema primitive) { } return GenericReaders.timestamps(); + case "timestamp-nanos": + if (AvroSchemaUtil.isTimestamptz(primitive)) { + return GenericReaders.timestamptzNanos(); + } + return GenericReaders.timestampNanos(); + case "decimal": return ValueReaders.decimal( ValueReaders.decimalBytesReader(primitive), diff --git a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java index 7b7400b69ab4..3894298b7fc4 100644 --- a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -29,15 +30,16 @@ public class InternalTestHelpers { private InternalTestHelpers() {} - public static void assertEquals(Types.StructType struct, StructLike expected, StructLike actual) { - List fields = struct.fields(); - for (int i = 0; i < fields.size(); i += 1) { - Type fieldType = fields.get(i).type(); - - Object expectedValue = expected.get(i, fieldType.typeId().javaClass()); - Object actualValue = actual.get(i, fieldType.typeId().javaClass()); - - assertEquals(fieldType, expectedValue, actualValue); + public static void assertEquals(Types.StructType struct, Record expected, Record actual) { + Types.StructType expectedType = expected.struct(); + for (Types.NestedField field : struct.fields()) { + Types.NestedField expectedField = expectedType.field(field.fieldId()); + if (expectedField != null) { + assertEquals( + field.type(), expected.getField(expectedField.name()), actual.getField(field.name())); + } else { + assertEquals(field.type(), field.initialDefault(), actual.getField(field.name())); + } } } @@ -82,6 +84,7 @@ private static void assertEquals(Type type, Object expected, Object actual) { case DATE: case TIME: case TIMESTAMP: + case TIMESTAMP_NANO: case UUID: case FIXED: case BINARY: @@ -91,7 +94,7 @@ private static void assertEquals(Type type, Object expected, Object actual) { case STRUCT: assertThat(expected).as("Expected should be a StructLike").isInstanceOf(StructLike.class); assertThat(actual).as("Actual should be a StructLike").isInstanceOf(StructLike.class); - assertEquals(type.asStructType(), (StructLike) expected, (StructLike) actual); + assertEquals(type.asStructType(), (Record) expected, (Record) actual); break; case LIST: assertThat(expected).as("Expected should be a List").isInstanceOf(List.class); diff --git a/core/src/test/java/org/apache/iceberg/RandomInternalData.java b/core/src/test/java/org/apache/iceberg/RandomInternalData.java index a7de8e4c8f01..d282fa443caa 100644 --- a/core/src/test/java/org/apache/iceberg/RandomInternalData.java +++ b/core/src/test/java/org/apache/iceberg/RandomInternalData.java @@ -24,6 +24,7 @@ import java.util.UUID; import java.util.function.Supplier; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; @@ -34,11 +35,11 @@ public class RandomInternalData { private RandomInternalData() {} - public static List generate(Schema schema, int numRecords, long seed) { + public static List generate(Schema schema, int numRecords, long seed) { RandomDataGenerator generator = new RandomDataGenerator(seed); - List records = Lists.newArrayListWithExpectedSize(numRecords); + List records = Lists.newArrayListWithExpectedSize(numRecords); for (int i = 0; i < numRecords; i += 1) { - records.add((StructLike) TypeUtil.visit(schema, generator)); + records.add((Record) TypeUtil.visit(schema, generator)); } return records; @@ -52,13 +53,13 @@ private RandomDataGenerator(long seed) { } @Override - public StructLike schema(Schema schema, Supplier structResult) { - return (StructLike) structResult.get(); + public Record schema(Schema schema, Supplier structResult) { + return (Record) structResult.get(); } @Override - public StructLike struct(Types.StructType struct, Iterable fieldResults) { - StructLike rec = GenericRecord.create(struct); + public Record struct(Types.StructType struct, Iterable fieldResults) { + Record rec = GenericRecord.create(struct); List values = Lists.newArrayList(fieldResults); for (int i = 0; i < values.size(); i += 1) { rec.set(i, values.get(i)); diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaParser.java b/core/src/test/java/org/apache/iceberg/TestSchemaParser.java index acdba85adf55..cf6b03ee1417 100644 --- a/core/src/test/java/org/apache/iceberg/TestSchemaParser.java +++ b/core/src/test/java/org/apache/iceberg/TestSchemaParser.java @@ -27,7 +27,7 @@ import java.nio.ByteBuffer; import java.util.UUID; import java.util.stream.Stream; -import org.apache.iceberg.avro.AvroDataTest; +import org.apache.iceberg.data.DataTest; import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -39,7 +39,22 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -public class TestSchemaParser extends AvroDataTest { +public class TestSchemaParser extends DataTest { + @Override + protected boolean supportsUnknown() { + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; + } + + @Override + protected boolean supportsVariant() { + return true; + } + @Override protected void writeAndValidate(Schema schema) throws IOException { Schema serialized = SchemaParser.fromJson(SchemaParser.toJson(schema)); @@ -127,14 +142,4 @@ public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Literal d assertThat(serialized.findField("col_with_default").writeDefault()) .isEqualTo(defaultValue.value()); } - - @Test - public void testVariantType() throws IOException { - Schema schema = - new Schema( - Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional(2, "data", Types.VariantType.get())); - - writeAndValidate(schema); - } } diff --git a/core/src/test/java/org/apache/iceberg/avro/AvroDataTest.java b/core/src/test/java/org/apache/iceberg/avro/AvroDataTest.java deleted file mode 100644 index e3870f84decd..000000000000 --- a/core/src/test/java/org/apache/iceberg/avro/AvroDataTest.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.avro; - -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; - -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.iceberg.Schema; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.types.Types.ListType; -import org.apache.iceberg.types.Types.LongType; -import org.apache.iceberg.types.Types.MapType; -import org.apache.iceberg.types.Types.StructType; -import org.junit.jupiter.api.Test; - -public abstract class AvroDataTest { - - protected abstract void writeAndValidate(Schema schema) throws IOException; - - private static final StructType SUPPORTED_PRIMITIVES = - StructType.of( - required(100, "id", LongType.get()), - optional(101, "data", Types.StringType.get()), - required(102, "b", Types.BooleanType.get()), - optional(103, "i", Types.IntegerType.get()), - required(104, "l", LongType.get()), - optional(105, "f", Types.FloatType.get()), - required(106, "d", Types.DoubleType.get()), - optional(107, "date", Types.DateType.get()), - required(108, "ts", Types.TimestampType.withZone()), - required(110, "s", Types.StringType.get()), - required(111, "uuid", Types.UUIDType.get()), - required(112, "fixed", Types.FixedType.ofLength(7)), - optional(113, "bytes", Types.BinaryType.get()), - required(114, "dec_9_0", Types.DecimalType.of(9, 0)), - required(115, "dec_11_2", Types.DecimalType.of(11, 2)), - required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision - required(117, "time", Types.TimeType.get())); - - @Test - public void testSimpleStruct() throws IOException { - writeAndValidate(new Schema(SUPPORTED_PRIMITIVES.fields())); - } - - @Test - public void testArray() throws IOException { - Schema schema = - new Schema( - required(0, "id", LongType.get()), - optional(1, "data", ListType.ofOptional(2, Types.StringType.get()))); - - writeAndValidate(schema); - } - - @Test - public void testArrayOfStructs() throws IOException { - Schema schema = - new Schema( - required(0, "id", LongType.get()), - optional(1, "data", ListType.ofOptional(2, SUPPORTED_PRIMITIVES))); - - writeAndValidate(schema); - } - - @Test - public void testMap() throws IOException { - Schema schema = - new Schema( - required(0, "id", LongType.get()), - optional( - 1, - "data", - MapType.ofOptional(2, 3, Types.StringType.get(), Types.StringType.get()))); - - writeAndValidate(schema); - } - - @Test - public void testNumericMapKey() throws IOException { - Schema schema = - new Schema( - required(0, "id", LongType.get()), - optional( - 1, "data", MapType.ofOptional(2, 3, Types.LongType.get(), Types.StringType.get()))); - - writeAndValidate(schema); - } - - @Test - public void testComplexMapKey() throws IOException { - Schema schema = - new Schema( - required(0, "id", LongType.get()), - optional( - 1, - "data", - MapType.ofOptional( - 2, - 3, - Types.StructType.of( - required(4, "i", Types.IntegerType.get()), - optional(5, "s", Types.StringType.get())), - Types.StringType.get()))); - - writeAndValidate(schema); - } - - @Test - public void testMapOfStructs() throws IOException { - Schema schema = - new Schema( - required(0, "id", LongType.get()), - optional( - 1, "data", MapType.ofOptional(2, 3, Types.StringType.get(), SUPPORTED_PRIMITIVES))); - - writeAndValidate(schema); - } - - @Test - public void testMixedTypes() throws IOException { - StructType structType = - StructType.of( - required(0, "id", LongType.get()), - optional( - 1, - "list_of_maps", - ListType.ofOptional( - 2, MapType.ofOptional(3, 4, Types.StringType.get(), SUPPORTED_PRIMITIVES))), - optional( - 5, - "map_of_lists", - MapType.ofOptional( - 6, 7, Types.StringType.get(), ListType.ofOptional(8, SUPPORTED_PRIMITIVES))), - required( - 9, - "list_of_lists", - ListType.ofOptional(10, ListType.ofOptional(11, SUPPORTED_PRIMITIVES))), - required( - 12, - "map_of_maps", - MapType.ofOptional( - 13, - 14, - Types.StringType.get(), - MapType.ofOptional(15, 16, Types.StringType.get(), SUPPORTED_PRIMITIVES))), - required( - 17, - "list_of_struct_of_nested_types", - ListType.ofOptional( - 19, - StructType.of( - Types.NestedField.required( - 20, - "m1", - MapType.ofOptional( - 21, 22, Types.StringType.get(), SUPPORTED_PRIMITIVES)), - Types.NestedField.optional( - 23, "l1", ListType.ofRequired(24, SUPPORTED_PRIMITIVES)), - Types.NestedField.required( - 25, "l2", ListType.ofRequired(26, SUPPORTED_PRIMITIVES)), - Types.NestedField.optional( - 27, - "m2", - MapType.ofOptional( - 28, 29, Types.StringType.get(), SUPPORTED_PRIMITIVES)))))); - - Schema schema = - new Schema( - TypeUtil.assignFreshIds(structType, new AtomicInteger(0)::incrementAndGet) - .asStructType() - .fields()); - - writeAndValidate(schema); - } -} diff --git a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java index 3b96f844b537..d4c848c9e600 100644 --- a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java @@ -128,6 +128,7 @@ private static void assertEquals(Type type, Object expected, Object actual) { case DATE: case TIME: case TIMESTAMP: + case TIMESTAMP_NANO: case UUID: case FIXED: case BINARY: diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroEncoderUtil.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroEncoderUtil.java index 6051caf33e9d..152f3f278763 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroEncoderUtil.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroEncoderUtil.java @@ -25,9 +25,19 @@ import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; +import org.apache.iceberg.data.DataTest; import org.apache.iceberg.types.Type; -public class TestAvroEncoderUtil extends AvroDataTest { +public class TestAvroEncoderUtil extends DataTest { + @Override + protected boolean supportsUnknown() { + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; + } @Override protected void writeAndValidate(org.apache.iceberg.Schema schema) throws IOException { diff --git a/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java b/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java index 38a7e8f7fe3d..d13555748544 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestGenericAvro.java @@ -22,12 +22,23 @@ import java.util.List; import org.apache.avro.generic.GenericData.Record; import org.apache.iceberg.Schema; +import org.apache.iceberg.data.DataTest; import org.apache.iceberg.inmemory.InMemoryOutputFile; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -public class TestGenericAvro extends AvroDataTest { +public class TestGenericAvro extends DataTest { + @Override + protected boolean supportsUnknown() { + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; + } + @Override protected void writeAndValidate(Schema schema) throws IOException { List expected = RandomAvroData.generate(schema, 100, 0L); diff --git a/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java b/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java index b48109737f7c..f418e69e0580 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java @@ -24,42 +24,86 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.RandomInternalData; import org.apache.iceberg.Schema; -import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.DataTest; +import org.apache.iceberg.data.Record; import org.apache.iceberg.inmemory.InMemoryOutputFile; import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -public class TestInternalAvro extends AvroDataTest { +public class TestInternalAvro extends DataTest { + @Override + protected boolean supportsDefaultValues() { + return true; + } + + @Override + protected boolean supportsUnknown() { + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; + } + @Override protected void writeAndValidate(Schema schema) throws IOException { - List expected = RandomInternalData.generate(schema, 100, 42L); + List expected = RandomInternalData.generate(schema, 100, 42L); + writeAndValidate(schema, expected); + } + + @Override + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { + List expected = RandomInternalData.generate(writeSchema, 100, 42L); + writeAndValidate(writeSchema, expectedSchema, expected); + } + + @Override + protected void writeAndValidate(Schema schema, List expected) throws IOException { + writeAndValidate(schema, schema, expected); + } + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List expected) + throws IOException { OutputFile outputFile = new InMemoryOutputFile(); - try (DataWriter dataWriter = + try (DataWriter dataWriter = Avro.writeData(outputFile) - .schema(schema) + .schema(writeSchema) .createWriterFunc(InternalWriter::create) .overwrite() .withSpec(PartitionSpec.unpartitioned()) .build()) { - for (StructLike rec : expected) { + for (Record rec : expected) { dataWriter.write(rec); } } - List rows; - try (AvroIterable reader = + List rows; + try (AvroIterable reader = Avro.read(outputFile.toInputFile()) - .project(schema) + .project(expectedSchema) .createResolvingReader(InternalReader::create) .build()) { rows = Lists.newArrayList(reader); } for (int i = 0; i < expected.size(); i += 1) { - InternalTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i)); + InternalTestHelpers.assertEquals(expectedSchema.asStruct(), expected.get(i), rows.get(i)); + } + + try (AvroIterable reader = + Avro.read(outputFile.toInputFile()) + .project(expectedSchema) + .createResolvingReader(InternalReader::create) + .build()) { + int index = 0; + for (Record actualRecord : reader) { + InternalTestHelpers.assertEquals( + expectedSchema.asStruct(), expected.get(index), actualRecord); + index += 1; + } } } } diff --git a/data/src/test/java/org/apache/iceberg/data/DataTest.java b/core/src/test/java/org/apache/iceberg/data/DataTest.java similarity index 90% rename from data/src/test/java/org/apache/iceberg/data/DataTest.java rename to core/src/test/java/org/apache/iceberg/data/DataTest.java index 0cbe8fd03d03..23ea8d879297 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTest.java +++ b/core/src/test/java/org/apache/iceberg/data/DataTest.java @@ -47,13 +47,17 @@ import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.FieldSource; import org.junit.jupiter.params.provider.MethodSource; public abstract class DataTest { protected abstract void writeAndValidate(Schema schema) throws IOException; - protected abstract void writeAndValidate(Schema schema, List data) throws IOException; + protected void writeAndValidate(Schema schema, List data) throws IOException { + throw new UnsupportedEncodingException( + "Cannot run test, writeAndValidate(Schema, List) is not implemented"); + } protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { throw new UnsupportedEncodingException( @@ -90,6 +94,63 @@ protected boolean allowsWritingNullValuesForRequiredFields() { @TempDir protected Path temp; + private static final Type[] SIMPLE_TYPES = + new Type[] { + Types.UnknownType.get(), + Types.BooleanType.get(), + Types.IntegerType.get(), + LongType.get(), + Types.FloatType.get(), + Types.DoubleType.get(), + Types.DateType.get(), + Types.TimeType.get(), + Types.TimestampType.withZone(), + Types.TimestampType.withoutZone(), + Types.TimestampNanoType.withZone(), + Types.TimestampNanoType.withoutZone(), + Types.StringType.get(), + Types.FixedType.ofLength(7), + Types.BinaryType.get(), + Types.DecimalType.of(9, 0), + Types.DecimalType.of(11, 2), + Types.DecimalType.of(38, 10), + Types.VariantType.get(), + }; + + protected boolean supportsUnknown() { + return false; + } + + protected boolean supportsTimestampNanos() { + return false; + } + + protected boolean supportsVariant() { + return false; + } + + @ParameterizedTest + @FieldSource("SIMPLE_TYPES") + public void testTypeSchema(Type type) throws IOException { + Assumptions.assumeThat( + supportsUnknown() + || TypeUtil.find(type, t -> t.typeId() == Type.TypeID.UNKNOWN) == null) + .as("unknown is not yet implemented") + .isTrue(); + Assumptions.assumeThat( + supportsTimestampNanos() + || TypeUtil.find(type, t -> t.typeId() == Type.TypeID.TIMESTAMP_NANO) == null) + .as("timestamp_ns is not yet implemented") + .isTrue(); + Assumptions.assumeThat( + supportsVariant() + || TypeUtil.find(type, t -> t.typeId() == Type.TypeID.VARIANT) == null) + .as("variant is not yet implemented") + .isTrue(); + + writeAndValidate(new Schema(required(1, "id", LongType.get()), optional(2, "test_type", type))); + } + @Test public void testSimpleStruct() throws IOException { writeAndValidate(new Schema(SUPPORTED_PRIMITIVES.fields())); diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java index f8536dfd01c5..990864f32980 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java @@ -76,6 +76,7 @@ private static void assertEquals(Type type, Object expected, Object actual) { } switch (type.typeId()) { + case UNKNOWN: case BOOLEAN: case INTEGER: case LONG: @@ -85,6 +86,7 @@ private static void assertEquals(Type type, Object expected, Object actual) { case DATE: case TIME: case TIMESTAMP: + case TIMESTAMP_NANO: case UUID: case BINARY: case DECIMAL: @@ -94,7 +96,7 @@ private static void assertEquals(Type type, Object expected, Object actual) { break; case FIXED: assertThat(expected).as("Expected should be a byte[]").isInstanceOf(byte[].class); - assertThat(expected).as("Actual should be a byte[]").isInstanceOf(byte[].class); + assertThat(actual).as("Actual should be a byte[]").isInstanceOf(byte[].class); assertThat(actual).as("Array contents should be equal").isEqualTo(expected); break; case STRUCT: diff --git a/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java b/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java index 06cdd4d30bce..907fe7345fd7 100644 --- a/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/RandomGenericData.java @@ -19,6 +19,7 @@ package org.apache.iceberg.data; import static java.time.temporal.ChronoUnit.MICROS; +import static java.time.temporal.ChronoUnit.NANOS; import java.nio.ByteBuffer; import java.time.Instant; @@ -242,12 +243,19 @@ public Object primitive(Type.PrimitiveType primitive) { case TIME: return LocalTime.ofNanoOfDay((long) result * 1000); case TIMESTAMP: - Types.TimestampType ts = (Types.TimestampType) primitive; - if (ts.shouldAdjustToUTC()) { + Types.TimestampType ts6 = (Types.TimestampType) primitive; + if (ts6.shouldAdjustToUTC()) { return EPOCH.plus((long) result, MICROS); } else { return EPOCH.plus((long) result, MICROS).toLocalDateTime(); } + case TIMESTAMP_NANO: + Types.TimestampNanoType ts9 = (Types.TimestampNanoType) primitive; + if (ts9.shouldAdjustToUTC()) { + return EPOCH.plus((long) result, NANOS); + } else { + return EPOCH.plus((long) result, NANOS).toLocalDateTime(); + } default: return result; } diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java index 808fb3317339..19c8822e81be 100644 --- a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java @@ -88,4 +88,14 @@ private void writeAndValidate(Schema writeSchema, Schema expectedSchema, List expected = RandomInternalData.generate(schema, 100, 1376L); + List expected = RandomInternalData.generate(schema, 100, 1376L); + writeAndValidate(schema, expected); + } + + @Override + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { + List expected = RandomInternalData.generate(writeSchema, 100, 1376L); + writeAndValidate(writeSchema, expectedSchema, expected); + } + + @Override + protected void writeAndValidate(Schema schema, List expected) throws IOException { + writeAndValidate(schema, schema, expected); + } + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List expected) + throws IOException { OutputFile outputFile = new InMemoryOutputFile(); try (DataWriter dataWriter = Parquet.writeData(outputFile) - .schema(schema) + .schema(writeSchema) .createWriterFunc(InternalWriter::create) .overwrite() .withSpec(PartitionSpec.unpartitioned()) .build()) { - for (StructLike record : expected) { + for (Record record : expected) { dataWriter.write(record); } } - List rows; - try (CloseableIterable reader = + List rows; + try (CloseableIterable reader = Parquet.read(outputFile.toInputFile()) - .project(schema) - .createReaderFunc(fileSchema -> InternalReader.create(schema, fileSchema)) + .project(expectedSchema) + .createReaderFunc(fileSchema -> InternalReader.create(expectedSchema, fileSchema)) .build()) { rows = Lists.newArrayList(reader); } for (int i = 0; i < expected.size(); i += 1) { - InternalTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i)); + InternalTestHelpers.assertEquals(expectedSchema.asStruct(), expected.get(i), rows.get(i)); } // test reuseContainers - try (CloseableIterable reader = + try (CloseableIterable reader = Parquet.read(outputFile.toInputFile()) - .project(schema) + .project(expectedSchema) .reuseContainers() - .createReaderFunc(fileSchema -> InternalReader.create(schema, fileSchema)) + .createReaderFunc(fileSchema -> InternalReader.create(expectedSchema, fileSchema)) .build()) { int index = 0; - for (StructLike actualRecord : reader) { - InternalTestHelpers.assertEquals(schema.asStruct(), expected.get(index), actualRecord); + for (Record actualRecord : reader) { + InternalTestHelpers.assertEquals( + expectedSchema.asStruct(), expected.get(index), actualRecord); index += 1; } }