@@ -20,13 +20,14 @@ package org.apache.spark.sql.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport}
import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieWriterUtils, SparkAdapterSupport}

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -47,6 +48,7 @@ import scala.collection.JavaConverters._
import scala.collection.immutable.Map

object HoodieSqlCommonUtils extends SparkAdapterSupport {

// NOTE: {@code SimpleDateFormat} is NOT thread-safe
// TODO replace w/ DateTimeFormatter
private val defaultDateFormat =
@@ -312,4 +314,29 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
case field if resolver(field.name, name) => field
}
}

/**
* Create a SparkRDDWriteClient from an existing hoodie table path.
* @param sparkSession The spark session.
* @param metaClient The meta client of the hoodie table.
* @param parameters The parameters passed to the SparkRDDWriteClient.
*/
def createHoodieClientFromPath(
sparkSession: SparkSession,
metaClient: HoodieTableMetaClient,
parameters: Map[String, String]): SparkRDDWriteClient[_] = {
val schemaUtil = new TableSchemaResolver(metaClient)
val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString

// Append the spark conf to the parameters and fill in missing keys with default values.
val finalParameters = HoodieWriterUtils.parametersWithWriteDefaults(
withSparkConf(sparkSession, Map.empty)(
parameters + (DataSourceWriteOptions.TABLE_TYPE.key() -> metaClient.getTableType.name())
)
)

val jsc = new JavaSparkContext(sparkSession.sparkContext)
DataSourceUtils.createHoodieClient(jsc, schemaStr, metaClient.getBasePath,
metaClient.getTableConfig.getTableName, finalParameters.asJava)
}
}
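A minimal usage sketch of the new helper (not part of this change), assuming a hypothetical table path and sort-column parameter; the builder calls mirror those used elsewhere in this PR:

// Sketch only: assumes org.apache.hudi.common.table.HoodieTableMetaClient and
// org.apache.hudi.config.HoodieClusteringConfig are imported and `sparkSession` is in scope.
val metaClient = HoodieTableMetaClient.builder()
  .setBasePath("/tmp/hudi/my_table") // hypothetical base path
  .setConf(sparkSession.sessionState.newHadoopConf())
  .build()
val client = HoodieSqlCommonUtils.createHoodieClientFromPath(
  sparkSession,
  metaClient,
  Map(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key() -> "col1,col2"))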
@@ -30,6 +30,10 @@ statement
| operation = (RUN | SCHEDULE) COMPACTION ON path = STRING (AT instantTimestamp = NUMBER)? #compactionOnPath
| SHOW COMPACTION ON tableIdentifier (LIMIT limit = NUMBER)? #showCompactionOnTable
| SHOW COMPACTION ON path = STRING (LIMIT limit = NUMBER)? #showCompactionOnPath
| RUN CLUSTERING ON tableIdentifier (ORDER BY orderColumn+=IDENTIFIER (',' orderColumn+=IDENTIFIER)*)? (AT timestamp = NUMBER)? #clusteringOnTable
| RUN CLUSTERING ON path = STRING (ORDER BY orderColumn+=IDENTIFIER (',' orderColumn+=IDENTIFIER)*)? (AT timestamp = NUMBER)? #clusteringOnPath
| SHOW CLUSTERING ON tableIdentifier (LIMIT limit = NUMBER)? #showClusteringOnTable
| SHOW CLUSTERING ON path = STRING (LIMIT limit = NUMBER)? #showClusteringOnPath
;

tableIdentifier
@@ -44,6 +48,9 @@ statement
ON: 'ON';
SHOW: 'SHOW';
LIMIT: 'LIMIT';
CLUSTERING: 'CLUSTERING';
ORDER: 'ORDER';
BY: 'BY';

NUMBER
: DIGIT+
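As a rough usage sketch (not shown in this diff), the new grammar rules are intended to admit statements like the following through spark.sql; the table name, columns, and instant time are hypothetical:

// Hypothetical examples of the new clustering SQL against a table identifier.
spark.sql("RUN CLUSTERING ON my_table ORDER BY col1, col2")   // schedule/execute, sorted by the given columns
spark.sql("RUN CLUSTERING ON my_table AT 20220101000000")     // execute a specific pending instant
spark.sql("SHOW CLUSTERING ON my_table LIMIT 10").show()      // list pending clustering plans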
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical

case class ClusteringOnTable(table: LogicalPlan, orderByColumns: Seq[String], timestamp: Option[Long])
extends Command {
override def children: Seq[LogicalPlan] = Seq(table)

def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): ClusteringOnTable = {
copy(table = newChildren.head)
}
}

case class ClusteringOnPath(path: String, orderByColumns: Seq[String], timestamp: Option[Long])
extends Command {
override def children: Seq[LogicalPlan] = Seq.empty

def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): ClusteringOnPath = {
this
}
}

case class ShowClusteringOnTable(table: LogicalPlan, limit: Int = 20)
extends Command {
override def children: Seq[LogicalPlan] = Seq(table)

def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): ShowClusteringOnTable = {
copy(table = newChildren.head)
}
}

case class ShowClusteringOnPath(path: String, limit: Int = 20)
extends Command {
override def children: Seq[LogicalPlan] = Seq.empty

def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): ShowClusteringOnPath = {
this
}
}
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi

import org.apache.hudi.SparkAdapterSupport

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{And, Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{MergeIntoTable, SubqueryAlias}
@@ -21,6 +21,7 @@ import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableMetaClient

import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.Inner
@@ -31,7 +32,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getTableIdentifier, getTableLocation, isHoodieTable, removeMetaFields, tableExistsInPath}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.command._
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils, HoodieSqlUtils}
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{AnalysisException, SparkSession}

@@ -106,6 +107,22 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
// Convert to CompactionShowHoodiePathCommand
case CompactionShowOnPath(path, limit) =>
CompactionShowHoodiePathCommand(path, limit)

case ShowClusteringOnTable(table, limit) if isHoodieTable(table, sparkSession) =>
val tableId = getTableIdentifier(table)
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
ClusteringShowHoodieTableCommand(catalogTable, limit)

case ShowClusteringOnPath(path, limit) =>
ClusteringShowHoodiePathCommand(path, limit)

case ClusteringOnTable(table, orderByColumns, timestamp) if isHoodieTable(table, sparkSession) =>
val tableId = getTableIdentifier(table)
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
ClusteringHoodieTableCommand(catalogTable, orderByColumns, timestamp)

case ClusteringOnPath(path, orderByColumns, timestamp) =>
ClusteringHoodiePathCommand(path, orderByColumns, timestamp)

case _ => plan
}
}
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command

import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ClusteringUtils
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils

import scala.collection.JavaConverters._
import scala.collection.immutable.Map

case class ClusteringHoodiePathCommand(
path: String,
orderByColumns: Seq[String],
timestamp: Option[Long]) extends HoodieLeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(sparkSession.sessionState.newHadoopConf()).build()

val client = HoodieSqlCommonUtils.createHoodieClientFromPath(
sparkSession,
metaClient,
Map(PLAN_STRATEGY_SORT_COLUMNS.key() -> orderByColumns.mkString(","))
)

// Get all pending clustering instants, sorted by instant time.
val pendingClustering = ClusteringUtils.getAllPendingClusteringPlans(metaClient)
.iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)

val clusteringInstants = if (timestamp.isEmpty) {
// If there is no pending clustering, schedule a new one.
if (pendingClustering.isEmpty) {
val instantTime = HoodieActiveTimeline.createNewInstantTime
if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
Seq(instantTime)
} else {
Seq.empty[String]
}
} else { // If there are pending clustering plans, cluster all of them.
pendingClustering
}
} else {
// Cluster only the specified instant.
if (pendingClustering.contains(timestamp.get.toString)) {
Seq(timestamp.get.toString)
} else {
throw new IllegalArgumentException(s"Clustering instant: ${timestamp.get} is not found in $path." +
s" Available pending clustering instants are: ${pendingClustering.mkString(",")}.")
}
}

logInfo(s"Clustering instants are: ${clusteringInstants.mkString(",")}.")
val startTs = System.currentTimeMillis()
clusteringInstants.foreach(client.cluster(_, true))
logInfo(s"Finish clustering all the instants: ${clusteringInstants.mkString(",")}," +
s" time send: ${System.currentTimeMillis() - startTs}ms.")
Seq.empty[Row]
}
}
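A corresponding sketch of the path-based variants handled by this command (the base path and sort column are hypothetical):

// Hypothetical examples of clustering a table addressed by its base path.
spark.sql("RUN CLUSTERING ON '/tmp/hudi/my_table' ORDER BY col1")
spark.sql("SHOW CLUSTERING ON '/tmp/hudi/my_table' LIMIT 10").show()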
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}

case class ClusteringHoodieTableCommand(
table: CatalogTable,
orderByColumns: Seq[String],
timestamp: Option[Long]) extends HoodieLeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, table)

// Validate that all order-by columns exist in the table schema.
val notExistsColumns = orderByColumns.filterNot(table.schema.fieldNames.contains(_))
assert(notExistsColumns.isEmpty, s"Order by columns: [${notExistsColumns.mkString(",")}] do not exist" +
s" in table ${table.identifier.unquotedString}.")

ClusteringHoodiePathCommand(
hoodieCatalogTable.tableLocation,
orderByColumns,
timestamp).run(sparkSession)
}
}
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command

import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ClusteringUtils

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType}

import scala.collection.JavaConverters._

case class ClusteringShowHoodiePathCommand(path: String, limit: Int)
extends HoodieLeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
.setConf(sparkSession.sessionState.newHadoopConf()).build()

ClusteringUtils.getAllPendingClusteringPlans(metaClient).iterator().asScala.map { p =>
Row(p.getLeft.getTimestamp, p.getRight.getInputGroups.size())
}.toSeq.take(limit)
}

override val output: Seq[Attribute] = {
Seq(
AttributeReference("timestamp", StringType, nullable = false)(),
AttributeReference("groups", IntegerType, nullable = false)()
)
}
}
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.{IntegerType, StringType}

case class ClusteringShowHoodieTableCommand(table: CatalogTable, limit: Int)
extends HoodieLeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, table)
ClusteringShowHoodiePathCommand(hoodieCatalogTable.tableLocation, limit).run(sparkSession)
}

override val output: Seq[Attribute] = {
Seq(
AttributeReference("timestamp", StringType, nullable = false)(),
AttributeReference("groups", IntegerType, nullable = false)()
)
}
}