71 changes: 38 additions & 33 deletions data/src/test/java/org/apache/iceberg/data/FileHelpers.java
@@ -22,45 +22,47 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

public class FileHelpers {
private FileHelpers() {
}

public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(Table table, OutputFile out,
List<Pair<CharSequence, Long>> deletes)
throws IOException {
return writeDeleteFile(table, out, null, deletes);
}

public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(Table table, OutputFile out, StructLike partition,
List<Pair<CharSequence, Long>> deletes)
throws IOException {
PositionDeleteWriter<?> writer = Parquet.writeDeletes(out)
.withSpec(table.spec())
.setAll(table.properties())
.metricsConfig(MetricsConfig.forTable(table))
.withPartition(partition)
.overwrite()
.buildPositionWriter();
FileFormat format = defaultFormat(table.properties());
FileAppenderFactory<Record> factory = new GenericAppenderFactory(table.schema(), table.spec());

PositionDeleteWriter<?> writer = factory.newPosDeleteWriter(encrypt(out), format, partition);
try (Closeable toClose = writer) {
for (Pair<CharSequence, Long> delete : deletes) {
writer.delete(delete.first(), delete.second());
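
Note on the hunk above: the hard-coded Parquet.writeDeletes(...) builder is replaced by a GenericAppenderFactory, so position-delete files now come out in the table's configured default file format instead of always Parquet. A minimal usage sketch follows (hypothetical test code, not part of this diff; table, out, and dataFileLocation are assumed fixtures):

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;

class PositionDeleteExample {
  // Hypothetical test helper: delete rows 0 and 3 of an existing data file,
  // then commit the resulting delete file through a row delta.
  static void deleteTwoRows(Table table, OutputFile out, CharSequence dataFileLocation) throws IOException {
    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
        Pair.of(dataFileLocation, 0L),  // (data file path, row position)
        Pair.of(dataFileLocation, 3L));
    Pair<DeleteFile, CharSequenceSet> result = FileHelpers.writeDeleteFile(table, out, deletes);
    table.newRowDelta()
        .addDeletes(result.first())
        .commit();
  }
}
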
@@ -76,16 +78,14 @@ public static DeleteFile writeDeleteFile(Table table, OutputFile out, List<Recor
}

public static DeleteFile writeDeleteFile(Table table, OutputFile out, StructLike partition,
List<Record> deletes, Schema deleteRowSchema) throws IOException {
EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(out)
.forTable(table)
.withPartition(partition)
.rowSchema(deleteRowSchema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
.equalityFieldIds(deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray())
.buildEqualityWriter();
List<Record> deletes, Schema deleteRowSchema)
throws IOException {
FileFormat format = defaultFormat(table.properties());
int[] equalityFieldIds = deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray();
FileAppenderFactory<Record> factory = new GenericAppenderFactory(table.schema(), table.spec(),
equalityFieldIds, deleteRowSchema, null);

EqualityDeleteWriter<Record> writer = factory.newEqDeleteWriter(encrypt(out), format, partition);
try (Closeable toClose = writer) {
writer.deleteAll(deletes);
}
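
The equality-delete path gets the same treatment: the factory is built from the table's default format, and the equality field ids are derived from the projected delete-row schema. A sketch of a caller (illustrative, not from this diff; assumes the table schema has an int id column):

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class EqualityDeleteExample {
  // Hypothetical test helper: drop every row whose id is 1 or 7. The projected
  // delete-row schema decides which columns are compared for equality.
  static void deleteByIds(Table table, OutputFile out) throws IOException {
    Schema deleteRowSchema = table.schema().select("id");
    Record deleteRecord = GenericRecord.create(deleteRowSchema);
    List<Record> deletes = Lists.newArrayList(
        deleteRecord.copy("id", 1),
        deleteRecord.copy("id", 7));
    DeleteFile deleteFile = FileHelpers.writeDeleteFile(table, out, deletes, deleteRowSchema);
    table.newRowDelta().addDeletes(deleteFile).commit();
  }
}
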
@@ -94,18 +94,16 @@ public static DeleteFile writeDeleteFile(Table table, OutputFile out, StructLike
}

public static DataFile writeDataFile(Table table, OutputFile out, List<Record> rows) throws IOException {
FileAppender<Record> writer = Parquet.write(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.schema(table.schema())
.overwrite()
.build();
FileFormat format = defaultFormat(table.properties());
GenericAppenderFactory factory = new GenericAppenderFactory(table.schema());

FileAppender<Record> writer = factory.newAppender(out, format);
try (Closeable toClose = writer) {
writer.addAll(rows);
}

return DataFiles.builder(table.spec())
.withFormat(FileFormat.PARQUET)
.withFormat(format)
.withPath(out.location())
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
@@ -115,23 +113,30 @@ public static DataFile writeDataFile(Table table, OutputFile out, List<Record> r

public static DataFile writeDataFile(Table table, OutputFile out, StructLike partition, List<Record> rows)
throws IOException {
FileAppender<Record> writer = Parquet.write(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.schema(table.schema())
.overwrite()
.build();
FileFormat format = defaultFormat(table.properties());
GenericAppenderFactory factory = new GenericAppenderFactory(table.schema(), table.spec());

FileAppender<Record> writer = factory.newAppender(out, format);
try (Closeable toClose = writer) {
writer.addAll(rows);
}

return DataFiles.builder(table.spec())
.withFormat(FileFormat.PARQUET)
.withFormat(format)
.withPath(out.location())
.withPartition(partition)
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
.withMetrics(writer.metrics())
.build();
}

private static EncryptedOutputFile encrypt(OutputFile out) {
return EncryptedFiles.encryptedOutput(out, EncryptionKeyMetadata.EMPTY);
}

private static FileFormat defaultFormat(Map<String, String> properties) {
String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
}
}
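
The two new private helpers round out the refactor: encrypt(out) wraps the plain OutputFile with EncryptionKeyMetadata.EMPTY so it satisfies the factory's EncryptedOutputFile-based signatures, and defaultFormat(...) resolves write.format.default, falling back to Parquet. Since every helper is now routed through defaultFormat, one table-property change retargets all of them. A minimal sketch using the standard UpdateProperties API:

import org.apache.iceberg.Table;

class DefaultFormatExample {
  // Sketch: after this commit, subsequent FileHelpers calls on the table write
  // Avro files, because defaultFormat(...) reads write.format.default from the
  // table properties.
  static void switchToAvro(Table table) {
    table.updateProperties()
        .set("write.format.default", "avro")
        .commit();
  }
}
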
FlinkAvroReader.java
@@ -22,21 +22,23 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
import org.apache.iceberg.avro.SupportsRowPosition;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.data.avro.DecoderResolver;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class FlinkAvroReader implements DatumReader<RowData> {
public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPosition {

private final Schema readSchema;
private final ValueReader<RowData> reader;
@@ -63,6 +65,13 @@ public RowData read(RowData reuse, Decoder decoder) throws IOException {
return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
}

@Override
public void setRowPositionSupplier(Supplier<Long> posSupplier) {
if (reader instanceof SupportsRowPosition) {
((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
}
}

private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
private final Map<Integer, ?> idToConstant;

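
FlinkAvroReader now implements SupportsRowPosition and forwards the supplier to its root ValueReader when that reader supports it. Row positions are what position deletes match against, and the Avro format does not carry them per record, so the read path seeds readers with the starting position and lets them count rows from there. A toy illustration of the callback contract (PositionTrackingReader is hypothetical, not Iceberg API; only the interface method comes from this diff):

import java.util.function.Supplier;
import org.apache.iceberg.avro.SupportsRowPosition;

// Illustrative only: shows what a reader can do with the supplier that
// setRowPositionSupplier hands it.
class PositionTrackingReader implements SupportsRowPosition {
  private long nextRowPosition = 0L;

  @Override
  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
    // The supplier yields the position of the first row this reader will see;
    // decoding advances the counter from there.
    this.nextRowPosition = posSupplier.get();
  }

  long recordAndAdvance() {
    return nextRowPosition++;  // position of the row just decoded
  }
}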