146 changes: 146 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Tasks;

public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
private final List<DataFile> completedFiles = Lists.newArrayList();
private final PartitionSpec spec;
private final FileFormat format;
private final FileAppenderFactory<T> appenderFactory;
private final OutputFileFactory fileFactory;
private final FileIO io;
private final long targetFileSize;

protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
this.spec = spec;
this.format = format;
this.appenderFactory = appenderFactory;
this.fileFactory = fileFactory;
this.io = io;
this.targetFileSize = targetFileSize;
}

@Override
public void abort() throws IOException {
close();

// clean up files created by this writer
Tasks.foreach(completedFiles)
.throwFailureWhenFinished()
.noRetry()
.run(file -> io.deleteFile(file.path().toString()));
}

@Override
public DataFile[] complete() throws IOException {
close();

return completedFiles.toArray(new DataFile[0]);
}

protected class RollingFileWriter implements Closeable {
private static final int ROWS_DIVISOR = 1000;
private final PartitionKey partitionKey;

private EncryptedOutputFile currentFile = null;
private FileAppender<T> currentAppender = null;
private long currentRows = 0;

public RollingFileWriter(PartitionKey partitionKey) {
this.partitionKey = partitionKey;
openCurrent();
}

public void add(T record) throws IOException {
this.currentAppender.add(record);
this.currentRows++;

if (shouldRollToNewFile()) {
closeCurrent();
openCurrent();
}
}

private void openCurrent() {
if (partitionKey == null) {
// unpartitioned
currentFile = fileFactory.newOutputFile();
} else {
// partitioned
currentFile = fileFactory.newOutputFile(partitionKey);
}
currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
currentRows = 0;
}

private boolean shouldRollToNewFile() {
// TODO: ORC appenders do not report the written file length until they are closed, so size-based rolling is skipped for ORC
return !format.equals(FileFormat.ORC) &&
currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize;
}

private void closeCurrent() throws IOException {
if (currentAppender != null) {
currentAppender.close();
// metrics are only valid after the appender is closed
Metrics metrics = currentAppender.metrics();
long fileSizeInBytes = currentAppender.length();
List<Long> splitOffsets = currentAppender.splitOffsets();
this.currentAppender = null;

if (metrics.recordCount() == 0L) {
io.deleteFile(currentFile.encryptingOutputFile());
} else {
DataFile dataFile = DataFiles.builder(spec)
.withEncryptionKeyMetadata(currentFile.keyMetadata())
.withPath(currentFile.encryptingOutputFile().location())
.withFileSizeInBytes(fileSizeInBytes)
.withPartition(spec.fields().size() == 0 ? null : partitionKey) // set null if unpartitioned
.withMetrics(metrics)
.withSplitOffsets(splitOffsets)
.build();
completedFiles.add(dataFile);
}

this.currentFile = null;
this.currentRows = 0;
}
}

@Override
public void close() throws IOException {
closeCurrent();
}
}
}
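
For context, a minimal sketch of how a concrete writer might build on BaseTaskWriter: an unpartitioned writer can feed every record to a single RollingFileWriter and let it roll new data files at the target size. The class below is illustrative only and not part of this change; it assumes it lives in the org.apache.iceberg.io package so it can use the protected RollingFileWriter directly.

package org.apache.iceberg.io;

import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;

// Illustrative only: an unpartitioned writer that feeds all records to one RollingFileWriter.
public class SimpleUnpartitionedWriter<T> extends BaseTaskWriter<T> {
  private final RollingFileWriter currentWriter;

  public SimpleUnpartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
                                   OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
    // a null partition key produces unpartitioned output files
    this.currentWriter = new RollingFileWriter(null);
  }

  @Override
  public void write(T record) throws IOException {
    // RollingFileWriter rolls to a new data file once the target file size is reached
    currentWriter.add(record);
  }

  @Override
  public void close() throws IOException {
    currentWriter.close();
  }
}
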
@@ -17,32 +17,23 @@
* under the License.
*/

package org.apache.iceberg.spark.source;
package org.apache.iceberg.io;

import java.io.Serializable;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;

class TaskResult implements Serializable {
private final DataFile[] files;

TaskResult() {
this.files = new DataFile[0];
}

TaskResult(DataFile file) {
this.files = new DataFile[] { file };
}

TaskResult(List<DataFile> files) {
this.files = files.toArray(new DataFile[files.size()]);
}

TaskResult(DataFile[] files) {
this.files = files;
}

DataFile[] files() {
return files;
}
/**
* Factory to create a new {@link FileAppender} to write records.
*
* @param <T> data type of the rows to append.
*/
public interface FileAppenderFactory<T> {

/**
* Create a new {@link FileAppender}.
*
* @param outputFile an OutputFile used to create an output stream.
* @param fileFormat the file format of the data file to write.
* @return a newly created {@link FileAppender}
*/
FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);
}
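
As a hedged sketch of what a FileAppenderFactory implementation might look like, the class below builds appenders for Iceberg generic Records and handles only the Avro format. It assumes the org.apache.iceberg.avro.Avro write builder and the iceberg-data DataWriter are available; the class name is hypothetical and not part of this change.

package org.apache.iceberg.io;

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

// Illustrative only: an appender factory for generic Records that supports the Avro format.
public class GenericAvroAppenderFactory implements FileAppenderFactory<Record> {
  private final Schema schema;

  public GenericAvroAppenderFactory(Schema schema) {
    this.schema = schema;
  }

  @Override
  public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFormat) {
    Preconditions.checkArgument(fileFormat == FileFormat.AVRO, "Unsupported format: %s", fileFormat);
    try {
      return Avro.write(outputFile)
          .schema(schema)
          .createWriterFunc(DataWriter::create)
          .overwrite()
          .build();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create an Avro appender for " + outputFile.location(), e);
    }
  }
}
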
@@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.iceberg.spark.source;
package org.apache.iceberg.io;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@@ -26,11 +26,8 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;

class OutputFileFactory {
public class OutputFileFactory {
private final PartitionSpec spec;
private final FileFormat format;
private final LocationProvider locations;
@@ -44,8 +41,8 @@ class OutputFileFactory {
private final String uuid = UUID.randomUUID().toString();
private final AtomicInteger fileCount = new AtomicInteger(0);

OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io,
EncryptionManager encryptionManager, int partitionId, long taskId) {
public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io,
EncryptionManager encryptionManager, int partitionId, long taskId) {
this.spec = spec;
this.format = format;
this.locations = locations;
@@ -17,44 +17,50 @@
* under the License.
*/

package org.apache.iceberg.spark.source;
package org.apache.iceberg.io;

import java.io.IOException;
import java.util.Set;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PartitionedWriter extends BaseWriter {
public abstract class PartitionedWriter<T> extends BaseTaskWriter<T> {
private static final Logger LOG = LoggerFactory.getLogger(PartitionedWriter.class);

private final PartitionKey key;
private final InternalRowWrapper wrapper;
private final Set<PartitionKey> completedPartitions = Sets.newHashSet();

PartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema writeSchema) {
private PartitionKey currentKey = null;
private RollingFileWriter currentWriter = null;

public PartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<T> appenderFactory,
OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.key = new PartitionKey(spec, writeSchema);
this.wrapper = new InternalRowWrapper(SparkSchemaUtil.convert(writeSchema));
}

/**
* Create a PartitionKey from the values in row.
* <p>
* Any PartitionKey returned by this method may be reused and mutated by the implementation, so the
* writer copies the key before tracking it.
*
* @param row a data row
* @return the partition key for the row
*/
protected abstract PartitionKey partition(T row);

@Override
public void write(InternalRow row) throws IOException {
key.partition(wrapper.wrap(row));
public void write(T row) throws IOException {
PartitionKey key = partition(row);

PartitionKey currentKey = getCurrentKey();
if (!key.equals(currentKey)) {
closeCurrent();
completedPartitions.add(currentKey);
if (currentKey != null) {
// if the current key is null, there was no previous partition and no writer to close.
currentWriter.close();
completedPartitions.add(currentKey);
}

if (completedPartitions.contains(key)) {
// if rows are not correctly grouped, detect and fail the write
@@ -63,10 +69,17 @@ public void write(InternalRow row) throws IOException {
throw new IllegalStateException("Already closed files for partition: " + key.toPath());
}

setCurrentKey(key.copy());
openCurrent();
currentKey = key.copy();
currentWriter = new RollingFileWriter(currentKey);
}

writeInternal(row);
currentWriter.add(row);
}

@Override
public void close() throws IOException {
if (currentWriter != null) {
currentWriter.close();
}
}
}
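
A hedged sketch of a concrete PartitionedWriter for generic Records: partition(row) re-evaluates the spec's transforms into a single reusable PartitionKey, relying on write() to copy the key before tracking it. The class is illustrative and not part of this change.

package org.apache.iceberg.io;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;

// Illustrative only: a partitioned writer for generic Records.
public class GenericPartitionedWriter extends PartitionedWriter<Record> {
  private final PartitionKey partitionKey;

  public GenericPartitionedWriter(PartitionSpec spec, FileFormat format, FileAppenderFactory<Record> appenderFactory,
                                  OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema writeSchema) {
    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
    this.partitionKey = new PartitionKey(spec, writeSchema);
  }

  @Override
  protected PartitionKey partition(Record row) {
    // re-evaluate the partition transforms for the incoming row; the same key instance is
    // returned every time, which is why PartitionedWriter copies it before tracking it
    partitionKey.partition(row);
    return partitionKey;
  }
}
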
51 changes: 51 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/TaskWriter.java
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.io.Closeable;
import java.io.IOException;
import org.apache.iceberg.DataFile;

/**
* A task writer that accepts records and produces the data files generated by the task.
*
* @param <T> the record data type.
*/
public interface TaskWriter<T> extends Closeable {

/**
* Write the row into the data files.
*/
void write(T row) throws IOException;

/**
* Close the writer and delete the completed files, if possible, when the write is aborted.
*
* @throws IOException if any IO error occurs.
*/
void abort() throws IOException;

/**
* Close the writer and get the completed data files.
*
* @return the completed data files of this task writer.
*/
DataFile[] complete() throws IOException;
}
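
For illustration, a sketch of the lifecycle a caller is expected to follow: write each row, then complete() on success or abort() on failure. The helper below is hypothetical and not part of this change.

package org.apache.iceberg.io;

import java.io.IOException;
import org.apache.iceberg.DataFile;

// Illustrative only: the write / complete / abort lifecycle a caller is expected to follow.
public class TaskWriterExample {
  public static <T> DataFile[] writeAll(TaskWriter<T> writer, Iterable<T> rows) throws IOException {
    try {
      for (T row : rows) {
        writer.write(row);
      }
      // complete() closes the writer and returns the data files it produced
      return writer.complete();
    } catch (IOException | RuntimeException e) {
      // abort() closes the writer and deletes any files that were already written
      writer.abort();
      throw e;
    }
  }
}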