diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java index eca14413a6..dce9c8ea37 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java @@ -25,8 +25,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; +import org.apache.log4j.Logger; /** * Avro implementation of {@link ReadSupport} for avro generic, specific, and @@ -36,6 +38,7 @@ * @param the Java type of records created by this ReadSupport */ public class AvroReadSupport extends ReadSupport { + static Logger log = Logger.getLogger(AvroReadSupport.class.getName()); public static String AVRO_REQUESTED_PROJECTION = "parquet.avro.projection"; private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema"; @@ -136,10 +139,22 @@ public RecordMaterializer prepareForRead( GenericData model = getDataModel(configuration); String compatEnabled = metadata.get(AvroReadSupport.AVRO_COMPATIBILITY); - if (compatEnabled != null && Boolean.valueOf(compatEnabled)) { - return newCompatMaterializer(parquetSchema, avroSchema, model); + + try { + if (compatEnabled != null && Boolean.valueOf(compatEnabled)) { + return newCompatMaterializer(parquetSchema, avroSchema, model); + } + return new AvroRecordMaterializer(parquetSchema, avroSchema, model); + } catch (InvalidRecordException | ClassCastException e) { + log.error("Warning, Avro schema doesn't match Parquet schema, falling back to conversion: ", e); + // If the Avro schema is bad, fall back to reconstructing it from the Parquet schema + avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema); + + 
if (compatEnabled != null && Boolean.valueOf(compatEnabled)) { + return newCompatMaterializer(parquetSchema, avroSchema, model); + } + return new AvroRecordMaterializer(parquetSchema, avroSchema, model); } - return new AvroRecordMaterializer(parquetSchema, avroSchema, model); } @SuppressWarnings("unchecked") diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayListCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayListCompatibility.java new file mode 100644 index 0000000000..f565c25cda --- /dev/null +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayListCompatibility.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.parquet.avro;
+
+import com.google.common.io.Resources;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.junit.Test;
+import java.io.IOException;
+
+public class TestArrayListCompatibility {
+  /** Reads the first record of list-array-compat.parquet and renders its Avro schema. */
+  @Test
+  public void testListArrayCompatibility() throws IOException {
+    Path testPath = new Path(Resources.getResource("list-array-compat.parquet").getFile());
+    Configuration conf = new Configuration();
+    // try-with-resources so the reader is closed even when read() or the checks below throw.
+    try (ParquetReader parquetReader =
+        AvroParquetReader.builder(testPath).withConf(conf).build()) {
+      GenericData.Record firstRecord = (GenericData.Record) parquetReader.read();
+      if (firstRecord == null) {
+        throw new IOException("Can't process empty Parquet file");
+      }
+      // No value is asserted: the test passes when reading and rendering do not throw.
+      firstRecord.getSchema().toString(true);
+    }
+  }
+}
diff --git a/parquet-avro/src/test/resources/list-array-compat.parquet b/parquet-avro/src/test/resources/list-array-compat.parquet
new file mode 100644
index 0000000000..e68ff0c33f
Binary files /dev/null and b/parquet-avro/src/test/resources/list-array-compat.parquet differ