Optimize duplicate tests and code
dmetasoul-opensource committed Jan 28, 2022
1 parent 428a7f4 commit 9226b7c
Showing 28 changed files with 141 additions and 209 deletions.
@@ -107,12 +107,15 @@ class LakeSoulSparkSessionExtension extends (SparkSessionExtensions => Unit) {
   extensions.injectPostHocResolutionRule { session =>
     PreprocessTableDelete(session.sessionState.conf)
   }
+
   extensions.injectPostHocResolutionRule { session =>
     LakeSoulPostHocAnalysis(session)
   }
-  extensions.injectResolutionRule{ session =>
+
+  extensions.injectResolutionRule { session =>
     ProcessCDCTableMergeOnRead(session.sessionState.conf)
   }
+
   extensions.injectResolutionRule { session =>
     RewriteQueryByMaterialView(session)
   }
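
For context, rules injected here only take effect once the extension class is registered on the Spark session. A minimal sketch of that wiring (not part of this commit; only the extension class name is taken from the hunk above):

// Sketch: registering LakeSoulSparkSessionExtension on a local session.
// Building the session invokes the extension, which runs the inject* calls above.
import org.apache.spark.sql.SparkSession

object ExtensionWiringSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("lakesoul-extension-sketch")
      .config("spark.sql.extensions",
        "org.apache.spark.sql.lakesoul.LakeSoulSparkSessionExtension")
      .getOrCreate()
    // Queries planned by this session now pass through the injected
    // resolution rules, including ProcessCDCTableMergeOnRead.
    spark.stop()
  }
}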
@@ -1,37 +1,29 @@
 package org.apache.spark.sql.lakesoul.rules

 import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.lakesoul.{LakeSoulTableProperties, LakeSoulTableRelationV2}
+import org.apache.spark.sql.lakesoul.LakeSoulTableProperties
 import org.apache.spark.sql.lakesoul.catalog.LakeSoulTableV2
-import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
-case class ProcessCDCTableMergeOnRead (sqlConf: SQLConf) extends Rule[LogicalPlan]{
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {
+
+case class ProcessCDCTableMergeOnRead(sqlConf: SQLConf) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {
     case p: LogicalPlan if p.children.exists(_.isInstanceOf[DataSourceV2Relation]) && !p.isInstanceOf[Filter] =>
-      p.children.toSeq.find(_.isInstanceOf[DataSourceV2Relation]).get match {
-        case dsv2@DataSourceV2Relation(table: LakeSoulTableV2, _, _, _, _)=>{
-          val value=getLakeSoulTableCDCColumn(table)
-          if(value.nonEmpty){
-            p.withNewChildren(Filter(Column(expr(s" ${value.get}!= 'delete'").expr).expr,dsv2)::Nil)
-          }
-          else {
-            p
-          }
+      p.children.find(_.isInstanceOf[DataSourceV2Relation]).get match {
+        case dsv2@DataSourceV2Relation(table: LakeSoulTableV2, _, _, _, _) =>
+          val value = getLakeSoulTableCDCColumn(table)
+          if (value.nonEmpty) {
+            p.withNewChildren(Filter(Column(expr(s" ${value.get}!= 'delete'").expr).expr, dsv2) :: Nil)
+          }
+          else {
+            p
+          }
-        }
       }
   }

-  private def lakeSoulTableHasHashPartition(table: LakeSoulTableV2): Boolean = {
-    table.snapshotManagement.snapshot.getTableInfo.hash_column.nonEmpty
-  }
-  private def lakeSoulTableCDCColumn(table: LakeSoulTableV2): Boolean = {
-    table.snapshotManagement.snapshot.getTableInfo.configuration.contains(LakeSoulTableProperties.lakeSoulCDCChangePropKey)
-  }
+
   private def getLakeSoulTableCDCColumn(table: LakeSoulTableV2): Option[String] = {
     table.snapshotManagement.snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey)
   }
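
In effect, this rule rewrites any read of a CDC-enabled table so that rows whose change-kind column equals 'delete' are filtered out at query time. A self-contained sketch of that filtering semantics on a plain DataFrame (the column name "change_kind" is an assumption; LakeSoul resolves it from the table property referenced above):

// Sketch: the visible result of the injected Filter, expressed directly.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

object CdcFilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("cdc-filter-sketch").getOrCreate()
    import spark.implicits._

    val changes = Seq(
      (1, "a", "insert"),
      (1, "a", "delete"),
      (2, "b", "update")
    ).toDF("id", "value", "change_kind")

    // Same predicate the rule wraps around the DataSourceV2Relation:
    val visible = changes.where(expr("change_kind != 'delete'"))
    visible.show() // the 'delete' row is hidden from readers

    spark.stop()
  }
}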
@@ -19,13 +19,13 @@ package com.dmetasoul.lakesoul.tables
 import java.util.Locale

 import org.apache.spark.sql.lakesoul.LakeSoulUtils
-import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
+import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.{AnalysisException, QueryTest}

 class LakeSoulTableSuite extends QueryTest
   with SharedSparkSession
-  with LakeSQLCommandSoulTest {
+  with LakeSoulSQLCommandTest {

   test("forPath") {
     withTempDir { dir =>
@@ -20,13 +20,13 @@ import com.dmetasoul.lakesoul.tables.LakeSoulTable
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.functions.{col, last}
 import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
-import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, TestUtils}
+import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, TestUtils}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.scalatest.BeforeAndAfterEach

 class ParquetScanSuite extends QueryTest
   with SharedSparkSession with BeforeAndAfterEach
-  with LakeSQLCommandSoulTest {
+  with LakeSoulSQLCommandTest {

   import testImplicits._

@@ -15,30 +15,19 @@
  */

 package org.apache.spark.sql.lakesoul
+import org.apache.spark.sql.lakesoul.SnapshotManagement
+import com.dmetasoul.lakesoul.tables.LakeSoulTable
-import java.io.File
-import java.util.Locale
-
-import com.dmetasoul.lakesoul.meta.MetaVersion
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils}
-import org.apache.spark.sql.lakesoul.utils.DataFileInfo
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.lakesoul.test.LakeSoulTestUtils
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{MetadataBuilder, StructType}
-import org.apache.spark.util.Utils
-import org.scalatest.matchers.must.Matchers.contain
-import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+import org.apache.spark.sql.types.StructType

+import java.io.File
 import scala.language.implicitConversions

-class TestCDC
+class CDCSuite
   extends QueryTest
     with SharedSparkSession
     with LakeSoulTestUtils {
4 changes: 2 additions & 2 deletions src/test/scala/org/apache/spark/sql/lakesoul/DDLSuite.scala
@@ -20,15 +20,15 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.lakesoul.schema.InvariantViolationException
-import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
+import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
 import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession}
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}

 import scala.collection.JavaConverters._

 class DDLSuite extends DDLTestBase with SharedSparkSession
-  with LakeSQLCommandSoulTest {
+  with LakeSoulSQLCommandTest {

   override protected def verifyDescribeTable(tblName: String): Unit = {
     val res = sql(s"DESCRIBE TABLE $tblName").collect()
@@ -19,7 +19,7 @@ package org.apache.spark.sql.lakesoul
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
+import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.scalatest.Tag
@@ -130,6 +130,6 @@ trait DDLUsingPathTests extends QueryTest

 }

-class DDLUsingPathSuite extends DDLUsingPathTests with LakeSQLCommandSoulTest {
+class DDLUsingPathSuite extends DDLUsingPathTests with LakeSoulSQLCommandTest {
 }

@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table,
 import org.apache.spark.sql.connector.expressions._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.lakesoul.catalog.{LakeSoulCatalog, LakeSoulTableV2}
-import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
+import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{LongType, StringType, StructType}
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
@@ -411,7 +411,7 @@ trait DataFrameWriterV2Tests

 class DataFrameWriterV2Suite
   extends DataFrameWriterV2Tests
-  with LakeSQLCommandSoulTest {
+  with LakeSoulSQLCommandTest {

   import testImplicits._

@@ -28,15 +28,15 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
 import org.apache.spark.sql.lakesoul.schema.SchemaUtils
 import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
-import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils}
+import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.scalatest.BeforeAndAfter

 import scala.collection.JavaConverters._

 class InsertIntoSQLSuite extends InsertIntoTests(false, true)
-  with LakeSQLCommandSoulTest {
+  with LakeSoulSQLCommandTest {
   override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
     val tmpView = "tmp_view"
     withTempView(tmpView) {
@@ -48,7 +48,7 @@ class InsertIntoSQLSuite extends InsertIntoTests(false, true)
 }

 class InsertIntoSQLByPathSuite extends InsertIntoTests(false, true)
-  with LakeSQLCommandSoulTest {
+  with LakeSoulSQLCommandTest {
   override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
     val tmpView = "tmp_view"
     withTempView(tmpView) {
@@ -85,7 +85,7 @@ class InsertIntoSQLByPathSuite extends InsertIntoTests(false, true)
 }

 class InsertIntoDataFrameSuite extends InsertIntoTests(false, false)
-  with LakeSQLCommandSoulTest {
+  with LakeSoulSQLCommandTest {
   override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
     val dfw = insert.write.format(v2Format)
     if (mode != null) {
@@ -96,7 +96,7 @@ class InsertIntoDataFrameSuite extends InsertIntoTests(false, false)
 }

 class InsertIntoDataFrameByPathSuite extends InsertIntoTests(false, false)
-  with LakeSQLCommandSoulTest {
+  with LakeSoulSQLCommandTest {
   override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
     val dfw = insert.write.format(v2Format)
     if (mode != null) {
@@ -20,7 +20,7 @@ import com.dmetasoul.lakesoul.tables.LakeSoulTable

 import java.util.Locale
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
+import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
 import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession}
 import org.apache.spark.sql.{AnalysisException, QueryTest}

@@ -30,7 +30,7 @@ import scala.util.control.NonFatal
 class NotSupportedDDLSuite
   extends NotSupportedDDLBase
   with SharedSparkSession
-  with LakeSQLCommandSoulTest
+  with LakeSoulSQLCommandTest


 abstract class NotSupportedDDLBase extends QueryTest
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils}
+import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils}
 import org.apache.spark.sql.lakesoul.utils.DataFileInfo
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{MetadataBuilder, StructType}
@@ -1457,7 +1457,7 @@ trait TableCreationTests

 class TableCreationSuite
   extends TableCreationTests
-  with LakeSQLCommandSoulTest {
+  with LakeSoulSQLCommandTest {

   private def loadTable(tableName: String): Table = {
     val ti = spark.sessionState.sqlParser.parseMultipartIdentifier(tableName)
@@ -25,7 +25,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.lakesoul.SnapshotManagement
 import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
-import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils}
+import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils}
 import org.apache.spark.sql.lakesoul.utils.DataFileInfo
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -1300,9 +1300,9 @@ trait AlterTableByPathTests extends AlterTableLakeSoulTestBase {

 class AlterTableByNameSuite
   extends AlterTableByNameTests
-  with LakeSQLCommandSoulTest {
+  with LakeSoulSQLCommandTest {


 }

-class AlterTableByPathSuite extends AlterTableByPathTests with LakeSQLCommandSoulTest
+class AlterTableByPathSuite extends AlterTableByPathTests with LakeSoulSQLCommandTest
@@ -16,9 +16,9 @@

 package org.apache.spark.sql.lakesoul.commands

-import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
+import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest

-class DeleteSQLSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest {
+class DeleteSQLSuite extends DeleteSuiteBase with LakeSoulSQLCommandTest {

   override protected def executeDelete(target: String, where: String = null): Unit = {
     val whereClause = Option(where).map(c => s"WHERE $c").getOrElse("")
@@ -19,14 +19,13 @@ package org.apache.spark.sql.lakesoul.commands
 import com.dmetasoul.lakesoul
 import com.dmetasoul.lakesoul.tables.{LakeSoulTable, LakeSoulTableTestUtils}
 import org.apache.spark.sql.lakesoul.SnapshotManagement
-import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
+import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
 import org.apache.spark.sql.{Row, functions}

-class DeleteScalaSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest {
+class DeleteScalaSuite extends DeleteSuiteBase with LakeSoulSQLCommandTest {

   import testImplicits._

-
   test("delete cached table by path") {
     Seq((2, 2), (1, 4)).toDF("key", "value")
       .write.mode("overwrite").format("lakesoul").save(tempPath)
@@ -64,7 +63,7 @@ class DeleteScalaSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest {
       case tableName :: Nil => tableName -> None // just table name
       case tableName :: alias :: Nil => // tablename SPACE alias OR tab SPACE lename
         val ordinary = (('a' to 'z') ++ ('A' to 'Z') ++ ('0' to '9')).toSet
-        if (!alias.forall(ordinary.contains(_))) {
+        if (!alias.forall(ordinary.contains)) {
           (tableName + " " + alias) -> None
         } else {
           tableName -> Some(alias)
@@ -84,7 +83,7 @@ class DeleteScalaSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest {
       LakeSoulTableTestUtils.createTable(spark.table(tableNameOrPath),
         SnapshotManagement(tableNameOrPath))
     }
-      optionalAlias.map(table.as(_)).getOrElse(table)
+      optionalAlias.map(table.as).getOrElse(table)
     }

     if (where != null) {
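
The two one-line cleanups above drop redundant placeholder lambdas: Scala eta-expands a method reference like ordinary.contains into the required function value, so both spellings are equivalent. A quick illustration (standalone, not from the commit):

// Sketch: placeholder form vs. eta-expanded method reference.
object EtaExpansionSketch {
  val ordinary: Set[Char] = (('a' to 'z') ++ ('A' to 'Z') ++ ('0' to '9')).toSet

  def bothForms(alias: String): Boolean = {
    val explicit = alias.forall(ordinary.contains(_)) // placeholder lambda
    val expanded = alias.forall(ordinary.contains)    // method reference
    explicit == expanded // always true
  }
}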
@@ -17,50 +17,16 @@
 package org.apache.spark.sql.lakesoul.commands

 import com.dmetasoul.lakesoul.tables.LakeSoulTable
-
-import java.io.File
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.lakesoul.SnapshotManagement
+import org.apache.spark.sql.lakesoul.test.LakeSoulTestBeforeAndAfterEach
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
-import org.apache.spark.util.Utils
-import org.scalatest.BeforeAndAfterEach

 abstract class DeleteSuiteBase extends QueryTest
-  with SharedSparkSession with BeforeAndAfterEach {
+  with SharedSparkSession with LakeSoulTestBeforeAndAfterEach {

   import testImplicits._

-  var tempDir: File = _
-
-  var snapshotManagement: SnapshotManagement = _
-
-  protected def tempPath: String = tempDir.getCanonicalPath
-
-  protected def readLakeSoulTable(path: String): DataFrame = {
-    spark.read.format("lakesoul").load(path)
-  }
-
-  override def beforeEach() {
-    super.beforeEach()
-    tempDir = Utils.createTempDir()
-    snapshotManagement = SnapshotManagement(tempPath)
-  }
-
-  override def afterEach() {
-    try {
-      Utils.deleteRecursively(tempDir)
-      try {
-        snapshotManagement.updateSnapshot()
-        LakeSoulTable.forPath(snapshotManagement.table_name).dropTable()
-      } catch {
-        case e: Exception =>
-      }
-    } finally {
-      super.afterEach()
-    }
-  }
-
   protected def executeDelete(target: String, where: String = null): Unit

   protected def append(df: DataFrame, partitionBy: Seq[String] = Nil): Unit = {
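
The deleted setup/teardown boilerplate moves into the shared LakeSoulTestBeforeAndAfterEach trait, which this commit does not display in this section. A plausible reconstruction from the lines removed here (the self-type and exact contents are assumptions):

// Sketch: the extracted per-test temp-dir and snapshot lifecycle.
import java.io.File

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.lakesoul.SnapshotManagement
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils
import org.scalatest.BeforeAndAfterEach

trait LakeSoulTestBeforeAndAfterEach extends BeforeAndAfterEach {
  self: SharedSparkSession =>

  var tempDir: File = _
  var snapshotManagement: SnapshotManagement = _

  protected def tempPath: String = tempDir.getCanonicalPath

  protected def readLakeSoulTable(path: String): DataFrame =
    spark.read.format("lakesoul").load(path)

  override def beforeEach(): Unit = {
    super.beforeEach()
    tempDir = Utils.createTempDir()
    snapshotManagement = SnapshotManagement(tempPath)
  }

  override def afterEach(): Unit = {
    try {
      Utils.deleteRecursively(tempDir)
      try {
        snapshotManagement.updateSnapshot()
        LakeSoulTable.forPath(snapshotManagement.table_name).dropTable()
      } catch {
        case _: Exception => // best-effort cleanup of the test table
      }
    } finally {
      super.afterEach()
    }
  }
}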