diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java index 42dd6825a4d5..7414a457c520 100644 --- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java +++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java @@ -21,7 +21,14 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -228,4 +235,54 @@ private static BigInteger randomUnscaled(int precision, Random random) { return new BigInteger(sb.toString()); } + + public static List generateList( + Random random, Types.ListType list, Supplier elementResult) { + int numElements = random.nextInt(20); + + List result = Lists.newArrayListWithExpectedSize(numElements); + for (int i = 0; i < numElements; i += 1) { + // return null 5% of the time when the value is optional + if (list.isElementOptional() && random.nextInt(20) == 1) { + result.add(null); + } else { + result.add(elementResult.get()); + } + } + + return result; + } + + public static Map generateMap( + Random random, Types.MapType map, Supplier keyResult, Supplier valueResult) { + int numEntries = random.nextInt(20); + + Map result = Maps.newLinkedHashMap(); + Supplier keyFunc; + if (map.keyType() == Types.StringType.get()) { + keyFunc = () -> keyResult.get().toString(); + } else { + keyFunc = keyResult; + } + + Set keySet = Sets.newHashSet(); + for (int i = 0; i < numEntries; i += 1) { + Object key = keyFunc.get(); + // ensure no collisions + while (keySet.contains(key)) { + key = keyFunc.get(); + } + + keySet.add(key); + + // return null 5% of the time when the value is optional + if (map.isValueOptional() && random.nextInt(20) == 1) { + result.put(key, null); + } else { + result.put(key, valueResult.get()); + } + } + + return result; + } } diff --git a/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java new file mode 100644 index 000000000000..f8a2bc604656 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.util.List; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +abstract class BaseWriteBuilder extends AvroSchemaVisitor> { + + protected abstract ValueWriter createRecordWriter(List> fields); + + protected abstract ValueWriter fixedWriter(int length); + + @Override + public ValueWriter record(Schema record, List names, List> fields) { + return createRecordWriter(fields); + } + + @Override + public ValueWriter union(Schema union, List> options) { + Preconditions.checkArgument( + options.size() == 2, "Cannot create writer for non-option union: %s", union); + if (union.getTypes().get(0).getType() == Schema.Type.NULL) { + return ValueWriters.option(0, options.get(1)); + } else if (union.getTypes().get(1).getType() == Schema.Type.NULL) { + return ValueWriters.option(1, options.get(0)); + } else { + throw new IllegalArgumentException( + String.format("Cannot create writer for non-option union: %s", union)); + } + } + + @Override + public ValueWriter array(Schema array, ValueWriter elementWriter) { + if (array.getLogicalType() instanceof LogicalMap) { + ValueWriters.StructWriter keyValueWriter = (ValueWriters.StructWriter) elementWriter; + return ValueWriters.arrayMap(keyValueWriter.writer(0), keyValueWriter.writer(1)); + } + + return ValueWriters.array(elementWriter); + } + + @Override + public ValueWriter map(Schema map, ValueWriter valueWriter) { + return ValueWriters.map(ValueWriters.strings(), valueWriter); + } + + @Override + public ValueWriter primitive(Schema primitive) { + LogicalType logicalType = primitive.getLogicalType(); + if (logicalType != null) { + switch (logicalType.getName()) { + case "date": + return ValueWriters.ints(); + + case "time-micros": + return ValueWriters.longs(); + + case "timestamp-micros": + return ValueWriters.longs(); + + case "decimal": + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; + return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); + + case "uuid": + return ValueWriters.uuids(); + + default: + throw new IllegalArgumentException("Unsupported logical type: " + logicalType); + } + } + + switch (primitive.getType()) { + case NULL: + return ValueWriters.nulls(); + case BOOLEAN: + return ValueWriters.booleans(); + case INT: + return ValueWriters.ints(); + case LONG: + return ValueWriters.longs(); + case FLOAT: + return ValueWriters.floats(); + case DOUBLE: + return ValueWriters.doubles(); + case STRING: + return ValueWriters.strings(); + case FIXED: + return fixedWriter(primitive.getFixedSize()); + case BYTES: + return ValueWriters.byteBuffers(); + default: + throw new IllegalArgumentException("Unsupported type: " + primitive); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java index 421bfc9dc462..316dd94d2f68 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java @@ -21,12 +21,9 @@ import java.io.IOException; import java.util.List; import java.util.stream.Stream; -import org.apache.avro.LogicalType; -import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.io.Encoder; import org.apache.iceberg.FieldMetrics; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class GenericAvroWriter implements MetricsAwareDatumWriter { private ValueWriter writer = null; @@ -55,92 +52,16 @@ public Stream metrics() { return writer.metrics(); } - private static class WriteBuilder extends AvroSchemaVisitor> { - private WriteBuilder() {} + private static class WriteBuilder extends BaseWriteBuilder { @Override - public ValueWriter record(Schema record, List names, List> fields) { + protected ValueWriter createRecordWriter(List> fields) { return ValueWriters.record(fields); } @Override - public ValueWriter union(Schema union, List> options) { - Preconditions.checkArgument( - options.contains(ValueWriters.nulls()), - "Cannot create writer for non-option union: %s", - union); - Preconditions.checkArgument( - options.size() == 2, "Cannot create writer for non-option union: %s", union); - if (union.getTypes().get(0).getType() == Schema.Type.NULL) { - return ValueWriters.option(0, options.get(1)); - } else { - return ValueWriters.option(1, options.get(0)); - } - } - - @Override - public ValueWriter array(Schema array, ValueWriter elementWriter) { - if (array.getLogicalType() instanceof LogicalMap) { - ValueWriters.StructWriter keyValueWriter = (ValueWriters.StructWriter) elementWriter; - return ValueWriters.arrayMap(keyValueWriter.writer(0), keyValueWriter.writer(1)); - } - - return ValueWriters.array(elementWriter); - } - - @Override - public ValueWriter map(Schema map, ValueWriter valueWriter) { - return ValueWriters.map(ValueWriters.strings(), valueWriter); - } - - @Override - public ValueWriter primitive(Schema primitive) { - LogicalType logicalType = primitive.getLogicalType(); - if (logicalType != null) { - switch (logicalType.getName()) { - case "date": - return ValueWriters.ints(); - - case "time-micros": - return ValueWriters.longs(); - - case "timestamp-micros": - return ValueWriters.longs(); - - case "decimal": - LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; - return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); - - case "uuid": - return ValueWriters.uuids(); - - default: - throw new IllegalArgumentException("Unsupported logical type: " + logicalType); - } - } - - switch (primitive.getType()) { - case NULL: - return ValueWriters.nulls(); - case BOOLEAN: - return ValueWriters.booleans(); - case INT: - return ValueWriters.ints(); - case LONG: - return ValueWriters.longs(); - case FLOAT: - return ValueWriters.floats(); - case DOUBLE: - return ValueWriters.doubles(); - case STRING: - return ValueWriters.strings(); - case FIXED: - return ValueWriters.genericFixed(primitive.getFixedSize()); - case BYTES: - return ValueWriters.byteBuffers(); - default: - throw new IllegalArgumentException("Unsupported type: " + primitive); - } + protected ValueWriter fixedWriter(int length) { + return ValueWriters.genericFixed(length); } } } diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalReader.java b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java index ca83ce2ba7cd..0dab3646d4d4 100644 --- a/core/src/main/java/org/apache/iceberg/avro/InternalReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java @@ -205,7 +205,6 @@ public ValueReader primitive(Pair partner, Schema primitive) { case STRING: return ValueReaders.strings(); case FIXED: - return ValueReaders.fixed(primitive); case BYTES: return ValueReaders.byteBuffers(); case ENUM: diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalWriter.java b/core/src/main/java/org/apache/iceberg/avro/InternalWriter.java new file mode 100644 index 000000000000..8f20aeeb6bfd --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/InternalWriter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Stream; +import org.apache.avro.Schema; +import org.apache.avro.io.Encoder; +import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.types.Type; + +/** + * A Writer that consumes Iceberg's internal in-memory object model. + * + *

Iceberg's internal in-memory object model produces the types defined in {@link + * Type.TypeID#javaClass()}. + */ +public class InternalWriter implements MetricsAwareDatumWriter { + private ValueWriter writer = null; + + public static InternalWriter create(Schema schema) { + return new InternalWriter<>(schema); + } + + InternalWriter(Schema schema) { + setSchema(schema); + } + + @Override + @SuppressWarnings("unchecked") + public void setSchema(Schema schema) { + this.writer = (ValueWriter) AvroSchemaVisitor.visit(schema, new WriteBuilder()); + } + + @Override + public void write(T datum, Encoder out) throws IOException { + writer.write(datum, out); + } + + @Override + public Stream metrics() { + return writer.metrics(); + } + + private static class WriteBuilder extends BaseWriteBuilder { + + @Override + protected ValueWriter createRecordWriter(List> fields) { + return ValueWriters.struct(fields); + } + + @Override + protected ValueWriter fixedWriter(int length) { + return ValueWriters.fixedBuffers(length); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java index 3844ede1c16a..afc0a37a2838 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java @@ -32,6 +32,7 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.Encoder; import org.apache.avro.util.Utf8; +import org.apache.iceberg.StructLike; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.DecimalUtil; @@ -92,6 +93,10 @@ public static ValueWriter genericFixed(int length) { return new GenericFixedWriter(length); } + public static ValueWriter fixedBuffers(int length) { + return new FixedByteBufferWriter(length); + } + public static ValueWriter bytes() { return BytesWriter.INSTANCE; } @@ -126,6 +131,10 @@ public static ValueWriter record(List> writers) { return new RecordWriter(writers); } + public static ValueWriter struct(List> writers) { + return new StructLikeWriter(writers); + } + private static class NullWriter implements ValueWriter { private static final NullWriter INSTANCE = new NullWriter(); @@ -327,6 +336,24 @@ public void write(ByteBuffer bytes, Encoder encoder) throws IOException { } } + private static class FixedByteBufferWriter implements ValueWriter { + private final int length; + + private FixedByteBufferWriter(int length) { + this.length = length; + } + + @Override + public void write(ByteBuffer bytes, Encoder encoder) throws IOException { + Preconditions.checkArgument( + bytes.remaining() == length, + "Cannot write byte buffer of length %s as fixed[%s]", + bytes.remaining(), + length); + encoder.writeBytes(bytes); + } + } + private static class DecimalWriter implements ValueWriter { private final int precision; private final int scale; @@ -484,4 +511,15 @@ protected Object get(IndexedRecord struct, int pos) { return struct.get(pos); } } + + private static class StructLikeWriter extends StructWriter { + private StructLikeWriter(List> writers) { + super(writers); + } + + @Override + protected Object get(StructLike struct, int pos) { + return struct.get(pos, Object.class); + } + } } diff --git a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java new file mode 100644 index 000000000000..7b7400b69ab4 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +public class InternalTestHelpers { + + private InternalTestHelpers() {} + + public static void assertEquals(Types.StructType struct, StructLike expected, StructLike actual) { + List fields = struct.fields(); + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = fields.get(i).type(); + + Object expectedValue = expected.get(i, fieldType.typeId().javaClass()); + Object actualValue = actual.get(i, fieldType.typeId().javaClass()); + + assertEquals(fieldType, expectedValue, actualValue); + } + } + + public static void assertEquals(Types.ListType list, List expected, List actual) { + Type elementType = list.elementType(); + + assertThat(actual).as("List size should match").hasSameSizeAs(expected); + + for (int i = 0; i < expected.size(); i += 1) { + Object expectedValue = expected.get(i); + Object actualValue = actual.get(i); + + assertEquals(elementType, expectedValue, actualValue); + } + } + + public static void assertEquals(Types.MapType map, Map expected, Map actual) { + Type valueType = map.valueType(); + + assertThat(actual).as("Map keys should match").hasSameSizeAs(expected); + + for (Object expectedKey : expected.keySet()) { + Object expectedValue = expected.get(expectedKey); + Object actualValue = actual.get(expectedKey); + + assertEquals(valueType, expectedValue, actualValue); + } + } + + private static void assertEquals(Type type, Object expected, Object actual) { + if (expected == null && actual == null) { + return; + } + + switch (type.typeId()) { + case BOOLEAN: + case INTEGER: + case LONG: + case FLOAT: + case DOUBLE: + case STRING: + case DATE: + case TIME: + case TIMESTAMP: + case UUID: + case FIXED: + case BINARY: + case DECIMAL: + assertThat(actual).as("Primitive value should be equal to expected").isEqualTo(expected); + break; + case STRUCT: + assertThat(expected).as("Expected should be a StructLike").isInstanceOf(StructLike.class); + assertThat(actual).as("Actual should be a StructLike").isInstanceOf(StructLike.class); + assertEquals(type.asStructType(), (StructLike) expected, (StructLike) actual); + break; + case LIST: + assertThat(expected).as("Expected should be a List").isInstanceOf(List.class); + assertThat(actual).as("Actual should be a List").isInstanceOf(List.class); + assertEquals(type.asListType(), (List) expected, (List) actual); + break; + case MAP: + assertThat(expected).as("Expected should be a Map").isInstanceOf(Map.class); + assertThat(actual).as("Actual should be a Map").isInstanceOf(Map.class); + assertEquals(type.asMapType(), (Map) expected, (Map) actual); + break; + default: + throw new IllegalArgumentException("Not a supported type: " + type); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/RandomInternalData.java b/core/src/test/java/org/apache/iceberg/RandomInternalData.java new file mode 100644 index 000000000000..a7de8e4c8f01 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/RandomInternalData.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.function.Supplier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.RandomUtil; + +public class RandomInternalData { + + private RandomInternalData() {} + + public static List generate(Schema schema, int numRecords, long seed) { + RandomDataGenerator generator = new RandomDataGenerator(seed); + List records = Lists.newArrayListWithExpectedSize(numRecords); + for (int i = 0; i < numRecords; i += 1) { + records.add((StructLike) TypeUtil.visit(schema, generator)); + } + + return records; + } + + private static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisitor { + private final Random random; + + private RandomDataGenerator(long seed) { + this.random = new Random(seed); + } + + @Override + public StructLike schema(Schema schema, Supplier structResult) { + return (StructLike) structResult.get(); + } + + @Override + public StructLike struct(Types.StructType struct, Iterable fieldResults) { + StructLike rec = GenericRecord.create(struct); + List values = Lists.newArrayList(fieldResults); + for (int i = 0; i < values.size(); i += 1) { + rec.set(i, values.get(i)); + } + + return rec; + } + + @Override + public Object field(Types.NestedField field, Supplier fieldResult) { + // return null 5% of the time when the value is optional + if (field.isOptional() && random.nextInt(20) == 1) { + return null; + } + return fieldResult.get(); + } + + @Override + public Object list(Types.ListType list, Supplier elementResult) { + return RandomUtil.generateList(random, list, elementResult); + } + + @Override + public Object map(Types.MapType map, Supplier keyResult, Supplier valueResult) { + return RandomUtil.generateMap(random, map, keyResult, valueResult); + } + + @Override + public Object primitive(Type.PrimitiveType primitive) { + Object result = RandomUtil.generatePrimitive(primitive, random); + + switch (primitive.typeId()) { + case FIXED: + case BINARY: + return ByteBuffer.wrap((byte[]) result); + case UUID: + return UUID.nameUUIDFromBytes((byte[]) result); + default: + return result; + } + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java b/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java index 473a468f7e94..ebb9d93a5342 100644 --- a/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java +++ b/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.Random; -import java.util.Set; import java.util.UUID; import java.util.function.Supplier; import org.apache.avro.generic.GenericData; @@ -30,8 +29,6 @@ import org.apache.avro.util.Utf8; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; @@ -88,52 +85,12 @@ public Object field(Types.NestedField field, Supplier fieldResult) { @Override public Object list(Types.ListType list, Supplier elementResult) { - int numElements = random.nextInt(20); - - List result = Lists.newArrayListWithExpectedSize(numElements); - for (int i = 0; i < numElements; i += 1) { - // return null 5% of the time when the value is optional - if (list.isElementOptional() && random.nextInt(20) == 1) { - result.add(null); - } else { - result.add(elementResult.get()); - } - } - - return result; + return RandomUtil.generateList(random, list, elementResult); } @Override public Object map(Types.MapType map, Supplier keyResult, Supplier valueResult) { - int numEntries = random.nextInt(20); - - Map result = Maps.newLinkedHashMap(); - Supplier keyFunc; - if (map.keyType() == Types.StringType.get()) { - keyFunc = () -> keyResult.get().toString(); - } else { - keyFunc = keyResult; - } - - Set keySet = Sets.newHashSet(); - for (int i = 0; i < numEntries; i += 1) { - Object key = keyFunc.get(); - // ensure no collisions - while (keySet.contains(key)) { - key = keyFunc.get(); - } - - keySet.add(key); - - // return null 5% of the time when the value is optional - if (map.isValueOptional() && random.nextInt(20) == 1) { - result.put(key, null); - } else { - result.put(key, valueResult.get()); - } - } - - return result; + return RandomUtil.generateMap(random, map, keyResult, valueResult); } @Override diff --git a/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java b/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java new file mode 100644 index 000000000000..b48109737f7c --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.InternalTestHelpers; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RandomInternalData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class TestInternalAvro extends AvroDataTest { + @Override + protected void writeAndValidate(Schema schema) throws IOException { + List expected = RandomInternalData.generate(schema, 100, 42L); + + OutputFile outputFile = new InMemoryOutputFile(); + + try (DataWriter dataWriter = + Avro.writeData(outputFile) + .schema(schema) + .createWriterFunc(InternalWriter::create) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build()) { + for (StructLike rec : expected) { + dataWriter.write(rec); + } + } + + List rows; + try (AvroIterable reader = + Avro.read(outputFile.toInputFile()) + .project(schema) + .createResolvingReader(InternalReader::create) + .build()) { + rows = Lists.newArrayList(reader); + } + + for (int i = 0; i < expected.size(); i += 1) { + InternalTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i)); + } + } +}