diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index d974e216d585d..7f842a795fcab 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -50,6 +50,8 @@ object HoodieProcedures {
     mapBuilder.put(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
     mapBuilder.put(ShowAllFileSystemViewProcedure.NAME, ShowAllFileSystemViewProcedure.builder)
     mapBuilder.put(ShowLatestFileSystemViewProcedure.NAME, ShowLatestFileSystemViewProcedure.builder)
+    mapBuilder.put(ShowHoodieLogFileMetadataProcedure.NAME, ShowHoodieLogFileMetadataProcedure.builder)
+    mapBuilder.put(ShowHoodieLogFileRecordsProcedure.NAME, ShowHoodieLogFileRecordsProcedure.builder)
     mapBuilder.build
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
new file mode 100644
index 0000000000000..7f0730386e06c
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.common.collect.{Lists, Maps}
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.table.log.HoodieLogFormat
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.{HeaderMetadataType, HoodieLogBlockType}
+import org.apache.hudi.common.table.log.block.{HoodieCorruptBlock, HoodieDataBlock}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.parquet.avro.AvroSchemaConverter
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.Objects
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.function.Supplier
+import scala.collection.JavaConverters.{asScalaBufferConverter, asScalaIteratorConverter, mapAsScalaMapConverter}
+
+class ShowHoodieLogFileMetadataProcedure extends BaseProcedure with ProcedureBuilder {
+  override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "log_file_path_pattern", DataTypes.StringType, None),
+    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 10)
+  )
+
+  override def outputType: StructType = StructType(Array[StructField](
+    StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("record_count", DataTypes.IntegerType, nullable = true, Metadata.empty),
+    StructField("block_type", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("header_metadata", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("footer_metadata", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    checkArgs(parameters, args)
+    val table = getArgValueOrDefault(args, parameters(0))
+    val logFilePathPattern: String = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[String]
+    val limit: Int = getArgValueOrDefault(args, parameters(2)).get.asInstanceOf[Int]
+    val basePath = getBasePath(table)
+    val fs = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build.getFs
+    val logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(logFilePathPattern)).iterator().asScala
+      .map(_.getPath.toString).toList
+    val commitCountAndMetadata: java.util.Map[String, java.util.List[(HoodieLogBlockType, (java.util.Map[HeaderMetadataType, String], java.util.Map[HeaderMetadataType, String]), Int)]] = Maps.newHashMap()
+    var numCorruptBlocks = 0
+    var dummyInstantTimeCount = 0
+    logFilePaths.foreach {
+      logFilePath => {
+        val statuses = fs.listStatus(new Path(logFilePath))
+        val schema = new AvroSchemaConverter()
+          .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePath))))
+        val reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(statuses(0).getPath), schema)
+
+        // read the avro blocks
+        while (reader.hasNext) {
+          val block = reader.next()
+          val recordCount = new AtomicInteger(0)
+          var instantTime: String = null
+          if (block.isInstanceOf[HoodieCorruptBlock]) {
+            try {
+              instantTime = block.getLogBlockHeader.get(HeaderMetadataType.INSTANT_TIME)
+              if (null == instantTime) {
+                throw new java.lang.Exception("Invalid instant time " + instantTime)
+              }
+            } catch {
+              case _: java.lang.Exception =>
+                numCorruptBlocks = numCorruptBlocks + 1;
+                instantTime = "corrupt_block_" + numCorruptBlocks
+            }
+          } else {
+            instantTime = block.getLogBlockHeader.get(HeaderMetadataType.INSTANT_TIME)
+            if (null == instantTime) {
+              dummyInstantTimeCount = dummyInstantTimeCount + 1
+              instantTime = "dummy_instant_time_" + dummyInstantTimeCount
+            }
+            block match {
+              case dataBlock: HoodieDataBlock =>
+                val recordItr = dataBlock.getRecordIterator
+                recordItr.asScala.foreach(_ => recordCount.incrementAndGet())
+                recordItr.close()
+            }
+          }
+          if (commitCountAndMetadata.containsKey(instantTime)) {
+            val list = commitCountAndMetadata.get(instantTime)
+            list.add((block.getBlockType, (block.getLogBlockHeader, block.getLogBlockFooter), recordCount.get()))
+          } else {
+            val list = Lists.newArrayList((block.getBlockType, (block.getLogBlockHeader, block.getLogBlockFooter), recordCount.get()))
+            commitCountAndMetadata.put(instantTime, list)
+          }
+        }
+        reader.close()
+      }
+    }
+    val rows: java.util.List[Row] = Lists.newArrayList()
+    val objectMapper = new ObjectMapper()
+    commitCountAndMetadata.asScala.foreach {
+      case (instantTime, values) =>
+        values.asScala.foreach {
+          tuple3 =>
+            rows.add(Row(
+              instantTime,
+              tuple3._3,
+              tuple3._1.toString,
+              objectMapper.writeValueAsString(tuple3._2._1),
+              objectMapper.writeValueAsString(tuple3._2._2)
+            ))
+        }
+    }
+    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
+  }
+
+  override def build: Procedure = new ShowHoodieLogFileMetadataProcedure
+}
+
+object ShowHoodieLogFileMetadataProcedure {
+  val NAME = "show_logfile_metadata"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new ShowHoodieLogFileMetadataProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
new file mode 100644
index 0000000000000..a7d09dceebfbf
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import com.google.common.collect.Lists
+import org.apache.avro.generic.IndexedRecord
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.config.HoodieCommonConfig
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.table.log.block.HoodieDataBlock
+import org.apache.hudi.common.table.log.{HoodieLogFormat, HoodieMergedLogRecordScanner}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.ValidationUtils
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieMemoryConfig}
+import org.apache.parquet.avro.AvroSchemaConverter
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.Objects
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+
+class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuilder {
+  override def parameters: Array[ProcedureParameter] = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "log_file_path_pattern", DataTypes.StringType, None),
+    ProcedureParameter.optional(2, "merge", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(3, "limit", DataTypes.IntegerType, 10)
+  )
+
+  override def outputType: StructType = StructType(Array[StructField](
+    StructField("records", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    checkArgs(parameters, args)
+    val table = getArgValueOrDefault(args, parameters(0))
+    val logFilePathPattern: String = getArgValueOrDefault(args, parameters(1)).get.asInstanceOf[String]
+    val merge: Boolean = getArgValueOrDefault(args, parameters(2)).get.asInstanceOf[Boolean]
+    val limit: Int = getArgValueOrDefault(args, parameters(3)).get.asInstanceOf[Int]
+    val basePath = getBasePath(table)
+    val client = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val fs = client.getFs
+    val logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(logFilePathPattern)).iterator().asScala
+      .map(_.getPath.toString).toList
+    ValidationUtils.checkArgument(logFilePaths.nonEmpty, "There is no log file")
+    val converter = new AvroSchemaConverter()
+    val allRecords: java.util.List[IndexedRecord] = Lists.newArrayList()
+    if (merge) {
+      val schema = converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePaths.last))))
+      val scanner = HoodieMergedLogRecordScanner.newBuilder
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(logFilePaths.asJava)
+        .withReaderSchema(schema)
+        .withLatestInstantTime(client.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp)
+        .withReadBlocksLazily(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue))
+        .withReverseReader(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue))
+        .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue)
+        .withMaxMemorySizeInBytes(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+        .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue)
+        .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue)
+        .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue)
+        .build
+      scanner.asScala.foreach(hoodieRecord => {
+        val record = hoodieRecord.getData.getInsertValue(schema).get()
+        if (allRecords.size() < limit) {
+          allRecords.add(record)
+        }
+      })
+    } else {
+      logFilePaths.toStream.takeWhile(_ => allRecords.size() < limit).foreach {
+        logFilePath => {
+          val schema = converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePath))))
+          val reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePath), schema)
+          while (reader.hasNext) {
+            val block = reader.next()
+            block match {
+              case dataBlock: HoodieDataBlock =>
+                val recordItr = dataBlock.getRecordIterator
+                recordItr.asScala.foreach(record => {
+                  if (allRecords.size() < limit) {
+                    allRecords.add(record)
+                  }
+                })
+                recordItr.close()
+            }
+          }
+          reader.close()
+        }
+      }
+    }
+    val rows: java.util.List[Row] = Lists.newArrayList()
+    allRecords.asScala.foreach(record => {
+      rows.add(Row(record.toString))
+    })
+    rows.asScala
+  }
+
+  override def build: Procedure = new ShowHoodieLogFileRecordsProcedure
+}
+
+object ShowHoodieLogFileRecordsProcedure {
+  val NAME = "show_logfile_records"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new ShowHoodieLogFileRecordsProcedure()
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
new file mode 100644
index 0000000000000..41954c80025e3
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestHoodieLogFileProcedure.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestHoodieLogFileProcedure extends HoodieSparkSqlTestBase {
+  test("Test Call show_logfile_metadata Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '$tablePath'
+           | tblproperties (
+           | type = 'mor',
+           | primaryKey = 'id',
+           | preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"update $tableName set name = 'b1', price = 100 where id = 1")
+
+      // Check required fields
+      checkExceptionContain(s"""call show_logfile_metadata(limit => 10)""")(
+        s"Argument: table is required")
+
+      // collect result for table
+      val result = spark.sql(
+        s"""call show_logfile_metadata(table => '$tableName', log_file_path_pattern => '$tablePath/ts=1000/*.log.*')""".stripMargin).collect()
+      assertResult(1) {
+        result.length
+      }
+    }
+  }
+
+  test("Test Call show_logfile_records Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '$tablePath'
+           | tblproperties (
+           | type = 'mor',
+           | primaryKey = 'id',
+           | preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+      spark.sql(s"update $tableName set name = 'b1' where id = 1")
+      spark.sql(s"update $tableName set name = 'b2' where id = 2")
+
+      // Check required fields
+      checkExceptionContain(s"""call show_logfile_records(limit => 10)""")(
+        s"Argument: table is required")
+
+      // collect result for table
+      val result = spark.sql(
+        s"""call show_logfile_records(table => '$tableName', log_file_path_pattern => '$tablePath/*/*.log.*', limit => 1)""".stripMargin).collect()
+      assertResult(1) {
+        result.length
+      }
+    }
+  }
+}
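Example usage of show_logfile_metadata, a minimal sketch mirroring the call syntax exercised in TestHoodieLogFileProcedure above (the table name and log-file glob are illustrative placeholders, not values from this change):

    call show_logfile_metadata(
      table => 'hudi_mor_tbl',
      log_file_path_pattern => '/path/to/hudi_mor_tbl/*/*.log.*',
      limit => 10)

Each returned row describes one log block: instant_time, record_count, block_type, and the block header and footer metadata serialized as JSON strings.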
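Example usage of show_logfile_records, again a sketch with placeholder table name and glob. With the default merge => false the procedure dumps raw records block by block from the matched log files; merge => true replays them through HoodieMergedLogRecordScanner so each record key yields its latest merged value:

    call show_logfile_records(
      table => 'hudi_mor_tbl',
      log_file_path_pattern => '/path/to/hudi_mor_tbl/*/*.log.*',
      merge => true,
      limit => 10)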