diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
new file mode 100644
index 000000000000..e5d2652bcb11
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * Factory to create a new {@link FileAppender} to write {@link Record}s.
+ */
+public class GenericAppenderFactory implements FileAppenderFactory<Record> {
+
+  private final Schema schema;
+  private final Map<String, String> config = Maps.newHashMap();
+
+  public GenericAppenderFactory(Schema schema) {
+    this.schema = schema;
+  }
+
+  public GenericAppenderFactory set(String property, String value) {
+    config.put(property, value);
+    return this;
+  }
+
+  public GenericAppenderFactory setAll(Map<String, String> properties) {
+    config.putAll(properties);
+    return this;
+  }
+
+  @Override
+  public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
+    MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
+    try {
+      switch (fileFormat) {
+        case AVRO:
+          return Avro.write(outputFile)
+              .schema(schema)
+              .createWriterFunc(DataWriter::create)
+              .setAll(config)
+              .overwrite()
+              .build();
+
+        case PARQUET:
+          return Parquet.write(outputFile)
+              .schema(schema)
+              .createWriterFunc(GenericParquetWriter::buildWriter)
+              .setAll(config)
+              .metricsConfig(metricsConfig)
+              .overwrite()
+              .build();
+
+        case ORC:
+          return ORC.write(outputFile)
+              .schema(schema)
+              .createWriterFunc(GenericOrcWriter::buildWriter)
+              .setAll(config)
+              .overwrite()
+              .build();
+
+        default:
+          throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+}
diff --git a/data/src/test/java/org/apache/iceberg/TestSplitScan.java b/data/src/test/java/org/apache/iceberg/TestSplitScan.java
index b0d870a3e65a..a1c6410252f3 100644
--- a/data/src/test/java/org/apache/iceberg/TestSplitScan.java
+++ b/data/src/test/java/org/apache/iceberg/TestSplitScan.java
@@ -24,15 +24,12 @@
 import java.util.List;
 import java.util.Locale;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
@@ -122,28 +119,10 @@ private File writeToFile(List<Record> records, FileFormat fileFormat) throws IOE
     File file = temp.newFile();
     Assert.assertTrue(file.delete());
 
-    switch (fileFormat) {
-      case AVRO:
-        try (FileAppender<Record> appender = Avro.write(Files.localOutput(file))
-            .schema(SCHEMA)
-            .createWriterFunc(DataWriter::create)
-            .named(fileFormat.name())
-            .build()) {
-          appender.addAll(records);
-        }
-        break;
-      case PARQUET:
-        try (FileAppender<Record> appender = Parquet.write(Files.localOutput(file))
-            .schema(SCHEMA)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
-            .named(fileFormat.name())
-            .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(SPLIT_SIZE))
-            .build()) {
-          appender.addAll(records);
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
+    GenericAppenderFactory factory = new GenericAppenderFactory(SCHEMA).set(
+        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(SPLIT_SIZE));
+    try (FileAppender<Record> appender = factory.newAppender(Files.localOutput(file), fileFormat)) {
+      appender.addAll(records);
     }
     return file;
   }
diff --git a/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
new file mode 100644
index 000000000000..c32be08263a9
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/GenericAppenderHelper.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Helper for appending {@link DataFile} to a table or appending {@link Record}s to a table.
+ */
+public class GenericAppenderHelper {
+
+  private final Table table;
+  private final FileFormat fileFormat;
+  private final TemporaryFolder tmp;
+
+  public GenericAppenderHelper(Table table, FileFormat fileFormat, TemporaryFolder tmp) {
+    this.table = table;
+    this.fileFormat = fileFormat;
+    this.tmp = tmp;
+  }
+
+  public void appendToTable(DataFile... dataFiles) {
+    Preconditions.checkNotNull(table, "table not set");
+
+    AppendFiles append = table.newAppend();
+
+    for (DataFile dataFile : dataFiles) {
+      append = append.appendFile(dataFile);
+    }
+
+    append.commit();
+  }
+
+  public void appendToTable(List<Record> records) throws IOException {
+    appendToTable(null, records);
+  }
+
+  public void appendToTable(StructLike partition, List<Record> records) throws IOException {
+    appendToTable(writeFile(partition, records));
+  }
+
+  public DataFile writeFile(StructLike partition, List<Record> records) throws IOException {
+    Preconditions.checkNotNull(table, "table not set");
+    File file = tmp.newFile();
+    Assert.assertTrue(file.delete());
+    return appendToLocalFile(table, file, fileFormat, partition, records);
+  }
+
+  private static DataFile appendToLocalFile(
+      Table table, File file, FileFormat format, StructLike partition, List<Record> records)
+      throws IOException {
+    FileAppender<Record> appender = new GenericAppenderFactory(table.schema()).newAppender(
+        Files.localOutput(file), format);
+    try (FileAppender<Record> fileAppender = appender) {
+      fileAppender.addAll(records);
+    }
+
+    return DataFiles.builder(table.spec())
+        .withRecordCount(records.size())
+        .withFileSizeInBytes(file.length())
+        .withPath(file.toURI().toString())
+        .withMetrics(appender.metrics())
+        .withFormat(format)
+        .withPartition(partition)
+        .build();
+  }
+}
diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
index 5acd2a32ea44..b3a5323c6aab 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
@@ -39,16 +39,10 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.Tables;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -458,59 +452,17 @@ private DataFile writeFile(String location, String filename, Schema schema, List
     Path path = new Path(location, filename);
     FileFormat fileFormat = FileFormat.fromFileName(filename);
     Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
-    switch (fileFormat) {
-      case AVRO:
-        FileAppender<Record> avroAppender = Avro.write(fromPath(path, CONF))
-            .schema(schema)
-            .createWriterFunc(DataWriter::create)
-            .named(fileFormat.name())
-            .build();
-        try {
-          avroAppender.addAll(records);
-        } finally {
-          avroAppender.close();
-        }
-
-        return DataFiles.builder(PartitionSpec.unpartitioned())
-            .withInputFile(HadoopInputFile.fromPath(path, CONF))
-            .withMetrics(avroAppender.metrics())
-            .build();
-
-      case PARQUET:
-        FileAppender<Record> parquetAppender = Parquet.write(fromPath(path, CONF))
-            .schema(schema)
-            .createWriterFunc(GenericParquetWriter::buildWriter)
-            .build();
-        try {
-          parquetAppender.addAll(records);
-        } finally {
-          parquetAppender.close();
-        }
-
-        return DataFiles.builder(PartitionSpec.unpartitioned())
-            .withInputFile(HadoopInputFile.fromPath(path, CONF))
-            .withMetrics(parquetAppender.metrics())
-            .build();
-
-      case ORC:
-        FileAppender<Record> orcAppender = ORC.write(fromPath(path, CONF))
-            .schema(schema)
-            .createWriterFunc(GenericOrcWriter::buildWriter)
-            .build();
-        try {
-          orcAppender.addAll(records);
-        } finally {
-          orcAppender.close();
-        }
-
-        return DataFiles.builder(PartitionSpec.unpartitioned())
-            .withInputFile(HadoopInputFile.fromPath(path, CONF))
-            .withMetrics(orcAppender.metrics())
-            .build();
-
-      default:
-        throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
+
+    FileAppender<Record> fileAppender = new GenericAppenderFactory(schema).newAppender(
+        fromPath(path, CONF), fileFormat);
+    try (FileAppender<Record> appender = fileAppender) {
+      appender.addAll(records);
     }
+
+    return DataFiles.builder(PartitionSpec.unpartitioned())
+        .withInputFile(HadoopInputFile.fromPath(path, CONF))
+        .withMetrics(fileAppender.metrics())
+        .build();
   }
 
   @Test
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
index 210142f3336d..91b814b8aa13 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -19,18 +19,14 @@
 
 package org.apache.iceberg.mr;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
@@ -38,23 +34,15 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.Tables;
-import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.junit.Assert;
 import org.junit.rules.TemporaryFolder;
 
 public class TestHelper {
-
   private final Configuration conf;
   private final Tables tables;
   private final String tableIdentifier;
@@ -114,77 +102,19 @@ public static List<Record> generateRandomRecords(Schema schema, int num, long se
   }
 
   public void appendToTable(DataFile... dataFiles) {
-    Preconditions.checkNotNull(table, "table not set");
-
-    AppendFiles append = table.newAppend();
-
-    for (DataFile dataFile : dataFiles) {
-      append = append.appendFile(dataFile);
-    }
-
-    append.commit();
+    appender().appendToTable(dataFiles);
   }
 
   public void appendToTable(StructLike partition, List<Record> records) throws IOException {
-    appendToTable(writeFile(partition, records));
+    appender().appendToTable(partition, records);
   }
 
  public DataFile writeFile(StructLike partition, List<Record> records) throws IOException {
-    Preconditions.checkNotNull(table, "table not set");
-    return writeFile(table, partition, records, fileFormat, tmp.newFile());
+    return appender().writeFile(partition, records);
  }
 
-  public static DataFile writeFile(Table table, StructLike partition, List<Record> records, FileFormat fileFormat,
-                                   File file) throws IOException {
-    Assert.assertTrue(file.delete());
-
-    FileAppender<Record> appender;
-
-    switch (fileFormat) {
-      case AVRO:
-        appender = Avro.write(Files.localOutput(file))
-            .schema(table.schema())
-            .createWriterFunc(DataWriter::create)
-            .named(fileFormat.name())
-            .build();
-        break;
-
-      case PARQUET:
-        appender = Parquet.write(Files.localOutput(file))
-            .schema(table.schema())
-            .createWriterFunc(GenericParquetWriter::buildWriter)
-            .named(fileFormat.name())
-            .build();
-        break;
-
-      case ORC:
-        appender = ORC.write(Files.localOutput(file))
-            .schema(table.schema())
-            .createWriterFunc(GenericOrcWriter::buildWriter)
-            .build();
-        break;
-
-      default:
-        throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
-    }
-
-    try {
-      appender.addAll(records);
-    } finally {
-      appender.close();
-    }
-
-    DataFiles.Builder builder = DataFiles.builder(table.spec())
-        .withPath(file.toString())
-        .withFormat(fileFormat)
-        .withFileSizeInBytes(file.length())
-        .withMetrics(appender.metrics());
-
-    if (partition != null) {
-      builder.withPartition(partition);
-    }
-
-    return builder.build();
+  private GenericAppenderHelper appender() {
+    return new GenericAppenderHelper(table, fileFormat, tmp);
   }
 
   public static class RecordsBuilder {
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
index ac64fa952c71..d88daef1c23f 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
@@ -31,14 +31,9 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkValueConverter;
@@ -111,33 +106,9 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema
     // When tables are created, the column ids are reassigned.
     Schema tableSchema = table.schema();
 
-    switch (format) {
-      case AVRO:
-        try (FileAppender<Record> writer = Avro.write(localOutput(testFile))
-            .createWriterFunc(DataWriter::create)
-            .schema(tableSchema)
-            .build()) {
-          writer.add(record);
-        }
-        break;
-
-      case PARQUET:
-        try (FileAppender<Record> writer = Parquet.write(localOutput(testFile))
-            .createWriterFunc(GenericParquetWriter::buildWriter)
-            .schema(tableSchema)
-            .build()) {
-          writer.add(record);
-        }
-        break;
-
-      case ORC:
-        try (FileAppender<Record> writer = ORC.write(localOutput(testFile))
-            .createWriterFunc(GenericOrcWriter::buildWriter)
-            .schema(tableSchema)
-            .build()) {
-          writer.add(record);
-        }
-        break;
+    try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema).newAppender(
+        localOutput(testFile), format)) {
+      writer.add(record);
     }
 
     DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 0d45179b315c..7fc7a98fd077 100644
--- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -33,17 +33,12 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.data.GenericsHelpers;
@@ -191,33 +186,9 @@ public void writeUnpartitionedTable() throws IOException {
     // create records using the table's schema
     this.records = testRecords(tableSchema);
 
-    switch (fileFormat) {
-      case AVRO:
-        try (FileAppender<Record> writer = Avro.write(localOutput(testFile))
-            .createWriterFunc(DataWriter::create)
-            .schema(tableSchema)
-            .build()) {
-          writer.addAll(records);
-        }
-        break;
-
-      case PARQUET:
-        try (FileAppender<Record> writer = Parquet.write(localOutput(testFile))
-            .createWriterFunc(GenericParquetWriter::buildWriter)
-            .schema(tableSchema)
-            .build()) {
-          writer.addAll(records);
-        }
-        break;
-
-      case ORC:
-        try (FileAppender<Record> writer = ORC.write(localOutput(testFile))
-            .createWriterFunc(GenericOrcWriter::buildWriter)
-            .schema(tableSchema)
-            .build()) {
-          writer.addAll(records);
-        }
-        break;
+    try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema).newAppender(
+        localOutput(testFile), fileFormat)) {
+      writer.addAll(records);
     }
 
     DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 9be99383873f..1166f32c08ac 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -33,16 +33,11 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.data.GenericsHelpers;
@@ -188,33 +183,9 @@ public void writeUnpartitionedTable() throws IOException {
 
     this.records = testRecords(tableSchema);
 
-    switch (fileFormat) {
-      case AVRO:
-        try (FileAppender<Record> writer = Avro.write(localOutput(testFile))
-            .createWriterFunc(DataWriter::create)
-            .schema(tableSchema)
-            .build()) {
-          writer.addAll(records);
-        }
-        break;
-
-      case PARQUET:
-        try (FileAppender<Record> writer = Parquet.write(localOutput(testFile))
-            .createWriterFunc(GenericParquetWriter::buildWriter)
-            .schema(tableSchema)
-            .build()) {
-          writer.addAll(records);
-        }
-        break;
-
-      case ORC:
-        try (FileAppender<Record> writer = ORC.write(localOutput(testFile))
-            .createWriterFunc(GenericOrcWriter::buildWriter)
-            .schema(tableSchema)
-            .build()) {
-          writer.addAll(records);
-        }
-        break;
+    try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema).newAppender(
+        localOutput(testFile), fileFormat)) {
+      writer.addAll(records);
     }
 
     DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
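
A minimal usage sketch of the factory and helper introduced by this patch (illustrative only; the `table`, `records`, `file`, `splitSize`, and `temp` names below are assumed test fixtures, not part of the change):

    // Build an appender for any supported format from one factory, no per-format switch needed.
    GenericAppenderFactory factory = new GenericAppenderFactory(table.schema())
        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(splitSize));
    try (FileAppender<Record> appender =
        factory.newAppender(Files.localOutput(file), FileFormat.PARQUET)) {
      appender.addAll(records);
    }

    // Or let the test helper write the data file and commit the append to the table in one call.
    GenericAppenderHelper helper = new GenericAppenderHelper(table, FileFormat.PARQUET, temp);
    helper.appendToTable(records);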