diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index a33383a05c02d..03e3d991141c1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -360,7 +360,12 @@ protected void appendDataAndDeleteBlocks(Map header) header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString()); List blocks = new ArrayList<>(2); if (recordList.size() > 0) { - blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header)); + if (config.populateMetaFields()) { + blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header)); + } else { + final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp(); + blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header, keyField)); + } } if (keysToDelete.size() > 0) { blocks.add(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 6cbc595c55974..6886573c16deb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -41,6 +41,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteConcurrencyMode; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; @@ -76,6 +77,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER; @@ -91,6 +93,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadataWriter.class); + // Virtual keys support for the metadata table. This field is + from the metadata payload schema.
+ private static final String RECORD_KEY_FIELD = HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY; + protected HoodieWriteConfig metadataWriteConfig; protected HoodieWriteConfig dataWriteConfig; protected String tableName; @@ -202,7 +208,15 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi .withDeleteParallelism(parallelism) .withRollbackParallelism(parallelism) .withFinalizeWriteParallelism(parallelism) - .withAllowMultiWriteOnSameInstant(true); + .withAllowMultiWriteOnSameInstant(true) + .withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName()) + .withPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields()); + + // RecordKey properties are needed for the metadata table records + final Properties properties = new Properties(); + properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), RECORD_KEY_FIELD); + properties.put("hoodie.datasource.write.recordkey.field", RECORD_KEY_FIELD); + builder.withProperties(properties); if (writeConfig.isMetricsOn()) { builder.withMetricsConfig(HoodieMetricsConfig.newBuilder() @@ -395,9 +409,12 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi .setTableType(HoodieTableType.MERGE_ON_READ) .setTableName(tableName) .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) - .setPayloadClassName(HoodieMetadataPayload.class.getName()) - .setBaseFileFormat(HoodieFileFormat.HFILE.toString()) - .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath()); + .setPayloadClassName(HoodieMetadataPayload.class.getName()) + .setBaseFileFormat(HoodieFileFormat.HFILE.toString()) + .setRecordKeyFields(RECORD_KEY_FIELD) + .setPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields()) + .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName()) + .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath()); initTableMetadata(); initializeFileGroups(dataMetaClient, MetadataPartitionType.FILES, createInstantTime, 1); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataKeyGenerator.java new file mode 100644 index 0000000000000..e9d7aec8c8e2a --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataKeyGenerator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.keygen.BaseKeyGenerator; +import org.apache.hudi.keygen.KeyGenUtils; + +/** + * Custom key generator for the Hoodie table metadata. 
The metadata table record payload + has an internal schema with a known key field, HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY. + With or without virtual keys, the key of a metadata table record is always read from + this field, so strictly speaking no key generator is needed. However, when a write client + is instantiated for the metadata table with virtual keys enabled and no key generator + class configured, the default SimpleKeyGenerator would be picked up. To avoid falling + back to key generators that rely on additional config properties, this custom key + generator is used exclusively for the metadata table. + */ +public class HoodieTableMetadataKeyGenerator extends BaseKeyGenerator { + + public HoodieTableMetadataKeyGenerator(TypedProperties config) { + super(config); + } + + @Override + public String getRecordKey(GenericRecord record) { + return KeyGenUtils.getRecordKey(record, HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY); + } + + @Override + public String getPartitionPath(GenericRecord record) { + return ""; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index bff91c3e56c2b..21a4fb58932b8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -346,7 +346,8 @@ private void writeToFile(Schema wrapperSchema, List records) thro if (records.size() > 0) { Map header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString()); - HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header); + final String keyField = table.getMetaClient().getTableConfig().getRecordKeyFieldProp(); + HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header, keyField); writer.appendBlock(block); records.clear(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index 09a4e2eb588ea..73e1413d9dde0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -180,6 +180,7 @@ public List compact(HoodieCompactionHandler compactionHandler, .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType()) .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()) .withOperationField(config.allowOperationMetadataField()) + .withPartition(operation.getPartitionPath()) .build(); if (!scanner.iterator().hasNext()) { scanner.close(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@
-203,6 +203,7 @@ private JavaRDD> readRecordsForGroupWithLogs(JavaSparkContext js .withReverseReader(config.getCompactionReverseLogReadEnabled()) .withBufferSize(config.getMaxDFSStreamBufferSize()) .withSpillableMapBasePath(config.getSpillableMapBasePath()) + .withPartition(clusteringOp.getPartitionPath()) .build(); Option baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java index 60765b05da74f..b84a8abdcc796 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java @@ -47,9 +47,9 @@ public SimpleKeyGenerator(TypedProperties props) { SimpleKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) { super(props); this.recordKeyFields = recordKeyField == null - ? Collections.emptyList() - : Collections.singletonList(recordKeyField); - this.partitionPathFields = Collections.singletonList(partitionPathField); + ? Collections.emptyList() : Collections.singletonList(recordKeyField); + this.partitionPathFields = partitionPathField == null + ? Collections.emptyList() : Collections.singletonList(partitionPathField); simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 59e12a1515ad5..005d031cb9df2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -88,6 +88,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.nio.file.Files; @@ -358,8 +359,9 @@ public void testRollbackOperations(HoodieTableType tableType) throws Exception { * Test that manual rollbacks work correctly and enough timeline history is maintained on the metadata table * timeline. 
*/ - @Test - public void testManualRollbacks() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testManualRollbacks(final boolean populateMetaFields) throws Exception { HoodieTableType tableType = COPY_ON_WRITE; init(tableType, false); // Setting to archive more aggressively on the Metadata Table than the Dataset @@ -369,7 +371,9 @@ public void testManualRollbacks() throws Exception { writeConfig = getWriteConfigBuilder(true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true) .archiveCommitsWith(minArchiveCommitsMetadata, minArchiveCommitsMetadata + 1).retainCommits(1) - .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build()) + .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction) + .withPopulateMetaFields(populateMetaFields) + .build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveCommitsDataset, minArchiveCommitsDataset + 1) .retainCommits(1).retainFileVersions(1).withAutoClean(false).withAsyncClean(true).build()) .build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java index 5242e9f33766d..cac1e1bbe81e2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.metadata.HoodieBackedTableMetadata; +import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; @@ -29,6 +30,8 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.io.IOException; import java.util.ArrayList; @@ -90,4 +93,20 @@ private void verifyBaseMetadataTable() throws IOException { }); } + /** + * Verify that the metadata table is constructed with table properties including + * the right key generator class name.
+ */ + @ParameterizedTest + @EnumSource(HoodieTableType.class) + public void testMetadataTableKeyGenerator(final HoodieTableType tableType) throws Exception { + init(tableType); + + HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context, + writeConfig.getMetadataConfig(), writeConfig.getBasePath(), writeConfig.getSpillableMapBasePath(), false); + + assertEquals(HoodieTableMetadataKeyGenerator.class.getCanonicalName(), + tableMetadata.getMetadataMetaClient().getTableConfig().getKeyGeneratorClassName()); + } + } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index a91099976700e..5617058bb8af8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -288,7 +288,9 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesClea .withMetadataConfig(HoodieMetadataConfig.newBuilder() .enable(useFileListingMetadata) .enableFullScan(enableFullScan) - .enableMetrics(enableMetrics).build()) + .enableMetrics(enableMetrics) + .withPopulateMetaFields(false) + .build()) .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics) .withExecutorMetrics(true).build()) .withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder() diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 9ed98b15cb7ab..1df77b8b17f6d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -283,13 +283,26 @@ protected Properties getPropertiesForKeyGen() { return properties; } - protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) { + protected Properties getPropertiesForMetadataTable() { + Properties properties = new Properties(); + properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false"); + properties.put("hoodie.datasource.write.recordkey.field", "key"); + properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "key"); + return properties; + } + + protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields, + boolean isMetadataTable) { if (!populateMetaFields) { - configBuilder.withProperties(getPropertiesForKeyGen()) + configBuilder.withProperties((isMetadataTable ? getPropertiesForMetadataTable() : getPropertiesForKeyGen())) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build()); } } + protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) { + addConfigsForPopulateMetaFields(configBuilder, populateMetaFields, false); + } + /** * Cleanups hoodie clients. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index ea99014eacb50..0aa0593693e7c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -132,6 +132,12 @@ public final class HoodieMetadataConfig extends HoodieConfig { .sinceVersion("0.10.0") .withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries."); + public static final ConfigProperty POPULATE_META_FIELDS = ConfigProperty + .key(METADATA_PREFIX + ".populate.meta.fields") + .defaultValue("false") + .sinceVersion("0.10.0") + .withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated."); + private HoodieMetadataConfig() { super(); } @@ -164,6 +170,10 @@ public boolean enableFullScan() { return getBoolean(ENABLE_FULL_SCAN_LOG_FILES); } + public boolean populateMetaFields() { + return getBooleanOrDefault(HoodieMetadataConfig.POPULATE_META_FIELDS); + } + public static class Builder { private EngineType engineType = EngineType.SPARK; @@ -206,6 +216,11 @@ public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBefo return this; } + public Builder withPopulateMetaFields(boolean populateMetaFields) { + metadataConfig.setValue(POPULATE_META_FIELDS, Boolean.toString(populateMetaFields)); + return this; + } + public Builder archiveCommitsWith(int minToKeep, int maxToKeep) { metadataConfig.setValue(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep)); metadataConfig.setValue(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index dc57fd1c6ff8b..df196fe235c6f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -364,7 +365,7 @@ public boolean populateMetaFields() { * @returns the record key field prop. 
*/ public String getRecordKeyFieldProp() { - return getString(RECORDKEY_FIELDS); + return getStringOrDefault(RECORDKEY_FIELDS, HoodieRecord.RECORD_KEY_METADATA_FIELD); } public String getKeyGeneratorClassName() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index e2e76ad7d6503..d495badeca4eb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -120,28 +121,32 @@ public abstract class AbstractHoodieLogRecordReader { private int totalScannedLogFiles; // Progress private float progress = 0.0f; + // Partition name + private Option partitionName; + // Populate meta fields for the records + private boolean populateMetaFields = true; - protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, + protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List logFilePaths, + Schema readerSchema, String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, - int bufferSize, Option instantRange, boolean withOperationField) { - this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField, - true); + int bufferSize, Option instantRange, + boolean withOperationField) { + this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, + instantRange, withOperationField, true, Option.empty()); } - protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, - String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, - int bufferSize, Option instantRange, boolean withOperationField, - boolean enableFullScan) { + protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List logFilePaths, + Schema readerSchema, String latestInstantTime, boolean readBlocksLazily, + boolean reverseReader, int bufferSize, Option instantRange, + boolean withOperationField, boolean enableFullScan, + Option partitionName) { this.readerSchema = readerSchema; this.latestInstantTime = latestInstantTime; this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build(); // load class from the payload fully qualified class name - this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass(); - this.preCombineField = this.hoodieTableMetaClient.getTableConfig().getPreCombineField(); HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig(); - if (!tableConfig.populateMetaFields()) { - this.simpleKeyGenFields = Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp())); - } + this.payloadClassFQN = tableConfig.getPayloadClass(); + this.preCombineField = tableConfig.getPreCombineField(); this.totalLogFiles.addAndGet(logFilePaths.size()); 
this.logFilePaths = logFilePaths; this.reverseReader = reverseReader; @@ -151,6 +156,22 @@ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List> keys) { HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants(); HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights(); try { - // iterate over the paths + + // Get the key field based on populate meta fields config + // and the table type + final String keyField = getKeyField(); + + // Iterate over the paths logFormatReaderWrapper = new HoodieLogFormatReader(fs, logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()), - readerSchema, readBlocksLazily, reverseReader, bufferSize, !enableFullScan); + readerSchema, readBlocksLazily, reverseReader, bufferSize, !enableFullScan, keyField); Set scannedLogFiles = new HashSet<>(); while (logFormatReaderWrapper.hasNext()) { HoodieLogFile logFile = logFormatReaderWrapper.getLogFile(); @@ -339,15 +365,34 @@ private void processDataBlock(HoodieDataBlock dataBlock, Option> ke } totalLogRecords.addAndGet(recs.size()); for (IndexedRecord rec : recs) { - processNextRecord(createHoodieRecord(rec)); + processNextRecord(createHoodieRecord(rec, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN, + this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName)); } } - protected HoodieRecord createHoodieRecord(IndexedRecord rec) { - if (!simpleKeyGenFields.isPresent()) { - return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.preCombineField, this.withOperationField); + /** + * Create {@link HoodieRecord} from the {@link IndexedRecord}. + * + * @param rec - IndexedRecord to create the HoodieRecord from + * @param hoodieTableConfig - Table config + * @param payloadClassFQN - Payload class fully qualified name + * @param preCombineField - PreCombine field + * @param withOperationField - Whether operation field is enabled + * @param simpleKeyGenFields - Key generator fields when populate meta fields is turned off + * @param partitionName - Partition name + * @return HoodieRecord created from the IndexedRecord + */ + protected HoodieRecord createHoodieRecord(final IndexedRecord rec, final HoodieTableConfig hoodieTableConfig, + final String payloadClassFQN, final String preCombineField, + final boolean withOperationField, + final Option> simpleKeyGenFields, + final Option partitionName) { + if (this.populateMetaFields) { + return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, payloadClassFQN, + preCombineField, withOperationField); } else { - return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, this.payloadClassFQN, this.preCombineField, this.simpleKeyGenFields.get(), this.withOperationField); + return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, payloadClassFQN, + preCombineField, simpleKeyGenFields.get(), withOperationField, partitionName); } } @@ -418,6 +463,10 @@ protected String getPayloadClassFQN() { return payloadClassFQN; } + protected Option getPartitionName() { + return partitionName; + } + public long getTotalRollbacks() { return totalRollbacks.get(); } @@ -451,6 +500,10 @@ public abstract static class Builder { public abstract Builder withBufferSize(int bufferSize); + public Builder withPartition(String partitionName) { + throw new UnsupportedOperationException(); + } + public Builder withInstantRange(Option instantRange) { throw new
UnsupportedOperationException(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java index d8d00acb1ae02..a786e8305bc27 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java @@ -46,7 +46,8 @@ public static HoodieFileSliceReader getFileSliceReader( Iterator baseIterator = baseFileReader.get().getRecordIterator(schema); while (baseIterator.hasNext()) { GenericRecord record = (GenericRecord) baseIterator.next(); - HoodieRecord hoodieRecord = transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); + HoodieRecord hoodieRecord = transform( + record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt); scanner.processNextRecord(hoodieRecord); } return new HoodieFileSliceReader(scanner.iterator()); @@ -68,8 +69,10 @@ private static HoodieRecord transform( GenericRecord record, HoodieMergedLogRecordScanner scanner, String payloadClass, String preCombineField, Option> simpleKeyGenFieldsOpt) { return simpleKeyGenFieldsOpt.isPresent() - ? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField()) - : SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, scanner.isWithOperationField()); + ? SpillableMapUtils.convertToHoodieRecordPayload(record, + payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField(), Option.empty()) + : SpillableMapUtils.convertToHoodieRecordPayload(record, + payloadClass, preCombineField, scanner.isWithOperationField(), scanner.getPartitionName()); } private HoodieFileSliceReader(Iterator> recordsItr) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index cdf3065587d13..9a3913fa118d0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream; import org.apache.hudi.common.fs.TimedFSDataInputStream; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieCorruptBlock; @@ -66,6 +67,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private final HoodieLogFile logFile; private final byte[] magicBuffer = new byte[6]; private final Schema readerSchema; + private final String keyField; private boolean readBlockLazily; private long reverseLogFilePosition; private long lastReverseLogFilePosition; @@ -76,11 +78,13 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily, boolean reverseReader) throws IOException { - this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false); + this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false, + 
HoodieRecord.RECORD_KEY_METADATA_FIELD); } public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, - boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading) throws IOException { + boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading, + String keyField) throws IOException { FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize); this.logFile = logFile; this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize); @@ -88,6 +92,7 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc this.readBlockLazily = readBlockLazily; this.reverseReader = reverseReader; this.enableInlineReading = enableInlineReading; + this.keyField = keyField; if (this.reverseReader) { this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen(); } @@ -251,11 +256,12 @@ private HoodieLogBlock readBlock() throws IOException { return HoodieAvroDataBlock.getBlock(content, readerSchema); } else { return new HoodieAvroDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily, - contentPosition, contentLength, blockEndPos, readerSchema, header, footer); + contentPosition, contentLength, blockEndPos, readerSchema, header, footer, keyField); } case HFILE_DATA_BLOCK: return new HoodieHFileDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily, - contentPosition, contentLength, blockEndPos, readerSchema, header, footer, enableInlineReading); + contentPosition, contentLength, blockEndPos, readerSchema, + header, footer, enableInlineReading, keyField); case DELETE_BLOCK: return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily, contentPosition, contentLength, blockEndPos, header, footer); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java index febdbf8068292..2db5437697094 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java @@ -44,18 +44,15 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { private final Schema readerSchema; private final boolean readBlocksLazily; private final boolean reverseLogReader; - private final boolean enableInLineReading; + private final String recordKeyField; + private final boolean enableInlineReading; private int bufferSize; private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class); HoodieLogFormatReader(FileSystem fs, List logFiles, Schema readerSchema, boolean readBlocksLazily, - boolean reverseLogReader, int bufferSize) throws IOException { - this(fs, logFiles, readerSchema, readBlocksLazily, reverseLogReader, bufferSize, false); - } - - HoodieLogFormatReader(FileSystem fs, List logFiles, Schema readerSchema, boolean readBlocksLazily, - boolean reverseLogReader, int bufferSize, boolean enableInlineReading) throws IOException { + boolean reverseLogReader, int bufferSize, boolean enableInlineReading, + String recordKeyField) throws IOException { this.logFiles = logFiles; this.fs = fs; this.readerSchema = readerSchema; @@ -63,10 +60,12 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { this.reverseLogReader = reverseLogReader; this.bufferSize = bufferSize; this.prevReadersInOpenState 
= new ArrayList<>(); - this.enableInLineReading = enableInlineReading; + this.recordKeyField = recordKeyField; + this.enableInlineReading = enableInlineReading; if (logFiles.size() > 0) { HoodieLogFile nextLogFile = logFiles.remove(0); - this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading); + this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, + enableInlineReading, recordKeyField); } } @@ -107,7 +106,7 @@ public boolean hasNext() { } this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, - this.enableInLineReading); + enableInlineReading, recordKeyField); } catch (IOException io) { throw new HoodieIOException("unable to initialize read with log file ", io); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index a8d97ac1b5f18..2e47e695d3144 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -76,10 +76,13 @@ protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List instantRange, boolean autoScan, - ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled, - boolean withOperationField, boolean enableFullScan) { - super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField, - enableFullScan); + ExternalSpillableMap.DiskMapType diskMapType, + boolean isBitCaskDiskMapCompressionEnabled, + boolean withOperationField, boolean enableFullScan, + Option partitionName) { + super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, + instantRange, withOperationField, + enableFullScan, partitionName); try { // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), @@ -187,6 +190,7 @@ public static class Builder extends AbstractHoodieLogRecordReader.Builder { private boolean autoScan = true; // operation field default false private boolean withOperationField = false; + protected String partitionName; @Override public Builder withFileSystem(FileSystem fs) { @@ -272,12 +276,19 @@ public Builder withOperationField(boolean withOperationField) { return this; } + @Override + public Builder withPartition(String partitionName) { + this.partitionName = partitionName; + return this; + } + @Override public HoodieMergedLogRecordScanner build() { return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader, bufferSize, spillableMapBasePath, instantRange, autoScan, - diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true); + diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true, + Option.ofNullable(partitionName)); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java index 
31fc352acad09..1d3f5f3b01c56 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java @@ -21,6 +21,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.SizeAwareDataInputStream; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; @@ -58,22 +59,27 @@ public class HoodieAvroDataBlock extends HoodieDataBlock { private ThreadLocal decoderCache = new ThreadLocal<>(); public HoodieAvroDataBlock(@Nonnull Map logBlockHeader, - @Nonnull Map logBlockFooter, - @Nonnull Option blockContentLocation, @Nonnull Option content, - FSDataInputStream inputStream, boolean readBlockLazily) { + @Nonnull Map logBlockFooter, + @Nonnull Option blockContentLocation, @Nonnull Option content, + FSDataInputStream inputStream, boolean readBlockLazily) { super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily); } public HoodieAvroDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option content, - boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema, - Map header, Map footer) { + boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema, + Map header, Map footer, String keyField) { super(content, inputStream, readBlockLazily, - Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header, - footer); + Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header, + footer, keyField); + } + + public HoodieAvroDataBlock(@Nonnull List records, @Nonnull Map header, String keyField) { + super(records, header, new HashMap<>(), keyField); } public HoodieAvroDataBlock(@Nonnull List records, @Nonnull Map header) { - super(records, header, new HashMap<>()); + super(records, header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java index 2e4338ef785d0..66c9571487dff 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.table.log.block; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -29,7 +30,6 @@ import javax.annotation.Nonnull; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -46,39 +46,62 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { protected List records; protected Schema schema; + protected String keyField; public HoodieDataBlock(@Nonnull Map logBlockHeader, @Nonnull Map logBlockFooter, @Nonnull Option blockContentLocation, @Nonnull Option content, FSDataInputStream inputStream, boolean readBlockLazily) { super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily); + this.keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD; } public 
HoodieDataBlock(@Nonnull List records, @Nonnull Map header, - @Nonnull Map footer) { - super(header, footer, Option.empty(), Option.empty(), null, false); + @Nonnull Map footer, String keyField) { + this(header, footer, Option.empty(), Option.empty(), null, false); this.records = records; this.schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); - } - - public HoodieDataBlock(@Nonnull List records, @Nonnull Map header) { - this(records, header, new HashMap<>()); + this.keyField = keyField; } protected HoodieDataBlock(Option content, @Nonnull FSDataInputStream inputStream, boolean readBlockLazily, - Option blockContentLocation, Schema readerSchema, - @Nonnull Map headers, @Nonnull Map footer) { - super(headers, footer, blockContentLocation, content, inputStream, readBlockLazily); + Option blockContentLocation, Schema readerSchema, + @Nonnull Map headers, @Nonnull Map footer, String keyField) { + this(headers, footer, blockContentLocation, content, inputStream, readBlockLazily); this.schema = readerSchema; + this.keyField = keyField; } + /** + * Utility method to get a data block for the requested type. + * + * @param logDataBlockFormat - Data block type + * @param recordList - List of records that go in the data block + * @param header - Data block header + * @return Data block of the requested type. + */ public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List recordList, Map header) { + return getBlock(logDataBlockFormat, recordList, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); + } + + /** + * Utility method to get a data block for the requested type. + * + * @param logDataBlockFormat - Data block type + * @param recordList - List of records that go in the data block + * @param header - Data block header + * @param keyField - Field to extract the key from the records + * @return Data block of the requested type.
+ */ + public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, List recordList, + Map header, String keyField) { switch (logDataBlockFormat) { case AVRO_DATA_BLOCK: - return new HoodieAvroDataBlock(recordList, header); + return new HoodieAvroDataBlock(recordList, header, keyField); case HFILE_DATA_BLOCK: - return new HoodieHFileDataBlock(recordList, header); + return new HoodieHFileDataBlock(recordList, header, keyField); default: throw new HoodieException("Data block format " + logDataBlockFormat + " not implemented"); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index a1e0c129803f7..35ea41dd3f637 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -68,24 +68,23 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { private static int blockSize = 1 * 1024 * 1024; private boolean enableInlineReading = false; - public HoodieHFileDataBlock(@Nonnull Map logBlockHeader, - @Nonnull Map logBlockFooter, - @Nonnull Option blockContentLocation, @Nonnull Option content, - FSDataInputStream inputStream, boolean readBlockLazily) { - super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily); - } - public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option content, - boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema, - Map header, Map footer, boolean enableInlineReading) { + boolean readBlockLazily, long position, long blockSize, long blockEndpos, + Schema readerSchema, Map header, + Map footer, boolean enableInlineReading, String keyField) { super(content, inputStream, readBlockLazily, - Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header, - footer); + Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), + readerSchema, header, footer, keyField); this.enableInlineReading = enableInlineReading; } + public HoodieHFileDataBlock(@Nonnull List records, @Nonnull Map header, + String keyField) { + super(records, header, new HashMap<>(), keyField); + } + public HoodieHFileDataBlock(@Nonnull List records, @Nonnull Map header) { - super(records, header, new HashMap<>()); + this(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); } @Override @@ -111,7 +110,7 @@ protected byte[] serializeRecords() throws IOException { boolean useIntegerKey = false; int key = 0; int keySize = 0; - Field keyField = records.get(0).getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD); + Field keyField = records.get(0).getSchema().getField(this.keyField); if (keyField == null) { // Missing key metadata field so we should use an integer sequence key useIntegerKey = true; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index 5dd0c5a8bc9c2..934b5b5f616c6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -115,22 +115,38 @@ public static long computePayloadSize(R value, SizeEstimator valueSizeEst * Utility method to convert bytes to HoodieRecord using 
schema and payload class. */ public static R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, String preCombineField, boolean withOperationField) { - return convertToHoodieRecordPayload(rec, payloadClazz, preCombineField, Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), withOperationField); + return convertToHoodieRecordPayload(rec, payloadClazz, preCombineField, + Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), + withOperationField, Option.empty()); + } + + public static R convertToHoodieRecordPayload(GenericRecord record, String payloadClazz, + String preCombineField, + boolean withOperationField, + Option partitionName) { + return convertToHoodieRecordPayload(record, payloadClazz, preCombineField, + Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD), + withOperationField, partitionName); } /** * Utility method to convert bytes to HoodieRecord using schema and payload class. */ - public static R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, - String preCombineField, Pair recordKeyPartitionPathPair, - boolean withOperationField) { - String recKey = rec.get(recordKeyPartitionPathPair.getLeft()).toString(); - String partitionPath = rec.get(recordKeyPartitionPathPair.getRight()).toString(); - Object preCombineVal = getPreCombineVal(rec, preCombineField); + public static R convertToHoodieRecordPayload(GenericRecord record, String payloadClazz, + String preCombineField, + Pair recordKeyPartitionPathFieldPair, + boolean withOperationField, + Option partitionName) { + final String recKey = record.get(recordKeyPartitionPathFieldPair.getKey()).toString(); + final String partitionPath = (partitionName.isPresent() ? partitionName.get() : + record.get(recordKeyPartitionPathFieldPair.getRight()).toString()); + + Object preCombineVal = getPreCombineVal(record, preCombineField); HoodieOperation operation = withOperationField - ? HoodieOperation.fromName(getNullableValAsString(rec, HoodieRecord.OPERATION_METADATA_FIELD)) : null; + ? 
HoodieOperation.fromName(getNullableValAsString(record, HoodieRecord.OPERATION_METADATA_FIELD)) : null; HoodieRecord hoodieRecord = new HoodieRecord<>(new HoodieKey(recKey, partitionPath), - ReflectionUtils.loadPayload(payloadClazz, new Object[] {rec, preCombineVal}, GenericRecord.class, Comparable.class), operation); + ReflectionUtils.loadPayload(payloadClazz, new Object[]{record, preCombineVal}, GenericRecord.class, + Comparable.class), operation); return (R) hoodieRecord; } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 6cc5533f1c515..766bc68ec1151 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -133,8 +133,8 @@ protected List>>> getRec // local map to assist in merging with base file records Map>> logRecords = readLogRecords(logRecordScanner, keys, timings); - List>>> result = readFromBaseAndMergeWithLogRecords(baseFileReader, - keys, logRecords, timings); + List>>> result = readFromBaseAndMergeWithLogRecords( + baseFileReader, keys, logRecords, timings, partitionName); LOG.info(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", keys.size(), timings)); return result; } catch (IOException ioe) { @@ -175,8 +175,8 @@ private Map>> readLogRecords( } private List>>> readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader, - List keys, Map>> logRecords, - List timings) throws IOException { + List keys, Map>> logRecords, List timings, String partitionName) throws IOException { List>>> result = new ArrayList<>(); // merge with base records HoodieTimer timer = new HoodieTimer().startTimer(); @@ -189,10 +189,7 @@ private List>>> readFrom readTimer.startTimer(); Option baseRecord = baseFileReader.getRecordByKey(key); if (baseRecord.isPresent()) { - hoodieRecord = metadataTableConfig.populateMetaFields() - ? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false) - : SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), - Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()), false); + hoodieRecord = getRecord(baseRecord, partitionName); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer())); // merge base file record w/ log record if present if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) { @@ -218,6 +215,18 @@ private List>>> readFrom return result; } + private HoodieRecord getRecord(Option baseRecord, String partitionName) { + ValidationUtils.checkState(baseRecord.isPresent()); + if (metadataTableConfig.populateMetaFields()) { + return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), + metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false); + } + return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), + metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), + Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()), + false, Option.of(partitionName)); + } + /** * Returns a new pair of readers to the base and log files. 
*/ @@ -241,7 +250,8 @@ private Pair openReadersI baseFileOpenMs = baseFileReaderOpenTimePair.getValue(); // Open the log record scanner using the log files from the latest file slice - Pair logRecordScannerOpenTimePair = getLogRecordScanner(slice); + Pair logRecordScannerOpenTimePair = getLogRecordScanner(slice, + partitionName); logRecordScanner = logRecordScannerOpenTimePair.getKey(); logScannerOpenMs = logRecordScannerOpenTimePair.getValue(); @@ -293,7 +303,7 @@ private Set getValidInstantTimestamps() { return validInstantTimestamps; } - private Pair getLogRecordScanner(FileSlice slice) { + private Pair getLogRecordScanner(FileSlice slice, String partitionName) { HoodieTimer timer = new HoodieTimer().startTimer(); List logFilePaths = slice.getLogFiles() .sorted(HoodieLogFile.getLogFileComparator()) @@ -323,6 +333,7 @@ private Pair getLogRecordScanner(File .withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled()) .withLogBlockTimestamps(validInstantTimestamps) .enableFullScan(metadataConfig.enableFullScan()) + .withPartition(partitionName) .build(); Long logScannerOpenMs = timer.endTimer(); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java index 131ca3b91762f..2c9ca39fdd50a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java @@ -25,7 +25,11 @@ import java.util.Set; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -49,13 +53,17 @@ public class HoodieMetadataMergedLogRecordReader extends HoodieMergedLogRecordSc // Set of all record keys that are to be read in memory private Set mergeKeyFilter; - private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, List logFilePaths, - Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, int bufferSize, + private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, String partitionName, + List logFilePaths, + Schema readerSchema, String latestInstantTime, + Long maxMemorySizeInBytes, int bufferSize, String spillableMapBasePath, Set mergeKeyFilter, - ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled, + ExternalSpillableMap.DiskMapType diskMapType, + boolean isBitCaskDiskMapCompressionEnabled, Option instantRange, boolean enableFullScan) { super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize, - spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false, enableFullScan); + spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false, + enableFullScan, Option.of(partitionName)); this.mergeKeyFilter = mergeKeyFilter; if (enableFullScan) { performScan(); @@ -76,6 +84,23 @@ protected void processNextDeletedKey(HoodieKey hoodieKey) { } } + @Override + protected HoodieRecord createHoodieRecord(final IndexedRecord rec, final HoodieTableConfig hoodieTableConfig, + final String payloadClassFQN, final String 
preCombineField, + final boolean withOperationField, + final Option> simpleKeyGenFields, + final Option partitionName) { + if (hoodieTableConfig.populateMetaFields()) { + return super.createHoodieRecord(rec, hoodieTableConfig, payloadClassFQN, preCombineField, withOperationField, + simpleKeyGenFields, partitionName); + } + + // When meta fields are not available, create the record using the + // preset key field and the known partition name + return SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) rec, payloadClassFQN, + preCombineField, simpleKeyGenFields.get(), withOperationField, partitionName); + } + /** * Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}. */ @@ -107,6 +132,11 @@ public List>>> getRecord return metadataRecords; } + @Override + protected String getKeyField() { + return HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY; + } + /** * Builder used to build {@code HoodieMetadataMergedLogRecordScanner}. */ @@ -161,6 +191,12 @@ public Builder withBufferSize(int bufferSize) { return this; } + @Override + public Builder withPartition(String partitionName) { + this.partitionName = partitionName; + return this; + } + @Override public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) { this.maxMemorySizeInBytes = maxMemorySizeInBytes; @@ -202,7 +238,7 @@ public Builder enableFullScan(boolean enableFullScan) { @Override public HoodieMetadataMergedLogRecordReader build() { - return new HoodieMetadataMergedLogRecordReader(fs, basePath, logFilePaths, readerSchema, + return new HoodieMetadataMergedLogRecordReader(fs, basePath, partitionName, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter, diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, enableFullScan); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 16eef8a5fd85a..0b0d144a6e7e9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -61,6 +61,12 @@ * HoodieMetadataRecord for ease of operations. 
*/ public class HoodieMetadataPayload implements HoodieRecordPayload { + + // HoodieMetadata schema field ids + public static final String SCHEMA_FIELD_ID_KEY = "key"; + public static final String SCHEMA_FIELD_ID_TYPE = "type"; + public static final String SCHEMA_FIELD_ID_METADATA = "filesystemMetadata"; + // Type of the record // This can be an enum in the schema but Avro 1.8 has a bug - https://issues.apache.org/jira/browse/AVRO-1810 private static final int PARTITION_LIST = 1; @@ -78,13 +84,13 @@ public HoodieMetadataPayload(Option record) { if (record.isPresent()) { // This can be simplified using SpecificData.deepcopy once this bug is fixed // https://issues.apache.org/jira/browse/AVRO-1811 - key = record.get().get("key").toString(); - type = (int) record.get().get("type"); - if (record.get().get("filesystemMetadata") != null) { + key = record.get().get(SCHEMA_FIELD_ID_KEY).toString(); + type = (int) record.get().get(SCHEMA_FIELD_ID_TYPE); + if (record.get().get(SCHEMA_FIELD_ID_METADATA) != null) { filesystemMetadata = (Map) record.get().get("filesystemMetadata"); filesystemMetadata.keySet().forEach(k -> { GenericRecord v = filesystemMetadata.get(k); - filesystemMetadata.put(k.toString(), new HoodieMetadataFileInfo((Long)v.get("size"), (Boolean)v.get("isDeleted"))); + filesystemMetadata.put(k.toString(), new HoodieMetadataFileInfo((Long) v.get("size"), (Boolean) v.get("isDeleted"))); }); } } @@ -231,8 +237,8 @@ private Map combineFilesystemMetadata(HoodieMeta @Override public String toString() { final StringBuilder sb = new StringBuilder("HoodieMetadataPayload {"); - sb.append("key=").append(key).append(", "); - sb.append("type=").append(type).append(", "); + sb.append(SCHEMA_FIELD_ID_KEY + "=").append(key).append(", "); + sb.append(SCHEMA_FIELD_ID_TYPE + "=").append(type).append(", "); sb.append("creations=").append(Arrays.toString(getFilenames().toArray())).append(", "); sb.append("deletions=").append(Arrays.toString(getDeletions().toArray())).append(", "); sb.append('}'); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 3368c17c7bd10..5be3b9674573f 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -1672,9 +1672,9 @@ private HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List header) { switch (dataBlockType) { case AVRO_DATA_BLOCK: - return new HoodieAvroDataBlock(records, header); + return new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); case HFILE_DATA_BLOCK: - return new HoodieHFileDataBlock(records, header); + return new HoodieHFileDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD); default: throw new RuntimeException("Unknown data block type " + dataBlockType); } diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip index 48bf278bd6c6f..42c0301b310fe 100644 Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip differ diff --git 
a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip index 657f83c2d0a59..8e57b1d45f38a 100644 Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip differ
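
Usage note: taken together, the patch threads an explicit key field through the data blocks and log readers so the metadata table can run with virtual keys (meta fields not populated). A minimal write-side sketch under stated assumptions: the builder option, the getBlock() overload, and the getRecordKeyFieldProp() default are the ones introduced above, while the wrapper class, method names, and variables (format, records, header) are hypothetical placeholders for illustration only.

import java.util.List;
import java.util.Map;

import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;

public class VirtualKeysSketch { // hypothetical wrapper, not part of the patch

  // Metadata table config with virtual keys: meta fields are not written,
  // and keys come from the payload's own "key" field instead.
  static HoodieMetadataConfig metadataConfig() {
    return HoodieMetadataConfig.newBuilder()
        .enable(true)
        .withPopulateMetaFields(false) // option added in this patch
        .build();
  }

  // Mirrors the HoodieAppendHandle change above: pass the key field
  // explicitly when meta fields are disabled; getRecordKeyFieldProp() now
  // falls back to the _hoodie_record_key meta field when unset.
  static HoodieLogBlock newDataBlock(HoodieLogBlock.HoodieLogBlockType format,
                                     List<IndexedRecord> records,
                                     Map<HoodieLogBlock.HeaderMetadataType, String> header,
                                     HoodieTableConfig tableConfig,
                                     boolean populateMetaFields) {
    final String keyField = populateMetaFields
        ? HoodieRecord.RECORD_KEY_METADATA_FIELD
        : tableConfig.getRecordKeyFieldProp();
    return HoodieDataBlock.getBlock(format, records, header, keyField);
  }
}

On the read path, the same key field is supplied by HoodieMetadataMergedLogRecordReader.getKeyField(), which returns HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY ("key"), and the partition name passed via withPartition() stands in for the missing _hoodie_partition_path meta field.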