From 0f3a3b86cb215bd39101efaa4ee324a96ced36da Mon Sep 17 00:00:00 2001
From: dmetasoul01
Date: Fri, 28 Jan 2022 18:30:55 +0800
Subject: [PATCH] Deduplicate test setup code and clean up the CDC
 merge-on-read rule

---
 .../sql/LakeSoulSparkSessionExtension.scala   |  5 +-
 .../rules/ProcessCDCTableMergeOnRead.scala    | 57 +++++++++++--------
 .../lakesoul/tables/LakeSoulTableSuite.scala  |  4 +-
 .../datasource/ParquetScanSuite.scala         |  4 +-
 .../{TestCDC.scala => CDCSuite.scala}         | 45 +--------------
 .../apache/spark/sql/lakesoul/DDLSuite.scala  |  4 +-
 .../sql/lakesoul/DDLUsingPathSuite.scala      |  4 +-
 .../sql/lakesoul/DataFrameWriterV2Suite.scala |  4 +-
 .../sql/lakesoul/InsertIntoTableSuite.scala   | 10 ++--
 .../sql/lakesoul/NotSupportedDDLSuite.scala   |  4 +-
 .../sql/lakesoul/TableCreationTests.scala     |  4 +-
 .../lakesoul/commands/AlterTableTests.scala   |  6 +-
 .../lakesoul/commands/DeleteSQLSuite.scala    |  4 +-
 .../lakesoul/commands/DeleteScalaSuite.scala  |  9 ++-
 .../lakesoul/commands/DeleteSuiteBase.scala   | 38 +------------
 .../lakesoul/commands/MaterialViewSuite.scala |  4 +-
 .../lakesoul/commands/MergeIntoSQLSuite.scala | 28 +++++++--
 .../lakesoul/commands/UpdateSQLSuite.scala    |  4 +-
 .../lakesoul/commands/UpdateScalaSuite.scala  |  4 +-
 .../lakesoul/commands/UpdateSuiteBase.scala   | 41 +------------
 .../lakesoul/commands/UpsertSuiteBase.scala   | 40 +------------
 .../LakeSoulPostHocAnalysisSuiteSoul.scala    |  4 +-
 .../RewriteQueryByMaterialViewBase.scala      |  4 +-
 .../schema/CaseSensitivitySuite.scala         |  4 +-
 .../schema/SchemaEnforcementSuite.scala       |  6 +-
 .../schema/SchemaValidationSuite.scala        |  4 +-
 ...est.scala => LakeSoulSQLCommandTest.scala} |  2 +-
 .../spark/sql/lakesoul/test/TestUtils.scala   | 44 ++++++++++++--
 28 files changed, 155 insertions(+), 236 deletions(-)
 rename src/test/scala/org/apache/spark/sql/lakesoul/{TestCDC.scala => CDCSuite.scala} (61%)
 rename src/test/scala/org/apache/spark/sql/lakesoul/test/{LakeSQLCommandSoulTest.scala => LakeSoulSQLCommandTest.scala} (98%)

diff --git a/src/main/scala/com/dmetasoul/lakesoul/sql/LakeSoulSparkSessionExtension.scala b/src/main/scala/com/dmetasoul/lakesoul/sql/LakeSoulSparkSessionExtension.scala
index c7e8f9d1d..3fe68cb34 100644
--- a/src/main/scala/com/dmetasoul/lakesoul/sql/LakeSoulSparkSessionExtension.scala
+++ b/src/main/scala/com/dmetasoul/lakesoul/sql/LakeSoulSparkSessionExtension.scala
@@ -107,12 +107,15 @@ class LakeSoulSparkSessionExtension extends (SparkSessionExtensions => Unit) {
     extensions.injectPostHocResolutionRule { session =>
       PreprocessTableDelete(session.sessionState.conf)
     }
+
     extensions.injectPostHocResolutionRule { session =>
       LakeSoulPostHocAnalysis(session)
     }
-    extensions.injectResolutionRule{ session =>
+
+    extensions.injectResolutionRule { session =>
       ProcessCDCTableMergeOnRead(session.sessionState.conf)
     }
+
     extensions.injectResolutionRule { session =>
       RewriteQueryByMaterialView(session)
     }
diff --git a/src/main/scala/org/apache/spark/sql/lakesoul/rules/ProcessCDCTableMergeOnRead.scala b/src/main/scala/org/apache/spark/sql/lakesoul/rules/ProcessCDCTableMergeOnRead.scala
index 7d903c055..3ed1df4f0 100644
--- a/src/main/scala/org/apache/spark/sql/lakesoul/rules/ProcessCDCTableMergeOnRead.scala
+++ b/src/main/scala/org/apache/spark/sql/lakesoul/rules/ProcessCDCTableMergeOnRead.scala
@@ -1,39 +1,47 @@
+/*
+ * Copyright [2022] [DMetaSoul Team]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.sql.lakesoul.rules
+
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.lakesoul.{LakeSoulTableProperties, LakeSoulTableRelationV2}
+import org.apache.spark.sql.lakesoul.LakeSoulTableProperties
 import org.apache.spark.sql.lakesoul.catalog.LakeSoulTableV2
-import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
-case class ProcessCDCTableMergeOnRead (sqlConf: SQLConf) extends Rule[LogicalPlan]{
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {
+
+case class ProcessCDCTableMergeOnRead(sqlConf: SQLConf) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {
     case p: LogicalPlan if p.children.exists(_.isInstanceOf[DataSourceV2Relation]) && !p.isInstanceOf[Filter] =>
-      p.children.toSeq.find(_.isInstanceOf[DataSourceV2Relation]).get match {
-        case dsv2@DataSourceV2Relation(table: LakeSoulTableV2, _, _, _, _)=>{
-          val value=getLakeSoulTableCDCColumn(table)
-          if(value.nonEmpty){
-            p.withNewChildren(Filter(Column(expr(s" ${value.get}!= 'delete'").expr).expr,dsv2)::Nil)
-          }
-          else {
-            p
-          }
+      p.children.find(_.isInstanceOf[DataSourceV2Relation]).get match {
+        case dsv2 @ DataSourceV2Relation(table: LakeSoulTableV2, _, _, _, _) =>
+          val cdcColumn = getLakeSoulTableCDCColumn(table)
+          if (cdcColumn.nonEmpty) {
+            // Merge-on-read over a CDC table: hide rows whose change kind is 'delete'.
+            p.withNewChildren(Filter(expr(s"${cdcColumn.get} != 'delete'").expr, dsv2) :: Nil)
+          } else {
+            p
+          }
+        // Not a LakeSoul relation: leave the plan unchanged instead of throwing a MatchError.
+        case _ => p
       }
-
-      }
-  }
-  private def lakeSoulTableHasHashPartition(table: LakeSoulTableV2): Boolean = {
-    table.snapshotManagement.snapshot.getTableInfo.hash_column.nonEmpty
-  }
-  private def lakeSoulTableCDCColumn(table: LakeSoulTableV2): Boolean = {
-    table.snapshotManagement.snapshot.getTableInfo.configuration.contains(LakeSoulTableProperties.lakeSoulCDCChangePropKey)
   }
+
   private def getLakeSoulTableCDCColumn(table: LakeSoulTableV2): Option[String] = {
     table.snapshotManagement.snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey)
   }
-
 }
diff --git a/src/test/scala/com/dmetasoul/lakesoul/tables/LakeSoulTableSuite.scala b/src/test/scala/com/dmetasoul/lakesoul/tables/LakeSoulTableSuite.scala
index 3758076e9..e6ec13fca 100644
--- a/src/test/scala/com/dmetasoul/lakesoul/tables/LakeSoulTableSuite.scala
+++ b/src/test/scala/com/dmetasoul/lakesoul/tables/LakeSoulTableSuite.scala
@@ -19,13 +19,13 @@ package com.dmetasoul.lakesoul.tables
 import java.util.Locale
 
 import org.apache.spark.sql.lakesoul.LakeSoulUtils
-import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
+import 
org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.{AnalysisException, QueryTest} class LakeSoulTableSuite extends QueryTest with SharedSparkSession - with LakeSQLCommandSoulTest { + with LakeSoulSQLCommandTest { test("forPath") { withTempDir { dir => diff --git a/src/test/scala/org/apache/spark/sql/execution/datasource/ParquetScanSuite.scala b/src/test/scala/org/apache/spark/sql/execution/datasource/ParquetScanSuite.scala index 0e380d924..a8e0b976f 100644 --- a/src/test/scala/org/apache/spark/sql/execution/datasource/ParquetScanSuite.scala +++ b/src/test/scala/org/apache/spark/sql/execution/datasource/ParquetScanSuite.scala @@ -20,13 +20,13 @@ import com.dmetasoul.lakesoul.tables.LakeSoulTable import org.apache.spark.sql.QueryTest import org.apache.spark.sql.functions.{col, last} import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf -import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, TestUtils} +import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, TestUtils} import org.apache.spark.sql.test.SharedSparkSession import org.scalatest.BeforeAndAfterEach class ParquetScanSuite extends QueryTest with SharedSparkSession with BeforeAndAfterEach - with LakeSQLCommandSoulTest { + with LakeSoulSQLCommandTest { import testImplicits._ diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/TestCDC.scala b/src/test/scala/org/apache/spark/sql/lakesoul/CDCSuite.scala similarity index 61% rename from src/test/scala/org/apache/spark/sql/lakesoul/TestCDC.scala rename to src/test/scala/org/apache/spark/sql/lakesoul/CDCSuite.scala index 43266840e..2062b760d 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/TestCDC.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/CDCSuite.scala @@ -15,30 +15,17 @@ */ package org.apache.spark.sql.lakesoul -import org.apache.spark.sql.lakesoul.SnapshotManagement -import com.dmetasoul.lakesoul.tables.LakeSoulTable -import java.io.File -import java.util.Locale -import com.dmetasoul.lakesoul.meta.MetaVersion import org.apache.hadoop.fs.Path import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} -import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils} -import org.apache.spark.sql.lakesoul.utils.DataFileInfo +import org.apache.spark.sql.lakesoul.test.LakeSoulTestUtils import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.{MetadataBuilder, StructType} -import org.apache.spark.util.Utils -import org.scalatest.matchers.must.Matchers.contain -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper +import org.apache.spark.sql.types.StructType import scala.language.implicitConversions -class TestCDC +class CDCSuite extends QueryTest with SharedSparkSession with LakeSoulTestUtils { @@ -47,24 +34,6 @@ class TestCDC val format = "lakesoul" - private def createTableByPath(path: File, - df: DataFrame, - tableName: String, - partitionedBy: Seq[String] = Nil): Unit = { - df.write - .partitionBy(partitionedBy: _*) - .mode(SaveMode.Append) - .format(format) - .save(path.getCanonicalPath) - - sql( - s""" - |CREATE TABLE lakesoul_test - |USING lakesoul - |LOCATION 
'${path.getCanonicalPath}' - """.stripMargin) - } - private implicit def toTableIdentifier(tableName: String): TableIdentifier = { spark.sessionState.sqlParser.parseTableIdentifier(tableName) } @@ -85,14 +54,6 @@ class TestCDC spark.sessionState.catalog.getTableMetadata(tableName).schema } - private def getSnapshotManagement(table: CatalogTable): SnapshotManagement = { - getSnapshotManagement(new Path(table.storage.locationUri.get)) - } - - private def getSnapshotManagement(tableName: String): SnapshotManagement = { - getSnapshotManagement(spark.sessionState.catalog.getTableMetadata(tableName)) - } - protected def getSnapshotManagement(path: Path): SnapshotManagement = { SnapshotManagement(path) } diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/DDLSuite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/DDLSuite.scala index d2e00ab35..d4a2b2ac5 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/DDLSuite.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/DDLSuite.scala @@ -20,7 +20,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.lakesoul.schema.InvariantViolationException -import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest +import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession} import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType} import org.apache.spark.sql.{AnalysisException, QueryTest, Row} @@ -28,7 +28,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import scala.collection.JavaConverters._ class DDLSuite extends DDLTestBase with SharedSparkSession - with LakeSQLCommandSoulTest { + with LakeSoulSQLCommandTest { override protected def verifyDescribeTable(tblName: String): Unit = { val res = sql(s"DESCRIBE TABLE $tblName").collect() diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/DDLUsingPathSuite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/DDLUsingPathSuite.scala index e2b8b55ba..73826aced 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/DDLUsingPathSuite.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/DDLUsingPathSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.lakesoul import org.apache.hadoop.fs.Path import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest +import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.{AnalysisException, QueryTest} import org.scalatest.Tag @@ -130,6 +130,6 @@ trait DDLUsingPathTests extends QueryTest } -class DDLUsingPathSuite extends DDLUsingPathTests with LakeSQLCommandSoulTest { +class DDLUsingPathSuite extends DDLUsingPathTests with LakeSoulSQLCommandTest { } diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/DataFrameWriterV2Suite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/DataFrameWriterV2Suite.scala index cfb802158..b20e5543c 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/DataFrameWriterV2Suite.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/DataFrameWriterV2Suite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, import org.apache.spark.sql.connector.expressions._ import org.apache.spark.sql.functions._ import 
org.apache.spark.sql.lakesoul.catalog.{LakeSoulCatalog, LakeSoulTableV2} -import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest +import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{LongType, StringType, StructType} import org.apache.spark.sql.{AnalysisException, QueryTest, Row} @@ -411,7 +411,7 @@ trait DataFrameWriterV2Tests class DataFrameWriterV2Suite extends DataFrameWriterV2Tests - with LakeSQLCommandSoulTest { + with LakeSoulSQLCommandTest { import testImplicits._ diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/InsertIntoTableSuite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/InsertIntoTableSuite.scala index 09377cd26..ebb0ffddc 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/InsertIntoTableSuite.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/InsertIntoTableSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode} import org.apache.spark.sql.lakesoul.schema.SchemaUtils import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf -import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils} +import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.scalatest.BeforeAndAfter @@ -36,7 +36,7 @@ import org.scalatest.BeforeAndAfter import scala.collection.JavaConverters._ class InsertIntoSQLSuite extends InsertIntoTests(false, true) - with LakeSQLCommandSoulTest { + with LakeSoulSQLCommandTest { override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = { val tmpView = "tmp_view" withTempView(tmpView) { @@ -48,7 +48,7 @@ class InsertIntoSQLSuite extends InsertIntoTests(false, true) } class InsertIntoSQLByPathSuite extends InsertIntoTests(false, true) - with LakeSQLCommandSoulTest { + with LakeSoulSQLCommandTest { override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = { val tmpView = "tmp_view" withTempView(tmpView) { @@ -85,7 +85,7 @@ class InsertIntoSQLByPathSuite extends InsertIntoTests(false, true) } class InsertIntoDataFrameSuite extends InsertIntoTests(false, false) - with LakeSQLCommandSoulTest { + with LakeSoulSQLCommandTest { override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = { val dfw = insert.write.format(v2Format) if (mode != null) { @@ -96,7 +96,7 @@ class InsertIntoDataFrameSuite extends InsertIntoTests(false, false) } class InsertIntoDataFrameByPathSuite extends InsertIntoTests(false, false) - with LakeSQLCommandSoulTest { + with LakeSoulSQLCommandTest { override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = { val dfw = insert.write.format(v2Format) if (mode != null) { diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/NotSupportedDDLSuite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/NotSupportedDDLSuite.scala index 48a3cec64..66b25ee15 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/NotSupportedDDLSuite.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/NotSupportedDDLSuite.scala @@ -20,7 +20,7 @@ import com.dmetasoul.lakesoul.tables.LakeSoulTable import java.util.Locale import org.apache.spark.sql.catalyst.TableIdentifier -import 
org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest +import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession} import org.apache.spark.sql.{AnalysisException, QueryTest} @@ -30,7 +30,7 @@ import scala.util.control.NonFatal class NotSupportedDDLSuite extends NotSupportedDDLBase with SharedSparkSession - with LakeSQLCommandSoulTest + with LakeSoulSQLCommandTest abstract class NotSupportedDDLBase extends QueryTest diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala b/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala index 66cd89307..7277ca93d 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/TableCreationTests.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils} +import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils} import org.apache.spark.sql.lakesoul.utils.DataFileInfo import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{MetadataBuilder, StructType} @@ -1457,7 +1457,7 @@ trait TableCreationTests class TableCreationSuite extends TableCreationTests - with LakeSQLCommandSoulTest { + with LakeSoulSQLCommandTest { private def loadTable(tableName: String): Table = { val ti = spark.sessionState.sqlParser.parseMultipartIdentifier(tableName) diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/commands/AlterTableTests.scala b/src/test/scala/org/apache/spark/sql/lakesoul/commands/AlterTableTests.scala index 3c67d7f2f..fa9ac8aaf 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/commands/AlterTableTests.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/commands/AlterTableTests.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.lakesoul.SnapshotManagement import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog -import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils} +import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils} import org.apache.spark.sql.lakesoul.utils.DataFileInfo import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -1300,9 +1300,9 @@ trait AlterTableByPathTests extends AlterTableLakeSoulTestBase { class AlterTableByNameSuite extends AlterTableByNameTests - with LakeSQLCommandSoulTest { + with LakeSoulSQLCommandTest { } -class AlterTableByPathSuite extends AlterTableByPathTests with LakeSQLCommandSoulTest +class AlterTableByPathSuite extends AlterTableByPathTests with LakeSoulSQLCommandTest diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/commands/DeleteSQLSuite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/commands/DeleteSQLSuite.scala index d387f2db8..345ee57ab 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/commands/DeleteSQLSuite.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/commands/DeleteSQLSuite.scala @@ -16,9 +16,9 @@ package org.apache.spark.sql.lakesoul.commands -import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest +import 
org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest -class DeleteSQLSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest { +class DeleteSQLSuite extends DeleteSuiteBase with LakeSoulSQLCommandTest { override protected def executeDelete(target: String, where: String = null): Unit = { val whereClause = Option(where).map(c => s"WHERE $c").getOrElse("") diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/commands/DeleteScalaSuite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/commands/DeleteScalaSuite.scala index 4f0730f57..77cde85e0 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/commands/DeleteScalaSuite.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/commands/DeleteScalaSuite.scala @@ -19,14 +19,13 @@ package org.apache.spark.sql.lakesoul.commands import com.dmetasoul.lakesoul import com.dmetasoul.lakesoul.tables.{LakeSoulTable, LakeSoulTableTestUtils} import org.apache.spark.sql.lakesoul.SnapshotManagement -import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest +import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest import org.apache.spark.sql.{Row, functions} -class DeleteScalaSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest { +class DeleteScalaSuite extends DeleteSuiteBase with LakeSoulSQLCommandTest { import testImplicits._ - test("delete cached table by path") { Seq((2, 2), (1, 4)).toDF("key", "value") .write.mode("overwrite").format("lakesoul").save(tempPath) @@ -64,7 +63,7 @@ class DeleteScalaSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest { case tableName :: Nil => tableName -> None // just table name case tableName :: alias :: Nil => // tablename SPACE alias OR tab SPACE lename val ordinary = (('a' to 'z') ++ ('A' to 'Z') ++ ('0' to '9')).toSet - if (!alias.forall(ordinary.contains(_))) { + if (!alias.forall(ordinary.contains)) { (tableName + " " + alias) -> None } else { tableName -> Some(alias) @@ -84,7 +83,7 @@ class DeleteScalaSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest { LakeSoulTableTestUtils.createTable(spark.table(tableNameOrPath), SnapshotManagement(tableNameOrPath)) } - optionalAlias.map(table.as(_)).getOrElse(table) + optionalAlias.map(table.as).getOrElse(table) } if (where != null) { diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/commands/DeleteSuiteBase.scala b/src/test/scala/org/apache/spark/sql/lakesoul/commands/DeleteSuiteBase.scala index 4480c1c8f..366ac392c 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/commands/DeleteSuiteBase.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/commands/DeleteSuiteBase.scala @@ -17,50 +17,16 @@ package org.apache.spark.sql.lakesoul.commands import com.dmetasoul.lakesoul.tables.LakeSoulTable - -import java.io.File import org.apache.spark.sql.functions.col -import org.apache.spark.sql.lakesoul.SnapshotManagement +import org.apache.spark.sql.lakesoul.test.LakeSoulTestBeforeAndAfterEach import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} -import org.apache.spark.util.Utils -import org.scalatest.BeforeAndAfterEach abstract class DeleteSuiteBase extends QueryTest - with SharedSparkSession with BeforeAndAfterEach { + with SharedSparkSession with LakeSoulTestBeforeAndAfterEach { import testImplicits._ - var tempDir: File = _ - - var snapshotManagement: SnapshotManagement = _ - - protected def tempPath: String = tempDir.getCanonicalPath - - protected def readLakeSoulTable(path: String): DataFrame = { - spark.read.format("lakesoul").load(path) - 
}
-
-  override def beforeEach() {
-    super.beforeEach()
-    tempDir = Utils.createTempDir()
-    snapshotManagement = SnapshotManagement(tempPath)
-  }
-
-  override def afterEach() {
-    try {
-      Utils.deleteRecursively(tempDir)
-      try {
-        snapshotManagement.updateSnapshot()
-        LakeSoulTable.forPath(snapshotManagement.table_name).dropTable()
-      } catch {
-        case e: Exception =>
-      }
-    } finally {
-      super.afterEach()
-    }
-  }
-
   protected def executeDelete(target: String, where: String = null): Unit
 
   protected def append(df: DataFrame, partitionBy: Seq[String] = Nil): Unit = {
diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/commands/MaterialViewSuite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/commands/MaterialViewSuite.scala
index 6ea83d6ae..6c3e77146 100644
--- a/src/test/scala/org/apache/spark/sql/lakesoul/commands/MaterialViewSuite.scala
+++ b/src/test/scala/org/apache/spark/sql/lakesoul/commands/MaterialViewSuite.scala
@@ -21,14 +21,14 @@ import com.dmetasoul.lakesoul.tables.LakeSoulTable
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.lakesoul.SnapshotManagement
 import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
-import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
+import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.{AnalysisException, QueryTest}
 import org.apache.spark.util.Utils
 import org.scalatest.BeforeAndAfter
 
 class MaterialViewSuite extends QueryTest
-  with SharedSparkSession with LakeSQLCommandSoulTest with BeforeAndAfter {
+  with SharedSparkSession with LakeSoulSQLCommandTest with BeforeAndAfter {
 
   import testImplicits._
 
diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/commands/MergeIntoSQLSuite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/commands/MergeIntoSQLSuite.scala
index 1953924bd..eddfe6652 100644
--- a/src/test/scala/org/apache/spark/sql/lakesoul/commands/MergeIntoSQLSuite.scala
+++ b/src/test/scala/org/apache/spark/sql/lakesoul/commands/MergeIntoSQLSuite.scala
@@ -1,21 +1,37 @@
 package org.apache.spark.sql.lakesoul.commands
 
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
-import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestBeforeAndAfterEach, LakeSoulTestUtils}
+import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.Utils
 import org.scalatest._
 import matchers.should.Matchers._
 
-class MergeIntoSQLSuite extends UpsertSuiteBase with LakeSQLCommandSoulTest {
+class MergeIntoSQLSuite extends QueryTest
+  with SharedSparkSession with LakeSoulTestBeforeAndAfterEach
+  with LakeSoulTestUtils with LakeSoulSQLCommandTest {
 
   import testImplicits._
 
+  // Recreates the table at the temp location managed by LakeSoulTestBeforeAndAfterEach.
+  protected def initTable(df: DataFrame,
+                          rangePartition: Seq[String] = Nil,
+                          hashPartition: Seq[String] = Nil,
+                          hashBucketNum: Int = 2): Unit = {
+    df.write.format("lakesoul").mode("overwrite")
+      .option("rangePartitions", rangePartition.mkString(","))
+      .option("hashPartitions", hashPartition.mkString(","))
+      .option("hashBucketNum", hashBucketNum)
+      .save(snapshotManagement.table_name)
+  }
+
   private def initHashTable(): Unit = {
     initTable(
       Seq((20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 4, 4))
         .toDF("range", "hash", "value"),
-      "range",
-      "hash")
+      Seq("range"),
+      Seq("hash")
+    )
   }
 
private def withViewNamed(df: DataFrame, viewName: String)(f: => Unit): Unit = { diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpdateSQLSuite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpdateSQLSuite.scala index 831e4cc96..f280280ea 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpdateSQLSuite.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpdateSQLSuite.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql.lakesoul.commands import org.apache.spark.sql.Row -import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest +import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest -class UpdateSQLSuite extends UpdateSuiteBase with LakeSQLCommandSoulTest { +class UpdateSQLSuite extends UpdateSuiteBase with LakeSoulSQLCommandTest { import testImplicits._ diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpdateScalaSuite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpdateScalaSuite.scala index f96000288..f9da2b480 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpdateScalaSuite.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpdateScalaSuite.scala @@ -19,12 +19,12 @@ package org.apache.spark.sql.lakesoul.commands import com.dmetasoul.lakesoul import com.dmetasoul.lakesoul.tables.{LakeSoulTable, LakeSoulTableTestUtils} import org.apache.spark.sql.lakesoul.SnapshotManagement -import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest +import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest import org.apache.spark.sql.{Row, functions} import java.util.Locale -class UpdateScalaSuite extends UpdateSuiteBase with LakeSQLCommandSoulTest { +class UpdateScalaSuite extends UpdateSuiteBase with LakeSoulSQLCommandTest { import testImplicits._ diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpdateSuiteBase.scala b/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpdateSuiteBase.scala index 243223037..c3822121a 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpdateSuiteBase.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpdateSuiteBase.scala @@ -17,60 +17,25 @@ package org.apache.spark.sql.lakesoul.commands import com.dmetasoul.lakesoul.tables.LakeSoulTable - -import java.io.File -import java.util.Locale -import org.apache.hadoop.fs.Path import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.lakesoul.SnapshotManagement -import org.apache.spark.sql.lakesoul.test.LakeSoulTestUtils +import org.apache.spark.sql.lakesoul.test.{LakeSoulTestBeforeAndAfterEach, LakeSoulTestUtils} import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession} import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} -import org.apache.spark.util.Utils -import org.scalatest.BeforeAndAfterEach +import java.util.Locale import scala.language.implicitConversions abstract class UpdateSuiteBase extends QueryTest with SharedSparkSession - with BeforeAndAfterEach + with LakeSoulTestBeforeAndAfterEach with SQLTestUtils with LakeSoulTestUtils { import testImplicits._ - var tempDir: File = _ - - var snapshotManagement: SnapshotManagement = _ - - protected def tempPath = tempDir.getCanonicalPath - - protected def readLakeSoulTable(path: String): DataFrame = { - spark.read.format("lakesoul").load(path) - } - - override def beforeEach() { - super.beforeEach() - tempDir = Utils.createTempDir() - 
snapshotManagement = SnapshotManagement(new Path(tempPath)) - } - - override def afterEach() { - try { - Utils.deleteRecursively(tempDir) - try { - LakeSoulTable.forPath(snapshotManagement.table_name).dropTable() - } catch { - case e: Exception => - } - } finally { - super.afterEach() - } - } - protected def executeUpdate(lakeSoulTable: String, set: Seq[String], where: String): Unit = { executeUpdate(lakeSoulTable, set.mkString(", "), where) } diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpsertSuiteBase.scala b/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpsertSuiteBase.scala index 7fe5a0299..b7a0045c1 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpsertSuiteBase.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/commands/UpsertSuiteBase.scala @@ -17,54 +17,18 @@ package org.apache.spark.sql.lakesoul.commands import com.dmetasoul.lakesoul.tables.LakeSoulTable - -import java.io.File -import org.apache.hadoop.fs.Path import org.apache.spark.sql.functions._ -import org.apache.spark.sql.lakesoul.SnapshotManagement import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf -import org.apache.spark.sql.lakesoul.test.LakeSoulTestUtils +import org.apache.spark.sql.lakesoul.test.{LakeSoulTestBeforeAndAfterEach, LakeSoulTestUtils} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} -import org.apache.spark.util.Utils -import org.scalatest.BeforeAndAfterEach class UpsertSuiteBase extends QueryTest - with SharedSparkSession with BeforeAndAfterEach + with SharedSparkSession with LakeSoulTestBeforeAndAfterEach with LakeSoulTestUtils { import testImplicits._ - var tempDir: File = _ - - var snapshotManagement: SnapshotManagement = _ - - protected def tempPath: String = tempDir.getCanonicalPath - - protected def readLakeSoulTable(path: String): DataFrame = { - spark.read.format("lakesoul").load(path) - } - - override def beforeEach(): Unit = { - super.beforeEach() - tempDir = Utils.createTempDir() - snapshotManagement = SnapshotManagement(new Path(tempPath)) - } - - override def afterEach(): Unit = { - try { - Utils.deleteRecursively(tempDir) - try { - snapshotManagement.updateSnapshot() - LakeSoulTable.forPath(snapshotManagement.table_name).dropTable() - } catch { - case e: Exception => - } - } finally { - super.afterEach() - } - } - // protected def executeUpsert(df: DataFrame, condition: Option[String], tableName: String): Unit protected def executeUpsert(df: DataFrame, condition: Option[String], tableName: String): Unit = { if (condition.isEmpty) { diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulPostHocAnalysisSuiteSoul.scala b/src/test/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulPostHocAnalysisSuiteSoul.scala index 8db1f4c9e..34f54df1c 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulPostHocAnalysisSuiteSoul.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/rules/LakeSoulPostHocAnalysisSuiteSoul.scala @@ -19,10 +19,10 @@ package org.apache.spark.sql.lakesoul.rules import com.dmetasoul.lakesoul.tables.LakeSoulTable import org.apache.spark.sql.QueryTest import org.apache.spark.sql.functions._ -import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, TestUtils} +import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, TestUtils} import org.apache.spark.sql.test.SharedSparkSession -class LakeSoulPostHocAnalysisSuiteSoul extends QueryTest with SharedSparkSession with LakeSQLCommandSoulTest { 
+class LakeSoulPostHocAnalysisSuiteSoul extends QueryTest with SharedSparkSession with LakeSoulSQLCommandTest { import testImplicits._ diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/rules/RewriteQueryByMaterialViewBase.scala b/src/test/scala/org/apache/spark/sql/lakesoul/rules/RewriteQueryByMaterialViewBase.scala index b5bf2b7a3..01279e4e4 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/rules/RewriteQueryByMaterialViewBase.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/rules/RewriteQueryByMaterialViewBase.scala @@ -17,14 +17,14 @@ package org.apache.spark.sql.lakesoul.rules import com.dmetasoul.lakesoul.tables.LakeSoulTable -import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest +import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.{AnalysisException, QueryTest} import org.apache.spark.util.Utils import org.scalatest.BeforeAndAfterAll abstract class RewriteQueryByMaterialViewBase extends QueryTest - with SharedSparkSession with LakeSQLCommandSoulTest with BeforeAndAfterAll { + with SharedSparkSession with LakeSoulSQLCommandTest with BeforeAndAfterAll { import testImplicits._ diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/schema/CaseSensitivitySuite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/schema/CaseSensitivitySuite.scala index 9daddab31..272d2764f 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/schema/CaseSensitivitySuite.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/schema/CaseSensitivitySuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.lakesoul.SnapshotManagement -import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest +import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest import org.apache.spark.sql.lakesoul.utils.DataFileInfo import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException} import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession} @@ -30,7 +30,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, Row} class CaseSensitivitySuite extends QueryTest - with SharedSparkSession with SQLTestUtils with LakeSQLCommandSoulTest { + with SharedSparkSession with SQLTestUtils with LakeSoulSQLCommandTest { import testImplicits._ diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/schema/SchemaEnforcementSuite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/schema/SchemaEnforcementSuite.scala index a931e805a..286918d16 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/schema/SchemaEnforcementSuite.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/schema/SchemaEnforcementSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf -import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils} +import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils} import org.apache.spark.sql.lakesoul.{SnapshotManagement, LakeSoulOptions} import org.apache.spark.sql.streaming.StreamingQueryException import org.apache.spark.sql.test.SharedSparkSession @@ -300,7 +300,7 @@ trait AppendSaveModeTests extends 
BatchWriterSoulTest { } trait AppendOutputModeTests extends SchemaEnforcementSuiteBase with SharedSparkSession - with LakeSQLCommandSoulTest { + with LakeSoulSQLCommandTest { import testImplicits._ @@ -730,7 +730,7 @@ trait OverwriteSaveModeTests extends BatchWriterSoulTest { } trait CompleteOutputModeTests extends SchemaEnforcementSuiteBase with SharedSparkSession - with LakeSQLCommandSoulTest { + with LakeSoulSQLCommandTest { import testImplicits._ diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/schema/SchemaValidationSuite.scala b/src/test/scala/org/apache/spark/sql/lakesoul/schema/SchemaValidationSuite.scala index be51aca73..375da28fe 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/schema/SchemaValidationSuite.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/schema/SchemaValidationSuite.scala @@ -21,7 +21,7 @@ import com.dmetasoul.lakesoul.tables.LakeSoulTable import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.functions._ -import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest +import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SparkSession} @@ -33,7 +33,7 @@ import java.util.concurrent.CountDownLatch * command completes analysis but before the command starts the transaction. We want to make sure * That we do not corrupt tables. */ -class SchemaValidationSuite extends QueryTest with SharedSparkSession with LakeSQLCommandSoulTest { +class SchemaValidationSuite extends QueryTest with SharedSparkSession with LakeSoulSQLCommandTest { class BlockingRule( blockActionLatch: CountDownLatch, diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/test/LakeSQLCommandSoulTest.scala b/src/test/scala/org/apache/spark/sql/lakesoul/test/LakeSoulSQLCommandTest.scala similarity index 98% rename from src/test/scala/org/apache/spark/sql/lakesoul/test/LakeSQLCommandSoulTest.scala rename to src/test/scala/org/apache/spark/sql/lakesoul/test/LakeSoulSQLCommandTest.scala index 02c620948..e3e9399c0 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/test/LakeSQLCommandSoulTest.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/test/LakeSoulSQLCommandTest.scala @@ -104,7 +104,7 @@ class LakeSoulTestSparkSession(sparkConf: SparkConf) extends TestSparkSession(sp * A trait for tests that are testing a fully set up SparkSession with all of LakeSoul's requirements, * such as the configuration of the LakeSoulCatalog and the addition of all LakeSoul extensions. 
*/ -trait LakeSQLCommandSoulTest extends LakeSoulTestUtils { +trait LakeSoulSQLCommandTest extends LakeSoulTestUtils { self: SharedSparkSession => override protected def createSparkSession: TestSparkSession = { diff --git a/src/test/scala/org/apache/spark/sql/lakesoul/test/TestUtils.scala b/src/test/scala/org/apache/spark/sql/lakesoul/test/TestUtils.scala index 67fa816e1..848d00716 100644 --- a/src/test/scala/org/apache/spark/sql/lakesoul/test/TestUtils.scala +++ b/src/test/scala/org/apache/spark/sql/lakesoul/test/TestUtils.scala @@ -20,7 +20,13 @@ import com.dmetasoul.lakesoul.tables.LakeSoulTable import org.apache.spark.SparkConf import org.apache.spark.sql.execution.datasources.v2.merge.parquet.batch.merge_operator.MergeOperator import org.apache.spark.sql.functions.col +import org.apache.spark.sql.lakesoul.SnapshotManagement +import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.util.Utils +import org.scalatest.{BeforeAndAfterEach, Suite} + +import java.io.File object TestUtils { @@ -177,18 +183,14 @@ object TestUtils { checkDFResult(lakeSoulData, expectedResults) } - - } - class MergeOpInt extends MergeOperator[Int] { override def mergeData(input: Seq[Int]): Int = { input.sum } } - class MergeOpString extends MergeOperator[String] { override def mergeData(input: Seq[String]): String = { input.mkString(",") @@ -199,4 +201,38 @@ class MergeOpString02 extends MergeOperator[String] { override def mergeData(input: Seq[String]): String = { input.mkString(";") } +} + +trait LakeSoulTestBeforeAndAfterEach extends BeforeAndAfterEach { + self: Suite with SharedSparkSession => + + var tempDir: File = _ + + var snapshotManagement: SnapshotManagement = _ + + protected def tempPath: String = tempDir.getCanonicalPath + + protected def readLakeSoulTable(path: String): DataFrame = { + spark.read.format("lakesoul").load(path) + } + + override def beforeEach(): Unit = { + super.beforeEach() + tempDir = Utils.createTempDir() + snapshotManagement = SnapshotManagement(tempPath) + } + + override def afterEach(): Unit = { + try { + Utils.deleteRecursively(tempDir) + try { + snapshotManagement.updateSnapshot() + LakeSoulTable.forPath(snapshotManagement.table_name).dropTable() + } catch { + case _: Exception => + } + } finally { + super.afterEach() + } + } } \ No newline at end of file
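
--
Two short usage sketches follow; they are illustrative only and not part of the
patch. First, how a suite picks up the shared fixture from the new
LakeSoulTestBeforeAndAfterEach trait instead of redefining beforeEach/afterEach
by hand. The suite name and data are hypothetical; tempPath, snapshotManagement
and readLakeSoulTable all come from the trait added in TestUtils.scala:

    package org.apache.spark.sql.lakesoul.commands

    import org.apache.spark.sql.{QueryTest, Row}
    import org.apache.spark.sql.lakesoul.test.LakeSoulTestBeforeAndAfterEach
    import org.apache.spark.sql.test.SharedSparkSession

    // Hypothetical suite: the trait's beforeEach creates tempDir and
    // snapshotManagement; afterEach drops the table and deletes the
    // directory even when the test body fails.
    class ExampleRoundTripSuite extends QueryTest
      with SharedSparkSession with LakeSoulTestBeforeAndAfterEach {

      import testImplicits._

      test("round-trip through the managed temp path") {
        Seq((1, "a"), (2, "b")).toDF("key", "value")
          .write.format("lakesoul").mode("overwrite").save(tempPath)

        checkAnswer(readLakeSoulTable(tempPath), Row(1, "a") :: Row(2, "b") :: Nil)
      }
    }

As the concrete suites in this patch do, also mix in LakeSoulSQLCommandTest when
the test needs the LakeSoul session extensions and catalog wired into the
SparkSession.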
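
Second, the read-time behaviour that ProcessCDCTableMergeOnRead adds for CDC
tables, shown as the equivalent explicit DataFrame filter. The path and the
column name "change_kind" are assumptions for illustration; the real column
name is whatever the table stores under
LakeSoulTableProperties.lakeSoulCDCChangePropKey:

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.col

    object CdcReadSemantics {
      def main(args: Array[String]): Unit = {
        // The LakeSoul extension must be installed for the rule to fire.
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("cdc-read-semantics")
          .config("spark.sql.extensions",
            "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
          .getOrCreate()
        val tablePath = "/tmp/lakesoul/cdc_table" // hypothetical existing CDC table

        // A plain read: the rule injects Filter(change_kind != 'delete') underneath.
        val visible: DataFrame = spark.read.format("lakesoul").load(tablePath)

        // The rewritten plan is equivalent to filtering explicitly:
        val equivalent: DataFrame = spark.read.format("lakesoul").load(tablePath)
          .filter(col("change_kind") =!= "delete")

        visible.show()
        equivalent.show()
        spark.stop()
      }
    }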