diff --git a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
index 27311146454..effe2ae8314 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala
@@ -176,6 +176,8 @@ trait LineageParser {
       relationColumnLineage: AttributeMap[AttributeSet]): AttributeMap[AttributeSet] = {
     val mergedRelationColumnLineage = {
       relationOutput.foldLeft((ListMap[Attribute, AttributeSet](), relationColumnLineage)) {
+        case ((acc, x), attr) if x.isEmpty =>
+          (acc + (attr -> AttributeSet.empty), x.empty)
         case ((acc, x), attr) =>
           (acc + (attr -> x.head._2), x.tail)
       }._1
diff --git a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
index b65cc4c5a40..88ef0553891 100644
--- a/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
+++ b/extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
@@ -59,10 +59,16 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
       " partitioned by(pid)")
     spark.sql("create table if not exists test_db0.test_table1" +
       " (key int, value string) using parquet")
+    spark.sql("create table test_db.test_table_from_dir" +
+      " (`a0` string, `b0` string) using parquet")
   }
 
   override def afterAll(): Unit = {
-    Seq("test_db0.test_table0", "test_db0.test_table1", "test_db0.test_table_part0").foreach { t =>
+    Seq(
+      "test_db0.test_table0",
+      "test_db0.test_table1",
+      "test_db0.test_table_part0",
+      "test_db.test_table_from_dir").foreach { t =>
       spark.sql(s"drop table if exists $t")
     }
     spark.sql("drop database if exists test_db")
@@ -1442,6 +1448,34 @@ class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
     }
   }
 
+  test("test directory to table") {
+    val inputFile = getClass.getResource("/").getPath + "input_file"
+    val sourceFile = File(inputFile).createFile()
+    spark.sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW temp_view (
+         |  `a` STRING COMMENT '',
+         |  `b` STRING COMMENT ''
+         |) USING csv OPTIONS(
+         |  sep='\t',
+         |  path='${sourceFile.path}'
+         |);
+         |""".stripMargin).collect()
+
+    val ret0 = extractLineageWithoutExecuting(
+      s"""
+         |INSERT OVERWRITE TABLE test_db.test_table_from_dir
+         |SELECT `a`, `b` FROM temp_view
+         |""".stripMargin)
+
+    assert(ret0 == Lineage(
+      List(),
+      List(s"spark_catalog.test_db.test_table_from_dir"),
+      List(
+        (s"spark_catalog.test_db.test_table_from_dir.a0", Set()),
+        (s"spark_catalog.test_db.test_table_from_dir.b0", Set()))))
+  }
+
   private def extractLineageWithoutExecuting(sql: String): Lineage = {
     val parsed = spark.sessionState.sqlParser.parsePlan(sql)
     val analyzed = spark.sessionState.analyzer.execute(parsed)
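
Context for the first hunk: the added guard case lets the foldLeft emit an empty lineage set for each output column when the relation exposes no per-column lineage (as with the CSV directory source exercised by the new test), instead of failing on x.head. The snippet below is a minimal, self-contained sketch of that pattern, assuming plain Scala collections in place of Spark's Attribute/AttributeSet/AttributeMap types; all names here are illustrative and are not the plugin's actual API.

import scala.collection.immutable.ListMap

object FoldLineageSketch {
  // Mirrors the guarded foldLeft from the patch: pair each output column with
  // the next available lineage entry, or with an empty set once none remain.
  def merge(
      relationOutput: Seq[String],
      relationColumnLineage: ListMap[String, Set[String]]): ListMap[String, Set[String]] = {
    relationOutput.foldLeft((ListMap[String, Set[String]](), relationColumnLineage)) {
      case ((acc, x), attr) if x.isEmpty =>
        // Without this guard, x.head below throws NoSuchElementException
        // when the source provides no per-column lineage at all.
        (acc + (attr -> Set.empty[String]), x)
      case ((acc, x), attr) =>
        (acc + (attr -> x.head._2), x.tail)
    }._1
  }

  def main(args: Array[String]): Unit = {
    // No lineage available (the previously failing case):
    println(merge(Seq("a0", "b0"), ListMap.empty))
    // Lineage present: behaviour is unchanged.
    println(merge(Seq("a0"), ListMap("a" -> Set("src.a"))))
  }
}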