diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
index ff6ab921794c2..3aeb4a952d578 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -17,13 +17,13 @@ package org.apache.spark.sql.hudi.command.procedures
 
-import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.avro.specific.SpecificData
 
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.log.HoodieLogFormat
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock
@@ -123,7 +123,10 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with L
       }) {
         val blk = reader.next.asInstanceOf[HoodieAvroDataBlock]
         try {
-          val recordItr = blk.getRecordIterator
+          val mapper = new HoodieRecord.Mapper() {
+            override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
+          }
+          val recordItr = blk.getRecordIterator(mapper)
           try while ( {
             recordItr.hasNext
           }) {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index 7f0730386e06c..fc7a58962a1fa 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.hudi.command.procedures
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.collect.{Lists, Maps}
 import org.apache.hadoop.fs.Path
+import org.apache.avro.generic.IndexedRecord
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.log.HoodieLogFormat
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.{HeaderMetadataType, HoodieLogBlockType}
 import org.apache.hudi.common.table.log.block.{HoodieCorruptBlock, HoodieDataBlock}
@@ -93,7 +94,10 @@ class ShowHoodieLogFileMetadataProcedure extends BaseProcedure with ProcedureBui
           }
           block match {
             case dataBlock: HoodieDataBlock =>
-              val recordItr = dataBlock.getRecordIterator
+              val mapper = new HoodieRecord.Mapper() {
+                override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
+              }
+              val recordItr = dataBlock.getRecordIterator(mapper)
               recordItr.asScala.foreach(_ => recordCount.incrementAndGet())
               recordItr.close()
           }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index a7d09dceebfbf..7d2431eba6280 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -22,7 +22,7 @@ import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.config.HoodieCommonConfig
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.HoodieLogFile
+import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.table.log.block.HoodieDataBlock
 import org.apache.hudi.common.table.log.{HoodieLogFormat, HoodieMergedLogRecordScanner}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -79,7 +79,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
       .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue)
       .build
     scanner.asScala.foreach(hoodieRecord => {
-      val record = hoodieRecord.getData.getInsertValue(schema).get()
+      val record = hoodieRecord.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(schema).get()
       if (allRecords.size() < limit) {
         allRecords.add(record)
       }
@@ -93,10 +93,13 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
         val block = reader.next()
         block match {
           case dataBlock: HoodieDataBlock =>
-            val recordItr = dataBlock.getRecordIterator
+            val mapper = new HoodieRecord.Mapper() {
+              override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
+            }
+            val recordItr = dataBlock.getRecordIterator(mapper)
             recordItr.asScala.foreach(record => {
               if (allRecords.size() < limit) {
-                allRecords.add(record)
+                allRecords.add(record.getData.asInstanceOf[IndexedRecord])
               }
             })
             recordItr.close()
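
All three touched call sites build the same anonymous HoodieRecord.Mapper and pass it to getRecordIterator, which after this change takes a mapper that lifts each raw Avro IndexedRecord into a HoodieRecord (here, a HoodieAvroIndexedRecord). Below is a minimal sketch of that shared pattern, assuming only the getRecordIterator(HoodieRecord.Mapper) signature shown in the patch; the object name LogBlockRecords and the helper countRecords are hypothetical, introduced purely for illustration:

import org.apache.avro.generic.IndexedRecord
import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieRecord}
import org.apache.hudi.common.table.log.block.HoodieDataBlock

import scala.collection.JavaConverters._

object LogBlockRecords {
  // Shared mapper: wraps each Avro IndexedRecord read from a data block in
  // HoodieAvroIndexedRecord, exactly as the anonymous classes in the patch do.
  val avroMapper: HoodieRecord.Mapper = new HoodieRecord.Mapper() {
    override def apply(data: IndexedRecord) = new HoodieAvroIndexedRecord(data)
  }

  // Hypothetical helper mirroring ShowHoodieLogFileMetadataProcedure's usage:
  // drain the iterator once to count records, closing it even if iteration fails.
  def countRecords(dataBlock: HoodieDataBlock): Long = {
    val recordItr = dataBlock.getRecordIterator(avroMapper)
    try recordItr.asScala.size.toLong
    finally recordItr.close()
  }
}

Factoring the mapper into a single shared value like this would avoid re-declaring the identical anonymous class at each call site; the patch as written keeps the three inline copies.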