Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ trait LineageParser {
relationColumnLineage: AttributeMap[AttributeSet]): AttributeMap[AttributeSet] = {
val mergedRelationColumnLineage = {
relationOutput.foldLeft((ListMap[Attribute, AttributeSet](), relationColumnLineage)) {
case ((acc, x), attr) if x.isEmpty =>
(acc + (attr -> AttributeSet.empty), x.empty)
case ((acc, x), attr) =>
(acc + (attr -> x.head._2), x.tail)
}._1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,16 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
" partitioned by(pid)")
spark.sql("create table if not exists test_db0.test_table1" +
" (key int, value string) using parquet")
spark.sql("create table test_db.test_table_from_dir" +
" (`a0` string, `b0` string) using parquet")
}

override def afterAll(): Unit = {
Seq("test_db0.test_table0", "test_db0.test_table1", "test_db0.test_table_part0").foreach { t =>
Seq(
"test_db0.test_table0",
"test_db0.test_table1",
"test_db0.test_table_part0",
"test_db.test_table_from_dir").foreach { t =>
spark.sql(s"drop table if exists $t")
}
spark.sql("drop database if exists test_db")
Expand Down Expand Up @@ -1442,6 +1448,34 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
}
}

test("test directory to table") {
val inputFile = getClass.getResource("/").getPath + "input_file"
val sourceFile = File(inputFile).createFile()
spark.sql(
s"""
|CREATE OR REPLACE TEMPORARY VIEW temp_view (
| `a` STRING COMMENT '',
| `b` STRING COMMENT ''
|) USING csv OPTIONS(
| sep='\t',
| path='${sourceFile.path}'
|);
|""".stripMargin).collect()

val ret0 = extractLineageWithoutExecuting(
s"""
|INSERT OVERWRITE TABLE test_db.test_table_from_dir
|SELECT `a`, `b` FROM temp_view
|""".stripMargin)

assert(ret0 == Lineage(
List(),
List(s"spark_catalog.test_db.test_table_from_dir"),
List(
(s"spark_catalog.test_db.test_table_from_dir.a0", Set()),
(s"spark_catalog.test_db.test_table_from_dir.b0", Set()))))
}

private def extractLineageWithoutExecuting(sql: String): Lineage = {
val parsed = spark.sessionState.sqlParser.parsePlan(sql)
val analyzed = spark.sessionState.analyzer.execute(parsed)
Expand Down
Loading