@@ -58,7 +58,7 @@ public EqualityDeleteWriter(FileAppender<T> appender, FileFormat format, String
  }

  @Override
-  public void write(T row) throws IOException {
+  public void write(T row) {
    appender.add(row);
  }

@@ -57,7 +57,7 @@ public PositionDeleteWriter(FileAppender<StructLike> appender, FileFormat format
  }

  @Override
-  public void write(PositionDelete<T> positionDelete) throws IOException {
+  public void write(PositionDelete<T> positionDelete) {
    referencedDataFiles.add(positionDelete.path());
    appender.add(positionDelete);
  }

11 changes: 8 additions & 3 deletions core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.io;

import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.Set;
import org.apache.iceberg.PartitionSpec;
@@ -63,7 +64,7 @@ abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
  protected abstract R aggregatedResult();

  @Override
-  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+  public void write(T row, PartitionSpec spec, StructLike partition) {
    if (!spec.equals(currentSpec)) {
      if (currentSpec != null) {
        closeCurrentWriter();
@@ -110,9 +111,13 @@ public void close() throws IOException {
    }
  }

-  private void closeCurrentWriter() throws IOException {
+  private void closeCurrentWriter() {
    if (currentWriter != null) {
-      currentWriter.close();
+      try {
+        currentWriter.close();
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to close current writer", e);
+      }

      addResult(currentWriter.result());

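Since closeCurrentWriter() now rethrows IO failures as UncheckedIOException, a caller that still wants the checked exception can unwrap it. A minimal sketch, not part of this PR; `runWithCheckedIo` and `writeTask` are hypothetical names. It relies only on the fact that UncheckedIOException#getCause() is declared to return IOException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

class UnwrapExample {
  // Hypothetical sketch: run a write path that may throw UncheckedIOException
  // (as closeCurrentWriter() above now does) and surface the original checked
  // IOException at a boundary that still declares it.
  static void runWithCheckedIo(Runnable writeTask) throws IOException {
    try {
      writeTask.run();
    } catch (UncheckedIOException e) {
      throw e.getCause(); // getCause() returns the wrapped IOException
    }
  }
}
```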
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
@@ -49,7 +49,7 @@ abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
  protected abstract R aggregatedResult();

  @Override
-  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+  public void write(T row, PartitionSpec spec, StructLike partition) {
    FileWriter<T, R> writer = writer(spec, partition);
    writer.write(row);
  }
7 changes: 2 additions & 5 deletions core/src/main/java/org/apache/iceberg/io/FileWriter.java
@@ -20,7 +20,6 @@
package org.apache.iceberg.io;

import java.io.Closeable;
-import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;

@@ -41,9 +40,8 @@ public interface FileWriter<T, R> extends Closeable {
   * Writes rows to a predefined spec/partition.
   *
   * @param rows data or delete records
-   * @throws IOException in case of an error during the write process
   */
-  default void write(Iterable<T> rows) throws IOException {
+  default void write(Iterable<T> rows) {
    for (T row : rows) {
      write(row);
    }
@@ -53,9 +51,8 @@ default void write(Iterable<T> rows) throws IOException {
   * Writes a row to a predefined spec/partition.
   *
   * @param row a data or delete record
-   * @throws IOException in case of an error during the write process
   */
-  void write(T row) throws IOException;
+  void write(T row);

  /**
   * Returns the number of bytes that were currently written by this writer.
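With `throws IOException` removed from `write`, implementations that do real IO must convert the checked exception themselves. A toy implementor sketch, assuming (from the hunks above) that the interface's remaining members are `length()` and `result()`; `LineFileWriter` and its row-count result are hypothetical and not part of this PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import org.apache.iceberg.io.FileWriter;

// Hypothetical implementor sketch: write(T) no longer declares IOException, so
// internal IO failures must be wrapped; close() keeps the checked exception it
// inherits from Closeable.
class LineFileWriter implements FileWriter<String, Long> {
  private final Writer out;
  private long rowCount = 0L;
  private long charCount = 0L;

  LineFileWriter(Writer out) {
    this.out = out;
  }

  @Override
  public void write(String row) {
    try {
      out.write(row);
      out.write('\n');
      rowCount += 1;
      charCount += row.length() + 1;
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to write row", e);
    }
  }

  @Override
  public long length() {
    return charCount; // rough stand-in for bytes written so far
  }

  @Override
  public Long result() {
    return rowCount; // toy result: number of rows written
  }

  @Override
  public void close() throws IOException {
    out.close();
  }
}
```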
@@ -20,7 +20,6 @@
package org.apache.iceberg.io;

import java.io.Closeable;
-import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
@@ -46,9 +45,8 @@ public interface PartitioningWriter<T, R> extends Closeable {
   * @param row a data or delete record
   * @param spec a partition spec
   * @param partition a partition or null if the spec is unpartitioned
-   * @throws IOException in case of an error during the write process
   */
-  void write(T row, PartitionSpec spec, StructLike partition) throws IOException;
+  void write(T row, PartitionSpec spec, StructLike partition);

  /**
   * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s.
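A caller-side sketch of the new contract; `PartitioningWriteHelper.writeAll` is a hypothetical helper, not part of this PR. The write loop needs no checked-exception handling anymore, while `close()`, inherited from `Closeable`, still declares IOException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.PartitioningWriter;

class PartitioningWriteHelper {
  // Hypothetical helper: writes all rows to one spec/partition and closes the writer.
  static <T, R> void writeAll(PartitioningWriter<T, R> writer, Iterable<T> rows,
                              PartitionSpec spec, StructLike partition) {
    try {
      for (T row : rows) {
        writer.write(row, spec, partition); // no checked IOException on the write path anymore
      }
    } finally {
      try {
        writer.close(); // close() still declares IOException via Closeable
      } catch (IOException e) {
        throw new UncheckedIOException("Failed to close partitioning writer", e);
      }
    }
  }
}
```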
11 changes: 8 additions & 3 deletions core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.io;

import java.io.IOException;
+import java.io.UncheckedIOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptedOutputFile;
@@ -81,7 +82,7 @@ public long length() {
  }

  @Override
-  public void write(T row) throws IOException {
+  public void write(T row) {
    currentWriter.write(row);
    currentFileRows++;

@@ -111,9 +112,13 @@ private EncryptedOutputFile newFile() {
    }
  }

-  private void closeCurrentWriter() throws IOException {
+  private void closeCurrentWriter() {
    if (currentWriter != null) {
-      currentWriter.close();
+      try {
+        currentWriter.close();
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to close current writer", e);
+      }

      if (currentFileRows == 0L) {
        io.deleteFile(currentFile.encryptingOutputFile());
@@ -79,7 +79,7 @@ public long length() {
  }

  @Override
-  public void write(PositionDelete<T> payload) throws IOException {
+  public void write(PositionDelete<T> payload) {
    delete(payload.path(), payload.pos(), payload.row());
  }

@@ -21,7 +21,6 @@

import java.io.File;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.util.List;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
@@ -152,13 +151,7 @@ public void testClusteredDataWriterOutOfOrderPartitions() throws IOException {

    AssertHelpers.assertThrows("Should fail to write out of order partitions",
        IllegalStateException.class, "Encountered records that belong to already closed files",
-        () -> {
-          try {
-            writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"));
-          } catch (IOException e) {
-            throw new UncheckedIOException(e);
-          }
-        });
+        () -> writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa")));

    writer.close();
  }
@@ -303,23 +296,11 @@ public void testClusteredEqualityDeleteWriterOutOfOrderSpecsAndPartitions() thro

    AssertHelpers.assertThrows("Should fail to write out of order partitions",
        IllegalStateException.class, "Encountered records that belong to already closed files",
-        () -> {
-          try {
-            writer.write(toRow(7, "ccc"), identitySpec, partitionKey(identitySpec, "ccc"));
-          } catch (IOException e) {
-            throw new UncheckedIOException(e);
-          }
-        });
+        () -> writer.write(toRow(7, "ccc"), identitySpec, partitionKey(identitySpec, "ccc")));

    AssertHelpers.assertThrows("Should fail to write out of order specs",
        IllegalStateException.class, "Encountered records that belong to already closed files",
-        () -> {
-          try {
-            writer.write(toRow(7, "aaa"), unpartitionedSpec, null);
-          } catch (IOException e) {
-            throw new UncheckedIOException(e);
-          }
-        });
+        () -> writer.write(toRow(7, "aaa"), unpartitionedSpec, null));

    writer.close();
  }
@@ -459,23 +440,15 @@ public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() thro
    AssertHelpers.assertThrows("Should fail to write out of order partitions",
        IllegalStateException.class, "Encountered records that belong to already closed files",
        () -> {
-          try {
-            PositionDelete<T> positionDelete = positionDelete("file-5.parquet", 1L, null);
-            writer.write(positionDelete, identitySpec, partitionKey(identitySpec, "ccc"));
-          } catch (IOException e) {
-            throw new UncheckedIOException(e);
-          }
+          PositionDelete<T> positionDelete = positionDelete("file-5.parquet", 1L, null);
+          writer.write(positionDelete, identitySpec, partitionKey(identitySpec, "ccc"));
        });

    AssertHelpers.assertThrows("Should fail to write out of order specs",
        IllegalStateException.class, "Encountered records that belong to already closed files",
        () -> {
-          try {
-            PositionDelete<T> positionDelete = positionDelete("file-1.parquet", 3L, null);
-            writer.write(positionDelete, unpartitionedSpec, null);
-          } catch (IOException e) {
-            throw new UncheckedIOException(e);
-          }
+          PositionDelete<T> positionDelete = positionDelete("file-1.parquet", 3L, null);
+          writer.write(positionDelete, unpartitionedSpec, null);
        });

    writer.close();