@@ -154,6 +154,19 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(EncryptedOutputFile file,
             .equalityFieldIds(equalityFieldIds)
             .buildEqualityWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .withPartition(partition)
+            .overwrite()
+            .setAll(config)
+            .metricsConfig(metricsConfig)
+            .rowSchema(eqDeleteRowSchema)
+            .withSpec(spec)
+            .withKeyMetadata(file.keyMetadata())
+            .equalityFieldIds(equalityFieldIds)
+            .buildEqualityWriter();
+
       case PARQUET:
         return Parquet.writeDeletes(file.encryptingOutputFile())
             .createWriterFunc(GenericParquetWriter::buildWriter)
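With this branch in place, generic records can be written as ORC equality deletes through the same builder chain already used for Avro and Parquet. A minimal usage sketch, assuming the GenericAppenderFactory constructor that takes equality-field ids and delete row schemas, the delete(...) method on EqualityDeleteWriter from this era of the API, and hypothetical `table` and `out` wiring:

  Schema deleteSchema = table.schema().select("id");
  int[] equalityFieldIds = { table.schema().findField("id").fieldId() };

  GenericAppenderFactory factory = new GenericAppenderFactory(
      table.schema(), table.spec(), equalityFieldIds, deleteSchema, null);

  // "out" is an EncryptedOutputFile, e.g. produced by an OutputFileFactory
  EqualityDeleteWriter<Record> eqWriter =
      factory.newEqDeleteWriter(out, FileFormat.ORC, null);
  try (EqualityDeleteWriter<Record> writer = eqWriter) {
    Record delete = GenericRecord.create(deleteSchema);
    writer.delete(delete.copy("id", 1L));  // drops every row whose id = 1
  }
  DeleteFile eqDeletes = eqWriter.toDeleteFile();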
@@ -193,6 +206,17 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(EncryptedOutputFile file,
             .withKeyMetadata(file.keyMetadata())
             .buildPositionWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .withPartition(partition)
+            .overwrite()
+            .setAll(config)
+            .rowSchema(posDeleteRowSchema)
+            .withSpec(spec)
+            .withKeyMetadata(file.keyMetadata())
+            .buildPositionWriter();
+
       case PARQUET:
         return Parquet.writeDeletes(file.encryptingOutputFile())
             .createWriterFunc(GenericParquetWriter::buildWriter)
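The position-delete branch mirrors the equality path but writes only the file_path and pos columns when no row schema is configured. A sketch reusing the hypothetical factory and output file from above:

  PositionDeleteWriter<Record> posWriter =
      factory.newPosDeleteWriter(out, FileFormat.ORC, null);
  try (PositionDeleteWriter<Record> writer = posWriter) {
    // mark rows 0 and 4 of a previously written data file as deleted
    writer.delete(dataFile.path(), 0L);
    writer.delete(dataFile.path(), 4L);
  }
  DeleteFile posDeletes = posWriter.toDeleteFile();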
10 changes: 10 additions & 0 deletions data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
@@ -35,11 +35,13 @@
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -67,6 +69,8 @@ public static Object[] parameters() {
     return new Object[][] {
         new Object[] {"avro", false},
         new Object[] {"avro", true},
+        new Object[] {"orc", false},
+        new Object[] {"orc", true},
         new Object[] {"parquet", false},
         new Object[] {"parquet", true}
     };
@@ -326,6 +330,12 @@ private CloseableIterable<Record> createReader(Schema schema, InputFile inputFile) {
             .createReaderFunc(DataReader::create)
             .build();
 
+      case ORC:
+        return ORC.read(inputFile)
+            .project(schema)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
+            .build();
+
       default:
         throw new UnsupportedOperationException("Unsupported file format: " + format);
     }
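Reading an ORC delete file back for assertions uses the same read builder as the test helper above; a sketch, assuming a local delete file path and a projected delete schema:

  try (CloseableIterable<Record> reader = ORC.read(Files.localInput(deleteFilePath))
      .project(deleteSchema)
      .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
      .build()) {
    List<Record> actual = Lists.newArrayList(reader);
  }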
@@ -37,8 +37,10 @@
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -62,7 +64,8 @@ public class TestGenericSortedPosDeleteWriter extends TableTestBase {
   public static Object[] parameters() {
     return new Object[][] {
         new Object[] {"avro"},
-        new Object[] {"parquet"},
+        new Object[] {"orc"},
+        new Object[] {"parquet"}
     };
   }
 
@@ -326,6 +329,13 @@ private List<Record> readRecordsAsList(Schema schema, CharSequence path) throws IOException {
             .build();
         break;
 
+      case ORC:
+        iterable = ORC.read(inputFile)
+            .project(schema)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
+            .build();
+        break;
+
       default:
         throw new UnsupportedOperationException("Unsupported file format: " + format);
     }
@@ -39,12 +39,11 @@
 @RunWith(Parameterized.class)
 public abstract class TestPositionDeltaWriters<T> extends WriterTestBase<T> {
 
-  // TODO: add ORC once we support ORC delete files
-
   @Parameterized.Parameters(name = "FileFormat={0}")
   public static Object[] parameters() {
     return new Object[][] {
         new Object[]{FileFormat.AVRO},
+        new Object[]{FileFormat.ORC},
         new Object[]{FileFormat.PARQUET}
     };
   }
@@ -42,7 +42,9 @@
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -71,6 +73,7 @@ public class TestTaskEqualityDeltaWriter extends TableTestBase {
   public static Object[][] parameters() {
     return new Object[][] {
         {"avro"},
+        {"orc"},
         {"parquet"}
     };
   }
@@ -503,6 +506,13 @@ private List<Record> readRecordsAsList(Schema schema, CharSequence path) throws IOException {
             .build();
         break;
 
+      case ORC:
+        iterable = ORC.read(inputFile)
+            .project(schema)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
+            .build();
+        break;
+
       default:
         throw new UnsupportedOperationException("Unsupported file format: " + format);
     }
@@ -162,6 +162,18 @@ public EqualityDeleteWriter<RowData> newEqDeleteWriter(EncryptedOutputFile outputFile,
             .equalityFieldIds(equalityFieldIds)
             .buildEqualityWriter();
 
+      case ORC:
+        return ORC.writeDeletes(outputFile.encryptingOutputFile())
+            .createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema))
+            .withPartition(partition)
+            .overwrite()
+            .setAll(props)
+            .rowSchema(eqDeleteRowSchema)
+            .withSpec(spec)
+            .withKeyMetadata(outputFile.keyMetadata())
+            .equalityFieldIds(equalityFieldIds)
+            .buildEqualityWriter();
+
       case PARQUET:
         return Parquet.writeDeletes(outputFile.encryptingOutputFile())
             .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(lazyEqDeleteFlinkSchema(), msgType))
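Unlike Parquet's writer function, which receives a MessageType, the ORC builder hands its writer function both the Iceberg schema and the ORC TypeDescription. A sketch of that shape, assuming FlinkOrcWriter.buildWriter(flink type, Iceberg schema) as used in the hunk above:

  // The TypeDescription argument is unused here: FlinkOrcWriter derives the
  // layout from the Flink row type and the Iceberg schema alone.
  BiFunction<Schema, TypeDescription, OrcRowWriter<RowData>> writerFunc =
      (iSchema, typeDesc) -> FlinkOrcWriter.buildWriter(flinkSchema, iSchema);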
@@ -201,6 +213,20 @@ public PositionDeleteWriter<RowData> newPosDeleteWriter(EncryptedOutputFile outputFile,
             .withKeyMetadata(outputFile.keyMetadata())
             .buildPositionWriter();
 
+      case ORC:
+        RowType orcPosDeleteSchema = FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
+        return ORC.writeDeletes(outputFile.encryptingOutputFile())
+            .createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(orcPosDeleteSchema, iSchema))
+            .withPartition(partition)
+            .overwrite()
+            .setAll(props)
+            .metricsConfig(metricsConfig)
+            .rowSchema(posDeleteRowSchema)
+            .withSpec(spec)
+            .withKeyMetadata(outputFile.keyMetadata())
+            .transformPaths(path -> StringData.fromString(path.toString()))
+            .buildPositionWriter();
+
       case PARQUET:
         RowType flinkPosDeleteSchema = FlinkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
         return Parquet.writeDeletes(outputFile.encryptingOutputFile())
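Position delete files store the path of the data file whose rows are deleted. Flink's ORC writer consumes RowData, whose string fields must be StringData rather than an arbitrary CharSequence, which is what the transformPaths hook above handles. A sketch of a caller hitting this path, with hypothetical factory and file wiring:

  PositionDeleteWriter<RowData> posWriter =
      flinkAppenderFactory.newPosDeleteWriter(outputFile, FileFormat.ORC, partition);
  try (PositionDeleteWriter<RowData> writer = posWriter) {
    // the CharSequence path is converted to StringData by transformPaths
    writer.delete(dataFile.path(), 3L);
  }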
@@ -65,6 +65,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
   public static Object[][] parameters() {
     return new Object[][] {
         {"avro"},
+        {"orc"},
         {"parquet"}
     };
   }
@@ -95,6 +95,10 @@ public static Object[][] parameters() {
         new Object[] {"avro", 1, false},
         new Object[] {"avro", 2, true},
         new Object[] {"avro", 2, false},
+        new Object[] {"orc", 1, true},
+        new Object[] {"orc", 1, false},
+        new Object[] {"orc", 2, true},
+        new Object[] {"orc", 2, false},
         new Object[] {"parquet", 1, true},
         new Object[] {"parquet", 1, false},
         new Object[] {"parquet", 2, true},
@@ -85,6 +85,7 @@ public static Object[][] parameters() {
         new Object[] {"parquet", 1},
         new Object[] {"parquet", 2},
         new Object[] {"orc", 1},
+        new Object[] {"orc", 2}
     };
   }
 
@@ -512,6 +512,8 @@ protected Object get(PositionDelete<T> delete, int index) {
     @Override
     public void write(PositionDelete<T> row, VectorizedRowBatch output) throws IOException {
       Preconditions.checkArgument(row != null, "value must not be null");
+      Preconditions.checkArgument(writers().size() == 2 || row.row() != null,
+          "Position delete row must not be null because a row schema was set for this delete writer");
       writeRow(row, output);
     }
   }
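The new check encodes the writer-layout invariant: a position delete writer built without a row schema has exactly two column writers (file_path, pos), while one built with rowSchema(...) has a third writer for the deleted row, so PositionDelete.row() must then be non-null. A sketch of the two shapes, with hypothetical values:

  PositionDelete<Record> delete = PositionDelete.create();

  // two-column layout: no row schema configured, a null row is acceptable
  delete.set("s3://bucket/data/file-a.orc", 3L, null);

  // three-column layout: rowSchema(...) was set, the deleted row must be supplied
  delete.set("s3://bucket/data/file-a.orc", 3L, deletedRecord);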
@@ -224,6 +224,17 @@ public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(EncryptedOutputFile file,
             .withKeyMetadata(file.keyMetadata())
             .buildEqualityWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(SparkOrcWriter::new)
+            .overwrite()
+            .rowSchema(eqDeleteRowSchema)
+            .withSpec(spec)
+            .withPartition(partition)
+            .equalityFieldIds(equalityFieldIds)
+            .withKeyMetadata(file.keyMetadata())
+            .buildEqualityWriter();
+
       default:
         throw new UnsupportedOperationException(
             "Cannot write equality-deletes for unsupported file format: " + format);
@@ -261,6 +272,17 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(EncryptedOutputFile file,
             .withKeyMetadata(file.keyMetadata())
             .buildPositionWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(SparkOrcWriter::new)
+            .overwrite()
+            .rowSchema(posDeleteRowSchema)
+            .withSpec(spec)
+            .withPartition(partition)
+            .withKeyMetadata(file.keyMetadata())
+            .transformPaths(path -> UTF8String.fromString(path.toString()))
+            .buildPositionWriter();
+
       default:
         throw new UnsupportedOperationException(
             "Cannot write pos-deletes for unsupported file format: " + format);
@@ -224,6 +224,17 @@ public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(EncryptedOutputFile file,
             .withKeyMetadata(file.keyMetadata())
             .buildEqualityWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(SparkOrcWriter::new)
+            .overwrite()
+            .rowSchema(eqDeleteRowSchema)
+            .withSpec(spec)
+            .withPartition(partition)
+            .equalityFieldIds(equalityFieldIds)
+            .withKeyMetadata(file.keyMetadata())
+            .buildEqualityWriter();
+
       default:
         throw new UnsupportedOperationException(
             "Cannot write equality-deletes for unsupported file format: " + format);
@@ -261,6 +272,17 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(EncryptedOutputFile file,
             .withKeyMetadata(file.keyMetadata())
             .buildPositionWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(SparkOrcWriter::new)
+            .overwrite()
+            .rowSchema(posDeleteRowSchema)
+            .withSpec(spec)
+            .withPartition(partition)
+            .withKeyMetadata(file.keyMetadata())
+            .transformPaths(path -> UTF8String.fromString(path.toString()))
+            .buildPositionWriter();
+
       default:
         throw new UnsupportedOperationException(
             "Cannot write pos-deletes for unsupported file format: " + format);
@@ -224,6 +224,17 @@ public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(EncryptedOutputFile file,
             .withKeyMetadata(file.keyMetadata())
             .buildEqualityWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(SparkOrcWriter::new)
+            .overwrite()
+            .rowSchema(eqDeleteRowSchema)
+            .withSpec(spec)
+            .withPartition(partition)
+            .equalityFieldIds(equalityFieldIds)
+            .withKeyMetadata(file.keyMetadata())
+            .buildEqualityWriter();
+
       default:
         throw new UnsupportedOperationException(
             "Cannot write equality-deletes for unsupported file format: " + format);
@@ -261,6 +272,17 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(EncryptedOutputFile file,
             .withKeyMetadata(file.keyMetadata())
             .buildPositionWriter();
 
+      case ORC:
+        return ORC.writeDeletes(file.encryptingOutputFile())
+            .createWriterFunc(SparkOrcWriter::new)
+            .overwrite()
+            .rowSchema(posDeleteRowSchema)
+            .withSpec(spec)
+            .withPartition(partition)
+            .withKeyMetadata(file.keyMetadata())
+            .transformPaths(path -> UTF8String.fromString(path.toString()))
+            .buildPositionWriter();
+
       default:
         throw new UnsupportedOperationException(
             "Cannot write pos-deletes for unsupported file format: " + format);