4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java
@@ -61,4 +61,8 @@ public List<DeleteFile> deleteFiles() {
public CharSequenceSet referencedDataFiles() {
return referencedDataFiles;
}

public boolean referencesDataFiles() {
return referencedDataFiles != null && referencedDataFiles.size() > 0;
}
}
61 changes: 61 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
* A rolling data writer that splits incoming data into multiple files within one spec/partition
* based on the target file size.
*/
public class RollingDataWriter<T> extends RollingFileWriter<T, DataWriter<T>, DataWriteResult> {

private final FileWriterFactory<T> writerFactory;
private final List<DataFile> dataFiles;

public RollingDataWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
FileIO io, long targetFileSizeInBytes,
PartitionSpec spec, StructLike partition) {
super(fileFactory, io, targetFileSizeInBytes, spec, partition);
this.writerFactory = writerFactory;
this.dataFiles = Lists.newArrayList();
openCurrentWriter();
}

@Override
protected DataWriter<T> newWriter(EncryptedOutputFile file) {
return writerFactory.newDataWriter(file, spec(), partition());
}

@Override
protected void addResult(DataWriteResult result) {
dataFiles.addAll(result.dataFiles());
}

@Override
protected DataWriteResult aggregatedResult() {
return new DataWriteResult(dataFiles);
}
}
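
For context, a minimal usage sketch of RollingDataWriter (not part of this diff). It assumes a FileWriterFactory<Record> named writerFactory, an OutputFileFactory named fileFactory, a FileIO named io, and a spec/partition pair for the target table; the 128 MB target size is an arbitrary choice and Record stands in for any row type the factory supports.

// Hypothetical sketch: writerFactory, fileFactory, io, spec, partition,
// and records are assumed to exist for the target table.
RollingDataWriter<Record> writer = new RollingDataWriter<>(
    writerFactory, fileFactory, io,
    128L * 1024 * 1024,          // target file size: 128 MB
    spec, partition);

try {
  for (Record record : records) {
    writer.write(record);        // may roll to a new file once the target size is reached
  }
} finally {
  writer.close();                // closes the current file, or deletes it if empty
}

DataWriteResult result = writer.result();  // aggregated across all rolled files
List<DataFile> dataFiles = result.dataFiles();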
64 changes: 64 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
* A rolling equality delete writer that splits incoming deletes into multiple files within one spec/partition
* based on the target file size.
*/
public class RollingEqualityDeleteWriter<T> extends RollingFileWriter<T, EqualityDeleteWriter<T>, DeleteWriteResult> {
@aokolnychyi (Contributor, Author) commented on Sep 20, 2021:
Delete writers are in org.apache.iceberg.deletes while all other writers are in org.apache.iceberg.io. I think it makes sense to have writer-related classes in the io package, so I added the rolling writers there.

private final FileWriterFactory<T> writerFactory;
private final List<DeleteFile> deleteFiles;

public RollingEqualityDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
FileIO io, long targetFileSizeInBytes,
PartitionSpec spec, StructLike partition) {
super(fileFactory, io, targetFileSizeInBytes, spec, partition);
this.writerFactory = writerFactory;
this.deleteFiles = Lists.newArrayList();
openCurrentWriter();
}

@Override
protected EqualityDeleteWriter<T> newWriter(EncryptedOutputFile file) {
return writerFactory.newEqualityDeleteWriter(file, spec(), partition());
}

@Override
protected void addResult(DeleteWriteResult result) {
Preconditions.checkArgument(!result.referencesDataFiles(), "Equality deletes cannot reference data files");
deleteFiles.addAll(result.deleteFiles());
}

@Override
protected DeleteWriteResult aggregatedResult() {
return new DeleteWriteResult(deleteFiles);
}
}
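
A similarly hedged sketch for equality deletes (not part of this diff): each record written carries the equality field values that identify the rows to delete. The writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition, and deletedRecord names are assumptions.

// Hypothetical sketch: rows whose equality fields match deletedRecord
// are marked as deleted; all surrounding names are assumed to exist.
RollingEqualityDeleteWriter<Record> writer = new RollingEqualityDeleteWriter<>(
    writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);

try {
  writer.write(deletedRecord);
} finally {
  writer.close();
}

List<DeleteFile> deleteFiles = writer.result().deleteFiles();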
143 changes: 143 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.io.IOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
* A rolling writer capable of splitting incoming data or deletes into multiple files within one spec/partition
* based on the target file size.
*/
abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
private static final int ROWS_DIVISOR = 1000;

private final OutputFileFactory fileFactory;
private final FileIO io;
private final long targetFileSizeInBytes;
private final PartitionSpec spec;
private final StructLike partition;

private EncryptedOutputFile currentFile = null;
private long currentFileRows = 0;
private W currentWriter = null;

private boolean closed = false;

protected RollingFileWriter(OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes,
PartitionSpec spec, StructLike partition) {
this.fileFactory = fileFactory;
this.io = io;
this.targetFileSizeInBytes = targetFileSizeInBytes;
this.spec = spec;
this.partition = partition;
}

protected abstract W newWriter(EncryptedOutputFile file);

protected abstract void addResult(R result);

protected abstract R aggregatedResult();

protected PartitionSpec spec() {
return spec;
}

protected StructLike partition() {
return partition;
}

public CharSequence currentFilePath() {
@aokolnychyi (Contributor, Author):
I removed the precondition here that the current file is not null. We will call this method for every single row while writing CDC records. Right now, currentFile is never null as we init it in the constructor.

Contributor:
Sounds good to me. You mean the CDC writer constructor?

@aokolnychyi (Contributor, Author):
I mean all classes that extend RollingFileWriter init the writer immediately, so we shouldn't worry about the current file being null.

Contributor:
Yeah, I saw that. I don't have a good way around this, but at least we know that it will fail quickly without that init call.

return currentFile.encryptingOutputFile().location();
}

public long currentFileRows() {
return currentFileRows;
}

@Override
public long length() {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement length");
}

@Override
public void write(T row) throws IOException {
currentWriter.write(row);
currentFileRows++;

if (shouldRollToNewFile()) {
closeCurrentWriter();
openCurrentWriter();
}
}

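// checking the length of the current file can be costly, so the size check only runs every ROWS_DIVISOR rows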
private boolean shouldRollToNewFile() {
return currentFileRows % ROWS_DIVISOR == 0 && currentWriter.length() >= targetFileSizeInBytes;
}

protected void openCurrentWriter() {
Preconditions.checkState(currentWriter == null, "Current writer has already been initialized");

this.currentFile = newFile();
this.currentFileRows = 0;
this.currentWriter = newWriter(currentFile);
}

private EncryptedOutputFile newFile() {
if (partition == null) {
return fileFactory.newOutputFile();
} else {
return fileFactory.newOutputFile(spec, partition);
@aokolnychyi (Contributor, Author):
@rdblue, I've updated this place to pass spec like we discussed in the original PR.

}
}

private void closeCurrentWriter() throws IOException {
if (currentWriter != null) {
currentWriter.close();

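// discard the file if no rows were written to it; otherwise record its result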
if (currentFileRows == 0L) {
io.deleteFile(currentFile.encryptingOutputFile());
} else {
addResult(currentWriter.result());
}

this.currentFile = null;
this.currentFileRows = 0;
this.currentWriter = null;
}
}

@Override
public void close() throws IOException {
if (!closed) {
closeCurrentWriter();
this.closed = true;
}
}

@Override
public final R result() {
Preconditions.checkState(closed, "Cannot get result from unclosed writer");
return aggregatedResult();
}
}
68 changes: 68 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;

/**
* A rolling position delete writer that splits incoming deletes into multiple files within one spec/partition
* based on the target file size.
*/
public class RollingPositionDeleteWriter<T>
extends RollingFileWriter<PositionDelete<T>, PositionDeleteWriter<T>, DeleteWriteResult> {

private final FileWriterFactory<T> writerFactory;
private final List<DeleteFile> deleteFiles;
private final CharSequenceSet referencedDataFiles;

public RollingPositionDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
FileIO io, long targetFileSizeInBytes,
PartitionSpec spec, StructLike partition) {
super(fileFactory, io, targetFileSizeInBytes, spec, partition);
this.writerFactory = writerFactory;
this.deleteFiles = Lists.newArrayList();
this.referencedDataFiles = CharSequenceSet.empty();
openCurrentWriter();
}

@Override
protected PositionDeleteWriter<T> newWriter(EncryptedOutputFile file) {
return writerFactory.newPositionDeleteWriter(file, spec(), partition());
}

@Override
protected void addResult(DeleteWriteResult result) {
deleteFiles.addAll(result.deleteFiles());
referencedDataFiles.addAll(result.referencedDataFiles());
}

@Override
protected DeleteWriteResult aggregatedResult() {
return new DeleteWriteResult(deleteFiles, referencedDataFiles);
}
}
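
A hedged usage sketch for position deletes (not part of this diff): a PositionDelete pairs a data file path with a row ordinal and optionally carries the deleted row itself. The writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition, and dataFilePath names are assumptions.

// Hypothetical sketch: marks row 4 of a data file as deleted.
RollingPositionDeleteWriter<Record> writer = new RollingPositionDeleteWriter<>(
    writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);

PositionDelete<Record> delete = PositionDelete.create();

try {
  writer.write(delete.set(dataFilePath, 4L, null));  // path, position, deleted row (optional)
} finally {
  writer.close();
}

DeleteWriteResult result = writer.result();
CharSequenceSet referenced = result.referencedDataFiles();  // data files these deletes touch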