4 changes: 2 additions & 2 deletions hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
@@ -85,8 +85,8 @@ public static void initFS(boolean force) throws IOException {
}

public static void refreshTableMetadata() {
setTableMetaClient(new HoodieTableMetaClient(HoodieCLI.conf, basePath, false, HoodieCLI.consistencyGuardConfig,
Option.of(layoutVersion)));
setTableMetaClient(HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(basePath).setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(HoodieCLI.consistencyGuardConfig)
.setLayoutVersion(Option.of(layoutVersion)).build());
}

public static void connectTo(String basePath, Integer layoutVersion) {
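The same constructor-to-builder migration repeats across every file in this diff. As a point of reference, below is a minimal standalone sketch of the two builder forms it introduces; the setter names are taken verbatim from the changed lines, while the import paths and the wrapper class are assumptions added only to make the sketch self-contained.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;

public class MetaClientBuilderSketch {

  // Minimal form: only the Hadoop configuration and the table base path are set,
  // matching call sites such as compareCommits and syncCommits below.
  static HoodieTableMetaClient simple(Configuration conf, String basePath) {
    return HoodieTableMetaClient.builder()
        .setConf(conf)
        .setBasePath(basePath)
        .build();
  }

  // Full form used by the write-client and table classes: load the active timeline
  // eagerly and carry over consistency-guard and timeline-layout settings.
  static HoodieTableMetaClient full(Configuration conf, String basePath,
      ConsistencyGuardConfig guardConfig, Integer layoutVersion) {
    return HoodieTableMetaClient.builder()
        .setConf(conf)
        .setBasePath(basePath)
        .setLoadActiveTimelineOnLoad(true)
        .setConsistencyGuardConfig(guardConfig)
        .setLayoutVersion(Option.of(new TimelineLayoutVersion(layoutVersion)))
        .build();
  }
}

Compared with the removed positional constructor, the named setters make the optional arguments (timeline loading, consistency guard, layout version) explicit at each call site and let callers supply only what they need.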
@@ -401,7 +401,7 @@ public String showCommitFiles(
public String compareCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) {

HoodieTableMetaClient source = HoodieCLI.getTableMetaClient();
HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.conf, path);
HoodieTableMetaClient target = HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
String targetLatestCommit =
@@ -426,7 +426,7 @@ public String compareCommits(@CliOption(key = {"path"}, help = "Path of the tabl

@CliCommand(value = "commits sync", help = "Compare commits with another Hoodie table")
public String syncCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) {
HoodieCLI.syncTableMetadata = new HoodieTableMetaClient(HoodieCLI.conf, path);
HoodieCLI.syncTableMetadata = HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
HoodieCLI.state = HoodieCLI.CLIState.SYNC;
return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
+ HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
@@ -237,7 +237,7 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m
boolean includeMaxInstant, boolean includeInflight, boolean excludeCompaction) throws IOException {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(client.getHadoopConf(), client.getBasePath(), true);
HoodieTableMetaClient.builder().setConf(client.getHadoopConf()).setBasePath(client.getBasePath()).setLoadActiveTimelineOnLoad(true).build();
FileSystem fs = HoodieCLI.fs;
String globPath = String.format("%s/%s/*", client.getBasePath(), globRegex);
List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath));
@@ -402,8 +402,10 @@ private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, S
*/
protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) {
HoodieWriteConfig config = getWriteConfig(basePath);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), false,
config.getConsistencyGuardConfig(), Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())));
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
try {
new SparkUpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc)).run(metaClient, HoodieTableVersion.valueOf(toVersion), config, new HoodieSparkEngineContext(jsc), null);
LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion));
@@ -95,7 +95,7 @@ public String createTable(

boolean existing = false;
try {
new HoodieTableMetaClient(HoodieCLI.conf, path);
HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
existing = true;
} catch (TableNotFoundException dfe) {
// expected
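The createTable hunk above builds a meta client inside a try/catch purely as an existence probe. A hypothetical helper capturing that idiom is sketched below; the helper method, class, and import paths are illustrative and not part of this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.exception.TableNotFoundException;

class TableExistenceCheck {

  // Building a meta client against a base path whose .hoodie metadata directory is
  // missing throws TableNotFoundException, which the CLI treats as "not yet created".
  static boolean tableExists(Configuration conf, String path) {
    try {
      HoodieTableMetaClient.builder().setConf(conf).setBasePath(path).build();
      return true;
    } catch (TableNotFoundException e) {
      return false;
    }
  }
}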
@@ -75,7 +75,7 @@ class DedupeSparkJob(basePath: String,
val tmpTableName = s"htbl_${System.currentTimeMillis()}"
val dedupeTblName = s"${tmpTableName}_dupeKeys"

val metadata = new HoodieTableMetaClient(fs.getConf, basePath)
val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()

val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath"))
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
@@ -184,7 +184,7 @@ class DedupeSparkJob(basePath: String,
}

def fixDuplicates(dryRun: Boolean = true) = {
val metadata = new HoodieTableMetaClient(fs.getConf, basePath)
val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()

val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
@@ -128,9 +128,9 @@ protected void initWrapperFSMetrics() {
}

protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
return new HoodieTableMetaClient(hadoopConf, config.getBasePath(), loadActiveTimelineOnLoad,
config.getConsistencyGuardConfig(),
Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())));
return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
}

public Option<EmbeddedTimelineService> getTimelineServer() {
@@ -106,7 +106,7 @@ protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteC
ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(), "File listing cannot be used for Metadata Table");

initRegistry();
HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, datasetWriteConfig.getBasePath());
HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
initialize(engineContext, datasetMetaClient);
if (enabled) {
// This is always called even in case the table was created for the first time. This is because
@@ -43,13 +43,10 @@ protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context
}

public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
context.getHadoopConf().get(),
config.getBasePath(),
true,
config.getConsistencyGuardConfig(),
Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
);
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
return HoodieFlinkTable.create(config, context, metaClient);
}

@@ -42,13 +42,10 @@ protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext context,
}

public static <T extends HoodieRecordPayload> HoodieJavaTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
context.getHadoopConf().get(),
config.getBasePath(),
true,
config.getConsistencyGuardConfig(),
Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
);
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
}

@@ -97,7 +97,7 @@ public HoodieReadClient(HoodieSparkEngineContext context, HoodieWriteConfig clie
this.hadoopConf = context.getHadoopConf().get();
final String basePath = clientConfig.getBasePath();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
this.hoodieTable = HoodieSparkTable.create(clientConfig, context, metaClient);
this.index = SparkHoodieIndex.createIndex(clientConfig);
this.sqlContextOpt = Option.empty();
@@ -199,7 +199,7 @@ public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> hoodieRecor
*/
public List<Pair<String, HoodieCompactionPlan>> getPendingCompactions() {
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(hadoopConf, hoodieTable.getMetaClient().getBasePath(), true);
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(hoodieTable.getMetaClient().getBasePath()).setLoadActiveTimelineOnLoad(true).build();
return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream()
.map(
instantWorkloadPair -> Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue()))
@@ -42,13 +42,10 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
}

public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
context.getHadoopConf().get(),
config.getBasePath(),
true,
config.getConsistencyGuardConfig(),
Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
);
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
}

@@ -135,7 +135,7 @@ private void validateRepair(String ingestionInstant, String compactionInstant, i
int expNumRepairs) throws Exception {
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
validateUnSchedulePlan(client, ingestionInstant, compactionInstant, numEntriesPerInstant, expNumRepairs, true);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
List<ValidationOpResult> result = client.validateCompactionPlan(metaClient, compactionInstant, 1);
if (expNumRepairs > 0) {
assertTrue(result.stream().anyMatch(r -> !r.isSuccess()), "Expect some failures in validation");
@@ -176,7 +176,7 @@ private void validateRepair(String ingestionInstant, String compactionInstant, i
* @param compactionInstant Compaction Instant
*/
private void ensureValidCompactionPlan(String compactionInstant) throws Exception {
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
// Ensure compaction-plan is good to begin with
List<ValidationOpResult> validationResults = client.validateCompactionPlan(metaClient, compactionInstant, 1);
assertFalse(validationResults.stream().anyMatch(v -> !v.isSuccess()),
@@ -234,7 +234,7 @@ private List<Pair<HoodieLogFile, HoodieLogFile>> validateUnSchedulePlan(Compacti
// Check suggested rename operations
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
client.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, 1, Option.empty(), false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();

// Log files belonging to file-slices created because of compaction request must be renamed

@@ -270,7 +270,7 @@ private List<Pair<HoodieLogFile, HoodieLogFile>> validateUnSchedulePlan(Compacti

client.unscheduleCompactionPlan(compactionInstant, false, 1, false);

metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
@@ -306,7 +306,7 @@ private void validateUnScheduleFileId(CompactionAdminClient client, String inges
// Check suggested rename operations
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles = client
.getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op, Option.empty(), false);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();

// Log files belonging to file-slices created because of compaction request must be renamed

@@ -331,7 +331,7 @@ private void validateUnScheduleFileId(CompactionAdminClient client, String inges
// Call the main unschedule API
client.unscheduleCompactionFileId(op.getFileGroupId(), false, false);

metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
@@ -410,7 +410,7 @@ private void testUpsertsInternal(HoodieWriteConfig config,

final HoodieWriteConfig cfg = hoodieWriteConfig;
final String instantTime = "007";
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
String basePathStr = basePath;
HoodieTable table = getHoodieTable(metaClient, cfg);
jsc.parallelize(Arrays.asList(1)).map(e -> {
@@ -894,7 +894,7 @@ public void testSmallInsertHandlingForUpserts() throws Exception {
assertNoWriteErrors(statuses);

assertEquals(2, statuses.size(), "2 files needs to be committed.");
HoodieTableMetaClient metadata = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTableMetaClient metadata = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();

HoodieTable table = getHoodieTable(metadata, config);
BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
@@ -1001,7 +1001,7 @@ public void testSmallInsertHandlingForInserts(boolean mergeAllowDuplicateInserts
+ readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(),
"file should contain 340 records");

HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieTable table = getHoodieTable(metaClient, config);
List<HoodieBaseFile> files = table.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
@@ -1428,7 +1428,7 @@ public void testCommitWritesRelativePaths() throws Exception {

HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);

String instantTime = "000";
@@ -1533,7 +1533,7 @@ public void testMetadataStatsOnCommit() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsistencyGuard) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
String instantTime = "000";
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
.withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build();
@@ -1559,7 +1559,7 @@ public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsisten

private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard) throws Exception {
String instantTime = "000";
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
.withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build() :