@@ -21,7 +21,6 @@ package org.apache.hudi
import java.nio.ByteBuffer
import java.sql.{Date, Timestamp}
import java.util

import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
import org.apache.avro.Schema.Type._
@@ -340,7 +339,7 @@ object AvroConversionHelper {
}
}
case structType: StructType =>
val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
val schema: Schema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, structName, recordNamespace)
val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
val fieldConverters = structType.fields.map(field =>
createConverterToAvro(
@@ -18,7 +18,7 @@

package org.apache.hudi

import org.apache.avro.Schema
import org.apache.avro.{JsonProperties, Schema}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.spark.rdd.RDD
@@ -27,6 +27,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.JavaConverters._
import scala.collection.JavaConversions._

object AvroConversionUtils {

@@ -49,13 +50,45 @@ object AvroConversionUtils {
def convertStructTypeToAvroSchema(structType: StructType,
structName: String,
recordNamespace: String): Schema = {
SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace))
}

def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
}

/**
* Regenerates the Avro schema with proper defaults for nullable fields. Avro expects null to be the first entry
* of a UNION so that the field's default value can be set to null.
*
* @param writeSchema original writer schema.
* @return the regenerated schema with proper defaults set.
*/
def getAvroSchemaWithDefaults(writeSchema: Schema): Schema = {
val modifiedFields = writeSchema.getFields.map(field => {
field.schema().getType match {
case Schema.Type.RECORD => {
val newSchema = getAvroSchemaWithDefaults(field.schema())
new Schema.Field(field.name(), newSchema, field.doc(), JsonProperties.NULL_VALUE)
Review comment (Contributor):
Why are we setting a null default here? If a field is nullable, it would be of type UNION. IMO a RECORD field must not have a default.

Reply (Contributor, author):
Sure, feel free to fix it. The one you had in your draft commit was using a deprecated constructor and I thought of fixing it. We could probably do field.defaultValue().

}
case Schema.Type.UNION => {
val innerFields = field.schema().getTypes
val containsNullSchema = innerFields.exists(innerSchema => innerSchema.getType == Schema.Type.NULL)
if (containsNullSchema) {
val newSchema = Schema.createUnion(List(Schema.create(Schema.Type.NULL)) ++ innerFields.filter(innerSchema => innerSchema.getType != Schema.Type.NULL))
new Schema.Field(field.name(), newSchema, field.doc(), JsonProperties.NULL_VALUE)
} else {
new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())
}
}
case _ => new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())
}
}).toList
val newSchema = Schema.createRecord(writeSchema.getName, writeSchema.getDoc, writeSchema.getNamespace, writeSchema.isError)
newSchema.setFields(modifiedFields)
newSchema
}

def buildAvroRecordBySchema(record: IndexedRecord,
requiredSchema: Schema,
requiredPos: List[Int],
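To make the default handling above concrete, here is a minimal sketch (illustration only, not part of the diff) of what getAvroSchemaWithDefaults is documented to do for a nullable field such as ts in exampleEvolvedSchema.txt: the NULL branch of the union is moved to the front and the field's default becomes null. The SchemaDefaultsExample object and its main method exist only for this illustration.

// Illustration only (not part of this PR): shows the documented effect of
// AvroConversionUtils.getAvroSchemaWithDefaults on a nullable union field.
import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils

object SchemaDefaultsExample {
  def main(args: Array[String]): Unit = {
    // A field declared as ["long", "null"], mirroring the ts field of exampleEvolvedSchema.txt.
    val original = new Schema.Parser().parse(
      """{"type": "record", "name": "trip", "namespace": "example.schema",
        | "fields": [{"name": "ts", "type": ["long", "null"]}]}""".stripMargin)

    val withDefaults = AvroConversionUtils.getAvroSchemaWithDefaults(original)

    // The union should now list NULL first, i.e. ["null","long"] ...
    println(withDefaults.getField("ts").schema())
    // ... and the field should carry a null default value.
    println(withDefaults.getField("ts").defaultVal())
  }
}
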
@@ -45,6 +45,10 @@ public static Schema getStructTypeExampleSchema() throws IOException {
return new Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleSchema.txt")));
}

public static Schema getStructTypeExampleEvolvedSchema() throws IOException {
return new Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleEvolvedSchema.txt")));
}

public static List<Row> generateRandomRows(int count) {
Random random = new Random();
List<Row> toReturn = new ArrayList<>();
@@ -58,4 +62,31 @@ public static List<Row> generateRandomRows(int count) {
}
return toReturn;
}

public static List<Row> generateUpdates(List<Row> records, int count) {
List<Row> toReturn = new ArrayList<>();
for (int i = 0; i < count; i++) {
Object[] values = new Object[3];
values[0] = records.get(i).getString(0);
values[1] = records.get(i).getAs(1);
values[2] = new Date().getTime();
toReturn.add(RowFactory.create(values));
}
return toReturn;
}

public static List<Row> generateRandomRowsEvolvedSchema(int count) {
Random random = new Random();
List<Row> toReturn = new ArrayList<>();
List<String> partitions = Arrays.asList(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH});
for (int i = 0; i < count; i++) {
Object[] values = new Object[4];
values[0] = UUID.randomUUID().toString();
values[1] = partitions.get(random.nextInt(3));
values[2] = new Date().getTime();
values[3] = UUID.randomUUID().toString();
toReturn.add(RowFactory.create(values));
}
return toReturn;
}
}
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "example.schema",
"type": "record",
"name": "trip",
"fields": [
{
"name": "_row_key",
"type": "string"
},
{
"name": "partition",
"type": "string"
},
{
"name": "ts",
"type": ["long", "null"]
},
{
"name": "new_field",
"type": ["string","null"]
}
]
}

@@ -17,13 +17,10 @@

package org.apache.hudi.functional

import java.time.Instant
import java.util
import java.util.{Collections, Date, UUID}

import org.apache.commons.io.FileUtils
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.{SparkRDDWriteClient, TestBootstrap}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
@@ -34,10 +31,14 @@ import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOpt
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.{FunSuite, Matchers}

import java.time.Instant
import java.util
import java.util.{Collections, Date, UUID}
import scala.collection.JavaConversions._

class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
@@ -113,6 +114,91 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
}

List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.foreach(tableType => {
test("test schema evolution for " + tableType) {
initSparkContext("test_schema_evolution")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
try {
val hoodieFooTableName = "hoodie_foo_tbl"
//create a new table
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
"hoodie.insert.shuffle.parallelism" -> "1",
"hoodie.upsert.shuffle.parallelism" -> "1",
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)

// generate the inserts
var schema = DataSourceTestUtils.getStructTypeExampleSchema
var structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
var records = DataSourceTestUtils.generateRandomRows(10)
var recordsSeq = convertRowListToSeq(records)
var df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)

val snapshotDF1 = spark.read.format("org.apache.hudi")
.load(path.toAbsolutePath.toString + "/*/*/*/*")
assertEquals(10, snapshotDF1.count())

// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf1 = snapshotDF1.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))

assert(df1.except(trimmedDf1).count() == 0)

// issue updates so that log files are created for MOR table
val updates = DataSourceTestUtils.generateUpdates(records, 5)
val updatesSeq = convertRowListToSeq(updates)
val updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)

val snapshotDF2 = spark.read.format("org.apache.hudi")
.load(path.toAbsolutePath.toString + "/*/*/*/*")
assertEquals(10, snapshotDF2.count())

// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf2 = snapshotDF2.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))

// ensure 2nd batch of updates matches.
assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)

schema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema
structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
records = DataSourceTestUtils.generateRandomRowsEvolvedSchema(5)
recordsSeq = convertRowListToSeq(records)
val df3 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df3)

val snapshotDF3 = spark.read.format("org.apache.hudi")
.load(path.toAbsolutePath.toString + "/*/*/*/*")
assertEquals(15, snapshotDF3.count())

// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf3 = snapshotDF3.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))

// ensure the batch written with the evolved schema matches.
assert(df3.intersect(trimmedDf3).except(df3).count() == 0)

} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
})


test("test bulk insert dataset with datasource impl") {
initSparkContext("test_bulk_insert_datasource")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")