Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ statement
| UNCACHE TABLE (IF EXISTS)? multipartIdentifier #uncacheTable
| CLEAR CACHE #clearCache
| LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
tableIdentifier partitionSpec? #loadData
multipartIdentifier partitionSpec? #loadData
| TRUNCATE TABLE multipartIdentifier partitionSpec? #truncateTable
| MSCK REPAIR TABLE multipartIdentifier #repairTable
| op=(ADD | LIST) identifier .*? #manageResource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2797,6 +2797,25 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
RepairTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
}

/**
* Create a [[LoadDataStatement]].
*
* For example:
* {{{
* LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE multi_part_name
* [PARTITION (partcol1=val1, partcol2=val2 ...)]
* }}}
*/
override def visitLoadData(ctx: LoadDataContext): LogicalPlan = withOrigin(ctx) {
LoadDataStatement(
tableName = visitMultipartIdentifier(ctx.multipartIdentifier),
path = string(ctx.path),
isLocal = ctx.LOCAL != null,
isOverwrite = ctx.OVERWRITE != null,
partition = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
)
}

/**
* Creates a [[ShowCreateTableStatement]]
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,16 @@ case class AnalyzeColumnStatement(
*/
case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement

/**
* A LOAD DATA INTO TABLE statement, as parsed from SQL
*/
case class LoadDataStatement(
tableName: Seq[String],
path: String,
isLocal: Boolean,
isOverwrite: Boolean,
partition: Option[TablePartitionSpec]) extends ParsedStatement

/**
* A SHOW CREATE TABLE statement, as parsed from SQL.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,33 @@ class DDLParserSuite extends AnalysisTest {
RepairTableStatement(Seq("a", "b", "c")))
}

test("LOAD DATA INTO table") {
comparePlans(
parsePlan("LOAD DATA INPATH 'filepath' INTO TABLE a.b.c"),
LoadDataStatement(Seq("a", "b", "c"), "filepath", false, false, None))

comparePlans(
parsePlan("LOAD DATA LOCAL INPATH 'filepath' INTO TABLE a.b.c"),
LoadDataStatement(Seq("a", "b", "c"), "filepath", true, false, None))

comparePlans(
parsePlan("LOAD DATA LOCAL INPATH 'filepath' OVERWRITE INTO TABLE a.b.c"),
LoadDataStatement(Seq("a", "b", "c"), "filepath", true, true, None))

comparePlans(
parsePlan(
s"""
|LOAD DATA LOCAL INPATH 'filepath' OVERWRITE INTO TABLE a.b.c
|PARTITION(ds='2017-06-10')
""".stripMargin),
LoadDataStatement(
Seq("a", "b", "c"),
"filepath",
true,
true,
Some(Map("ds" -> "2017-06-10"))))
}

test("SHOW CREATE table") {
comparePlans(
parsePlan("SHOW CREATE TABLE a.b.c"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, CacheTableCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowCreateTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand, UncacheTableCommand}
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, CacheTableCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, LoadDataCommand, ShowCreateTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand, UncacheTableCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -299,6 +299,15 @@ class ResolveSessionCatalog(
v1TableName.asTableIdentifier,
"MSCK REPAIR TABLE")

case LoadDataStatement(tableName, path, isLocal, isOverwrite, partition) =>
val v1TableName = parseV1Table(tableName, "LOAD DATA")
LoadDataCommand(
v1TableName.asTableIdentifier,
path,
isLocal,
isOverwrite,
partition)

case ShowCreateTableStatement(tableName) =>
val v1TableName = parseV1Table(tableName, "SHOW CREATE TABLE")
ShowCreateTableCommand(v1TableName.asTableIdentifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,25 +273,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
}

/**
* Create a [[LoadDataCommand]] command.
*
* For example:
* {{{
* LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
* [PARTITION (partcol1=val1, partcol2=val2 ...)]
* }}}
*/
override def visitLoadData(ctx: LoadDataContext): LogicalPlan = withOrigin(ctx) {
LoadDataCommand(
table = visitTableIdentifier(ctx.tableIdentifier),
path = string(ctx.path),
isLocal = ctx.LOCAL != null,
isOverwrite = ctx.OVERWRITE != null,
partition = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
)
}

/**
* Create an [[AlterDatabasePropertiesCommand]] command.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,24 @@ class DataSourceV2SQLSuite
}
}

test("LOAD DATA INTO TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(
s"""
|CREATE TABLE $t (id bigint, data string)
|USING foo
|PARTITIONED BY (id)
""".stripMargin)

testV1Command("LOAD DATA", s"INPATH 'filepath' INTO TABLE $t")
testV1Command("LOAD DATA", s"LOCAL INPATH 'filepath' INTO TABLE $t")
testV1Command("LOAD DATA", s"LOCAL INPATH 'filepath' OVERWRITE INTO TABLE $t")
testV1Command("LOAD DATA",
s"LOCAL INPATH 'filepath' OVERWRITE INTO TABLE $t PARTITION(id=1)")
}
}

test("SHOW CREATE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1410,29 +1410,4 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
assert(source4.table == "table2")
assert(location4 == Some("/spark/warehouse"))
}

test("load data") {
val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1"
val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect {
case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition)
}.head
assert(table.database.isEmpty)
assert(table.table == "table1")
assert(path == "path")
assert(!isLocal)
assert(!isOverwrite)
assert(partition.isEmpty)

val v2 = "LOAD DATA LOCAL INPATH 'path' OVERWRITE INTO TABLE table1 PARTITION(c='1', d='2')"
val (table2, path2, isLocal2, isOverwrite2, partition2) = parser.parsePlan(v2).collect {
case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition)
}.head
assert(table2.database.isEmpty)
assert(table2.table == "table1")
assert(path2 == "path")
assert(isLocal2)
assert(isOverwrite2)
assert(partition2.nonEmpty)
assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2")
}
}