@@ -22,12 +22,16 @@ import java.util.concurrent.TimeoutException

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.Evolving
import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.CreateTableStatement
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableProvider}
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableProvider, V1Table, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
@@ -298,52 +302,85 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {

/**
* Starts the execution of the streaming query, which will continually output results to the given
* table as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
* the stream.
* table as new data arrives. A new table will be created if the table does not exist. The returned
* [[StreamingQuery]] object can be used to interact with the stream.
*
* @since 3.1.0
*/
@throws[TimeoutException]
def toTable(tableName: String): StreamingQuery = {
this.source = SOURCE_NAME_TABLE
this.tableName = tableName
startInternal(None)
}
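
For reference, a minimal usage sketch of the new API (the rate source, checkpoint path, and table name below are illustrative and not part of this change):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("toTableSketch").getOrCreate()
import spark.implicits._

// Continuously append the rate source's "value" column to a table; the table is
// created automatically if it does not already exist.
val query = spark.readStream
  .format("rate")
  .load()
  .select($"value")
  .writeStream
  .format("parquet")                                         // provider used if the table gets created
  .option("checkpointLocation", "/tmp/to_table_checkpoint")  // hypothetical path
  .toTable("rate_sink")                                      // hypothetical table name

query.awaitTermination()
```

Since toTable returns a StreamingQuery, the usual stop()/awaitTermination() lifecycle applies.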

private def startInternal(path: Option[String]): StreamingQuery = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"write files of Hive data source directly.")
}
import df.sparkSession.sessionState.analyzer.CatalogAndIdentifier

if (source == SOURCE_NAME_TABLE) {
assertNotPartitioned(SOURCE_NAME_TABLE)
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val originalMultipartIdentifier = df.sparkSession.sessionState.sqlParser
.parseMultipartIdentifier(tableName)
val CatalogAndIdentifier(catalog, identifier) = originalMultipartIdentifier

import df.sparkSession.sessionState.analyzer.CatalogAndIdentifier
// Currently we don't create a logical streaming writer node in logical plan, so cannot rely
// on analyzer to resolve it. Directly lookup only for temp view to provide clearer message.
// TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
if (df.sparkSession.sessionState.catalog.isTempView(originalMultipartIdentifier)) {
throw new AnalysisException(s"Temporary view $tableName doesn't support streaming write")
}

if (!catalog.asTableCatalog.tableExists(identifier)) {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val originalMultipartIdentifier = df.sparkSession.sessionState.sqlParser
.parseMultipartIdentifier(tableName)
val CatalogAndIdentifier(catalog, identifier) = originalMultipartIdentifier

// Currently we don't create a logical streaming writer node in logical plan, so cannot rely
// on analyzer to resolve it. Directly lookup only for temp view to provide clearer message.
// TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
if (df.sparkSession.sessionState.catalog.isTempView(originalMultipartIdentifier)) {
throw new AnalysisException(s"Temporary view $tableName doesn't support streaming write")
}
/**
* Note, currently the new table creation by this API doesn't fully cover the V2 table.
* TODO (SPARK-33638): Full support of v2 table creation
*/
val cmd = CreateTableStatement(
Contributor @HeartSaVioR HeartSaVioR Dec 2, 2020

I think leveraging the old (probably DSv1) options is not sufficient - this doesn't have full coverage of DSv2 tables: no Transform on partitioning, no properties, no options.

Using source (via format(...)) as USE <provider> is also not intuitive - it is only effective when table creation is taking place, and it occurs implicitly.

Please compare the usage with creating a table on DataFrameWriterV2. I still think it is worth having a V2 writer for streaming.

Member Author

> Using source (via format(...)) as USE <provider> is also not intuitive - it is only effective when table creation is taking place, and it occurs implicitly.

Yes, this is indeed a reasonable concern. We should check the source and provider, especially when they are different. Done in b6393ba, and a UT was added.

originalMultipartIdentifier,
df.schema.asNullable,
partitioningColumns.getOrElse(Nil).asTransforms.toSeq,
None,
Map.empty[String, String],
Some(source),
Map.empty[String, String],
extraOptions.get("path"),
None,
None,
external = false,
ifNotExists = false)
Dataset.ofRows(df.sparkSession, cmd)
}
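
For context on the review thread above: the batch DataFrameWriterV2 API makes the provider, partitioning, and table properties explicit at creation time, whereas the block above derives everything implicitly from the writer's format(...) and partitionBy(...). A rough sketch for comparison, assuming a batch DataFrame batchDF and an illustrative identifier and property:

```scala
import org.apache.spark.sql.functions.col

// Batch DSv2 creation: provider, partitioning, and properties are all explicit.
batchDF.writeTo("testcat.ns.events")        // hypothetical multi-part identifier
  .using("parquet")
  .partitionedBy(col("date"))
  .tableProperty("owner", "stream-team")    // hypothetical property
  .create()
```

The streaming path above only carries format(...) for the provider and partitionBy(...) for partitioning; table properties and options stay empty, which is what the TODO for SPARK-33638 tracks.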

val tableInstance = catalog.asTableCatalog.loadTable(identifier)
val tableInstance = catalog.asTableCatalog.loadTable(identifier)

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val sink = tableInstance match {
case t: SupportsWrite if t.supports(STREAMING_WRITE) => t
case t => throw new AnalysisException(s"Table $tableName doesn't support streaming " +
s"write - $t")
def writeToV1Table(table: CatalogTable): StreamingQuery = {
if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(s"Streaming into views $tableName is not supported.")
}
require(table.provider.isDefined)
if (source != table.provider.get) {
throw new AnalysisException(s"The input source($source) is different from the table " +
s"$tableName's data source provider(${table.provider.get}).")
}
format(table.provider.get)
Contributor

Writing to a V1 table shouldn't rely on normalizedParCols, but then you'll be stuck on how to turn the Array[Transform] (provided by CatalogTable) into a Seq[String].

Contributor @HeartSaVioR HeartSaVioR Dec 2, 2020

I see ResolveSessionCatalog.buildCatalogTable() leverages partitioning.asPartitionColumns - that could be used here to set partitionBy. It's not pretty to set the provider and partitioning manually here instead of letting the analyzer do it, but as there's no logical node for streaming write, I guess there's no other option.

Member Author

I think we shouldn't set partitionBy here, since that would ignore the original setting of partitioningColumns.

Contributor

So are we completely ignoring the table's partition information? I don't think this is the same as the DSv2 path.

Member Author

We don't ignore it. IMO, the partition information should be handled at the DataSource level, no matter V1 or V2. Different data sources should have their own ability and strategy to handle the partition overwrite/append issue (part of the schema evolution discussion). So at the API level, we need to pass both pieces of information down to the runtime.

Contributor

Please find the usage of normalizedParCols in DataStreamWriter. This config is only effective in DSv1.

Member Author @xuanyuanking xuanyuanking Dec 3, 2020

Got the point. So your point focuses on how DSv2 addresses the partition column conflict. My point is that we shouldn't overwrite the user's input; we need to pass both (the user input and the table catalog's partition info) into the data source. The data source needs to know both. I already added this and the further discussion to SPARK-33638's comments, since it's part of full support for the V2 table.

Contributor

Sorry, I missed this. I'm not sure we provide both to the data source, but the data source is probably able to know about the partitioning (as the table partitioning is given by the data source), so consider this minor and make a follow-up if necessary. In any case, you'll want to check this to address my review comments on documentation.

Member Author

> I'm not sure we provide both to the data source

My bad, I didn't make it clear. We don't provide both; we only pass the user-provided one (in V1). The partitioning in the catalog can be known by the data source, as you said. I need to change "we need to pass both" to "the data source needs to know both (we need to pass the user-provided partitioning)". Let me rephrase the last comment to make it clear.

Yes, of course, a follow-up is needed.

.option("path", new Path(table.location).toString).start()
}
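
Relating to the partitioning discussion above: a partitionBy(...) on the writer is what feeds the Transforms used when toTable has to create the table, while writes to an existing V1 table go through the path-based fallback just defined. A sketch, assuming eventsDF is a streaming Dataset and using illustrative names:

```scala
// The user-provided partitioning below becomes the partitioning of the table that
// toTable creates (via partitioningColumns -> asTransforms in CreateTableStatement above).
val q = eventsDF.writeStream
  .format("parquet")
  .partitionBy("date")                               // user-provided partition column
  .option("checkpointLocation", "/tmp/events_ckpt")  // hypothetical path
  .toTable("events_by_date")                         // hypothetical table name
```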

startQuery(sink, extraOptions)
} else if (source == SOURCE_NAME_MEMORY) {
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
tableInstance match {
case t: SupportsWrite if t.supports(STREAMING_WRITE) => startQuery(t, extraOptions)
case t: V2TableWithV1Fallback =>
writeToV1Table(t.v1Table)
case t: V1Table =>
writeToV1Table(t.v1Table)
case t => throw new AnalysisException(s"Table $tableName doesn't support streaming " +
s"write - $t")
}
}

private def startInternal(path: Option[String]): StreamingQuery = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"write files of Hive data source directly.")
}

if (source == SOURCE_NAME_MEMORY) {
assertNotPartitioned(SOURCE_NAME_MEMORY)
if (extraOptions.get("queryName").isEmpty) {
throw new AnalysisException("queryName must be specified for memory sink")
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableCatalog, InMemoryTableSessionCatalog}
@@ -39,6 +39,7 @@ import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.streaming.sources.FakeScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
import testImplicits._
@@ -175,21 +176,24 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
test("write: write to table with custom catalog & no namespace") {
val tableIdentifier = "testcat.table_name"

spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
checkAnswer(spark.table(tableIdentifier), Seq.empty)
withTable(tableIdentifier) {
spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
checkAnswer(spark.table(tableIdentifier), Seq.empty)

runTestWithStreamAppend(tableIdentifier)
runTestWithStreamAppend(tableIdentifier)
}
}

test("write: write to table with custom catalog & namespace") {
spark.sql("CREATE NAMESPACE testcat.ns")

val tableIdentifier = "testcat.ns.table_name"

spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
checkAnswer(spark.table(tableIdentifier), Seq.empty)
withTable(tableIdentifier) {
spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING foo")
checkAnswer(spark.table(tableIdentifier), Seq.empty)

runTestWithStreamAppend(tableIdentifier)
runTestWithStreamAppend(tableIdentifier)
}
}

test("write: write to table with default session catalog") {
@@ -200,35 +204,19 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
spark.sql("CREATE NAMESPACE ns")

val tableIdentifier = "ns.table_name"
spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING $v2Source")
checkAnswer(spark.table(tableIdentifier), Seq.empty)
withTable(tableIdentifier) {
spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING $v2Source")
checkAnswer(spark.table(tableIdentifier), Seq.empty)

runTestWithStreamAppend(tableIdentifier)
runTestWithStreamAppend(tableIdentifier)
}
}

test("write: write to non-exist table with custom catalog") {
val tableIdentifier = "testcat.nonexisttable"
spark.sql("CREATE NAMESPACE testcat.ns")

withTempDir { checkpointDir =>
val exc = intercept[NoSuchTableException] {
runStreamQueryAppendMode(tableIdentifier, checkpointDir, Seq.empty, Seq.empty)
}
assert(exc.getMessage.contains("nonexisttable"))
}
}

test("write: write to file provider based table isn't allowed yet") {
val tableIdentifier = "table_name"

spark.sql(s"CREATE TABLE $tableIdentifier (id bigint, data string) USING parquet")
checkAnswer(spark.table(tableIdentifier), Seq.empty)

withTempDir { checkpointDir =>
val exc = intercept[AnalysisException] {
runStreamQueryAppendMode(tableIdentifier, checkpointDir, Seq.empty, Seq.empty)
}
assert(exc.getMessage.contains("doesn't support streaming write"))
withTable(tableIdentifier) {
runTestWithStreamAppend(tableIdentifier)
}
}

@@ -262,8 +250,107 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
val exc = intercept[AnalysisException] {
runStreamQueryAppendMode(viewIdentifier, checkpointDir, Seq.empty, Seq.empty)
}
assert(exc.getMessage.contains("doesn't support streaming write"))
assert(exc.getMessage.contains(s"Streaming into views $viewIdentifier is not supported"))
}
}

test("write: write to an external table") {
withTempDir { dir =>
val tableName = "stream_test"
withTable(tableName) {
checkForStreamTable(Some(dir), tableName)
}
}
}

test("write: write to a managed table") {
val tableName = "stream_test"
withTable(tableName) {
checkForStreamTable(None, tableName)
}
}

test("write: write to an external table with existing path") {
withTempDir { dir =>
val tableName = "stream_test"
withTable(tableName) {
// The file written by batch will not be seen after the table was written by a streaming
// query. This is because we loads files from the metadata log instead of listing them
Contributor

nit: load instead of loads, but as it's a nit, OK to fix in follow-up PR.

// using HDFS API.
Seq(4, 5, 6).toDF("value").write.format("parquet")
.option("path", dir.getCanonicalPath).saveAsTable(tableName)

checkForStreamTable(Some(dir), tableName)
}
}
}
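
The comment in this test (and in the two tests that follow) refers to the file-sink metadata log: once a streaming query has written into a path, batch reads of that path are resolved through the _spark_metadata log rather than by listing the directory, so files that an earlier batch job dropped there directly are no longer returned. A rough sketch of the observable behaviour, assuming spark.implicits._ is in scope and using an illustrative path:

```scala
// 1) A plain batch write drops files straight into the directory.
Seq(4, 5, 6).toDF("value").write.mode("append").parquet("/tmp/stream_sink_dir")

// 2) After a streaming file sink writes to the same directory, it contains a
//    _spark_metadata log; a batch read now only sees files recorded in that log,
//    so the rows from step 1 are no longer returned.
spark.read.parquet("/tmp/stream_sink_dir").show()
```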

test("write: write to a managed table with existing path") {
val tableName = "stream_test"
withTable(tableName) {
// The file written by batch will not be seen after the table was written by a streaming
// query. This is because we loads files from the metadata log instead of listing them
Contributor

same here

// using HDFS API.
Seq(4, 5, 6).toDF("value").write.format("parquet").saveAsTable(tableName)

checkForStreamTable(None, tableName)
}
}

test("write: write to an external path and create table") {
withTempDir { dir =>
val tableName = "stream_test"
withTable(tableName) {
// The file written by batch will not be seen after the table was written by a streaming
// query. This is because we loads files from the metadata log instead of listing them
Contributor

same here

// using HDFS API.
Seq(4, 5, 6).toDF("value").write
.mode("append").format("parquet").save(dir.getCanonicalPath)

checkForStreamTable(Some(dir), tableName)
}
}
}

test("write: write to table with different format shouldn't be allowed") {
val tableName = "stream_test"

spark.sql(s"CREATE TABLE $tableName (id bigint, data string) USING json")
checkAnswer(spark.table(tableName), Seq.empty)

withTempDir { checkpointDir =>
val exc = intercept[AnalysisException] {
runStreamQueryAppendMode(tableName, checkpointDir, Seq.empty, Seq.empty)
}
assert(exc.getMessage.contains("The input source(parquet) is different from the table " +
s"$tableName's data source provider(json)"))
}
}
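
As the test above shows, when the target table already exists the writer's format must match the table's provider. A sketch of the accepted variant, assuming the suite's implicits are in scope (as for the MemoryStream[Int] in checkForStreamTable below) and using an illustrative checkpoint location:

```scala
// Matching the existing table's provider ("json") lets the stream start.
val input = MemoryStream[(Long, String)]
val sq = input.toDS().toDF("id", "data").writeStream
  .format("json")
  .option("checkpointLocation", Utils.createTempDir().getCanonicalPath)
  .toTable("stream_test")
```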

private def checkForStreamTable(dir: Option[File], tableName: String): Unit = {
val memory = MemoryStream[Int]
val dsw = memory.toDS().writeStream.format("parquet")
dir.foreach { output =>
dsw.option("path", output.getCanonicalPath)
}
val sq = dsw
.option("checkpointLocation", Utils.createTempDir().getCanonicalPath)
.toTable(tableName)
memory.addData(1, 2, 3)
sq.processAllAvailable()

checkDataset(
spark.table(tableName).as[Int],
1, 2, 3)
val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
val path = if (dir.nonEmpty) {
dir.get
} else {
new File(catalogTable.location)
}
checkDataset(
spark.read.format("parquet").load(path.getCanonicalPath).as[Int],
1, 2, 3)
}

private def runTestWithStreamAppend(tableIdentifier: String) = {