diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala similarity index 59% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index 69dd622ce4a54..bde2d2b89d56f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -20,152 +20,15 @@ package org.apache.spark.sql.execution.datasources import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec} -import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.hadoop.mapreduce.Job -import org.apache.spark.annotation.Experimental import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} -import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection -import org.apache.spark.sql.execution.FileRelation -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter} +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -/** - * ::Experimental:: - * A factory that produces [[OutputWriter]]s. A new [[OutputWriterFactory]] is created on driver - * side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized - * to executor side to create actual [[OutputWriter]]s on the fly. - * - * @since 1.4.0 - */ -@Experimental -abstract class OutputWriterFactory extends Serializable { - /** - * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side - * to instantiate new [[OutputWriter]]s. - * - * @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that - * this may not point to the final output file. For example, `FileOutputFormat` writes to - * temporary directories and then merge written files back to the final destination. In - * this case, `path` points to a temporary output file under the temporary directory. - * @param dataSchema Schema of the rows to be written. Partition columns are not included in the - * schema if the relation being written is partitioned. - * @param context The Hadoop MapReduce task context. - * @since 1.4.0 - */ - def newInstance( - path: String, - bucketId: Option[Int], // TODO: This doesn't belong here... - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter - - /** - * Returns a new instance of [[OutputWriter]] that will write data to the given path. - * This method gets called by each task on executor to write [[InternalRow]]s to - * format-specific files. Compared to the other `newInstance()`, this is a newer API that - * passes only the path that the writer must write to. The writer must write to the exact path - * and not modify it (do not add subdirectories, extensions, etc.). All other - * file-format-specific information needed to create the writer must be passed - * through the [[OutputWriterFactory]] implementation. 
- * @since 2.0.0 - */ - def newWriter(path: String): OutputWriter = { - throw new UnsupportedOperationException("newInstance with just path not supported") - } -} - -/** - * ::Experimental:: - * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the - * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor. - * An [[OutputWriter]] instance is created and initialized when a new output file is opened on - * executor side. This instance is used to persist rows to this single output file. - * - * @since 1.4.0 - */ -@Experimental -abstract class OutputWriter { - /** - * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned - * tables, dynamic partition columns are not included in rows to be written. - * - * @since 1.4.0 - */ - def write(row: Row): Unit - - /** - * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before - * the task output is committed. - * - * @since 1.4.0 - */ - def close(): Unit - - private var converter: InternalRow => Row = _ - - protected[sql] def initConverter(dataSchema: StructType) = { - converter = - CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row] - } - - protected[sql] def writeInternal(row: InternalRow): Unit = { - write(converter(row)) - } -} - -/** - * Acts as a container for all of the metadata required to read from a datasource. All discovery, - * resolution and merging logic for schemas and partitions has been removed. - * - * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise - * this relation. - * @param partitionSchema The schema of the columns (if any) that are used to partition the relation - * @param dataSchema The schema of any remaining columns. Note that if any partition columns are - * present in the actual data files as well, they are preserved. - * @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values). - * @param fileFormat A file format that can be used to read and write the data in files. - * @param options Configuration used when reading / writing data. - */ -case class HadoopFsRelation( - location: FileCatalog, - partitionSchema: StructType, - dataSchema: StructType, - bucketSpec: Option[BucketSpec], - fileFormat: FileFormat, - options: Map[String, String])(val sparkSession: SparkSession) - extends BaseRelation with FileRelation { - - override def sqlContext: SQLContext = sparkSession.sqlContext - - val schema: StructType = { - val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet - StructType(dataSchema ++ partitionSchema.filterNot { column => - dataSchemaColumnNames.contains(column.name.toLowerCase) - }) - } - - def partitionSchemaOption: Option[StructType] = - if (partitionSchema.isEmpty) None else Some(partitionSchema) - def partitionSpec: PartitionSpec = location.partitionSpec() - - def refresh(): Unit = location.refresh() - - override def toString: String = { - fileFormat match { - case source: DataSourceRegister => source.shortName() - case _ => "HadoopFiles" - } - } - - /** Returns the list of files that will be read when scanning this relation. */ - override def inputFiles: Array[String] = - location.allFiles().map(_.getPath.toUri.toString).toArray - - override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum -} - /** * Used to read and write data stored in files to/from the [[InternalRow]] format. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala new file mode 100644 index 0000000000000..c7ebe0b76a150 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.{SparkSession, SQLContext} +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.execution.FileRelation +import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister} +import org.apache.spark.sql.types.StructType + + +/** + * Acts as a container for all of the metadata required to read from a datasource. All discovery, + * resolution and merging logic for schemas and partitions has been removed. + * + * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise + * this relation. + * @param partitionSchema The schema of the columns (if any) that are used to partition the relation + * @param dataSchema The schema of any remaining columns. Note that if any partition columns are + * present in the actual data files as well, they are preserved. + * @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values). + * @param fileFormat A file format that can be used to read and write the data in files. + * @param options Configuration used when reading / writing data. + */ +case class HadoopFsRelation( + location: FileCatalog, + partitionSchema: StructType, + dataSchema: StructType, + bucketSpec: Option[BucketSpec], + fileFormat: FileFormat, + options: Map[String, String])(val sparkSession: SparkSession) + extends BaseRelation with FileRelation { + + override def sqlContext: SQLContext = sparkSession.sqlContext + + val schema: StructType = { + val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet + StructType(dataSchema ++ partitionSchema.filterNot { column => + dataSchemaColumnNames.contains(column.name.toLowerCase) + }) + } + + def partitionSchemaOption: Option[StructType] = + if (partitionSchema.isEmpty) None else Some(partitionSchema) + + def partitionSpec: PartitionSpec = location.partitionSpec() + + def refresh(): Unit = location.refresh() + + override def toString: String = { + fileFormat match { + case source: DataSourceRegister => source.shortName() + case _ => "HadoopFiles" + } + } + + /** Returns the list of files that will be read when scanning this relation. 
*/ + override def inputFiles: Array[String] = + location.allFiles().map(_.getPath.toUri.toString).toArray + + override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala new file mode 100644 index 0000000000000..d2eec7b1413f8 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.mapreduce.TaskAttemptContext + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.types.StructType + + +/** + * A factory that produces [[OutputWriter]]s. A new [[OutputWriterFactory]] is created on driver + * side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized + * to executor side to create actual [[OutputWriter]]s on the fly. + */ +abstract class OutputWriterFactory extends Serializable { + /** + * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side + * to instantiate new [[OutputWriter]]s. + * + * @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that + * this may not point to the final output file. For example, `FileOutputFormat` writes to + * temporary directories and then merge written files back to the final destination. In + * this case, `path` points to a temporary output file under the temporary directory. + * @param dataSchema Schema of the rows to be written. Partition columns are not included in the + * schema if the relation being written is partitioned. + * @param context The Hadoop MapReduce task context. + * @since 1.4.0 + */ + def newInstance( + path: String, + bucketId: Option[Int], // TODO: This doesn't belong here... + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter + + /** + * Returns a new instance of [[OutputWriter]] that will write data to the given path. + * This method gets called by each task on executor to write InternalRows to + * format-specific files. Compared to the other `newInstance()`, this is a newer API that + * passes only the path that the writer must write to. The writer must write to the exact path + * and not modify it (do not add subdirectories, extensions, etc.). All other + * file-format-specific information needed to create the writer must be passed + * through the [[OutputWriterFactory]] implementation. 
+ * @since 2.0.0 + */ + def newWriter(path: String): OutputWriter = { + throw new UnsupportedOperationException("newInstance with just path not supported") + } +} + + +/** + * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the + * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor. + * An [[OutputWriter]] instance is created and initialized when a new output file is opened on + * executor side. This instance is used to persist rows to this single output file. + */ +abstract class OutputWriter { + /** + * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned + * tables, dynamic partition columns are not included in rows to be written. + * + * @since 1.4.0 + */ + def write(row: Row): Unit + + /** + * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before + * the task output is committed. + * + * @since 1.4.0 + */ + def close(): Unit + + private var converter: InternalRow => Row = _ + + protected[sql] def initConverter(dataSchema: StructType) = { + converter = + CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row] + } + + protected[sql] def writeInternal(row: InternalRow): Unit = { + write(converter(row)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 4a308ff1a32f8..6faafed1e6290 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -425,150 +425,6 @@ class ParquetFileFormat } } -/** - * A factory for generating OutputWriters for writing parquet files. This implemented is different - * from the [[ParquetOutputWriter]] as this does not use any [[OutputCommitter]]. It simply - * writes the data to the path used to generate the output writer. Callers of this factory - * has to ensure which files are to be considered as committed. - */ -private[parquet] class ParquetOutputWriterFactory( - sqlConf: SQLConf, - dataSchema: StructType, - hadoopConf: Configuration, - options: Map[String, String]) extends OutputWriterFactory { - - private val serializableConf: SerializableConfiguration = { - val job = Job.getInstance(hadoopConf) - val conf = ContextUtil.getConfiguration(job) - val parquetOptions = new ParquetOptions(options, sqlConf) - - // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override - // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why - // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is - // bundled with `ParquetOutputFormat[Row]`. - job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]]) - - ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]) - - // We want to clear this temporary metadata from saving into Parquet file. - // This metadata is only useful for detecting optional columns when pushing down filters. 
- val dataSchemaToWrite = StructType.removeMetadata( - StructType.metadataKeyForOptionalField, - dataSchema).asInstanceOf[StructType] - ParquetWriteSupport.setSchema(dataSchemaToWrite, conf) - - // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema) - // and `CatalystWriteSupport` (writing actual rows to Parquet files). - conf.set( - SQLConf.PARQUET_BINARY_AS_STRING.key, - sqlConf.isParquetBinaryAsString.toString) - - conf.set( - SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, - sqlConf.isParquetINT96AsTimestamp.toString) - - conf.set( - SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, - sqlConf.writeLegacyParquetFormat.toString) - - // Sets compression scheme - conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec) - new SerializableConfiguration(conf) - } - - /** - * Returns a [[OutputWriter]] that writes data to the give path without using - * [[OutputCommitter]]. - */ - override def newWriter(path: String): OutputWriter = new OutputWriter { - - // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter - private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0) - private val hadoopAttemptContext = new TaskAttemptContextImpl( - serializableConf.value, hadoopTaskAttemptId) - - // Instance of ParquetRecordWriter that does not use OutputCommitter - private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext) - - override def write(row: Row): Unit = { - throw new UnsupportedOperationException("call writeInternal") - } - - protected[sql] override def writeInternal(row: InternalRow): Unit = { - recordWriter.write(null, row) - } - - override def close(): Unit = recordWriter.close(hadoopAttemptContext) - } - - /** Create a [[ParquetRecordWriter]] that writes the given path without using OutputCommitter */ - private def createNoCommitterRecordWriter( - path: String, - hadoopAttemptContext: TaskAttemptContext): RecordWriter[Void, InternalRow] = { - // Custom ParquetOutputFormat that disable use of committer and writes to the given path - val outputFormat = new ParquetOutputFormat[InternalRow]() { - override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = { null } - override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = { new Path(path) } - } - outputFormat.getRecordWriter(hadoopAttemptContext) - } - - /** Disable the use of the older API. */ - def newInstance( - path: String, - bucketId: Option[Int], - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = { - throw new UnsupportedOperationException( - "this version of newInstance not supported for " + - "ParquetOutputWriterFactory") - } -} - - -// NOTE: This class is instantiated and used on executor side only, no need to be serializable. -private[parquet] class ParquetOutputWriter( - path: String, - bucketId: Option[Int], - context: TaskAttemptContext) - extends OutputWriter { - - private val recordWriter: RecordWriter[Void, InternalRow] = { - val outputFormat = { - new ParquetOutputFormat[InternalRow]() { - // Here we override `getDefaultWorkFile` for two reasons: - // - // 1. To allow appending. We need to generate unique output file names to avoid - // overwriting existing files (either exist before the write job, or are just written - // by other tasks within the same write job). - // - // 2. To allow dynamic partitioning. 
Default `getDefaultWorkFile` uses - // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all - // partitions in the case of dynamic partitioning. - override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - val configuration = context.getConfiguration - val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID) - val taskAttemptId = context.getTaskAttemptID - val split = taskAttemptId.getTaskID.getId - val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") - // It has the `.parquet` extension at the end because (de)compression tools - // such as gunzip would not be able to decompress this as the compression - // is not applied on this whole file but on each "page" in Parquet format. - new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension") - } - } - } - - outputFormat.getRecordWriter(context) - } - - override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") - - override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row) - - override def close(): Unit = recordWriter.close(context) -} - object ParquetFileFormat extends Logging { private[parquet] def readSchema( footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala new file mode 100644 index 0000000000000..f89ce05d82d90 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.parquet.hadoop.{ParquetOutputFormat, ParquetRecordWriter} +import org.apache.parquet.hadoop.util.ContextUtil + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.{BucketingUtils, OutputWriter, OutputWriterFactory, WriterContainer} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + + +/** + * A factory for generating OutputWriters for writing parquet files. This implemented is different + * from the [[ParquetOutputWriter]] as this does not use any [[OutputCommitter]]. 
It simply + * writes the data to the path used to generate the output writer. Callers of this factory + * has to ensure which files are to be considered as committed. + */ +private[parquet] class ParquetOutputWriterFactory( + sqlConf: SQLConf, + dataSchema: StructType, + hadoopConf: Configuration, + options: Map[String, String]) + extends OutputWriterFactory { + + private val serializableConf: SerializableConfiguration = { + val job = Job.getInstance(hadoopConf) + val conf = ContextUtil.getConfiguration(job) + val parquetOptions = new ParquetOptions(options, sqlConf) + + // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override + // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why + // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is + // bundled with `ParquetOutputFormat[Row]`. + job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]]) + + ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]) + + // We want to clear this temporary metadata from saving into Parquet file. + // This metadata is only useful for detecting optional columns when pushing down filters. + val dataSchemaToWrite = StructType.removeMetadata( + StructType.metadataKeyForOptionalField, + dataSchema).asInstanceOf[StructType] + ParquetWriteSupport.setSchema(dataSchemaToWrite, conf) + + // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema) + // and `CatalystWriteSupport` (writing actual rows to Parquet files). + conf.set( + SQLConf.PARQUET_BINARY_AS_STRING.key, + sqlConf.isParquetBinaryAsString.toString) + + conf.set( + SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, + sqlConf.isParquetINT96AsTimestamp.toString) + + conf.set( + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, + sqlConf.writeLegacyParquetFormat.toString) + + // Sets compression scheme + conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec) + new SerializableConfiguration(conf) + } + + /** + * Returns a [[OutputWriter]] that writes data to the give path without using + * [[OutputCommitter]]. 
+ */ + override def newWriter(path: String): OutputWriter = new OutputWriter { + + // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter + private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0) + private val hadoopAttemptContext = new TaskAttemptContextImpl( + serializableConf.value, hadoopTaskAttemptId) + + // Instance of ParquetRecordWriter that does not use OutputCommitter + private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext) + + override def write(row: Row): Unit = { + throw new UnsupportedOperationException("call writeInternal") + } + + protected[sql] override def writeInternal(row: InternalRow): Unit = { + recordWriter.write(null, row) + } + + override def close(): Unit = recordWriter.close(hadoopAttemptContext) + } + + /** Create a [[ParquetRecordWriter]] that writes the given path without using OutputCommitter */ + private def createNoCommitterRecordWriter( + path: String, + hadoopAttemptContext: TaskAttemptContext): RecordWriter[Void, InternalRow] = { + // Custom ParquetOutputFormat that disable use of committer and writes to the given path + val outputFormat = new ParquetOutputFormat[InternalRow]() { + override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = { null } + override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = { new Path(path) } + } + outputFormat.getRecordWriter(hadoopAttemptContext) + } + + /** Disable the use of the older API. */ + def newInstance( + path: String, + bucketId: Option[Int], + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + throw new UnsupportedOperationException( + "this version of newInstance not supported for " + + "ParquetOutputWriterFactory") + } +} + + +// NOTE: This class is instantiated and used on executor side only, no need to be serializable. +private[parquet] class ParquetOutputWriter( + path: String, + bucketId: Option[Int], + context: TaskAttemptContext) + extends OutputWriter { + + private val recordWriter: RecordWriter[Void, InternalRow] = { + val outputFormat = { + new ParquetOutputFormat[InternalRow]() { + // Here we override `getDefaultWorkFile` for two reasons: + // + // 1. To allow appending. We need to generate unique output file names to avoid + // overwriting existing files (either exist before the write job, or are just written + // by other tasks within the same write job). + // + // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses + // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all + // partitions in the case of dynamic partitioning. + override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val configuration = context.getConfiguration + val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID) + val taskAttemptId = context.getTaskAttemptID + val split = taskAttemptId.getTaskID.getId + val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") + // It has the `.parquet` extension at the end because (de)compression tools + // such as gunzip would not be able to decompress this as the compression + // is not applied on this whole file but on each "page" in Parquet format. 
+ new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension") + } + } + } + + outputFormat.getRecordWriter(context) + } + + override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") + + override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row) + + override def close(): Unit = recordWriter.close(context) +}