From d4fa26d439608ac84a6105d46bf94e16827bc92f Mon Sep 17 00:00:00 2001 From: Rusty Conover Date: Sat, 23 Sep 2023 15:48:28 -0400 Subject: [PATCH] Core: Avro writers use BlockingBinaryEncoder to enable array/map sizes When writing Avro files often Iceberg is writing arrays and maps. The current use of binaryEncoder() and directBinaryEncoder() of org.apache.avro.io.EncoderFactory do not write the length of the arrays or maps to Avro since the encoder does not buffer the output to calculate a length. Knowing the length of an array or map is useful to clients decoding the Avro file since they can skip decoding the entire array or map if it is not needed when reading the file. This PR changes all Avro writers to use blockingBinaryEncoder(), this encoder does not "block" in the concurrency sense but it does buffer the output of objects such that the lengths of arrays and maps will be calculated. See: https://avro.apache.org/docs/1.5.1/api/java/org/apache/avro/io/EncoderFactory.html#blockingBinaryEncoder(java.io.OutputStream,%20org.apache.avro.io.BinaryEncoder) For details between the different Avro encoders. --- .../main/java/org/apache/iceberg/avro/AvroEncoderUtil.java | 3 ++- .../java/org/apache/iceberg/data/avro/IcebergEncoder.java | 5 ++++- .../org/apache/iceberg/encryption/KeyMetadataEncoder.java | 5 ++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java index ba3c6fece7f9..866d218d55ed 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java @@ -54,7 +54,8 @@ public static byte[] encode(T datum, Schema avroSchema) throws IOException { dataOut.writeUTF(avroSchema.toString()); // Encode the datum with avro schema. - BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + BinaryEncoder encoder = + new EncoderFactory().configureBlockSize(1024 * 1024).blockingBinaryEncoder(out, null); DatumWriter writer = new GenericAvroWriter<>(avroSchema); writer.write(datum, encoder); encoder.flush(); diff --git a/core/src/main/java/org/apache/iceberg/data/avro/IcebergEncoder.java b/core/src/main/java/org/apache/iceberg/data/avro/IcebergEncoder.java index a70e79aac2c5..cb9c112c3810 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/IcebergEncoder.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/IcebergEncoder.java @@ -98,7 +98,10 @@ public ByteBuffer encode(D datum) throws IOException { @Override public void encode(D datum, OutputStream stream) throws IOException { - BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(stream, ENCODER.get()); + BinaryEncoder encoder = + new EncoderFactory() + .configureBlockSize(1024 * 1024) + .blockingBinaryEncoder(stream, ENCODER.get()); ENCODER.set(encoder); writer.write(datum, encoder); encoder.flush(); diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java index faab6a47c814..2866e01521e0 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java +++ b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java @@ -90,7 +90,10 @@ public ByteBuffer encode(KeyMetadata datum) throws IOException { @Override public void encode(KeyMetadata datum, OutputStream stream) throws IOException { - BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(stream, ENCODER.get()); + BinaryEncoder encoder = + new EncoderFactory() + .configureBlockSize(1024 * 1024) + .blockingBinaryEncoder(stream, ENCODER.get()); ENCODER.set(encoder); writer.write(datum, encoder); encoder.flush();