8 changes: 8 additions & 0 deletions .github/workflows/bot.yml
@@ -18,8 +18,16 @@ jobs:
include:
- scala: "scala-2.11"
spark: "spark2"
- scala: "scala-2.11"
spark: "spark2,spark-shade-unbundle-avro"
- scala: "scala-2.12"
spark: "spark3,spark3.0.x"
- scala: "scala-2.12"
spark: "spark3,spark3.0.x,spark-shade-unbundle-avro"
- scala: "scala-2.12"
spark: "spark3"
- scala: "scala-2.12"
spark: "spark3,spark-shade-unbundle-avro"
steps:
- uses: actions/checkout@v2
- name: Set up JDK 8

@@ -22,17 +22,19 @@ import java.util.Properties

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}

import org.apache.spark.SPARK_VERSION
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
@@ -137,13 +139,13 @@ object HoodieSparkUtils extends SparkAdapterSupport {
def createRddInternal(df: DataFrame, writeSchema: Schema, latestTableSchema: Schema, structName: String, recordNamespace: String)
: RDD[GenericRecord] = {
// Use the write avro schema to derive the StructType which has the correct nullability information
val writeDataType = SchemaConverters.toSqlType(writeSchema).dataType.asInstanceOf[StructType]
val writeDataType = AvroConversionUtils.convertAvroSchemaToStructType(writeSchema)
val encoder = RowEncoder.apply(writeDataType).resolveAndBind()
val deserializer = sparkAdapter.createSparkRowSerDe(encoder)
// if records were serialized with old schema, but an evolved schema was passed in with latestTableSchema, we need
// latestTableSchema equivalent datatype to be passed in to AvroConversionHelper.createConverterToAvro()
val reconciledDataType =
if (latestTableSchema != null) SchemaConverters.toSqlType(latestTableSchema).dataType.asInstanceOf[StructType] else writeDataType
if (latestTableSchema != null) AvroConversionUtils.convertAvroSchemaToStructType(latestTableSchema) else writeDataType
// Note: deserializer.deserializeRow(row) is not capable of handling evolved schema. i.e. if Row was serialized in
// old schema, but deserializer was created with an encoder with evolved schema, deserialization fails.
// Hence we always need to deserialize in the same schema as serialized schema.
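The hunk above (and the matching ones in the files below) routes every Avro-to-Catalyst schema conversion through AvroConversionUtils.convertAvroSchemaToStructType instead of repeating Spark's SchemaConverters chain at each call site. A minimal sketch of the idea, assuming the helper is a thin wrapper over SchemaConverters (the real implementation may go through a Spark-version-specific adapter):

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

object AvroConversionUtilsSketch {
  // Sketch only: assumes the helper simply wraps SchemaConverters so that call
  // sites no longer repeat toSqlType(...).dataType.asInstanceOf[StructType].
  def convertAvroSchemaToStructType(avroSchema: Schema): StructType =
    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
}
```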

@@ -21,18 +21,19 @@ package org.apache.hudi.integ.testsuite.utils

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.HoodieSparkUtils

import org.apache.hudi.{AvroConversionUtils, HoodieSparkUtils}
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.util.Option
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.generator.GenericRecordFullPayloadGenerator
import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils.getFieldNamesAndTypes
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

import org.slf4j.Logger

import scala.math.BigDecimal.RoundingMode.RoundingMode
@@ -139,7 +140,7 @@ object SparkSqlUtils {
*/
def getFieldNamesAndTypes(avroSchemaString: String): Array[(String, String)] = {
val schema = new Schema.Parser().parse(avroSchemaString)
val structType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
structType.fields.map(field => (field.name, field.dataType.simpleString))
}

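For context, a small usage sketch of getFieldNamesAndTypes with a made-up Avro schema; the schema and the expected output shown here are illustrative only:

```scala
import org.apache.avro.Schema

// Hypothetical record schema with two fields.
val avroSchemaString =
  """{"type":"record","name":"Trip","fields":[
    |  {"name":"uuid","type":"string"},
    |  {"name":"fare","type":"double"}
    |]}""".stripMargin

// getFieldNamesAndTypes(avroSchemaString) should then yield
//   Array(("uuid", "string"), ("fare", "double"))
// since StructField.dataType.simpleString prints the lower-case SQL type name.
val parsed = new Schema.Parser().parse(avroSchemaString)
```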

@@ -18,6 +18,7 @@
package org.apache.hudi

import org.apache.hadoop.fs.Path

import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION}
@@ -26,8 +27,9 @@ import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_REA
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter

import org.apache.log4j.LogManager
import org.apache.spark.sql.avro.SchemaConverters

import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -217,8 +219,7 @@ class DefaultSource extends RelationProvider
// the table schema evolution.
val tableSchemaResolver = new TableSchemaResolver(metaClient)
try {
Some(SchemaConverters.toSqlType(tableSchemaResolver.getTableAvroSchema)
.dataType.asInstanceOf[StructType])
Some(AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaResolver.getTableAvroSchema))
} catch {
case _: Throwable =>
None // If there is no commit in the table, we can not get the schema

@@ -18,6 +18,7 @@
package org.apache.hudi

import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -26,10 +27,10 @@ import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
@@ -38,6 +39,7 @@ import org.apache.spark.sql.hudi.{DataSkippingUtils, HoodieSqlUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

import java.util.Properties

import scala.collection.JavaConverters._
@@ -96,8 +98,7 @@ case class HoodieFileIndex(
*/
lazy val schema: StructType = schemaSpec.getOrElse({
val schemaUtil = new TableSchemaResolver(metaClient)
SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
.dataType.asInstanceOf[StructType]
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
})

/**

@@ -23,17 +23,17 @@ import java.util.{Date, Locale, Properties}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.SparkAdapterSupport

import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.DFSPropertiesConfiguration
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline

import org.apache.spark.SPARK_VERSION
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -46,6 +46,7 @@ import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}

import java.text.SimpleDateFormat

import scala.collection.immutable.Map

object HoodieSqlUtils extends SparkAdapterSupport {
@@ -83,8 +84,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
catch {
case _: Throwable => None
}
avroSchema.map(SchemaConverters.toSqlType(_).dataType
.asInstanceOf[StructType]).map(removeMetaFields)
avroSchema.map(AvroConversionUtils.convertAvroSchemaToStructType).map(removeMetaFields)
}

def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = {
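The removeMetaFields step above strips Hudi's internal meta columns from the resolved schema before it is handed to SQL. A rough sketch of that filtering, assuming it keys off HoodieRecord.HOODIE_META_COLUMNS (the actual helper may differ in detail):

```scala
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters._

// Sketch: drop _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, etc.
// from the table schema returned to Spark SQL.
def removeMetaFieldsSketch(schema: StructType): StructType = {
  val metaColumns = HoodieRecord.HOODIE_META_COLUMNS.asScala.toSet
  StructType(schema.fields.filterNot(field => metaColumns.contains(field.name)))
}
```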

@@ -19,10 +19,13 @@ package org.apache.spark.sql.hudi.command.payload

import java.util.{Base64, Properties}
import java.util.concurrent.Callable
import scala.collection.JavaConverters._

import com.google.common.cache.CacheBuilder

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}

import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
@@ -31,12 +34,14 @@ import org.apache.hudi.common.util.{ValidationUtils, Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.io.HoodieWriteHandle
import org.apache.hudi.sql.IExpressionEvaluator

import org.apache.spark.sql.avro.{AvroSerializer, HoodieAvroSerializer, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hudi.SerDeUtils
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.getEvaluator
import org.apache.spark.sql.types.{StructField, StructType}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

/**
@@ -309,7 +314,7 @@ object ExpressionPayload {
SchemaConverters.toAvroType(conditionType), false)
val conditionEvaluator = ExpressionCodeGen.doCodeGen(Seq(condition), conditionSerializer)

val assignSqlType = SchemaConverters.toSqlType(writeSchema).dataType.asInstanceOf[StructType]
val assignSqlType = AvroConversionUtils.convertAvroSchemaToStructType(writeSchema)
val assignSerializer = new HoodieAvroSerializer(assignSqlType, writeSchema, false)
val assignmentEvaluator = ExpressionCodeGen.doCodeGen(assignments, assignSerializer)
conditionEvaluator -> assignmentEvaluator
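The evaluator setup above converts in both directions: StructType to Avro via SchemaConverters.toAvroType for the condition, and Avro back to StructType (now via AvroConversionUtils) for the assignment schema. A minimal illustration of the two directions, using a hypothetical single-column struct:

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.{BooleanType, StructField, StructType}

// Hypothetical condition type: one boolean column.
val conditionType = StructType(Seq(StructField("condition", BooleanType, nullable = false)))

// Catalyst -> Avro, the direction used when building the condition serializer above.
val conditionAvroSchema: Schema = SchemaConverters.toAvroType(conditionType, nullable = false)

// Avro -> Catalyst, the direction this patch now routes through
// AvroConversionUtils.convertAvroSchemaToStructType; shown here with the raw
// SchemaConverters call it is assumed to wrap.
val backToStruct: StructType =
  SchemaConverters.toSqlType(conditionAvroSchema).dataType.asInstanceOf[StructType]
```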

@@ -19,16 +19,18 @@ package org.apache.spark.sql.hudi.command.payload

import org.apache.avro.generic.IndexedRecord
import org.apache.avro.Schema
import org.apache.spark.sql.avro.{HooodieAvroDeserializer, SchemaConverters}

import org.apache.hudi.AvroConversionUtils

import org.apache.spark.sql.avro.HooodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

/**
* A sql typed record which will convert the avro field to sql typed value.
*/
class SqlTypedRecord(val record: IndexedRecord) extends IndexedRecord {

private lazy val sqlType = SchemaConverters.toSqlType(getSchema).dataType.asInstanceOf[StructType]
private lazy val sqlType = AvroConversionUtils.convertAvroSchemaToStructType(getSchema)
private lazy val avroDeserializer = HooodieAvroDeserializer(record.getSchema, sqlType)
private lazy val sqlRow = avroDeserializer.deserializeData(record).asInstanceOf[InternalRow]


@@ -22,16 +22,17 @@ import java.nio.charset.StandardCharsets
import java.util.Date

import org.apache.hadoop.fs.Path
import org.apache.hudi.{DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation, SparkAdapterSupport}

import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, IncrementalRelation, MergeOnReadIncrementalRelation, SparkAdapterSupport}
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{FileIOUtils, TablePathUtils}

import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
@@ -118,8 +119,7 @@ class HoodieStreamSource(
override def schema: StructType = {
schemaOption.getOrElse {
val schemaUtil = new TableSchemaResolver(metaClient)
SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
.dataType.asInstanceOf[StructType]
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
}
}


@@ -50,9 +50,5 @@ public void testDataSourceWriterExtraCommitMetadata() throws Exception {

Assertions.assertTrue(
((UnresolvedRelation)newStatment.table()).multipartIdentifier().contains("test_reflect_util"));

if (!spark.version().startsWith("3.0")) {
Assertions.assertTrue(newStatment.userSpecifiedCols().isEmpty());
}
}
}