@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.delta
1919
2020import java .io .File
2121
22+ import org .apache .hadoop .fs .FileSystem
2223import org .apache .hadoop .fs .Path
2324
2425import org .apache .spark .SparkConf
@@ -32,7 +33,7 @@ import org.apache.spark.sql.delta.{DeltaLog, DeltaTableUtils}
3233import org .apache .spark .sql .delta .catalog .DeltaCatalog
3334import org .apache .spark .sql .execution .{CommandResultExec , FileSourceScanExec , SparkPlan }
3435import org .apache .spark .sql .execution .adaptive .AdaptiveSparkPlanHelper
35- import org .apache .spark .sql .execution .command .{ExecutedCommandExec , RunnableCommand }
36+ import org .apache .spark .sql .execution .command .{ExecutedCommandExec , RunnableCommand , UploadDataCommand }
3637import org .apache .spark .sql .execution .datasources .v2 .{AppendDataExecV1 , AtomicCreateTableAsSelectExec , AtomicReplaceTableAsSelectExec , CreateTableExec , OverwriteByExpressionExecV1 }
3738import org .apache .spark .sql .execution .metric .SQLMetric
3839import org .apache .spark .sql .internal .{SQLConf , StaticSQLConf }
@@ -3733,56 +3734,55 @@ class DeltaQuerySuite extends QueryTest
37333734// }
37343735// }
37353736// }
3736- //
3737- // test("CARMEL-5229 : upload - Resolve table schema with upload csv header in case insensitive") {
3738- // withTempDir { dir =>
3739- // Seq(true, false).foreach { ae =>
3740- // withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> ae.toString,
3741- // SQLConf.AUTO_REPARTITION_FOR_WRITING_ENABLED.key -> "true",
3742- // SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
3743- // SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false",
3744- // SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
3745- //
3746- // withTable( "dest") {
3747- // sql("CREATE TABLE dest(Id BIGINT, NAME STRING, sex Boolean) USING DELTA")
3748- //
3749- // val table = TableIdentifier("dest")
3750- // val targetTable = getCatalogTable(table)
3751- //
3752- // val path = new Path(dir.getCanonicalPath, ae.toString)
3753- // spark.range(0, 5).map(x => (x, s"user_$x", x % 2==0)).toDF("ID", "name", "Sex").
3754- // toDF().coalesce(1).write.option("header", true).csv(path.toString)
3755- //
3756- // val uploadDataCommand =
3757- // UploadDataCommand(table, "", true, None, Some(Map("header" -> "true")))
3758- //
3759- // val fs = FileSystem.get(sparkContext.hadoopConfiguration)
3760- // val defaultFs = FileSystem.getDefaultUri((sparkContext.hadoopConfiguration)).getScheme
3761- // val isDefaultLocal = defaultFs == null || defaultFs == "file"
3762- //
3763- // val files = fs.listStatus(new Path(path.toString)).
3764- // filter(p => p.getPath.toString.endsWith(".csv")).map(p => p.getPath.toString)
3765- // val csvFile = new Path(files(0))
3766- // val targetPath = new Path(targetTable.storage.locationUri.get.getPath)
3767- // val result = uploadDataCommand.performUpload(spark, fs, targetTable,
3768- // csvFile, isDefaultLocal)
3769- //
3770- // assert(result == targetPath)
3771- // checkAnswer(
3772- // sql("SELECT * FROM dest"),
3773- // Seq(
3774- // Row(0, "user_0", true),
3775- // Row(1, "user_1", false),
3776- // Row(2, "user_2", true),
3777- // Row(3, "user_3", false),
3778- // Row(4, "user_4", true)
3779- // )
3780- // )
3781- // }
3782- // }
3783- // }
3784- // }
3785- // }
3737+
// CARMEL-5229: `UploadDataCommand` must resolve the target table's schema against the
// uploaded CSV header case-insensitively (table columns `Id/NAME/sex` vs CSV header
// `ID/name/Sex`). Exercised with AQE both on and off.
test("CARMEL-5229 : upload - Resolve table schema with upload csv header in case insensitive") {
  withTempDir { dir =>
    Seq(true, false).foreach { ae =>
      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> ae.toString,
        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
        SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false",
        SQLConf.SHUFFLE_PARTITIONS.key -> "5") {

        withTable("dest") {
          sql("CREATE TABLE dest(Id BIGINT, NAME STRING, sex Boolean) USING DELTA")

          val table = TableIdentifier("dest")
          val targetTable = getCatalogTable(table)

          // Write a single CSV file (with header) whose column casing differs
          // from the table definition above.
          val path = new Path(dir.getCanonicalPath, ae.toString)
          spark.range(0, 5).map(x => (x, s"user_$x", x % 2 == 0)).toDF("ID", "name", "Sex").
            toDF().coalesce(1).write.option("header", true).csv(path.toString)

          val uploadDataCommand =
            UploadDataCommand(table, "", true, None, Map("header" -> "true"))

          // Determine whether the default filesystem is local; performUpload
          // takes this into account when moving the uploaded file.
          val fs = FileSystem.get(sparkContext.hadoopConfiguration)
          val defaultFs = FileSystem.getDefaultUri(sparkContext.hadoopConfiguration).getScheme
          val isDefaultLocal = defaultFs == null || defaultFs == "file"

          // Pick the single CSV part file produced by coalesce(1) above.
          val files = fs.listStatus(new Path(path.toString)).
            filter(p => p.getPath.toString.endsWith(".csv")).map(p => p.getPath.toString)
          val csvFile = new Path(files(0))
          val targetPath = new Path(targetTable.storage.locationUri.get.getPath)
          val result = uploadDataCommand.performUpload(spark, fs, targetTable,
            csvFile, isDefaultLocal)

          // Upload lands at the table location and the rows survive the
          // case-insensitive header-to-schema mapping intact.
          assert(result == targetPath)
          checkAnswer(
            sql("SELECT * FROM dest"),
            Seq(
              Row(0, "user_0", true),
              Row(1, "user_1", false),
              Row(2, "user_2", true),
              Row(3, "user_3", false),
              Row(4, "user_4", true)
            )
          )
        }
      }
    }
  }
}
37863786
37873787 test(" CARMEL-5202: Fix NoSuchElementException when creating DELTA table" ) {
37883788 withView(" v1" ) {
0 commit comments