7 changes: 2 additions & 5 deletions data/src/main/java/org/apache/iceberg/data/GenericReader.java
@@ -22,7 +22,6 @@
import java.util.Map;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.avro.Avro;
@@ -38,10 +37,9 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;

class GenericReader implements Serializable {
@@ -129,8 +127,7 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProject

case ORC:
Schema projectionWithoutConstantAndMetadataFields =
TypeUtil.selectNot(
fileProjection, Sets.union(partition.keySet(), MetadataColumns.metadataFieldIds()));
ORCSchemaUtil.removeConstantsAndMetadataFields(fileProjection, partition.keySet());
ORC.ReadBuilder orc =
ORC.read(input)
.project(projectionWithoutConstantAndMetadataFields)
67 changes: 63 additions & 4 deletions data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
@@ -45,9 +45,11 @@
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Tables;
import org.apache.iceberg.TestHelpers.Row;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopTables;
@@ -524,6 +526,17 @@ private DataFile writeFile(String location, String filename, List<Record> record

private DataFile writeFile(String location, String filename, Schema schema, List<Record> records)
throws IOException {
return writeFile(location, filename, schema, records, PartitionSpec.unpartitioned(), null);
}

private DataFile writeFile(
String location,
String filename,
Schema schema,
List<Record> records,
PartitionSpec spec,
StructLike partition)
throws IOException {
Path path = new Path(location, filename);
FileFormat fileFormat = FileFormat.fromFileName(filename);
Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
@@ -534,10 +547,16 @@ private DataFile writeFile(String location, String filename, Schema schema, List
appender.addAll(records);
}

return DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(HadoopInputFile.fromPath(path, CONF))
.withMetrics(fileAppender.metrics())
.build();
DataFiles.Builder builder =
DataFiles.builder(spec)
.withInputFile(HadoopInputFile.fromPath(path, CONF))
.withMetrics(fileAppender.metrics());

if (spec.isUnpartitioned()) {
return builder.build();
} else {
return builder.withPartition(partition).build();
}
}

@Test
@@ -584,6 +603,46 @@ public void testFilterWithDateAndTimestamp() throws IOException {
}
}

@Test
public void testProjectNestedIdentityPartitionColumn() throws IOException {
Schema nestedSchema =
new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
Types.NestedField.optional(
2,
"struct",
Types.StructType.of(
Types.NestedField.optional(3, "innerId", Types.LongType.get()),
Types.NestedField.optional(4, "innerName", Types.StringType.get()))));
PartitionSpec spec =
PartitionSpec.builderFor(nestedSchema).identity("struct.innerName").build();

File tableLocation = temp.newFolder("project_nested_partition_column_table");
Assert.assertTrue(tableLocation.delete());

Table table =
TABLES.create(
nestedSchema,
spec,
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
tableLocation.getAbsolutePath());

List<Record> data = RandomGenericData.generate(nestedSchema, 10, 435691832918L);
DataFile file =
writeFile(
tableLocation.toString(),
format.addExtension("record-file"),
nestedSchema,
data,
spec,
Row.of("partitionValue"));
table.newFastAppend().appendFile(file).commit();

for (Record r : IcebergGenerics.read(table).select("struct.innerName").build()) {
Assert.assertEquals("partitionValue", ((Record) r.getField("struct")).getField("innerName"));
}
}

private static ByteBuffer longToBuffer(long value) {
return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
}
@@ -23,7 +23,6 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.Avro;
@@ -41,9 +40,9 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;

@@ -170,8 +169,7 @@ private CloseableIterable<RowData> newOrcIterable(
Map<Integer, ?> idToConstant,
InputFilesDecryptor inputFilesDecryptor) {
Schema readSchemaWithoutConstantAndMetadataFields =
TypeUtil.selectNot(
schema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
ORCSchemaUtil.removeConstantsAndMetadataFields(schema, idToConstant.keySet());

ORC.ReadBuilder builder =
ORC.read(inputFilesDecryptor.getInputFile(task))
@@ -28,7 +28,6 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -41,7 +40,6 @@
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.Assume;
import org.junit.Test;

/** Test {@link FlinkInputFormat}. */
@@ -142,8 +140,6 @@ public void testBasicProjection() throws IOException {

@Test
public void testReadPartitionColumn() throws Exception {
Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);

Schema nestedSchema =
new Schema(
Types.NestedField.optional(1, "id", Types.LongType.get()),
@@ -38,7 +38,6 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTableScan;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
@@ -71,9 +70,9 @@
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
@@ -435,8 +434,7 @@ private CloseableIterable<T> newOrcIterable(
Map<Integer, ?> idToConstant =
constantsMap(task, IdentityPartitionConverters::convertConstant);
Schema readSchemaWithoutConstantAndMetadataFields =
TypeUtil.selectNot(
readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
ORCSchemaUtil.removeConstantsAndMetadataFields(readSchema, idToConstant.keySet());

CloseableIterable<T> orcIterator = null;
// ORC does not support reuse containers yet
@@ -293,6 +293,46 @@ public void testIdentityPartitionProjections() throws Exception {
validateIdentityPartitionProjections(withColumns("message", "level", "date"), inputRecords);
}

private static final Schema NESTED_LOG_SCHEMA =
new Schema(
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "date", Types.StringType.get()),
Types.NestedField.optional(
3,
"struct",
Types.StructType.of(
Types.NestedField.optional(4, "level", Types.StringType.get()),
Types.NestedField.optional(5, "message", Types.StringType.get()))));

private static final PartitionSpec NESTED_IDENTITY_PARTITION_SPEC =
PartitionSpec.builderFor(NESTED_LOG_SCHEMA).identity("date").identity("struct.level").build();

@Test
public void testNestedIdentityPartitionProjections() throws Exception {
helper.createTable(NESTED_LOG_SCHEMA, NESTED_IDENTITY_PARTITION_SPEC);
List<Record> inputRecords = helper.generateRandomRecords(10, 0L);

Integer idx = 0;
AppendFiles append = helper.table().newAppend();
for (Record record : inputRecords) {
record.set(1, "2020-03-2" + idx);
record.get(2, Record.class).set(0, idx.toString());
append.appendFile(
helper.writeFile(Row.of("2020-03-2" + idx, idx.toString()), ImmutableList.of(record)));
idx += 1;
}
append.commit();

builder.project(NESTED_LOG_SCHEMA.select("struct.level"));
List<Record> actualRecords = testInputFormat.create(builder.conf()).getRecords();

for (int pos = 0; pos < inputRecords.size(); pos++) {
Assert.assertEquals(
String.valueOf(pos),
((Record) actualRecords.get(pos).getField("struct")).getField("level"));
}
}

private static Schema withColumns(String... names) {
Map<String, Integer> indexByName = TypeUtil.indexByName(LOG_SCHEMA.asStruct());
Set<Integer> projectedIds = Sets.newHashSet();
22 changes: 22 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
@@ -22,12 +22,15 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
@@ -442,4 +445,23 @@ static TypeDescription applyNameMapping(TypeDescription orcSchema, NameMapping n
public static Map<Integer, String> idToOrcName(Schema schema) {
return TypeUtil.visit(schema, new IdToOrcName());
}

/**
* Returns a {@link Schema} with constant fields and metadata fields removed from the provided
* schema. This utility can be used to create a "read schema" to pass to the ORC file reader,
* avoiding the deserialization and memory costs of column values that are already available
* through Iceberg metadata.
*
* <p>NOTE: Unlike {@link TypeUtil#selectNot(Schema, Set)}, this method preserves empty structs
* (structs whose fields are all constants) so that Iceberg ORC readers can later add the
* constant fields back into these structs.
*/
public static Schema removeConstantsAndMetadataFields(
Schema schema, Set<Integer> constantFieldIds) {
Set<Integer> projectedIds = TypeUtil.getProjectedIds(schema.asStruct());
return TypeUtil.project(
schema,
Sets.difference(
projectedIds, Sets.union(constantFieldIds, MetadataColumns.metadataFieldIds())));
}
}
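
To make the new helper's contract concrete, here is a minimal, hypothetical usage sketch (the class name, field names, and field IDs are illustrative and not part of this PR): a struct whose only child is an identity partition column stays in the read schema as an empty struct, which is what lets the ORC readers re-attach the constant value later.

// Hypothetical sketch; assumes this PR's ORCSchemaUtil.removeConstantsAndMetadataFields is on the classpath.
import org.apache.iceberg.Schema;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;

public class RemoveConstantsSketch {
  public static void main(String[] args) {
    // Table schema with a struct whose only child, "innerName", is an identity partition source.
    Schema tableSchema =
        new Schema(
            Types.NestedField.optional(1, "id", Types.LongType.get()),
            Types.NestedField.optional(
                2,
                "struct",
                Types.StructType.of(
                    Types.NestedField.optional(3, "innerName", Types.StringType.get()))));

    // Field 3 is constant for a given file (its value comes from partition metadata), so it is
    // excluded from the ORC read schema along with Iceberg's reserved metadata columns.
    Schema readSchema =
        ORCSchemaUtil.removeConstantsAndMetadataFields(tableSchema, Sets.newHashSet(3));

    // Unlike TypeUtil.selectNot, the helper keeps field 2 as an empty struct, so the ORC reader
    // can later re-insert the constant "innerName" value into it.
    System.out.println(readSchema.asStruct());
    // Expected to print something like: struct<1: id: optional long, 2: struct: optional struct<>>
  }
}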
36 changes: 36 additions & 0 deletions orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java
@@ -31,9 +31,12 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;
@@ -518,6 +521,39 @@ public void testAssignIdsByNameMappingAndProject() {
"Schema should be the prunned by projection", equalsWithIds(expected, projectedOrcSchema));
}

@Test
public void testRemoveConstantsAndMetadataFields() {
Schema schema =
new Schema(
Lists.newArrayList(
MetadataColumns.FILE_PATH,
required(1, "id", Types.LongType.get()),
required(
2,
"location",
Types.StructType.of(
required(3, "lat", Types.DoubleType.get()),
required(4, "long", Types.DoubleType.get())))));

// should remove constant fields and metadata fields
Schema expectedConstantsAndMetadataFieldsRemoved =
new Schema(
required(
2, "location", Types.StructType.of(required(4, "long", Types.DoubleType.get()))));
assertEquals(
expectedConstantsAndMetadataFieldsRemoved.asStruct(),
ORCSchemaUtil.removeConstantsAndMetadataFields(schema, Sets.newHashSet(1, 3)).asStruct());

// should not remove empty structs if their elements are removed
Schema expectedPreserveEmptyStructElements =
new Schema(
required(1, "id", Types.LongType.get()),
required(2, "location", Types.StructType.of()));
assertEquals(
expectedPreserveEmptyStructElements.asStruct(),
ORCSchemaUtil.removeConstantsAndMetadataFields(schema, Sets.newHashSet(3, 4)).asStruct());
}

private static boolean equalsWithIds(TypeDescription first, TypeDescription second) {
if (second == first) {
return true;
@@ -19,9 +19,7 @@
package org.apache.iceberg.spark.source;

import java.util.Map;
import java.util.Set;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
@@ -30,11 +28,10 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.sql.vectorized.ColumnarBatch;

abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -105,12 +102,8 @@ private CloseableIterable<ColumnarBatch> newOrcIterable(
long length,
Expression residual,
Map<Integer, ?> idToConstant) {
Set<Integer> constantFieldIds = idToConstant.keySet();
Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
Sets.SetView<Integer> constantAndMetadataFieldIds =
Sets.union(constantFieldIds, metadataFieldIds);
Schema schemaWithoutConstantAndMetadataFields =
TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds);
ORCSchemaUtil.removeConstantsAndMetadataFields(expectedSchema(), idToConstant.keySet());

return ORC.read(inputFile)
.project(schemaWithoutConstantAndMetadataFields)