@@ -23,6 +23,8 @@

import java.io.Serializable;

-public interface SparkRowDeserializer extends Serializable {
+public interface SparkRowSerDe extends Serializable {
Row deserializeRow(InternalRow internalRow);

InternalRow serializeRow(Row row);
}
19 changes: 19 additions & 0 deletions hudi-spark-datasource/hudi-spark/pom.xml
@@ -266,6 +266,25 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<!-- Spark (Packages) -->
<dependency>
<groupId>org.apache.spark</groupId>
@@ -22,12 +22,13 @@ import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -44,6 +45,7 @@ class DefaultSource extends RelationProvider
with CreatableRelationProvider
with DataSourceRegister
with StreamSinkProvider
with StreamSourceProvider
with Serializable {

private val log = LogManager.getLogger(classOf[DefaultSource])
@@ -181,4 +183,35 @@ class DefaultSource extends RelationProvider
.resolveRelation()
}
}

override def sourceSchema(sqlContext: SQLContext,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType) = {
val path = parameters.get("path")
if (path.isEmpty || path.get == null) {
throw new HoodieException(s"'path' must be specified.")
}
val metaClient = new HoodieTableMetaClient(
sqlContext.sparkSession.sessionState.newHadoopConf(), path.get)
val schemaResolver = new TableSchemaResolver(metaClient)
val sqlSchema =
try {
val avroSchema = schemaResolver.getTableAvroSchema
AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
} catch {
case _: Exception =>
require(schema.isDefined, "Fail to resolve source schema")
schema.get
}
(shortName(), sqlSchema)
}

override def createSource(sqlContext: SQLContext,
metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
new HoodieStreamSource(sqlContext, metadataPath, schema, parameters)
}
}
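With StreamSourceProvider implemented by DefaultSource, a Hudi table can now be consumed as a Structured Streaming source. A minimal usage sketch (not part of this PR; the table path and checkpoint location are placeholders):

```scala
// Illustrative only: read a Hudi table as a streaming source and echo new commits to the console.
val df = spark.readStream
  .format("hudi")
  .load("/tmp/hudi_trips")          // placeholder table path

val query = df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/hudi_trips_checkpoint")  // placeholder checkpoint path
  .outputMode("append")
  .start()

query.awaitTermination()
```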
@@ -21,7 +21,7 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.client.utils.SparkRowDeserializer
+import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.SPARK_VERSION
import org.apache.spark.rdd.RDD
@@ -99,7 +99,7 @@ object HoodieSparkUtils {
// Use the Avro schema to derive the StructType which has the correct nullability information
val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val encoder = RowEncoder.apply(dataType).resolveAndBind()
-val deserializer = HoodieSparkUtils.createDeserializer(encoder)
+val deserializer = HoodieSparkUtils.createRowSerDe(encoder)
df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
.mapPartitions { records =>
if (records.isEmpty) Iterator.empty
@@ -110,12 +110,12 @@
}
}

-def createDeserializer(encoder: ExpressionEncoder[Row]): SparkRowDeserializer = {
-// TODO remove Spark2RowDeserializer if Spark 2.x support is dropped
+def createRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = {
+// TODO remove Spark2RowSerDe if Spark 2.x support is dropped
if (SPARK_VERSION.startsWith("2.")) {
-new Spark2RowDeserializer(encoder)
+new Spark2RowSerDe(encoder)
} else {
-new Spark3RowDeserializer(encoder)
+new Spark3RowSerDe(encoder)
}
}
}
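The renamed SparkRowSerDe now converts in both directions between the external Row and Catalyst's InternalRow. A small sketch of how createRowSerDe might be exercised (the schema, field names, and values below are made up for illustration):

```scala
// Illustrative only: round-trip a Row through the version-specific SerDe.
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(StructField("uuid", StringType), StructField("rider", StringType)))
val serDe = HoodieSparkUtils.createRowSerDe(RowEncoder.apply(schema).resolveAndBind())

val internalRow = serDe.serializeRow(Row("id-1", "rider-A")) // Row -> InternalRow
val row = serDe.deserializeRow(internalRow)                  // InternalRow -> Row
```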
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.streaming

import com.fasterxml.jackson.annotation.JsonInclude.Include
Member comment:
This is very simple right now; can we just hand-format the JSON without the Jackson dependency? Just a thought, I'll leave it to you.

Author reply:
Thanks for your advice! Yes, it is quite simple currently. But in the long term, if we introduce commit_seq_no or other info into the offset, we may need the JSON parser, and Jackson is already a dependency of Spark, so I prefer to keep it.

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}

case class HoodieSourceOffset(commitTime: String) extends Offset {

override def json(): String = {
HoodieSourceOffset.toJson(this)
}

override def equals(obj: Any): Boolean = {
obj match {
case HoodieSourceOffset(otherCommitTime) =>
otherCommitTime == commitTime
case _=> false
}
}

override def hashCode(): Int = {
commitTime.hashCode
}
}


object HoodieSourceOffset {
val mapper = new ObjectMapper with ScalaObjectMapper
mapper.setSerializationInclusion(Include.NON_ABSENT)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)

def toJson(offset: HoodieSourceOffset): String = {
mapper.writeValueAsString(offset)
}

def fromJson(json: String): HoodieSourceOffset = {
mapper.readValue[HoodieSourceOffset](json)
}

def apply(offset: Offset): HoodieSourceOffset = {
offset match {
case SerializedOffset(json) => fromJson(json)
case o: HoodieSourceOffset => o
}
}

val INIT_OFFSET = HoodieSourceOffset(HoodieTimeline.INIT_INSTANT_TS)
}
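As discussed in the review thread above, the offset currently carries only the commit time, so its JSON form stays simple. A sketch of the round trip (the commit time value and the exact JSON shown are illustrative):

```scala
// Illustrative only: serialize an offset to JSON and parse it back.
val offset = HoodieSourceOffset("20210316123000")
val json = offset.json()                        // roughly {"commitTime":"20210316123000"}
val restored = HoodieSourceOffset.fromJson(json)
assert(restored == offset)
```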
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.streaming

import java.io.{BufferedWriter, InputStream, OutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets
import java.util.Date

import org.apache.hadoop.fs.Path
import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{FileIOUtils, TablePathUtils}
import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext}

/**
* A Structured Streaming Source for Hudi, which lets a streaming job consume data from a Hudi table.
* @param sqlContext
* @param metadataPath
* @param schemaOption
* @param parameters
*/
class HoodieStreamSource(
sqlContext: SQLContext,
metadataPath: String,
schemaOption: Option[StructType],
parameters: Map[String, String])
extends Source with Logging with Serializable {

@transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
private lazy val tablePath: Path = {
val path = new Path(parameters.getOrElse("path", "Missing 'path' option"))
val fs = path.getFileSystem(hadoopConf)
TablePathUtils.getTablePath(fs, path).get()
}
private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
private lazy val tableType = metaClient.getTableType

@transient private var lastOffset: HoodieSourceOffset = _
@transient private lazy val initialOffsets = {
val metadataLog =
new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
writer.write("v" + VERSION + "\n")
writer.write(metadata.json)
writer.flush()
}

/**
* Deserialize the init offset from the metadata file.
* The format in the metadata file is like this:
* ----------------------------------------------
* v1 -- The version info in the first line
* offsetJson -- The json string of HoodieSourceOffset in the rest of the file
* -----------------------------------------------
* @param in
* @return
*/
override def deserialize(in: InputStream): HoodieSourceOffset = {
val content = FileIOUtils.readAsUTFString(in)
// Get version from the first line
val firstLineEnd = content.indexOf("\n")
if (firstLineEnd > 0) {
val version = getVersion(content.substring(0, firstLineEnd))
if (version > VERSION) {
throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
s" current version is: $version")
}
// Get offset from the rest line in the file
HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
} else {
throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
}
}
}
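// If batch 0 already has a persisted offset in the metadata log, resume from it;
// otherwise record INIT_OFFSET as batch 0 and start from the beginning of the timeline.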
metadataLog.get(0).getOrElse {
Member comment:
Some comments on this code would be helpful.

Author reply:
Agree!

metadataLog.add(0, INIT_OFFSET)
INIT_OFFSET
}
}

private def getVersion(versionLine: String): Int = {
if (versionLine.startsWith("v")) {
versionLine.substring(1).toInt
} else {
throw new IllegalStateException(s"Illegal version line: $versionLine " +
s"in the streaming metadata path")
}
}

override def schema: StructType = {
schemaOption.getOrElse {
val schemaUtil = new TableSchemaResolver(metaClient)
SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
.dataType.asInstanceOf[StructType]
}
}

/**
* Get the latest offset from the hoodie table.
* @return
*/
override def getOffset: Option[Offset] = {
Member comment:
Just a rant: Source#getOffset() is such a bad name. It's actually the latest offset. :(

Author reply:
Yes, getOffset is not a meaningful method name. However, it is defined in the Spark Source interface; we cannot rename it, but we can add some comments for it.

metaClient.reloadActiveTimeline()
val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
if (!activeInstants.empty()) {
val currentLatestCommitTime = activeInstants.lastInstant().get().getTimestamp
if (lastOffset == null || currentLatestCommitTime > lastOffset.commitTime) {
lastOffset = HoodieSourceOffset(currentLatestCommitTime)
}
} else { // if there are no active commits, use the init offset
lastOffset = initialOffsets
}
Some(lastOffset)
}

override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
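// Referencing the lazy initialOffsets here forces the offset metadata log to be initialized
// before the batch is built.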
initialOffsets

val startOffset = start.map(HoodieSourceOffset(_))
.getOrElse(initialOffsets)
val endOffset = HoodieSourceOffset(end)

if (startOffset == endOffset) {
sqlContext.internalCreateDataFrame(
Member comment:
Nice.

sqlContext.sparkContext.emptyRDD[InternalRow].setName("empty"), schema, isStreaming = true)
} else {
// Consume the data between (startCommitTime, endCommitTime]
val incParams = parameters ++ Map(
DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY -> startCommitTime(startOffset),
DataSourceReadOptions.END_INSTANTTIME_OPT_KEY -> endOffset.commitTime
)

val rdd = tableType match {
case HoodieTableType.COPY_ON_WRITE =>
val serDe = HoodieSparkUtils.createRowSerDe(RowEncoder(schema))
new IncrementalRelation(sqlContext, incParams, schema, metaClient)
.buildScan()
.map(serDe.serializeRow)
case HoodieTableType.MERGE_ON_READ =>
val requiredColumns = schema.fields.map(_.name)
new MergeOnReadIncrementalRelation(sqlContext, incParams, schema, metaClient)
.buildScan(requiredColumns, Array.empty[Filter])
.asInstanceOf[RDD[InternalRow]]
case _ => throw new IllegalArgumentException(s"Unsupported tableType: $tableType")
}
sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
}
}

private def startCommitTime(startOffset: HoodieSourceOffset): String = {
startOffset match {
case INIT_OFFSET => startOffset.commitTime
case HoodieSourceOffset(commitTime) =>
val time = HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime).getTime
// As we consume the data between (start, end], start is not included,
// so we add one second to the start commit time here.
HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(time + 1000))
case _ => throw new IllegalStateException("Unknown offset type.")
}
}

override def stop(): Unit = {

}
}

object HoodieStreamSource {
val VERSION = 1
}
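For clarity, the offset entry that HDFSMetadataLog persists under the streaming checkpoint follows the format described in deserialize above: a version line, then the offset JSON. An illustrative example (the commit time is made up):

```
v1
{"commitTime":"20210316123000"}
```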