@@ -22,6 +22,7 @@
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
@@ -96,6 +97,7 @@ public void testArchiveEmptyTable() throws IOException {
HoodieWriteConfig cfg =
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).forTable("test-trip-table").build();
initMetadataTable(cfg);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
@@ -109,7 +111,7 @@ public void testArchiveTableWithArchival() throws IOException {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 4).build())
.forTable("test-trip-table").build();
HoodieTestUtils.init(hadoopConf, basePath);
initMetadataTable(cfg);
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "100"), wrapperFs.getConf());
@@ -235,6 +237,7 @@ public void testArchiveTableWithNoArchival() throws IOException {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
initMetadataTable(cfg);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
@@ -301,6 +304,7 @@ public void testArchiveCommitSafety() throws IOException {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
initMetadataTable(cfg);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
@@ -328,14 +332,15 @@ public void testArchiveCommitSavepointNoHole() throws IOException {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();

initMetadataTable(cfg);
HoodieTestDataGenerator.createCommitFile(basePath, "100", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", wrapperFs.getConf());
HoodieTestDataGenerator.createSavepointFile(basePath, "101", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "102", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "103", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "104", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105", wrapperFs.getConf());

HoodieTable table = HoodieSparkTable.create(cfg, context);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);

@@ -359,6 +364,7 @@ public void testArchiveRollbacks() throws IOException {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.build();
initMetadataTable(cfg);

createCommitAndRollbackFile("100", "101", false);
createCommitAndRollbackFile("102", "103", false);
@@ -388,6 +394,8 @@ public void testArchiveCommitCompactionNoHole() throws IOException {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
.build();
initMetadataTable(cfg);

HoodieTestDataGenerator.createCommitFile(basePath, "100", wrapperFs.getConf());
HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", wrapperFs.getConf());
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
@@ -436,6 +444,7 @@ public void testArchiveCommitTimeline() throws IOException {
.withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.build();
initMetadataTable(cfg);
metaClient = HoodieTableMetaClient.reload(metaClient);

HoodieTestDataGenerator.createCommitFile(basePath, "1", wrapperFs.getConf());
@@ -488,6 +497,7 @@ public void testArchiveCompletedClean() throws IOException {
.withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.build();
initMetadataTable(cfg);
metaClient = HoodieTableMetaClient.reload(metaClient);

createCleanMetadata("10", false);
@@ -513,6 +523,7 @@ public void testArchiveCompletedRollback() throws IOException {
.withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.build();
initMetadataTable(cfg);
metaClient = HoodieTableMetaClient.reload(metaClient);

createCommitAndRollbackFile("6", "10", false);
@@ -543,6 +554,7 @@ public void testArchiveCompletedShouldRetainMinInstantsIfInstantsGreaterThanMaxt
.withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants, maxInstants).build())
.build();
initMetadataTable(cfg);
metaClient = HoodieTableMetaClient.reload(metaClient);
for (int i = 0; i < maxInstants + 2; i++) {
createCleanMetadata(i + "", false);
@@ -564,6 +576,7 @@ public void testArchiveCompletedShouldNotArchiveIfInstantsLessThanMaxtoKeep() th
.withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstants, maxInstants).build())
.build();
initMetadataTable(cfg);
metaClient = HoodieTableMetaClient.reload(metaClient);
for (int i = 0; i < maxInstants; i++) {
createCleanMetadata(i + "", false);
@@ -585,6 +598,7 @@ public void testArchiveCompletedRollbackAndClean() throws IOException {
.withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstantsToKeep, maxInstantsToKeep).build())
.build();
initMetadataTable(cfg);
metaClient = HoodieTableMetaClient.reload(metaClient);

int startInstant = 1;
@@ -618,6 +632,7 @@ public void testArchiveInflightClean() throws IOException {
.withParallelism(2, 2).forTable("test-trip-table")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.build();
initMetadataTable(cfg);
metaClient = HoodieTableMetaClient.reload(metaClient);

createCleanMetadata("10", false);
@@ -677,4 +692,11 @@ private HoodieInstant createRollbackMetadata(String rollbackTime, String commitT
}
return new HoodieInstant(inflight, "rollback", rollbackTime);
}

private void initMetadataTable(HoodieWriteConfig writeConfig) {
// Init the metadata table so that commits/instants created externally can be synced to it later.
// Without that sync, archival of those instants will not be triggered.
SparkRDDWriteClient client = new SparkRDDWriteClient(context, writeConfig);
client.syncTableMetadata();
}
}
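
Taken together, the hunks above add the same preamble to every archival test: initialize the metadata table before any instants are created by hand. Below is a minimal sketch of the resulting test pattern, assuming the fields of this test class as shown in the diff (`basePath`, `context`, `metaClient`, `wrapperFs`); `archiveIfRequired` is assumed to be the existing entry point the unchanged test bodies call.

```java
// Sketch of the pattern introduced above, not new API: config, metadata sync,
// externally created instants, then archival.
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
    .withParallelism(2, 2)
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .retainCommits(1).archiveCommitsWith(2, 3).build())
    .forTable("test-trip-table")
    .build();

// Sync the metadata table first; otherwise the instants created below are never
// synced to it and archival will not trigger for them.
initMetadataTable(cfg);

// Commits created "externally", i.e. without going through a write client.
HoodieTestDataGenerator.createCommitFile(basePath, "100", wrapperFs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", wrapperFs.getConf());

// Archive against a freshly reloaded meta client.
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
archiveLog.archiveIfRequired(context); // assumed entry point, as in the existing tests
```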
@@ -19,6 +19,7 @@
package org.apache.hudi.io.storage.row;

import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -168,11 +169,15 @@ public void testGlobalFailure() throws Exception {
@Test
public void testInstantiationFailure() throws IOException {
// init config and table
HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).withPath("/dummypath/abc/").build();
HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder("/dummypath/abc/")
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(false)
.build())
.build();
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);

try {
new HoodieRowCreateHandle(table, cfg, " def", UUID.randomUUID().toString(), "001", RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
new HoodieRowCreateHandle(table, cfg, "def", UUID.randomUUID().toString(), "001", RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
fail("Should have thrown exception");
} catch (HoodieInsertException ioe) {
// expected
@@ -1079,8 +1079,15 @@ public void testLogFileCountsAfterCompaction(boolean populateMetaFields) throws
HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
HoodieSparkWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)
.withLogAppends(updatedRecords);
// In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
((SyncableFileSystemView) (table.getSliceView())).reset();

// Mark 2nd delta-instant as completed
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());

metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieSparkTable.create(config, context, metaClient);

// Verify that all data file has one log file
for (String partitionPath : dataGen.getPartitionPaths()) {
@@ -1091,12 +1098,6 @@
}
}

// Mark 2nd delta-instant as completed
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());

// Do a compaction
String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
JavaRDD<WriteStatus> result = (JavaRDD<WriteStatus>) writeClient.compact(compactionInstantTime);
@@ -40,7 +40,7 @@ public final class HoodieMetadataConfig extends HoodieConfig {
// Enable the internal Metadata Table which saves file listings
public static final ConfigProperty<Boolean> METADATA_ENABLE_PROP = ConfigProperty
.key(METADATA_PREFIX + ".enable")
.defaultValue(false)
.defaultValue(true)
.sinceVersion("0.7.0")
.withDocumentation("Enable the internal metadata table which serves table metadata like level file listings");

@@ -51,8 +51,6 @@
.sinceVersion("0.7.0")
.withDocumentation("Validate contents of metadata table on each access; e.g against the actual listings from lake storage");

public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = false;

// Enable metrics for internal Metadata Table
public static final ConfigProperty<Boolean> METADATA_METRICS_ENABLE_PROP = ConfigProperty
.key(METADATA_PREFIX + ".metrics.enable")
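
The default flip above is the substantive change in this file: `hoodie.metadata.enable` now defaults to true. Anything that must keep the metadata table off (as the `HoodieRowCreateHandle` instantiation-failure test earlier in this diff does for an intentionally invalid path) now has to opt out explicitly. A hedged sketch, with `basePath` standing in for a real table path:

```java
// Illustrative only: explicitly disable the metadata table now that the default is true.
// withMetadataConfig(...) is the same builder hook used in the test hunk above.
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(basePath)
    .withMetadataConfig(HoodieMetadataConfig.newBuilder()
        .enable(false)
        .build())
    .build();
```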
@@ -18,6 +18,7 @@

package org.apache.hudi.hadoop;

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
@@ -29,7 +30,6 @@
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -176,7 +176,7 @@ public boolean accept(Path path) {
}

fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()));
metaClient, HoodieMetadataConfig.newBuilder().build());
String partition = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), folder);
List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles(partition).collect(Collectors.toList());
// populate the cache
@@ -70,10 +70,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP;
import static org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP;

public class HoodieInputFormatUtils {

// These positions have to be deterministic across all tables
@@ -416,13 +412,6 @@ public static Map<HoodieTableMetaClient, List<Path>> groupSnapshotPathsByMetaCli
return grouped;
}

public static HoodieMetadataConfig buildMetadataConfig(Configuration conf) {
return HoodieMetadataConfig.newBuilder()
.enable(conf.getBoolean(METADATA_ENABLE_PROP.key(), DEFAULT_METADATA_ENABLE_FOR_READERS))
.validate(conf.getBoolean(METADATA_VALIDATE_PROP.key(), METADATA_VALIDATE_PROP.defaultValue()))
.build();
}

public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap,
List<Path> snapshotPaths) throws IOException {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);
@@ -443,7 +432,8 @@ public static List<FileStatus> filterFileStatusForSnapshotMode(JobConf job, Map<
HoodieTimeline timeline = HoodieHiveUtils.getTableTimeline(metaClient.getTableConfig().getTableName(), job, metaClient);

HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, tableMetaClient ->
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, tableMetaClient, buildMetadataConfig(job), timeline));
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, tableMetaClient,
HoodieMetadataConfig.newBuilder().build(), timeline));
List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
for (Path p : entry.getValue()) {
String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p);
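
Removing `buildMetadataConfig(...)` also changes reader behavior slightly: the input-format paths no longer consult the JobConf for `hoodie.metadata.enable`/`hoodie.metadata.validate` and instead take the builder defaults, which now enable the metadata table. A rough before/after sketch, assuming a Hadoop `Configuration conf` as in the removed helper:

```java
// Before (helper removed above): reader-side settings came from the JobConf,
// with the reader default hard-coded to "off".
HoodieMetadataConfig before = HoodieMetadataConfig.newBuilder()
    .enable(conf.getBoolean(METADATA_ENABLE_PROP.key(), false))
    .validate(conf.getBoolean(METADATA_VALIDATE_PROP.key(), METADATA_VALIDATE_PROP.defaultValue()))
    .build();

// After: builder defaults only, so readers pick up the new metadata-on default.
HoodieMetadataConfig after = HoodieMetadataConfig.newBuilder().build();
```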
@@ -18,6 +18,7 @@

package org.apache.hudi.hadoop.utils;

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -105,7 +106,7 @@ public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSpli
if (!fsCache.containsKey(metaClient)) {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(conf);
HoodieTableFileSystemView fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf));
metaClient, HoodieMetadataConfig.newBuilder().build());
fsCache.put(metaClient, fsView);
}
HoodieTableFileSystemView fsView = fsCache.get(metaClient);
@@ -88,7 +88,7 @@ protected List<String> getPartitions(Option<Integer> partitionsLimit) throws IOE
// calls in metrics as they are not part of normal HUDI operation.
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metaClient.getBasePath(),
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS, HoodieMetadataConfig.METADATA_VALIDATE_PROP.defaultValue(), false);
HoodieMetadataConfig.METADATA_ENABLE_PROP.defaultValue(), HoodieMetadataConfig.METADATA_VALIDATE_PROP.defaultValue(), false);
// Sort partition so we can pick last N partitions by default
Collections.sort(partitionPaths);
if (!partitionPaths.isEmpty()) {
@@ -124,7 +124,7 @@ case class HoodieFileIndex(
// would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing.
properties.put(HoodieMetadataConfig.METADATA_ENABLE_PROP,
sqlConf.getConfString(HoodieMetadataConfig.METADATA_ENABLE_PROP.key(),
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString))
HoodieMetadataConfig.METADATA_ENABLE_PROP.defaultValue().toString))
properties.put(HoodieMetadataConfig.METADATA_VALIDATE_PROP,
sqlConf.getConfString(HoodieMetadataConfig.METADATA_VALIDATE_PROP.key(),
HoodieMetadataConfig.METADATA_VALIDATE_PROP.defaultValue().toString))