Optimize duplicate tests and code #6

Merged · 1 commit · Jan 28, 2022
@@ -107,12 +107,15 @@ class LakeSoulSparkSessionExtension extends (SparkSessionExtensions => Unit) {
extensions.injectPostHocResolutionRule { session =>
PreprocessTableDelete(session.sessionState.conf)
}

extensions.injectPostHocResolutionRule { session =>
LakeSoulPostHocAnalysis(session)
}
extensions.injectResolutionRule{ session =>

extensions.injectResolutionRule { session =>
ProcessCDCTableMergeOnRead(session.sessionState.conf)
}

extensions.injectResolutionRule { session =>
RewriteQueryByMaterialView(session)
}
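
For context (not part of this diff): the post-hoc and resolution rules injected above only run once the extension class is registered with the Spark session. A minimal sketch of that wiring through Spark's standard `spark.sql.extensions` mechanism; the package prefix is an assumption, since only the class name appears in the hunk header:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: registering the extension makes the injected analyzer rules
// (including ProcessCDCTableMergeOnRead and RewriteQueryByMaterialView above)
// apply to every query in this session. The package prefix is an assumption.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions",
    "org.apache.spark.sql.lakesoul.LakeSoulSparkSessionExtension")
  .getOrCreate()
```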
@@ -1,39 +1,46 @@
/*
* Copyright [2022] [DMetaSoul Team]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.lakesoul.rules

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.{LakeSoulTableProperties, LakeSoulTableRelationV2}
import org.apache.spark.sql.lakesoul.LakeSoulTableProperties
import org.apache.spark.sql.lakesoul.catalog.LakeSoulTableV2
import org.apache.spark.sql.lakesoul.exception.LakeSoulErrors
case class ProcessCDCTableMergeOnRead (sqlConf: SQLConf) extends Rule[LogicalPlan]{
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {

case class ProcessCDCTableMergeOnRead(sqlConf: SQLConf) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {
case p: LogicalPlan if p.children.exists(_.isInstanceOf[DataSourceV2Relation]) && !p.isInstanceOf[Filter] =>
p.children.toSeq.find(_.isInstanceOf[DataSourceV2Relation]).get match {
case dsv2@DataSourceV2Relation(table: LakeSoulTableV2, _, _, _, _)=>{
val value=getLakeSoulTableCDCColumn(table)
if(value.nonEmpty){
p.withNewChildren(Filter(Column(expr(s" ${value.get}!= 'delete'").expr).expr,dsv2)::Nil)
}
else {
p
}
p.children.find(_.isInstanceOf[DataSourceV2Relation]).get match {
case dsv2@DataSourceV2Relation(table: LakeSoulTableV2, _, _, _, _) =>
val value = getLakeSoulTableCDCColumn(table)
if (value.nonEmpty) {
p.withNewChildren(Filter(Column(expr(s" ${value.get}!= 'delete'").expr).expr, dsv2) :: Nil)
}
else {
p
}
}

}
}
private def lakeSoulTableHasHashPartition(table: LakeSoulTableV2): Boolean = {
table.snapshotManagement.snapshot.getTableInfo.hash_column.nonEmpty
}
private def lakeSoulTableCDCColumn(table: LakeSoulTableV2): Boolean = {
table.snapshotManagement.snapshot.getTableInfo.configuration.contains(LakeSoulTableProperties.lakeSoulCDCChangePropKey)
}

private def getLakeSoulTableCDCColumn(table: LakeSoulTableV2): Option[String] = {
table.snapshotManagement.snapshot.getTableInfo.configuration.get(LakeSoulTableProperties.lakeSoulCDCChangePropKey)
}

}
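
To illustrate what the rule above does (a sketch, not code from this PR): for a table whose properties carry the CDC change-column key, every scan of the table is wrapped in a filter that hides rows marked as deletes. The column name and table path below are assumptions for the example:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Sketch of the plan rewrite performed by ProcessCDCTableMergeOnRead.
// In the rule, the column name comes from the table property keyed by
// LakeSoulTableProperties.lakeSoulCDCChangePropKey; here it is hard-coded.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

val cdcColumn = "change_kind"                                    // assumption for illustration
val df = spark.read.format("lakesoul").load("/tmp/lakesoul/cdc") // hypothetical path

// Equivalent of the Filter node the rule inserts above the DataSourceV2Relation:
val visible = df.filter(expr(s"$cdcColumn != 'delete'"))
visible.show()
```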
@@ -19,13 +19,13 @@ package com.dmetasoul.lakesoul.tables
import java.util.Locale

import org.apache.spark.sql.lakesoul.LakeSoulUtils
import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.{AnalysisException, QueryTest}

class LakeSoulTableSuite extends QueryTest
with SharedSparkSession
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {

test("forPath") {
withTempDir { dir =>
@@ -20,13 +20,13 @@ import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.functions.{col, last}
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, TestUtils}
import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, TestUtils}
import org.apache.spark.sql.test.SharedSparkSession
import org.scalatest.BeforeAndAfterEach

class ParquetScanSuite extends QueryTest
with SharedSparkSession with BeforeAndAfterEach
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {

import testImplicits._

@@ -15,30 +15,17 @@
*/

package org.apache.spark.sql.lakesoul
import org.apache.spark.sql.lakesoul.SnapshotManagement
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import java.io.File
import java.util.Locale

import com.dmetasoul.lakesoul.meta.MetaVersion
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.utils.DataFileInfo
import org.apache.spark.sql.lakesoul.test.LakeSoulTestUtils
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{MetadataBuilder, StructType}
import org.apache.spark.util.Utils
import org.scalatest.matchers.must.Matchers.contain
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import org.apache.spark.sql.types.StructType

import scala.language.implicitConversions

class TestCDC
class CDCSuite
extends QueryTest
with SharedSparkSession
with LakeSoulTestUtils {
@@ -47,24 +34,6 @@ class TestCDC

val format = "lakesoul"

private def createTableByPath(path: File,
df: DataFrame,
tableName: String,
partitionedBy: Seq[String] = Nil): Unit = {
df.write
.partitionBy(partitionedBy: _*)
.mode(SaveMode.Append)
.format(format)
.save(path.getCanonicalPath)

sql(
s"""
|CREATE TABLE lakesoul_test
|USING lakesoul
|LOCATION '${path.getCanonicalPath}'
""".stripMargin)
}

private implicit def toTableIdentifier(tableName: String): TableIdentifier = {
spark.sessionState.sqlParser.parseTableIdentifier(tableName)
}
@@ -85,14 +54,6 @@ class TestCDC
spark.sessionState.catalog.getTableMetadata(tableName).schema
}

private def getSnapshotManagement(table: CatalogTable): SnapshotManagement = {
getSnapshotManagement(new Path(table.storage.locationUri.get))
}

private def getSnapshotManagement(tableName: String): SnapshotManagement = {
getSnapshotManagement(spark.sessionState.catalog.getTableMetadata(tableName))
}

protected def getSnapshotManagement(path: Path): SnapshotManagement = {
SnapshotManagement(path)
}
4 changes: 2 additions & 2 deletions src/test/scala/org/apache/spark/sql/lakesoul/DDLSuite.scala
@@ -20,15 +20,15 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.schema.InvariantViolationException
import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}

import scala.collection.JavaConverters._

class DDLSuite extends DDLTestBase with SharedSparkSession
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {

override protected def verifyDescribeTable(tblName: String): Unit = {
val res = sql(s"DESCRIBE TABLE $tblName").collect()
@@ -19,7 +19,7 @@ package org.apache.spark.sql.lakesoul
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.scalatest.Tag
@@ -130,6 +130,6 @@ trait DDLUsingPathTests extends QueryTest

}

class DDLUsingPathSuite extends DDLUsingPathTests with LakeSQLCommandSoulTest {
class DDLUsingPathSuite extends DDLUsingPathTests with LakeSoulSQLCommandTest {
}

@@ -22,7 +22,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table,
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.lakesoul.catalog.{LakeSoulCatalog, LakeSoulTableV2}
import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
@@ -411,7 +411,7 @@ trait DataFrameWriterV2Tests

class DataFrameWriterV2Suite
extends DataFrameWriterV2Tests
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {

import testImplicits._

@@ -28,15 +28,15 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
import org.apache.spark.sql.lakesoul.schema.SchemaUtils
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.scalatest.BeforeAndAfter

import scala.collection.JavaConverters._

class InsertIntoSQLSuite extends InsertIntoTests(false, true)
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {
override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
val tmpView = "tmp_view"
withTempView(tmpView) {
@@ -48,7 +48,7 @@ class InsertIntoSQLSuite extends InsertIntoTests(false, true)
}

class InsertIntoSQLByPathSuite extends InsertIntoTests(false, true)
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {
override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
val tmpView = "tmp_view"
withTempView(tmpView) {
@@ -85,7 +85,7 @@ class InsertIntoSQLByPathSuite extends InsertIntoTests(false, true)
}

class InsertIntoDataFrameSuite extends InsertIntoTests(false, false)
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {
override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
val dfw = insert.write.format(v2Format)
if (mode != null) {
@@ -96,7 +96,7 @@ class InsertIntoDataFrameSuite extends InsertIntoTests(false, false)
}

class InsertIntoDataFrameByPathSuite extends InsertIntoTests(false, false)
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {
override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
val dfw = insert.write.format(v2Format)
if (mode != null) {
@@ -20,7 +20,7 @@ import com.dmetasoul.lakesoul.tables.LakeSoulTable

import java.util.Locale
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
import org.apache.spark.sql.test.{SQLTestUtils, SharedSparkSession}
import org.apache.spark.sql.{AnalysisException, QueryTest}

@@ -30,7 +30,7 @@ import scala.util.control.NonFatal
class NotSupportedDDLSuite
extends NotSupportedDDLBase
with SharedSparkSession
with LakeSQLCommandSoulTest
with LakeSoulSQLCommandTest


abstract class NotSupportedDDLBase extends QueryTest
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.utils.DataFileInfo
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{MetadataBuilder, StructType}
@@ -1457,7 +1457,7 @@ trait TableCreationTests

class TableCreationSuite
extends TableCreationTests
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {

private def loadTable(tableName: String): Table = {
val ti = spark.sessionState.sqlParser.parseMultipartIdentifier(tableName)
@@ -25,7 +25,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.SnapshotManagement
import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
import org.apache.spark.sql.lakesoul.test.{LakeSQLCommandSoulTest, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.utils.DataFileInfo
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -1300,9 +1300,9 @@ trait AlterTableByPathTests extends AlterTableLakeSoulTestBase {

class AlterTableByNameSuite
extends AlterTableByNameTests
with LakeSQLCommandSoulTest {
with LakeSoulSQLCommandTest {


}

class AlterTableByPathSuite extends AlterTableByPathTests with LakeSQLCommandSoulTest
class AlterTableByPathSuite extends AlterTableByPathTests with LakeSoulSQLCommandTest
@@ -16,9 +16,9 @@

package org.apache.spark.sql.lakesoul.commands

import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest

class DeleteSQLSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest {
class DeleteSQLSuite extends DeleteSuiteBase with LakeSoulSQLCommandTest {

override protected def executeDelete(target: String, where: String = null): Unit = {
val whereClause = Option(where).map(c => s"WHERE $c").getOrElse("")
@@ -19,14 +19,13 @@ package org.apache.spark.sql.lakesoul.commands
import com.dmetasoul.lakesoul
import com.dmetasoul.lakesoul.tables.{LakeSoulTable, LakeSoulTableTestUtils}
import org.apache.spark.sql.lakesoul.SnapshotManagement
import org.apache.spark.sql.lakesoul.test.LakeSQLCommandSoulTest
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
import org.apache.spark.sql.{Row, functions}

class DeleteScalaSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest {
class DeleteScalaSuite extends DeleteSuiteBase with LakeSoulSQLCommandTest {

import testImplicits._


test("delete cached table by path") {
Seq((2, 2), (1, 4)).toDF("key", "value")
.write.mode("overwrite").format("lakesoul").save(tempPath)
@@ -64,7 +63,7 @@ class DeleteScalaSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest {
case tableName :: Nil => tableName -> None // just table name
case tableName :: alias :: Nil => // tablename SPACE alias OR tab SPACE lename
val ordinary = (('a' to 'z') ++ ('A' to 'Z') ++ ('0' to '9')).toSet
if (!alias.forall(ordinary.contains(_))) {
if (!alias.forall(ordinary.contains)) {
(tableName + " " + alias) -> None
} else {
tableName -> Some(alias)
@@ -84,7 +83,7 @@ class DeleteScalaSuite extends DeleteSuiteBase with LakeSQLCommandSoulTest {
LakeSoulTableTestUtils.createTable(spark.table(tableNameOrPath),
SnapshotManagement(tableNameOrPath))
}
optionalAlias.map(table.as(_)).getOrElse(table)
optionalAlias.map(table.as).getOrElse(table)
}

if (where != null) {