@@ -294,7 +294,12 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfig) {
     builder.withProperties(properties);
 
     if (writeConfig.isMetricsOn()) {
+      // Table Name is needed for metric reporters prefix
+      Properties commonProperties = new Properties();
+      commonProperties.put(HoodieWriteConfig.TBL_NAME.key(), tableName);
+
       builder.withMetricsConfig(HoodieMetricsConfig.newBuilder()
+          .fromProperties(commonProperties)
           .withReporterType(writeConfig.getMetricsReporterType().toString())
           .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
           .on(true).build());
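
This hunk is the fix itself: when metrics are on, the metadata writer's HoodieMetricsConfig is now seeded with the data table's name, since metric reporters use it to build their prefix. Below is a minimal sketch of why the property matters, assuming a reporter that reads hoodie.table.name (the key behind HoodieWriteConfig.TBL_NAME) straight from its properties; the class, helper, and fallback value are hypothetical, not Hudi API.

import java.util.Properties;

public class MetricPrefixSketch {
  // Hypothetical stand-in for how a reporter derives its metric prefix.
  static String metricPrefix(Properties metricsProps) {
    return metricsProps.getProperty("hoodie.table.name", "<unknown-table>");
  }

  public static void main(String[] args) {
    Properties withTableName = new Properties();
    withTableName.put("hoodie.table.name", "trips_table");
    System.out.println(metricPrefix(withTableName));    // trips_table
    System.out.println(metricPrefix(new Properties())); // <unknown-table>
  }
}

Without the commonProperties passed via fromProperties(...), the metadata table's metrics config had no table name to hand to the reporter, so a prefix derived this way would hit the fallback.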
@@ -18,14 +18,6 @@

 package org.apache.hudi.client.functional;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.util.Time;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
@@ -81,14 +73,14 @@
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.hash.ColumnIndexID;
 import org.apache.hudi.common.util.hash.PartitionIndexID;
-import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.storage.HoodieHFileReader;
@@ -107,6 +99,15 @@
 import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.util.Time;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
@@ -178,6 +179,19 @@ public static List<Arguments> tableTypeAndEnableOperationArgs() {
     );
   }
 
+  public static List<Arguments> tableOperationsTestArgs() {
+    return asList(
+        Arguments.of(COPY_ON_WRITE, true, true),
+        Arguments.of(COPY_ON_WRITE, true, false),
+        Arguments.of(COPY_ON_WRITE, false, true),
+        Arguments.of(COPY_ON_WRITE, false, false),
+        Arguments.of(MERGE_ON_READ, true, true),
+        Arguments.of(MERGE_ON_READ, true, false),
+        Arguments.of(MERGE_ON_READ, false, true),
+        Arguments.of(MERGE_ON_READ, false, false)
+    );
+  }
+
   /**
    * Metadata Table bootstrap scenarios.
    */
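
A note on the new method source: tableOperationsTestArgs() above lists the 2 x 2 x 2 matrix of (tableType, enableFullScan, enableMetrics) by hand, which keeps each case greppable. For comparison, a sketch of an equivalent generated source, assuming it sits in the same test class (the method name is hypothetical):

import java.util.ArrayList;
import java.util.List;

import org.apache.hudi.common.model.HoodieTableType;
import org.junit.jupiter.params.provider.Arguments;

// Hypothetical equivalent of tableOperationsTestArgs(): enumerates the same
// eight (tableType, enableFullScan, enableMetrics) combinations in a loop.
public static List<Arguments> tableOperationsTestArgsGenerated() {
  List<Arguments> combinations = new ArrayList<>();
  for (HoodieTableType tableType : HoodieTableType.values()) {
    for (boolean enableFullScan : new boolean[] {true, false}) {
      for (boolean enableMetrics : new boolean[] {true, false}) {
        combinations.add(Arguments.of(tableType, enableFullScan, enableMetrics));
      }
    }
  }
  return combinations;
}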
@@ -441,28 +455,34 @@ public void testOnlyValidPartitionsAdded(HoodieTableType tableType) throws Exception {
    * Test various table operations sync to Metadata Table correctly.
    */
   @ParameterizedTest
-  @MethodSource("tableTypeAndEnableOperationArgs")
-  public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception {
-    init(tableType, true, enableFullScan, false, false);
-    doWriteInsertAndUpsert(testTable);
+  @MethodSource("tableOperationsTestArgs")
+  public void testTableOperations(HoodieTableType tableType, boolean enableFullScan, boolean enableMetrics) throws Exception {
+    List<Long> commitTimeList = new ArrayList<>();
+    commitTimeList.add(Long.parseLong(HoodieActiveTimeline.createNewInstantTime()));
+    for (int i = 0; i < 8; i++) {
+      long nextCommitTime = getNextCommitTime(commitTimeList.get(commitTimeList.size() - 1));
+      commitTimeList.add(nextCommitTime);
+    }
+    init(tableType, true, enableFullScan, enableMetrics, false);
+    doWriteInsertAndUpsert(testTable, commitTimeList.get(0).toString(), commitTimeList.get(1).toString(), false);
 
     // trigger an upsert
-    doWriteOperationAndValidate(testTable, "0000003");
+    doWriteOperationAndValidate(testTable, commitTimeList.get(2).toString());
 
     // trigger compaction
     if (MERGE_ON_READ.equals(tableType)) {
-      doCompactionAndValidate(testTable, "0000004");
+      doCompactionAndValidate(testTable, commitTimeList.get(3).toString());
     }
 
     // trigger an upsert
-    doWriteOperation(testTable, "0000005");
+    doWriteOperation(testTable, commitTimeList.get(4).toString());
 
     // trigger clean
-    doCleanAndValidate(testTable, "0000006", singletonList("0000001"));
+    doCleanAndValidate(testTable, commitTimeList.get(5).toString(), singletonList(commitTimeList.get(0).toString()));
 
     // trigger few upserts and validate
-    doWriteOperation(testTable, "0000007");
-    doWriteOperation(testTable, "0000008");
+    doWriteOperation(testTable, commitTimeList.get(6).toString());
+    doWriteOperation(testTable, commitTimeList.get(7).toString());
     validateMetadata(testTable, emptyList(), true);
   }
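
The rewritten test derives its instants from HoodieActiveTimeline.createNewInstantTime() plus a getNextCommitTime helper that is defined elsewhere in this file and not shown in the diff. One plausible shape for it, assuming instant times are numeric yyyyMMddHHmmss timestamps whose last two digits are the seconds field, so naively adding 1 can produce an invalid time such as ...60:

// Sketch only -- the real helper lives outside this diff. When incrementing
// would overflow the seconds field (>= 60), fall back to a fresh instant time.
private long getNextCommitTime(long curCommitTime) {
  if ((curCommitTime + 1) % 100 >= 60) {
    return Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
  }
  return curCommitTime + 1;
}

Usage mirrors the loop in testTableOperations above: seed a list with createNewInstantTime(), then repeatedly append getNextCommitTime(last) to get eight strictly increasing, timeline-valid commit times in place of the old hard-coded "0000001"..."0000008" strings.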