Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ subprojects {
force 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.2'
force 'com.fasterxml.jackson.module:jackson-module-scala_2.12:2.10.2'
force 'com.fasterxml.jackson.module:jackson-module-paranamer:2.10.2'
force 'com.fasterxml.jackson.core:jackson-databind:2.10.2'
}
}

Expand Down Expand Up @@ -532,10 +533,11 @@ project(':iceberg-mr') {
testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')

testCompile("org.apache.avro:avro:1.9.2")
testCompile("org.apache.avro:avro:1.10.2")
testCompile("org.apache.calcite:calcite-core")
testCompile("com.esotericsoftware:kryo-shaded:4.0.2")
testCompile("com.fasterxml.jackson.core:jackson-annotations:2.6.5")
testCompile("com.fasterxml.jackson.core:jackson-annotations:2.10.2")
testCompile("com.fasterxml.jackson.core:jackson-databind:2.10.2")
testCompile("org.apache.hive:hive-service") {
exclude group: 'org.apache.hive', module: 'hive-exec'
}
Expand Down Expand Up @@ -602,7 +604,7 @@ if (jdkVersion == '8') {
testCompile("org.apache.avro:avro:1.9.2")
testCompile("org.apache.calcite:calcite-core")
testCompile("com.esotericsoftware:kryo-shaded:4.0.2")
testCompile("com.fasterxml.jackson.core:jackson-annotations:2.6.5")
testCompile("com.fasterxml.jackson.core:jackson-annotations:2.10.2")
testCompile("org.apache.hive:hive-service:3.1.2") {
exclude group: 'org.apache.hive', module: 'hive-exec'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.avro.TestReadProjection;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
Expand Down Expand Up @@ -132,9 +133,9 @@ public void testV1ForwardCompatibility() throws IOException {
Assert.assertEquals("Added rows count", ADDED_ROWS, (long) generic.get("added_rows_count"));
Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) generic.get("existing_rows_count"));
Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) generic.get("deleted_rows_count"));
Assert.assertNull("Content", generic.get(ManifestFile.MANIFEST_CONTENT.name()));
Assert.assertNull("Sequence number", generic.get(ManifestFile.SEQUENCE_NUMBER.name()));
Assert.assertNull("Min sequence number", generic.get(ManifestFile.MIN_SEQUENCE_NUMBER.name()));
TestReadProjection.assertNotProjected("Content", generic, ManifestFile.MANIFEST_CONTENT.name());
TestReadProjection.assertNotProjected("Sequence number", generic, ManifestFile.SEQUENCE_NUMBER.name());
TestReadProjection.assertNotProjected("Min sequence number", generic, ManifestFile.MIN_SEQUENCE_NUMBER.name());
}

@Test
Expand All @@ -154,9 +155,9 @@ public void testV2ForwardCompatibility() throws IOException {
Assert.assertEquals("Added rows count", ADDED_ROWS, (long) generic.get("added_rows_count"));
Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) generic.get("existing_rows_count"));
Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) generic.get("deleted_rows_count"));
Assert.assertNull("Content", generic.get(ManifestFile.MANIFEST_CONTENT.name()));
Assert.assertNull("Sequence number", generic.get(ManifestFile.SEQUENCE_NUMBER.name()));
Assert.assertNull("Min sequence number", generic.get(ManifestFile.MIN_SEQUENCE_NUMBER.name()));
TestReadProjection.assertNotProjected("Content", generic, ManifestFile.MANIFEST_CONTENT.name());
TestReadProjection.assertNotProjected("Sequence number", generic, ManifestFile.SEQUENCE_NUMBER.name());
TestReadProjection.assertNotProjected("Min sequence number", generic, ManifestFile.MIN_SEQUENCE_NUMBER.name());
}

@Test
Expand Down
97 changes: 53 additions & 44 deletions core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ private LegacyHiveTableUtils() {
static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) {
Map<String, String> props = getTableProperties(table);
String schemaStr = props.get("avro.schema.literal");
org.apache.avro.Schema avroSchema = schemaStr != null ? new org.apache.avro.Schema.Parser().parse(schemaStr) : null;
// Disable default value validation for backward compatibility with Avro 1.7
org.apache.avro.Schema avroSchema =
schemaStr != null ? new org.apache.avro.Schema.Parser().setValidateDefaults(false).parse(schemaStr) : null;
Schema schema;
if (avroSchema != null) {
String serde = table.getSd().getSerdeInfo().getSerializationLib();
Expand Down
21 changes: 13 additions & 8 deletions parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.apache.iceberg.parquet;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
Expand All @@ -30,6 +32,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.specific.SpecificData;
import org.apache.commons.math3.util.Pair;
import org.apache.iceberg.avro.AvroSchemaVisitor;
import org.apache.iceberg.avro.UUIDConversion;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -48,8 +51,8 @@ static Schema parquetAvroSchema(Schema avroSchema) {
static class ParquetDecimal extends LogicalType {
private static final String NAME = "parquet-decimal";

private int precision;
private int scale;
private final int precision;
private final int scale;

ParquetDecimal(int precision, int scale) {
super(NAME);
Expand Down Expand Up @@ -154,12 +157,10 @@ public Long toLong(BigDecimal value, org.apache.avro.Schema schema, LogicalType
}

private static class FixedDecimalConversion extends Conversions.DecimalConversion {
private final LogicalType[] decimalsByScale = new LogicalType[39];
private final WeakHashMap<Pair<Integer, Integer>, LogicalType> decimalsByScale;

private FixedDecimalConversion() {
for (int i = 0; i < decimalsByScale.length; i += 1) {
decimalsByScale[i] = LogicalTypes.decimal(i, i);
}
this.decimalsByScale = new WeakHashMap<>();
}

@Override
Expand All @@ -169,12 +170,16 @@ public String getLogicalTypeName() {

@Override
public BigDecimal fromFixed(GenericFixed value, Schema schema, LogicalType type) {
return super.fromFixed(value, schema, decimalsByScale[((ParquetDecimal) type).scale()]);
ParquetDecimal dec = (ParquetDecimal) type;
return new BigDecimal(new BigInteger(value.bytes()), dec.scale());
}

@Override
public GenericFixed toFixed(BigDecimal value, Schema schema, LogicalType type) {
return super.toFixed(value, schema, decimalsByScale[((ParquetDecimal) type).scale()]);
ParquetDecimal dec = (ParquetDecimal) type;
Pair<Integer, Integer> key = new Pair<>(dec.precision(), dec.scale());
return super.toFixed(value, schema,
decimalsByScale.computeIfAbsent(key, k -> LogicalTypes.decimal(k.getFirst(), k.getSecond())));
}
}

Expand Down
Loading