diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index ab7b1174c9f3..55fdf58c7078 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -262,7 +262,11 @@ public ParquetValueReader<?> primitive(
       switch (primitive.getPrimitiveTypeName()) {
         case FIXED_LEN_BYTE_ARRAY:
         case BINARY:
-          return new ParquetValueReaders.ByteArrayReader(desc);
+          if (expected.typeId() == Types.StringType.get().typeId()) {
+            return new StringReader(desc);
+          } else {
+            return new ParquetValueReaders.ByteArrayReader(desc);
+          }
         case INT32:
           if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) {
             return new ParquetValueReaders.IntAsLongReader(desc);
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 30a2a7bb51ce..a87a7b053c79 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -19,16 +19,19 @@
 package org.apache.iceberg.flink.data;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Files;
@@ -47,7 +50,6 @@
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.junit.Assert;
 import org.junit.Test;
 
 public class TestFlinkParquetReader extends DataTest {
@@ -62,15 +64,7 @@ public void testTwoLevelList() throws IOException {
     org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
 
     File testFile = temp.newFile();
-    Assert.assertTrue(testFile.delete());
-
-    ParquetWriter<GenericRecord> writer =
-        AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
-            .withDataModel(GenericData.get())
-            .withSchema(avroSchema)
-            .config("parquet.avro.add-list-element-records", "true")
-            .config("parquet.avro.write-old-list-structure", "true")
-            .build();
+    assertThat(testFile.delete()).isTrue();
 
     GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
     List<ByteBuffer> expectedByteList = Lists.newArrayList();
@@ -81,8 +75,15 @@ public void testTwoLevelList() throws IOException {
     recordBuilder.set("topbytes", expectedBinary);
     GenericData.Record expectedRecord = recordBuilder.build();
 
-    writer.write(expectedRecord);
-    writer.close();
+    try (ParquetWriter<GenericRecord> writer =
+        AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
+            .withDataModel(GenericData.get())
+            .withSchema(avroSchema)
+            .config("parquet.avro.add-list-element-records", "true")
+            .config("parquet.avro.write-old-list-structure", "true")
+            .build()) {
+      writer.write(expectedRecord);
+    }
 
     try (CloseableIterable<RowData> reader =
         Parquet.read(Files.localInput(testFile))
@@ -90,17 +91,71 @@
             .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
             .build()) {
       Iterator<RowData> rows = reader.iterator();
-      Assert.assertTrue("Should have at least one row", rows.hasNext());
+      assertThat(rows).as("Should have at least one row").hasNext();
+      RowData rowData = rows.next();
+      assertThat(expectedByte).isEqualTo(rowData.getArray(0).getBinary(0));
+      assertThat(expectedByte).isEqualTo(rowData.getBinary(1));
+      assertThat(rows).as("Should not have more than one row").isExhausted();
+    }
+  }
+
+  @Test
+  public void testReadBinaryFieldAsString() throws IOException {
+    Schema schemaForWriteBinary = new Schema(optional(1, "strbytes", Types.BinaryType.get()));
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schemaForWriteBinary.asStruct());
+
+    File testFile = temp.newFile();
+    assertThat(testFile.delete()).isTrue();
+
+    String expectedString = "hello";
+
+    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
+    ByteBuffer expectedBinary = ByteBuffer.wrap(expectedString.getBytes(StandardCharsets.UTF_8));
+    recordBuilder.set("strbytes", expectedBinary);
+    GenericData.Record expectedRecord = recordBuilder.build();
+
+    try (ParquetWriter<GenericRecord> writer =
+        AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
+            .withDataModel(GenericData.get())
+            .withSchema(avroSchema)
+            .build()) {
+      writer.write(expectedRecord);
+    }
+
+    // read as string
+    Schema schemaForReadBinaryAsString =
+        new Schema(optional(1, "strbytes", Types.StringType.get()));
+    try (CloseableIterable<RowData> reader =
+        Parquet.read(Files.localInput(testFile))
+            .project(schemaForReadBinaryAsString)
+            .createReaderFunc(
+                type -> FlinkParquetReaders.buildReader(schemaForReadBinaryAsString, type))
+            .build()) {
+      Iterator<RowData> rows = reader.iterator();
+      assertThat(rows).as("Should have at least one row").hasNext();
+      RowData rowData = rows.next();
+      assertThat(rowData.getString(0)).isInstanceOf(BinaryStringData.class);
+      assertThat(rowData.getString(0).toString()).isEqualTo(expectedString);
+      assertThat(rows).as("Should not have more than one row").isExhausted();
+    }
+
+    // read as byte[]
+    try (CloseableIterable<RowData> reader =
+        Parquet.read(Files.localInput(testFile))
+            .project(schemaForWriteBinary)
+            .createReaderFunc(type -> FlinkParquetReaders.buildReader(schemaForWriteBinary, type))
+            .build()) {
+      Iterator<RowData> rows = reader.iterator();
+      assertThat(rows).as("Should have at least one row").hasNext();
       RowData rowData = rows.next();
-      Assert.assertArrayEquals(rowData.getArray(0).getBinary(0), expectedByte);
-      Assert.assertArrayEquals(rowData.getBinary(1), expectedByte);
-      Assert.assertFalse("Should not have more than one row", rows.hasNext());
+      assertThat(rowData.getBinary(0)).isEqualTo(expectedString.getBytes(StandardCharsets.UTF_8));
+      assertThat(rows).as("Should not have more than one row").isExhausted();
     }
   }
 
   private void writeAndValidate(Iterable<RowData> iterable, Schema schema) throws IOException {
     File testFile = temp.newFile();
-    Assert.assertTrue("Delete should succeed", testFile.delete());
+    assertThat(testFile.delete()).as("Delete should succeed").isTrue();
 
     try (FileAppender<RowData> writer =
         Parquet.write(Files.localOutput(testFile))
@@ -119,10 +174,10 @@ private void writeAndValidate(Iterable<RowData> iterable, Schema schema) throws I
       Iterator<RowData> rows = reader.iterator();
       LogicalType rowType = FlinkSchemaUtil.convert(schema);
       for (int i = 0; i < NUM_RECORDS; i += 1) {
-        Assert.assertTrue("Should have expected number of rows", rows.hasNext());
of rows", rows.hasNext()); + assertThat(rows.hasNext()).as("Should have expected number of rows").isTrue(); TestHelpers.assertRowData(schema.asStruct(), rowType, expected.next(), rows.next()); } - Assert.assertFalse("Should not have extra rows", rows.hasNext()); + assertThat(rows.hasNext()).as("Should not have extra rows").isFalse(); } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index af16d9bbc290..e7ddfe37c0f5 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -284,8 +284,13 @@ public ParquetValueReader primitive( switch (primitive.getPrimitiveTypeName()) { case FIXED_LEN_BYTE_ARRAY: case BINARY: - if (expected != null && expected.typeId() == TypeID.UUID) { - return new UUIDReader(desc); + if (expected != null) { + switch (expected.typeId()) { + case UUID: + return new UUIDReader(desc); + case STRING: + return new StringReader(desc); + } } return new ParquetValueReaders.ByteArrayReader(desc); case INT32: diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java index 024ce3a60c2b..f9472ec38163 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReader.java @@ -19,14 +19,19 @@ package org.apache.iceberg.spark.data; import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe; +import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; @@ -35,6 +40,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; import org.apache.iceberg.hadoop.HadoopTables; @@ -172,6 +178,59 @@ public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOExceptio } } + @Test + public void testReadBinaryFieldAsString() throws IOException { + Schema schemaForWriteBinary = new Schema(optional(1, "strbytes", Types.BinaryType.get())); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schemaForWriteBinary.asStruct()); + + File testFile = temp.newFile(); + assertThat(testFile.delete()).isTrue(); + + String expectedString = "hello"; + + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema); + ByteBuffer expectedBinary = ByteBuffer.wrap(expectedString.getBytes(StandardCharsets.UTF_8)); + recordBuilder.set("strbytes", expectedBinary); + GenericData.Record expectedRecord = recordBuilder.build(); + + try (FileAppender writer = + Parquet.write(Files.localOutput(testFile)) + .schema(schemaForWriteBinary) + 
.named("test") + .build()) { + writer.add(expectedRecord); + } + + // read as string + Schema schemaForReadBinaryAsString = + new Schema(optional(1, "strbytes", Types.StringType.get())); + try (CloseableIterable reader = + Parquet.read(Files.localInput(testFile)) + .project(schemaForReadBinaryAsString) + .createReaderFunc( + type -> SparkParquetReaders.buildReader(schemaForReadBinaryAsString, type)) + .build()) { + Iterator rows = reader.iterator(); + assertThat(rows).as("Should have at least one row").hasNext(); + InternalRow row = rows.next(); + assertThat(row.getString(0)).isEqualTo(expectedString); + assertThat(rows).as("Should not have more than one row").isExhausted(); + } + + // read as byte[] + try (CloseableIterable reader = + Parquet.read(Files.localInput(testFile)) + .project(schemaForReadBinaryAsString) + .createReaderFunc(type -> SparkParquetReaders.buildReader(schemaForWriteBinary, type)) + .build()) { + Iterator rows = reader.iterator(); + assertThat(rows).as("Should have at least one row").hasNext(); + InternalRow row = rows.next(); + assertThat(row.getBinary(0)).isEqualTo(expectedString.getBytes(StandardCharsets.UTF_8)); + assertThat(rows).as("Should not have more than one row").isExhausted(); + } + } + /** * Native Spark ParquetWriter.Builder implementation so that we can write timestamps using Spark's * native ParquetWriteSupport.