From 247e2e6af200c4ed01e6d7b625fe878e7a775869 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 8 Apr 2020 10:53:40 -0700 Subject: [PATCH 1/3] Move Avro generics to core. --- {data => api}/src/main/java/org/apache/iceberg/data/Record.java | 0 .../src/main/java/org/apache/iceberg/data/GenericRecord.java | 0 .../src/main/java/org/apache/iceberg/data/avro/DataReader.java | 0 .../src/main/java/org/apache/iceberg/data/avro/DataWriter.java | 0 .../main/java/org/apache/iceberg/data/avro/GenericReaders.java | 2 +- .../main/java/org/apache/iceberg/data/avro/GenericWriters.java | 0 .../main/java/org/apache/iceberg/data/avro/IcebergDecoder.java | 0 .../main/java/org/apache/iceberg/data/avro/IcebergEncoder.java | 0 .../src/main/java/org/apache/iceberg/util}/DateTimeUtil.java | 2 +- .../java/org/apache/iceberg/data/InternalRecordWrapper.java | 1 + .../main/java/org/apache/iceberg/data/TableScanIterable.java | 1 + 11 files changed, 4 insertions(+), 2 deletions(-) rename {data => api}/src/main/java/org/apache/iceberg/data/Record.java (100%) rename {data => core}/src/main/java/org/apache/iceberg/data/GenericRecord.java (100%) rename {data => core}/src/main/java/org/apache/iceberg/data/avro/DataReader.java (100%) rename {data => core}/src/main/java/org/apache/iceberg/data/avro/DataWriter.java (100%) rename {data => core}/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java (98%) rename {data => core}/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java (100%) rename {data => core}/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java (100%) rename {data => core}/src/main/java/org/apache/iceberg/data/avro/IcebergEncoder.java (100%) rename {data/src/main/java/org/apache/iceberg/data => core/src/main/java/org/apache/iceberg/util}/DateTimeUtil.java (98%) diff --git a/data/src/main/java/org/apache/iceberg/data/Record.java b/api/src/main/java/org/apache/iceberg/data/Record.java similarity index 100% rename from data/src/main/java/org/apache/iceberg/data/Record.java rename to api/src/main/java/org/apache/iceberg/data/Record.java diff --git a/data/src/main/java/org/apache/iceberg/data/GenericRecord.java b/core/src/main/java/org/apache/iceberg/data/GenericRecord.java similarity index 100% rename from data/src/main/java/org/apache/iceberg/data/GenericRecord.java rename to core/src/main/java/org/apache/iceberg/data/GenericRecord.java diff --git a/data/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java similarity index 100% rename from data/src/main/java/org/apache/iceberg/data/avro/DataReader.java rename to core/src/main/java/org/apache/iceberg/data/avro/DataReader.java diff --git a/data/src/main/java/org/apache/iceberg/data/avro/DataWriter.java b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java similarity index 100% rename from data/src/main/java/org/apache/iceberg/data/avro/DataWriter.java rename to core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java diff --git a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java similarity index 98% rename from data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java rename to core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java index df866726b90d..239f09dbeddc 100644 --- a/data/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java @@ -29,10 +29,10 @@ import 
org.apache.avro.io.Decoder;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
-import org.apache.iceberg.data.DateTimeUtil;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.DateTimeUtil;
 
 class GenericReaders {
   private GenericReaders() {
diff --git a/data/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java
similarity index 100%
rename from data/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java
rename to core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java
diff --git a/data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java b/core/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java
similarity index 100%
rename from data/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java
rename to core/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java
diff --git a/data/src/main/java/org/apache/iceberg/data/avro/IcebergEncoder.java b/core/src/main/java/org/apache/iceberg/data/avro/IcebergEncoder.java
similarity index 100%
rename from data/src/main/java/org/apache/iceberg/data/avro/IcebergEncoder.java
rename to core/src/main/java/org/apache/iceberg/data/avro/IcebergEncoder.java
diff --git a/data/src/main/java/org/apache/iceberg/data/DateTimeUtil.java b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
similarity index 98%
rename from data/src/main/java/org/apache/iceberg/data/DateTimeUtil.java
rename to core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
index 7380b8cf1f35..c155f005b9d9 100644
--- a/data/src/main/java/org/apache/iceberg/data/DateTimeUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.data;
+package org.apache.iceberg.util;
 
 import java.time.Instant;
 import java.time.LocalDate;
diff --git a/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java b/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java
index c9efb3d41e67..d3ffcf940995 100644
--- a/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java
+++ b/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java
@@ -29,6 +29,7 @@
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
 
 class InternalRecordWrapper implements StructLike {
   private final Function<Object, Object>[] transforms;
diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
index 77f2c0e46c0a..31bd7980091c 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java
@@ -50,6 +50,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.iceberg.util.PartitionUtil;
 
 class TableScanIterable extends CloseableGroup implements CloseableIterable<Record> {

From dadaaec953be75be4ef02e995ac4c6516013bb59 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Tue, 26 May 2020 16:25:22 -0700
Subject: [PATCH 2/3] Update TestMetrics to use Iceberg generics.

This allows the ORC metrics implementation to reuse the same tests.
ORC doesn't support writing Avro records. --- build.gradle | 1 + .../java/org/apache/iceberg/TestMetrics.java | 307 +++++++++--------- .../iceberg/parquet/TestParquetMetrics.java | 93 ++++++ .../iceberg/parquet/TestParquetMetrics.java | 66 ---- 4 files changed, 239 insertions(+), 228 deletions(-) create mode 100644 data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java delete mode 100644 parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java diff --git a/build.gradle b/build.gradle index 454167a5547d..0208727c609f 100644 --- a/build.gradle +++ b/build.gradle @@ -193,6 +193,7 @@ project(':iceberg-data') { } testCompile project(path: ':iceberg-api', configuration: 'testArtifacts') + testCompile project(path: ':iceberg-core', configuration: 'testArtifacts') } test { diff --git a/core/src/test/java/org/apache/iceberg/TestMetrics.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java index 190712b96cb7..1ebc157847df 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetrics.java +++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java @@ -19,7 +19,6 @@ package org.apache.iceberg; -import java.io.File; import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -28,13 +27,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.UUID; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericData.Record; -import org.apache.avro.generic.GenericFixed; -import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; @@ -54,12 +49,11 @@ import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.types.Types.TimeType; import org.apache.iceberg.types.Types.TimestampType; -import org.apache.iceberg.types.Types.UUIDType; +import org.apache.iceberg.util.DateTimeUtil; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; -import static org.apache.iceberg.Files.localInput; -import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; import static org.apache.iceberg.types.Conversions.fromByteBuffer; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; @@ -68,7 +62,6 @@ */ public abstract class TestMetrics { - public static final int ROW_GROUP_SIZE = 1600; private static final StructType LEAF_STRUCT_TYPE = StructType.of( optional(5, "leafLongCol", LongType.get()), optional(6, "leafBinaryCol", BinaryType.get()) @@ -95,59 +88,57 @@ public abstract class TestMetrics { optional(8, "dateCol", DateType.get()), required(9, "timeCol", TimeType.get()), required(10, "timestampCol", TimestampType.withoutZone()), - optional(11, "uuidCol", UUIDType.get()), - required(12, "fixedCol", FixedType.ofLength(4)), - required(13, "binaryCol", BinaryType.get()) + required(11, "fixedCol", FixedType.ofLength(4)), + required(12, "binaryCol", BinaryType.get()) ); - private final UUID uuid = UUID.randomUUID(); - private final GenericFixed fixed = new GenericData.Fixed( - org.apache.avro.Schema.createFixed("fixedCol", null, null, 4), - "abcd".getBytes(StandardCharsets.UTF_8)); + private final byte[] fixed = 
"abcd".getBytes(StandardCharsets.UTF_8); public abstract Metrics getMetrics(InputFile file); - public abstract File writeRecords(Schema schema, Record... records) throws IOException; + public abstract InputFile writeRecords(Schema schema, Record... records) throws IOException; - public abstract File writeRecords(Schema schema, Map properties, GenericData.Record... records) + public abstract InputFile writeRecordsWithSmallRowGroups(Schema schema, Record... records) throws IOException; - public abstract int splitCount(File parquetFile) throws IOException; + public abstract int splitCount(InputFile parquetFile) throws IOException; + + public boolean supportsSmallRowGroups() { + return false; + } @Test public void testMetricsForRepeatedValues() throws IOException { - Record firstRecord = new Record(AvroSchemaUtil.convert(SIMPLE_SCHEMA.asStruct())); - firstRecord.put("booleanCol", true); - firstRecord.put("intCol", 3); - firstRecord.put("longCol", null); - firstRecord.put("floatCol", 2.0F); - firstRecord.put("doubleCol", 2.0D); - firstRecord.put("decimalCol", new BigDecimal("3.50")); - firstRecord.put("stringCol", "AAA"); - firstRecord.put("dateCol", 1500); - firstRecord.put("timeCol", 2000L); - firstRecord.put("timestampCol", 0L); - firstRecord.put("uuidCol", uuid); - firstRecord.put("fixedCol", fixed); - firstRecord.put("binaryCol", "S".getBytes()); - Record secondRecord = new Record(AvroSchemaUtil.convert(SIMPLE_SCHEMA.asStruct())); - secondRecord.put("booleanCol", true); - secondRecord.put("intCol", 3); - secondRecord.put("longCol", null); - secondRecord.put("floatCol", 2.0F); - secondRecord.put("doubleCol", 2.0D); - secondRecord.put("decimalCol", new BigDecimal("3.50")); - secondRecord.put("stringCol", "AAA"); - secondRecord.put("dateCol", 1500); - secondRecord.put("timeCol", 2000L); - secondRecord.put("timestampCol", 0L); - secondRecord.put("uuidCol", uuid); - secondRecord.put("fixedCol", fixed); - secondRecord.put("binaryCol", "S".getBytes()); - - File recordsFile = writeRecords(SIMPLE_SCHEMA, firstRecord, secondRecord); - - Metrics metrics = getMetrics(Files.localInput(recordsFile)); + Record firstRecord = GenericRecord.create(SIMPLE_SCHEMA); + firstRecord.setField("booleanCol", true); + firstRecord.setField("intCol", 3); + firstRecord.setField("longCol", null); + firstRecord.setField("floatCol", 2.0F); + firstRecord.setField("doubleCol", 2.0D); + firstRecord.setField("decimalCol", new BigDecimal("3.50")); + firstRecord.setField("stringCol", "AAA"); + firstRecord.setField("dateCol", DateTimeUtil.dateFromDays(1500)); + firstRecord.setField("timeCol", DateTimeUtil.timeFromMicros(2000L)); + firstRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(0L)); + firstRecord.setField("fixedCol", fixed); + firstRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes())); + Record secondRecord = GenericRecord.create(SIMPLE_SCHEMA); + secondRecord.setField("booleanCol", true); + secondRecord.setField("intCol", 3); + secondRecord.setField("longCol", null); + secondRecord.setField("floatCol", 2.0F); + secondRecord.setField("doubleCol", 2.0D); + secondRecord.setField("decimalCol", new BigDecimal("3.50")); + secondRecord.setField("stringCol", "AAA"); + secondRecord.setField("dateCol", DateTimeUtil.dateFromDays(1500)); + secondRecord.setField("timeCol", DateTimeUtil.timeFromMicros(2000L)); + secondRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(0L)); + secondRecord.setField("fixedCol", fixed); + secondRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes())); + + 
InputFile recordsFile = writeRecords(SIMPLE_SCHEMA, firstRecord, secondRecord); + + Metrics metrics = getMetrics(recordsFile); Assert.assertEquals(2L, (long) metrics.recordCount()); assertCounts(1, 2L, 0L, metrics); assertCounts(2, 2L, 0L, metrics); @@ -161,43 +152,40 @@ public void testMetricsForRepeatedValues() throws IOException { assertCounts(10, 2L, 0L, metrics); assertCounts(11, 2L, 0L, metrics); assertCounts(12, 2L, 0L, metrics); - assertCounts(13, 2L, 0L, metrics); } @Test public void testMetricsForTopLevelFields() throws IOException { - Record firstRecord = new Record(AvroSchemaUtil.convert(SIMPLE_SCHEMA.asStruct())); - firstRecord.put("booleanCol", true); - firstRecord.put("intCol", 3); - firstRecord.put("longCol", 5L); - firstRecord.put("floatCol", 2.0F); - firstRecord.put("doubleCol", 2.0D); - firstRecord.put("decimalCol", new BigDecimal("3.50")); - firstRecord.put("stringCol", "AAA"); - firstRecord.put("dateCol", 1500); - firstRecord.put("timeCol", 2000L); - firstRecord.put("timestampCol", 0L); - firstRecord.put("uuidCol", uuid); - firstRecord.put("fixedCol", fixed); - firstRecord.put("binaryCol", "S".getBytes()); - Record secondRecord = new Record(AvroSchemaUtil.convert(SIMPLE_SCHEMA.asStruct())); - secondRecord.put("booleanCol", false); - secondRecord.put("intCol", Integer.MIN_VALUE); - secondRecord.put("longCol", null); - secondRecord.put("floatCol", 1.0F); - secondRecord.put("doubleCol", null); - secondRecord.put("decimalCol", null); - secondRecord.put("stringCol", "ZZZ"); - secondRecord.put("dateCol", null); - secondRecord.put("timeCol", 3000L); - secondRecord.put("timestampCol", 1000L); - secondRecord.put("uuidCol", null); - secondRecord.put("fixedCol", fixed); - secondRecord.put("binaryCol", "W".getBytes()); - - File recordsFile = writeRecords(SIMPLE_SCHEMA, firstRecord, secondRecord); - - Metrics metrics = getMetrics(Files.localInput(recordsFile)); + Record firstRecord = GenericRecord.create(SIMPLE_SCHEMA); + firstRecord.setField("booleanCol", true); + firstRecord.setField("intCol", 3); + firstRecord.setField("longCol", 5L); + firstRecord.setField("floatCol", 2.0F); + firstRecord.setField("doubleCol", 2.0D); + firstRecord.setField("decimalCol", new BigDecimal("3.50")); + firstRecord.setField("stringCol", "AAA"); + firstRecord.setField("dateCol", DateTimeUtil.dateFromDays(1500)); + firstRecord.setField("timeCol", DateTimeUtil.timeFromMicros(2000L)); + firstRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(0L)); + firstRecord.setField("fixedCol", fixed); + firstRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes())); + Record secondRecord = GenericRecord.create(SIMPLE_SCHEMA); + secondRecord.setField("booleanCol", false); + secondRecord.setField("intCol", Integer.MIN_VALUE); + secondRecord.setField("longCol", null); + secondRecord.setField("floatCol", 1.0F); + secondRecord.setField("doubleCol", null); + secondRecord.setField("decimalCol", null); + secondRecord.setField("stringCol", "ZZZ"); + secondRecord.setField("dateCol", null); + secondRecord.setField("timeCol", DateTimeUtil.timeFromMicros(3000L)); + secondRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(1000L)); + secondRecord.setField("fixedCol", fixed); + secondRecord.setField("binaryCol", ByteBuffer.wrap("W".getBytes())); + + InputFile recordsFile = writeRecords(SIMPLE_SCHEMA, firstRecord, secondRecord); + + Metrics metrics = getMetrics(recordsFile); Assert.assertEquals(2L, (long) metrics.recordCount()); assertCounts(1, 2L, 0L, metrics); assertBounds(1, BooleanType.get(), 
false, true, metrics); @@ -219,13 +207,11 @@ public void testMetricsForTopLevelFields() throws IOException { assertBounds(9, TimeType.get(), 2000L, 3000L, metrics); assertCounts(10, 2L, 0L, metrics); assertBounds(10, TimestampType.withoutZone(), 0L, 1000L, metrics); - assertCounts(11, 2L, 1L, metrics); - assertBounds(11, UUIDType.get(), uuid, uuid, metrics); + assertCounts(11, 2L, 0L, metrics); + assertBounds(11, FixedType.ofLength(4), + ByteBuffer.wrap(fixed), ByteBuffer.wrap(fixed), metrics); assertCounts(12, 2L, 0L, metrics); - assertBounds(12, FixedType.ofLength(4), - ByteBuffer.wrap(fixed.bytes()), ByteBuffer.wrap(fixed.bytes()), metrics); - assertCounts(13, 2L, 0L, metrics); - assertBounds(13, BinaryType.get(), + assertBounds(12, BinaryType.get(), ByteBuffer.wrap("S".getBytes()), ByteBuffer.wrap("W".getBytes()), metrics); } @@ -237,14 +223,14 @@ public void testMetricsForDecimals() throws IOException { required(3, "decimalAsFixed", DecimalType.of(22, 2)) ); - Record record = new Record(AvroSchemaUtil.convert(schema.asStruct())); - record.put("decimalAsInt32", new BigDecimal("2.55")); - record.put("decimalAsInt64", new BigDecimal("4.75")); - record.put("decimalAsFixed", new BigDecimal("5.80")); + Record record = GenericRecord.create(schema); + record.setField("decimalAsInt32", new BigDecimal("2.55")); + record.setField("decimalAsInt64", new BigDecimal("4.75")); + record.setField("decimalAsFixed", new BigDecimal("5.80")); - File recordsFile = writeRecords(schema, record); + InputFile recordsFile = writeRecords(schema, record); - Metrics metrics = getMetrics(Files.localInput(recordsFile)); + Metrics metrics = getMetrics(recordsFile); Assert.assertEquals(1L, (long) metrics.recordCount()); assertCounts(1, 1L, 0L, metrics); assertBounds(1, DecimalType.of(4, 2), new BigDecimal("2.55"), new BigDecimal("2.55"), metrics); @@ -257,19 +243,19 @@ public void testMetricsForDecimals() throws IOException { @Test public void testMetricsForNestedStructFields() throws IOException { - Record leafStruct = new Record(AvroSchemaUtil.convert(LEAF_STRUCT_TYPE)); - leafStruct.put("leafLongCol", 20L); - leafStruct.put("leafBinaryCol", "A".getBytes()); - Record nestedStruct = new Record(AvroSchemaUtil.convert(NESTED_STRUCT_TYPE)); - nestedStruct.put("longCol", 100L); - nestedStruct.put("leafStructCol", leafStruct); - Record record = new Record(AvroSchemaUtil.convert(NESTED_SCHEMA.asStruct())); - record.put("intCol", Integer.MAX_VALUE); - record.put("nestedStructCol", nestedStruct); + Record leafStruct = GenericRecord.create(LEAF_STRUCT_TYPE); + leafStruct.setField("leafLongCol", 20L); + leafStruct.setField("leafBinaryCol", ByteBuffer.wrap("A".getBytes())); + Record nestedStruct = GenericRecord.create(NESTED_STRUCT_TYPE); + nestedStruct.setField("longCol", 100L); + nestedStruct.setField("leafStructCol", leafStruct); + Record record = GenericRecord.create(NESTED_SCHEMA); + record.setField("intCol", Integer.MAX_VALUE); + record.setField("nestedStructCol", nestedStruct); - File recordsFile = writeRecords(NESTED_SCHEMA, record); + InputFile recordsFile = writeRecords(NESTED_SCHEMA, record); - Metrics metrics = getMetrics(Files.localInput(recordsFile)); + Metrics metrics = getMetrics(recordsFile); Assert.assertEquals(1L, (long) metrics.recordCount()); assertCounts(1, 1L, 0L, metrics); assertBounds(1, IntegerType.get(), Integer.MAX_VALUE, Integer.MAX_VALUE, metrics); @@ -293,18 +279,18 @@ public void testMetricsForListAndMapElements() throws IOException { optional(5, "mapCol", MapType.ofRequired(6, 7, 
StringType.get(), structType)) ); - Record record = new Record(AvroSchemaUtil.convert(schema.asStruct())); - record.put("intListCol", Lists.newArrayList(10, 11, 12)); - Record struct = new Record(AvroSchemaUtil.convert(structType)); - struct.put("leafIntCol", 1); - struct.put("leafStringCol", "BBB"); + Record record = GenericRecord.create(schema); + record.setField("intListCol", Lists.newArrayList(10, 11, 12)); + Record struct = GenericRecord.create(structType); + struct.setField("leafIntCol", 1); + struct.setField("leafStringCol", "BBB"); Map map = Maps.newHashMap(); map.put("4", struct); - record.put(1, map); + record.set(1, map); - File recordsFile = writeRecords(schema, record); + InputFile recordsFile = writeRecords(schema, record); - Metrics metrics = getMetrics(Files.localInput(recordsFile)); + Metrics metrics = getMetrics(recordsFile); Assert.assertEquals(1L, (long) metrics.recordCount()); assertCounts(1, 1, 0, metrics); assertBounds(1, IntegerType.get(), null, null, metrics); @@ -321,14 +307,14 @@ public void testMetricsForNullColumns() throws IOException { Schema schema = new Schema( optional(1, "intCol", IntegerType.get()) ); - Record firstRecord = new Record(AvroSchemaUtil.convert(schema.asStruct())); - firstRecord.put("intCol", null); - Record secondRecord = new Record(AvroSchemaUtil.convert(schema.asStruct())); - secondRecord.put("intCol", null); + Record firstRecord = GenericRecord.create(schema); + firstRecord.setField("intCol", null); + Record secondRecord = GenericRecord.create(schema); + secondRecord.setField("intCol", null); - File recordsFile = writeRecords(schema, firstRecord, secondRecord); + InputFile recordsFile = writeRecords(schema, firstRecord, secondRecord); - Metrics metrics = getMetrics(Files.localInput(recordsFile)); + Metrics metrics = getMetrics(recordsFile); Assert.assertEquals(2L, (long) metrics.recordCount()); assertCounts(1, 2, 2, metrics); assertBounds(1, IntegerType.get(), null, null, metrics); @@ -336,38 +322,36 @@ public void testMetricsForNullColumns() throws IOException { @Test public void testMetricsForTopLevelWithMultipleRowGroup() throws Exception { + Assume.assumeTrue("Skip test for formats that do not support small row groups", supportsSmallRowGroups()); + int recordCount = 201; - List records = new ArrayList<>(recordCount); + List records = new ArrayList<>(recordCount); for (int i = 0; i < recordCount; i++) { - GenericData.Record newRecord = new GenericData.Record(AvroSchemaUtil.convert(SIMPLE_SCHEMA.asStruct())); - newRecord.put("booleanCol", i == 0 ? false : true); - newRecord.put("intCol", i + 1); - newRecord.put("longCol", i == 0 ? null : i + 1L); - newRecord.put("floatCol", i + 1.0F); - newRecord.put("doubleCol", i == 0 ? null : i + 1.0D); - newRecord.put("decimalCol", i == 0 ? null : new BigDecimal(i + "").add(new BigDecimal("1.00"))); - newRecord.put("stringCol", "AAA"); - newRecord.put("dateCol", i + 1); - newRecord.put("timeCol", i + 1L); - newRecord.put("timestampCol", i + 1L); - newRecord.put("uuidCol", uuid); - newRecord.put("fixedCol", fixed); - newRecord.put("binaryCol", "S".getBytes()); + Record newRecord = GenericRecord.create(SIMPLE_SCHEMA); + newRecord.setField("booleanCol", i == 0 ? false : true); + newRecord.setField("intCol", i + 1); + newRecord.setField("longCol", i == 0 ? null : i + 1L); + newRecord.setField("floatCol", i + 1.0F); + newRecord.setField("doubleCol", i == 0 ? null : i + 1.0D); + newRecord.setField("decimalCol", i == 0 ? 
null : new BigDecimal(i + "").add(new BigDecimal("1.00"))); + newRecord.setField("stringCol", "AAA"); + newRecord.setField("dateCol", DateTimeUtil.dateFromDays(i + 1)); + newRecord.setField("timeCol", DateTimeUtil.timeFromMicros(i + 1L)); + newRecord.setField("timestampCol", DateTimeUtil.timestampFromMicros(i + 1L)); + newRecord.setField("fixedCol", fixed); + newRecord.setField("binaryCol", ByteBuffer.wrap("S".getBytes())); records.add(newRecord); } // create parquet file with multiple row groups. by using smaller number of bytes - File parquetFile = writeRecords( - SIMPLE_SCHEMA, - ImmutableMap.of(PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(ROW_GROUP_SIZE)), - records.toArray(new GenericData.Record[] {})); + InputFile recordsFile = writeRecordsWithSmallRowGroups(SIMPLE_SCHEMA, records.toArray(new Record[0])); - Assert.assertNotNull(parquetFile); + Assert.assertNotNull(recordsFile); // rowgroup size should be > 1 - Assert.assertEquals(3, splitCount(parquetFile)); + Assert.assertEquals(3, splitCount(recordsFile)); - Metrics metrics = getMetrics(localInput(parquetFile)); + Metrics metrics = getMetrics(recordsFile); Assert.assertEquals(201L, (long) metrics.recordCount()); assertCounts(1, 201L, 0L, metrics); assertBounds(1, Types.BooleanType.get(), false, true, metrics); @@ -385,33 +369,32 @@ public void testMetricsForTopLevelWithMultipleRowGroup() throws Exception { @Test public void testMetricsForNestedStructFieldsWithMultipleRowGroup() throws IOException { + Assume.assumeTrue("Skip test for formats that do not support small row groups", supportsSmallRowGroups()); + int recordCount = 201; - List records = new ArrayList(recordCount); + List records = Lists.newArrayListWithExpectedSize(recordCount); for (int i = 0; i < recordCount; i++) { - Record newLeafStruct = new Record(AvroSchemaUtil.convert(LEAF_STRUCT_TYPE)); - newLeafStruct.put("leafLongCol", i + 1L); - newLeafStruct.put("leafBinaryCol", "A".getBytes()); - Record newNestedStruct = new Record(AvroSchemaUtil.convert(NESTED_STRUCT_TYPE)); - newNestedStruct.put("longCol", i + 1L); - newNestedStruct.put("leafStructCol", newLeafStruct); - Record newRecord = new Record(AvroSchemaUtil.convert(NESTED_SCHEMA.asStruct())); - newRecord.put("intCol", i + 1); - newRecord.put("nestedStructCol", newNestedStruct); + Record newLeafStruct = GenericRecord.create(LEAF_STRUCT_TYPE); + newLeafStruct.setField("leafLongCol", i + 1L); + newLeafStruct.setField("leafBinaryCol", ByteBuffer.wrap("A".getBytes())); + Record newNestedStruct = GenericRecord.create(NESTED_STRUCT_TYPE); + newNestedStruct.setField("longCol", i + 1L); + newNestedStruct.setField("leafStructCol", newLeafStruct); + Record newRecord = GenericRecord.create(NESTED_SCHEMA); + newRecord.setField("intCol", i + 1); + newRecord.setField("nestedStructCol", newNestedStruct); records.add(newRecord); } // create parquet file with multiple row groups. 
by using smaller number of bytes
-    File parquetFile = writeRecords(
-        NESTED_SCHEMA,
-        ImmutableMap.of(PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(ROW_GROUP_SIZE)),
-        records.toArray(new GenericData.Record[] {}));
+    InputFile recordsFile = writeRecordsWithSmallRowGroups(NESTED_SCHEMA, records.toArray(new Record[0]));
 
-    Assert.assertNotNull(parquetFile);
+    Assert.assertNotNull(recordsFile);
     // rowgroup size should be > 1
-    Assert.assertEquals(3, splitCount(parquetFile));
+    Assert.assertEquals(3, splitCount(recordsFile));
 
-    Metrics metrics = getMetrics(localInput(parquetFile));
+    Metrics metrics = getMetrics(recordsFile);
     Assert.assertEquals(201L, (long) metrics.recordCount());
     assertCounts(1, 201L, 0L, metrics);
     assertBounds(1, IntegerType.get(), 1, 201, metrics);
diff --git a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
new file mode 100644
index 000000000000..3070a7d96cff
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestMetrics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Test Metrics for Parquet.
+ */
+public class TestParquetMetrics extends TestMetrics {
+  private static final Map<String, String> SMALL_ROW_GROUP_CONFIG = ImmutableMap.of(
+      TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, "1600");
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Override
+  public Metrics getMetrics(InputFile file) {
+    return ParquetUtil.fileMetrics(file);
+  }
+
+  @Override
+  public InputFile writeRecords(Schema schema, Record... records) throws IOException {
+    return writeRecords(schema, ImmutableMap.of(), records);
+  }
+
+  @Override
+  public InputFile writeRecordsWithSmallRowGroups(Schema schema, Record... records) throws IOException {
+    return writeRecords(schema, SMALL_ROW_GROUP_CONFIG, records);
+  }
+
+  private InputFile writeRecords(Schema schema, Map<String, String> properties, Record... records) throws IOException {
+    File tmpFolder = temp.newFolder("parquet");
+    String filename = UUID.randomUUID().toString();
+    OutputFile file = Files.localOutput(new File(tmpFolder, FileFormat.PARQUET.addExtension(filename)));
+    try (FileAppender<Record> writer = Parquet.write(file)
+        .schema(schema)
+        .setAll(properties)
+        .createWriterFunc(GenericParquetWriter::buildWriter)
+        .build()) {
+      writer.addAll(Lists.newArrayList(records));
+    }
+    return file.toInputFile();
+  }
+
+  @Override
+  public int splitCount(InputFile parquetFile) throws IOException {
+    try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(parquetFile))) {
+      return reader.getRowGroups().size();
+    }
+  }
+
+  @Override
+  public boolean supportsSmallRowGroups() {
+    return true;
+  }
+}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
deleted file mode 100644
index 38b7d9a3911c..000000000000
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.parquet;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-import org.apache.avro.generic.GenericData;
-import org.apache.iceberg.Metrics;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.TestMetrics;
-import org.apache.iceberg.io.InputFile;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-import static org.apache.iceberg.Files.localInput;
-
-/**
- * Test Metrics for Parquet.
- */
-public class TestParquetMetrics extends TestMetrics {
-
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
-
-  @Override
-  public Metrics getMetrics(InputFile file) {
-    return ParquetUtil.fileMetrics(file);
-  }
-
-  @Override
-  public File writeRecords(Schema schema, GenericData.Record... records) throws IOException {
-    return ParquetWritingTestUtils.writeRecords(temp, schema, records);
-  }
-
-  @Override
-  public File writeRecords(Schema schema, Map<String, String> properties, GenericData.Record... records)
-      throws IOException {
-    return ParquetWritingTestUtils.writeRecords(temp, schema, properties, records);
-  }
-
-  @Override
-  public int splitCount(File parquetFile) throws IOException {
-    try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
-      return reader.getRowGroups().size();
-    }
-  }
-}

From ee89bd324c4cbe68a6553188569dc1fd38880fa5 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Tue, 26 May 2020 17:23:21 -0700
Subject: [PATCH 3/3] Fix checkstyle.

---
 .../org/apache/iceberg/parquet/TestParquetMetrics.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
index 3070a7d96cff..a4c28cbf3f8c 100644
--- a/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/parquet/TestParquetMetrics.java
@@ -56,13 +56,13 @@ public Metrics getMetrics(InputFile file) {
   }
 
   @Override
-  public InputFile writeRecords(Schema schema, Record... records) throws IOException {
-    return writeRecords(schema, ImmutableMap.of(), records);
+  public InputFile writeRecordsWithSmallRowGroups(Schema schema, Record... records) throws IOException {
+    return writeRecords(schema, SMALL_ROW_GROUP_CONFIG, records);
   }
 
   @Override
-  public InputFile writeRecordsWithSmallRowGroups(Schema schema, Record... records) throws IOException {
-    return writeRecords(schema, SMALL_ROW_GROUP_CONFIG, records);
+  public InputFile writeRecords(Schema schema, Record... records) throws IOException {
+    return writeRecords(schema, ImmutableMap.of(), records);
   }
 
   private InputFile writeRecords(Schema schema, Map<String, String> properties, Record... records) throws IOException {
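A note on the DateTimeUtil calls introduced in PATCH 2: Iceberg generic records carry dates, times, and timestamps as java.time values, while the metric bounds keep Iceberg's internal integer representation (days from the epoch for dates, microseconds for times and timestamps). The following is a minimal sketch of equivalent conversions using only java.time; the real helpers live in org.apache.iceberg.util.DateTimeUtil after PATCH 1, and their exact bodies may differ (DateTimeUtilSketch is just a container name for the sketch).

    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.LocalTime;
    import java.time.ZoneOffset;
    import java.time.temporal.ChronoUnit;

    public class DateTimeUtilSketch {
      // dates are stored as days from 1970-01-01, so dateFromDays(1500) is 1974-02-09
      static LocalDate dateFromDays(int daysFromEpoch) {
        return LocalDate.ofEpochDay(daysFromEpoch);
      }

      // times are stored as microseconds from midnight
      static LocalTime timeFromMicros(long microsFromMidnight) {
        return LocalTime.ofNanoOfDay(microsFromMidnight * 1000);
      }

      // timestamps without zone are stored as microseconds from the epoch
      static LocalDateTime timestampFromMicros(long microsFromEpoch) {
        return LocalDateTime.ofInstant(
            Instant.EPOCH.plus(microsFromEpoch, ChronoUnit.MICROS), ZoneOffset.UTC);
      }
    }

This is why assertBounds(9, TimeType.get(), 2000L, 3000L, metrics) still compares microsecond longs even though the records are now built with DateTimeUtil.timeFromMicros(2000L) and DateTimeUtil.timeFromMicros(3000L).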
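The point of PATCH 2 is that TestMetrics no longer depends on Avro records, so any format module can subclass it. A hypothetical ORC subclass might look like the sketch below; TestOrcMetrics, OrcMetrics.fromInputFile, ORC.write, and GenericOrcWriter::buildWriter are assumptions for illustration, not part of this patch set. Leaving supportsSmallRowGroups() at its default of false means the two multiple-row-group tests are skipped by the Assume calls added above.

    package org.apache.iceberg.orc;

    import java.io.File;
    import java.io.IOException;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Metrics;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.TestMetrics;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.orc.GenericOrcWriter;
    import org.apache.iceberg.io.FileAppender;
    import org.apache.iceberg.io.InputFile;
    import org.apache.iceberg.io.OutputFile;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;
    import org.junit.Rule;
    import org.junit.rules.TemporaryFolder;

    /**
     * Hypothetical ORC implementation of the shared metrics tests (sketch only).
     */
    public class TestOrcMetrics extends TestMetrics {
      @Rule
      public TemporaryFolder temp = new TemporaryFolder();

      @Override
      public Metrics getMetrics(InputFile file) {
        // assumption: an OrcMetrics helper that derives Metrics from ORC file statistics
        return OrcMetrics.fromInputFile(file);
      }

      @Override
      public InputFile writeRecords(Schema schema, Record... records) throws IOException {
        OutputFile file = Files.localOutput(
            new File(temp.newFolder("orc"), FileFormat.ORC.addExtension("records")));
        // assumption: ORC.write follows the same builder shape as Parquet.write above
        try (FileAppender<Record> writer = ORC.write(file)
            .schema(schema)
            .createWriterFunc(GenericOrcWriter::buildWriter)
            .build()) {
          writer.addAll(Lists.newArrayList(records));
        }
        return file.toInputFile();
      }

      @Override
      public InputFile writeRecordsWithSmallRowGroups(Schema schema, Record... records) {
        // never reached: supportsSmallRowGroups() keeps its default of false,
        // so both multiple-row-group tests are skipped by Assume
        throw new UnsupportedOperationException("ORC row-group sizing is not exercised here");
      }

      @Override
      public int splitCount(InputFile file) {
        return 0; // only meaningful for the skipped row-group tests
      }
    }

Gating the row-group tests on supportsSmallRowGroups() keeps Parquet-specific assertions in the shared class without forcing every format to emulate Parquet row groups.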