@@ -27,12 +27,13 @@ import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
import org.apache.avro.Schema.Type._
import org.apache.avro.generic.GenericData.{Fixed, Record}
import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.spark.sql.Row
import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.hudi.AvroConversionUtils._

import scala.collection.JavaConverters._

@@ -137,7 +138,7 @@ object AvroConversionHelper {
case (struct: StructType, RECORD) =>
val length = struct.fields.length
val converters = new Array[AnyRef => AnyRef](length)
val avroFieldIndexes = new Array[Int](length)
val avroFieldNames = new Array[String](length)
var i = 0
while (i < length) {
val sqlField = struct.fields(i)
@@ -146,7 +147,7 @@
val converter = createConverter(avroField.schema(), sqlField.dataType,
path :+ sqlField.name)
converters(i) = converter
avroFieldIndexes(i) = avroField.pos()
avroFieldNames(i) = avroField.name()
} else if (!sqlField.nullable) {
throw new IncompatibleSchemaException(
s"Cannot find non-nullable field ${sqlField.name} at path ${path.mkString(".")} " +
@@ -168,7 +169,7 @@
while (i < converters.length) {
if (converters(i) != null) {
val converter = converters(i)
result(i) = converter(record.get(avroFieldIndexes(i)))
result(i) = converter(record.get(avroFieldNames(i)))
}
i += 1
}
@@ -365,4 +366,133 @@ object AvroConversionHelper {
}
}
}

def createConverterToAvroV2(dataType: DataType,
structName: String,
recordNamespace: String): Any => Any = {
dataType match {
case BinaryType => (item: Any) =>
item match {
case null => null
case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
}
case IntegerType | LongType |
FloatType | DoubleType | StringType | BooleanType => identity
case ByteType => (item: Any) =>
if (item == null) null else item.asInstanceOf[Byte].intValue
case ShortType => (item: Any) =>
if (item == null) null else item.asInstanceOf[Short].intValue
case dec: DecimalType =>
val schema = SchemaConverters.toAvroType(dec, nullable = false, structName, recordNamespace)
(item: Any) => {
Option(item).map { _ =>
val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal]
val decimalConversions = new DecimalConversion()
decimalConversions.toFixed(bigDecimalValue, schema, LogicalTypes.decimal(dec.precision, dec.scale))
}.orNull
}
case TimestampType => (item: Any) =>
// Convert time to microseconds since spark-avro by default converts TimestampType to
// Avro Logical TimestampMicros
Option(item).map(_.asInstanceOf[Timestamp].getTime * 1000).orNull
case DateType => (item: Any) =>
Option(item).map(_.asInstanceOf[Date].toLocalDate.toEpochDay.toInt).orNull
case ArrayType(elementType, _) =>
val elementConverter = createConverterToAvro(
elementType,
structName,
recordNamespace)
(item: Any) => {
if (item == null) {
null
} else {
val sourceArray = item.asInstanceOf[Seq[Any]]
val sourceArraySize = sourceArray.size
val targetList = new util.ArrayList[Any](sourceArraySize)
var idx = 0
while (idx < sourceArraySize) {
targetList.add(elementConverter(sourceArray(idx)))
idx += 1
}
targetList
}
}
case MapType(StringType, valueType, _) =>
val valueConverter = createConverterToAvro(
valueType,
structName,
recordNamespace)
(item: Any) => {
if (item == null) {
null
} else {
val javaMap = new util.HashMap[String, Any]()
item.asInstanceOf[Map[String, Any]].foreach { case (key, value) =>
javaMap.put(key, valueConverter(value))
}
javaMap
}
}
case structType: StructType =>
val schema: Schema = removeNamespaceFromFixedFields(convertStructTypeToAvroSchema(structType, structName, recordNamespace))
val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
val fieldConverters = structType.fields.map(field =>
createConverterToAvro(
field.dataType,
field.name,
childNameSpace))
(item: Any) => {
if (item == null) {
null
} else {
val record = new Record(schema)
val convertersIterator = fieldConverters.iterator
val fieldNamesIterator = structType.fieldNames.iterator
val rowIterator = item.asInstanceOf[Row].toSeq.iterator

while (convertersIterator.hasNext) {
val converter = convertersIterator.next()
record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
}
record
}
}
}
}

/**
 * Removes the namespace from decimal-backed fixed fields.
 * org.apache.spark.sql.avro.SchemaConverters.toAvroType adds a namespace to fixed Avro fields
 * (https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177),
 * so we strip that namespace here so that reader schemas without a namespace do not fail with an error such as:
 * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
 *
 * @param schema schema from which the namespace needs to be removed for fixed fields
 * @return the input schema with the namespace removed from fixed fields, if any
 */
def removeNamespaceFromFixedFields(schema: Schema): Schema = {
val fields = new util.ArrayList[Schema.Field]
var isSchemaChanged = false

for (field <- schema.getFields.asScala) {
var fieldSchema = field.schema
if (fieldSchema.getType.getName == "fixed" && fieldSchema.getLogicalType != null
&& fieldSchema.getLogicalType.getName == "decimal" && fieldSchema.getNamespace != "") {
isSchemaChanged = true
val name = fieldSchema.getName
val avroType = fieldSchema.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
fieldSchema = avroType.addToSchema(SchemaBuilder.fixed(name).size(fieldSchema.getFixedSize))
}
fields.add(new Schema.Field(field.name, fieldSchema, field.doc, field.defaultVal))
}

if (isSchemaChanged) {
Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, false, fields)
} else {
schema
}
}
}
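
For context on the fix above, a minimal, self-contained sketch of the two schema shapes involved; the field name `height`, the precision/scale, and the `hoodie.source` namespace are illustrative placeholders, not values taken from this PR.

import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}

object FixedNamespaceSketch {
  def main(args: Array[String]): Unit = {
    // Roughly the shape SchemaConverters.toAvroType emits for a DecimalType column:
    // a decimal-backed fixed type that inherits the enclosing record's namespace.
    val namespaced: Schema = LogicalTypes.decimal(10, 2)
      .addToSchema(SchemaBuilder.fixed("height").namespace("hoodie.source").size(5))

    // The shape removeNamespaceFromFixedFields rebuilds: same name, size and decimal
    // logical type, but without a namespace, so namespace-free reader schemas match it.
    val bare: Schema = LogicalTypes.decimal(10, 2)
      .addToSchema(SchemaBuilder.fixed("height").size(5))

    println(namespaced.getFullName) // hoodie.source.height
    println(bare.getFullName)       // height
  }
}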
@@ -246,6 +246,12 @@ object DataSourceWriteOptions {
val PRECOMBINE_FIELD_OPT_KEY = HoodieWriteConfig.PRECOMBINE_FIELD_PROP
val DEFAULT_PRECOMBINE_FIELD_OPT_VAL = "ts"

/**
* Experimental flag to drop the namespace from fixed Avro fields during row-to-Avro conversion, to support schema evolution
*/
val DROP_NAMESPACE_OPT_KEY = "hoodie.drop.namespace"
val DEFAULT_DROP_NAMESPACE_OPT_VAL = "false"

/**
* Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.
* This will render any value set for `PRECOMBINE_FIELD_OPT_VAL` in-effective
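
As a usage illustration only (not part of this diff), the new key is passed like any other write option; the record key, precombine field, table name and path below are placeholders borrowed from the tests.

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper: write a DataFrame with namespace dropping enabled.
def writeWithoutNamespace(df: DataFrame, basePath: String): Unit = {
  df.write.format("org.apache.hudi")
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
    .option(DataSourceWriteOptions.DROP_NAMESPACE_OPT_KEY, "true") // i.e. "hoodie.drop.namespace"
    .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test")
    .mode(SaveMode.Append)
    .save(basePath)
}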
@@ -65,7 +65,7 @@ public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourc
KeyGenerator keyGenerator = DataSourceUtils.createKeyGenerator(props);
String structName = tableName + "_record";
String namespace = "hoodie." + tableName;
RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace);
RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, props.getBoolean("hoodie.drop.namespace", false));
return genericRecords.toJavaRDD().map(gr -> {
String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
gr, props.getString("hoodie.datasource.write.precombine.field"), false);
@@ -142,7 +142,7 @@ private[hudi] object HoodieSparkSqlWriter {

// Convert to RDD[HoodieRecord]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace, parameters.getOrElse(DROP_NAMESPACE_OPT_KEY, DEFAULT_DROP_NAMESPACE_OPT_VAL).toBoolean)
val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean || operation.equals(WriteOperationType.UPSERT);
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val hoodieRecord = if (shouldCombine) {
@@ -189,7 +189,7 @@ private[hudi] object HoodieSparkSqlWriter {

// Convert to RDD[HoodieKey]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace)
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, parameters.getOrElse(DROP_NAMESPACE_OPT_KEY, DEFAULT_DROP_NAMESPACE_OPT_VAL).toBoolean)
val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()

if (!tableExists) {
@@ -89,12 +89,12 @@ object HoodieSparkUtils {
new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
}

def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
def createRdd(df: DataFrame, structName: String, recordNamespace: String, useV2Converter: Boolean = false): RDD[GenericRecord] = {
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
createRdd(df, avroSchema, structName, recordNamespace)
createRdd(df, avroSchema, structName, recordNamespace, useV2Converter)
}

def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String, useV2Converter: Boolean)
: RDD[GenericRecord] = {
// Use the Avro schema to derive the StructType which has the correct nullability information
val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
@@ -104,7 +104,11 @@
.mapPartitions { records =>
if (records.isEmpty) Iterator.empty
else {
val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
val convertor = if (useV2Converter) {
  AvroConversionHelper.createConverterToAvroV2(dataType, structName, recordNamespace)
} else {
  AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
}
records.map { x => convertor(x).asInstanceOf[GenericRecord] }
}
}
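
For illustration only, a sketch of a call site for the extended createRdd with the V2 converter enabled; the struct name and namespace strings are placeholders following the tableName-based pattern used in the DeltaStreamer change above.

import org.apache.avro.generic.GenericRecord
import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Hypothetical helper: convert a DataFrame to Avro records, dropping the namespace
// from decimal-backed fixed fields via the V2 converter.
def toAvroRecords(df: DataFrame): RDD[GenericRecord] =
  HoodieSparkUtils.createRdd(df, "hoodie_test_record", "hoodie.hoodie_test", useV2Converter = true)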
@@ -53,7 +53,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
HoodieWriteConfig.TABLE_NAME -> "hoodie_test",
DataSourceWriteOptions.DROP_NAMESPACE_OPT_KEY -> "false"
)

val verificationCol: String = "driver"
@@ -86,6 +87,37 @@
assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
}

@Test
def testCopyOnWriteInsertUpdateWithoutNamespace(): Unit = {
// Insert Operation
val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.mode(SaveMode.Overwrite)
.save(basePath)

assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))

val snapshotDF1 = spark.read.format("org.apache.hudi")
.load(basePath + "/*/*/*/*")
assertEquals(100, snapshotDF1.count())

// Upsert Operation without Hudi metadata columns
val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))

val commonOptsV2 = commonOpts + (DataSourceWriteOptions.DROP_NAMESPACE_OPT_KEY -> "true")
inputDF2.write.format("org.apache.hudi")
.options(commonOptsV2)
.mode(SaveMode.Append)
.save(basePath)

val snapshotDF2 = spark.read.format("org.apache.hudi")
.load(basePath + "/*/*/*/*")
assertEquals(inputDF2.count(), snapshotDF2.count())
}

@ParameterizedTest
//TODO(metadata): Needs HUDI-1459 to be fixed
//@ValueSource(booleans = Array(true, false))
@@ -47,7 +47,8 @@ class TestMORDataSource extends HoodieClientTestBase {
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
HoodieWriteConfig.TABLE_NAME -> "hoodie_test",
DataSourceWriteOptions.DROP_NAMESPACE_OPT_KEY -> "false"
)

val verificationCol: String = "driver"
@@ -122,6 +123,75 @@
assertEquals(updatedVerificationVal, hudiSnapshotDF3.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
}

@Test def testMergeOnReadStorageWithoutNamespace() {

val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
// Insert Operation
val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)

assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))

// Read RO View
val hudiRODF1 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.load(basePath + "/*/*/*/*")
assertEquals(100, hudiRODF1.count()) // 100 records from the initial insert
val insertCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath)
val insertCommitTimes = hudiRODF1.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
assertEquals(List(insertCommitTime), insertCommitTimes)

// Upsert operation without Hudi metadata columns
val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.mode(SaveMode.Append)
.save(basePath)

// Read Snapshot query
val updateCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath)
val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load(basePath + "/*/*/*/*")
val updateCommitTimes = hudiSnapshotDF2.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
assertEquals(List(updateCommitTime), updateCommitTimes)

// Upsert based on the written table with Hudi metadata columns
val verificationRowKey = hudiSnapshotDF2.limit(1).select("_row_key").first.getString(0)
val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))

inputDF3.write.format("org.apache.hudi")
.options(commonOpts)
.mode(SaveMode.Append)
.save(basePath)

val hudiSnapshotDF3 = spark.read.format("hudi").load(basePath + "/*/*/*/*")
assertEquals(100, hudiSnapshotDF3.count())
assertEquals(updatedVerificationVal, hudiSnapshotDF3.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))

val commonOptsV2 = commonOpts + (DataSourceWriteOptions.DROP_NAMESPACE_OPT_KEY -> "true")
// Upsert operation without Hudi metadata columns
val records4 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
val inputDF4: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records4, 2))
inputDF4.write.format("org.apache.hudi")
.options(commonOptsV2)
.mode(SaveMode.Append)
.save(basePath)

val hudiSnapshotDF4 = spark.read.format("hudi").load(basePath + "/*/*/*/*")
assertEquals(100, hudiSnapshotDF4.count())

}


@Test def testCount() {
// First Operation:
// Producing parquet files to three default partitions.