Merge pull request #6 from meta-soul/refine_tests
Optimize duplicate tests and code
Asakiny committed Jan 28, 2022
2 parents 428a7f4 + 0f3a3b8 commit 2f756fe
Showing 28 changed files with 155 additions and 236 deletions.
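Most of the visible changes are a mechanical rename of the shared test mixin LakeSQLCommandSoulTest to LakeSoulSQLCommandTest, plus removal of helper methods duplicated across suites (and the rename of TestCDC to CDCSuite). As a rough sketch of the resulting pattern, a suite now only mixes in the renamed trait; the suite name, test body, and write path below are illustrative and not part of this PR:

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest

// Illustrative suite only: LakeSoulSQLCommandTest supplies the LakeSoul-enabled
// SparkSession, so shared setup lives in the trait rather than in each suite.
class ExampleRoundTripSuite extends QueryTest
  with SharedSparkSession
  with LakeSoulSQLCommandTest {

  test("write and read back") {
    withTempDir { dir =>
      spark.range(10).toDF("id")
        .write.mode("overwrite").format("lakesoul").save(dir.getCanonicalPath)
      checkAnswer(
        spark.read.format("lakesoul").load(dir.getCanonicalPath),
        spark.range(10).toDF("id"))
    }
  }
}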
@@ -107,12 +107,15 @@ class LakeSoulSparkSessionExtension extends (SparkSessionExtensions => Unit) {
extensions.injectPostHocResolutionRule { session =>
PreprocessTableDelete(session.sessionState.conf)
}

extensions.injectPostHocResolutionRule { session =>
LakeSoulPostHocAnalysis(session)
}
extensions.injectResolutionRule{ session =>

extensions.injectResolutionRule { session =>
ProcessCDCTableMergeOnRead(session.sessionState.conf)
}

extensions.injectResolutionRule { session =>
RewriteQueryByMaterialView(session)
}
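These injected rules only run once the extension is registered with the Spark session. A minimal sketch of wiring it up via spark.sql.extensions follows; the fully qualified extension class name is an assumption, not taken from this diff:

import org.apache.spark.sql.SparkSession

// Sketch: register the extension so the rules injected above
// (including ProcessCDCTableMergeOnRead) participate in analysis.
// The class name below is assumed; substitute the project's actual FQCN.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions",
    "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()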
@@ -1,39 +1,46 @@
/*
* Copyright [2022] [DMetaSoul Team]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.lakesoul.rules

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.{LakeSoulTableProperties, LakeSoulTableRelationV2}
import org.apache.spark.sql.lakesoul.LakeSoulTableProperties
import org.apache.spark.sql.lakesoul.catalog.LakeSoulTableV2
import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
case class ProcessCDCTableMergeOnRead (sqlConf: SQLConf) extends Rule[LogicalPlan]{
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {

case class ProcessCDCTableMergeOnRead(sqlConf: SQLConf) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {
case p: LogicalPlan if p.children.exists(_.isInstanceOf[DataSourceV2Relation]) && !p.isInstanceOf[Filter] =>
p.children.toSeq.find(_.isInstanceOf[DataSourceV2Relation]).get match {
case dsv2@DataSourceV2Relation(table: LakeSoulTableV2, _, _, _, _)=>{
val value=getLakeSoulTableCDCColumn(table)
if(value.nonEmpty){
p.withNewChildren(Filter(Column(expr(s" ${value.get}!= 'delete'").expr).expr,dsv2)::Nil)
}
else {
p
}
p.children.find(_.isInstanceOf[DataSourceV2Relation]).get match {
case dsv2@DataSourceV2Relation(table: LakeSoulTableV2, _, _, _, _) =>
val value = getLakeSoulTableCDCColumn(table)
if (value.nonEmpty) {
p.withNewChildren(Filter(Column(expr(s" ${value.get}!= 'delete'").expr).expr, dsv2) :: Nil)
}
else {
p
}
}

}
}
private def lakeSoulTableHasHashPartition(table: LakeSoulTableV2): Boolean = {
table.snapshotManagement.snapshot.getTableInfo.hash_column.nonEmpty
}
private def lakeSoulTableCDCColumn(table: LakeSoulTableV2): Boolean = {
table.snapshotManagement.snapshot.getTableInfo.configuration.contains(LakeSoulTableProperties.lakeSoulCDCChangePropKey)
}

private def getLakeSoulTableCDCColumn(table: LakeSoulTableV2): Option[String] = {
table.snapshotManagement.snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey)
}

}
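For a table that carries the CDC change-column property (LakeSoulTableProperties.lakeSoulCDCChangePropKey), the rewritten rule injects a Filter that drops rows marked 'delete' at read time. A rough sketch of the equivalent DataFrame-level filter is below; the column name and table path are assumed stand-ins, since the real column comes from the table property:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Sketch: what the injected Filter amounts to when reading a CDC table.
// "change_kind" stands in for whatever column the table's CDC
// change-column property names; the path is illustrative.
val spark = SparkSession.builder().getOrCreate()
val mergedOnRead = spark.read.format("lakesoul")
  .load("/tmp/lakesoul/cdc_table")
  .where(col("change_kind") =!= "delete") // same predicate the rule adds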
@@ -19,13 +19,13 @@ package com.dmetasoul.lakesoul.tables
import java.util.Locale

import org.apache.spark.sql.lakesoul.LakeSoulUtils
import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.{AnalysisException, QueryTest}

class LakeSoulTableSuite extends QueryTest
with SharedSparkSession
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {

test("forPath") {
withTempDir { dir =>
@@ -20,13 +20,13 @@ import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.functions.{col, last}
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, TestUtils}
import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, TestUtils}
import org.apache.spark.sql.test.SharedSparkSession
import org.scalatest.BeforeAndAfterEach

class ParquetScanSuite extends QueryTest
with SharedSparkSession with BeforeAndAfterEach
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {

import testImplicits._

@@ -15,30 +15,17 @@
*/

package org.apache.spark.sql.lakesoul
import org.apache.spark.sql.lakesoul.SnapshotManagement
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import java.io.File
import java.util.Locale

import com.dmetasoul.lakesoul.meta.MetaVersion
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.utils.DataFileInfo
import org.apache.spark.sql.lakesoul.test.LakeSoulTestUtils
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{MetadataBuilder, StructType}
import org.apache.spark.util.Utils
import org.scalatest.matchers.must.Matchers.contain
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import org.apache.spark.sql.types.StructType

import scala.language.implicitConversions

class TestCDC
class CDCSuite
extends QueryTest
with SharedSparkSession
with LakeSoulTestUtils {
@@ -47,24 +34,6 @@ class TestCDC

val format = "lakesoul"

private def createTableByPath(path: File,
df: DataFrame,
tableName: String,
partitionedBy: Seq[String] = Nil): Unit = {
df.write
.partitionBy(partitionedBy: _*)
.mode(SaveMode.Append)
.format(format)
.save(path.getCanonicalPath)

sql(
s"""
|CREATE TABLE lakesoul_test
|USING lakesoul
|LOCATION '${path.getCanonicalPath}'
""".stripMargin)
}

private implicit def toTableIdentifier(tableName: String): TableIdentifier = {
spark.sessionState.sqlParser.parseTableIdentifier(tableName)
}
@@ -85,14 +54,6 @@ class TestCDC
spark.sessionState.catalog.getTableMetadata(tableName).schema
}

private def getSnapshotManagement(table: CatalogTable): SnapshotManagement = {
getSnapshotManagement(new Path(table.storage.locationUri.get))
}

private def getSnapshotManagement(tableName: String): SnapshotManagement = {
getSnapshotManagement(spark.sessionState.catalog.getTableMetadata(tableName))
}

protected def getSnapshotManagement(path: Path): SnapshotManagement = {
SnapshotManagement(path)
}
4 changes: 2 additions & 2 deletions src/test/scala/org/apache/spark/sql/lakesoul/DDLSuite.scala
@@ -20,15 +20,15 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.schema.InvariantViolationException
import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}

import scala.collection.JavaConverters._

class DDLSuite extends DDLTestBase with SharedSparkSession
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {

override protected def verifyDescribeTable(tblName: String): Unit = {
val res = sql(s"DESCRIBE TABLE $tblName").collect()
@@ -19,7 +19,7 @@ package org.apache.spark.sql.lakesoul
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.scalatest.Tag
@@ -130,6 +130,6 @@ trait DDLUsingPathTests extends QueryTest

}

class DDLUsingPathSuite extends DDLUsingPathTests with LakeSQLCommandSoulTest {
class DDLUsingPathSuite extends DDLUsingPathTests with LakeSoulSQLCommandTest {
}

@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table,
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.lakesoul.catalog.{LakeSoulCatalog, LakeSoulTableV2}
import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
@@ -411,7 +411,7 @@ trait DataFrameWriterV2Tests

class DataFrameWriterV2Suite
extends DataFrameWriterV2Tests
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {

import testImplicits._

@@ -28,15 +28,15 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
import org.apache.spark.sql.lakesoul.schema.SchemaUtils
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.scalatest.BeforeAndAfter

import scala.collection.JavaConverters._

class InsertIntoSQLSuite extends InsertIntoTests(false, true)
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {
override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
val tmpView = "tmp_view"
withTempView(tmpView) {
@@ -48,7 +48,7 @@ class InsertIntoSQLSuite extends InsertIntoTests(false, true)
}

class InsertIntoSQLByPathSuite extends InsertIntoTests(false, true)
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {
override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
val tmpView = "tmp_view"
withTempView(tmpView) {
@@ -85,7 +85,7 @@ class InsertIntoSQLByPathSuite extends InsertIntoTests(false, true)
}

class InsertIntoDataFrameSuite extends InsertIntoTests(false, false)
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {
override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
val dfw = insert.write.format(v2Format)
if (mode != null) {
@@ -96,7 +96,7 @@ class InsertIntoDataFrameSuite extends InsertIntoTests(false, false)
}

class InsertIntoDataFrameByPathSuite extends InsertIntoTests(false, false)
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {
override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
val dfw = insert.write.format(v2Format)
if (mode != null) {
@@ -20,7 +20,7 @@ import com.dmetasoul.lakesoul.tables.LakeSoulTable

import java.util.Locale
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession}
import org.apache.spark.sql.{AnalysisException, QueryTest}

@@ -30,7 +30,7 @@ import scala.util.control.NonFatal
class NotSupportedDDLSuite
extends NotSupportedDDLBase
with SharedSparkSession
with LakeSQLCommandSoulTest
with LakeSoulSQLCommandTest


abstract class NotSupportedDDLBase extends QueryTest
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.utils.DataFileInfo
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{MetadataBuilder, StructType}
@@ -1457,7 +1457,7 @@ trait TableCreationTests

class TableCreationSuite
extends TableCreationTests
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {

private def loadTable(tableName: String): Table = {
val ti = spark.sessionState.sqlParser.parseMultipartIdentifier(tableName)
@@ -25,7 +25,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.SnapshotManagement
import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.utils.DataFileInfo
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -1300,9 +1300,9 @@ trait AlterTableByPathTests extends AlterTableLakeSoulTestBase {

class AlterTableByNameSuite
extends AlterTableByNameTests
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {


}

class AlterTableByPathSuite extends AlterTableByPathTests with LakeSQLCommandSoulTest
class AlterTableByPathSuite extends AlterTableByPathTests with LakeSoulSQLCommandTest
@@ -16,9 +16,9 @@

package org.apache.spark.sql.lakesoul.commands

import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest

class DeleteSQLSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest {
class DeleteSQLSuite extends DeleteSuiteBase with LakeSoulSQLCommandTest {

override protected def executeDelete(target: String, where: String = null): Unit = {
val whereClause = Option(where).map(c => s"WHERE $c").getOrElse("")
@@ -19,14 +19,13 @@ package org.apache.spark.sql.lakesoul.commands
import com.dmetasoul.lakesoul
import com.dmetasoul.lakesoul.tables.{LakeSoulTable, LakeSoulTableTestUtils}
import org.apache.spark.sql.lakesoul.SnapshotManagement
import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
import org.apache.spark.sql.{Row, functions}

class DeleteScalaSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest {
class DeleteScalaSuite extends DeleteSuiteBase with LakeSoulSQLCommandTest {

import testImplicits._


test("delete cached table by path") {
Seq((2, 2), (1, 4)).toDF("key", "value")
.write.mode("overwrite").format("lakesoul").save(tempPath)
@@ -64,7 +63,7 @@ class DeleteScalaSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest {
case tableName :: Nil => tableName -> None // just table name
case tableName :: alias :: Nil => // tablename SPACE alias OR tab SPACE lename
val ordinary = (('a' to 'z') ++ ('A' to 'Z') ++ ('0' to '9')).toSet
if (!alias.forall(ordinary.contains(_))) {
if (!alias.forall(ordinary.contains)) {
(tableName + " " + alias) -> None
} else {
tableName -> Some(alias)
@@ -84,7 +83,7 @@ class DeleteScalaSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest {
LakeSoulTableTestUtils.createTable(spark.table(tableNameOrPath),
SnapshotManagement(tableNameOrPath))
}
optionalAlias.map(table.as(_)).getOrElse(table)
optionalAlias.map(table.as).getOrElse(table)
}

if (where != null) {
[Diff truncated; the remaining changed files are not shown here.]
