Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,12 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header)
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchemaWithMetaFields.toString());
List<HoodieLogBlock> blocks = new ArrayList<>(2);
if (recordList.size() > 0) {
blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
if (config.populateMetaFields()) {
blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header));
} else {
final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
blocks.add(HoodieDataBlock.getBlock(hoodieTable.getLogDataBlockFormat(), recordList, header, keyField));
}
}
if (keysToDelete.size() > 0) {
blocks.add(new HoodieDeleteBlock(keysToDelete.toArray(new HoodieKey[keysToDelete.size()]), header));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
Expand Down Expand Up @@ -76,6 +77,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
Expand All @@ -91,6 +93,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta

private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadataWriter.class);

// Virtual keys support for metadata table. This Field is
// from the metadata payload schema.
private static final String RECORD_KEY_FIELD = HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY;

protected HoodieWriteConfig metadataWriteConfig;
protected HoodieWriteConfig dataWriteConfig;
protected String tableName;
Expand Down Expand Up @@ -202,7 +208,15 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
.withDeleteParallelism(parallelism)
.withRollbackParallelism(parallelism)
.withFinalizeWriteParallelism(parallelism)
.withAllowMultiWriteOnSameInstant(true);
.withAllowMultiWriteOnSameInstant(true)
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
.withPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields());

// RecordKey properties are needed for the metadata table records
final Properties properties = new Properties();
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), RECORD_KEY_FIELD);
properties.put("hoodie.datasource.write.recordkey.field", RECORD_KEY_FIELD);
builder.withProperties(properties);

if (writeConfig.isMetricsOn()) {
builder.withMetricsConfig(HoodieMetricsConfig.newBuilder()
Expand Down Expand Up @@ -395,9 +409,12 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
.setTableType(HoodieTableType.MERGE_ON_READ)
.setTableName(tableName)
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
.setPayloadClassName(HoodieMetadataPayload.class.getName())
.setBaseFileFormat(HoodieFileFormat.HFILE.toString())
.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
.setPayloadClassName(HoodieMetadataPayload.class.getName())
.setBaseFileFormat(HoodieFileFormat.HFILE.toString())
.setRecordKeyFields(RECORD_KEY_FIELD)
.setPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields())
.setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());

initTableMetadata();
initializeFileGroups(dataMetaClient, MetadataPartitionType.FILES, createInstantTime, 1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.metadata;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;

/**
* Custom key generator for the Hoodie table metadata. The metadata table record payload
* has an internal schema with a known key field HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY.
* With or without the virtual keys, getting the key from the metadata table record is always
* via the above field and there is no real need for a key generator. But, when a write
* client is instantiated for the metadata table, when virtual keys are enabled, and when
* key generator class is not configured, the default SimpleKeyGenerator will be used.
* To avoid using any other key generators for the metadata table which rely on certain
* config properties, we need this custom key generator exclusively for the metadata table.
*/
public class HoodieTableMetadataKeyGenerator extends BaseKeyGenerator {

public HoodieTableMetadataKeyGenerator(TypedProperties config) {
super(config);
}

@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY);
}

@Override
public String getPartitionPath(GenericRecord record) {
return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) thro
if (records.size() > 0) {
Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
final String keyField = table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header, keyField);
writer.appendBlock(block);
records.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withOperationField(config.allowOperationMetadataField())
.withPartition(operation.getPartitionPath())
.build();
if (!scanner.iterator().hasNext()) {
scanner.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ private JavaRDD<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext js
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(clusteringOp.getPartitionPath())
.build();

Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public SimpleKeyGenerator(TypedProperties props) {
SimpleKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
super(props);
this.recordKeyFields = recordKeyField == null
? Collections.emptyList()
: Collections.singletonList(recordKeyField);
this.partitionPathFields = Collections.singletonList(partitionPathField);
? Collections.emptyList() : Collections.singletonList(recordKeyField);
this.partitionPathFields = partitionPathField == null
? Collections.emptyList() : Collections.singletonList(partitionPathField);
simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -358,8 +359,9 @@ public void testRollbackOperations(HoodieTableType tableType) throws Exception {
* Test that manual rollbacks work correctly and enough timeline history is maintained on the metadata table
* timeline.
*/
@Test
public void testManualRollbacks() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testManualRollbacks(final boolean populateMateFields) throws Exception {
HoodieTableType tableType = COPY_ON_WRITE;
init(tableType, false);
// Setting to archive more aggressively on the Metadata Table than the Dataset
Expand All @@ -369,7 +371,9 @@ public void testManualRollbacks() throws Exception {
writeConfig = getWriteConfigBuilder(true, true, false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.archiveCommitsWith(minArchiveCommitsMetadata, minArchiveCommitsMetadata + 1).retainCommits(1)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction)
.withPopulateMetaFields(populateMateFields)
.build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveCommitsDataset, minArchiveCommitsDataset + 1)
.retainCommits(1).retainFileVersions(1).withAutoClean(false).withAsyncClean(true).build())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.FileStatus;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -90,4 +93,20 @@ private void verifyBaseMetadataTable() throws IOException {
});
}

/**
* Verify if the Metadata table is constructed with table properties including
* the right key generator class name.
*/
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testMetadataTableKeyGenerator(final HoodieTableType tableType) throws Exception {
init(tableType);

HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context,
writeConfig.getMetadataConfig(), writeConfig.getBasePath(), writeConfig.getSpillableMapBasePath(), false);

assertEquals(HoodieTableMetadataKeyGenerator.class.getCanonicalName(),
tableMetadata.getMetadataMetaClient().getTableConfig().getKeyGeneratorClassName());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesClea
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(useFileListingMetadata)
.enableFullScan(enableFullScan)
.enableMetrics(enableMetrics).build())
.enableMetrics(enableMetrics)
.withPopulateMetaFields(false)
.build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(true).build())
.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,26 @@ protected Properties getPropertiesForKeyGen() {
return properties;
}

protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) {
protected Properties getPropertiesForMetadataTable() {
Properties properties = new Properties();
properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
properties.put("hoodie.datasource.write.recordkey.field", "key");
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "key");
return properties;
}

protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields,
boolean isMetadataTable) {
if (!populateMetaFields) {
configBuilder.withProperties(getPropertiesForKeyGen())
configBuilder.withProperties((isMetadataTable ? getPropertiesForMetadataTable() : getPropertiesForKeyGen()))
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
}
}

protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) {
addConfigsForPopulateMetaFields(configBuilder, populateMetaFields, false);
}

/**
* Cleanups hoodie clients.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("0.10.0")
.withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries.");

public static final ConfigProperty<String> POPULATE_META_FIELDS = ConfigProperty
.key(METADATA_PREFIX + ".populate.meta.fields")
.defaultValue("false")
.sinceVersion("0.10.0")
.withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated.");

private HoodieMetadataConfig() {
super();
}
Expand Down Expand Up @@ -164,6 +170,10 @@ public boolean enableFullScan() {
return getBoolean(ENABLE_FULL_SCAN_LOG_FILES);
}

public boolean populateMetaFields() {
return getBooleanOrDefault(HoodieMetadataConfig.POPULATE_META_FIELDS);
}

public static class Builder {

private EngineType engineType = EngineType.SPARK;
Expand Down Expand Up @@ -206,6 +216,11 @@ public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBefo
return this;
}

public Builder withPopulateMetaFields(boolean populateMetaFields) {
metadataConfig.setValue(POPULATE_META_FIELDS, Boolean.toString(populateMetaFields));
return this;
}

public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
metadataConfig.setValue(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
metadataConfig.setValue(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
Expand Down Expand Up @@ -364,7 +365,7 @@ public boolean populateMetaFields() {
* @returns the record key field prop.
*/
public String getRecordKeyFieldProp() {
return getString(RECORDKEY_FIELDS);
return getStringOrDefault(RECORDKEY_FIELDS, HoodieRecord.RECORD_KEY_METADATA_FIELD);
}

public String getKeyGeneratorClassName() {
Expand Down
Loading