Merged
@@ -23,6 +23,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.avro.{JsonProperties, Schema}
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.hudi.internal.schema.HoodieSchemaException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -58,9 +59,16 @@ object AvroConversionUtils {
*/
def createInternalRowToAvroConverter(rootCatalystType: StructType, rootAvroType: Schema, nullable: Boolean): InternalRow => GenericRecord = {
val serializer = sparkAdapter.createAvroSerializer(rootCatalystType, rootAvroType, nullable)
-    row => serializer
-      .serialize(row)
-      .asInstanceOf[GenericRecord]
+    row => {
+      try {
+        serializer
+          .serialize(row)
+          .asInstanceOf[GenericRecord]
+      } catch {
+        case e: HoodieSchemaException => throw e
+        case e => throw new SchemaCompatibilityException("Failed to convert spark record into avro record", e)
+      }
+    }
}

/**
@@ -18,25 +18,25 @@

package org.apache.hudi

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.hadoop.fs.CachingPath

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.hudi.util.ExceptionWrappingIterator
import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
import org.apache.spark.sql.execution.SQLConfInjectingRDD
import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, HoodieUnsafeUtils}
import org.apache.spark.unsafe.types.UTF8String

import scala.collection.JavaConverters._
@@ -131,6 +131,16 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
new SQLConfInjectingRDD(rdd, conf)

def maybeWrapDataFrameWithException(df: DataFrame, exceptionClass: String, msg: String, shouldWrap: Boolean): DataFrame = {
if (shouldWrap) {
HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, injectSQLConf(df.queryExecution.toRdd.mapPartitions {
[Review thread - see the usage sketch after this method]
Contributor: Is SQLConf injection necessary here?
Contributor (author): #6352 - alexey says that "Yes, it will propagate to all RDDs in the execution chain (up to a shuffling point)". So I guess the question is whether there is a shuffling point?
Contributor: My doubt is whether wrapping the df with the exception handler needs the SQL conf injection. So you're saying this needs SQL conf injection so the configs can still be propagated to the executors.
Contributor: Let's keep an eye on the DAG and Spark execution when this is rolled out. I think this is OK for now.

rows => new ExceptionWrappingIterator[InternalRow](rows, exceptionClass, msg)
}, SQLConf.get), df.schema)
} else {
df
}
}
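The thread above concerns the SQLConf injection around the exception-wrapping mapPartitions; per the author, the injected conf propagates to the RDDs in the execution chain up to a shuffle. A minimal call-site sketch of the helper follows, using an arbitrary DataFrame df and an illustrative exception class and message (the real caller in RowSource below uses HoodieReadFromSourceException):

    // Sketch only: when shouldWrap is true, partition iterators are wrapped so any
    // failure during row evaluation is rethrown as the named exception type; when
    // false, the DataFrame is returned untouched and the execution plan is unchanged.
    val wrapped: DataFrame = HoodieSparkUtils.maybeWrapDataFrameWithException(
      df,
      classOf[org.apache.hudi.exception.HoodieException].getName, // illustrative exception class
      "Failed while evaluating source rows",                       // hypothetical message
      shouldWrap = true)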

def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()):
Tuple2[RDD[GenericRecord], RDD[String]] = {
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.util

import org.apache.hudi.common.util.ReflectionUtils

[Review comment] Contributor: Docs here?

/**
* Used to catch exceptions from an iterator
* @param in iterator to catch exceptions from
* @param exceptionClass name of exception class to throw when an exception is thrown during iteration
* @param msg message the thrown exception should have
*/
class ExceptionWrappingIterator[T](val in: Iterator[T], val exceptionClass: String, val msg: String) extends Iterator[T] {
override def hasNext: Boolean = try in.hasNext
catch {
case e: Throwable => throw createException(e)
}

override def next: T = try in.next
catch {
case e: Throwable => throw createException(e)
}

private def createException(e: Throwable): Throwable = {
ReflectionUtils.loadClass(exceptionClass, Array(classOf[String], classOf[Throwable]).asInstanceOf[Array[Class[_]]], msg, e).asInstanceOf[Throwable]
}
}
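A minimal usage sketch (hypothetical values throughout): the wrapper rethrows whatever the inner iterator throws as the reflectively loaded exception type, so the supplied class must have a (String, Throwable) constructor.

    // An iterator that fails partway through.
    val failing: Iterator[Int] = Iterator(1, 2, 3).map { i =>
      if (i == 3) throw new IllegalStateException("boom") else i
    }
    // java.lang.RuntimeException is used only as an example of a class
    // with a (String, Throwable) constructor.
    val wrapped = new ExceptionWrappingIterator[Int](
      failing,
      classOf[RuntimeException].getName,
      "Failed while iterating source rows")
    // Draining the iterator now surfaces RuntimeException("Failed while iterating source rows", cause)
    // instead of the raw IllegalStateException.
    wrapped.foreach(println)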
@@ -19,12 +19,12 @@
package org.apache.hudi.avro;

import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieAvroSchemaException;
+import org.apache.hudi.exception.InvalidUnionTypeException;
import org.apache.hudi.exception.MissingSchemaFieldException;
import org.apache.hudi.exception.SchemaBackwardsCompatibilityException;
import org.apache.hudi.exception.SchemaCompatibilityException;
-import org.apache.hudi.exception.InvalidUnionTypeException;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

@@ -317,7 +317,7 @@ public static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullNam
.orElse(null);

if (nonNullType == null) {
-      throw new AvroRuntimeException(
+      throw new HoodieAvroSchemaException(
String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
}

@@ -349,14 +349,14 @@ public static Schema resolveNullableSchema(Schema schema) {
List<Schema> innerTypes = schema.getTypes();

if (innerTypes.size() != 2) {
-      throw new AvroRuntimeException(
+      throw new HoodieAvroSchemaException(
String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
}
Schema firstInnerType = innerTypes.get(0);
Schema secondInnerType = innerTypes.get(1);
if ((firstInnerType.getType() != Schema.Type.NULL && secondInnerType.getType() != Schema.Type.NULL)
|| (firstInnerType.getType() == Schema.Type.NULL && secondInnerType.getType() == Schema.Type.NULL)) {
-      throw new AvroRuntimeException(
+      throw new HoodieAvroSchemaException(
String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
}
return firstInnerType.getType() == Schema.Type.NULL ? secondInnerType : firstInnerType;
@@ -36,6 +36,7 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieAvroSchemaException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.SchemaCompatibilityException;
@@ -931,7 +932,9 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr
private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
switch (newSchema.getType()) {
case RECORD:
-        ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, "cannot rewrite record with different type");
+        if (!(oldRecord instanceof IndexedRecord)) {
+          throw new SchemaCompatibilityException("cannot rewrite record with different type");
+        }
IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
List<Schema.Field> fields = newSchema.getFields();
GenericData.Record newRecord = new GenericData.Record(newSchema);
@@ -963,15 +966,17 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem
}
return newRecord;
case ENUM:
-        ValidationUtils.checkArgument(
-            oldSchema.getType() == Schema.Type.STRING || oldSchema.getType() == Schema.Type.ENUM,
-            "Only ENUM or STRING type can be converted ENUM type");
+        if (oldSchema.getType() != Schema.Type.STRING && oldSchema.getType() != Schema.Type.ENUM) {
+          throw new SchemaCompatibilityException(String.format("Only ENUM or STRING type can be converted ENUM type. Schema type was %s", oldSchema.getType().getName()));
+        }
if (oldSchema.getType() == Schema.Type.STRING) {
return new GenericData.EnumSymbol(newSchema, oldRecord);
}
return oldRecord;
case ARRAY:
-        ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot rewrite record with different type");
+        if (!(oldRecord instanceof Collection)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as an array", oldRecord.getClass().getName()));
+        }
Collection array = (Collection) oldRecord;
List<Object> newArray = new ArrayList<>(array.size());
fieldNames.push("element");
@@ -981,7 +986,9 @@ private static Object rewriteRecordWithNewSchemaInternal(Object oldRecord, Schem
fieldNames.pop();
return newArray;
case MAP:
-        ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot rewrite record with different type");
+        if (!(oldRecord instanceof Map)) {
+          throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as a map", oldRecord.getClass().getName()));
+        }
Map<Object, Object> map = (Map<Object, Object>) oldRecord;
Map<Object, Object> newMap = new HashMap<>(map.size(), 1.0f);
fieldNames.push("value");
@@ -1029,7 +1036,7 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche
BigDecimal bd = new BigDecimal(new BigInteger(bytes), decimal.getScale()).setScale(((Decimal) newSchema.getLogicalType()).getScale());
return DECIMAL_CONVERSION.toFixed(bd, newSchema, newSchema.getLogicalType());
} else {
-          throw new UnsupportedOperationException("Fixed type size change is not currently supported");
+          throw new HoodieAvroSchemaException("Fixed type size change is not currently supported");
}
}

@@ -1045,7 +1052,7 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche
}

default:
throw new AvroRuntimeException("Unknown schema type: " + newSchema.getType());
throw new HoodieAvroSchemaException("Unknown schema type: " + newSchema.getType());
}
} else {
return rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema);
@@ -1130,7 +1137,7 @@ private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, Sche
break;
default:
}
throw new AvroRuntimeException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
throw new HoodieAvroSchemaException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
}

/**
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.exception;

/**
* Thrown when we detect in Hudi code that a record schema
* violates Avro rules. This can happen even when using Spark
* because we use Avro schema internally
*/
public class HoodieAvroSchemaException extends SchemaCompatibilityException {
public HoodieAvroSchemaException(String message) {
super(message);
}
}
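As a hedged illustration of what the callers above now reject with this exception: resolveNullableSchema and resolveUnionSchema only accept a UNION of a null type and a single non-null type, so a union of two non-null branches like the one below is expected to surface as HoodieAvroSchemaException rather than a raw AvroRuntimeException.

    import java.util.Arrays
    import org.apache.avro.Schema

    // Two non-null branches: not a supported nullable union.
    val unsupportedUnion = Schema.createUnion(
      Arrays.asList(
        Schema.create(Schema.Type.STRING),
        Schema.create(Schema.Type.INT)))
    // AvroSchemaUtils.resolveNullableSchema(unsupportedUnion) should now throw
    // HoodieAvroSchemaException for this shape.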
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.exception;

/**
* Exception thrown during HoodieRecord construction for any failure
* that is not a KeyGeneration failure. An example of a failure would be if the
* record is malformed.
*/
public class HoodieRecordCreationException extends HoodieException {

public HoodieRecordCreationException(String message, Throwable t) {
super(message, t);
}
}
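The javadoc above describes when this is thrown; a hedged sketch of how a caller might surface it, mirroring the Try/Failure wrapping added in HoodieSparkSqlWriter below (names here are illustrative):

    import scala.util.{Failure, Success, Try}
    import org.apache.hudi.exception.HoodieRecordCreationException

    // createRecords stands in for the real record-construction call.
    def createOrFail[T](createRecords: => T): T = Try(createRecords) match {
      case Success(recs) => recs
      case Failure(e)    => throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e)
    }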
@@ -51,7 +51,7 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
-import org.apache.hudi.exception.{HoodieException, HoodieWriteConflictException}
+import org.apache.hudi.exception.{HoodieException, HoodieRecordCreationException, HoodieWriteConflictException}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -78,6 +78,7 @@ import java.util.function.BiConsumer
import scala.collection.JavaConversions._
import scala.collection.JavaConverters.setAsJavaSetConverter
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

object HoodieSparkSqlWriter {

@@ -478,10 +479,13 @@ class HoodieSparkSqlWriterInternal {
}
instantTime = client.createNewInstantTime()
// Convert to RDD[HoodieRecord]
-      val hoodieRecords =
-        HoodieCreateRecordUtils.createHoodieRecordRdd(HoodieCreateRecordUtils.createHoodieRecordRddArgs(df,
-          writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema,
-          processedDataSchema, operation, instantTime, preppedSparkSqlWrites, preppedSparkSqlMergeInto, preppedWriteOperation))
+      val hoodieRecords = Try(HoodieCreateRecordUtils.createHoodieRecordRdd(
+        HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, writeConfig, parameters, avroRecordName,
+          avroRecordNamespace, writerSchema, processedDataSchema, operation, instantTime, preppedSparkSqlWrites,
+          preppedSparkSqlMergeInto, preppedWriteOperation))) match {
+        case Success(recs) => recs
+        case Failure(e) => throw new HoodieRecordCreationException("Failed to create Hoodie Spark Record", e)
+      }

val dedupedHoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) && operation != WriteOperationType.INSERT_OVERWRITE_TABLE && operation != WriteOperationType.INSERT_OVERWRITE) {
@@ -132,4 +132,11 @@ public class HoodieStreamerConfig extends HoodieConfig {
.sinceVersion("0.14.0")
.withDocumentation("Number of records to sample from the first write. To improve the estimation's accuracy, "
+ "for smaller or more compressable record size, set the sample size bigger. For bigger or less compressable record size, set smaller.");

public static final ConfigProperty<Boolean> ROW_THROW_EXPLICIT_EXCEPTIONS = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "row.throw.explicit.exceptions")
.defaultValue(false)
.markAdvanced()
.sinceVersion("0.15.0")
.withDocumentation("When enabled, the dataframe generated from reading source data is wrapped with an exception handler to explicitly surface exceptions.");
}
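A small sketch of opting in from streamer properties; referencing the constant's key() avoids hard-coding the prefix, which is not spelled out here. With the flag on, row-source read failures surface as HoodieReadFromSourceException via the DataFrame wrapping shown in RowSource below.

    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.utilities.config.HoodieStreamerConfig

    // Disabled by default (defaultValue(false) above); enable explicitly.
    val props = new TypedProperties()
    props.setProperty(HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS.key(), "true")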
@@ -18,10 +18,13 @@

package org.apache.hudi.utilities.sources;

import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.schema.SchemaProvider;

import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
@@ -30,6 +33,8 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.hudi.utilities.config.HoodieStreamerConfig.ROW_THROW_EXPLICIT_EXCEPTIONS;

public abstract class RowSource extends Source<Dataset<Row>> {

public RowSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
@@ -46,7 +51,9 @@ protected final InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr
Dataset<Row> sanitizedRows = SanitizationUtils.sanitizeColumnNamesForAvro(dsr, props);
SchemaProvider rowSchemaProvider =
UtilHelpers.createRowBasedSchemaProvider(sanitizedRows.schema(), props, sparkContext);
-      return new InputBatch<>(Option.of(sanitizedRows), res.getValue(), rowSchemaProvider);
+      Dataset<Row> wrappedDf = HoodieSparkUtils.maybeWrapDataFrameWithException(sanitizedRows, HoodieReadFromSourceException.class.getName(),
+          "Failed to read from row source", ConfigUtils.getBooleanWithAltKeys(props, ROW_THROW_EXPLICIT_EXCEPTIONS));
+      return new InputBatch<>(Option.of(wrappedDf), res.getValue(), rowSchemaProvider);
}).orElseGet(() -> new InputBatch<>(res.getKey(), res.getValue()));
}
}