diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
new file mode 100644
index 000000000000..8c855efe52ea
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+
+public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final FileAppenderFactory<T> appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+
+  protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  @Override
+  public void abort() throws IOException {
+    close();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public DataFile[] complete() throws IOException {
+    close();
+
+    return completedFiles.toArray(new DataFile[0]);
+  }
+
+  protected class RollingFileWriter implements Closeable {
+    private static final int ROWS_DIVISOR = 1000;
+    private final PartitionKey partitionKey;
+
+    private EncryptedOutputFile currentFile = null;
+    private FileAppender<T> currentAppender = null;
+    private long currentRows = 0;
+
+    public RollingFileWriter(PartitionKey partitionKey) {
+      this.partitionKey = partitionKey;
+      openCurrent();
+    }
+
+    public void add(T record) throws IOException {
+      this.currentAppender.add(record);
+      this.currentRows++;
+
+      if (shouldRollToNewFile()) {
+        closeCurrent();
+        openCurrent();
+      }
+    }
+
+    private void openCurrent() {
+      if (partitionKey == null) {
+        // unpartitioned
+        currentFile = fileFactory.newOutputFile();
+      } else {
+        // partitioned
+        currentFile = fileFactory.newOutputFile(partitionKey);
+      }
+      currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
+      currentRows = 0;
+    }
+
+    private boolean shouldRollToNewFile() {
+      // TODO: ORC cannot report the written file length until it is closed, so ORC files are not rolled by target size yet
+      return !format.equals(FileFormat.ORC) &&
+          currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize;
+    }
+
+    private void closeCurrent() throws IOException {
+      if (currentAppender != null) {
+        currentAppender.close();
+        // metrics are only valid after the appender is closed
+        Metrics metrics = currentAppender.metrics();
+        long fileSizeInBytes = currentAppender.length();
+        List<Long> splitOffsets = currentAppender.splitOffsets();
+        this.currentAppender = null;
+
+        if (metrics.recordCount() == 0L) {
+          io.deleteFile(currentFile.encryptingOutputFile());
+        } else {
+          DataFile dataFile = DataFiles.builder(spec)
+              .withEncryptionKeyMetadata(currentFile.keyMetadata())
+              .withPath(currentFile.encryptingOutputFile().location())
+              .withFileSizeInBytes(fileSizeInBytes)
+              .withPartition(spec.fields().size() == 0 ? null : partitionKey) // set null if unpartitioned
+              .withMetrics(metrics)
+              .withSplitOffsets(splitOffsets)
+              .build();
+          completedFiles.add(dataFile);
+        }
+
+        this.currentFile = null;
+        this.currentRows = 0;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      closeCurrent();
+    }
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/TaskResult.java b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
similarity index 59%
rename from spark/src/main/java/org/apache/iceberg/spark/source/TaskResult.java
rename to core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
index 1a0b8fcf0caa..9afdca460f0a 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/TaskResult.java
+++ b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
@@ -17,32 +17,23 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.io;
 
-import java.io.Serializable;
-import java.util.List;
-import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
 
-class TaskResult implements Serializable {
-  private final DataFile[] files;
-
-  TaskResult() {
-    this.files = new DataFile[0];
-  }
-
-  TaskResult(DataFile file) {
-    this.files = new DataFile[] { file };
-  }
-
-  TaskResult(List<DataFile> files) {
-    this.files = files.toArray(new DataFile[files.size()]);
-  }
-
-  TaskResult(DataFile[] files) {
-    this.files = files;
-  }
-
-  DataFile[] files() {
-    return files;
-  }
+/**
+ * Factory to create a new {@link FileAppender} to write records.
+ *
+ * @param <T> data type of the rows to append.
+ */
+public interface FileAppenderFactory<T> {
+
+  /**
+   * Create a new {@link FileAppender}.
+   *
+   * @param outputFile an OutputFile used to create an output stream.
+   * @param fileFormat File format.
+   * @return a newly created {@link FileAppender}
+   */
+  FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
similarity index 87%
rename from spark/src/main/java/org/apache/iceberg/spark/source/OutputFileFactory.java
rename to core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
index 08e66df79362..e68f4c5ad2ec 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/OutputFileFactory.java
+++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.io;
 
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -26,11 +26,8 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
-import org.apache.iceberg.io.OutputFile;
 
-class OutputFileFactory {
+public class OutputFileFactory {
   private final PartitionSpec spec;
   private final FileFormat format;
   private final LocationProvider locations;
@@ -44,8 +41,8 @@ class OutputFileFactory {
   private final String uuid = UUID.randomUUID().toString();
   private final AtomicInteger fileCount = new AtomicInteger(0);
 
-  OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io,
-                    EncryptionManager encryptionManager, int partitionId, long taskId) {
+  public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io,
+                           EncryptionManager encryptionManager, int partitionId, long taskId) {
     this.spec = spec;
     this.format = format;
     this.locations = locations;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionedWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java
similarity index 61%
rename from spark/src/main/java/org/apache/iceberg/spark/source/PartitionedWriter.java
rename to core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java
index 0ead766f38c7..7b5fa90e4502 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionedWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/PartitionedWriter.java
@@ -17,44 +17,50 @@
  * under the License.
  */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.io;
 
 import java.io.IOException;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class PartitionedWriter extends BaseWriter {
+public abstract class PartitionedWriter<T> extends BaseTaskWriter<T> {
   private static final Logger LOG = LoggerFactory.getLogger(PartitionedWriter.class);
 
-  private final PartitionKey key;
-  private final InternalRowWrapper wrapper;
   private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
 
-  PartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                    OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema writeSchema) {
+  private PartitionKey currentKey = null;
+  private RollingFileWriter currentWriter = null;
+
+  public PartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                           OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
-    this.key = new PartitionKey(spec, writeSchema);
-    this.wrapper = new InternalRowWrapper(SparkSchemaUtil.convert(writeSchema));
   }
 
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
   @Override
-  public void write(InternalRow row) throws IOException {
-    key.partition(wrapper.wrap(row));
+  public void write(T row) throws IOException {
+    PartitionKey key = partition(row);
 
-    PartitionKey currentKey = getCurrentKey();
     if (!key.equals(currentKey)) {
-      closeCurrent();
-      completedPartitions.add(currentKey);
+      if (currentKey != null) {
+        // if currentKey is null, there was no previous current key or writer to close
+        currentWriter.close();
+        completedPartitions.add(currentKey);
+      }
 
       if (completedPartitions.contains(key)) {
         // if rows are not correctly grouped, detect and fail the write
@@ -63,10 +69,17 @@ public void write(InternalRow row) throws IOException {
         throw new IllegalStateException("Already closed files for partition: " + key.toPath());
       }
 
-      setCurrentKey(key.copy());
-      openCurrent();
+      currentKey = key.copy();
+      currentWriter = new RollingFileWriter(currentKey);
     }
 
-    writeInternal(row);
+    currentWriter.add(row);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (currentWriter != null) {
+      currentWriter.close();
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/io/TaskWriter.java b/core/src/main/java/org/apache/iceberg/io/TaskWriter.java
new file mode 100644
index 000000000000..806e37de1bee
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/TaskWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.iceberg.DataFile;
+
+/**
+ * The writer interface that accepts records and provides the generated data files.
+ *
+ * @param <T> the record data type.
+ */
+public interface TaskWriter<T> extends Closeable {
+
+  /**
+   * Write the row into the data files.
+   */
+  void write(T row) throws IOException;
+
+  /**
+   * Close the writer and delete the completed files if possible when aborting.
+   *
+   * @throws IOException if any IO error happens.
+   */
+  void abort() throws IOException;
+
+  /**
+   * Close the writer and get the completed data files.
+   *
+   * @return the completed data files of this task writer.
+   */
+  DataFile[] complete() throws IOException;
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/UnpartitionedWriter.java b/core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java
similarity index 64%
rename from spark/src/main/java/org/apache/iceberg/spark/source/UnpartitionedWriter.java
rename to core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java
index 5692b6afe24d..37f3db49aef4 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/UnpartitionedWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/UnpartitionedWriter.java
@@ -17,24 +17,29 @@
 * under the License.
 */
 
-package org.apache.iceberg.spark.source;
+package org.apache.iceberg.io;
 
 import java.io.IOException;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.io.FileIO;
-import org.apache.spark.sql.catalyst.InternalRow;
 
-class UnpartitionedWriter extends BaseWriter {
-  UnpartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                      OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+public class UnpartitionedWriter<T> extends BaseTaskWriter<T> {
+
+  private final RollingFileWriter currentWriter;
+
+  public UnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                             OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+    currentWriter = new RollingFileWriter(null);
+  }
 
-    openCurrent();
+  @Override
+  public void write(T record) throws IOException {
+    currentWriter.add(record);
   }
 
   @Override
-  public void write(InternalRow row) throws IOException {
-    writeInternal(row);
+  public void close() throws IOException {
+    currentWriter.close();
+  }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/PartitionedFanoutWriter.java b/flink/src/main/java/org/apache/iceberg/flink/PartitionedFanoutWriter.java
new file mode 100644
index 000000000000..78c29c5d112b
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/PartitionedFanoutWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+abstract class PartitionedFanoutWriter<T> extends BaseTaskWriter<T> {
+  private final Map<PartitionKey, RollingFileWriter> writers = Maps.newHashMap();
+
+  PartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
+                          OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  }
+
+  /**
+   * Create a PartitionKey from the values in row.
+   * <p>
+   * Any PartitionKey returned by this method can be reused by the implementation.
+   *
+   * @param row a data row
+   */
+  protected abstract PartitionKey partition(T row);
+
+  @Override
+  public void write(T row) throws IOException {
+    PartitionKey partitionKey = partition(row);
+
+    RollingFileWriter writer = writers.get(partitionKey);
+    if (writer == null) {
+      // NOTICE: copy the partition key before caching it; the key returned by partition() is reused
+      // and mutating it later would corrupt the keys in the writers map.
+      PartitionKey copiedKey = partitionKey.copy();
+      writer = new RollingFileWriter(copiedKey);
+      writers.put(copiedKey, writer);
+    }
+
+    writer.add(row);
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!writers.isEmpty()) {
+      for (PartitionKey key : writers.keySet()) {
+        writers.get(key).close();
+      }
+      writers.clear();
+    }
+  }
+}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java
new file mode 100644
index 000000000000..2bd6e24977f9
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.flink; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Map; +import org.apache.flink.types.Row; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.flink.data.FlinkAvroWriter; +import org.apache.iceberg.flink.data.FlinkParquetWriters; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.parquet.Parquet; + +class TaskWriterFactory { + private TaskWriterFactory() { + } + + static TaskWriter createTaskWriter(Schema schema, + PartitionSpec spec, + FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSizeBytes) { + if (spec.fields().isEmpty()) { + return new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, io, targetFileSizeBytes); + } else { + return new RowPartitionedFanoutWriter(spec, format, appenderFactory, fileFactory, + io, targetFileSizeBytes, schema); + } + } + + private static class RowPartitionedFanoutWriter extends PartitionedFanoutWriter { + + private final PartitionKey partitionKey; + private final RowWrapper rowWrapper; + + RowPartitionedFanoutWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema schema) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + this.partitionKey = new PartitionKey(spec, schema); + this.rowWrapper = new RowWrapper(schema.asStruct()); + } + + @Override + protected PartitionKey partition(Row row) { + partitionKey.partition(rowWrapper.wrap(row)); + return partitionKey; + } + } + + static class FlinkFileAppenderFactory implements FileAppenderFactory { + private final Schema schema; + private final Map props; + + FlinkFileAppenderFactory(Schema schema, Map props) { + this.schema = schema; + this.props = props; + } + + @Override + public FileAppender newAppender(OutputFile outputFile, FileFormat format) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(props); + try { + switch (format) { + case PARQUET: + return Parquet.write(outputFile) + .createWriterFunc(FlinkParquetWriters::buildWriter) + .setAll(props) + .metricsConfig(metricsConfig) + .schema(schema) + .overwrite() + .build(); + + case AVRO: + return Avro.write(outputFile) + .createWriterFunc(FlinkAvroWriter::new) + .setAll(props) + .schema(schema) + .overwrite() + .build(); + + case ORC: + default: + throw new UnsupportedOperationException("Cannot write unknown file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java new file mode 100644 index 000000000000..2ac907a3658e --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.flink.types.Row; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.junit.Assert; + +public class SimpleDataUtil { + + private SimpleDataUtil() { + } + + static final Schema SCHEMA = new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get()) + ); + + static final Record RECORD = GenericRecord.create(SCHEMA); + + static Table createTable(String path, Map properties, boolean partitioned) { + PartitionSpec spec; + if (partitioned) { + spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); + } else { + spec = PartitionSpec.unpartitioned(); + } + return new HadoopTables().create(SCHEMA, spec, properties, path); + } + + static Record createRecord(Integer id, String data) { + Record record = RECORD.copy(); + record.setField("id", id); + record.setField("data", data); + return record; + } + + static void assertTableRows(String tablePath, List rows) throws IOException { + List records = Lists.newArrayList(); + for (Row row : rows) { + records.add(createRecord((Integer) row.getField(0), (String) row.getField(1))); + } + assertTableRecords(tablePath, records); + } + + static void assertTableRecords(String tablePath, List expected) throws IOException { + Preconditions.checkArgument(expected != null, "expected records shouldn't be null"); + Table newTable = new HadoopTables().load(tablePath); + Set resultSet; + try (CloseableIterable iterable = (CloseableIterable) IcebergGenerics.read(newTable).build()) { + resultSet = Sets.newHashSet(iterable); + } + Assert.assertEquals("Should produce the expected record", resultSet, Sets.newHashSet(expected)); + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java b/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java new file mode 100644 index 000000000000..04962025192b --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import org.apache.flink.types.Row; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.data.RandomData; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestTaskWriters { + private static final Configuration CONF = new Configuration(); + private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024; + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + // TODO add ORC unit test once the readers and writers are ready. + @Parameterized.Parameters(name = "format = {0}, partitioned = {1}") + public static Object[][] parameters() { + return new Object[][] { + new Object[] {"parquet", true}, + new Object[] {"parquet", false}, + new Object[] {"avro", true}, + new Object[] {"avro", false}, + }; + } + + private final FileFormat format; + private final boolean partitioned; + + private String path; + private Table table; + + public TestTaskWriters(String format, boolean partitioned) { + this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH)); + this.partitioned = partitioned; + } + + @Before + public void before() throws IOException { + File folder = tempFolder.newFolder(); + path = folder.getAbsolutePath(); + + // Construct the iceberg table with the specified file format. + Map props = ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()); + table = SimpleDataUtil.createTable(path, props, partitioned); + } + + @Test + public void testWriteZeroRecord() throws IOException { + try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { + taskWriter.close(); + + DataFile[] dataFiles = taskWriter.complete(); + Assert.assertNotNull(dataFiles); + Assert.assertEquals(0, dataFiles.length); + + // Close again. 
+ taskWriter.close(); + dataFiles = taskWriter.complete(); + Assert.assertNotNull(dataFiles); + Assert.assertEquals(0, dataFiles.length); + } + } + + @Test + public void testCloseTwice() throws IOException { + try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { + taskWriter.write(Row.of(1, "hello")); + taskWriter.write(Row.of(2, "world")); + taskWriter.close(); // The first close + taskWriter.close(); // The second close + + int expectedFiles = partitioned ? 2 : 1; + DataFile[] dataFiles = taskWriter.complete(); + Assert.assertEquals(expectedFiles, dataFiles.length); + + FileSystem fs = FileSystem.get(CONF); + for (DataFile dataFile : dataFiles) { + Assert.assertTrue(fs.exists(new Path(dataFile.path().toString()))); + } + } + } + + @Test + public void testAbort() throws IOException { + try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { + taskWriter.write(Row.of(1, "hello")); + taskWriter.write(Row.of(2, "world")); + + taskWriter.abort(); + DataFile[] dataFiles = taskWriter.complete(); + + int expectedFiles = partitioned ? 2 : 1; + Assert.assertEquals(expectedFiles, dataFiles.length); + + FileSystem fs = FileSystem.get(CONF); + for (DataFile dataFile : dataFiles) { + Assert.assertFalse(fs.exists(new Path(dataFile.path().toString()))); + } + } + } + + @Test + public void testCompleteFiles() throws IOException { + try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { + taskWriter.write(Row.of(1, "a")); + taskWriter.write(Row.of(2, "b")); + taskWriter.write(Row.of(3, "c")); + taskWriter.write(Row.of(4, "d")); + + DataFile[] dataFiles = taskWriter.complete(); + int expectedFiles = partitioned ? 4 : 1; + Assert.assertEquals(expectedFiles, dataFiles.length); + + dataFiles = taskWriter.complete(); + Assert.assertEquals(expectedFiles, dataFiles.length); + + FileSystem fs = FileSystem.get(CONF); + for (DataFile dataFile : dataFiles) { + Assert.assertTrue(fs.exists(new Path(dataFile.path().toString()))); + } + + AppendFiles appendFiles = table.newAppend(); + for (DataFile dataFile : dataFiles) { + appendFiles.appendFile(dataFile); + } + appendFiles.commit(); + + // Assert the data rows. + SimpleDataUtil.assertTableRecords(path, Lists.newArrayList( + SimpleDataUtil.createRecord(1, "a"), + SimpleDataUtil.createRecord(2, "b"), + SimpleDataUtil.createRecord(3, "c"), + SimpleDataUtil.createRecord(4, "d") + )); + } + } + + @Test + public void testRollingWithTargetFileSize() throws IOException { + try (TaskWriter taskWriter = createTaskWriter(4)) { + List rows = Lists.newArrayListWithCapacity(8000); + List records = Lists.newArrayListWithCapacity(8000); + for (int i = 0; i < 2000; i++) { + for (String data : new String[] {"a", "b", "c", "d"}) { + rows.add(Row.of(i, data)); + records.add(SimpleDataUtil.createRecord(i, data)); + } + } + + for (Row row : rows) { + taskWriter.write(row); + } + + DataFile[] dataFiles = taskWriter.complete(); + Assert.assertEquals(8, dataFiles.length); + + AppendFiles appendFiles = table.newAppend(); + for (DataFile dataFile : dataFiles) { + appendFiles.appendFile(dataFile); + } + appendFiles.commit(); + + // Assert the data rows. 
+ SimpleDataUtil.assertTableRecords(path, records); + } + } + + @Test + public void testRandomData() throws IOException { + try (TaskWriter taskWriter = createTaskWriter(TARGET_FILE_SIZE)) { + Iterable rows = RandomData.generate(SimpleDataUtil.SCHEMA, 100, 1996); + for (Row row : rows) { + taskWriter.write(row); + } + + taskWriter.close(); + DataFile[] dataFiles = taskWriter.complete(); + AppendFiles appendFiles = table.newAppend(); + for (DataFile dataFile : dataFiles) { + appendFiles.appendFile(dataFile); + } + appendFiles.commit(); + + // Assert the data rows. + SimpleDataUtil.assertTableRows(path, Lists.newArrayList(rows)); + } + } + + private TaskWriter createTaskWriter(long targetFileSize) { + TaskWriterFactory.FlinkFileAppenderFactory appenderFactory = + new TaskWriterFactory.FlinkFileAppenderFactory(table.schema(), table.properties()); + + OutputFileFactory outputFileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), + table.io(), table.encryption(), 1, 1); + + return TaskWriterFactory.createTaskWriter(table.schema(), table.spec(), format, + appenderFactory, outputFileFactory, table.io(), targetFileSize); + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseWriter.java deleted file mode 100644 index 8c41e77d0f10..000000000000 --- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseWriter.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.spark.source; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Metrics; -import org.apache.iceberg.PartitionKey; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.encryption.EncryptedOutputFile; -import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.util.Tasks; -import org.apache.spark.sql.catalyst.InternalRow; - -abstract class BaseWriter implements Closeable { - protected static final int ROWS_DIVISOR = 1000; - - private final List completedFiles = Lists.newArrayList(); - private final PartitionSpec spec; - private final FileFormat format; - private final SparkAppenderFactory appenderFactory; - private final OutputFileFactory fileFactory; - private final FileIO io; - private final long targetFileSize; - private PartitionKey currentKey = null; - private FileAppender currentAppender = null; - private EncryptedOutputFile currentFile = null; - private long currentRows = 0; - - BaseWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize) { - this.spec = spec; - this.format = format; - this.appenderFactory = appenderFactory; - this.fileFactory = fileFactory; - this.io = io; - this.targetFileSize = targetFileSize; - } - - public abstract void write(InternalRow row) throws IOException; - - public void writeInternal(InternalRow row) throws IOException { - //TODO: ORC file now not support target file size before closed - if (!format.equals(FileFormat.ORC) && - currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) { - closeCurrent(); - openCurrent(); - } - - currentAppender.add(row); - currentRows++; - } - - public TaskResult complete() throws IOException { - closeCurrent(); - - return new TaskResult(completedFiles); - } - - public void abort() throws IOException { - closeCurrent(); - - // clean up files created by this writer - Tasks.foreach(completedFiles) - .throwFailureWhenFinished() - .noRetry() - .run(file -> io.deleteFile(file.path().toString())); - } - - @Override - public void close() throws IOException { - closeCurrent(); - } - - protected void openCurrent() { - if (spec.fields().size() == 0) { - // unpartitioned - currentFile = fileFactory.newOutputFile(); - } else { - // partitioned - currentFile = fileFactory.newOutputFile(currentKey); - } - currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format); - currentRows = 0; - } - - protected void closeCurrent() throws IOException { - if (currentAppender != null) { - currentAppender.close(); - // metrics are only valid after the appender is closed - Metrics metrics = currentAppender.metrics(); - long fileSizeInBytes = currentAppender.length(); - List splitOffsets = currentAppender.splitOffsets(); - this.currentAppender = null; - - if (metrics.recordCount() == 0L) { - io.deleteFile(currentFile.encryptingOutputFile()); - } else { - DataFile dataFile = DataFiles.builder(spec) - .withEncryptionKeyMetadata(currentFile.keyMetadata()) - .withPath(currentFile.encryptingOutputFile().location()) - .withFileSizeInBytes(fileSizeInBytes) - .withPartition(spec.fields().size() == 0 ? 
null : currentKey) // set null if unpartitioned - .withMetrics(metrics) - .withSplitOffsets(splitOffsets) - .build(); - completedFiles.add(dataFile); - } - - this.currentFile = null; - } - } - - protected PartitionKey getCurrentKey() { - return currentKey; - } - - protected void setCurrentKey(PartitionKey currentKey) { - this.currentKey = currentKey; - } -} diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java index 2837dbc2f59c..37ca56c700a4 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java @@ -20,7 +20,7 @@ package org.apache.iceberg.spark.source; import java.io.Serializable; -import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Locale; import java.util.Map; @@ -35,11 +35,16 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.spark.TaskContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,14 +82,14 @@ public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive, } public List rewriteDataForTasks(JavaRDD taskRDD) { - JavaRDD taskCommitRDD = taskRDD.map(this::rewriteDataForTask); + JavaRDD> dataFilesRDD = taskRDD.map(this::rewriteDataForTask); - return taskCommitRDD.collect().stream() - .flatMap(taskCommit -> Arrays.stream(taskCommit.files())) + return dataFilesRDD.collect().stream() + .flatMap(Collection::stream) .collect(Collectors.toList()); } - private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception { + private List rewriteDataForTask(CombinedScanTask task) throws Exception { TaskContext context = TaskContext.get(); int partitionId = context.partitionId(); long taskId = context.taskAttemptId(); @@ -92,16 +97,17 @@ private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception { RowDataReader dataReader = new RowDataReader( task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive); - SparkAppenderFactory appenderFactory = new SparkAppenderFactory( - properties, schema, SparkSchemaUtil.convert(schema)); + StructType structType = SparkSchemaUtil.convert(schema); + SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, schema, structType); OutputFileFactory fileFactory = new OutputFileFactory( spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId); - BaseWriter writer; + TaskWriter writer; if (spec.fields().isEmpty()) { - writer = new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE); + writer = new UnpartitionedWriter<>(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE); } else { - writer = new PartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema); + writer = new SparkPartitionedWriter(spec, format, appenderFactory, fileFactory, 
io.value(), Long.MAX_VALUE, + schema, structType); } try { @@ -112,7 +118,9 @@ private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception { dataReader.close(); dataReader = null; - return writer.complete(); + + writer.close(); + return Lists.newArrayList(writer.complete()); } catch (Throwable originalThrowable) { try { diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java index d7b271e82396..bc9bbd946e03 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java @@ -27,6 +27,7 @@ import org.apache.iceberg.avro.Avro; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; @@ -36,7 +37,7 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructType; -class SparkAppenderFactory { +class SparkAppenderFactory implements FileAppenderFactory { private final Map properties; private final Schema writeSchema; private final StructType dsSchema; @@ -47,6 +48,7 @@ class SparkAppenderFactory { this.dsSchema = dsSchema; } + @Override public FileAppender newAppender(OutputFile file, FileFormat fileFormat) { MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties); try { diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java new file mode 100644 index 000000000000..f81a09926d85 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.source; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PartitionedWriter; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; + +public class SparkPartitionedWriter extends PartitionedWriter { + private final PartitionKey partitionKey; + private final InternalRowWrapper internalRowWrapper; + + public SparkPartitionedWriter(PartitionSpec spec, FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, FileIO io, long targetFileSize, + Schema schema, StructType sparkSchema) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + this.partitionKey = new PartitionKey(spec, schema); + this.internalRowWrapper = new InternalRowWrapper(sparkSchema); + } + + @Override + protected PartitionKey partition(InternalRow row) { + partitionKey.partition(internalRowWrapper.wrap(row)); + return partitionKey; + } +} diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java index 9ce590c4720b..cf06a5176b8d 100644 --- a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java +++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java @@ -37,6 +37,8 @@ import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.UnpartitionedWriter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -211,7 +213,7 @@ protected Table table() { protected Iterable files(WriterCommitMessage[] messages) { if (messages.length > 0) { return Iterables.concat(Iterables.transform(Arrays.asList(messages), message -> message != null ? 
- ImmutableList.copyOf(((TaskResult) message).files()) : + ImmutableList.copyOf(((TaskCommit) message).files()) : ImmutableList.of())); } return ImmutableList.of(); @@ -222,9 +224,15 @@ public String toString() { return String.format("IcebergWrite(table=%s, format=%s)", table, format); } - private static class TaskCommit extends TaskResult implements WriterCommitMessage { - TaskCommit(TaskResult toCopy) { - super(toCopy.files()); + private static class TaskCommit implements WriterCommitMessage { + private final DataFile[] taskFiles; + + TaskCommit(DataFile[] files) { + this.taskFiles = files; + } + + DataFile[] files() { + return this.taskFiles; } } @@ -263,13 +271,14 @@ public DataWriter createDataWriter(int partitionId, long taskId, lo if (spec.fields().isEmpty()) { return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize); } else { - return new Partitioned24Writer( - spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema); + return new Partitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), + targetFileSize, writeSchema, dsSchema); } } } - private static class Unpartitioned24Writer extends UnpartitionedWriter implements DataWriter { + private static class Unpartitioned24Writer extends UnpartitionedWriter + implements DataWriter { Unpartitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory, OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) { super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize); @@ -277,18 +286,24 @@ private static class Unpartitioned24Writer extends UnpartitionedWriter implement @Override public WriterCommitMessage commit() throws IOException { + close(); + return new TaskCommit(complete()); } } - private static class Partitioned24Writer extends PartitionedWriter implements DataWriter { + private static class Partitioned24Writer extends SparkPartitionedWriter implements DataWriter { + Partitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize, Schema writeSchema) { - super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, writeSchema); + OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize, + Schema schema, StructType sparkSchema) { + super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, schema, sparkSchema); } @Override public WriterCommitMessage commit() throws IOException { + close(); + return new TaskCommit(complete()); } } diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java index e6b36f429eb8..0b7c00cca9fa 100644 --- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java +++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java @@ -39,6 +39,8 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.UnpartitionedWriter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -239,9 +241,15 @@ public String toString() { return String.format("IcebergWrite(table=%s, format=%s)", 
table, format); } - public static class TaskCommit extends TaskResult implements WriterCommitMessage { - TaskCommit(TaskResult result) { - super(result.files()); + public static class TaskCommit implements WriterCommitMessage { + private final DataFile[] taskFiles; + + TaskCommit(DataFile[] taskFiles) { + this.taskFiles = taskFiles; + } + + DataFile[] files() { + return taskFiles; } } @@ -285,12 +293,13 @@ public DataWriter createWriter(int partitionId, long taskId, long e return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize); } else { return new Partitioned3Writer( - spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema); + spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema, dsSchema); } } } - private static class Unpartitioned3Writer extends UnpartitionedWriter implements DataWriter { + private static class Unpartitioned3Writer extends UnpartitionedWriter + implements DataWriter { Unpartitioned3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); @@ -298,18 +307,23 @@ private static class Unpartitioned3Writer extends UnpartitionedWriter implements @Override public WriterCommitMessage commit() throws IOException { + this.close(); + return new TaskCommit(complete()); } } - private static class Partitioned3Writer extends PartitionedWriter implements DataWriter { + private static class Partitioned3Writer extends SparkPartitionedWriter implements DataWriter { Partitioned3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory, - OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema writeSchema) { - super(spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema); + OutputFileFactory fileFactory, FileIO io, long targetFileSize, + Schema schema, StructType sparkSchema) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, sparkSchema); } @Override public WriterCommitMessage commit() throws IOException { + this.close(); + return new TaskCommit(complete()); } }
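
For readers trying out the new API, the following is a minimal usage sketch of the task-writer lifecycle this patch introduces: write rows, then close and complete on success, or abort on failure. It is not part of the patch; the wiring mirrors the TestTaskWriters#createTaskWriter helper above, and the table, rows, format, partitionId, taskId, and targetFileSizeBytes names are assumed to be supplied by the caller.

// Sketch only: assumes a Table, an Iterable<Row> of input rows, a FileFormat, and task identifiers.
TaskWriterFactory.FlinkFileAppenderFactory appenderFactory =
    new TaskWriterFactory.FlinkFileAppenderFactory(table.schema(), table.properties());
OutputFileFactory fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(),
    table.io(), table.encryption(), partitionId, taskId);
TaskWriter<Row> writer = TaskWriterFactory.createTaskWriter(table.schema(), table.spec(), format,
    appenderFactory, fileFactory, table.io(), targetFileSizeBytes);

try {
  for (Row row : rows) {
    writer.write(row);
  }
  writer.close();                       // finish any open rolling files
  DataFile[] files = writer.complete(); // completed data files, ready to commit
  AppendFiles append = table.newAppend();
  for (DataFile file : files) {
    append.appendFile(file);
  }
  append.commit();
} catch (Exception e) {
  writer.abort();                       // best-effort cleanup of files already written by this task
  throw e;
}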
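The partition(row) contract on PartitionedWriter and PartitionedFanoutWriter (the returned PartitionKey may be a reused, mutable instance) is why both write paths copy the key before retaining it. Purely as an illustration of that contract, a conforming subclass for iceberg-data Records might look like the sketch below; RecordPartitionedFanoutWriter is a hypothetical name, the sketch ignores that the Flink base class is package-private, and it relies on Record implementing StructLike so it can be passed to PartitionKey.partition(...) without a wrapper.

// Illustrative sketch, not part of this patch.
class RecordPartitionedFanoutWriter extends PartitionedFanoutWriter<Record> {
  private final PartitionKey partitionKey;

  RecordPartitionedFanoutWriter(PartitionSpec spec, FileFormat format,
                                FileAppenderFactory<Record> appenderFactory, OutputFileFactory fileFactory,
                                FileIO io, long targetFileSize, Schema schema) {
    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
    this.partitionKey = new PartitionKey(spec, schema);
  }

  @Override
  protected PartitionKey partition(Record record) {
    // one mutable key is reused for every row; PartitionedFanoutWriter copies it before caching a writer
    partitionKey.partition(record);
    return partitionKey;
  }
}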