@@ -1440,6 +1440,14 @@ object SQLConf {
.stringConf
.createWithDefault("")

val USE_V1_SOURCE_WRITER_LIST = buildConf("spark.sql.sources.write.useV1SourceList")
.internal()
.doc("A comma-separated list of data source short names or fully qualified data source" +
" register class names for which data source V2 write paths are disabled. Writes from these" +
" sources will fall back to the V1 sources.")
.stringConf
.createWithDefault("")

val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
.doc("A comma-separated list of fully qualified data source register class names for which" +
" StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.")
@@ -2026,6 +2034,8 @@ class SQLConf extends Serializable with Logging {

def userV1SourceReaderList: String = getConf(USE_V1_SOURCE_READER_LIST)

def userV1SourceWriterList: String = getConf(USE_V1_SOURCE_WRITER_LIST)

def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS)

def disabledV2StreamingMicroBatchReaders: String =
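For example (not part of this diff), listing a source under the new flag routes its writes back to the V1 path; per the doc text above, both short names and fully qualified register class names are accepted. A minimal sketch, assuming the built-in "orc" short name:

// Illustrative usage of the conf added above.
spark.conf.set("spark.sql.sources.write.useV1SourceList", "orc")
// Equivalent SQL form:
spark.sql("SET spark.sql.sources.write.useV1SourceList=orc")
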
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (17 changes: 14 additions & 3 deletions)
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable,
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, WriteToDataSourceV2}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
@@ -243,8 +243,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertNotBucketed("save")

val session = df.sparkSession
val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
if (classOf[TableProvider].isAssignableFrom(cls)) {
val useV1Sources =
session.sessionState.conf.userV1SourceWriterList.toLowerCase(Locale.ROOT).split(",")
val lookupCls = DataSource.lookupDataSource(source, session.sessionState.conf)
val cls = lookupCls.newInstance() match {
case f: FileDataSourceV2 if useV1Sources.contains(f.shortName()) ||
useV1Sources.contains(lookupCls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
f.fallBackFileFormat
case _ => lookupCls
}
// SPARK-26673: In the Data Source V2 project, partitioning is still under development.
Review comment (dongjoon-hyun, Member, Jan 29, 2019): Shall we remove this (SPARK-26673) since this is the current PR's JIRA?

// Here we fall back to the V1 write path if output partitioning is required.
// TODO: use V2 implementations when partitioning feature is supported.
Review comment (Member): Could you clearly mention what JIRA ID is for this TODO?

if (classOf[TableProvider].isAssignableFrom(cls) && partitioningColumns.isEmpty) {
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
provider, session.sessionState.conf)
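To illustrate the fallback above (not part of this diff): with the built-in "orc" source, a write with partitioning columns still goes through the V1 FileFormat path, while a plain write whose source is not listed in spark.sql.sources.write.useV1SourceList takes the new V2 path. A minimal sketch under those assumptions:

import org.apache.spark.sql.functions.col

val df = spark.range(10).withColumn("part", col("id") % 2)

// partitionBy is set, so save() falls back to the V1 write path
// (V2 partitioning support is still a TODO in this PR).
df.write.partitionBy("part").format("orc").save("/tmp/orc_v1_partitioned")

// No partitioning columns and "orc" is not in the fallback list,
// so the V2 path (TableProvider / BatchWrite) is used.
df.write.format("orc").save("/tmp/orc_v2")
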
@@ -763,7 +763,7 @@ object DataSource extends Logging {
* supplied schema is not empty.
* @param schema
*/
private def validateSchema(schema: StructType): Unit = {
def validateSchema(schema: StructType): Unit = {
def hasEmptySchema(schema: StructType): Boolean = {
schema.size == 0 || schema.find {
case StructField(_, b: StructType, _, _) => hasEmptySchema(b)
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
* E.g, with temporary view `t` using [[FileDataSourceV2]], inserting into view `t` fails
* since there is no corresponding physical plan.
* SPARK-23817: This is a temporary hack for making current data source V2 work. It should be
* removed when write path of file data source v2 is finished.
* removed when Catalog of file data source v2 is finished.
Review comment (Member): Catalog of file data source v2 is finished? Does this mean catalog support of file data source v2?

*/
class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.SerializableConfiguration

@@ -37,7 +38,7 @@ import org.apache.spark.util.SerializableConfiguration
abstract class FileFormatDataWriter(
description: WriteJobDescription,
taskAttemptContext: TaskAttemptContext,
committer: FileCommitProtocol) {
committer: FileCommitProtocol) extends DataWriter[InternalRow] {
/**
* Max number of files a single task writes out due to file size. In most cases the number of
* files written should be very small. This is just a safe guard to protect some really bad
@@ -70,7 +71,7 @@ abstract class FileFormatDataWriter(
* to the driver and used to update the catalog. Other information will be sent back to the
* driver too and used to e.g. update the metrics in UI.
*/
def commit(): WriteTaskResult = {
override def commit(): WriteTaskResult = {
releaseResources()
val summary = ExecutedWriteSummary(
updatedPartitions = updatedPartitions.toSet,
@@ -301,6 +302,7 @@ class WriteJobDescription(

/** The result of a successful write task. */
case class WriteTaskResult(commitMsg: TaskCommitMessage, summary: ExecutedWriteSummary)
extends WriterCommitMessage

/**
* Wrapper class for the metrics of writing data out.
@@ -259,7 +259,7 @@ object FileFormatWriter extends Logging {
* For every registered [[WriteJobStatsTracker]], call `processStats()` on it, passing it
* the corresponding [[WriteTaskStats]] from all executors.
*/
private def processStats(
private[datasources] def processStats(
statsTrackers: Seq[WriteJobStatsTracker],
statsPerTask: Seq[Seq[WriteTaskStats]])
: Unit = {
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.types._

private[orc] class OrcOutputWriter(
private[sql] class OrcOutputWriter(
path: String,
dataSchema: StructType,
context: TaskAttemptContext)
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.execution.datasources.FileFormatWriter.processStats
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.util.SerializableConfiguration

class FileBatchWrite(
job: Job,
description: WriteJobDescription,
committer: FileCommitProtocol)
extends BatchWrite with Logging {
override def commit(messages: Array[WriterCommitMessage]): Unit = {
val results = messages.map(_.asInstanceOf[WriteTaskResult])
committer.commitJob(job, results.map(_.commitMsg))
logInfo(s"Write Job ${description.uuid} committed.")

processStats(description.statsTrackers, results.map(_.summary.stats))
logInfo(s"Finished processing stats for write job ${description.uuid}.")
}

override def useCommitCoordinator(): Boolean = false

override def abort(messages: Array[WriterCommitMessage]): Unit = {
committer.abortJob(job)
}

override def createBatchWriterFactory(): DataWriterFactory = {
val conf = new SerializableConfiguration(job.getConfiguration)
FileWriterFactory(description, committer, conf)
}
}

@@ -20,13 +20,14 @@ import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.v2.{SupportsBatchRead, Table}
import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
import org.apache.spark.sql.types.StructType

abstract class FileTable(
sparkSession: SparkSession,
fileIndex: PartitioningAwareFileIndex,
userSpecifiedSchema: Option[StructType]) extends Table with SupportsBatchRead {
userSpecifiedSchema: Option[StructType])
extends Table with SupportsBatchRead with SupportsBatchWrite {
def getFileIndex: PartitioningAwareFileIndex = this.fileIndex

lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2

import java.util.{Optional, UUID}

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchWrite}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode, WriteBuilder}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.SerializableConfiguration

abstract class FileWriteBuilder(options: DataSourceOptions)
extends WriteBuilder with SupportsSaveMode {
private var schema: StructType = _
private var queryId: String = _
private var mode: SaveMode = _

override def withInputDataSchema(schema: StructType): WriteBuilder = {
this.schema = schema
this
}

override def withQueryId(queryId: String): WriteBuilder = {
this.queryId = queryId
this
}

override def mode(mode: SaveMode): WriteBuilder = {
this.mode = mode
this
}

override def buildForBatch(): BatchWrite = {
Review comment (Contributor): this method is too long, could be better if we can separate it into multiple methods

assert(schema != null, "Missing input data schema")
assert(queryId != null, "Missing query ID")
assert(mode != null, "Missing save mode")
assert(options.paths().length == 1)
DataSource.validateSchema(schema)
val pathName = options.paths().head
val path = new Path(pathName)
val sparkSession = SparkSession.active
val optionsAsScala = options.asMap().asScala.toMap
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
val job = Job.getInstance(hadoopConf)
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
outputPath = pathName)

job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
FileOutputFormat.setOutputPath(job, path)

val caseInsensitiveOptions = CaseInsensitiveMap(optionsAsScala)
// Note: prepareWrite has a side effect: it mutates "job".
val outputWriterFactory =
prepareWrite(sparkSession.sessionState.conf, job, caseInsensitiveOptions, schema)
val allColumns = schema.toAttributes
lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
val statsTracker = new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
// TODO: after partitioning is supported in V2:
// 1. filter out partition columns in `dataColumns`.
// 2. Don't use Seq.empty for `partitionColumns`.
val description = new WriteJobDescription(
uuid = UUID.randomUUID().toString,
serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
outputWriterFactory = outputWriterFactory,
allColumns = allColumns,
dataColumns = allColumns,
partitionColumns = Seq.empty,
bucketIdExpression = None,
path = pathName,
customPartitionLocations = Map.empty,
maxRecordsPerFile = caseInsensitiveOptions.get("maxRecordsPerFile").map(_.toLong)
.getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile),
timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone),
statsTrackers = Seq(statsTracker)
)

val fs = path.getFileSystem(hadoopConf)
mode match {
case SaveMode.ErrorIfExists if (fs.exists(path)) =>
val qualifiedOutputPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
throw new AnalysisException(s"path $qualifiedOutputPath already exists.")

case SaveMode.Ignore if (fs.exists(path)) =>
null

case SaveMode.Overwrite =>
committer.deleteWithJob(fs, path, true)
committer.setupJob(job)
new FileBatchWrite(job, description, committer)

case _ =>
committer.setupJob(job)
new FileBatchWrite(job, description, committer)
}
}

/**
* Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
* be put here. For example, user defined output committer can be configured here
* by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
*/
def prepareWrite(
sqlConf: SQLConf,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory
}
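
As the prepareWrite scaladoc notes, a user-defined output committer can still be plugged in through the existing SQL conf; for example (illustrative only, the committer class name below is hypothetical and must be a subclass of org.apache.hadoop.mapreduce.OutputCommitter):

spark.conf.set(
  "spark.sql.sources.outputCommitterClass",
  "com.example.MyOutputCommitter")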

@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2

import java.util.Date

import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataWriter, EmptyDirectoryDataWriter, SingleDirectoryDataWriter, WriteJobDescription}
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory}
import org.apache.spark.util.SerializableConfiguration

case class FileWriterFactory (
description: WriteJobDescription,
committer: FileCommitProtocol,
conf: SerializableConfiguration) extends DataWriterFactory {
override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
val taskAttemptContext = createTaskAttemptContext(partitionId)
committer.setupTask(taskAttemptContext)
if (description.partitionColumns.isEmpty) {
new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
} else {
new DynamicPartitionDataWriter(description, taskAttemptContext, committer)
}
}

private def createTaskAttemptContext(partitionId: Int): TaskAttemptContextImpl = {
val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
val taskAttemptId = new TaskAttemptID(taskId, 0)
// Set up the configuration object
val hadoopConf = conf.value
hadoopConf.set("mapreduce.job.id", jobId.toString)
hadoopConf.set("mapreduce.task.id", taskId.toString)
hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
hadoopConf.setBoolean("mapreduce.task.ismap", true)
hadoopConf.setInt("mapreduce.task.partition", 0)

new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
}
}