From 4f297d3f18d4d08004876370495a559f6eec522f Mon Sep 17 00:00:00 2001 From: Xingyuan Lin Date: Mon, 10 Aug 2020 21:44:46 -0700 Subject: [PATCH] Avro: Fix pruning columns when a logical-map array's value type is nested E.g., map(long, list) --- .../org/apache/iceberg/avro/PruneColumns.java | 12 ++++---- .../iceberg/avro/TestAvroNameMapping.java | 6 ++++ .../iceberg/avro/TestAvroReadProjection.java | 30 +++++++++++++++++++ 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java index decca0384ba1..1e899542aec8 100644 --- a/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java +++ b/core/src/main/java/org/apache/iceberg/avro/PruneColumns.java @@ -144,16 +144,18 @@ public Schema array(Schema array, Schema element) { if (selectedIds.contains(keyId) || selectedIds.contains(valueId)) { return complexMapWithIds(array, keyId, valueId); } else if (element != null) { - Schema keyProjection = element.getField("key").schema(); + Schema.Field keyProjectionField = element.getField("key"); Schema valueProjection = element.getField("value").schema(); + // it is possible that key is not selected, and // key schemas can be different if new field ids were assigned to them - if (keyValue.getField("key").schema() != keyProjection) { + if (keyProjectionField != null && keyValue.getField("key").schema() != keyProjectionField.schema()) { Preconditions.checkState( SchemaNormalization.parsingFingerprint64(keyValue.getField("key").schema()) == - SchemaNormalization.parsingFingerprint64(keyProjection), "Map keys should not be projected"); - return AvroSchemaUtil.createMap(keyId, keyProjection, valueId, valueProjection); + SchemaNormalization.parsingFingerprint64(keyProjectionField.schema()), + "Map keys should not be projected"); + return AvroSchemaUtil.createMap(keyId, keyProjectionField.schema(), valueId, valueProjection); } else if (keyValue.getField("value").schema() != valueProjection) { - return AvroSchemaUtil.createMap(keyId, keyProjection, valueId, valueProjection); + return AvroSchemaUtil.createMap(keyId, keyValue.getField("key").schema(), valueId, valueProjection); } else { return complexMapWithIds(array, keyId, valueId); } diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java index 3a3c28cad84f..f3e75e1f2a92 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroNameMapping.java @@ -281,6 +281,12 @@ public void testInferredMapping() throws IOException { Assert.assertEquals(record, projected); } + @Test + @Override + public void testAvroArrayAsLogicalMap() { + // no-op + } + @Override protected Record writeAndRead(String desc, Schema writeSchema, diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroReadProjection.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroReadProjection.java index c367e53cb548..65edefaf60da 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestAvroReadProjection.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroReadProjection.java @@ -21,11 +21,18 @@ import java.io.File; import java.io.IOException; +import java.util.List; +import java.util.Map; import org.apache.avro.generic.GenericData; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; public class TestAvroReadProjection extends TestReadProjection { @Override @@ -49,4 +56,27 @@ protected GenericData.Record writeAndRead(String desc, return Iterables.getOnlyElement(records); } + + @Test + public void testAvroArrayAsLogicalMap() throws IOException { + Schema writeSchema = new Schema( + Types.NestedField.optional(0, "map", Types.MapType.ofOptional(2, 3, + Types.LongType.get(), + Types.ListType.ofRequired(1, Types.LongType.get()) + )) + ); + + List values1 = ImmutableList.of(101L, 102L); + List values2 = ImmutableList.of(201L, 202L, 203L); + GenericData.Record record = new GenericData.Record(AvroSchemaUtil.convert(writeSchema, "table")); + record.put("map", ImmutableMap.of(100L, values1, 200L, values2)); + + GenericData.Record projected = writeAndRead("full_projection", writeSchema, writeSchema, record); + Assert.assertEquals("Should contain correct value list", + values1, + ((Map>) projected.get("map")).get(100L)); + Assert.assertEquals("Should contain correct value list", + values2, + ((Map>) projected.get("map")).get(200L)); + } }