diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala index b8893ac80dfda..0620e287d0c78 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala @@ -66,6 +66,13 @@ object HoodieProcedures { mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder) mapBuilder.put(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder) mapBuilder.put(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder) + mapBuilder.put(ListMetadataFilesProcedure.NAME, ListMetadataFilesProcedure.builder) + mapBuilder.put(ListMetadataPartitionsProcedure.NAME, ListMetadataPartitionsProcedure.builder) + mapBuilder.put(MetadataCreateProcedure.NAME, MetadataCreateProcedure.builder) + mapBuilder.put(MetadataDeleteProcedure.NAME, MetadataDeleteProcedure.builder) + mapBuilder.put(MetadataInitProcedure.NAME, MetadataInitProcedure.builder) + mapBuilder.put(ShowMetadataStatsProcedure.NAME, ShowMetadataStatsProcedure.builder) + mapBuilder.put(ValidateMetadataFilesProcedure.NAME, ValidateMetadataFilesProcedure.builder) mapBuilder.build } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataFilesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataFilesProcedure.scala new file mode 100644 index 0000000000000..591293e08dcf1 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataFilesProcedure.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.HoodieTimer +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.metadata.HoodieBackedTableMetadata +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util +import java.util.function.Supplier + +class ListMetadataFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType, None), + ProcedureParameter.optional(1, "partition", DataTypes.StringType, None) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("file_path", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val table = getArgValueOrDefault(args, PARAMETERS(0)) + val partition = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String] + + val basePath = getBasePath(table) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + val config = HoodieMetadataConfig.newBuilder.enable(true).build + val metaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf), + config, basePath, "/tmp") + if (!metaReader.enabled){ + throw new HoodieException(s"Metadata Table not enabled/initialized.") + } + + val timer = new HoodieTimer().startTimer + val statuses = metaReader.getAllFilesInPartition(new Path(basePath, partition)) + logDebug("Took " + timer.endTimer + " ms") + + val rows = new util.ArrayList[Row] + statuses.toStream.sortBy(p => p.getPath.getName).foreach((f: FileStatus) => { + rows.add(Row(f.getPath.getName)) + }) + rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList + } + + override def build: Procedure = new ListMetadataFilesProcedure() +} + +object ListMetadataFilesProcedure { + val NAME = "list_metadata_files" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new ListMetadataFilesProcedure() + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataPartitionsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataPartitionsProcedure.scala new file mode 100644 index 0000000000000..4c0bf15d90623 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ListMetadataPartitionsProcedure.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.util.HoodieTimer +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.metadata.HoodieBackedTableMetadata +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util +import java.util.Collections +import java.util.function.Supplier +import scala.collection.JavaConverters.asScalaIteratorConverter + +class ListMetadataPartitionsProcedure() extends BaseProcedure with ProcedureBuilder with Logging { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType, None) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val table = getArgValueOrDefault(args, PARAMETERS(0)) + + val basePath = getBasePath(table) + val config = HoodieMetadataConfig.newBuilder.enable(true).build + val metadata = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(jsc), + config, basePath, "/tmp") + if (!metadata.enabled){ + throw new HoodieException(s"Metadata Table not enabled/initialized.") + } + + val timer = new HoodieTimer().startTimer + val partitions = metadata.getAllPartitionPaths + Collections.sort(partitions) + logDebug("Took " + timer.endTimer + " ms") + + val rows = new util.ArrayList[Row] + partitions.stream.iterator().asScala.foreach((p: String) => { + rows.add(Row(p)) + }) + rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList + } + + override def build: Procedure = new ListMetadataPartitionsProcedure() +} + +object ListMetadataPartitionsProcedure { + val NAME = "list_metadata_partitions" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new ListMetadataPartitionsProcedure() + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataCreateProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataCreateProcedure.scala new file mode 100644 index 0000000000000..e20459ea36d24 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataCreateProcedure.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hadoop.fs.Path +import org.apache.hudi.SparkAdapterSupport +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.HoodieTimer +import org.apache.hudi.metadata.{HoodieTableMetadata, SparkHoodieBackedTableMetadataWriter} +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ + +import java.io.FileNotFoundException +import java.util.function.Supplier + +class MetadataCreateProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType, None) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("result", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + + val basePath = getBasePath(tableName) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + val metadataPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath)) + + try { + val statuses = metaClient.getFs.listStatus(metadataPath) + if (statuses.nonEmpty) { + throw new RuntimeException("Metadata directory (" + metadataPath.toString + ") not empty.") + } + } catch { + case e: FileNotFoundException => + // Metadata directory does not exist yet + metaClient.getFs.mkdirs(metadataPath) + } + val timer = new HoodieTimer().startTimer + val writeConfig = getWriteConfig(basePath) + SparkHoodieBackedTableMetadataWriter.create(metaClient.getHadoopConf, writeConfig, new HoodieSparkEngineContext(jsc)) + Seq(Row("Created Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + "secs)")) + } + + override def build = new MetadataCreateProcedure() +} + +object MetadataCreateProcedure { + val NAME = "metadata_create" + var metadataBaseDirectory: Option[String] = None + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new MetadataCreateProcedure() + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataDeleteProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataDeleteProcedure.scala new file mode 100644 index 0000000000000..216a365117728 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataDeleteProcedure.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hadoop.fs.Path +import org.apache.hudi.SparkAdapterSupport +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.metadata.HoodieTableMetadata +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ + +import java.io.FileNotFoundException +import java.util.function.Supplier + +class MetadataDeleteProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType, None) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("result", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val basePath = getBasePath(tableName) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + val metadataPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath)) + + try { + val statuses = metaClient.getFs.listStatus(metadataPath) + if (statuses.nonEmpty) metaClient.getFs.delete(metadataPath, true) + } catch { + case e: FileNotFoundException => + // Metadata directory does not exist + } + Seq(Row("Removed Metadata Table from " + metadataPath)) + } + + override def build = new MetadataDeleteProcedure() +} + +object MetadataDeleteProcedure { + val NAME = "metadata_delete" + var metadataBaseDirectory: Option[String] = None + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new MetadataDeleteProcedure() + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataInitProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataInitProcedure.scala new file mode 100644 index 0000000000000..acd1532a979be --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/MetadataInitProcedure.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hadoop.fs.Path +import org.apache.hudi.SparkAdapterSupport +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.HoodieTimer +import org.apache.hudi.metadata.{HoodieTableMetadata, SparkHoodieBackedTableMetadataWriter} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ + +import java.io.FileNotFoundException +import java.util.function.Supplier + +class MetadataInitProcedure extends BaseProcedure with ProcedureBuilder with SparkAdapterSupport with Logging { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType, None), + ProcedureParameter.optional(1, "readOnly", DataTypes.BooleanType, false) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("result", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val readOnly = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean] + + val basePath = getBasePath(tableName) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + val metadataPath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath)) + try { + metaClient.getFs.listStatus(metadataPath) + } catch { + case e: FileNotFoundException => + // Metadata directory does not exist yet + throw new RuntimeException("Metadata directory (" + metadataPath.toString + ") does not exist.") + } + + val timer = new HoodieTimer().startTimer + if (!readOnly) { + val writeConfig = getWriteConfig(basePath) + SparkHoodieBackedTableMetadataWriter.create(metaClient.getHadoopConf, writeConfig, new HoodieSparkEngineContext(jsc)) + } + + val action = if (readOnly) "Opened" else "Initialized" + Seq(Row(action + " Metadata Table in " + metadataPath + " (duration=" + timer.endTimer / 1000.0 + "sec)")) + } + + override def build = new MetadataInitProcedure() +} + +object MetadataInitProcedure { + val NAME = "metadata_init" + var metadataBaseDirectory: Option[String] = None + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new MetadataInitProcedure() + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataStatsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataStatsProcedure.scala new file mode 100644 index 0000000000000..9a73a51fd128c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataStatsProcedure.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.metadata.HoodieBackedTableMetadata +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util +import java.util.function.Supplier +import scala.collection.JavaConversions._ + +class ShowMetadataStatsProcedure() extends BaseProcedure with ProcedureBuilder { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType, None) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("stat_key", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("stat_value", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val table = getArgValueOrDefault(args, PARAMETERS(0)) + + val basePath = getBasePath(table) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + val config = HoodieMetadataConfig.newBuilder.enable(true).build + val metadata = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf), + config, basePath, "/tmp") + val stats = metadata.stats + + val rows = new util.ArrayList[Row] + for (entry <- stats.entrySet) { + rows.add(Row(entry.getKey, entry.getValue)) + } + rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList + } + + override def build: Procedure = new ShowMetadataStatsProcedure() +} + +object ShowMetadataStatsProcedure { + val NAME = "show_metadata_stats" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new ShowMetadataStatsProcedure() + } +} + + diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataFilesProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataFilesProcedure.scala new file mode 100644 index 0000000000000..b3c125942acc7 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateMetadataFilesProcedure.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.HoodieTimer +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.metadata.HoodieBackedTableMetadata +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util +import java.util.Collections +import java.util.function.Supplier +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters.asScalaIteratorConverter + +class ValidateMetadataFilesProcedure() extends BaseProcedure with ProcedureBuilder with Logging { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType, None), + ProcedureParameter.optional(1, "verbose", DataTypes.BooleanType, false) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("is_present_in_fs", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("is_present_in_metadata", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("fs_size", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("metadata_size", DataTypes.LongType, nullable = true, Metadata.empty) + )) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val table = getArgValueOrDefault(args, PARAMETERS(0)) + val verbose = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Boolean] + + val basePath = getBasePath(table) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + val config = HoodieMetadataConfig.newBuilder.enable(true).build + val metadataReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf), + config, basePath, "/tmp") + + if (!metadataReader.enabled){ + throw new HoodieException(s"Metadata Table not enabled/initialized.") + } + + val fsConfig = HoodieMetadataConfig.newBuilder.enable(false).build + val fsMetaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(metaClient.getHadoopConf), + fsConfig, basePath, "/tmp") + + val timer = new HoodieTimer().startTimer + val metadataPartitions = metadataReader.getAllPartitionPaths + logDebug("Listing partitions Took " + 
timer.endTimer + " ms") + val fsPartitions = fsMetaReader.getAllPartitionPaths + Collections.sort(fsPartitions) + Collections.sort(metadataPartitions) + + val allPartitions = new util.HashSet[String] + allPartitions.addAll(fsPartitions) + allPartitions.addAll(metadataPartitions) + + if (!fsPartitions.equals(metadataPartitions)) { + logError("FS partition listing is not matching with metadata partition listing!") + logError("All FS partitions: " + util.Arrays.toString(fsPartitions.toArray)) + logError("All Metadata partitions: " + util.Arrays.toString(metadataPartitions.toArray)) + } + + val rows = new util.ArrayList[Row] + for (partition <- allPartitions) { + val fileStatusMap = new util.HashMap[String, FileStatus] + val metadataFileStatusMap = new util.HashMap[String, FileStatus] + val metadataStatuses = metadataReader.getAllFilesInPartition(new Path(basePath, partition)) + util.Arrays.stream(metadataStatuses).iterator().asScala.foreach((entry: FileStatus) => metadataFileStatusMap.put(entry.getPath.getName, entry)) + val fsStatuses = fsMetaReader.getAllFilesInPartition(new Path(basePath, partition)) + util.Arrays.stream(fsStatuses).iterator().asScala.foreach((entry: FileStatus) => fileStatusMap.put(entry.getPath.getName, entry)) + val allFiles = new util.HashSet[String] + allFiles.addAll(fileStatusMap.keySet) + allFiles.addAll(metadataFileStatusMap.keySet) + for (file <- allFiles) { + val fsFileStatus = fileStatusMap.get(file) + val metaFileStatus = metadataFileStatusMap.get(file) + val doesFsFileExists = fsFileStatus != null + val doesMetadataFileExists = metaFileStatus != null + val fsFileLength = if (doesFsFileExists) fsFileStatus.getLen else 0 + val metadataFileLength = if (doesMetadataFileExists) metaFileStatus.getLen else 0 + if (verbose) { // if verbose print all files + rows.add(Row(partition, file, doesFsFileExists, doesMetadataFileExists, fsFileLength, metadataFileLength)) + } else if ((doesFsFileExists != doesMetadataFileExists) || (fsFileLength != metadataFileLength)) { // if non verbose, print only non matching files + rows.add(Row(partition, file, doesFsFileExists, doesMetadataFileExists, fsFileLength, metadataFileLength)) + } + } + if (metadataStatuses.length != fsStatuses.length) { + logError(" FS and metadata files count not matching for " + partition + ". FS files count " + fsStatuses.length + ", metadata base files count " + metadataStatuses.length) + } + for (entry <- fileStatusMap.entrySet) { + if (!metadataFileStatusMap.containsKey(entry.getKey)) { + logError("FS file not found in metadata " + entry.getKey) + } else if (entry.getValue.getLen != metadataFileStatusMap.get(entry.getKey).getLen) { + logError(" FS file size mismatch " + entry.getKey + ", size equality " + (entry.getValue.getLen == metadataFileStatusMap.get(entry.getKey).getLen) + ". FS size " + entry.getValue.getLen + ", metadata size " + metadataFileStatusMap.get(entry.getKey).getLen) + } + } + for (entry <- metadataFileStatusMap.entrySet) { + if (!fileStatusMap.containsKey(entry.getKey)) { + logError("Metadata file not found in FS " + entry.getKey) + } else if (entry.getValue.getLen != fileStatusMap.get(entry.getKey).getLen) { + logError(" Metadata file size mismatch " + entry.getKey + ", size equality " + (entry.getValue.getLen == fileStatusMap.get(entry.getKey).getLen) + ". 
Metadata size " + entry.getValue.getLen + ", FS size " + metadataFileStatusMap.get(entry.getKey).getLen) + } + } + } + rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList + } + + override def build: Procedure = new ValidateMetadataFilesProcedure() +} + +object ValidateMetadataFilesProcedure { + val NAME = "validate_metadata_files" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new ValidateMetadataFilesProcedure() + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala new file mode 100644 index 0000000000000..9dbb8f22ec262 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.procedure + +import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase + +class TestMetadataProcedure extends HoodieSparkSqlTestBase { + + test("Test Call metadata_delete Procedure") { + withTempDir { tmp => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + // insert data to table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500") + + // delete the metadata + val deleteResult = spark.sql(s"""call metadata_delete(table => '$tableName')""").collect() + assertResult(1) { + deleteResult.length + } + } + } + + test("Test Call metadata_create Procedure") { + withTempDir { tmp => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + // insert data to table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500") + + // The first step is delete the metadata + val deleteResult = spark.sql(s"""call metadata_delete(table => '$tableName')""").collect() + assertResult(1) { + deleteResult.length + } + + // The second step is create the metadata + val createResult = spark.sql(s"""call metadata_create(table => '$tableName')""").collect() + assertResult(1) { + createResult.length 
+ } + } + } + + test("Test Call metadata_init Procedure") { + withTempDir { tmp => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + // insert data to table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500") + + // read only, no initialize + val readResult = spark.sql(s"""call metadata_init(table => '$tableName', readOnly => true)""").collect() + assertResult(1) { + readResult.length + } + + // initialize metadata + val initResult = spark.sql(s"""call metadata_init(table => '$tableName')""").collect() + assertResult(1) { + initResult.length + } + } + } + + test("Test Call show_metadata_stats Procedure") { + withTempDir { tmp => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts', + | hoodie.metadata.metrics.enable = 'true' + | ) + """.stripMargin) + // insert data to table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500") + + // collect metadata stats for table + val metadataStats = spark.sql(s"""call show_metadata_stats(table => '$tableName')""").collect() + assertResult(0) { + metadataStats.length + } + } + } + + test("Test Call list_metadata_partitions Procedure") { + withTempDir { tmp => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | partitioned by (ts) + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + // insert data to table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500") + + // collect metadata partitions for table + val partitions = spark.sql(s"""call list_metadata_partitions(table => '$tableName')""").collect() + assertResult(2) { + partitions.length + } + } + } + + test("Test Call list_metadata_files Procedure") { + withTempDir { tmp => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | partitioned by (ts) + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + // insert data to table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500") + + // collect metadata partitions for table + val partitions = spark.sql(s"""call list_metadata_partitions(table => '$tableName')""").collect() + assertResult(2) { + partitions.length + } + + // collect metadata files for a partition of a table + val partition = partitions(0).get(0).toString + val filesResult = spark.sql(s"""call list_metadata_files(table => '$tableName', partition => '$partition')""").collect() + assertResult(1) { + filesResult.length + } + } + } + + test("Test Call 
validate_metadata_files Procedure") { + withTempDir { tmp => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | partitioned by (ts) + | tblproperties ( + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + // insert data to table + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500") + + // collect validate metadata files result + val validateFilesResult = spark.sql(s"""call validate_metadata_files(table => '$tableName')""").collect() + assertResult(0) { + validateFilesResult.length + } + + // collect validate metadata files result with verbose + val validateFilesVerboseResult = spark.sql(s"""call validate_metadata_files(table => '$tableName', verbose => true)""").collect() + assertResult(2) { + validateFilesVerboseResult.length + } + } + } +}
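For reference, a sketch of how the procedures registered in this patch are expected to be invoked from Spark SQL, mirroring the tests above; the table name hudi_tbl and the partition value 1000 are illustrative placeholders, not values taken from this change:

spark.sql("call metadata_delete(table => 'hudi_tbl')")                   // remove the metadata table directory
spark.sql("call metadata_create(table => 'hudi_tbl')")                   // rebuild the metadata table
spark.sql("call metadata_init(table => 'hudi_tbl', readOnly => true)")   // open without (re)initializing
spark.sql("call show_metadata_stats(table => 'hudi_tbl')")
spark.sql("call list_metadata_partitions(table => 'hudi_tbl')")
spark.sql("call list_metadata_files(table => 'hudi_tbl', partition => '1000')")
spark.sql("call validate_metadata_files(table => 'hudi_tbl', verbose => true)")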