Merged
Changes from 3 commits
@@ -23,6 +23,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.avro.{JsonProperties, Schema}
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.hudi.internal.schema.HoodieSchemaException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -58,9 +59,19 @@ object AvroConversionUtils {
*/
def createInternalRowToAvroConverter(rootCatalystType: StructType, rootAvroType: Schema, nullable: Boolean): InternalRow => GenericRecord = {
val serializer = sparkAdapter.createAvroSerializer(rootCatalystType, rootAvroType, nullable)
row => serializer
.serialize(row)
.asInstanceOf[GenericRecord]
row => {
try {
serializer
.serialize(row)
.asInstanceOf[GenericRecord]
} catch {
case e: HoodieSchemaException => throw e
case e => throw new SchemaCompatibilityException("Failed to convert spark record into avro record", e)
}
}
}

/**
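For reviewers, a minimal sketch (not part of this PR) of how a caller now observes the wrapped failure from the converter above; the struct type, Avro schema, and row literal are illustrative assumptions:

import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object ConverterSketch {
  def main(args: Array[String]): Unit = {
    val structType = StructType(Seq(StructField("id", LongType, nullable = false)))
    val avroSchema = new Schema.Parser().parse(
      """{"type":"record","name":"Rec","fields":[{"name":"id","type":"long"}]}""")
    val convert = AvroConversionUtils.createInternalRowToAvroConverter(structType, avroSchema, nullable = false)
    try {
      println(convert(InternalRow(1L)))
    } catch {
      // Serialization failures now surface as a Hudi SchemaCompatibilityException
      // instead of leaking the underlying serializer exception.
      case e: SchemaCompatibilityException => println(s"Incompatible record: ${e.getMessage}")
    }
  }
}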
@@ -18,15 +18,14 @@

package org.apache.hudi

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.hadoop.fs.CachingPath

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -19,12 +19,12 @@
package org.apache.hudi.avro;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieAvroSchemaException;
import org.apache.hudi.exception.InvalidUnionTypeException;
import org.apache.hudi.exception.MissingSchemaFieldException;
import org.apache.hudi.exception.SchemaBackwardsCompatibilityException;
import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.hudi.exception.InvalidUnionTypeException;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

@@ -317,7 +317,7 @@ public static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullNam
.orElse(null);

if (nonNullType == null) {
throw new AvroRuntimeException(
throw new HoodieAvroSchemaException(
String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
}

@@ -349,14 +349,14 @@ public static Schema resolveNullableSchema(Schema schema) {
List<Schema> innerTypes = schema.getTypes();

if (innerTypes.size() != 2) {
throw new AvroRuntimeException(
throw new HoodieAvroSchemaException(
String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
}
Schema firstInnerType = innerTypes.get(0);
Schema secondInnerType = innerTypes.get(1);
if ((firstInnerType.getType() != Schema.Type.NULL && secondInnerType.getType() != Schema.Type.NULL)
|| (firstInnerType.getType() == Schema.Type.NULL && secondInnerType.getType() == Schema.Type.NULL)) {
throw new AvroRuntimeException(
throw new HoodieAvroSchemaException(
String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
}
return firstInnerType.getType() == Schema.Type.NULL ? secondInnerType : firstInnerType;
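As a quick sanity sketch of the behaviour change above (assumed usage, not part of the change set), a malformed union now fails with the Hudi-specific exception rather than Avro's AvroRuntimeException:

import java.util.Arrays
import org.apache.avro.Schema
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.exception.HoodieAvroSchemaException

object UnionResolutionSketch {
  def main(args: Array[String]): Unit = {
    // A three-branch union is not a supported "nullable" schema.
    val badUnion = Schema.createUnion(Arrays.asList(
      Schema.create(Schema.Type.NULL),
      Schema.create(Schema.Type.STRING),
      Schema.create(Schema.Type.INT)))
    try {
      AvroSchemaUtils.resolveNullableSchema(badUnion)
    } catch {
      // Previously an AvroRuntimeException; now typed so callers can map it
      // onto Hudi's schema-compatibility handling.
      case e: HoodieAvroSchemaException => println(e.getMessage)
    }
  }
}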
@@ -36,6 +36,7 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieAvroSchemaException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.SchemaCompatibilityException;
@@ -931,7 +932,9 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr
private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
switch (newSchema.getType()) {
case RECORD:
ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, "cannot rewrite record with different type");
if (!(oldRecord instanceof IndexedRecord)) {
throw new SchemaCompatibilityException("cannot rewrite record with different type");
}
IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
List<Schema.Field> fields = newSchema.getFields();
GenericData.Record newRecord = new GenericData.Record(newSchema);
@@ -963,15 +966,17 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem
}
return newRecord;
case ENUM:
ValidationUtils.checkArgument(
oldSchema.getType() == Schema.Type.STRING || oldSchema.getType() == Schema.Type.ENUM,
"Only ENUM or STRING type can be converted ENUM type");
if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType() != Schema.Type.ENUM) {
throw new SchemaCompatibilityException(String.format("Only ENUM or STRING type can be converted ENUM type. Schema type was %s", oldSchema.getType().getName()));
}
if (oldSchema.getType() == Schema.Type.STRING) {
return new GenericData.EnumSymbol(newSchema, oldRecord);
}
return oldRecord;
case ARRAY:
ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot rewrite record with different type");
if (!(oldRecord instanceof Collection)) {
throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as an array", oldRecord.getClass().getName()));
}
Collection array = (Collection) oldRecord;
List<Object> newArray = new ArrayList<>(array.size());
fieldNames.push("element");
@@ -981,7 +986,9 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem
fieldNames.pop();
return newArray;
case MAP:
ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot rewrite record with different type");
if (!(oldRecord instanceof Map)) {
throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as a map", oldRecord.getClass().getName()));
}
Map<Object, Object> map = (Map<Object, Object>) oldRecord;
Map<Object, Object> newMap = new HashMap<>(map.size(), 1.0f);
fieldNames.push("value");
@@ -1029,7 +1036,7 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche
BigDecimal bd = new BigDecimal(new BigInteger(bytes), decimal.getScale()).setScale(((Decimal) newSchema.getLogicalType()).getScale());
return DECIMAL_CONVERSION.toFixed(bd, newSchema, newSchema.getLogicalType());
} else {
throw new UnsupportedOperationException("Fixed type size change is not currently supported");
throw new HoodieAvroSchemaException("Fixed type size change is not currently supported");
}
}

@@ -1045,7 +1052,7 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche
}

default:
throw new AvroRuntimeException("Unknown schema type: " + newSchema.getType());
throw new HoodieAvroSchemaException("Unknown schema type: " + newSchema.getType());
}
} else {
return rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema);
@@ -1130,7 +1137,7 @@ private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, Sche
break;
default:
}
throw new AvroRuntimeException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
throw new HoodieAvroSchemaException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
}

/**
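To illustrate the rewrite path touched above, a small sketch assuming the existing public HoodieAvroUtils.rewriteRecordWithNewSchema(IndexedRecord, Schema) entry point; the schemas and record values are made up for the example:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.exception.SchemaCompatibilityException

object RewriteSketch {
  def main(args: Array[String]): Unit = {
    val writerSchema = new Schema.Parser().parse(
      """{"type":"record","name":"Rec","fields":[{"name":"id","type":"long"}]}""")
    val readerSchema = new Schema.Parser().parse(
      """{"type":"record","name":"Rec","fields":[
        |  {"name":"id","type":"long"},
        |  {"name":"name","type":["null","string"],"default":null}
        |]}""".stripMargin)

    val rec = new GenericData.Record(writerSchema)
    rec.put("id", 1L)

    try {
      // Adding a nullable field with a default still succeeds as before.
      println(HoodieAvroUtils.rewriteRecordWithNewSchema(rec, readerSchema))
    } catch {
      // Type mismatches during the rewrite now raise SchemaCompatibilityException
      // (or its HoodieAvroSchemaException subclass) instead of
      // IllegalArgumentException / AvroRuntimeException.
      case e: SchemaCompatibilityException => println(s"Schema evolution failed: ${e.getMessage}")
    }
  }
}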
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.exception;

/**
* Thrown when a record's schema violates Avro rules.
*/
public class HoodieAvroSchemaException extends SchemaCompatibilityException {
public HoodieAvroSchemaException(String message) {
super(message);
}
}
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.exception;

/**
* Exception thrown during Hoodie record construction.
*/
public class HoodieRecordCreationException extends HoodieException {

public HoodieRecordCreationException(String message, Throwable t) {
super(message, t);
}
}
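A tiny sketch of how the two new exception types are meant to compose (illustrative only; the messages are made up):

import org.apache.hudi.exception.{HoodieAvroSchemaException, HoodieRecordCreationException, SchemaCompatibilityException}

object ExceptionHierarchySketch {
  def main(args: Array[String]): Unit = {
    val cause = new HoodieAvroSchemaException("Unsupported Avro UNION type")
    val wrapped = new HoodieRecordCreationException("Failed to create Hoodie Spark Record", cause)

    // HoodieAvroSchemaException is-a SchemaCompatibilityException, so existing
    // handlers that match on the broader type keep working unchanged.
    assert(cause.isInstanceOf[SchemaCompatibilityException])

    // The wrapper keeps the root failure reachable for logging or error tables.
    assert(wrapped.getCause eq cause)
    println(wrapped.getMessage)
  }
}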
@@ -51,7 +51,7 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.{HoodieException, HoodieWriteConflictException}
import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -78,6 +78,7 @@ import java.util.function.BiConsumer
import scala.collection.JavaConversions._
import scala.collection.JavaConverters.setAsJavaSetConverter
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

object HoodieSparkSqlWriter {

@@ -478,10 +479,16 @@ class HoodieSparkSqlWriterInternal {
}
instantTime = client.createNewInstantTime()
// Convert to RDD[HoodieRecord]
val hoodieRecords =
HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df,
writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema,
processedDataSchema, operation, instantTime, preppedSparkSqlWrites, preppedSparkSqlMergeInto, preppedWriteOperation))
val hoodieRecords = Try(HoodieCreateRecordUtils.createHoodieRecordRdd(
HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, writeConfig, parameters, avroRecordName,
avroRecordNamespace, writerSchema, processedDataSchema, operation, instantTime, preppedSparkSqlWrites,
preppedSparkSqlMergeInto, preppedWriteOperation))) match {
case Success(recs) => recs
case Failure(e) => throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e)
}

val dedupedHoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) && operation != WriteOperationType.INSERT_OVERWRITE_TABLE && operation != WriteOperationType.INSERT_OVERWRITE) {
@@ -36,6 +36,9 @@
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.exception.HoodieRecordCreationException;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -105,7 +108,11 @@ public static Option<JavaRDD<HoodieRecord>> createHoodieRecords(HoodieStreamer.C
avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload)));
} catch (Exception e) {
if (!shouldErrorTable) {
throw e;
if (!(e instanceof HoodieKeyException || e instanceof HoodieKeyGeneratorException)) {
throw new HoodieRecordCreationException("Failed to create Hoodie Avro Record", e);
} else {
throw e;
}
}
avroRecords.add(generateErrorRecord(genRec));
}
@@ -136,7 +143,11 @@ public static Option<JavaRDD<HoodieRecord>> createHoodieRecords(HoodieStreamer.C
HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, targetStructType).apply(row), targetStructType, false));
} catch (Exception e) {
if (!shouldErrorTable) {
throw e;
if (!(e instanceof HoodieKeyException || e instanceof HoodieKeyGeneratorException)) {
throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e);
} else {
throw e;
}
}
return generateErrorRecord(rec);
}