Merged
@@ -21,7 +21,6 @@

import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -35,16 +34,14 @@ public class ClusteredDataWriter<T> extends ClusteredWriter<T, DataWriteResult>
private final FileWriterFactory<T> writerFactory;
private final OutputFileFactory fileFactory;
private final FileIO io;
private final FileFormat fileFormat;
private final long targetFileSizeInBytes;
private final List<DataFile> dataFiles;

public ClusteredDataWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
FileIO io, long targetFileSizeInBytes) {
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
this.fileFormat = fileFormat;
this.targetFileSizeInBytes = targetFileSizeInBytes;
this.dataFiles = Lists.newArrayList();
}
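The hunks above remove the FileFormat import, field, and constructor parameter from ClusteredDataWriter. A minimal sketch of the new call site, modeled on the updated test code later in this diff (writerFactory, fileFactory, table, and TARGET_FILE_SIZE are assumed to be configured as in those tests; newWriterFactory is the test helper):

    // Construct the clustered data writer without a FileFormat argument;
    // the format is no longer passed through this constructor.
    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
    ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);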
@@ -21,7 +21,6 @@

import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -36,16 +35,14 @@ public class ClusteredEqualityDeleteWriter<T> extends ClusteredWriter<T, DeleteW
private final FileWriterFactory<T> writerFactory;
private final OutputFileFactory fileFactory;
private final FileIO io;
private final FileFormat fileFormat;
private final long targetFileSizeInBytes;
private final List<DeleteFile> deleteFiles;

public ClusteredEqualityDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
FileIO io, long targetFileSizeInBytes) {
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
this.fileFormat = fileFormat;
this.targetFileSizeInBytes = targetFileSizeInBytes;
this.deleteFiles = Lists.newArrayList();
}
@@ -21,7 +21,6 @@

import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.PositionDelete;
@@ -37,17 +36,15 @@ public class ClusteredPositionDeleteWriter<T> extends ClusteredWriter<PositionDe
private final FileWriterFactory<T> writerFactory;
private final OutputFileFactory fileFactory;
private final FileIO io;
private final FileFormat fileFormat;
private final long targetFileSizeInBytes;
private final List<DeleteFile> deleteFiles;
private final CharSequenceSet referencedDataFiles;

public ClusteredPositionDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
FileIO io, long targetFileSizeInBytes) {
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
this.fileFormat = fileFormat;
this.targetFileSizeInBytes = targetFileSizeInBytes;
this.deleteFiles = Lists.newArrayList();
this.referencedDataFiles = CharSequenceSet.empty();
@@ -21,7 +21,6 @@

import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -35,16 +34,14 @@ public class FanoutDataWriter<T> extends FanoutWriter<T, DataWriteResult> {
private final FileWriterFactory<T> writerFactory;
private final OutputFileFactory fileFactory;
private final FileIO io;
private final FileFormat fileFormat;
private final long targetFileSizeInBytes;
private final List<DataFile> dataFiles;

public FanoutDataWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
FileIO io, long targetFileSizeInBytes) {
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
this.fileFormat = fileFormat;
this.targetFileSizeInBytes = targetFileSizeInBytes;
this.dataFiles = Lists.newArrayList();
}
@@ -82,8 +82,7 @@ public void setupTable() throws Exception {
public void testClusteredDataWriterNoRecords() throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);

writer.close();
Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
@@ -100,8 +99,7 @@ public void testClusteredDataWriterMultiplePartitions() throws IOException {

FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);

PartitionSpec spec = table.spec();

@@ -138,8 +136,7 @@ public void testClusteredDataWriterOutOfOrderPartitions() throws IOException {

FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);

PartitionSpec spec = table.spec();

@@ -162,8 +159,7 @@ public void testClusteredEqualityDeleteWriterNoRecords() throws IOException {
Schema equalityDeleteRowSchema = table.schema().select("id");
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
ClusteredEqualityDeleteWriter<T> writer = new ClusteredEqualityDeleteWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);

writer.close();
Assert.assertEquals(0, writer.result().deleteFiles().size());
@@ -226,8 +222,7 @@ public void testClusteredEqualityDeleteWriterMultipleSpecs() throws IOException
.commit();

ClusteredEqualityDeleteWriter<T> writer = new ClusteredEqualityDeleteWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);

PartitionSpec unpartitionedSpec = table.specs().get(0);
PartitionSpec bucketSpec = table.specs().get(1);
@@ -274,8 +269,7 @@ public void testClusteredEqualityDeleteWriterOutOfOrderSpecsAndPartitions() thro
.commit();

ClusteredEqualityDeleteWriter<T> writer = new ClusteredEqualityDeleteWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);

PartitionSpec unpartitionedSpec = table.specs().get(0);
PartitionSpec bucketSpec = table.specs().get(1);
@@ -303,8 +297,7 @@ public void testClusteredEqualityDeleteWriterOutOfOrderSpecsAndPartitions() thro
public void testClusteredPositionDeleteWriterNoRecords() throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
ClusteredPositionDeleteWriter<T> writer = new ClusteredPositionDeleteWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);

writer.close();
Assert.assertEquals(0, writer.result().deleteFiles().size());
@@ -365,8 +358,7 @@ public void testClusteredPositionDeleteWriterMultipleSpecs() throws IOException
.commit();

ClusteredPositionDeleteWriter<T> writer = new ClusteredPositionDeleteWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);

PartitionSpec unpartitionedSpec = table.specs().get(0);
PartitionSpec bucketSpec = table.specs().get(1);
@@ -411,8 +403,7 @@ public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() thro
.commit();

ClusteredPositionDeleteWriter<T> writer = new ClusteredPositionDeleteWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);

PartitionSpec unpartitionedSpec = table.specs().get(0);
PartitionSpec bucketSpec = table.specs().get(1);
@@ -446,8 +437,7 @@ public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() thro
public void testFanoutDataWriterNoRecords() throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
FanoutDataWriter<T> writer = new FanoutDataWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);

writer.close();
Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
@@ -464,8 +454,7 @@ public void testFanoutDataWriterMultiplePartitions() throws IOException {

FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
FanoutDataWriter<T> writer = new FanoutDataWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);

PartitionSpec spec = table.spec();

@@ -80,14 +80,11 @@ public void testPositionDeltaInsertOnly() throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

ClusteredDataWriter<T> insertWriter = new ClusteredDataWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
ClusteredDataWriter<T> updateWriter = new ClusteredDataWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
ClusteredPositionDeleteWriter<T> deleteWriter = new ClusteredPositionDeleteWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
PositionDeltaWriter<T> deltaWriter = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);

deltaWriter.insert(toRow(1, "aaa"), table.spec(), null);
@@ -148,14 +145,11 @@ public void testPositionDeltaDeleteOnly() throws IOException {
PartitionSpec partitionedSpec = table.specs().get(1);

ClusteredDataWriter<T> insertWriter = new ClusteredDataWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
ClusteredDataWriter<T> updateWriter = new ClusteredDataWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
ClusteredPositionDeleteWriter<T> deleteWriter = new ClusteredPositionDeleteWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
PositionDeltaWriter<T> deltaWriter = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);

deltaWriter.delete(dataFile1.path(), 2L, unpartitionedSpec, null);
@@ -220,14 +214,11 @@ public void testPositionDeltaMultipleSpecs() throws IOException {
PartitionSpec partitionedSpec = table.specs().get(1);

ClusteredDataWriter<T> insertWriter = new ClusteredDataWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
ClusteredDataWriter<T> updateWriter = new ClusteredDataWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
ClusteredPositionDeleteWriter<T> deleteWriter = new ClusteredPositionDeleteWriter<>(
writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);
writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
PositionDeltaWriter<T> deltaWriter = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);

deltaWriter.delete(dataFile1.path(), 2L, unpartitionedSpec, null);
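For orientation, the position-delta wiring exercised by these tests now composes the simplified writers as follows (a sketch reusing the writerFactory, fileFactory, table, dataFile1, and unpartitionedSpec fixtures from the tests above; closing the delta writer is assumed, as with the other writers in this diff):

    ClusteredDataWriter<T> insertWriter = new ClusteredDataWriter<>(
        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
    ClusteredDataWriter<T> updateWriter = new ClusteredDataWriter<>(
        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
    ClusteredPositionDeleteWriter<T> deleteWriter = new ClusteredPositionDeleteWriter<>(
        writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE);
    PositionDeltaWriter<T> deltaWriter = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);

    // Inserts and position deletes as in the tests; no FileFormat is threaded through.
    deltaWriter.insert(toRow(1, "aaa"), table.spec(), null);
    deltaWriter.delete(dataFile1.path(), 2L, unpartitionedSpec, null);
    deltaWriter.close();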
@@ -23,6 +23,7 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.CoreOptions;
@@ -50,7 +51,6 @@
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -320,7 +320,6 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
*/
@Test
public void testRewriteAvoidRepeateCompress() throws IOException {
Assume.assumeFalse("ORC does not support getting length when file is opening", format.equals(FileFormat.ORC));
List<Record> expected = Lists.newArrayList();
Schema schema = icebergTableUnPartitioned.schema();
GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
@@ -329,7 +328,7 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
try (FileAppender<Record> fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
long filesize = 20000;
for (; fileAppender.length() < filesize; count++) {
Record record = SimpleDataUtil.createRecord(count, "iceberg");
Record record = SimpleDataUtil.createRecord(count, UUID.randomUUID().toString());
fileAppender.add(record);
expected.add(record);
}
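The updated loop above writes a unique payload per record instead of the constant "iceberg"; presumably this keeps fileAppender.length() growing toward the 20000-byte threshold for every format, which is why the ORC assumption could be dropped. Condensed, the pattern now reads as follows (Files, GenericAppenderFactory, SimpleDataUtil, format, file, count, and expected as in the test):

    try (FileAppender<Record> fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
      long filesize = 20000;
      for (; fileAppender.length() < filesize; count++) {
        // unique value per record so the in-progress file size keeps increasing
        Record record = SimpleDataUtil.createRecord(count, UUID.randomUUID().toString());
        fileAppender.add(record);
        expected.add(record);
      }
    }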
@@ -233,10 +233,6 @@ public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception {

@Test
public void testTableWithTargetFileSize() throws Exception {
// TODO: ORC file does not support target file size before closed.
if (format == FileFormat.ORC) {
return;
}
// Adjust the target-file-size in table properties.
table.updateProperties()
.set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // ~4 bytes; low enough to trigger
@@ -183,10 +183,6 @@ public void testCompleteFiles() throws IOException {

@Test
public void testRollingWithTargetFileSize() throws IOException {
// TODO ORC don't support target file size before closed.
if (format == FileFormat.ORC) {
return;
}
try (TaskWriter<RowData> taskWriter = createTaskWriter(4)) {
List<RowData> rows = Lists.newArrayListWithCapacity(8000);
List<Record> records = Lists.newArrayListWithCapacity(8000);
@@ -23,6 +23,7 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.CoreOptions;
@@ -50,7 +51,6 @@
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -322,7 +322,6 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
*/
@Test
public void testRewriteAvoidRepeateCompress() throws IOException {
Assume.assumeFalse("ORC does not support getting length when file is opening", format.equals(FileFormat.ORC));
List<Record> expected = Lists.newArrayList();
Schema schema = icebergTableUnPartitioned.schema();
GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
@@ -331,7 +330,7 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
try (FileAppender<Record> fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
long filesize = 20000;
for (; fileAppender.length() < filesize; count++) {
Record record = SimpleDataUtil.createRecord(count, "iceberg");
Record record = SimpleDataUtil.createRecord(count, UUID.randomUUID().toString());
fileAppender.add(record);
expected.add(record);
}
@@ -233,10 +233,6 @@ public void testBoundedStreamCloseWithEmittingDataFiles() throws Exception {

@Test
public void testTableWithTargetFileSize() throws Exception {
// TODO: ORC file does not support target file size before closed.
if (format == FileFormat.ORC) {
return;
}
// Adjust the target-file-size in table properties.
table.updateProperties()
.set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // ~4 bytes; low enough to trigger
@@ -183,10 +183,6 @@ public void testCompleteFiles() throws IOException {

@Test
public void testRollingWithTargetFileSize() throws IOException {
// TODO ORC don't support target file size before closed.
if (format == FileFormat.ORC) {
return;
}
try (TaskWriter<RowData> taskWriter = createTaskWriter(4)) {
List<RowData> rows = Lists.newArrayListWithCapacity(8000);
List<Record> records = Lists.newArrayListWithCapacity(8000);