@@ -262,7 +262,11 @@ public ParquetValueReader<?> primitive(
switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
return new ParquetValueReaders.ByteArrayReader(desc);
if (expected.typeId() == Types.StringType.get().typeId()) {
return new StringReader(desc);
} else {
return new ParquetValueReaders.ByteArrayReader(desc);
}
case INT32:
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) {
return new ParquetValueReaders.IntAsLongReader(desc);
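
For context, the new BINARY branch above dispatches to a StringReader when the projected Iceberg type is a string. A minimal sketch of such a reader follows, assuming it builds on Iceberg's ParquetValueReaders.PrimitiveReader and wraps the raw bytes with Flink's StringData.fromBytes; the actual StringReader inner class in FlinkParquetReaders may differ in detail.

import org.apache.flink.table.data.StringData;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;

// Hypothetical sketch, not the PR's actual implementation.
class StringReader extends ParquetValueReaders.PrimitiveReader<StringData> {
  StringReader(ColumnDescriptor desc) {
    super(desc);
  }

  @Override
  public StringData read(StringData ignored) {
    // Read the raw Parquet binary value and expose its bytes as Flink UTF-8 string data.
    Binary binary = column.nextBinary();
    return StringData.fromBytes(binary.getBytes());
  }
}
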
@@ -19,16 +19,19 @@
package org.apache.iceberg.flink.data;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
@@ -47,7 +50,6 @@
import org.apache.iceberg.types.Types;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.junit.Assert;
import org.junit.Test;

public class TestFlinkParquetReader extends DataTest {
@@ -62,15 +64,7 @@ public void testTwoLevelList() throws IOException {
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());

File testFile = temp.newFile();
Assert.assertTrue(testFile.delete());

ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
.withDataModel(GenericData.get())
.withSchema(avroSchema)
.config("parquet.avro.add-list-element-records", "true")
.config("parquet.avro.write-old-list-structure", "true")
.build();
assertThat(testFile.delete()).isTrue();

GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
List<ByteBuffer> expectedByteList = Lists.newArrayList();
@@ -81,26 +75,87 @@
recordBuilder.set("topbytes", expectedBinary);
GenericData.Record expectedRecord = recordBuilder.build();

writer.write(expectedRecord);
writer.close();
try (ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
.withDataModel(GenericData.get())
.withSchema(avroSchema)
.config("parquet.avro.add-list-element-records", "true")
.config("parquet.avro.write-old-list-structure", "true")
.build()) {
writer.write(expectedRecord);
}

try (CloseableIterable<RowData> reader =
Parquet.read(Files.localInput(testFile))
.project(schema)
.createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
.build()) {
Iterator<RowData> rows = reader.iterator();
Assert.assertTrue("Should have at least one row", rows.hasNext());
assertThat(rows).as("Should have at least one row").hasNext();
RowData rowData = rows.next();
assertThat(expectedByte).isEqualTo(rowData.getArray(0).getBinary(0));
assertThat(expectedByte).isEqualTo(rowData.getBinary(1));
assertThat(rows).as("Should not have more than one row").isExhausted();
}
}

@Test
public void testReadBinaryFieldAsString() throws IOException {
Schema schemaForWriteBinary = new Schema(optional(1, "strbytes", Types.BinaryType.get()));
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schemaForWriteBinary.asStruct());

File testFile = temp.newFile();
assertThat(testFile.delete()).isTrue();

String expectedString = "hello";

GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
ByteBuffer expectedBinary = ByteBuffer.wrap(expectedString.getBytes(StandardCharsets.UTF_8));
recordBuilder.set("strbytes", expectedBinary);
GenericData.Record expectedRecord = recordBuilder.build();

try (ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
.withDataModel(GenericData.get())
.withSchema(avroSchema)
.build()) {
writer.write(expectedRecord);
}

// read as string
Schema schemaForReadBinaryAsString =
new Schema(optional(1, "strbytes", Types.StringType.get()));
try (CloseableIterable<RowData> reader =
Parquet.read(Files.localInput(testFile))
.project(schemaForReadBinaryAsString)
.createReaderFunc(
type -> FlinkParquetReaders.buildReader(schemaForReadBinaryAsString, type))
.build()) {
Iterator<RowData> rows = reader.iterator();
assertThat(rows).as("Should have at least one row").hasNext();
RowData rowData = rows.next();
assertThat(rowData.getString(0)).isInstanceOf(BinaryStringData.class);
assertThat(rowData.getString(0).toString()).isEqualTo(expectedString);
assertThat(rows).as("Should not have more than one row").isExhausted();
}

// read as byte[]
try (CloseableIterable<RowData> reader =
Parquet.read(Files.localInput(testFile))
.project(schemaForWriteBinary)
.createReaderFunc(type -> FlinkParquetReaders.buildReader(schemaForWriteBinary, type))
.build()) {
Iterator<RowData> rows = reader.iterator();
assertThat(rows).as("Should have at least one row").hasNext();
RowData rowData = rows.next();
Assert.assertArrayEquals(rowData.getArray(0).getBinary(0), expectedByte);
Assert.assertArrayEquals(rowData.getBinary(1), expectedByte);
Assert.assertFalse("Should not have more than one row", rows.hasNext());
assertThat(rowData.getBinary(0)).isEqualTo(expectedString.getBytes(StandardCharsets.UTF_8));
assertThat(rows).as("Should not have more than one row").isExhausted();
}
}

private void writeAndValidate(Iterable<Record> iterable, Schema schema) throws IOException {
File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());
assertThat(testFile.delete()).as("Delete should succeed").isTrue();

try (FileAppender<Record> writer =
Parquet.write(Files.localOutput(testFile))
@@ -119,10 +174,10 @@ private void writeAndValidate(Iterable<Record> iterable, Schema schema) throws IOException {
Iterator<RowData> rows = reader.iterator();
LogicalType rowType = FlinkSchemaUtil.convert(schema);
for (int i = 0; i < NUM_RECORDS; i += 1) {
Assert.assertTrue("Should have expected number of rows", rows.hasNext());
assertThat(rows.hasNext()).as("Should have expected number of rows").isTrue();
TestHelpers.assertRowData(schema.asStruct(), rowType, expected.next(), rows.next());
}
Assert.assertFalse("Should not have extra rows", rows.hasNext());
assertThat(rows.hasNext()).as("Should not have extra rows").isFalse();
}
}

@@ -284,8 +284,13 @@ public ParquetValueReader<?> primitive(
switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
if (expected != null && expected.typeId() == TypeID.UUID) {
return new UUIDReader(desc);
if (expected != null) {
switch (expected.typeId()) {
case UUID:
return new UUIDReader(desc);
case STRING:
return new StringReader(desc);
}
}
return new ParquetValueReaders.ByteArrayReader(desc);
case INT32:
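
As in the Flink change, the STRING case above hands BINARY columns to a string-valued reader. A minimal sketch under the assumption that it extends ParquetValueReaders.PrimitiveReader and converts the bytes with Spark's UTF8String.fromBytes; the actual StringReader in SparkParquetReaders may differ in detail.

import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.unsafe.types.UTF8String;

// Hypothetical sketch, not the PR's actual implementation.
class StringReader extends ParquetValueReaders.PrimitiveReader<UTF8String> {
  StringReader(ColumnDescriptor desc) {
    super(desc);
  }

  @Override
  public UTF8String read(UTF8String ignored) {
    // Read the raw Parquet binary value and expose its bytes as Spark's UTF8String.
    Binary binary = column.nextBinary();
    return UTF8String.fromBytes(binary.getBytes());
  }
}
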
@@ -19,14 +19,19 @@
package org.apache.iceberg.spark.data;

import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
@@ -35,6 +40,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopTables;
@@ -172,6 +178,59 @@ public void testInt96TimestampProducedBySparkIsReadCorrectly() throws IOException {
}
}

@Test
public void testReadBinaryFieldAsString() throws IOException {
Schema schemaForWriteBinary = new Schema(optional(1, "strbytes", Types.BinaryType.get()));
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schemaForWriteBinary.asStruct());

File testFile = temp.newFile();
assertThat(testFile.delete()).isTrue();

String expectedString = "hello";

GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
ByteBuffer expectedBinary = ByteBuffer.wrap(expectedString.getBytes(StandardCharsets.UTF_8));
recordBuilder.set("strbytes", expectedBinary);
GenericData.Record expectedRecord = recordBuilder.build();

try (FileAppender<GenericData.Record> writer =
Parquet.write(Files.localOutput(testFile))
.schema(schemaForWriteBinary)
.named("test")
.build()) {
writer.add(expectedRecord);
}

// read as string
Schema schemaForReadBinaryAsString =
new Schema(optional(1, "strbytes", Types.StringType.get()));
try (CloseableIterable<InternalRow> reader =
Parquet.read(Files.localInput(testFile))
.project(schemaForReadBinaryAsString)
.createReaderFunc(
type -> SparkParquetReaders.buildReader(schemaForReadBinaryAsString, type))
.build()) {
Iterator<InternalRow> rows = reader.iterator();
assertThat(rows).as("Should have at least one row").hasNext();
InternalRow row = rows.next();
assertThat(row.getString(0)).isEqualTo(expectedString);
assertThat(rows).as("Should not have more than one row").isExhausted();
}

// read as byte[]
try (CloseableIterable<InternalRow> reader =
Parquet.read(Files.localInput(testFile))
.project(schemaForWriteBinary)
.createReaderFunc(type -> SparkParquetReaders.buildReader(schemaForWriteBinary, type))
.build()) {
Iterator<InternalRow> rows = reader.iterator();
assertThat(rows).as("Should have at least one row").hasNext();
InternalRow row = rows.next();
assertThat(row.getBinary(0)).isEqualTo(expectedString.getBytes(StandardCharsets.UTF_8));
assertThat(rows).as("Should not have more than one row").isExhausted();
}
}

/**
* Native Spark ParquetWriter.Builder implementation so that we can write timestamps using Spark's
* native ParquetWriteSupport.