diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
index fa6e15b7af696..95e7caa8a9ed0 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
@@ -18,6 +18,12 @@
 package org.apache.hudi.cli.commands;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -36,14 +42,6 @@
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.exception.HoodieException;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.specific.SpecificData;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliCommand;
 import org.springframework.shell.core.annotation.CliOption;
@@ -60,10 +58,10 @@
 
 /**
  * CLI commands to export various information from a HUDI dataset.
- *
+ *
  * "export instants": Export Instants and their metadata from the Timeline to a local
- * directory specified by the parameter --localFolder
- * The instants are exported in the json format.
+ * directory specified by the parameter --localFolder
+ * The instants are exported in the json format.
  */
 @Component
 public class ExportCommand implements CommandMarker {
@@ -83,7 +81,7 @@ public String exportInstants(
     int numExports = limit == -1 ? Integer.MAX_VALUE : limit;
     int numCopied = 0;
 
-    if (! new File(localFolder).isDirectory()) {
+    if (!new File(localFolder).isDirectory()) {
       throw new HoodieException(localFolder + " is not a valid local directory");
     }
@@ -94,7 +92,7 @@ public String exportInstants(
     // Archived instants are in the commit archive files
     FileStatus[] statuses = FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(archivePath);
-    List<FileStatus> archivedStatuses = Arrays.stream(statuses).sorted((f1, f2) -> (int)(f1.getModificationTime() - f2.getModificationTime())).collect(Collectors.toList());
+    List<FileStatus> archivedStatuses = Arrays.stream(statuses).sorted((f1, f2) -> (int) (f1.getModificationTime() - f2.getModificationTime())).collect(Collectors.toList());
 
     if (descending) {
       Collections.reverse(nonArchivedInstants);
@@ -115,11 +113,11 @@ private int copyArchivedInstants(List statuses, Set actionSe
     int copyCount = 0;
+    FileSystem fileSystem = FSUtils.getFs(HoodieCLI.getTableMetaClient().getBasePath(), HoodieCLI.conf);
 
     for (FileStatus fs : statuses) {
       // read the archived file
-      Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(HoodieCLI.getTableMetaClient().getBasePath(), HoodieCLI.conf),
-          new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
+      Reader reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
 
       // read the avro blocks
       while (reader.hasNext() && copyCount < limit) {
@@ -130,7 +128,7 @@ private int copyArchivedInstants(List statuses, Set actionSe
           // Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the
           // metadata record from the entry and convert it to json.
           HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
-              .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
+              .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
           final String action = archiveEntryRecord.get("actionType").toString();
           if (!actionSet.contains(action)) {
             continue;
           }
@@ -157,7 +155,7 @@ private int copyArchivedInstants(List statuses, Set actionSe
             default:
               throw new HoodieException("Unknown type of action " + action);
           }
-
+
           final String instantTime = archiveEntryRecord.get("commitTime").toString();
           final String outPath = localFolder + Path.SEPARATOR + instantTime + "."
              + action;
       writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true));
@@ -178,9 +176,8 @@ private int copyNonArchivedInstants(List instants, int limit, Str
     int copyCount = 0;
 
     if (instants.isEmpty()) {
-      return limit;
+      return copyCount;
     }
 
-    final Logger LOG = LogManager.getLogger(ExportCommand.class);
     final HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
     final HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
@@ -221,6 +218,7 @@ private int copyNonArchivedInstants(List instants, int limit, Str
 
       if (data != null) {
         writeToFile(localPath, data);
+        copyCount = copyCount + 1;
       }
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
new file mode 100644
index 0000000000000..ff6ab921794c2
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.specific.SpecificData
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.log.HoodieLogFormat
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, TimelineMetadataUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.io.File
+import java.util
+import java.util.Collections
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+import scala.util.control.Breaks.break
+
+class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  var sortByFieldParameter: ProcedureParameter = _
+
+  val defaultActions = "clean,commit,deltacommit,rollback,savepoint,restore"
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "localFolder", DataTypes.StringType, None),
+    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, -1),
+    ProcedureParameter.optional(3, "actions", DataTypes.StringType, defaultActions),
+    ProcedureParameter.optional(4, "desc", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("export_detail", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val localFolder = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+    val actions: String = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]
+    val desc = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean]
+
+    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
+    val basePath = hoodieCatalogTable.tableLocation
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val archivePath = new Path(basePath + "/.hoodie/.commits_.archive*")
+    val actionSet: util.Set[String] = Set(actions.split(","): _*).asJava
+    val numExports = if (limit == -1) Integer.MAX_VALUE else limit
+    var numCopied = 0
+
+    if (!new File(localFolder).isDirectory) throw new HoodieException(localFolder + " is not a valid local directory")
+
+    // The non archived instants can be listed from the Timeline.
+    val nonArchivedInstants: util.List[HoodieInstant] = metaClient
+      .getActiveTimeline
+      .filterCompletedInstants.getInstants.iterator().asScala
+      .filter((i: HoodieInstant) => actionSet.contains(i.getAction))
+      .toList.asJava
+
+    // Archived instants are in the commit archive files
+    val statuses: Array[FileStatus] = FSUtils.getFs(basePath, jsc.hadoopConfiguration()).globStatus(archivePath)
+    val archivedStatuses = List(statuses: _*)
+      .sortWith((f1, f2) => (f1.getModificationTime - f2.getModificationTime).toInt > 0).asJava
+
+    if (desc) {
+      Collections.reverse(nonArchivedInstants)
+      numCopied = copyNonArchivedInstants(metaClient, nonArchivedInstants, numExports, localFolder)
+      if (numCopied < numExports) {
+        Collections.reverse(archivedStatuses)
+        numCopied += copyArchivedInstants(basePath, archivedStatuses, actionSet, numExports - numCopied, localFolder)
+      }
+    } else {
+      numCopied = copyArchivedInstants(basePath, archivedStatuses, actionSet, numExports, localFolder)
+      if (numCopied < numExports) numCopied += copyNonArchivedInstants(metaClient, nonArchivedInstants, numExports - numCopied, localFolder)
+    }
+
+    Seq(Row("Exported " + numCopied + " Instants to " + localFolder))
+  }
+
+  @throws[Exception]
+  private def copyArchivedInstants(basePath: String, statuses: util.List[FileStatus], actionSet: util.Set[String], limit: Int, localFolder: String) = {
+    import scala.collection.JavaConversions._
+    var copyCount = 0
+    val fileSystem = FSUtils.getFs(basePath, jsc.hadoopConfiguration())
+    for (fs <- statuses) {
+      // read the archived file
+      val reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(fs.getPath), HoodieArchivedMetaEntry.getClassSchema)
+      // read the avro blocks
+      while ( {
+        reader.hasNext && copyCount < limit
+      }) {
+        val blk = reader.next.asInstanceOf[HoodieAvroDataBlock]
+        try {
+          val recordItr = blk.getRecordIterator
+          try while ( {
+            recordItr.hasNext
+          }) {
+            val ir = recordItr.next
+            // Archived instants are saved as avro encoded HoodieArchivedMetaEntry records. We need to get the
+            // metadata record from the entry and convert it to json.
+            val archiveEntryRecord = SpecificData.get.deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir).asInstanceOf[HoodieArchivedMetaEntry]
+            val action = archiveEntryRecord.get("actionType").toString
+            if (!actionSet.contains(action)) break() //todo: continue is not supported
+            val metadata: GenericRecord = action match {
+              case HoodieTimeline.CLEAN_ACTION =>
+                archiveEntryRecord.getHoodieCleanMetadata
+
+              case HoodieTimeline.COMMIT_ACTION =>
+                archiveEntryRecord.getHoodieCommitMetadata
+
+              case HoodieTimeline.DELTA_COMMIT_ACTION =>
+                archiveEntryRecord.getHoodieCommitMetadata
+
+              case HoodieTimeline.ROLLBACK_ACTION =>
+                archiveEntryRecord.getHoodieRollbackMetadata
+
+              case HoodieTimeline.SAVEPOINT_ACTION =>
+                archiveEntryRecord.getHoodieSavePointMetadata
+
+              case HoodieTimeline.COMPACTION_ACTION =>
+                archiveEntryRecord.getHoodieCompactionMetadata
+
+              case _ => logInfo("Unknown type of action " + action)
+                null
+            }
+            val instantTime = archiveEntryRecord.get("commitTime").toString
+            val outPath = localFolder + Path.SEPARATOR + instantTime + "." + action
+            if (metadata != null) writeToFile(fileSystem, outPath, HoodieAvroUtils.avroToJson(metadata, true))
+            if ( {
+              copyCount += 1;
+              copyCount
+            } == limit) break //todo: break is not supported
+          }
+          finally if (recordItr != null) recordItr.close()
+        }
+      }
+      reader.close()
+    }
+    copyCount
+  }
+
+  @throws[Exception]
+  private def copyNonArchivedInstants(metaClient: HoodieTableMetaClient, instants: util.List[HoodieInstant], limit: Int, localFolder: String): Int = {
+    import scala.collection.JavaConversions._
+    var copyCount = 0
+    if (instants.nonEmpty) {
+      val timeline = metaClient.getActiveTimeline
+      val fileSystem = FSUtils.getFs(metaClient.getBasePath, jsc.hadoopConfiguration())
+      for (instant <- instants) {
+        val localPath = localFolder + Path.SEPARATOR + instant.getFileName
+        val data: Array[Byte] = instant.getAction match {
+          case HoodieTimeline.CLEAN_ACTION =>
+            val metadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(instant).get)
+            HoodieAvroUtils.avroToJson(metadata, true)
+
+          case HoodieTimeline.DELTA_COMMIT_ACTION =>
+            // Already in json format
+            timeline.getInstantDetails(instant).get
+
+          case HoodieTimeline.COMMIT_ACTION =>
+            // Already in json format
+            timeline.getInstantDetails(instant).get
+
+          case HoodieTimeline.COMPACTION_ACTION =>
+            // Already in json format
+            timeline.getInstantDetails(instant).get
+
+          case HoodieTimeline.ROLLBACK_ACTION =>
+            val metadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(timeline.getInstantDetails(instant).get)
+            HoodieAvroUtils.avroToJson(metadata, true)
+
+          case HoodieTimeline.SAVEPOINT_ACTION =>
+            val metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(timeline.getInstantDetails(instant).get)
+            HoodieAvroUtils.avroToJson(metadata, true)
+
+          case _ => null
+
+        }
+        if (data != null) {
+          writeToFile(fileSystem, localPath, data)
+          copyCount = copyCount + 1
+        }
+      }
+    }
+    copyCount
+  }
+
+  @throws[Exception]
+  private def writeToFile(fs: FileSystem, path: String, data: Array[Byte]): Unit = {
+    val out = fs.create(new Path(path))
+    out.write(data)
+    out.flush()
+    out.close()
+  }
+
+  override def build = new ExportInstantsProcedure()
+}
+
+object ExportInstantsProcedure {
+  val NAME = "export_instants"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ExportInstantsProcedure()
+  }
+}
+
+
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 7cfeaaa0b6cdb..ff129964faeaa 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -47,6 +47,7 @@ object HoodieProcedures {
     mapBuilder.put(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder)
     mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
     mapBuilder.put(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
+    mapBuilder.put(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
     mapBuilder.build
   }
 }
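With the registration above in place, the new procedure is callable through Spark SQL. A minimal usage sketch follows; it is illustrative only: the table name hudi_trips and the export folder /tmp/hudi-instants are placeholders, and the optional limit, actions and desc arguments simply mirror the PARAMETERS array declared in ExportInstantsProcedure.

    // Sketch, assuming an existing SparkSession `spark` with the Hudi SQL extensions
    // enabled, a Hudi table named `hudi_trips`, and an existing local export directory.
    spark.sql(
      "call export_instants(" +
        "table => 'hudi_trips', " +                // required: table whose timeline is exported
        "localFolder => '/tmp/hudi-instants', " +  // required: existing local target directory
        "limit => 10, " +                          // optional: cap on exported instants (-1 = unlimited)
        "actions => 'commit,deltacommit', " +      // optional: comma-separated action filter
        "desc => true)"                            // optional: newest instants first
    ).show(false)                                  // prints the single "Exported N Instants to ..." row
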
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestExportInstantsProcedure.scala
new file mode 100644
index 0000000000000..b234126930390
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestExportInstantsProcedure.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestExportInstantsProcedure extends HoodieSparkSqlTestBase {
+
+  test("Test Call export_instants Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+
+      val result = spark.sql(s"""call export_instants(table => '$tableName', localFolder => '${tmp.getCanonicalPath}/$tableName')""").limit(1).collect()
+      assertResult(1) {
+        result.length
+      }
+    }
+  }
+}
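The procedure writes one file per exported instant under localFolder (archived instants as <instantTime>.<action>, active-timeline instants under their instant file name), so the test above could additionally assert on the exported files. A possible extension, sketched under the assumption that the single insert above produced one completed commit and reusing the test's tmp directory:

    // Sketch only: after the call, at least one exported "<instantTime>.commit" file
    // is expected directly under the export folder used in the test.
    val exportedCommits = new java.io.File(s"${tmp.getCanonicalPath}/$tableName")
      .list()
      .filter(_.endsWith(".commit"))
    assert(exportedCommits.nonEmpty)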