@@ -258,6 +258,11 @@ case class CatalogTable(
StructType(partitionFields)
}

/** Return true if the table is a streaming table. */
def isStreaming: Boolean = {
provider.isDefined && storage.properties.getOrElse("isStreaming", "false").toBoolean
}

/**
* schema of this table's data columns
*/
@@ -631,6 +631,33 @@ object SQLConf {
.intConf
.createWithDefault(200)

val SQLSTREAM_WATERMARK_ENABLE = buildConf("spark.sqlstreaming.watermark.enable")
.doc("Whether use watermark in sqlstreaming.")
.booleanConf
.createWithDefault(false)

val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
.doc("The output mode used in sqlstreaming")
.stringConf
.createWithDefault("append")

val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger")
.doc("The structstreaming trigger used in sqlstreaming")
.stringConf
.createWithDefault("0s")

val SQLSTREAM_QUERY_NAME = buildConf("spark.sqlstreaming.queryName")
.doc("The structstreaming query name used in sqlstreaming. " +
"User must use spark.sql.streaming.checkpointLocation and " +
"spark.sqlstreaming.queryName to ensure the unique checkpointLocation")
.stringConf
.createOptional

val SQLSTREAM_QUERY_ENABLE = buildConf("spark.sqlstreaming.query.enable")
.doc("Whether to enable use sqlstreaming in spark")
.booleanConf
.createWithDefault(false)
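
// Illustrative sketch only (not part of this patch): a hypothetical session that drives a
// SQLStreaming query with the configs above. The table names kafka_sql_in/kafka_sql_out are
// made-up examples; the INSERT below is only picked up as a streaming query when both tables
// were created with the isStreaming = 'true' option.
//
//   spark.conf.set("spark.sqlstreaming.query.enable", "true")
//   spark.conf.set("spark.sqlstreaming.outputMode", "append")
//   spark.conf.set("spark.sqlstreaming.trigger", "10 seconds")
//   spark.conf.set("spark.sqlstreaming.queryName", "demo_query")
//   spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/sqlstreaming/checkpoint")
//   spark.sql("INSERT INTO TABLE kafka_sql_out SELECT * FROM kafka_sql_in")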

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
.doc("The default data source to use in input/output.")
@@ -1813,6 +1840,16 @@ class SQLConf extends Serializable with Logging {

def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT)

def sqlStreamWaterMarkEnable: Boolean = getConf(SQLSTREAM_WATERMARK_ENABLE)

def sqlStreamOutputMode: String = getConf(SQLSTREAM_OUTPUTMODE)

def sqlStreamTrigger: String = getConf(SQLSTREAM_TRIGGER)

def sqlStreamQueryName: Option[String] = getConf(SQLSTREAM_QUERY_NAME)

def sqlStreamQueryEnable: Boolean = getConf(SQLSTREAM_QUERY_ENABLE)

def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)

def convertCTAS: Boolean = getConf(CONVERT_CTAS)
@@ -820,6 +820,10 @@ object DDLUtils {
table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
}

/** Return true if the table is a streaming table backed by a data source rather than Hive. */
def isStreamingTable(table: CatalogTable): Boolean = {
table.isStreaming && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
}

/**
* Throws a standard error for actions that require partitionProvider = hive.
*/
@@ -32,15 +32,17 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, InsertIntoDir}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.streaming.{SQLStreamingSink, StreamingRelation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

/**
* Replaces generic operations with specific variants that are designed to work with Spark
@@ -221,6 +223,10 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
* data source.
*/
class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
private val sqlConf = sparkSession.sqlContext.conf
private val WATERMARK_COLUMN = "watermark.column"
private val WATERMARK_DELAY = "watermark.delay"

private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
val catalog = sparkSession.sessionState.catalog
@@ -239,11 +245,42 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
options = table.storage.properties ++ pathOption,
catalogTable = Some(table))

LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
if (table.isStreaming && sqlConf.sqlStreamQueryEnable) {
val relation =
StreamingRelation(dataSource, table.provider.get, table.schema.toAttributes)
withWatermark(relation, table)
} else {
LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
}
}
})
}

/**
* Check whether watermark is enabled. If so, wrap the relation with an event-time watermark.
* @param relation the base streaming relation
* @param metadata the table metadata
* @return the relation, wrapped in EventTimeWatermark when watermark is enabled
*/
private def withWatermark(relation: LogicalPlan, metadata: CatalogTable): LogicalPlan = {
if (sqlConf.sqlStreamWaterMarkEnable) {
logInfo("Using watermark in sqlstreaming")
val options = metadata.storage.properties
val column = options.getOrElse(WATERMARK_COLUMN,
throw new IllegalArgumentException(s"$WATERMARK_COLUMN is empty"))
val delay = options.getOrElse(WATERMARK_DELAY,
throw new IllegalArgumentException(s"$WATERMARK_DELAY is empty"))
EventTimeWatermark(
UnresolvedAttribute(column),
CalendarInterval.fromString(s"interval $delay"),
relation
)
} else {
logInfo("None watermark found in sqlstreaming")
relation
}
}
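
// Illustrative sketch only (not part of this patch): withWatermark reads watermark.column and
// watermark.delay from the table's storage properties (and requires
// spark.sqlstreaming.watermark.enable=true), so a hypothetical source table could declare them
// in its OPTIONS. The table name, source and option values below are examples only.
//
//   CREATE TABLE events
//   USING kafka
//   OPTIONS (
//     isStreaming = 'true',
//     `watermark.column` = 'eventTime',
//     `watermark.delay` = '10 seconds')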

private def readHiveTable(table: CatalogTable): LogicalPlan = {
HiveTableRelation(
table,
@@ -253,6 +290,10 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
}

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, child, _, _)
if DDLUtils.isStreamingTable(tableMeta) && sqlConf.sqlStreamQueryEnable =>
SQLStreamingSink(sparkSession, tableMeta, child)

case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
i.copy(table = readDataSourceTable(tableMeta))
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import java.util.concurrent.TimeUnit

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.util.Utils

/**
* The basic RunnableCommand for SQLStreaming, using RunnableCommand.run to start a streaming query.
*
* @param sparkSession the active SparkSession
* @param table the catalog table that acts as the streaming sink
* @param child the logical plan whose result is written to the sink
*/
case class SQLStreamingSink(sparkSession: SparkSession,
table: CatalogTable,
child: LogicalPlan)
extends RunnableCommand {

private val sqlConf = sparkSession.sqlContext.conf

/**
* The given column name may not be equal to any of the existing column names if we were in
* case-insensitive context. Normalize the given column name to the real one so that we don't
* need to care about case sensitivity afterwards.
*/
private def normalize(df: DataFrame, columnName: String, columnType: String): String = {
val validColumnNames = df.logicalPlan.output.map(_.name)
validColumnNames.find(sparkSession.sessionState.analyzer.resolver(_, columnName))
.getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
s"existing columns (${validColumnNames.mkString(", ")})"))
}

/**
* Parse spark.sqlstreaming.trigger (e.g. "10 seconds") into a processing-time Trigger.
*/
private def parseTrigger(): Trigger = {
val triggerMs = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger)
Trigger.ProcessingTime(triggerMs, TimeUnit.MILLISECONDS)
}

/**
* Runs via queryExecution.executeCollect().
* @param sparkSession the active SparkSession
* @return an empty Seq[Row], the same as other DDL commands
*/
override def run(sparkSession: SparkSession): Seq[Row] = {

///////////////////////////////////////////////////////////////////////////////////////
// Resolve the streaming DataFrame, output mode, partition columns and sink
///////////////////////////////////////////////////////////////////////////////////////
val df = Dataset.ofRows(sparkSession, child)
val outputMode = InternalOutputModes(sqlConf.sqlStreamOutputMode)
val normalizedParCols = table.partitionColumnNames.map {
normalize(df, _, "Partition")
}

val ds = DataSource.lookupDataSource(table.provider.get, sparkSession.sessionState.conf)
val disabledSources = sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
var options = table.storage.properties
val sink = ds.newInstance() match {
case w: StreamingWriteSupportProvider
if !disabledSources.contains(w.getClass.getCanonicalName) =>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
w, df.sparkSession.sessionState.conf)
options = sessionOptions ++ options
w
case _ =>
val ds = DataSource(
df.sparkSession,
className = table.provider.get,
options = options,
partitionColumns = normalizedParCols)
ds.createSink(outputMode)
}

sparkSession.sessionState.streamingQueryManager.startQuery(
sqlConf.sqlStreamQueryName,
None,
df,
table.storage.properties,
sink,
outputMode,
trigger = parseTrigger()
).awaitTermination()

Seq.empty[Row]
}
}
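
// Illustrative sketch only (not part of this patch): run() above is roughly what the following
// DataStreamWriter pipeline would do, shown here only to clarify how the spark.sqlstreaming.*
// configs map onto the streaming write; "df" stands for Dataset.ofRows(sparkSession, child).
//
//   df.writeStream
//     .format(table.provider.get)
//     .outputMode(sqlConf.sqlStreamOutputMode)
//     .trigger(Trigger.ProcessingTime(sqlConf.sqlStreamTrigger))
//     .options(table.storage.properties)
//     .start()
//     .awaitTermination()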
@@ -296,6 +296,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
val tableProperties = tableMetaToTableProps(table)

if (table.isStreaming) {
tableProperties.put(DATASOURCE_STREAM_TABLE, "true")
}

// put table provider and partition provider in table properties.
tableProperties.put(DATASOURCE_PROVIDER, provider)
if (table.tracksPartitionsInCatalog) {
@@ -1313,6 +1317,7 @@ object HiveExternalCatalog {
val CREATED_SPARK_VERSION = SPARK_SQL_PREFIX + "create.version"

val HIVE_GENERATED_STORAGE_PROPERTIES = Set(SERIALIZATION_FORMAT)
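// Assuming DATASOURCE_PREFIX resolves to "spark.sql.sources.", the constant below persists the
// streaming flag to the Hive metastore under the table property key "spark.sql.sources.isStreaming".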
val DATASOURCE_STREAM_TABLE = DATASOURCE_PREFIX + "isStreaming"

// When storing data source tables in hive metastore, we need to set data schema to empty if the
// schema is hive-incompatible. However we need a hack to preserve existing behavior. Before
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils

class StreamTableDDLCommandSuite extends SQLTestUtils with TestHiveSingleton {
private val catalog = spark.sessionState.catalog

test("CTAS: create data source stream table") {
withTempPath { dir =>
withTable("t") {
sql(
s"""CREATE TABLE t USING PARQUET
|OPTIONS (
|PATH = '${dir.toURI}',
|location = '${dir.toURI}',
|isStreaming = 'true')
|AS SELECT 1 AS a, 2 AS b, 3 AS c
""".stripMargin)
val streamTable = catalog.getTableMetadata(TableIdentifier("t"))
assert(streamTable.isStreaming)
}
}
}
}