@@ -25,7 +25,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetUtil;
@@ -137,61 +136,52 @@ public ParquetValueReader<?> message(
@Override
public ParquetValueReader<?> struct(
Types.StructType expected, GroupType struct, List<ParquetValueReader<?>> fieldReaders) {
if (null == expected) {
return new InternalRowReader(ImmutableList.of());
}

// match the expected struct's order
Map<Integer, ParquetValueReader<?>> readersById = Maps.newHashMap();
Map<Integer, Type> typesById = Maps.newHashMap();
Map<Integer, Integer> maxDefinitionLevelsById = Maps.newHashMap();
List<Type> fields = struct.getFields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i);
int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1;
if (fieldType.getId() != null) {
int id = fieldType.getId().intValue();
readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i)));
typesById.put(id, fieldType);
if (idToConstant.containsKey(id)) {
maxDefinitionLevelsById.put(id, fieldD);
}
}
}

List<Types.NestedField> expectedFields =
expected != null ? expected.fields() : ImmutableList.of();
int constantDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
List<Types.NestedField> expectedFields = expected.fields();
List<ParquetValueReader<?>> reorderedFields =
Lists.newArrayListWithExpectedSize(expectedFields.size());
// Defaulting to parent max definition level
int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());

for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
ParquetValueReader<?> reader = readersById.get(id);
if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
int fieldMaxDefinitionLevel =
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel);
reorderedFields.add(
ParquetValueReaders.constant(idToConstant.get(id), fieldMaxDefinitionLevel));
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
reorderedFields.add(ParquetValueReaders.constant(false));
} else if (reader != null) {
reorderedFields.add(reader);
} else if (field.initialDefault() != null) {
reorderedFields.add(
ParquetValueReaders.constant(
SparkUtil.internalToSpark(field.type(), field.initialDefault()),
maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
} else if (field.isOptional()) {
reorderedFields.add(ParquetValueReaders.nulls());
} else {
throw new IllegalArgumentException(
String.format("Missing required field: %s", field.name()));
}
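      // metadata columns and scan-time constants (partition values, _pos, _deleted,
      // _row_id, _last_updated_sequence_number) take precedence over the projected column reader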
ParquetValueReader<?> reader =
ParquetValueReaders.replaceWithMetadataReader(
id, readersById.get(id), idToConstant, constantDefinitionLevel);
reorderedFields.add(defaultReader(field, reader, constantDefinitionLevel));
}

return new InternalRowReader(reorderedFields);
}

private ParquetValueReader<?> defaultReader(
Types.NestedField field, ParquetValueReader<?> reader, int constantDL) {
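    // fall back in order: the projected column reader, the field's initial default,
    // then null for optional fields; a missing required field is an error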
if (reader != null) {
return reader;
} else if (field.initialDefault() != null) {
return ParquetValueReaders.constant(
SparkUtil.internalToSpark(field.type(), field.initialDefault()), constantDL);
} else if (field.isOptional()) {
return ParquetValueReaders.nulls();
}

throw new IllegalArgumentException(String.format("Missing required field: %s", field.name()));
}

@Override
public ParquetValueReader<?> list(
Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
@@ -180,7 +180,10 @@ private boolean useCometBatchReads() {
}

private boolean supportsCometBatchReads(Types.NestedField field) {
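    // Comet batch reads do not support UUID or the row lineage metadata columns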
return field.type().isPrimitiveType() && !field.type().typeId().equals(Type.TypeID.UUID);
return field.type().isPrimitiveType()
&& !field.type().typeId().equals(Type.TypeID.UUID)
&& field.fieldId() != MetadataColumns.ROW_ID.fieldId()
&& field.fieldId() != MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId();
}

// conditions for using ORC batch reads:
@@ -64,8 +64,7 @@ protected void assertEquals(
Object[] actual = actualRows.get(row);
assertThat(actual).as("Number of columns should match").hasSameSizeAs(expected);
for (int col = 0; col < actualRows.get(row).length; col += 1) {
String newContext = String.format("%s: row %d col %d", context, row + 1, col + 1);
assertEquals(newContext, expected, actual);
assertEquals(context + ": row " + (row + 1), expected, actual);
}
}
}
@@ -83,7 +82,9 @@ protected void assertEquals(String context, Object[] expectedRow, Object[] actua
assertEquals(newContext, (Object[]) expectedValue, (Object[]) actualValue);
}
} else if (expectedValue != ANY) {
assertThat(actualValue).as("%s contents should match", context).isEqualTo(expectedValue);
assertThat(actualValue)
.as(context + " col " + (col + 1) + " contents should match")
.isEqualTo(expectedValue);
}
}
}
@@ -27,10 +27,15 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
@@ -51,13 +56,27 @@

public abstract class AvroDataTest {

private static final long FIRST_ROW_ID = 2_000L;
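  // scan-time constants used to back-fill _row_id and _last_updated_sequence_number
  // for records that do not carry explicit row lineage values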
protected static final Map<Integer, Object> ID_TO_CONSTANT =
Map.of(
MetadataColumns.ROW_ID.fieldId(),
FIRST_ROW_ID,
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(),
34L);

protected abstract void writeAndValidate(Schema schema) throws IOException;

protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
throw new UnsupportedEncodingException(
"Cannot run test, writeAndValidate(Schema, Schema) is not implemented");
}

protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Record> records)
throws IOException {
throw new UnsupportedEncodingException(
"Cannot run test, writeAndValidate(Schema, Schema, List<Record>) is not implemented");
}

protected boolean supportsDefaultValues() {
return false;
}
@@ -66,6 +85,10 @@ protected boolean supportsNestedTypes() {
return true;
}

protected boolean supportsRowLineage() {
return false;
}

protected static final StructType SUPPORTED_PRIMITIVES =
StructType.of(
required(100, "id", LongType.get()),
@@ -547,4 +570,39 @@ public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Literal<?> d

writeAndValidate(writeSchema, readSchema);
}

@Test
public void testRowLineage() throws Exception {
Assumptions.assumeThat(supportsRowLineage())
.as("Row lineage support is not implemented")
.isTrue();

Schema schema =
new Schema(
required(1, "id", LongType.get()),
required(2, "data", Types.StringType.get()),
MetadataColumns.ROW_ID,
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);

GenericRecord record = GenericRecord.create(schema);
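    // records without explicit _row_id / _last_updated_sequence_number values are expected
    // to inherit them from the ID_TO_CONSTANT scan defaults during validation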

writeAndValidate(
schema,
schema,
List.of(
record.copy(Map.of("id", 1L, "data", "a")),
record.copy(Map.of("id", 2L, "data", "b")),
record.copy(
Map.of(
"id",
3L,
"data",
"c",
"_row_id",
1_000L,
"_last_updated_sequence_number",
33L)),
record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)),
record.copy(Map.of("id", 5L, "data", "e"))));
}
}
@@ -38,6 +38,8 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.data.GenericDataUtil;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
@@ -198,12 +200,47 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual)

public static void assertEqualsUnsafe(
Types.StructType struct, Record expected, InternalRow actual) {
assertEqualsUnsafe(struct, expected, actual, null, -1);
}

public static void assertEqualsUnsafe(
Types.StructType struct,
Record expected,
InternalRow actual,
Map<Integer, Object> idToConstant,
int pos) {
Types.StructType expectedType = expected.struct();
List<Types.NestedField> fields = struct.fields();
for (int i = 0; i < fields.size(); i += 1) {
Type fieldType = fields.get(i).type();
for (int readPos = 0; readPos < fields.size(); readPos += 1) {
Types.NestedField field = fields.get(readPos);
Types.NestedField expectedField = expectedType.field(field.fieldId());

Object expectedValue = expected.get(i);
Object actualValue = actual.get(i, convert(fieldType));
Type fieldType = field.type();
Object actualValue =
actual.isNullAt(readPos) ? null : actual.get(readPos, convert(fieldType));

Object expectedValue;
if (expectedField != null) {
int id = expectedField.fieldId();
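        // row lineage fields may be null in the expected record; in that case the value
        // should come from the scan-time constants (_row_id = first row id + position)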
if (id == MetadataColumns.ROW_ID.fieldId()) {
expectedValue = expected.getField(expectedField.name());
if (expectedValue == null && idToConstant != null) {
expectedValue = (Long) idToConstant.get(id) + pos;
}

} else if (id == MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()) {
expectedValue = expected.getField(expectedField.name());
if (expectedValue == null && idToConstant != null) {
expectedValue = idToConstant.get(id);
}

} else {
expectedValue = expected.getField(expectedField.name());
}
} else {
// comparison expects Iceberg's generic representation
expectedValue = GenericDataUtil.internalToGeneric(field.type(), field.initialDefault());
}

assertEqualsUnsafe(fieldType, expectedValue, actualValue);
}
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.spark.data;

import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -28,7 +27,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
@@ -38,7 +36,9 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.inmemory.InMemoryOutputFile;
import org.apache.iceberg.io.CloseableIterable;
@@ -70,6 +70,13 @@ protected void writeAndValidate(Schema schema) throws IOException {

@Override
protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
List<Record> expected = RandomGenericData.generate(writeSchema, 100, 0L);
writeAndValidate(writeSchema, expectedSchema, expected);
}

@Override
protected void writeAndValidate(Schema writeSchema, Schema expectedSchema, List<Record> expected)
throws IOException {
assumeThat(
null
== TypeUtil.find(
@@ -79,29 +86,39 @@ protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throw
.as("Parquet Avro cannot write non-string map keys")
.isTrue();

List<GenericData.Record> expected = RandomData.generateList(writeSchema, 100, 0L);

OutputFile outputFile = new InMemoryOutputFile();

try (FileAppender<GenericData.Record> writer =
Parquet.write(outputFile).schema(writeSchema).named("test").build()) {
OutputFile output = new InMemoryOutputFile();
try (FileAppender<Record> writer =
Parquet.write(output)
.schema(writeSchema)
.createWriterFunc(GenericParquetWriter::create)
.named("test")
.build()) {
writer.addAll(expected);
}

try (CloseableIterable<InternalRow> reader =
Parquet.read(outputFile.toInputFile())
Parquet.read(output.toInputFile())
.project(expectedSchema)
.createReaderFunc(type -> SparkParquetReaders.buildReader(expectedSchema, type))
.createReaderFunc(
type -> SparkParquetReaders.buildReader(expectedSchema, type, ID_TO_CONSTANT))
.build()) {
Iterator<InternalRow> rows = reader.iterator();
for (GenericData.Record record : expected) {
assertThat(rows.hasNext()).as("Should have expected number of rows").isTrue();
assertEqualsUnsafe(expectedSchema.asStruct(), record, rows.next());
int pos = 0;
for (Record record : expected) {
assertThat(rows).as("Should have expected number of rows").hasNext();
GenericsHelpers.assertEqualsUnsafe(
expectedSchema.asStruct(), record, rows.next(), ID_TO_CONSTANT, pos);
pos += 1;
}
assertThat(rows.hasNext()).as("Should not have extra rows").isFalse();
assertThat(rows).as("Should not have extra rows").isExhausted();
}
}

@Override
protected boolean supportsRowLineage() {
return true;
}

@Override
protected boolean supportsDefaultValues() {
return true;