DataSource.scala
@@ -29,15 +29,13 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
@@ -292,7 +290,7 @@ case class DataSource(
case s: StreamSinkProvider =>
s.createSink(sparkSession.sqlContext, options, partitionColumns, outputMode)

- case parquet: parquet.ParquetFileFormat =>
+ case fileFormat: FileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
@@ -301,7 +299,7 @@
throw new IllegalArgumentException(
s"Data source $className does not support $outputMode output mode")
}
- new FileStreamSink(sparkSession, path, parquet, partitionColumns, options)
+ new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, options)

case _ =>
throw new UnsupportedOperationException(
@@ -516,7 +514,7 @@ case class DataSource(
val plan = data.logicalPlan
plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
throw new AnalysisException(
s"Unable to resolve ${name} given [${plan.output.map(_.name).mkString(", ")}]")
s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
}.asInstanceOf[Attribute]
}
// For partitioned relation r, r.schema's column ordering can be different from the column
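With the `case fileFormat: FileFormat =>` branch above, the streaming file sink is no longer hard-wired to parquet: any file-based format that reaches createSink is handed to FileStreamSink, as long as the query runs in append mode. A minimal sketch of what this enables follows; it is not part of this patch, and the input, output, and checkpoint paths are made up.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("file-sink-sketch").getOrCreate()

// Any streaming source works; a plain text directory keeps the example small.
val lines = spark.readStream.text("/tmp/stream-input")

// The default output mode is Append, the only mode the file sink accepts.
val query = lines.writeStream
  .format("json")                                         // "text", "csv" or "parquet" now take the same route
  .option("checkpointLocation", "/tmp/stream-checkpoint") // a file sink needs a checkpoint location
  .start("/tmp/stream-output")                            // start(path) supplies the "path" option checked above

query.processAllAvailable()
query.stop()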
FileStreamSinkSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.streaming

- import org.apache.spark.sql._
+ import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
@@ -142,42 +142,38 @@ class FileStreamSinkSuite extends StreamTest {
    }
  }

-  test("FileStreamSink - supported formats") {
-    def testFormat(format: Option[String]): Unit = {
-      val inputData = MemoryStream[Int]
-      val ds = inputData.toDS()
-
-      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
-      val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
-
-      var query: StreamingQuery = null
-
-      try {
-        val writer =
-          ds.map(i => (i, i * 1000))
-            .toDF("id", "value")
-            .writeStream
-        if (format.nonEmpty) {
-          writer.format(format.get)
-        }
-        query = writer
-          .option("checkpointLocation", checkpointDir)
-          .start(outputDir)
-      } finally {
-        if (query != null) {
-          query.stop()
-        }
-      }
-    }
-
-    testFormat(None) // should not throw error as default format parquet when not specified
-    testFormat(Some("parquet"))
-    val e = intercept[UnsupportedOperationException] {
-      testFormat(Some("text"))
-    }
-    Seq("text", "not support", "stream").foreach { s =>
-      assert(e.getMessage.contains(s))
-    }
-  }
+  test("FileStreamSink - parquet") {
+    testFormat(None) // should not throw error as default format parquet when not specified
+    testFormat(Some("parquet"))
+  }
+
+  test("FileStreamSink - text") {
+    testFormat(Some("text"))
+  }
+
+  test("FileStreamSink - json") {
+    testFormat(Some("json"))
+  }
+
+  def testFormat(format: Option[String]): Unit = {
+    val inputData = MemoryStream[Int]
+    val ds = inputData.toDS()
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    var query: StreamingQuery = null
+
+    try {
+      val writer = ds.map(i => (i, i * 1000)).toDF("id", "value").writeStream
+      if (format.nonEmpty) {
+        writer.format(format.get)
+      }
+      query = writer.option("checkpointLocation", checkpointDir).start(outputDir)
+    } finally {
+      if (query != null) {
+        query.stop()
+      }
+    }
+  }
}
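The rewrite drops the old negative case, which asserted that an unsupported sink format is rejected. If that coverage is still wanted, a companion test along the following lines would fit the new helper. This is only a sketch, not part of the patch; it assumes the `case _` branch of DataSource.createSink keeps the "does not support streamed writing" wording that the removed assertions matched on.

  test("FileStreamSink - unsupported sink") {
    // JdbcRelationProvider is neither a StreamSinkProvider nor a FileFormat,
    // so DataSource.createSink should still reject it when the query is started.
    val e = intercept[UnsupportedOperationException] {
      testFormat(Some("jdbc"))
    }
    assert(e.getMessage.contains("does not support streamed writing"))
  }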