diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 55877938f8cb5..95962d1ca4437 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -23,6 +23,7 @@ import org.apache.avro.generic.GenericRecord import org.apache.avro.{JsonProperties, Schema} import org.apache.hudi.HoodieSparkUtils.sparkAdapter import org.apache.hudi.avro.AvroSchemaUtils +import org.apache.hudi.exception.SchemaCompatibilityException import org.apache.hudi.internal.schema.HoodieSchemaException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -58,9 +59,16 @@ object AvroConversionUtils { */ def createInternalRowToAvroConverter(rootCatalystType: StructType, rootAvroType: Schema, nullable: Boolean): InternalRow => GenericRecord = { val serializer = sparkAdapter.createAvroSerializer(rootCatalystType, rootAvroType, nullable) - row => serializer - .serialize(row) - .asInstanceOf[GenericRecord] + row => { + try { + serializer + .serialize(row) + .asInstanceOf[GenericRecord] + } catch { + case e: HoodieSchemaException => throw e + case e => throw new SchemaCompatibilityException("Failed to convert spark record into avro record", e) + } + } } /** diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 03d977f6fc9b3..6de5de8842ea3 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -18,25 +18,25 @@ package org.apache.hudi +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.hadoop.fs.Path import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.hadoop.fs.CachingPath - -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord -import org.apache.hadoop.fs.Path +import org.apache.hudi.util.ExceptionWrappingIterator import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone import org.apache.spark.sql.execution.SQLConfInjectingRDD import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, HoodieUnsafeUtils} import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters._ @@ -131,6 +131,16 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] = new SQLConfInjectingRDD(rdd, conf) + def maybeWrapDataFrameWithException(df: DataFrame, exceptionClass: String, msg: String, shouldWrap: Boolean): DataFrame = { + if (shouldWrap) { + HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, 
injectSQLConf(df.queryExecution.toRdd.mapPartitions { + rows => new ExceptionWrappingIterator[InternalRow](rows, exceptionClass, msg) + }, SQLConf.get), df.schema) + } else { + df + } + } + def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean, latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): Tuple2[RDD[GenericRecord], RDD[String]] = { diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala new file mode 100644 index 0000000000000..994e6f0eea2dc --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/ExceptionWrappingIterator.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.util + +import org.apache.hudi.common.util.ReflectionUtils + +/** + * Used to catch exceptions from an iterator + * @param in iterator to catch exceptions from + * @param exceptionClass name of exception class to throw when an exception is thrown during iteration + * @param msg message the thrown exception should have + */ +class ExceptionWrappingIterator[T](val in: Iterator[T], val exceptionClass: String, val msg: String) extends Iterator[T] { + override def hasNext: Boolean = try in.hasNext + catch { + case e: Throwable => throw createException(e) + } + + override def next: T = try in.next + catch { + case e: Throwable => throw createException(e) + } + + private def createException(e: Throwable): Throwable = { + ReflectionUtils.loadClass(exceptionClass, Array(classOf[String], classOf[Throwable]).asInstanceOf[Array[Class[_]]], msg, e).asInstanceOf[Throwable] + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index 027f6ccb37d2b..ba747a63cbc00 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -19,12 +19,12 @@ package org.apache.hudi.avro; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieAvroSchemaException; +import org.apache.hudi.exception.InvalidUnionTypeException; import org.apache.hudi.exception.MissingSchemaFieldException; import org.apache.hudi.exception.SchemaBackwardsCompatibilityException; import org.apache.hudi.exception.SchemaCompatibilityException; -import org.apache.hudi.exception.InvalidUnionTypeException; -import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; import org.apache.avro.SchemaCompatibility; @@ -317,7 +317,7 @@ public static 
Schema resolveUnionSchema(Schema schema, String fieldSchemaFullNam .orElse(null); if (nonNullType == null) { - throw new AvroRuntimeException( + throw new HoodieAvroSchemaException( String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema)); } @@ -349,14 +349,14 @@ public static Schema resolveNullableSchema(Schema schema) { List innerTypes = schema.getTypes(); if (innerTypes.size() != 2) { - throw new AvroRuntimeException( + throw new HoodieAvroSchemaException( String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema)); } Schema firstInnerType = innerTypes.get(0); Schema secondInnerType = innerTypes.get(1); if ((firstInnerType.getType() != Schema.Type.NULL && secondInnerType.getType() != Schema.Type.NULL) || (firstInnerType.getType() == Schema.Type.NULL && secondInnerType.getType() == Schema.Type.NULL)) { - throw new AvroRuntimeException( + throw new HoodieAvroSchemaException( String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema)); } return firstInnerType.getType() == Schema.Type.NULL ? secondInnerType : firstInnerType; diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index b352099cb1e11..a7b3f5ae197b1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -36,6 +36,7 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieAvroSchemaException; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.SchemaCompatibilityException; @@ -931,7 +932,9 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schema oldSchema, Schema newSchema, Map renameCols, Deque fieldNames) { switch (newSchema.getType()) { case RECORD: - ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, "cannot rewrite record with different type"); + if (!(oldRecord instanceof IndexedRecord)) { + throw new SchemaCompatibilityException("cannot rewrite record with different type"); + } IndexedRecord indexedRecord = (IndexedRecord) oldRecord; List fields = newSchema.getFields(); GenericData.Record newRecord = new GenericData.Record(newSchema); @@ -963,15 +966,17 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem } return newRecord; case ENUM: - ValidationUtils.checkArgument( - oldSchema.getType() == Schema.Type.STRING || oldSchema.getType() == Schema.Type.ENUM, - "Only ENUM or STRING type can be converted ENUM type"); + if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType() != Schema.Type.ENUM) { + throw new SchemaCompatibilityException(String.format("Only ENUM or STRING type can be converted ENUM type. 
Schema type was %s", oldSchema.getType().getName())); + } if (oldSchema.getType() == Schema.Type.STRING) { return new GenericData.EnumSymbol(newSchema, oldRecord); } return oldRecord; case ARRAY: - ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot rewrite record with different type"); + if (!(oldRecord instanceof Collection)) { + throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as an array", oldRecord.getClass().getName())); + } Collection array = (Collection) oldRecord; List newArray = new ArrayList<>(array.size()); fieldNames.push("element"); @@ -981,7 +986,9 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem fieldNames.pop(); return newArray; case MAP: - ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot rewrite record with different type"); + if (!(oldRecord instanceof Map)) { + throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as a map", oldRecord.getClass().getName())); + } Map map = (Map) oldRecord; Map newMap = new HashMap<>(map.size(), 1.0f); fieldNames.push("value"); @@ -1029,7 +1036,7 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche BigDecimal bd = new BigDecimal(new BigInteger(bytes), decimal.getScale()).setScale(((Decimal) newSchema.getLogicalType()).getScale()); return DECIMAL_CONVERSION.toFixed(bd, newSchema, newSchema.getLogicalType()); } else { - throw new UnsupportedOperationException("Fixed type size change is not currently supported"); + throw new HoodieAvroSchemaException("Fixed type size change is not currently supported"); } } @@ -1045,7 +1052,7 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche } default: - throw new AvroRuntimeException("Unknown schema type: " + newSchema.getType()); + throw new HoodieAvroSchemaException("Unknown schema type: " + newSchema.getType()); } } else { return rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema); @@ -1130,7 +1137,7 @@ private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, Sche break; default: } - throw new AvroRuntimeException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema)); + throw new HoodieAvroSchemaException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema)); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java new file mode 100644 index 0000000000000..c19c88c15c8b6 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieAvroSchemaException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.exception; + +/** + * Thrown when we detect in Hudi code that a record schema + * violates Avro rules. This can happen even when using Spark + * because we use Avro schema internally + */ +public class HoodieAvroSchemaException extends SchemaCompatibilityException { + public HoodieAvroSchemaException(String message) { + super(message); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java new file mode 100644 index 0000000000000..dec70b369dae0 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieRecordCreationException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.exception; + +/** + * Exception thrown during HoodieRecord construction for any failure + * that is not a KeyGeneration failure. An example of a failure would be if the + * record is malformed. 
+ */ +public class HoodieRecordCreationException extends HoodieException { + + public HoodieRecordCreationException(String message, Throwable t) { + super(message, t); + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index d9ac8ae798f2d..98c7c5d29f565 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -51,7 +51,7 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME} import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig} -import org.apache.hudi.exception.{HoodieException, HoodieWriteConflictException} +import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException} import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool} import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter @@ -78,6 +78,7 @@ import java.util.function.BiConsumer import scala.collection.JavaConversions._ import scala.collection.JavaConverters.setAsJavaSetConverter import scala.collection.mutable +import scala.util.{Failure, Success, Try} object HoodieSparkSqlWriter { @@ -478,10 +479,13 @@ class HoodieSparkSqlWriterInternal { } instantTime = client.createNewInstantTime() // Convert to RDD[HoodieRecord] - val hoodieRecords = - HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, - writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema, - processedDataSchema, operation, instantTime, preppedSparkSqlWrites, preppedSparkSqlMergeInto, preppedWriteOperation)) + val hoodieRecords = Try(HoodieCreateRecordUtils.createHoodieRecordRdd( + HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, writeConfig, parameters, avroRecordName, + avroRecordNamespace, writerSchema, processedDataSchema, operation, instantTime, preppedSparkSqlWrites, + preppedSparkSqlMergeInto, preppedWriteOperation))) match { + case Success(recs) => recs + case Failure(e) => throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e) + } val dedupedHoodieRecords = if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) && operation != WriteOperationType.INSERT_OVERWRITE_TABLE && operation != WriteOperationType.INSERT_OVERWRITE) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java index b3b64cff905b6..e50e7fa06124b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieStreamerConfig.java @@ -132,4 +132,11 @@ public class HoodieStreamerConfig extends HoodieConfig { .sinceVersion("0.14.0") .withDocumentation("Number of records to sample from the first write. To improve the estimation's accuracy, " + "for smaller or more compressable record size, set the sample size bigger. 
For bigger or less compressable record size, set smaller."); + + public static final ConfigProperty ROW_THROW_EXPLICIT_EXCEPTIONS = ConfigProperty + .key(STREAMER_CONFIG_PREFIX + "row.throw.explicit.exceptions") + .defaultValue(false) + .markAdvanced() + .sinceVersion("0.15.0") + .withDocumentation("When enabled, the dataframe generated from reading source data is wrapped with an exception handler to explicitly surface exceptions."); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java index f2cc48f280c0d..1c7e9d9909889 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java @@ -18,10 +18,13 @@ package org.apache.hudi.utilities.sources; +import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.UtilHelpers; +import org.apache.hudi.utilities.exception.HoodieReadFromSourceException; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.SanitizationUtils; @@ -30,6 +33,8 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import static org.apache.hudi.utilities.config.HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS; + public abstract class RowSource extends Source> { public RowSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, @@ -46,7 +51,9 @@ protected final InputBatch> fetchNewData(Option lastCkptStr Dataset sanitizedRows = SanitizationUtils.sanitizeColumnNamesForAvro(dsr, props); SchemaProvider rowSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(sanitizedRows.schema(), props, sparkContext); - return new InputBatch<>(Option.of(sanitizedRows), res.getValue(), rowSchemaProvider); + Dataset wrappedDf = HoodieSparkUtils.maybeWrapDataFrameWithException(sanitizedRows, HoodieReadFromSourceException.class.getName(), + "Failed to read from row source", ConfigUtils.getBooleanWithAltKeys(props, ROW_THROW_EXPLICIT_EXCEPTIONS)); + return new InputBatch<>(Option.of(wrappedDf), res.getValue(), rowSchemaProvider); }).orElseGet(() -> new InputBatch<>(res.getKey(), res.getValue())); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java index 90315bc97643c..61d7793e6ad03 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java @@ -36,6 +36,9 @@ import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieKeyException; +import org.apache.hudi.exception.HoodieKeyGeneratorException; +import org.apache.hudi.exception.HoodieRecordCreationException; import org.apache.hudi.keygen.BuiltinKeyGenerator; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; @@ -104,10 +107,7 @@ public static Option> createHoodieRecords(HoodieStreamer.C : 
DataSourceUtils.createPayload(cfg.payloadClassName, gr); avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload))); } catch (Exception e) { - if (!shouldErrorTable) { - throw e; - } - avroRecords.add(generateErrorRecord(genRec)); + avroRecords.add(generateErrorRecordOrThrowException(genRec, e, shouldErrorTable)); } } return avroRecords.iterator(); @@ -135,10 +135,7 @@ public static Option> createHoodieRecords(HoodieStreamer.C return Either.left(new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath), HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, targetStructType).apply(row), targetStructType, false)); } catch (Exception e) { - if (!shouldErrorTable) { - throw e; - } - return generateErrorRecord(rec); + return generateErrorRecordOrThrowException(rec, e, shouldErrorTable); } }); @@ -159,7 +156,16 @@ public static Option> createHoodieRecords(HoodieStreamer.C * @return the representation of error record (empty {@link HoodieRecord} and the error record * String) for writing to error table. */ - private static Either generateErrorRecord(GenericRecord genRec) { + private static Either generateErrorRecordOrThrowException(GenericRecord genRec, Exception e, boolean shouldErrorTable) { + if (!shouldErrorTable) { + if (e instanceof HoodieKeyException) { + throw (HoodieKeyException) e; + } else if (e instanceof HoodieKeyGeneratorException) { + throw (HoodieKeyGeneratorException) e; + } else { + throw new HoodieRecordCreationException("Failed to create Hoodie Record", e); + } + } try { return Either.right(HoodieAvroUtils.avroToJsonString(genRec, false)); } catch (Exception ex) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java index 1796c96dab867..c379472b26eb6 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java @@ -23,8 +23,10 @@ import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.avro.MercifulJsonConverter; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.SchemaCompatibilityException; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -53,6 +55,7 @@ import scala.util.Either; +import static org.apache.hudi.utilities.config.HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS; import static org.apache.hudi.utilities.config.HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES; import static org.apache.hudi.utilities.config.HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK; import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE; @@ -66,6 +69,8 @@ public class SourceFormatAdapter implements Closeable { private final Source source; private boolean shouldSanitize = SANITIZE_SCHEMA_FIELD_NAMES.defaultValue(); + + private boolean wrapWithException = ROW_THROW_EXPLICIT_EXCEPTIONS.defaultValue(); private String invalidCharMask = SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue(); private Option errorTableWriter = Option.empty(); @@ -80,6 +85,7 @@ public SourceFormatAdapter(Source source, Option errorTabl if (props.isPresent()) { 
this.shouldSanitize = SanitizationUtils.shouldSanitize(props.get()); this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props.get()); + this.wrapWithException = ConfigUtils.getBooleanWithAltKeys(props.get(), ROW_THROW_EXPLICIT_EXCEPTIONS); } if (this.shouldSanitize && source.getSourceType() == Source.SourceType.PROTO) { throw new IllegalArgumentException("PROTO cannot be sanitized"); @@ -244,7 +250,8 @@ public InputBatch> fetchNewDataInRowFormat(Option lastCkptS StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema); return new InputBatch<>( Option.ofNullable( - r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)), + r.getBatch().map(rdd -> HoodieSparkUtils.maybeWrapDataFrameWithException(source.getSparkSession().read().schema(dataType).json(rdd), + SchemaCompatibilityException.class.getName(), "Schema does not match json data", wrapWithException)).orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider()); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java index 5ccf9ad2b2963..808a4ca57cea1 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java @@ -39,8 +39,7 @@ public void setup() throws Exception { } @Override - protected Source prepareDFSSource() { - TypedProperties props = new TypedProperties(); + protected Source prepareDFSSource(TypedProperties props) { props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot); try { return new AvroDFSSource(props, jsc, sparkSession, schemaProvider); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java index 6a2bbcd01366a..c4bb59ff812fe 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java @@ -46,8 +46,7 @@ public void setup() throws Exception { } @Override - public Source prepareDFSSource() { - TypedProperties props = new TypedProperties(); + public Source prepareDFSSource(TypedProperties props) { props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot); props.setProperty("hoodie.streamer.csv.header", Boolean.toString(true)); props.setProperty("hoodie.streamer.csv.sep", "\t"); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java index 24a341fe9c335..ae134e862beaf 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java @@ -20,15 +20,29 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.SchemaCompatibilityException; +import org.apache.hudi.utilities.config.HoodieStreamerConfig; +import org.apache.hudi.utilities.streamer.SourceFormatAdapter; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.hudi.utilities.testutils.sources.AbstractDFSSourceTestBase; +import org.apache.hadoop.fs.FileStatus; 
+import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.IOException; +import java.io.PrintStream; import java.util.List; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + /** * Basic tests for {@link JsonDFSSource}. */ @@ -42,8 +56,7 @@ public void setup() throws Exception { } @Override - public Source prepareDFSSource() { - TypedProperties props = new TypedProperties(); + public Source prepareDFSSource(TypedProperties props) { props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot); return new JsonDFSSource(props, jsc, sparkSession, schemaProvider); } @@ -53,4 +66,36 @@ public void writeNewDataToFile(List records, Path path) throws IOE UtilitiesTestBase.Helpers.saveStringsToDFS( Helpers.jsonifyRecords(records), fs, path.toString()); } + + @Test + public void testCorruptedSourceFile() throws IOException { + fs.mkdirs(new Path(dfsRoot)); + TypedProperties props = new TypedProperties(); + props.setProperty(HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS.key(), "true"); + SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareDFSSource(props), Option.empty(), Option.of(props)); + generateOneFile("1", "000", 10); + generateOneFile("2", "000", 10); + RemoteIterator files = fs.listFiles(generateOneFile("3", "000", 10), true); + + FileStatus file1Status = files.next(); + InputBatch> batch = sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE); + corruptFile(file1Status.getPath()); + assertTrue(batch.getBatch().isPresent()); + Throwable t = assertThrows(Exception.class, + () -> batch.getBatch().get().show(30)); + while (t != null) { + if (t instanceof SchemaCompatibilityException) { + return; + } + t = t.getCause(); + } + throw new AssertionError("Exception does not have SchemaCompatibility in its trace", t); + } + + protected void corruptFile(Path path) throws IOException { + PrintStream os = new PrintStream(fs.appendFile(path).build()); + os.println("🤷‍"); + os.flush(); + os.close(); + } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java index 159ababcf471c..a9c448748c914 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java @@ -41,8 +41,7 @@ public void setup() throws Exception { } @Override - public Source prepareDFSSource() { - TypedProperties props = new TypedProperties(); + public Source prepareDFSSource(TypedProperties props) { props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot); return new ParquetDFSSource(props, jsc, sparkSession, schemaProvider); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java index 0de087ece73e0..76a1a64536708 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java @@ -19,6 +19,7 @@ 
package org.apache.hudi.utilities.testutils.sources; import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; @@ -74,7 +75,11 @@ public void setup() throws Exception { * * @return A {@link Source} using DFS as the file system. */ - protected abstract Source prepareDFSSource(); + protected final Source prepareDFSSource() { + return prepareDFSSource(new TypedProperties()); + } + + protected abstract Source prepareDFSSource(TypedProperties props); /** * Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file on DFS.
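
Note on usage (a sketch, not part of the patch): the new hoodie.streamer.row.throw.explicit.exceptions flag added above is read through ConfigUtils.getBooleanWithAltKeys in RowSource and SourceFormatAdapter, and when enabled the source DataFrame is wrapped so that read failures surface as a SchemaCompatibilityException instead of an opaque Spark task error. The following Java sketch mirrors TestJsonDFSSource; the class name ExplicitRowExceptionExample, the dfsRoot argument, and the passed-in jsc/spark/schemaProvider are illustrative placeholders, not APIs introduced by this change.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.hudi.utilities.config.HoodieStreamerConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonDFSSource;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ExplicitRowExceptionExample {

  // Reads one batch from a JSON DFS source with explicit row exceptions enabled,
  // and detects a schema mismatch explicitly instead of a generic task failure.
  static void readWithExplicitExceptions(JavaSparkContext jsc, SparkSession spark,
                                         SchemaProvider schemaProvider, String dfsRoot) {
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
    props.setProperty(HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS.key(), "true");

    SourceFormatAdapter adapter = new SourceFormatAdapter(
        new JsonDFSSource(props, jsc, spark, schemaProvider), Option.empty(), Option.of(props));

    InputBatch<Dataset<Row>> batch = adapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
    try {
      if (batch.getBatch().isPresent()) {
        batch.getBatch().get().count(); // any action triggers the wrapped read
      }
    } catch (Exception e) {
      // Spark usually nests the executor-side failure, so walk the cause chain
      // the same way the test above does.
      for (Throwable t = e; t != null; t = t.getCause()) {
        if (t instanceof SchemaCompatibilityException) {
          // schema mismatch between the provided schema and the JSON data
          return;
        }
      }
      throw e;
    }
  }
}

Because the wrapping happens inside mapPartitions, the exception only appears once an action runs on the returned Dataset, which is why the cause-chain walk is needed.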
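Similarly, ExceptionWrappingIterator constructs the configured exception reflectively via ReflectionUtils.loadClass with a (String, Throwable) parameter list, so any exception class name passed to maybeWrapDataFrameWithException must expose such a constructor. A hypothetical illustration (MySourceReadException is not part of this change):

import org.apache.hudi.exception.HoodieException;

// Hypothetical example: a class usable with ExceptionWrappingIterator must provide
// a public (String, Throwable) constructor, matching the reflective lookup
// ReflectionUtils.loadClass(exceptionClass, new Class<?>[] {String.class, Throwable.class}, msg, e).
public class MySourceReadException extends HoodieException {
  public MySourceReadException(String message, Throwable cause) {
    super(message, cause);
  }
}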