SchemaConverters.scala

@@ -20,6 +20,7 @@ package org.apache.spark.sql.avro
 import org.apache.avro.LogicalTypes.{Date, Decimal, TimestampMicros, TimestampMillis}
 import org.apache.avro.Schema.Type._
 import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
+import org.apache.hudi.avro.AvroSchemaUtils.isNullable
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.types.Decimal.minBytesForPrecision
 import org.apache.spark.sql.types._

@@ -202,7 +203,12 @@ private[sql] object SchemaConverters {
       st.foreach { f =>
         val fieldAvroType =
           toAvroType(f.dataType, f.nullable, f.name, childNameSpace)
-        fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
+        val fieldBuilder = fieldsAssembler.name(f.name).`type`(fieldAvroType)

Contributor: This code is actually borrowed from Spark, and we try to avoid any changes to such code to make sure we're not diverging from Spark.

Contributor Author (@qidian99): [screenshot attached] When extractPartitionValuesFromPartitionPath is turned on, the StructType schema and the Avro schema differ. convertToAvroSchema is missing the default value when the field is nullable, which makes the table unqueryable.
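
To make the symptom concrete, here is a minimal, self-contained sketch in plain Avro, outside of Hudi; the record and field names (r, id, new_col1) are invented for illustration. The point is that a reader schema which gained a field can only decode older records if that field carries a default:

    import java.io.ByteArrayOutputStream
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter}
    import org.apache.avro.io.{DecoderFactory, EncoderFactory}

    val writerSchema = new Schema.Parser().parse(
      """{"type":"record","name":"r","fields":[{"name":"id","type":"int"}]}""")
    // Reader schema evolved with a nullable field; note the explicit default.
    val readerSchema = new Schema.Parser().parse(
      """{"type":"record","name":"r","fields":[
        |  {"name":"id","type":"int"},
        |  {"name":"new_col1","type":["null","int"],"default":null}]}""".stripMargin)

    // Encode one record with the old (writer) schema.
    val out = new ByteArrayOutputStream()
    val enc = EncoderFactory.get().binaryEncoder(out, null)
    val rec = new GenericData.Record(writerSchema)
    rec.put("id", 1)
    new GenericDatumWriter[GenericData.Record](writerSchema).write(rec, enc)
    enc.flush()

    // Decode with the evolved reader schema. This succeeds only because of
    // "default": null; drop the default and this read throws AvroTypeException,
    // which matches the "table not queryable" symptom described above.
    val dec = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
    val decoded = new GenericDatumReader[GenericData.Record](writerSchema, readerSchema)
      .read(null, dec)
    assert(decoded.get("new_col1") == null)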

Contributor: I don't think I understand why you believe this is an appropriate fix for the issue you're observing:

  • Spark's schemas don't have defaults at all
  • An Avro schema field being nullable doesn't entail that it should have null as its default value (see the sketch below)
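
To illustrate the second bullet with plain Avro (the schema here is invented for the example): nullability and defaults are independent attributes, so a nullable field with no default is a perfectly valid schema.

    import org.apache.avro.Schema

    // A union with null makes the field *nullable*, yet declares no default:
    val s = new Schema.Parser().parse(
      """{"type":"record","name":"r","fields":[
        |  {"name":"opt","type":["null","string"]}]}""".stripMargin)
    assert(!s.getField("opt").hasDefaultValue) // parses fine, still no default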

Contributor: From what I understand so far, the issue is not in the conversion but in the fact that we're not handling schema evolution properly in HoodieAvroDataBlock: whenever we decode a record from an existing data block, we should make sure that any nullable field actually has null as its default value, so that the Avro reader is able to decode the data in case this particular field is not present.
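
A rough sketch of what that suggested alternative could look like: rewrite the reader schema before decoding so every defaultless nullable field gains an explicit null default. The helper name withNullDefaults is hypothetical, this is not Hudi's actual code, and it ignores nested records and field aliases for brevity (it assumes Avro 1.8+ for Field#defaultVal and JsonProperties.NULL_VALUE).

    import scala.collection.JavaConverters._
    import org.apache.avro.{JsonProperties, Schema}

    // Hypothetical helper (not Hudi code): give every defaultless nullable
    // field a null default so records written before the field exist stay decodable.
    def withNullDefaults(record: Schema): Schema = {
      val fields = record.getFields.asScala.map { f =>
        val branches = if (f.schema().getType == Schema.Type.UNION) {
          f.schema().getTypes.asScala
        } else Seq.empty[Schema]
        val nullable = branches.exists(_.getType == Schema.Type.NULL)
        if (nullable && !f.hasDefaultValue) {
          // A null default is only legal when the null branch comes first.
          val reordered = Schema.createUnion(
            branches.sortBy(_.getType != Schema.Type.NULL).asJava)
          new Schema.Field(f.name(), reordered, f.doc(), JsonProperties.NULL_VALUE)
        } else {
          new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())
        }
      }
      Schema.createRecord(record.getName, record.getDoc, record.getNamespace,
        record.isError, fields.asJava)
    }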

Contributor: I agree with @alexeykudinkin, we should not change the code of SchemaConverters.scala; this is a bug in the log scanner.

+        if (isNullable(fieldAvroType)) {
+          fieldBuilder.withDefault(null)
+        } else {
+          fieldBuilder.noDefault()
+        }
       }
       fieldsAssembler.endRecord()
     }
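
For reference, a small standalone sketch (plain Avro, invented names) of what the patched branch emits: withDefault(null) on a nullable field writes "default": null into the schema JSON, which is what lets readers fill the field in for older records.

    import org.apache.avro.{Schema, SchemaBuilder}

    val nullableInt = Schema.createUnion(
      Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT))
    val rec = SchemaBuilder.record("r").fields()
      .name("new_col1").`type`(nullableInt).withDefault(null)
      .endRecord()
    // rec.toString(true) now shows "default" : null on new_col1.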

@@ -212,7 +218,7 @@ private[sql] object SchemaConverters {
     }

     if (nullable && catalystType != NullType && schema.getType != Schema.Type.UNION) {
-      Schema.createUnion(schema, nullSchema)
+      Schema.createUnion(nullSchema, schema)
     } else {
       schema
     }
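
The argument order in createUnion matters because of an Avro rule: a field's default value must match the first branch of its union type. A minimal sketch (plain Avro, invented names) of the difference:

    import org.apache.avro.{Schema, SchemaBuilder}

    val nullFirst = Schema.createUnion(
      Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG))
    val nullLast = Schema.createUnion(
      Schema.create(Schema.Type.LONG), Schema.create(Schema.Type.NULL))

    // Legal: the union's first branch is null, so a null default validates.
    SchemaBuilder.record("ok").fields()
      .name("ts").`type`(nullFirst).withDefault(null).endRecord()

    // Throws AvroTypeException: null default, but the first branch is long.
    // SchemaBuilder.record("bad").fields()
    //   .name("ts").`type`(nullLast).withDefault(null).endRecord()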

TestUpdateTable.scala

@@ -204,4 +204,52 @@ class TestUpdateTable extends HoodieSparkSqlTestBase
       }
     })
   }
+
+  test("Test Add Column and Update Table") {
+    withTempDir { tmp =>

xiarixiaoyao (Contributor) commented on Feb 10, 2023: @qidian99, thanks for your contribution. I ran this UT directly on the master branch, expecting it to fail, but it succeeded. Could you please check your UT? Thanks.

Contributor Author (@qidian99): Thanks for the timely reply. I changed the UT to manually set partition pruning to true. @stream2000 and I both tested on the master branch, and the test fails. [screenshot attached]

Contributor: @qidian99, can you please paste the whole stacktrace? I would like to understand better what exactly is failing.

Contributor: I see you pasted the stacktrace from querying your data via the server. Can you please paste the stacktrace of this particular test failing? I want to better understand which operation in this test is failing.

Contributor: @qidian99, do only non-partitioned tables have this problem?

+      val tableName = generateTableName
+
+      spark.sql("SET hoodie.datasource.read.extract.partition.values.from.path=true")
+
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 10.0, 1000)
+      )
+
+      spark.sql(s"update $tableName set price = 22 where id = 1")
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 22.0, 1000)
+      )
+
+      // add a nullable column; existing rows must surface it as null
+      spark.sql(s"alter table $tableName add column new_col1 int")
+      checkAnswer(s"select id, name, price, ts, new_col1 from $tableName")(
+        Seq(1, "a1", 22.0, 1000, null)
+      )
+
+      // update and check
+      spark.sql(s"update $tableName set price = price * 2 where id = 1")
+      checkAnswer(s"select id, name, price, ts, new_col1 from $tableName")(
+        Seq(1, "a1", 44.0, 1000, null)
+      )
+    }
+  }
 }