From 59166308b666ed454d663df4b82c983b43d5d515 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 28 Feb 2024 15:40:37 -0500 Subject: [PATCH 01/14] make exceptions more specific --- .../org/apache/hudi/AvroConversionUtils.scala | 17 +++++-- .../org/apache/hudi/HoodieSparkUtils.scala | 4 +- .../org/apache/hudi/avro/AvroSchemaUtils.java | 8 ++-- .../org/apache/hudi/avro/HoodieAvroUtils.java | 47 ++++++++++++------- .../HoodieRecordCreationException.java | 30 ++++++++++++ .../apache/hudi/HoodieSparkSqlWriter.scala | 17 +++++-- .../streamer/HoodieStreamerUtils.java | 15 +++++- 7 files changed, 106 insertions(+), 32 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 55877938f8cb5..954b5a78c7f93 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -23,6 +23,7 @@ import org.apache.avro.generic.GenericRecord import org.apache.avro.{JsonProperties, Schema} import org.apache.hudi.HoodieSparkUtils.sparkAdapter import org.apache.hudi.avro.AvroSchemaUtils +import org.apache.hudi.exception.SchemaCompatibilityException import org.apache.hudi.internal.schema.HoodieSchemaException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -58,9 +59,19 @@ object AvroConversionUtils { */ def createInternalRowToAvroConverter(rootCatalystType: StructType, rootAvroType: Schema, nullable: Boolean): InternalRow => GenericRecord = { val serializer = sparkAdapter.createAvroSerializer(rootCatalystType, rootAvroType, nullable) - row => serializer - .serialize(row) - .asInstanceOf[GenericRecord] + row => { + try { + serializer + .serialize(row) + .asInstanceOf[GenericRecord] + } catch { + case e: HoodieSchemaException => throw e + case e => throw new SchemaCompatibilityException("Failed to convert spark record into avro record", e) + } + + } + + } /** diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 975135c13d586..436dea2a71e54 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -23,10 +23,12 @@ import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.hadoop.fs.CachingPath - import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.Path +import org.apache.hudi.exception.SchemaCompatibilityException +import org.apache.hudi.internal.schema +import org.apache.hudi.internal.schema.HoodieSchemaException import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index b0489d75ae012..4cfa9fa7f1dd9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java 
@@ -20,8 +20,8 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.SchemaCompatibilityException; +import org.apache.hudi.internal.schema.HoodieSchemaException; -import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import org.apache.avro.SchemaCompatibility; @@ -337,7 +337,7 @@ public static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullNam .orElse(null); if (nonNullType == null) { - throw new AvroRuntimeException( + throw new HoodieSchemaException( String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema)); } @@ -369,14 +369,14 @@ public static Schema resolveNullableSchema(Schema schema) { List innerTypes = schema.getTypes(); if (innerTypes.size() != 2) { - throw new AvroRuntimeException( + throw new HoodieSchemaException( String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema)); } Schema firstInnerType = innerTypes.get(0); Schema secondInnerType = innerTypes.get(1); if ((firstInnerType.getType() != Schema.Type.NULL && secondInnerType.getType() != Schema.Type.NULL) || (firstInnerType.getType() == Schema.Type.NULL && secondInnerType.getType() == Schema.Type.NULL)) { - throw new AvroRuntimeException( + throw new HoodieSchemaException( String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema)); } return firstInnerType.getType() == Schema.Type.NULL ? secondInnerType : firstInnerType; diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index b352099cb1e11..aef6af6edb91e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -39,6 +39,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.SchemaCompatibilityException; +import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.util.Lazy; import org.apache.avro.AvroRuntimeException; @@ -917,21 +918,29 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr if (oldRecord == null) { return null; } - // try to get real schema for union type - Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord); - Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldSchema, newSchema, renameCols, fieldNames); - // validation is recursive so it only needs to be called on the original input - if (validate && !ConvertingGenericData.INSTANCE.validate(newSchema, newRecord)) { - throw new SchemaCompatibilityException( - "Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema); + try { + // try to get real schema for union type + Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord); + Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldSchema, newSchema, renameCols, fieldNames); + // validation is recursive so it only needs to be called on the original input + if (validate && !ConvertingGenericData.INSTANCE.validate(newSchema, newRecord)) { + throw new SchemaCompatibilityException( + "Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema); + } + return newRecord; + } catch (HoodieSchemaException e) { + throw e; + } catch (Exception e) { + throw new 
SchemaCompatibilityException("Failed to rewrite avro record to new schema", e); } - return newRecord; } private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schema oldSchema, Schema newSchema, Map renameCols, Deque fieldNames) { switch (newSchema.getType()) { case RECORD: - ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, "cannot rewrite record with different type"); + if (!(oldRecord instanceof IndexedRecord)) { + throw new SchemaCompatibilityException("cannot rewrite record with different type"); + } IndexedRecord indexedRecord = (IndexedRecord) oldRecord; List fields = newSchema.getFields(); GenericData.Record newRecord = new GenericData.Record(newSchema); @@ -963,15 +972,17 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem } return newRecord; case ENUM: - ValidationUtils.checkArgument( - oldSchema.getType() == Schema.Type.STRING || oldSchema.getType() == Schema.Type.ENUM, - "Only ENUM or STRING type can be converted ENUM type"); + if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType() != Schema.Type.ENUM) { + throw new SchemaCompatibilityException("Only ENUM or STRING type can be converted ENUM type"); + } if (oldSchema.getType() == Schema.Type.STRING) { return new GenericData.EnumSymbol(newSchema, oldRecord); } return oldRecord; case ARRAY: - ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot rewrite record with different type"); + if (!(oldRecord instanceof Collection)) { + throw new SchemaCompatibilityException("cannot rewrite record with different type"); + } Collection array = (Collection) oldRecord; List newArray = new ArrayList<>(array.size()); fieldNames.push("element"); @@ -981,7 +992,9 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem fieldNames.pop(); return newArray; case MAP: - ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot rewrite record with different type"); + if (!(oldRecord instanceof Map)) { + throw new SchemaCompatibilityException("cannot rewrite record with different type"); + } Map map = (Map) oldRecord; Map newMap = new HashMap<>(map.size(), 1.0f); fieldNames.push("value"); @@ -1029,7 +1042,7 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche BigDecimal bd = new BigDecimal(new BigInteger(bytes), decimal.getScale()).setScale(((Decimal) newSchema.getLogicalType()).getScale()); return DECIMAL_CONVERSION.toFixed(bd, newSchema, newSchema.getLogicalType()); } else { - throw new UnsupportedOperationException("Fixed type size change is not currently supported"); + throw new SchemaCompatibilityException("Fixed type size change is not currently supported"); } } @@ -1045,7 +1058,7 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche } default: - throw new AvroRuntimeException("Unknown schema type: " + newSchema.getType()); + throw new HoodieSchemaException("Unknown schema type: " + newSchema.getType()); } } else { return rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema); @@ -1130,7 +1143,7 @@ private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, Sche break; default: } - throw new AvroRuntimeException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema)); + throw new HoodieSchemaException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema)); } /** diff --git 
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java new file mode 100644 index 0000000000000..032bb6e31722d --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.exception; + +/** + * Exception thrown during hoodie record construction + */ +public class HoodieRecordCreationException extends HoodieException { + + public HoodieRecordCreationException(String message, Throwable t) { + super(message, t); + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 00ec59c5b8fd7..421903807ba14 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -51,7 +51,7 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME} import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig} -import org.apache.hudi.exception.{HoodieException, HoodieWriteConflictException} +import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException} import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool} import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter @@ -78,6 +78,7 @@ import java.util.function.BiConsumer import scala.collection.JavaConversions._ import scala.collection.JavaConverters.setAsJavaSetConverter import scala.collection.mutable +import scala.util.{Failure, Success, Try} object HoodieSparkSqlWriter { @@ -493,10 +494,16 @@ class HoodieSparkSqlWriterInternal { } instantTime = client.createNewInstantTime() // Convert to RDD[HoodieRecord] - val hoodieRecords = - HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, - writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema, - processedDataSchema, operation, instantTime, preppedSparkSqlWrites, preppedSparkSqlMergeInto, preppedWriteOperation)) + try { + + } + val hoodieRecords = Try(HoodieCreateRecordUtils.createHoodieRecordRdd( + HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, writeConfig, parameters, 
avroRecordName, + avroRecordNamespace, writerSchema, processedDataSchema, operation, instantTime, preppedSparkSqlWrites, + preppedSparkSqlMergeInto, preppedWriteOperation))) match { + case Success(recs) => recs + case Failure(e) => throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e) + } val dedupedHoodieRecords = if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) && operation != WriteOperationType.INSERT_OVERWRITE_TABLE && operation != WriteOperationType.INSERT_OVERWRITE) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java index 90315bc97643c..5ffc9565d0d7d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java @@ -36,6 +36,9 @@ import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieKeyException; +import org.apache.hudi.exception.HoodieKeyGeneratorException; +import org.apache.hudi.exception.HoodieRecordCreationException; import org.apache.hudi.keygen.BuiltinKeyGenerator; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; @@ -105,7 +108,11 @@ public static Option> createHoodieRecords(HoodieStreamer.C avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload))); } catch (Exception e) { if (!shouldErrorTable) { - throw e; + if (!(e instanceof HoodieKeyException || e instanceof HoodieKeyGeneratorException)) { + throw new HoodieRecordCreationException("Failed to create Hoodie Avro Record", e); + } else { + throw e; + } } avroRecords.add(generateErrorRecord(genRec)); } @@ -136,7 +143,11 @@ public static Option> createHoodieRecords(HoodieStreamer.C HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, targetStructType).apply(row), targetStructType, false)); } catch (Exception e) { if (!shouldErrorTable) { - throw e; + if (!(e instanceof HoodieKeyException || e instanceof HoodieKeyGeneratorException)) { + throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e); + } else { + throw e; + } } return generateErrorRecord(rec); } From a5ccaf5573250350c960d33dd750d4d9c8a6e690 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Thu, 7 Mar 2024 11:01:48 -0800 Subject: [PATCH 02/14] use hudi avro exception --- .../org/apache/hudi/HoodieSparkUtils.scala | 9 ++--- .../org/apache/hudi/avro/AvroSchemaUtils.java | 10 +++--- .../org/apache/hudi/avro/HoodieAvroUtils.java | 36 ++++++++----------- .../exception/HoodieAvroSchemaException.java | 29 +++++++++++++++ 4 files changed, 52 insertions(+), 32 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 97553d446479c..9919af4c2c625 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -18,17 +18,14 @@ package org.apache.hudi +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import 
org.apache.hadoop.fs.Path import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.hadoop.fs.CachingPath -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord -import org.apache.hadoop.fs.Path -import org.apache.hudi.exception.SchemaCompatibilityException -import org.apache.hudi.internal.schema -import org.apache.hudi.internal.schema.HoodieSchemaException import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index af4d5ba11d37f..ba747a63cbc00 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -19,11 +19,11 @@ package org.apache.hudi.avro; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieAvroSchemaException; +import org.apache.hudi.exception.InvalidUnionTypeException; import org.apache.hudi.exception.MissingSchemaFieldException; import org.apache.hudi.exception.SchemaBackwardsCompatibilityException; import org.apache.hudi.exception.SchemaCompatibilityException; -import org.apache.hudi.exception.InvalidUnionTypeException; -import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.avro.Schema; import org.apache.avro.SchemaCompatibility; @@ -317,7 +317,7 @@ public static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullNam .orElse(null); if (nonNullType == null) { - throw new HoodieSchemaException( + throw new HoodieAvroSchemaException( String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema)); } @@ -349,14 +349,14 @@ public static Schema resolveNullableSchema(Schema schema) { List innerTypes = schema.getTypes(); if (innerTypes.size() != 2) { - throw new HoodieSchemaException( + throw new HoodieAvroSchemaException( String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema)); } Schema firstInnerType = innerTypes.get(0); Schema secondInnerType = innerTypes.get(1); if ((firstInnerType.getType() != Schema.Type.NULL && secondInnerType.getType() != Schema.Type.NULL) || (firstInnerType.getType() == Schema.Type.NULL && secondInnerType.getType() == Schema.Type.NULL)) { - throw new HoodieSchemaException( + throw new HoodieAvroSchemaException( String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema)); } return firstInnerType.getType() == Schema.Type.NULL ? 
secondInnerType : firstInnerType; diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index aef6af6edb91e..a7b3f5ae197b1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -36,10 +36,10 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieAvroSchemaException; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.SchemaCompatibilityException; -import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.util.Lazy; import org.apache.avro.AvroRuntimeException; @@ -918,21 +918,15 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr if (oldRecord == null) { return null; } - try { - // try to get real schema for union type - Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord); - Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldSchema, newSchema, renameCols, fieldNames); - // validation is recursive so it only needs to be called on the original input - if (validate && !ConvertingGenericData.INSTANCE.validate(newSchema, newRecord)) { - throw new SchemaCompatibilityException( - "Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema); - } - return newRecord; - } catch (HoodieSchemaException e) { - throw e; - } catch (Exception e) { - throw new SchemaCompatibilityException("Failed to rewrite avro record to new schema", e); + // try to get real schema for union type + Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord); + Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, oldSchema, newSchema, renameCols, fieldNames); + // validation is recursive so it only needs to be called on the original input + if (validate && !ConvertingGenericData.INSTANCE.validate(newSchema, newRecord)) { + throw new SchemaCompatibilityException( + "Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema); } + return newRecord; } private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schema oldSchema, Schema newSchema, Map renameCols, Deque fieldNames) { @@ -973,7 +967,7 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem return newRecord; case ENUM: if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType() != Schema.Type.ENUM) { - throw new SchemaCompatibilityException("Only ENUM or STRING type can be converted ENUM type"); + throw new SchemaCompatibilityException(String.format("Only ENUM or STRING type can be converted to ENUM type.
Schema type was %s", oldSchema.getType().getName())); } if (oldSchema.getType() == Schema.Type.STRING) { return new GenericData.EnumSymbol(newSchema, oldRecord); @@ -981,7 +975,7 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem return oldRecord; case ARRAY: if (!(oldRecord instanceof Collection)) { - throw new SchemaCompatibilityException("cannot rewrite record with different type"); + throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as an array", oldRecord.getClass().getName())); } Collection array = (Collection) oldRecord; List newArray = new ArrayList<>(array.size()); @@ -993,7 +987,7 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem return newArray; case MAP: if (!(oldRecord instanceof Map)) { - throw new SchemaCompatibilityException("cannot rewrite record with different type"); + throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as a map", oldRecord.getClass().getName())); } Map map = (Map) oldRecord; Map newMap = new HashMap<>(map.size(), 1.0f); @@ -1042,7 +1036,7 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche BigDecimal bd = new BigDecimal(new BigInteger(bytes), decimal.getScale()).setScale(((Decimal) newSchema.getLogicalType()).getScale()); return DECIMAL_CONVERSION.toFixed(bd, newSchema, newSchema.getLogicalType()); } else { - throw new SchemaCompatibilityException("Fixed type size change is not currently supported"); + throw new HoodieAvroSchemaException("Fixed type size change is not currently supported"); } } @@ -1058,7 +1052,7 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche } default: - throw new HoodieSchemaException("Unknown schema type: " + newSchema.getType()); + throw new HoodieAvroSchemaException("Unknown schema type: " + newSchema.getType()); } } else { return rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema); @@ -1143,7 +1137,7 @@ private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, Sche break; default: } - throw new HoodieSchemaException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema)); + throw new HoodieAvroSchemaException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema)); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java new file mode 100644 index 0000000000000..ebdd1d416caa3 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.exception; + +/** + * Thrown when record schema will violate avro rules + */ +public class HoodieAvroSchemaException extends SchemaCompatibilityException { + public HoodieAvroSchemaException(String message) { + super(message); + } +} From ce3b20807d54e358b4997dd64214ae00f8016264 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 13 Mar 2024 14:47:30 -0400 Subject: [PATCH 03/14] Address review comments --- .../apache/hudi/HoodieSparkSqlWriter.scala | 3 -- .../streamer/HoodieStreamerUtils.java | 29 ++++++++----------- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 689547ca30782..98c7c5d29f565 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -479,9 +479,6 @@ class HoodieSparkSqlWriterInternal { } instantTime = client.createNewInstantTime() // Convert to RDD[HoodieRecord] - try { - - } val hoodieRecords = Try(HoodieCreateRecordUtils.createHoodieRecordRdd( HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema, processedDataSchema, operation, instantTime, preppedSparkSqlWrites, diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java index 5ffc9565d0d7d..61d7793e6ad03 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java @@ -107,14 +107,7 @@ public static Option> createHoodieRecords(HoodieStreamer.C : DataSourceUtils.createPayload(cfg.payloadClassName, gr); avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload))); } catch (Exception e) { - if (!shouldErrorTable) { - if (!(e instanceof HoodieKeyException || e instanceof HoodieKeyGeneratorException)) { - throw new HoodieRecordCreationException("Failed to create Hoodie Avro Record", e); - } else { - throw e; - } - } - avroRecords.add(generateErrorRecord(genRec)); + avroRecords.add(generateErrorRecordOrThrowException(genRec, e, shouldErrorTable)); } } return avroRecords.iterator(); @@ -142,14 +135,7 @@ public static Option> createHoodieRecords(HoodieStreamer.C return Either.left(new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath), HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, targetStructType).apply(row), targetStructType, false)); } catch (Exception e) { - if (!shouldErrorTable) { - if (!(e instanceof HoodieKeyException || e instanceof HoodieKeyGeneratorException)) { - throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e); - } else { - throw e; - } - } - return generateErrorRecord(rec); + return generateErrorRecordOrThrowException(rec, e, shouldErrorTable); } }); @@ -170,7 +156,16 @@ public static Option> createHoodieRecords(HoodieStreamer.C * @return the representation of error record (empty {@link HoodieRecord} and the error record * String) for writing to error table. 
*/ - private static Either generateErrorRecord(GenericRecord genRec) { + private static Either generateErrorRecordOrThrowException(GenericRecord genRec, Exception e, boolean shouldErrorTable) { + if (!shouldErrorTable) { + if (e instanceof HoodieKeyException) { + throw (HoodieKeyException) e; + } else if (e instanceof HoodieKeyGeneratorException) { + throw (HoodieKeyGeneratorException) e; + } else { + throw new HoodieRecordCreationException("Failed to create Hoodie Record", e); + } + } try { return Either.right(HoodieAvroUtils.avroToJsonString(genRec, false)); } catch (Exception ex) { From 5808ef26ae02bfa22dab10f06f83bd949192e9ac Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Thu, 14 Mar 2024 11:02:12 -0400 Subject: [PATCH 04/14] fix unnecessary changes --- .../src/main/scala/org/apache/hudi/HoodieSparkUtils.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 9919af4c2c625..03d977f6fc9b3 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -18,14 +18,15 @@ package org.apache.hudi -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord -import org.apache.hadoop.fs.Path import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.hadoop.fs.CachingPath + +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.hadoop.fs.Path import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD From 6369eb583473d6c632a44ef9fdb0f0a834d4b0fd Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 27 Mar 2024 10:50:55 -0400 Subject: [PATCH 05/14] add exception wrapping --- .../org/apache/hudi/HoodieSparkUtils.scala | 21 +++++++--- .../hudi/util/ExceptionWrappingIterator.scala | 38 +++++++++++++++++++ .../config/HoodieStreamerConfig.java | 7 ++++ .../hudi/utilities/sources/RowSource.java | 8 +++- .../streamer/SourceFormatAdapter.java | 8 +++- 5 files changed, 75 insertions(+), 7 deletions(-) create mode 100644 hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 03d977f6fc9b3..cd335bd4c6b88 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -18,25 +18,26 @@ package org.apache.hudi +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.hadoop.fs.Path import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} import org.apache.hudi.client.utils.SparkRowSerDe +import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.hadoop.fs.CachingPath - -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord -import 
org.apache.hadoop.fs.Path +import org.apache.hudi.util.ExceptionWrappingIterator import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone import org.apache.spark.sql.execution.SQLConfInjectingRDD import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, HoodieUnsafeUtils} import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters._ @@ -131,6 +132,16 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] = new SQLConfInjectingRDD(rdd, conf) + def maybeWrapDataFrameWithException(df: DataFrame, exceptionClass: String, msg: String, shouldWrap: Boolean): DataFrame = { + if (shouldWrap) { + HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, injectSQLConf(df.queryExecution.toRdd.mapPartitions { + rows => new ExceptionWrappingIterator[InternalRow](rows, exceptionClass, msg) + }, SQLConf.get), df.schema) + } else { + df + } + } + def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean, latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): Tuple2[RDD[GenericRecord], RDD[String]] = { diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala new file mode 100644 index 0000000000000..04c3fb28c046f --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.util + +import org.apache.hudi.common.util.ReflectionUtils + +class ExceptionWrappingIterator[T](var in: Iterator[T], var exceptionClass: String, var msg: String) extends Iterator[T] { + override def hasNext: Boolean = try in.hasNext + catch { + case e: Throwable => throw createException(e) + } + + override def next: T = try in.next + catch { + case e: Throwable => throw createException(e) + } + + private def createException(e: Throwable): Throwable = { + ReflectionUtils.loadClass(exceptionClass, Array(classOf[String], classOf[Throwable]).asInstanceOf[Array[Class[_]]], msg, e).asInstanceOf[Throwable] + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java index b3b64cff905b6..de29b3f8598e1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java @@ -132,4 +132,11 @@ public class HoodieStreamerConfig extends HoodieConfig { .sinceVersion("0.14.0") .withDocumentation("Number of records to sample from the first write. To improve the estimation's accuracy, " + "for smaller or more compressable record size, set the sample size bigger. For bigger or less compressable record size, set smaller."); + + public static final ConfigProperty EXTRA_ROW_SOURCE_EXCEPTIONS = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + "extra.row.source.exceptions") + .defaultValue(false) + .markAdvanced() + .sinceVersion("0.15.0") + .withDocumentation("Reads from source to row format will have the dataframe wrapped with an exception handler"); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java index f2cc48f280c0d..bdb135f79c335 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java @@ -18,10 +18,12 @@ package org.apache.hudi.utilities.sources; +import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.UtilHelpers; +import org.apache.hudi.utilities.exception.HoodieReadFromSourceException; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.SanitizationUtils; @@ -30,6 +32,8 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import static org.apache.hudi.utilities.config.HoodieStreamerConfig.EXTRA_ROW_SOURCE_EXCEPTIONS; + public abstract class RowSource extends Source> { public RowSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, @@ -46,7 +50,9 @@ protected final InputBatch> fetchNewData(Option lastCkptStr Dataset sanitizedRows = SanitizationUtils.sanitizeColumnNamesForAvro(dsr, props); SchemaProvider rowSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(sanitizedRows.schema(), props, sparkContext); - return new InputBatch<>(Option.of(sanitizedRows), res.getValue(), rowSchemaProvider); + Dataset wrappedDf = HoodieSparkUtils.maybeWrapDataFrameWithException(sanitizedRows, HoodieReadFromSourceException.class.getName(), + "Failed to read from row source", 
props.getBoolean(EXTRA_ROW_SOURCE_EXCEPTIONS.key(), EXTRA_ROW_SOURCE_EXCEPTIONS.defaultValue())); + return new InputBatch<>(Option.of(wrappedDf), res.getValue(), rowSchemaProvider); }).orElseGet(() -> new InputBatch<>(res.getKey(), res.getValue())); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java index f29404701db97..c01a944b7ad90 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.SchemaCompatibilityException; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -53,6 +54,7 @@ import scala.util.Either; +import static org.apache.hudi.utilities.config.HoodieStreamerConfig.EXTRA_ROW_SOURCE_EXCEPTIONS; import static org.apache.hudi.utilities.config.HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES; import static org.apache.hudi.utilities.config.HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK; import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE; @@ -66,6 +68,8 @@ public final class SourceFormatAdapter implements Closeable { private final Source source; private boolean shouldSanitize = SANITIZE_SCHEMA_FIELD_NAMES.defaultValue(); + + private boolean wrapWithException = EXTRA_ROW_SOURCE_EXCEPTIONS.defaultValue(); private String invalidCharMask = SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue(); private Option errorTableWriter = Option.empty(); @@ -80,6 +84,7 @@ public SourceFormatAdapter(Source source, Option errorTabl if (props.isPresent()) { this.shouldSanitize = SanitizationUtils.shouldSanitize(props.get()); this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props.get()); + this.wrapWithException = props.get().getBoolean(EXTRA_ROW_SOURCE_EXCEPTIONS.key(), EXTRA_ROW_SOURCE_EXCEPTIONS.defaultValue()); } if (this.shouldSanitize && source.getSourceType() == Source.SourceType.PROTO) { throw new IllegalArgumentException("PROTO cannot be sanitized"); @@ -244,7 +249,8 @@ public InputBatch> fetchNewDataInRowFormat(Option lastCkptS StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema); return new InputBatch<>( Option.ofNullable( - r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)), + r.getBatch().map(rdd -> HoodieSparkUtils.maybeWrapDataFrameWithException(source.getSparkSession().read().schema(dataType).json(rdd), + SchemaCompatibilityException.class.getName(), "Schema does not match json data", wrapWithException)).orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider()); } } From 42d6607ad24c2184d3b36191b74ef0aac8bc43f1 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 27 Mar 2024 11:36:49 -0400 Subject: [PATCH 06/14] style --- .../src/main/scala/org/apache/hudi/AvroConversionUtils.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 
954b5a78c7f93..95962d1ca4437 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -68,10 +68,7 @@ object AvroConversionUtils { case e: HoodieSchemaException => throw e case e => throw new SchemaCompatibilityException("Failed to convert spark record into avro record", e) } - } - - } /** From 90a3006725e3030d185a314bbb58729dd345e654 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Thu, 28 Mar 2024 12:56:03 -0400 Subject: [PATCH 07/14] address review comments --- .../org/apache/hudi/util/ExceptionWrappingIterator.scala | 2 +- .../apache/hudi/utilities/config/HoodieStreamerConfig.java | 6 +++--- .../java/org/apache/hudi/utilities/sources/RowSource.java | 5 +++-- .../hudi/utilities/streamer/SourceFormatAdapter.java | 7 ++++--- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala index 04c3fb28c046f..b882446000f36 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala @@ -21,7 +21,7 @@ package org.apache.hudi.util import org.apache.hudi.common.util.ReflectionUtils -class ExceptionWrappingIterator[T](var in: Iterator[T], var exceptionClass: String, var msg: String) extends Iterator[T] { +class ExceptionWrappingIterator[T](val in: Iterator[T], val exceptionClass: String, val msg: String) extends Iterator[T] { override def hasNext: Boolean = try in.hasNext catch { case e: Throwable => throw createException(e) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java index de29b3f8598e1..1983774a1de9d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java @@ -133,10 +133,10 @@ public class HoodieStreamerConfig extends HoodieConfig { .withDocumentation("Number of records to sample from the first write. To improve the estimation's accuracy, " + "for smaller or more compressable record size, set the sample size bigger. 
For bigger or less compressable record size, set smaller."); - public static final ConfigProperty EXTRA_ROW_SOURCE_EXCEPTIONS = ConfigProperty - .key(STREAMER_CONFIG_PREFIX + "extra.row.source.exceptions") + public static final ConfigProperty ROW_THROW_EXPLICIT_EXCEPTIONS = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + ".row.throw.explicit.exceptions") .defaultValue(false) .markAdvanced() .sinceVersion("0.15.0") - .withDocumentation("Reads from source to row format will have the dataframe wrapped with an exception handler"); + .withDocumentation("When enabled, the dataframe generated from reading source data is wrapped with an exception handler to explicitly surface exceptions."); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java index bdb135f79c335..1c7e9d9909889 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java @@ -20,6 +20,7 @@ import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.UtilHelpers; @@ -32,7 +33,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import static org.apache.hudi.utilities.config.HoodieStreamerConfig.EXTRA_ROW_SOURCE_EXCEPTIONS; +import static org.apache.hudi.utilities.config.HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS; public abstract class RowSource extends Source> { @@ -51,7 +52,7 @@ protected final InputBatch> fetchNewData(Option lastCkptStr SchemaProvider rowSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(sanitizedRows.schema(), props, sparkContext); Dataset wrappedDf = HoodieSparkUtils.maybeWrapDataFrameWithException(sanitizedRows, HoodieReadFromSourceException.class.getName(), - "Failed to read from row source", props.getBoolean(EXTRA_ROW_SOURCE_EXCEPTIONS.key(), EXTRA_ROW_SOURCE_EXCEPTIONS.defaultValue())); + "Failed to read from row source", ConfigUtils.getBooleanWithAltKeys(props, ROW_THROW_EXPLICIT_EXCEPTIONS)); return new InputBatch<>(Option.of(wrappedDf), res.getValue(), rowSchemaProvider); }).orElseGet(() -> new InputBatch<>(res.getKey(), res.getValue())); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java index c01a944b7ad90..d9cc4b8c256bd 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java @@ -23,6 +23,7 @@ import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.avro.MercifulJsonConverter; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.SchemaCompatibilityException; @@ -54,7 +55,7 @@ import scala.util.Either; -import static org.apache.hudi.utilities.config.HoodieStreamerConfig.EXTRA_ROW_SOURCE_EXCEPTIONS; +import static org.apache.hudi.utilities.config.HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS; import static 
org.apache.hudi.utilities.config.HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES; import static org.apache.hudi.utilities.config.HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK; import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE; @@ -69,7 +70,7 @@ public final class SourceFormatAdapter implements Closeable { private final Source source; private boolean shouldSanitize = SANITIZE_SCHEMA_FIELD_NAMES.defaultValue(); - private boolean wrapWithException = EXTRA_ROW_SOURCE_EXCEPTIONS.defaultValue(); + private boolean wrapWithException = ROW_THROW_EXPLICIT_EXCEPTIONS.defaultValue(); private String invalidCharMask = SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue(); private Option errorTableWriter = Option.empty(); @@ -84,7 +85,7 @@ public SourceFormatAdapter(Source source, Option errorTabl if (props.isPresent()) { this.shouldSanitize = SanitizationUtils.shouldSanitize(props.get()); this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props.get()); - this.wrapWithException = props.get().getBoolean(EXTRA_ROW_SOURCE_EXCEPTIONS.key(), EXTRA_ROW_SOURCE_EXCEPTIONS.defaultValue()); + this.wrapWithException = ConfigUtils.getBooleanWithAltKeys(props.get(), ROW_THROW_EXPLICIT_EXCEPTIONS); } if (this.shouldSanitize && source.getSourceType() == Source.SourceType.PROTO) { throw new IllegalArgumentException("PROTO cannot be sanitized"); From 521ae79c05782ff553c945bc84c27afe33f8e52a Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Thu, 28 Mar 2024 13:46:52 -0400 Subject: [PATCH 08/14] remove . from config --- .../org/apache/hudi/utilities/config/HoodieStreamerConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java index 1983774a1de9d..e50e7fa06124b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java @@ -134,7 +134,7 @@ public class HoodieStreamerConfig extends HoodieConfig { + "for smaller or more compressable record size, set the sample size bigger. 
For bigger or less compressable record size, set smaller."); public static final ConfigProperty ROW_THROW_EXPLICIT_EXCEPTIONS = ConfigProperty - .key(STREAMER_CONFIG_PREFIX + ".row.throw.explicit.exceptions") + .key(STREAMER_CONFIG_PREFIX + "row.throw.explicit.exceptions") .defaultValue(false) .markAdvanced() .sinceVersion("0.15.0") From 0e2e1d8ea5829905db3464a97593bb81231bbc08 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Mon, 1 Apr 2024 09:58:16 -0400 Subject: [PATCH 09/14] address review comments --- .../org/apache/hudi/HoodieSparkUtils.scala | 1 - .../exception/HoodieAvroSchemaException.java | 4 +- .../HoodieRecordCreationException.java | 4 +- .../utilities/sources/TestAvroDFSSource.java | 3 +- .../utilities/sources/TestCsvDFSSource.java | 3 +- .../utilities/sources/TestJsonDFSSource.java | 49 ++++++++++++++++++- .../sources/TestParquetDFSSource.java | 18 ++++++- .../sources/AbstractDFSSourceTestBase.java | 7 ++- 8 files changed, 77 insertions(+), 12 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index cd335bd4c6b88..6de5de8842ea3 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} import org.apache.hudi.client.utils.SparkRowSerDe -import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.hadoop.fs.CachingPath import org.apache.hudi.util.ExceptionWrappingIterator diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java index ebdd1d416caa3..90799efdd4415 100644 --- a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java +++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java @@ -20,7 +20,9 @@ package org.apache.hudi.exception; /** - * Thrown when record schema will violate avro rules + * Thrown when we detect in Hudi code that a record schema + * will violate avro rules. This can happen even when using spark + * because we use avro schema internally */ public class HoodieAvroSchemaException extends SchemaCompatibilityException { public HoodieAvroSchemaException(String message) { diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java index 032bb6e31722d..8d192f1947082 100644 --- a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java +++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java @@ -20,7 +20,9 @@ package org.apache.hudi.exception; /** - * Exception thrown during hoodie record construction + * Exception thrown during hoodie record construction for any failure that + * is not a KeyGeneration Failure. An example of a failure would be if the + * record is malformed.
*/ public class HoodieRecordCreationException extends HoodieException { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java index 1cda910b707bf..0d8cfd0bf0af5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java @@ -39,8 +39,7 @@ public void setup() throws Exception { } @Override - protected Source prepareDFSSource() { - TypedProperties props = new TypedProperties(); + protected Source prepareDFSSource(TypedProperties props) { props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot); try { return new AvroDFSSource(props, jsc, sparkSession, schemaProvider); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java index 8eaa1d95b2390..2f93a2a4f1233 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java @@ -46,8 +46,7 @@ public void setup() throws Exception { } @Override - public Source prepareDFSSource() { - TypedProperties props = new TypedProperties(); + public Source prepareDFSSource(TypedProperties props) { props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot); props.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(true)); props.setProperty("hoodie.deltastreamer.csv.sep", "\t"); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java index fde10b2d9a59b..4a40404abf60f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java @@ -20,15 +20,29 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.SchemaCompatibilityException; +import org.apache.hudi.utilities.config.HoodieStreamerConfig; +import org.apache.hudi.utilities.streamer.SourceFormatAdapter; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.IOException; +import java.io.PrintStream; import java.util.List; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + /** * Basic tests for {@link JsonDFSSource}. 
  */
@@ -42,8 +56,7 @@ public void setup() throws Exception {
   }
 
   @Override
-  public Source prepareDFSSource() {
-    TypedProperties props = new TypedProperties();
+  public Source prepareDFSSource(TypedProperties props) {
     props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
     return new JsonDFSSource(props, jsc, sparkSession, schemaProvider);
   }
@@ -53,4 +66,36 @@ public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOE
     UtilitiesTestBase.Helpers.saveStringsToDFS(
         Helpers.jsonifyRecords(records), fs, path.toString());
   }
+
+  @Test
+  public void testCorruptedSourceFile() throws IOException {
+    fs.mkdirs(new Path(dfsRoot));
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS.key(), "true");
+    SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareDFSSource(props), Option.empty(), Option.of(props));
+    generateOneFile("1", "000", 10);
+    generateOneFile("2", "000", 10);
+    RemoteIterator<LocatedFileStatus> files = fs.listFiles(generateOneFile("3", "000", 10), true);
+
+    FileStatus file1Status = files.next();
+    InputBatch<Dataset<Row>> batch = sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
+    corruptFile(file1Status.getPath());
+    assertTrue(batch.getBatch().isPresent());
+    Throwable T = assertThrows(Exception.class,
+        () -> batch.getBatch().get().show(30));
+    while (T != null) {
+      if (T instanceof SchemaCompatibilityException) {
+        return;
+      }
+      T = T.getCause();
+    }
+    throw new AssertionError("Exception does not have SchemaCompatibility in its trace", T);
+  }
+
+  protected void corruptFile(Path path) throws IOException {
+    PrintStream os = new PrintStream(fs.appendFile(path).build());
+    os.println("🤷‍");
+    os.flush();
+    os.close();
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
index 44489037e823f..38729846c04c3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
@@ -20,14 +20,29 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.SchemaCompatibilityException;
+import org.apache.hudi.utilities.config.HoodieStreamerConfig;
+import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;
 
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.List;
 
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 /**
  * Basic tests for {@link ParquetDFSSource}.
  */
@@ -41,8 +56,7 @@ public void setup() throws Exception {
   }
 
   @Override
-  public Source prepareDFSSource() {
-    TypedProperties props = new TypedProperties();
+  public Source prepareDFSSource(TypedProperties props) {
     props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
     return new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
index 0de087ece73e0..76a1a64536708 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.testutils.sources;
 
 import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
@@ -74,7 +75,11 @@ public void setup() throws Exception {
    *
    * @return A {@link Source} using DFS as the file system.
    */
-  protected abstract Source prepareDFSSource();
+  protected final Source prepareDFSSource() {
+    return prepareDFSSource(new TypedProperties());
+  }
+
+  protected abstract Source prepareDFSSource(TypedProperties props);
 
   /**
    * Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file on DFS.

From 26afcf35d3d1c7cae817e8fd506f5c20f70a2eb2 Mon Sep 17 00:00:00 2001
From: Jonathan Vexler <=>
Date: Mon, 1 Apr 2024 10:01:16 -0400
Subject: [PATCH 10/14] fix merge

---
 .../utilities/sources/TestParquetDFSSource.java | 15 ---------------
 1 file changed, 15 deletions(-)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
index 2b2a299fcb114..a9c448748c914 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
@@ -20,29 +20,14 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.SchemaCompatibilityException;
-import org.apache.hudi.utilities.config.HoodieStreamerConfig;
-import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase;
 
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.io.PrintStream;
 import java.util.List;
 
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
 /**
  * Basic tests for {@link ParquetDFSSource}.
  */

From 51380200fafd1b3917658c549ab3caa3e5a408f5 Mon Sep 17 00:00:00 2001
From: Jonathan Vexler <=>
Date: Mon, 1 Apr 2024 10:06:53 -0400
Subject: [PATCH 11/14] fix checkstyle

---
 .../hudi/utilities/sources/TestJsonDFSSource.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
index 9df773ca9d386..ae134e862beaf 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
@@ -81,15 +81,15 @@ public void testCorruptedSourceFile() throws IOException {
     InputBatch<Dataset<Row>> batch = sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
     corruptFile(file1Status.getPath());
     assertTrue(batch.getBatch().isPresent());
-    Throwable T = assertThrows(Exception.class,
+    Throwable t = assertThrows(Exception.class,
         () -> batch.getBatch().get().show(30));
-    while (T != null) {
-      if (T instanceof SchemaCompatibilityException) {
+    while (t != null) {
+      if (t instanceof SchemaCompatibilityException) {
         return;
       }
-      T = T.getCause();
+      t = t.getCause();
     }
-    throw new AssertionError("Exception does not have SchemaCompatibility in its trace", T);
+    throw new AssertionError("Exception does not have SchemaCompatibility in its trace", t);
   }
 
   protected void corruptFile(Path path) throws IOException {

From f7e2e1b6051c023a4a84643c3c20cf7c8e0587d9 Mon Sep 17 00:00:00 2001
From: Jon Vexler
Date: Tue, 2 Apr 2024 21:31:15 -0400
Subject: [PATCH 12/14] Update
 hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java

Co-authored-by: Y Ethan Guo
---
 .../apache/hudi/exception/HoodieRecordCreationException.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java
index 8d192f1947082..dec70b369dae0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java
@@ -20,8 +20,8 @@
 package org.apache.hudi.exception;
 
 /**
- * Exception thrown during hoodie record construction for any failure that
- * that is not a KeyGeneration Failure. An example of a failure would be if the
+ * Exception thrown during HoodieRecord construction for any failure
+ * that is not a KeyGeneration failure. An example of a failure would be if the
  * record is malformed.
  */
 public class HoodieRecordCreationException extends HoodieException {

From ea630b524af999cfa7b71a35cc214ebeca12366f Mon Sep 17 00:00:00 2001
From: Jon Vexler
Date: Tue, 2 Apr 2024 21:31:20 -0400
Subject: [PATCH 13/14] Update
 hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java

Co-authored-by: Y Ethan Guo
---
 .../org/apache/hudi/exception/HoodieAvroSchemaException.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java
index 90799efdd4415..c19c88c15c8b6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java
@@ -21,8 +21,8 @@
 
 /**
  * Thrown when we detect in Hudi code that a record schema
- * will violate avro rules. This can happen even when using spark
- * because we use avro schema internally
+ * violates Avro rules. This can happen even when using Spark
+ * because we use Avro schema internally
  */
 public class HoodieAvroSchemaException extends SchemaCompatibilityException {
   public HoodieAvroSchemaException(String message) {

From 1770608bb63ae127c56ee13ecbab225a685d49e2 Mon Sep 17 00:00:00 2001
From: Jonathan Vexler <=>
Date: Tue, 2 Apr 2024 21:35:15 -0400
Subject: [PATCH 14/14] add javadoc to exception wrapper

---
 .../org/apache/hudi/util/ExceptionWrappingIterator.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala
index b882446000f36..994e6f0eea2dc 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala
@@ -21,6 +21,12 @@ package org.apache.hudi.util
 
 import org.apache.hudi.common.util.ReflectionUtils
 
+/**
+ * Used to catch exceptions from an iterator
+ * @param in iterator to catch exceptions from
+ * @param exceptionClass name of exception class to throw when an exception is thrown during iteration
+ * @param msg message the thrown exception should have
+ */
 class ExceptionWrappingIterator[T](val in: Iterator[T], val exceptionClass: String, val msg: String) extends Iterator[T] {
   override def hasNext: Boolean = try in.hasNext catch {