diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala index 4901c0d39117d..5cb3f12c78c47 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala @@ -20,13 +20,14 @@ package org.apache.spark.sql.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport} +import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieWriterUtils, SparkAdapterSupport} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.catalyst.TableIdentifier @@ -47,6 +48,7 @@ import scala.collection.JavaConverters._ import scala.collection.immutable.Map object HoodieSqlCommonUtils extends SparkAdapterSupport { + // NOTE: {@code SimpleDateFormat} is NOT thread-safe // TODO replace w/ DateTimeFormatter private val defaultDateFormat = @@ -312,4 +314,29 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport { case field if resolver(field.name, name) => field } } + + /** + * Create a SparkRDDWriteClient from an existing hoodie table path. + * @param sparkSession The Spark session. + * @param metaClient The meta client of the hoodie table. + * @param parameters The parameters passed to the SparkRDDWriteClient. + */ + def createHoodieClientFromPath( + sparkSession: SparkSession, + metaClient: HoodieTableMetaClient, + parameters: Map[String, String]): SparkRDDWriteClient[_] = { + val schemaUtil = new TableSchemaResolver(metaClient) + val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString + + // Append the Spark conf to the parameters and fill in missing keys with default values. + val finalParameters = HoodieWriterUtils.parametersWithWriteDefaults( + withSparkConf(sparkSession, Map.empty)( + parameters + (DataSourceWriteOptions.TABLE_TYPE.key() -> metaClient.getTableType.name()) + ) + ) + + val jsc = new JavaSparkContext(sparkSession.sparkContext) + DataSourceUtils.createHoodieClient(jsc, schemaStr, metaClient.getBasePath, + metaClient.getTableConfig.getTableName, finalParameters.asJava) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 index 74f83438f659c..6a4e462c05cce 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 +++ b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 @@ -30,6 +30,10 @@ statement | operation = (RUN | SCHEDULE) COMPACTION ON path = STRING (AT instantTimestamp = NUMBER)? #compactionOnPath | SHOW COMPACTION ON tableIdentifier (LIMIT limit = NUMBER)?
#showCompactionOnTable | SHOW COMPACTION ON path = STRING (LIMIT limit = NUMBER)? #showCompactionOnPath + | RUN CLUSTERING ON tableIdentifier (ORDER BY orderColumn+=IDENTIFIER (',' orderColumn+=IDENTIFIER)*)? (AT timestamp = NUMBER)? #clusteringOnTable + | RUN CLUSTERING ON path = STRING (ORDER BY orderColumn+=IDENTIFIER (',' orderColumn+=IDENTIFIER)*)? (AT timestamp = NUMBER)? #clusteringOnPath + | SHOW CLUSTERING ON tableIdentifier (LIMIT limit = NUMBER)? #showClusteringOnTable + | SHOW CLUSTERING ON path = STRING (LIMIT limit = NUMBER)? #showClusteringOnPath ; tableIdentifier @@ -44,6 +48,9 @@ statement ON: 'ON'; SHOW: 'SHOW'; LIMIT: 'LIMIT'; + CLUSTERING: 'CLUSTERING'; + ORDER: 'ORDER'; + BY: 'BY'; NUMBER : DIGIT+ diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Clustering.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Clustering.scala new file mode 100644 index 0000000000000..98f15389f423c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Clustering.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.plans.logical + +case class ClusteringOnTable(table: LogicalPlan, orderByColumns: Seq[String], timestamp: Option[Long]) + extends Command { + override def children: Seq[LogicalPlan] = Seq(table) + + def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): ClusteringOnTable = { + copy(table = newChildren.head) + } +} + +case class ClusteringOnPath(path: String, orderByColumns: Seq[String], timestamp: Option[Long]) + extends Command { + override def children: Seq[LogicalPlan] = Seq.empty + + def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): ClusteringOnPath = { + this + } +} + +case class ShowClusteringOnTable(table: LogicalPlan, limit: Int = 20) + extends Command { + override def children: Seq[LogicalPlan] = Seq(table) + + def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): ShowClusteringOnTable = { + copy(table = newChildren.head) + } +} + +case class ShowClusteringOnPath(path: String, limit: Int = 20) + extends Command { + override def children: Seq[LogicalPlan] = Seq.empty + + def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): ShowClusteringOnPath = { + this + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala index a198d0e009af2..f006a0ae72032 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hudi import org.apache.hudi.SparkAdapterSupport + import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{And, Cast, Expression, Literal} import org.apache.spark.sql.catalyst.plans.logical.{MergeIntoTable, SubqueryAlias} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index c8fa32891e0f9..d9fe32737aa52 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -21,6 +21,7 @@ import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport} import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.HoodieTableMetaClient + import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.Inner @@ -31,7 +32,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getTableIdentifier, getTableLocation, isHoodieTable, removeMetaFields, tableExistsInPath} import org.apache.spark.sql.hudi.HoodieSqlUtils._ import org.apache.spark.sql.hudi.command._ -import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils, HoodieSqlUtils} +import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils} import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{AnalysisException, 
SparkSession} @@ -106,6 +107,22 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan] // Convert to CompactionShowHoodiePathCommand case CompactionShowOnPath(path, limit) => CompactionShowHoodiePathCommand(path, limit) + + case ShowClusteringOnTable(table, limit) if isHoodieTable(table, sparkSession) => + val tableId = getTableIdentifier(table) + val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId) + ClusteringShowHoodieTableCommand(catalogTable, limit) + + case ShowClusteringOnPath(path, limit) => + ClusteringShowHoodiePathCommand(path, limit) + + case ClusteringOnTable(table, orderByColumns, timestamp) if isHoodieTable(table, sparkSession) => + val tableId = getTableIdentifier(table) + val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId) + ClusteringHoodieTableCommand(catalogTable, orderByColumns, timestamp) + + case ClusteringOnPath(path, orderByColumns, timestamp) => + ClusteringHoodiePathCommand(path, orderByColumns, timestamp) case _=> plan } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ClusteringHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ClusteringHoodiePathCommand.scala new file mode 100644 index 0000000000000..011bf410cdaff --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ClusteringHoodiePathCommand.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command + +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.ClusteringUtils +import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils + +import scala.collection.JavaConverters._ +import scala.collection.immutable.Map + +case class ClusteringHoodiePathCommand( + path: String, + orderByColumns: Seq[String], + timestamp: Option[Long]) extends HoodieLeafRunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val metaClient = HoodieTableMetaClient.builder().setBasePath(path) + .setConf(sparkSession.sessionState.newHadoopConf()).build() + + val client = HoodieSqlCommonUtils.createHoodieClientFromPath( + sparkSession, + metaClient, + Map(PLAN_STRATEGY_SORT_COLUMNS.key() -> orderByColumns.mkString(",")) + ) + + // Get all of the pending clustering. 
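+ // Each entry returned by ClusteringUtils is a pair of (pending clustering instant, clustering plan); only the instant timestamps are needed below.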
+ val pendingClustering = ClusteringUtils.getAllPendingClusteringPlans(metaClient) + .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f) + + val clusteringInstants = if (timestamp.isEmpty) { + // If there is no pending clustering, schedule a new one. + if (pendingClustering.isEmpty) { + val instantTime = HoodieActiveTimeline.createNewInstantTime + if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) { + Seq(instantTime) + } else { + Seq.empty[String] + } + } else { // If there are pending clustering plans, run all of them. + pendingClustering + } + } else { + // Run clustering for the specified instant. + if (pendingClustering.contains(timestamp.get.toString)) { + Seq(timestamp.get.toString) + } else { + throw new IllegalArgumentException(s"Clustering instant: ${timestamp.get} is not found in $path." + + s" Available pending clustering instants are: ${pendingClustering.mkString(",")}.") + } + } + + logInfo(s"Clustering instants are: ${clusteringInstants.mkString(",")}.") + val startTs = System.currentTimeMillis() + clusteringInstants.foreach(client.cluster(_, true)) + logInfo(s"Finished clustering all the instants: ${clusteringInstants.mkString(",")}," + + s" time spent: ${System.currentTimeMillis() - startTs}ms.") + Seq.empty[Row] + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ClusteringHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ClusteringHoodieTableCommand.scala new file mode 100644 index 0000000000000..681b5e2a0075b --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ClusteringHoodieTableCommand.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.hudi.command + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} + +case class ClusteringHoodieTableCommand(table: CatalogTable, + orderByColumns: Seq[String], timestamp: Option[Long]) extends HoodieLeafRunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, table) + + val notExistsColumns = orderByColumns.filterNot(table.schema.fieldNames.contains(_)) + assert(notExistsColumns.isEmpty, s"Order by columns: [${notExistsColumns.mkString(",")}] do not exist" + + s" in table ${table.identifier.unquotedString}.") + + ClusteringHoodiePathCommand( + hoodieCatalogTable.tableLocation, + orderByColumns, + timestamp).run(sparkSession) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ClusteringShowHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ClusteringShowHoodiePathCommand.scala new file mode 100644 index 0000000000000..28dac09c8e394 --- --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ClusteringShowHoodiePathCommand.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.hudi.command + +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.types.{IntegerType, StringType} + +import scala.collection.JavaConverters._ + +case class ClusteringShowHoodiePathCommand(path: String, limit: Int) + extends HoodieLeafRunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val metaClient = HoodieTableMetaClient.builder().setBasePath(path) + .setConf(sparkSession.sessionState.newHadoopConf()).build() + + ClusteringUtils.getAllPendingClusteringPlans(metaClient).iterator().asScala.map { p => + Row(p.getLeft.getTimestamp, p.getRight.getInputGroups.size()) + }.toSeq.take(limit) + } + + override val output: Seq[Attribute] = { + Seq( + AttributeReference("timestamp", StringType, nullable = false)(), + AttributeReference("groups", IntegerType, nullable = false)() + ) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ClusteringShowHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ClusteringShowHoodieTableCommand.scala new file mode 100644 index 0000000000000..67f99cf44d6e9 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ClusteringShowHoodieTableCommand.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.types.{IntegerType, StringType} + +case class ClusteringShowHoodieTableCommand(table: CatalogTable, limit: Int) + extends HoodieLeafRunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, table) + ClusteringShowHoodiePathCommand(hoodieCatalogTable.tableLocation, limit).run(sparkSession) + } + + override val output: Seq[Attribute] = { + Seq( + AttributeReference("timestamp", StringType, nullable = false)(), + AttributeReference("groups", IntegerType, nullable = false)() + ) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala index 1363fb939b4e3..0ec3ecf25e6d2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala @@ -20,11 +20,11 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.client.WriteStatus import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.util.{HoodieTimer, Option => HOption} import org.apache.hudi.exception.HoodieException -import org.apache.hudi.{DataSourceUtils, DataSourceWriteOptions, HoodieWriterUtils} -import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} + +import org.apache.spark.api.java.JavaRDD import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE} @@ -45,20 +45,7 @@ case class CompactionHoodiePathCommand(path: String, assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ, s"Must compaction on a Merge On Read table.") - val schemaUtil = new TableSchemaResolver(metaClient) - val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString - - val parameters = HoodieWriterUtils.parametersWithWriteDefaults( - HoodieSqlCommonUtils.withSparkConf(sparkSession, Map.empty)( - Map( - DataSourceWriteOptions.TABLE_TYPE.key() -> HoodieTableType.MERGE_ON_READ.name() - ) - ) - ) - val jsc = new JavaSparkContext(sparkSession.sparkContext) - val client = DataSourceUtils.createHoodieClient(jsc, schemaStr, path, - metaClient.getTableConfig.getTableName, parameters) - + val client = HoodieSqlCommonUtils.createHoodieClientFromPath(sparkSession, metaClient, Map.empty) operation match { case SCHEDULE => val instantTime = instantTimestamp.map(_.toString).getOrElse(HoodieActiveTimeline.createNewInstantTime) @@ -71,7 +58,7 @@ case class CompactionHoodiePathCommand(path: String, // Do compaction val timeLine = metaClient.getActiveTimeline val pendingCompactionInstants = timeLine.getWriteTimeline.getInstants.iterator().asScala - .filter(p 
=> p.getAction == HoodieTimeline.COMPACTION_ACTION) + .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION) .map(_.getTimestamp) .toSeq.sortBy(f => f) val willCompactionInstants = if (instantTimestamp.isEmpty) { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala index b1f5a32fe1e19..f371773f220a9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala @@ -18,15 +18,17 @@ package org.apache.spark.sql.parser import org.apache.hudi.SparkAdapterSupport -import org.apache.hudi.spark.sql.parser.{HoodieSqlCommonBaseVisitor, HoodieSqlCommonParser} -import org.apache.hudi.spark.sql.parser.HoodieSqlCommonParser.{CompactionOnPathContext, CompactionOnTableContext, ShowCompactionOnPathContext, ShowCompactionOnTableContext, SingleStatementContext, TableIdentifierContext} +import org.apache.hudi.spark.sql.parser.HoodieSqlCommonBaseVisitor +import org.apache.hudi.spark.sql.parser.HoodieSqlCommonParser.{ClusteringOnPathContext, ClusteringOnTableContext, CompactionOnPathContext, CompactionOnTableContext, ShowClusteringOnPathContext, ShowClusteringOnTableContext, ShowCompactionOnPathContext, ShowCompactionOnTableContext, SingleStatementContext, TableIdentifierContext} + import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin import org.apache.spark.sql.catalyst.parser.{ParserInterface, ParserUtils} -import org.apache.spark.sql.catalyst.plans.logical.{CompactionOperation, CompactionPath, CompactionShowOnPath, CompactionShowOnTable, CompactionTable, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{ClusteringOnPath, ClusteringOnTable, CompactionOperation, CompactionPath, CompactionShowOnPath, CompactionShowOnTable, CompactionTable, LogicalPlan, ShowClusteringOnPath, ShowClusteringOnTable} + +import scala.collection.JavaConverters._ class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface) extends HoodieSqlCommonBaseVisitor[AnyRef] with Logging with SparkAdapterSupport { @@ -69,6 +71,46 @@ class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface } } + override def visitClusteringOnTable(ctx: ClusteringOnTableContext): LogicalPlan = withOrigin(ctx) { + val table = ctx.tableIdentifier().accept(this).asInstanceOf[LogicalPlan] + val orderByColumns = if (ctx.orderColumn != null) { + ctx.orderColumn.asScala.map(_.getText) + } else { + Seq.empty[String] + } + val timestamp = if (ctx.timestamp != null) Some(ctx.timestamp.getText.toLong) else None + ClusteringOnTable(table, orderByColumns, timestamp) + } + + override def visitClusteringOnPath(ctx: ClusteringOnPathContext): LogicalPlan = withOrigin(ctx) { + val path = string(ctx.path) + val orderByColumns = if (ctx.orderColumn != null) { + ctx.orderColumn.asScala.map(_.getText) + } else { + Seq.empty[String] + } + val timestamp = if (ctx.timestamp != null) Some(ctx.timestamp.getText.toLong) else None + ClusteringOnPath(path, orderByColumns, timestamp) + } + + override def visitShowClusteringOnTable(ctx: ShowClusteringOnTableContext): 
LogicalPlan = withOrigin(ctx) { + val table = ctx.tableIdentifier().accept(this).asInstanceOf[LogicalPlan] + if (ctx.limit != null) { + ShowClusteringOnTable(table, ctx.limit.getText.toInt) + } else { + ShowClusteringOnTable(table) + } + } + + override def visitShowClusteringOnPath(ctx: ShowClusteringOnPathContext): LogicalPlan = withOrigin(ctx) { + val path = string(ctx.path) + if (ctx.limit != null) { + ShowClusteringOnPath(path, ctx.limit.getText.toInt) + } else { + ShowClusteringOnPath(path) + } + } + override def visitTableIdentifier(ctx: TableIdentifierContext): LogicalPlan = withOrigin(ctx) { UnresolvedRelation(TableIdentifier(ctx.table.getText, Option(ctx.db).map(_.getText))) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestClusteringTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestClusteringTable.scala new file mode 100644 index 0000000000000..45795c5b3dd4c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestClusteringTable.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi + +import org.apache.hadoop.fs.Path + +import org.apache.hudi.HoodieDataSourceHelpers +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} + +import scala.collection.JavaConverters.asScalaIteratorConverter + +class TestClusteringTable extends TestHoodieSqlBase { + + test("Test clustering table") { + withTempDir{tmp => + Seq("cow", "mor").foreach {tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '$basePath' + | options ( + | type = '$tableType', + | primaryKey ='id', + | preCombineField = 'ts' + | ) + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + + val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath) + .setConf(spark.sessionState.newHadoopConf()).build() + val client = HoodieSqlCommonUtils.createHoodieClientFromPath(spark, metaClient, Map.empty) + + // Schedule the first clustering plan + val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime + client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty()) + + // Schedule the second clustering plan + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + val secondScheduleInstant = HoodieActiveTimeline.createNewInstantTime + client.scheduleClusteringAtInstant(secondScheduleInstant, HOption.empty()) + checkAnswer(s"show clustering on $tableName")( + Seq(firstScheduleInstant, 1), + Seq(secondScheduleInstant, 1) + ) + + // Run clustering for all pending clustering plans + spark.sql(s"run clustering on $tableName order by ts") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002) + ) + // After clustering there should be no pending clustering. + checkAnswer(s"show clustering on $tableName")() + + // Run clustering without a manual schedule (this schedules a new plan when no pending clustering exists) + spark.sql(s"insert into $tableName values(4, 'a4', 10, 1003)") + spark.sql(s"insert into $tableName values(5, 'a5', 10, 1004)") + spark.sql(s"run clustering on $tableName") + + // Get the replace commits completed after the second scheduled instant + val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) + val thirdClusteringInstant = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath) + .findInstantsAfter(secondScheduleInstant) + .getInstants + .iterator().asScala + .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION) + .toSeq + // Should have a new replace commit after the second clustering command.
+ assertResult(1)(thirdClusteringInstant.size) + + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002), + Seq(4, "a4", 10.0, 1003), + Seq(5, "a5", 10.0, 1004) + ) + } + } + } + + + test("Test clustering table path") { + withTempDir{tmp => + Seq("cow", "mor").foreach {tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '$basePath' + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + """.stripMargin) + + spark.sql(s"run clustering on '$basePath'") + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + + val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath) + .setConf(spark.sessionState.newHadoopConf()).build() + val client = HoodieSqlCommonUtils.createHoodieClientFromPath(spark, metaClient, Map.empty) + + // Schedule a clustering plan + val scheduleInstant = HoodieActiveTimeline.createNewInstantTime + client.scheduleClusteringAtInstant(scheduleInstant, HOption.empty()) + checkAnswer(s"show clustering on '$basePath'")( + Seq(scheduleInstant, 1) + ) + + // Run clustering for the plan scheduled at the specified instant + spark.sql(s"run clustering on '$basePath' order by ts at $scheduleInstant") + checkAnswer(s"select id, name, price, ts from $tableName order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1001), + Seq(3, "a3", 10.0, 1002) + ) + val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf()) + HoodieDataSourceHelpers.hasNewCommits(fs, basePath, scheduleInstant) + } + } + } + +}
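For reference, a minimal sketch of how the statements added by this patch can be issued through Spark SQL, mirroring the tests above (the table name t1, the table path, the ordering column ts, and the instant value are illustrative only and not part of the patch):

  // Schedule (if necessary) and run clustering on a Hudi table, sorting data by the given column(s).
  spark.sql("run clustering on t1 order by ts")
  // Run clustering for a specific pending instant on a table path.
  spark.sql("run clustering on 'file:///tmp/hudi/t1' at 20220101000000")
  // List pending clustering plans and the number of input file groups per plan.
  spark.sql("show clustering on t1 limit 10")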