diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java index 23c6e9b3282c..e7f618a620db 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java @@ -21,21 +21,32 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.fasterxml.jackson.core.JsonGenerator; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.StringWriter; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; import org.apache.iceberg.Schema; +import org.apache.iceberg.TestTables; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.InternalReader; import org.apache.iceberg.inmemory.InMemoryOutputFile; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -46,6 +57,7 @@ import org.apache.iceberg.types.Types.IntegerType; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.VariantType; +import org.apache.iceberg.util.JsonUtil; import org.apache.iceberg.variants.PhysicalType; import org.apache.iceberg.variants.ShreddedObject; import 
org.apache.iceberg.variants.ValueArray; @@ -70,6 +82,10 @@ import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; import org.assertj.core.api.Assumptions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -154,10 +170,139 @@ private static Stream metadataAndValues() { return Streams.concat(primitives, object); } + // TO GENERATE TEST CASES, SET THIS TO A LOCATION + private static final String CASE_LOCATION = null; + private static final FileIO IO = new TestTables.LocalFileIO(); + private static final StringWriter JSON_STRING_WRITER = new StringWriter(); + private static final AtomicInteger caseAssignment = new AtomicInteger(0); + private static JsonGenerator CASE_JSON_GENERATOR; + private int caseNumber = -1; + + @BeforeAll + public static void createJsonGenerator() throws IOException { + if (CASE_LOCATION != null) { + CASE_JSON_GENERATOR = JsonUtil.factory().createGenerator(JSON_STRING_WRITER); + CASE_JSON_GENERATOR.useDefaultPrettyPrinter(); + CASE_JSON_GENERATOR.writeStartArray(); + } + } + + @AfterAll + public static void writeLog() throws IOException { + if (CASE_JSON_GENERATOR == null) { + return; + } + + CASE_JSON_GENERATOR.writeEndArray(); + CASE_JSON_GENERATOR.flush(); + String caseJson = JSON_STRING_WRITER.toString(); + + try (OutputStream out = IO.newOutputFile(CASE_LOCATION + "/cases.json").createOrOverwrite()) { + out.write(caseJson.getBytes(StandardCharsets.UTF_8)); + } + } + + @BeforeEach + public void startCaseJson() throws IOException { + this.caseNumber = caseAssignment.incrementAndGet(); + + if (CASE_JSON_GENERATOR == null) { + return; + } + + CASE_JSON_GENERATOR.writeStartObject(); + CASE_JSON_GENERATOR.writeNumberField("case_number", caseNumber); + } + + @AfterEach + 
public void endCaseJson() throws IOException { + if (CASE_JSON_GENERATOR == null) { + return; + } + + CASE_JSON_GENERATOR.writeEndObject(); + } + + private void errorCase(String testName, String errorMessage) throws IOException { + if (CASE_JSON_GENERATOR == null) { + return; + } + + CASE_JSON_GENERATOR.writeStringField("test", testName); + CASE_JSON_GENERATOR.writeStringField("parquet_file", parquetFile()); + CASE_JSON_GENERATOR.writeStringField("error_message", errorMessage); + } + + private void multipleVariantCase(String testName, Variant... variants) throws IOException { + if (CASE_JSON_GENERATOR == null) { + return; + } + + CASE_JSON_GENERATOR.writeStringField("test", testName); + CASE_JSON_GENERATOR.writeStringField("parquet_file", parquetFile()); + + CASE_JSON_GENERATOR.writeArrayFieldStart("variant_files"); + for (int i = 0; i < variants.length; i += 1) { + if (variants[i] != null) { + String file = writeVariantFile(i, variants[i]); + CASE_JSON_GENERATOR.writeString(file); + } else { + CASE_JSON_GENERATOR.writeNull(); + } + } + CASE_JSON_GENERATOR.writeEndArray(); + + CASE_JSON_GENERATOR.writeStringField("variants", Arrays.toString(variants)); + } + + private void singleVariantCase(String testName, Variant variant) throws IOException { + if (CASE_JSON_GENERATOR == null) { + return; + } + + CASE_JSON_GENERATOR.writeStringField("test", testName); + CASE_JSON_GENERATOR.writeStringField("parquet_file", parquetFile()); + + String file = writeVariantFile(0, variant); + CASE_JSON_GENERATOR.writeStringField("variant_file", file); + + CASE_JSON_GENERATOR.writeStringField("variant", String.valueOf(variant)); + } + + private String writeVariantFile(int rowId, Variant variant) throws IOException { + String variantFile = String.format("case-%03d_row-%d.variant.bin", caseNumber, rowId); + + ByteBuffer buffer = ParquetVariantUtil.toByteBuffer(variant.metadata(), variant.value()); + try (OutputStream out = + IO.newOutputFile(CASE_LOCATION + "/" + 
variantFile).createOrOverwrite()) { + out.write(buffer.array()); + } + + return variantFile; + } + + private String parquetFile() { + return String.format("case-%03d.parquet", caseNumber); + } + + private void writeParquetFile(InputFile file) throws IOException { + if (CASE_JSON_GENERATOR == null) { + return; + } + + try (OutputStream out = + IO.newOutputFile(CASE_LOCATION + "/" + parquetFile()).createOrOverwrite(); + InputStream in = file.newStream()) { + IOUtils.copyBytes(in, out, 1000); + } + } + @ParameterizedTest @MethodSource("metadataAndValues") public void testUnshreddedVariants(VariantMetadata metadata, VariantValue expected) throws IOException { + singleVariantCase("testUnshreddedVariants", Variant.of(metadata, expected)); + GroupType variantType = variant("var", 2); MessageType parquetSchema = parquetSchema(variantType); @@ -178,6 +323,8 @@ public void testUnshreddedVariants(VariantMetadata metadata, VariantValue expect @MethodSource("metadataAndValues") public void testUnshreddedVariantsWithShreddedSchema( VariantMetadata metadata, VariantValue expected) throws IOException { + singleVariantCase("testUnshreddedVariantsWithShreddedSchema", Variant.of(metadata, expected)); + // the variant's Parquet schema has a shredded field that is unused by all data values GroupType variantType = variant("var", 2, shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); MessageType parquetSchema = parquetSchema(variantType); @@ -202,6 +349,9 @@ public void testShreddedVariantPrimitives(VariantPrimitive primitive) throws .as("Null is not a shredded type") .isTrue(); + Variant expected = Variant.of(EMPTY_METADATA, primitive); + singleVariantCase("testShreddedVariantPrimitives", expected); + GroupType variantType = variant("var", 2, shreddedType(primitive)); MessageType parquetSchema = parquetSchema(variantType); @@ -220,12 +370,15 @@ public void testShreddedVariantPrimitives(VariantPrimitive primitive) throws assertThat(actual.getField("var")).isInstanceOf(Variant.class); 
Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(primitive, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testNullValueAndNullTypedValue() throws IOException { + Variant expected = Variant.of(EMPTY_METADATA, Variants.ofNull()); + singleVariantCase("testNullValueAndNullTypedValue", expected); + GroupType variantType = variant("var", 2, shreddedPrimitive(PrimitiveTypeName.INT32)); MessageType parquetSchema = parquetSchema(variantType); @@ -238,12 +391,15 @@ public void testNullValueAndNullTypedValue() throws IOException { assertThat(actual.getField("var")).isInstanceOf(Variant.class); Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(Variants.ofNull(), actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testMissingValueColumn() throws IOException { + Variant expected = Variant.of(EMPTY_METADATA, Variants.of(34)); + singleVariantCase("testMissingValueColumn", expected); + GroupType variantType = Types.buildGroup(Type.Repetition.REQUIRED) .id(2) @@ -262,12 +418,15 @@ public void testMissingValueColumn() throws IOException { assertThat(actual.getField("var")).isInstanceOf(Variant.class); Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(Variants.of(34), actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test - public void 
testValueAndTypedValueConflict() { + public void testValueAndTypedValueConflict() throws IOException { + errorCase( + "testValueAndTypedValueConflict", "Invalid variant, conflicting value and typed_value"); + GroupType variantType = variant("var", 2, shreddedPrimitive(PrimitiveTypeName.INT32)); MessageType parquetSchema = parquetSchema(variantType); @@ -289,7 +448,9 @@ public void testValueAndTypedValueConflict() { } @Test - public void testUnsignedInteger() { + public void testUnsignedInteger() throws IOException { + errorCase("testUnsignedInteger", "Unsupported shredded value type: INTEGER(32,false)"); + GroupType variantType = variant( "var", @@ -307,7 +468,11 @@ public void testUnsignedInteger() { } @Test - public void testFixedLengthByteArray() { + public void testFixedLengthByteArray() throws IOException { + errorCase( + "testFixedLengthByteArray", + "Unsupported shredded value type: optional fixed_len_byte_array(4) typed_value"); + GroupType variantType = variant( "var", @@ -327,6 +492,13 @@ public void testFixedLengthByteArray() { @Test public void testShreddedObject() throws IOException { + ShreddedObject expectedObj = Variants.object(TEST_METADATA); + expectedObj.put("a", Variants.ofNull()); + expectedObj.put("b", Variants.of("")); + + Variant expected = Variant.of(TEST_METADATA, expectedObj); + singleVariantCase("testShreddedObject", expected); + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType objectFields = objectFields(fieldA, fieldB); @@ -344,17 +516,20 @@ public void testShreddedObject() throws IOException { assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); - ShreddedObject expected = Variants.object(TEST_METADATA); - expected.put("a", Variants.ofNull()); - expected.put("b", Variants.of("")); - Variant actualVariant = (Variant) actual.getField("var"); - 
VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expected, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testShreddedObjectMissingValueColumn() throws IOException { + ShreddedObject expectedObj = Variants.object(TEST_METADATA); + expectedObj.put("a", Variants.of((short) 1234)); + expectedObj.put("b", Variants.of("iceberg")); + + Variant expected = Variant.of(TEST_METADATA, expectedObj); + singleVariantCase("testShreddedObjectMissingValueColumn", expected); + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType objectFields = objectFields(fieldA, fieldB); @@ -379,17 +554,19 @@ public void testShreddedObjectMissingValueColumn() throws IOException { assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); - ShreddedObject expected = Variants.object(TEST_METADATA); - expected.put("a", Variants.of((short) 1234)); - expected.put("b", Variants.of("iceberg")); - Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expected, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testShreddedObjectMissingField() throws IOException { + ShreddedObject expectedObj = Variants.object(TEST_METADATA); + expectedObj.put("a", Variants.of(false)); + + Variant expected = Variant.of(TEST_METADATA, expectedObj); + singleVariantCase("testShreddedObjectMissingField", expected); + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", 
shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType objectFields = objectFields(fieldA, fieldB); @@ -408,16 +585,16 @@ public void testShreddedObjectMissingField() throws IOException { assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); - ShreddedObject expected = Variants.object(TEST_METADATA); - expected.put("a", Variants.of(false)); - Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expected, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testEmptyShreddedObject() throws IOException { + Variant expected = Variant.of(TEST_METADATA, Variants.object(TEST_METADATA)); + singleVariantCase("testEmptyShreddedObject", expected); + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType objectFields = objectFields(fieldA, fieldB); @@ -435,15 +612,19 @@ public void testEmptyShreddedObject() throws IOException { assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); - ShreddedObject expected = Variants.object(TEST_METADATA); - Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expected, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testShreddedObjectMissingFieldValueColumn() throws IOException { + ShreddedObject expectedObj = Variants.object(TEST_METADATA); + expectedObj.put("b", Variants.of("iceberg")); + + Variant expected = 
Variant.of(TEST_METADATA, expectedObj); + singleVariantCase("testShreddedObjectMissingFieldValueColumn", expected); + // field groups do not have value GroupType fieldA = Types.buildGroup(Type.Repetition.REQUIRED) @@ -469,16 +650,19 @@ public void testShreddedObjectMissingFieldValueColumn() throws IOException { assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); - ShreddedObject expected = Variants.object(TEST_METADATA); - expected.put("b", Variants.of("iceberg")); - Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expected, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testShreddedObjectMissingTypedValue() throws IOException { + ShreddedObject expectedObj = Variants.object(TEST_METADATA); + expectedObj.put("b", Variants.of("iceberg")); + + Variant expected = Variant.of(TEST_METADATA, expectedObj); + singleVariantCase("testShreddedObjectMissingTypedValue", expected); + // field groups do not have typed_value GroupType fieldA = Types.buildGroup(Type.Repetition.REQUIRED) @@ -506,16 +690,23 @@ public void testShreddedObjectMissingTypedValue() throws IOException { assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); - ShreddedObject expected = Variants.object(TEST_METADATA); - expected.put("b", Variants.of("iceberg")); - Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expected, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void 
testShreddedObjectWithinShreddedObject() throws IOException { + ShreddedObject expectedInner = Variants.object(TEST_METADATA); + expectedInner.put("a", Variants.of(34)); + expectedInner.put("b", Variants.of("iceberg")); + ShreddedObject expectedOuter = Variants.object(TEST_METADATA); + expectedOuter.put("c", expectedInner); + expectedOuter.put("d", Variants.of(-0.0D)); + + Variant expected = Variant.of(TEST_METADATA, expectedOuter); + singleVariantCase("testShreddedObjectWithinShreddedObject", expected); + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType innerFields = objectFields(fieldA, fieldB); @@ -539,20 +730,21 @@ public void testShreddedObjectWithinShreddedObject() throws IOException { assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); - ShreddedObject expectedInner = Variants.object(TEST_METADATA); - expectedInner.put("a", Variants.of(34)); - expectedInner.put("b", Variants.of("iceberg")); - ShreddedObject expectedOuter = Variants.object(TEST_METADATA); - expectedOuter.put("c", expectedInner); - expectedOuter.put("d", Variants.of(-0.0D)); - Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expectedOuter, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testShreddedObjectWithOptionalFieldStructs() throws IOException { + // the expected value is the shredded field value + ShreddedObject expectedObj = Variants.object(TEST_METADATA); + expectedObj.put("a", Variants.of(34)); + expectedObj.put("b", Variants.of("iceberg")); + + Variant expected = Variant.of(TEST_METADATA, expectedObj); + 
singleVariantCase("testShreddedObjectWithOptionalFieldStructs", expected); + // fields use an incorrect OPTIONAL struct of value and typed_value to test definition levels GroupType fieldA = Types.buildGroup(Type.Repetition.OPTIONAL) @@ -598,18 +790,21 @@ public void testShreddedObjectWithOptionalFieldStructs() throws IOException { assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); - // the expected value is the shredded field value - ShreddedObject expected = Variants.object(TEST_METADATA); - expected.put("a", Variants.of(34)); - expected.put("b", Variants.of("iceberg")); - Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expected, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testPartiallyShreddedObject() throws IOException { + ShreddedObject expectedObj = Variants.object(TEST_METADATA); + expectedObj.put("a", Variants.ofNull()); + expectedObj.put("b", Variants.of("iceberg")); + expectedObj.put("d", Variants.ofIsoDate("2024-01-30")); + + Variant expected = Variant.of(TEST_METADATA, expectedObj); + singleVariantCase("testPartiallyShreddedObject", expected); + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType objectFields = objectFields(fieldA, fieldB); @@ -638,18 +833,21 @@ public void testPartiallyShreddedObject() throws IOException { assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); - ShreddedObject expected = Variants.object(TEST_METADATA); - expected.put("a", Variants.ofNull()); - expected.put("b", Variants.of("iceberg")); - expected.put("d", 
Variants.ofIsoDate("2024-01-30")); - Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expected, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testPartiallyShreddedObjectFieldConflict() throws IOException { + // the expected value is the shredded field value + ShreddedObject expectedObj = Variants.object(TEST_METADATA); + expectedObj.put("a", Variants.ofNull()); + expectedObj.put("b", Variants.of("iceberg")); + + Variant expected = Variant.of(TEST_METADATA, expectedObj); + singleVariantCase("testPartiallyShreddedObjectFieldConflict", expected); + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType objectFields = objectFields(fieldA, fieldB); @@ -679,18 +877,20 @@ public void testPartiallyShreddedObjectFieldConflict() throws IOException { assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); - // the expected value is the shredded field value - ShreddedObject expected = Variants.object(TEST_METADATA); - expected.put("a", Variants.ofNull()); - expected.put("b", Variants.of("iceberg")); - Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expected, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testPartiallyShreddedObjectMissingFieldConflict() throws IOException { + // the expected value is the shredded field value + ShreddedObject expectedObj = Variants.object(TEST_METADATA); + 
expectedObj.put("a", Variants.ofNull()); + + Variant expected = Variant.of(TEST_METADATA, expectedObj); + singleVariantCase("testPartiallyShreddedObjectMissingFieldConflict", expected); + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType objectFields = objectFields(fieldA, fieldB); @@ -721,17 +921,16 @@ public void testPartiallyShreddedObjectMissingFieldConflict() throws IOException assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); - // the expected value is the shredded field value - ShreddedObject expected = Variants.object(TEST_METADATA); - expected.put("a", Variants.ofNull()); - Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expected, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testNonObjectWithNullShreddedFields() throws IOException { + Variant expected = Variant.of(TEST_METADATA, Variants.of(34)); + singleVariantCase("testNonObjectWithNullShreddedFields", expected); + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType objectFields = objectFields(fieldA, fieldB); @@ -749,12 +948,16 @@ public void testNonObjectWithNullShreddedFields() throws IOException { assertThat(actual.getField("var")).isInstanceOf(Variant.class); Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(Variants.of(34), actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + 
VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test - public void testNonObjectWithNonNullShreddedFields() { + public void testNonObjectWithNonNullShreddedFields() throws IOException { + errorCase( + "testNonObjectWithNonNullShreddedFields", + "Invalid variant, non-object value with shredded fields"); + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType objectFields = objectFields(fieldA, fieldB); @@ -782,7 +985,11 @@ public void testNonObjectWithNonNullShreddedFields() { } @Test - public void testEmptyPartiallyShreddedObjectConflict() { + public void testEmptyPartiallyShreddedObjectConflict() throws IOException { + errorCase( + "testEmptyPartiallyShreddedObjectConflict", + "Invalid variant, non-object value with shredded fields"); + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType objectFields = objectFields(fieldA, fieldB); @@ -811,6 +1018,29 @@ public void testEmptyPartiallyShreddedObjectConflict() { @Test public void testMixedRecords() throws IOException { + ShreddedObject expectedC1 = Variants.object(TEST_METADATA); + expectedC1.put("b", Variants.of("iceberg")); + ShreddedObject expectedOne = Variants.object(TEST_METADATA); + expectedOne.put("c", expectedC1); + + ShreddedObject expectedTwo = Variants.object(TEST_METADATA); + expectedTwo.put("c", Variants.of((byte) 8)); + expectedTwo.put("d", Variants.of(-0.0D)); + + ShreddedObject expectedC3 = Variants.object(TEST_METADATA); + expectedC3.put("a", Variants.of(34)); + expectedC3.put("b", Variants.of("")); + ShreddedObject expectedThree = Variants.object(TEST_METADATA); + expectedThree.put("c", expectedC3); + expectedThree.put("d", Variants.of(0.0D)); + + multipleVariantCase( + "testMixedRecords", + null, + Variant.of(TEST_METADATA, expectedOne), + 
Variant.of(TEST_METADATA, expectedTwo), + Variant.of(TEST_METADATA, expectedThree)); + // tests multiple rows to check that Parquet columns are correctly advanced GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); @@ -833,11 +1063,6 @@ public void testMixedRecords() throws IOException { record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", outer1)); GenericRecord one = record(parquetSchema, Map.of("id", 1, "var", variant1)); - ShreddedObject expectedC1 = Variants.object(TEST_METADATA); - expectedC1.put("b", Variants.of("iceberg")); - ShreddedObject expectedOne = Variants.object(TEST_METADATA); - expectedOne.put("c", expectedC1); - GenericRecord c2 = record(fieldC, Map.of("value", serialize(Variants.of((byte) 8)))); GenericRecord d2 = record(fieldD, Map.of("typed_value", -0.0D)); GenericRecord outer2 = record(outerFields, Map.of("c", c2, "d", d2)); @@ -845,10 +1070,6 @@ public void testMixedRecords() throws IOException { record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", outer2)); GenericRecord two = record(parquetSchema, Map.of("id", 2, "var", variant2)); - ShreddedObject expectedTwo = Variants.object(TEST_METADATA); - expectedTwo.put("c", Variants.of((byte) 8)); - expectedTwo.put("d", Variants.of(-0.0D)); - GenericRecord a3 = record(fieldA, Map.of("typed_value", 34)); GenericRecord b3 = record(fieldB, Map.of("value", serialize(Variants.of("")))); GenericRecord inner3 = record(innerFields, Map.of("a", a3, "b", b3)); @@ -859,13 +1080,6 @@ public void testMixedRecords() throws IOException { record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", outer3)); GenericRecord three = record(parquetSchema, Map.of("id", 3, "var", variant3)); - ShreddedObject expectedC3 = Variants.object(TEST_METADATA); - expectedC3.put("a", Variants.of(34)); - expectedC3.put("b", Variants.of("")); - ShreddedObject 
expectedThree = Variants.object(TEST_METADATA); - expectedThree.put("c", expectedC3); - expectedThree.put("d", Variants.of(0.0D)); - List records = writeAndRead(parquetSchema, List.of(zero, one, two, three)); Record actualZero = records.get(0); @@ -899,6 +1113,13 @@ public void testMixedRecords() throws IOException { @Test public void testSimpleArray() throws IOException { + ValueArray expectedArray = Variants.array(); + expectedArray.add(Variants.of("comedy")); + expectedArray.add(Variants.of("drama")); + + Variant expected = Variant.of(EMPTY_METADATA, expectedArray); + singleVariantCase("testSimpleArray", expected); + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType elementType = element(shreddedType); GroupType variantType = variant("var", 2, list(elementType)); @@ -914,21 +1135,20 @@ public void testSimpleArray() throws IOException { variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); - ValueArray expectedArray = Variants.array(); - expectedArray.add(Variants.of("comedy")); - expectedArray.add(Variants.of("drama")); - Record actual = writeAndRead(parquetSchema, row); assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testNullArray() throws IOException { + Variant expected = Variant.of(EMPTY_METADATA, Variants.ofNull()); + singleVariantCase("testNullArray", expected); + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType variantType = variant("var", 2, 
list(element(shreddedType))); MessageType parquetSchema = parquetSchema(variantType); @@ -948,12 +1168,15 @@ public void testNullArray() throws IOException { assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(Variants.ofNull(), actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testEmptyArray() throws IOException { + Variant expected = Variant.of(EMPTY_METADATA, Variants.array()); + singleVariantCase("testEmptyArray", expected); + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType variantType = variant("var", 2, list(element(shreddedType))); MessageType parquetSchema = parquetSchema(variantType); @@ -970,11 +1193,20 @@ public void testEmptyArray() throws IOException { Variant actualVariant = (Variant) actual.getField("var"); assertThat(actualVariant.value().type()).isEqualTo(PhysicalType.ARRAY); assertThat(actualVariant.value().asArray().numElements()).isEqualTo(0); - VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testArrayWithNull() throws IOException { + ValueArray expectedArray = Variants.array(); + expectedArray.add(Variants.of("comedy")); + expectedArray.add(Variants.ofNull()); + expectedArray.add(Variants.of("drama")); + + Variant expected = Variant.of(EMPTY_METADATA, expectedArray); + singleVariantCase("testArrayWithNull", expected); + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType elementType = element(shreddedType); GroupType variantType = 
variant("var", 2, list(elementType)); @@ -991,11 +1223,6 @@ public void testArrayWithNull() throws IOException { variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); - ValueArray expectedArray = Variants.array(); - expectedArray.add(Variants.of("comedy")); - expectedArray.add(Variants.ofNull()); - expectedArray.add(Variants.of("drama")); - Record actual = writeAndRead(parquetSchema, row); assertThat(actual.getField("id")).isEqualTo(1); @@ -1003,12 +1230,23 @@ public void testArrayWithNull() throws IOException { Variant actualVariant = (Variant) actual.getField("var"); assertThat(actualVariant.value().type()).isEqualTo(PhysicalType.ARRAY); assertThat(actualVariant.value().asArray().numElements()).isEqualTo(3); - VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testNestedArray() throws IOException { + ValueArray expectedArray = Variants.array(); + ValueArray expectedInner1 = Variants.array(); + expectedInner1.add(Variants.of("comedy")); + expectedInner1.add(Variants.of("drama")); + ValueArray expectedInner2 = Variants.array(); + expectedArray.add(expectedInner1); + expectedArray.add(expectedInner2); + + Variant expected = Variant.of(EMPTY_METADATA, expectedArray); + singleVariantCase("testNestedArray", expected); + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType elementType = element(shreddedType); GroupType outerElementType = element(list(elementType)); @@ -1029,26 +1267,45 @@ public void testNestedArray() throws IOException { Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", outer1)); GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", 
var)); - ValueArray expectedArray = Variants.array(); - ValueArray expectedInner1 = Variants.array(); - expectedInner1.add(Variants.of("comedy")); - expectedInner1.add(Variants.of("drama")); - ValueArray expectedInner2 = Variants.array(); - expectedArray.add(expectedInner1); - expectedArray.add(expectedInner2); - Record actual = writeAndRead(parquetSchema, row); // Verify assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testArrayWithNestedObject() throws IOException { + ValueArray expected1 = Variants.array(); + ShreddedObject expectedElement1 = Variants.object(TEST_METADATA); + expectedElement1.put("a", Variants.of(1)); + expectedElement1.put("b", Variants.of("comedy")); + expected1.add(expectedElement1); + ShreddedObject expectedElement2 = Variants.object(TEST_METADATA); + expectedElement2.put("a", Variants.of(2)); + expectedElement2.put("b", Variants.of("drama")); + expected1.add(expectedElement2); + + ValueArray expected2 = Variants.array(); + ShreddedObject expectedElement3 = Variants.object(TEST_METADATA); + expectedElement3.put("a", Variants.of(3)); + expectedElement3.put("b", Variants.of("action")); + expectedElement3.put("c", Variants.of("str")); + expected2.add(expectedElement3); + ShreddedObject expectedElement4 = Variants.object(TEST_METADATA); + expectedElement4.put("a", Variants.of(4)); + expectedElement4.put("b", Variants.of("horror")); + expectedElement4.put("d", Variants.ofIsoDate("2024-01-30")); + expected2.add(expectedElement4); + + multipleVariantCase( + "testArrayWithNestedObject", + Variant.of(TEST_METADATA, 
expected1), + Variant.of(TEST_METADATA, expected2)); + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); GroupType shreddedFields = objectFields(fieldA, fieldB); @@ -1082,16 +1339,6 @@ public void testArrayWithNestedObject() throws IOException { record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr1)); GenericRecord row1 = record(parquetSchema, Map.of("id", 1, "var", var1)); - ValueArray expected1 = Variants.array(); - ShreddedObject expectedElement1 = Variants.object(TEST_METADATA); - expectedElement1.put("a", Variants.of(1)); - expectedElement1.put("b", Variants.of("comedy")); - expected1.add(expectedElement1); - ShreddedObject expectedElement2 = Variants.object(TEST_METADATA); - expectedElement2.put("a", Variants.of(2)); - expectedElement2.put("b", Variants.of("drama")); - expected1.add(expectedElement2); - // Row 2 with nested partially shredded object GenericRecord shredded3 = record( @@ -1123,18 +1370,6 @@ public void testArrayWithNestedObject() throws IOException { record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr2)); GenericRecord row2 = record(parquetSchema, Map.of("id", 2, "var", var2)); - ValueArray expected2 = Variants.array(); - ShreddedObject expectedElement3 = Variants.object(TEST_METADATA); - expectedElement3.put("a", Variants.of(3)); - expectedElement3.put("b", Variants.of("action")); - expectedElement3.put("c", Variants.of("str")); - expected2.add(expectedElement3); - ShreddedObject expectedElement4 = Variants.object(TEST_METADATA); - expectedElement4.put("a", Variants.of(4)); - expectedElement4.put("b", Variants.of("horror")); - expectedElement4.put("d", Variants.ofIsoDate("2024-01-30")); - expected2.add(expectedElement4); - // verify List actual = writeAndRead(parquetSchema, List.of(row1, row2)); Record actual1 = actual.get(0); @@ -1156,6 +1391,27 @@ public void 
testArrayWithNestedObject() throws IOException { @Test public void testArrayWithNonArray() throws IOException { + ValueArray expectedArray1 = Variants.array(); + expectedArray1.add(Variants.of("comedy")); + expectedArray1.add(Variants.of("drama")); + + VariantValue expectedValue2 = Variants.of(PhysicalType.INT32, 34); + + ShreddedObject expectedObject3 = Variants.object(TEST_METADATA); + expectedObject3.put("a", Variants.ofNull()); + expectedObject3.put("d", Variants.of("iceberg")); + + ValueArray expectedArray4 = Variants.array(); + expectedArray4.add(Variants.of("action")); + expectedArray4.add(Variants.of("horror")); + + multipleVariantCase( + "testArrayWithNestedObject", + Variant.of(EMPTY_METADATA, expectedArray1), + Variant.of(EMPTY_METADATA, expectedValue2), + Variant.of(TEST_METADATA, expectedObject3), + Variant.of(TEST_METADATA, expectedArray4)); + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType elementType = element(shreddedType); GroupType variantType = variant("var", 2, list(elementType)); @@ -1168,28 +1424,18 @@ public void testArrayWithNonArray() throws IOException { GenericRecord var1 = record( variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr1)); - GenericRecord row1 = record(parquetSchema, Map.of("id", 1, "var", var1)); - - ValueArray expectedArray1 = Variants.array(); - expectedArray1.add(Variants.of("comedy")); - expectedArray1.add(Variants.of("drama")); + GenericRecord row1 = record(parquetSchema, Map.of("id", 0, "var", var1)); GenericRecord var2 = record( variantType, Map.of( "metadata", VariantTestUtil.emptyMetadata(), "value", serialize(Variants.of(34)))); - GenericRecord row2 = record(parquetSchema, Map.of("id", 2, "var", var2)); - - VariantValue expectedValue2 = Variants.of(PhysicalType.INT32, 34); + GenericRecord row2 = record(parquetSchema, Map.of("id", 1, "var", var2)); GenericRecord var3 = record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "value", 
TEST_OBJECT_BUFFER)); - GenericRecord row3 = record(parquetSchema, Map.of("id", 3, "var", var3)); - - ShreddedObject expectedObject3 = Variants.object(TEST_METADATA); - expectedObject3.put("a", Variants.ofNull()); - expectedObject3.put("d", Variants.of("iceberg")); + GenericRecord row3 = record(parquetSchema, Map.of("id", 2, "var", var3)); // Test array is read properly after a non-array List arr4 = @@ -1198,38 +1444,34 @@ public void testArrayWithNonArray() throws IOException { record(elementType, Map.of("typed_value", "horror"))); GenericRecord var4 = record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr4)); - GenericRecord row4 = record(parquetSchema, Map.of("id", 4, "var", var4)); - - ValueArray expectedArray4 = Variants.array(); - expectedArray4.add(Variants.of("action")); - expectedArray4.add(Variants.of("horror")); + GenericRecord row4 = record(parquetSchema, Map.of("id", 3, "var", var4)); List actual = writeAndRead(parquetSchema, List.of(row1, row2, row3, row4)); // Verify Record actual1 = actual.get(0); - assertThat(actual1.getField("id")).isEqualTo(1); + assertThat(actual1.getField("id")).isEqualTo(0); assertThat(actual1.getField("var")).isInstanceOf(Variant.class); Variant actualVariant1 = (Variant) actual1.getField("var"); VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant1.metadata()); VariantTestUtil.assertEqual(expectedArray1, actualVariant1.value()); Record actual2 = actual.get(1); - assertThat(actual2.getField("id")).isEqualTo(2); + assertThat(actual2.getField("id")).isEqualTo(1); assertThat(actual2.getField("var")).isInstanceOf(Variant.class); Variant actualVariant2 = (Variant) actual2.getField("var"); VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant2.metadata()); VariantTestUtil.assertEqual(expectedValue2, actualVariant2.value()); Record actual3 = actual.get(2); - assertThat(actual3.getField("id")).isEqualTo(3); + assertThat(actual3.getField("id")).isEqualTo(2); 
assertThat(actual3.getField("var")).isInstanceOf(Variant.class); Variant actualVariant3 = (Variant) actual3.getField("var"); VariantTestUtil.assertEqual(TEST_METADATA, actualVariant3.metadata()); VariantTestUtil.assertEqual(expectedObject3, actualVariant3.value()); Record actual4 = actual.get(3); - assertThat(actual4.getField("id")).isEqualTo(4); + assertThat(actual4.getField("id")).isEqualTo(3); assertThat(actual4.getField("var")).isInstanceOf(Variant.class); Variant actualVariant4 = (Variant) actual4.getField("var"); VariantTestUtil.assertEqual(TEST_METADATA, actualVariant4.metadata()); @@ -1238,6 +1480,13 @@ public void testArrayWithNonArray() throws IOException { @Test public void testArrayMissingValueColumn() throws IOException { + ValueArray expectedArray = Variants.array(); + expectedArray.add(Variants.of("comedy")); + expectedArray.add(Variants.of("drama")); + + Variant expected = Variant.of(EMPTY_METADATA, expectedArray); + singleVariantCase("testArrayMissingValueColumn", expected); + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType elementType = element(shreddedType); GroupType variantType = @@ -1259,21 +1508,24 @@ public void testArrayMissingValueColumn() throws IOException { variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); - ValueArray expectedArray = Variants.array(); - expectedArray.add(Variants.of("comedy")); - expectedArray.add(Variants.of("drama")); - Record actual = writeAndRead(parquetSchema, row); assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + 
VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testArrayMissingElementValueColumn() throws IOException { + ValueArray expectedArray = Variants.array(); + expectedArray.add(Variants.of("comedy")); + expectedArray.add(Variants.of("drama")); + + Variant expected = Variant.of(EMPTY_METADATA, expectedArray); + singleVariantCase("testArrayMissingElementValueColumn", expected); + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType elementType = Types.buildGroup(Type.Repetition.REQUIRED).addField(shreddedType).named("element"); @@ -1290,21 +1542,23 @@ public void testArrayMissingElementValueColumn() throws IOException { variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); - ValueArray expectedArray = Variants.array(); - expectedArray.add(Variants.of("comedy")); - expectedArray.add(Variants.of("drama")); - Record actual = writeAndRead(parquetSchema, row); assertThat(actual.getField("id")).isEqualTo(1); assertThat(actual.getField("var")).isInstanceOf(Variant.class); Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); - VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test public void testArrayWithElementNullValueAndNullTypedValue() throws IOException { + ValueArray expectedArray = Variants.array(); + expectedArray.add(Variants.ofNull()); + + Variant expected = Variant.of(EMPTY_METADATA, expectedArray); + singleVariantCase("testArrayWithElementNullValueAndNullTypedValue", expected); + // Test the invalid case that both value and typed_value of an element are null Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType 
elementType = element(shreddedType); @@ -1323,15 +1577,19 @@ public void testArrayWithElementNullValueAndNullTypedValue() throws IOException assertThat(actual.getField("var")).isInstanceOf(Variant.class); Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); VariantValue actualValue = actualVariant.value(); assertThat(actualValue.type()).isEqualTo(PhysicalType.ARRAY); assertThat(actualValue.asArray().numElements()).isEqualTo(1); - VariantTestUtil.assertEqual(Variants.ofNull(), actualValue.asArray().get(0)); + VariantTestUtil.assertEqual(expected.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expected.value(), actualVariant.value()); } @Test - public void testArrayWithElementValueTypedValueConflict() { + public void testArrayWithElementValueTypedValueConflict() throws IOException { + errorCase( + "testArrayWithElementValueTypedValueConflict", + "Invalid variant, conflicting value and typed_value"); + // Test the invalid case that both value and typed_value of an element are not null Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); GroupType elementType = element(shreddedType); @@ -1400,11 +1658,11 @@ protected WriteSupport getWriteSupport(Configuration conf) { } } - static Record writeAndRead(MessageType parquetSchema, GenericRecord record) throws IOException { + private Record writeAndRead(MessageType parquetSchema, GenericRecord record) throws IOException { return Iterables.getOnlyElement(writeAndRead(parquetSchema, List.of(record))); } - static List writeAndRead(MessageType parquetSchema, List records) + private List writeAndRead(MessageType parquetSchema, List records) throws IOException { OutputFile outputFile = new InMemoryOutputFile(); @@ -1415,6 +1673,8 @@ static List writeAndRead(MessageType parquetSchema, List } } + writeParquetFile(outputFile.toInputFile()); + try (CloseableIterable reader = Parquet.read(outputFile.toInputFile()) 
.project(SCHEMA) diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReadsFromFile.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReadsFromFile.java new file mode 100644 index 000000000000..b32e1c25f4c2 --- /dev/null +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReadsFromFile.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.iceberg.parquet;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.stream.Stream;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TestTables;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.InternalReader;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.JsonUtil;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantTestUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Replays variant read cases from files on disk.
 *
 * <p>Cases are described by a {@code cases.json} array stored under {@link #CASE_LOCATION}. Each
 * entry carries a {@code case_number}, a {@code test} name, an optional {@code parquet_file} and
 * one of {@code error_message}, {@code variant_file} or {@code variant_files}. File names in an
 * entry are resolved relative to {@link #CASE_LOCATION}.
 *
 * <p>While {@link #CASE_LOCATION} is {@code null} every test receives a single placeholder case
 * and returns without reading anything.
 */
public class TestVariantReadsFromFile {
  private static final Schema SCHEMA =
      new Schema(
          Types.NestedField.required(1, "id", Types.IntegerType.get()),
          Types.NestedField.required(2, "var", Types.VariantType.get()));

  // TO RUN THE FILE-BASED CASES, SET THIS TO THE DIRECTORY THAT CONTAINS cases.json
  private static final String CASE_LOCATION = null;
  private static final FileIO IO = new TestTables.LocalFileIO();

  /**
   * Loads every case node from {@code cases.json}.
   *
   * @return a stream over the elements of the case array, or a single placeholder node with {@code
   *     case_number} -1 when no case location is configured
   * @throws IOException if the case file cannot be read or parsed
   * @throws IllegalArgumentException if the case file does not contain a JSON array
   */
  private static Stream<JsonNode> cases() throws IOException {
    if (CASE_LOCATION == null) {
      // The placeholder has no "parquet_file", so it passes every filter below and each test
      // returns early on it. Presumably this keeps the parameterized tests from being handed an
      // empty argument source -- TODO confirm.
      return Stream.of(JsonUtil.mapper().readValue("{\"case_number\": -1}", JsonNode.class));
    }

    InputFile caseJsonInput = IO.newInputFile(caseFile("cases.json"));
    JsonNode cases;
    // close the stream explicitly instead of depending on the parser to do it
    try (InputStream in = caseJsonInput.newStream()) {
      cases = JsonUtil.mapper().readValue(in, JsonNode.class);
    }

    Preconditions.checkArgument(
        cases != null && cases.isArray(), "Invalid case JSON, not an array: %s", caseJsonInput);

    return Streams.stream(cases);
  }

  /** Cases that declare an {@code error_message}, plus any case without a Parquet file. */
  private static Stream<Arguments> errorCases() throws IOException {
    return cases()
        .filter(caseNode -> caseNode.has("error_message") || !caseNode.has("parquet_file"))
        .map(
            caseNode -> {
              int caseNumber = JsonUtil.getInt("case_number", caseNode);
              String testName = JsonUtil.getStringOrNull("test", caseNode);
              String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode);
              String errorMessage = JsonUtil.getStringOrNull("error_message", caseNode);
              return Arguments.of(caseNumber, testName, parquetFile, errorMessage);
            });
  }

  /** Cases that declare a single {@code variant_file}, plus any case without a Parquet file. */
  private static Stream<Arguments> singleVariantCases() throws IOException {
    return cases()
        .filter(caseNode -> caseNode.has("variant_file") || !caseNode.has("parquet_file"))
        .map(
            caseNode -> {
              int caseNumber = JsonUtil.getInt("case_number", caseNode);
              String testName = JsonUtil.getStringOrNull("test", caseNode);
              String variant = JsonUtil.getStringOrNull("variant", caseNode);
              String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode);
              String variantFile = JsonUtil.getStringOrNull("variant_file", caseNode);
              return Arguments.of(caseNumber, testName, variant, parquetFile, variantFile);
            });
  }

  /**
   * Cases that declare a {@code variant_files} list, plus any case without a Parquet file.
   *
   * <p>JSON nulls inside {@code variant_files} are kept as {@code null} list entries.
   */
  private static Stream<Arguments> multiVariantCases() throws IOException {
    return cases()
        .filter(caseNode -> caseNode.has("variant_files") || !caseNode.has("parquet_file"))
        .map(
            caseNode -> {
              int caseNumber = JsonUtil.getInt("case_number", caseNode);
              String testName = JsonUtil.getStringOrNull("test", caseNode);
              String parquetFile = JsonUtil.getStringOrNull("parquet_file", caseNode);
              List<String> variantFiles =
                  caseNode.has("variant_files")
                      ? Lists.newArrayList(
                          Iterables.transform(
                              caseNode.get("variant_files"),
                              node -> node == null || node.isNull() ? null : node.asText()))
                      : null;
              String variants = JsonUtil.getStringOrNull("variants", caseNode);
              return Arguments.of(caseNumber, testName, variants, parquetFile, variantFiles);
            });
  }

  /** Reading the case's Parquet file must fail with a message containing the expected text. */
  @ParameterizedTest
  @MethodSource("errorCases")
  public void testError(int caseNumber, String testName, String parquetFile, String errorMessage) {
    if (parquetFile == null) {
      // nothing to read for this case (includes the placeholder case)
      return;
    }

    Assertions.assertThatThrownBy(() -> readParquet(parquetFile))
        .as("Test case %s: %s", caseNumber, testName)
        .hasMessageContaining(errorMessage);
  }

  /** The only record in the case's Parquet file must hold the variant stored in variantFile. */
  @ParameterizedTest
  @MethodSource("singleVariantCases")
  public void testSingleVariant(
      int caseNumber, String testName, String variant, String parquetFile, String variantFile)
      throws IOException {
    if (parquetFile == null) {
      // nothing to read for this case (includes the placeholder case)
      return;
    }

    Variant expected = readVariant(variantFile);

    Record record = readParquetRecord(parquetFile);
    assertVariant(caseNumber, testName, expected, record);
  }

  /**
   * Record {@code i} of the case's Parquet file must hold the variant stored in {@code
   * variantFiles.get(i)}, or a null {@code var} field when that entry is {@code null}.
   */
  @ParameterizedTest
  @MethodSource("multiVariantCases")
  public void testMultiVariant(
      int caseNumber,
      String testName,
      String variants,
      String parquetFile,
      List<String> variantFiles)
      throws IOException {
    if (parquetFile == null) {
      // nothing to read for this case (includes the placeholder case)
      return;
    }

    List<Record> records = readParquet(parquetFile);

    // Without this check a file with too few rows would pass with expectations left unverified,
    // and a file with too many rows would fail with a bare IndexOutOfBoundsException.
    Assertions.assertThat(records)
        .as("Test case %s: %s", caseNumber, testName)
        .hasSameSizeAs(variantFiles);

    for (int i = 0; i < records.size(); i += 1) {
      Record record = records.get(i);
      String variantFile = variantFiles.get(i);

      if (variantFile != null) {
        assertVariant(caseNumber, testName, readVariant(variantFile), record);
      } else {
        Assertions.assertThat(record.getField("var"))
            .as("Test case %s: %s (row %s)", caseNumber, testName, i)
            .isNull();
      }
    }
  }

  /** Asserts that the record's {@code var} field is a variant equal to the expected one. */
  private static void assertVariant(
      int caseNumber, String testName, Variant expected, Record record) {
    Assertions.assertThat(record.getField("var"))
        .as("Test case %s: %s", caseNumber, testName)
        .isInstanceOf(Variant.class);
    Variant actual = (Variant) record.getField("var");
    VariantTestUtil.assertEqual(expected.metadata(), actual.metadata());
    VariantTestUtil.assertEqual(expected.value(), actual.value());
  }

  /** Resolves a file name from the case JSON against the configured case location. */
  private static String caseFile(String fileName) {
    return CASE_LOCATION + "/" + fileName;
  }

  /** Reads a whole variant file and decodes it from a little-endian buffer. */
  private Variant readVariant(String variantFile) throws IOException {
    try (InputStream in = IO.newInputFile(caseFile(variantFile)).newStream()) {
      byte[] variantBytes = in.readAllBytes();
      return Variant.from(ByteBuffer.wrap(variantBytes).order(ByteOrder.LITTLE_ENDIAN));
    }
  }

  /** Reads a Parquet file that is expected to contain exactly one record. */
  private Record readParquetRecord(String parquetFile) throws IOException {
    return Iterables.getOnlyElement(readParquet(parquetFile));
  }

  /** Reads every record of a Parquet file, projected to the {@code id}/{@code var} schema. */
  private List<Record> readParquet(String parquetFile) throws IOException {
    try (CloseableIterable<Record> reader =
        Parquet.read(IO.newInputFile(caseFile(parquetFile)))
            .project(SCHEMA)
            .createReaderFunc(fileSchema -> InternalReader.create(SCHEMA, fileSchema))
            .build()) {
      return Lists.newArrayList(reader);
    }
  }
}