diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f33cc86a18a1..04b6a0d97f0b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1494,7 +1494,7 @@ object SQLConf {
       " register class names for which data source V2 write paths are disabled. Writes from these" +
       " sources will fall back to the V1 sources.")
     .stringConf
-    .createWithDefault("csv,orc,text")
+    .createWithDefault("csv,json,orc,text")
 
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index be9cb8153f25..8dc97c308d79 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,6 +1,6 @@
 org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2
 org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
-org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2
 org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index fb619a7b65cf..f988f6eb266d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -37,9 +37,9 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier}
 import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
-import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2
+import org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -238,7 +238,7 @@ case class AlterTableAddColumnsCommand(
       // TextFileFormat only default to one column "value"
       // Hive type is already considered as hive serde table, so the logic will not
      // come in here.
-      case _: JsonFileFormat | _: CSVDataSourceV2 | _: ParquetFileFormat | _: OrcDataSourceV2 =>
+      case _: JsonDataSourceV2 | _: CSVDataSourceV2 | _: ParquetFileFormat | _: OrcDataSourceV2 =>
       case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") =>
       case s =>
         throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index d3f04145b83d..95a63c3d1e2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -157,38 +157,3 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     case _ => false
   }
 }
-
-private[json] class JsonOutputWriter(
-    path: String,
-    options: JSONOptions,
-    dataSchema: StructType,
-    context: TaskAttemptContext)
-  extends OutputWriter with Logging {
-
-  private val encoding = options.encoding match {
-    case Some(charsetName) => Charset.forName(charsetName)
-    case None => StandardCharsets.UTF_8
-  }
-
-  if (JSONOptionsInRead.blacklist.contains(encoding)) {
-    logWarning(s"The JSON file ($path) was written in the encoding ${encoding.displayName()}" +
-      " which can be read back by Spark only if multiLine is enabled.")
-  }
-
-  private var jacksonGenerator: Option[JacksonGenerator] = None
-
-  override def write(row: InternalRow): Unit = {
-    val gen = jacksonGenerator.getOrElse {
-      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
-      // create the Generator without separator inserted between 2 records
-      val newGen = new JacksonGenerator(dataSchema, os, options)
-      jacksonGenerator = Some(newGen)
-      newGen
-    }
-
-    gen.write(row)
-    gen.writeLineEnding()
-  }
-
-  override def close(): Unit = jacksonGenerator.foreach(_.close())
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala
new file mode 100644
index 000000000000..b3cd570cfb1c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.json
+
+import java.nio.charset.{Charset, StandardCharsets}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions, JSONOptionsInRead}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
+import org.apache.spark.sql.types.StructType
+
+class JsonOutputWriter(
+    path: String,
+    options: JSONOptions,
+    dataSchema: StructType,
+    context: TaskAttemptContext)
+  extends OutputWriter with Logging {
+
+  private val encoding = options.encoding match {
+    case Some(charsetName) => Charset.forName(charsetName)
+    case None => StandardCharsets.UTF_8
+  }
+
+  if (JSONOptionsInRead.blacklist.contains(encoding)) {
+    logWarning(s"The JSON file ($path) was written in the encoding ${encoding.displayName()}" +
+      " which can be read back by Spark only if multiLine is enabled.")
+  }
+
+  private var jacksonGenerator: Option[JacksonGenerator] = None
+
+  override def write(row: InternalRow): Unit = {
+    val gen = jacksonGenerator.getOrElse {
+      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
+      // create the Generator without separator inserted between 2 records
+      val newGen = new JacksonGenerator(dataSchema, os, options)
+      jacksonGenerator = Some(newGen)
+      newGen
+    }
+
+    gen.write(row)
+    gen.writeLineEnding()
+  }
+
+  override def close(): Unit = jacksonGenerator.foreach(_.close())
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
new file mode 100644
index 000000000000..610bd4c1b9d8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.sources.v2.Table
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class JsonDataSourceV2 extends FileDataSourceV2 {
+
+  override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[JsonFileFormat]
+
+  override def shortName(): String = "json"
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    val paths = getPaths(options)
+    val tableName = getTableName(paths)
+    JsonTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+  }
+
+  override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
+    val paths = getPaths(options)
+    val tableName = getTableName(paths)
+    JsonTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+  }
+}
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala
new file mode 100644
index 000000000000..e5b7ae0bd228
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptionsInRead}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.PartitionReader
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A factory used to create JSON readers.
+ *
+ * @param sqlConf SQL configuration.
+ * @param broadcastedConf Broadcast serializable Hadoop Configuration.
+ * @param dataSchema Schema of JSON files.
+ * @param readDataSchema Required schema of JSON files.
+ * @param partitionSchema Schema of partitions.
+ * @param parsedOptions Options for parsing JSON files.
+ */
+case class JsonPartitionReaderFactory(
+    sqlConf: SQLConf,
+    broadcastedConf: Broadcast[SerializableConfiguration],
+    dataSchema: StructType,
+    readDataSchema: StructType,
+    partitionSchema: StructType,
+    parsedOptions: JSONOptionsInRead) extends FilePartitionReaderFactory {
+
+  override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
+    val actualSchema =
+      StructType(readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+    val parser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
+    val iter = JsonDataSource(parsedOptions).readFile(
+      broadcastedConf.value.value,
+      partitionedFile,
+      parser,
+      readDataSchema)
+    val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
+    new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
+      partitionSchema, partitionedFile.partitionValues)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
new file mode 100644
index 000000000000..201572b4338b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.execution.datasources.v2.{FileScan, TextBasedFileScan}
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.SerializableConfiguration
+
+case class JsonScan(
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    dataSchema: StructType,
+    readDataSchema: StructType,
+    readPartitionSchema: StructType,
+    options: CaseInsensitiveStringMap)
+  extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) {
+
+  private val parsedOptions = new JSONOptionsInRead(
+    CaseInsensitiveMap(options.asScala.toMap),
+    sparkSession.sessionState.conf.sessionLocalTimeZone,
+    sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+
+  override def isSplitable(path: Path): Boolean = {
+    JsonDataSource(parsedOptions).isSplitable && super.isSplitable(path)
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    // Check the field requirement for corrupt records here, so that the exception is thrown
+    // on the driver side.
+    ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
+
+    if (readDataSchema.length == 1 &&
+      readDataSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+      throw new AnalysisException(
+        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
+          "referenced columns only include the internal corrupt record column\n" +
+          "(named _corrupt_record by default). For example:\n" +
+          "spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
+          "and spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
+          "Instead, you can cache or save the parsed results and then send the same query.\n" +
+          "For example, val df = spark.read.schema(schema).json(file).cache() and then\n" +
+          "df.filter($\"_corrupt_record\".isNotNull).count()."
+      )
+    }
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    // Hadoop Configurations are case sensitive.
+    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+    val broadcastedConf = sparkSession.sparkContext.broadcast(
+      new SerializableConfiguration(hadoopConf))
+    // The partition values are already truncated in `FileScan.partitions`.
+    // We should use `readPartitionSchema` as the partition schema here.
+    JsonPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
+      dataSchema, readDataSchema, readPartitionSchema, parsedOptions)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScanBuilder.scala
new file mode 100644
index 000000000000..bb3c0366bdc2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScanBuilder.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.sources.v2.reader.Scan
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class JsonScanBuilder(
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    schema: StructType,
+    dataSchema: StructType,
+    options: CaseInsensitiveStringMap)
+  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+  override def build(): Scan = {
+    JsonScan(sparkSession, fileIndex, dataSchema, readDataSchema(), readPartitionSchema(), options)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
new file mode 100644
index 000000000000..bbdd3ae69222
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.execution.datasources.v2.FileTable
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class JsonTable(
+    name: String,
+    sparkSession: SparkSession,
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    userSpecifiedSchema: Option[StructType],
+    fallbackFileFormat: Class[_ <: FileFormat])
+  extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+  override def newScanBuilder(options: CaseInsensitiveStringMap): JsonScanBuilder =
+    new JsonScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+
+  override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
+    val parsedOptions = new JSONOptionsInRead(
+      options.asScala.toMap,
+      sparkSession.sessionState.conf.sessionLocalTimeZone,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    JsonDataSource(parsedOptions).inferSchema(
+      sparkSession, files, parsedOptions)
+  }
+
+  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
+    new JsonWriteBuilder(options, paths, formatName, supportsDataType)
+
+  override def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: AtomicType => true
+
+    case st: StructType => st.forall { f => supportsDataType(f.dataType) }
+
+    case ArrayType(elementType, _) => supportsDataType(elementType)
+
+    case MapType(keyType, valueType, _) =>
+      supportsDataType(keyType) && supportsDataType(valueType)
+
+    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
+
+    case _: NullType => true
+
+    case _ => false
+  }
+
+  override def formatName: String = "JSON"
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala
new file mode 100644
index 000000000000..3c99e07489a7
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.json.JsonOutputWriter
+import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class JsonWriteBuilder(
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    formatName: String,
+    supportsDataType: DataType => Boolean)
+  extends FileWriteBuilder(options, paths, formatName, supportsDataType) {
+  override def prepareWrite(
+      sqlConf: SQLConf,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    val conf = job.getConfiguration
+    val parsedOptions = new JSONOptions(
+      options,
+      sqlConf.sessionLocalTimeZone,
+      sqlConf.columnNameOfCorruptRecord)
+    parsedOptions.compressionCodec.foreach { codec =>
+      CompressionCodecs.setCodecConfiguration(conf, codec)
+    }
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new JsonOutputWriter(path, parsedOptions, dataSchema, context)
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        ".json" + CodecStreams.getCompressionExtension(context)
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 8fcffbfee54c..c6fdf41ca7d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -332,7 +332,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     // TODO: test file source V2 after write path is fixed.
     Seq(true).foreach { useV1 =>
       val useV1List = if (useV1) {
-        "csv,orc"
+        "csv,json,orc"
       } else {
         ""
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
index a543eb835165..e20a82ba9bc4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
@@ -130,7 +130,10 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("Incorrect result caused by the rule OptimizeMetadataOnlyQuery") {
-    withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
+    // This test case is only for file source V1. As the rule OptimizeMetadataOnlyQuery is
+    // disabled by default, we can skip testing file source V2 at the current stage.
+    withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true",
+      SQLConf.USE_V1_SOURCE_READER_LIST.key -> "json") {
       withTempPath { path =>
         val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
         Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 69761775e3d5..6316e89537ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1961,9 +1961,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         spark.read.schema(schema).json(path).select("_corrupt_record").collect()
       }.getMessage
       assert(msg.contains("only include the internal corrupt record column"))
-      intercept[catalyst.errors.TreeNodeException[_]] {
-        spark.read.schema(schema).json(path).filter($"_corrupt_record".isNotNull).count()
-      }
+      // workaround: cache the parsed result first, then run the filtering query on it
       val df = spark.read.schema(schema).json(path).cache()
       assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c9ff4fc0777e..87bbf04fb26d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -913,24 +913,24 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     withTable("appendParquetToJson") {
       createDF(0, 9).write.format("json").saveAsTable("appendParquetToJson")
-      val e = intercept[AnalysisException] {
+      val msg = intercept[AnalysisException] {
         createDF(10, 19).write.mode(SaveMode.Append).format("parquet")
           .saveAsTable("appendParquetToJson")
-      }
-      assert(e.getMessage.contains(
-        "The format of the existing table default.appendParquetToJson is `JsonFileFormat`. " +
-          "It doesn't match the specified format `ParquetFileFormat`"))
+      }.getMessage
+      // The format of the existing table can be JsonDataSourceV2 or JsonFileFormat.
+      assert(msg.contains("The format of the existing table default.appendParquetToJson is `Json"))
+      assert(msg.contains("It doesn't match the specified format `ParquetFileFormat`"))
     }
 
     withTable("appendTextToJson") {
       createDF(0, 9).write.format("json").saveAsTable("appendTextToJson")
-      val e = intercept[AnalysisException] {
+      val msg = intercept[AnalysisException] {
         createDF(10, 19).write.mode(SaveMode.Append).format("text")
           .saveAsTable("appendTextToJson")
-      }
-      assert(e.getMessage.contains(
-        "The format of the existing table default.appendTextToJson is `JsonFileFormat`. " +
-          "It doesn't match the specified format"))
+      }.getMessage
+      // The format of the existing table can be JsonDataSourceV2 or JsonFileFormat.
+      assert(msg.contains("The format of the existing table default.appendTextToJson is `Json"))
+      assert(msg.contains("It doesn't match the specified format"))
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index bf6d0ea5788d..0587cfebc8d3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -817,10 +817,12 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       assert(preferredLocations.distinct.length == 2)
     }
 
-    checkLocality()
-
-    withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> dataSourceName) {
       checkLocality()
+
+      withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
+        checkLocality()
+      }
     }
   }
 }
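
A quick way to exercise the migrated path end to end is sketched below. This is not part of the patch: the object name and scratch path are hypothetical, and the conf key is obtained from SQLConf exactly as the updated test suites above do. With the new service registration, "json" resolves to JsonDataSourceV2 for reads, while writes still fall back to the V1 JsonFileFormat because "json" was added to the disabled-V2-writers default.

// Minimal smoke-test sketch, assuming a local SparkSession and a scratch
// path under /tmp. Not part of the patch.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

object JsonV2SmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("JsonV2SmokeTest")
      .getOrCreate()
    import spark.implicits._

    val path = "/tmp/json-v2-smoke"  // hypothetical scratch location

    // Writes go through the V1 JsonFileFormat path: this change adds "json"
    // to the disabled-V2-writers default ("csv,json,orc,text").
    Seq(("a", 1), ("b", 2)).toDF("k", "v").write.mode("overwrite").json(path)

    // Reads now resolve "json" to JsonDataSourceV2 via DataSourceRegister.
    spark.read.json(path).show()

    // Force the read path back onto V1, as the updated suites do.
    spark.conf.set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "json")
    spark.read.json(path).show()

    spark.stop()
  }
}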