@@ -56,17 +56,17 @@ public class HoodieClusteringJob {
   private HoodieTableMetaClient metaClient;
 
   public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
-    this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs));
+    this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs),
+        UtilHelpers.createMetaClient(jsc, cfg.basePath, true));
   }
 
-  public HoodieClusteringJob(JavaSparkContext jsc, Config cfg, TypedProperties props) {
+  public HoodieClusteringJob(JavaSparkContext jsc, Config cfg, TypedProperties props, HoodieTableMetaClient metaClient) {
     this.cfg = cfg;
     this.jsc = jsc;
     this.props = props;
-    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+    this.metaClient = metaClient;
     // Disable async cleaning, will trigger synchronous cleaning manually.
     this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
-    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
     if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
       // add default lock config options if MDT is enabled.
       UtilHelpers.addLockOptions(cfg.basePath, this.props);
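
A minimal usage sketch of the new overload (not part of this diff; jsc, cfg, and props are assumed to come from the surrounding driver). The point of the change: a caller can now build one metaClient and inject it, while the old two-argument constructor keeps its previous behavior by delegating and creating a fresh client.

  // Sketch: share one metaClient instead of letting each job create its own.
  HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
  HoodieClusteringJob clusteringJob = new HoodieClusteringJob(jsc, cfg, props, metaClient);
  clusteringJob.cluster(1);  // retry count; illustrative value

  // Existing call sites are unaffected: the delegating constructor still
  // creates the metaClient internally.
  HoodieClusteringJob standalone = new HoodieClusteringJob(jsc, cfg);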
@@ -56,17 +56,18 @@ public class HoodieCompactor {
   private transient FileSystem fs;
   private TypedProperties props;
   private final JavaSparkContext jsc;
-  private final HoodieTableMetaClient metaClient;
+  private HoodieTableMetaClient metaClient;
 
   public HoodieCompactor(JavaSparkContext jsc, Config cfg) {
-    this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs));
+    this(jsc, cfg, UtilHelpers.buildProperties(jsc.hadoopConfiguration(), cfg.propsFilePath, cfg.configs),
+        UtilHelpers.createMetaClient(jsc, cfg.basePath, true));
   }
 
-  public HoodieCompactor(JavaSparkContext jsc, Config cfg, TypedProperties props) {
+  public HoodieCompactor(JavaSparkContext jsc, Config cfg, TypedProperties props, HoodieTableMetaClient metaClient) {
     this.cfg = cfg;
     this.jsc = jsc;
     this.props = props;
-    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+    this.metaClient = metaClient;
     // Disable async cleaning, will trigger synchronous cleaning manually.
     this.props.put(HoodieCleanConfig.ASYNC_CLEAN.key(), false);
     if (this.metaClient.getTableConfig().isMetadataTableAvailable()) {
@@ -256,7 +257,7 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
     // If no compaction instant is provided by --instant-time, find the earliest scheduled compaction
     // instant from the active timeline
     if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
-      HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       Option<HoodieInstant> firstCompactionInstant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
       if (firstCompactionInstant.isPresent()) {
         cfg.compactionInstantTime = firstCompactionInstant.get().getTimestamp();
@@ -20,6 +20,7 @@
 package org.apache.hudi.utilities.multitable;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.utilities.HoodieClusteringJob;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -43,13 +44,18 @@ class ClusteringTask extends TableServiceTask {
    */
   private String clusteringMode;
 
+  /**
+   * Meta Client.
+   */
+  private HoodieTableMetaClient metaClient;
+
   @Override
   void run() {
     HoodieClusteringJob.Config clusteringConfig = new HoodieClusteringJob.Config();
     clusteringConfig.basePath = basePath;
     clusteringConfig.parallelism = parallelism;
     clusteringConfig.runningMode = clusteringMode;
-    new HoodieClusteringJob(jsc, clusteringConfig, props).cluster(retry);
+    new HoodieClusteringJob(jsc, clusteringConfig, props, metaClient).cluster(retry);

Contributor:
Be careful: when the metaClient is reused, the timeline should be refreshed each time for compaction.

Contributor (author):

That's right. In the existing code, the metaClient is reloaded during clustering, but a new metaClient is created during compaction. A better implementation would behave consistently for compaction as well: the metaClient should be reloaded at the point where it was previously created.

private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
  LOG.info("Step 1: Do schedule");
  metaClient = HoodieTableMetaClient.reload(metaClient);
  ...
}

private int doCompact(JavaSparkContext jsc) throws Exception {
  ...
  if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
    HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
    ...
  }
}

I will make modifications here.
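
To make the hazard concrete, a small illustration (not part of this PR; the scheduling call via an assumed write client is only an example):

  // The active timeline is captured when the metaClient is built, so an
  // instant scheduled afterwards stays invisible until reload() is called.
  HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, basePath, true);
  writeClient.scheduleCompaction(Option.empty());   // adds a pending compaction instant
  metaClient.getActiveTimeline()
      .filterPendingCompactionTimeline()
      .firstInstant();                              // may still miss the new instant
  metaClient = HoodieTableMetaClient.reload(metaClient);  // refreshed view sees it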

   }
 
   /**
@@ -98,6 +104,11 @@ public static final class Builder {
      */
     private int retry;
 
+    /**
+     * Meta Client.
+     */
+    private HoodieTableMetaClient metaClient;
+
     private Builder() {
     }

@@ -131,6 +142,11 @@ public Builder withRetry(int retry) {
       return this;
     }
 
+    public Builder withMetaclient(HoodieTableMetaClient metaClient) {
+      this.metaClient = metaClient;
+      return this;
+    }
+
     public ClusteringTask build() {
       ClusteringTask clusteringTask = new ClusteringTask();
       clusteringTask.jsc = this.jsc;
@@ -139,6 +155,7 @@ public ClusteringTask build() {
       clusteringTask.retry = this.retry;
       clusteringTask.basePath = this.basePath;
       clusteringTask.props = this.props;
+      clusteringTask.metaClient = this.metaClient;
       return clusteringTask;
     }
   }
@@ -20,6 +20,7 @@
 package org.apache.hudi.utilities.multitable;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.utilities.HoodieCompactor;
 
 import org.apache.spark.api.java.JavaSparkContext;
@@ -48,6 +49,11 @@ class CompactionTask extends TableServiceTask {
    */
   private int parallelism;
 
+  /**
+   * Meta Client.
+   */
+  private HoodieTableMetaClient metaClient;
+
   @Override
   void run() {
     HoodieCompactor.Config compactionCfg = new HoodieCompactor.Config();
@@ -56,7 +62,7 @@ void run() {
     compactionCfg.runningMode = compactionRunningMode;
     compactionCfg.parallelism = parallelism;
     compactionCfg.retry = retry;
-    new HoodieCompactor(jsc, compactionCfg, props).compact(retry);
+    new HoodieCompactor(jsc, compactionCfg, props, metaClient).compact(retry);
   }
 
   /**
@@ -109,6 +115,11 @@ public static final class Builder {
      */
     private JavaSparkContext jsc;
 
+    /**
+     * Meta Client.
+     */
+    private HoodieTableMetaClient metaClient;
+
     public Builder withProps(TypedProperties props) {
       this.props = props;
       return this;
@@ -144,6 +155,11 @@ public Builder withJsc(JavaSparkContext jsc) {
       return this;
     }
 
+    public Builder withMetaclient(HoodieTableMetaClient metaClient) {
+      this.metaClient = metaClient;
+      return this;
+    }
+
     public CompactionTask build() {
       CompactionTask compactionTask = new CompactionTask();
       compactionTask.basePath = this.basePath;
@@ -153,6 +169,7 @@ public CompactionTask build() {
       compactionTask.compactionStrategyName = this.compactionStrategyName;
       compactionTask.retry = this.retry;
       compactionTask.props = this.props;
+      compactionTask.metaClient = this.metaClient;
       return compactionTask;
     }
   }
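
For standalone wiring outside the pipeline, the builder is used the same way. A hedged sketch (values are illustrative, and calling run() assumes same-package access, since CompactionTask and its run() are package-private):

  HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, basePath, true);
  CompactionTask task = CompactionTask.newBuilder()
      .withJsc(jsc)
      .withBasePath(basePath)
      .withParallelism(2)                      // illustrative value
      .withCompactionRunningMode("execute")    // illustrative mode
      .withProps(props)
      .withRetry(1)
      .withMetaclient(metaClient)
      .build();
  task.run();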
@@ -22,9 +22,11 @@
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.utilities.UtilHelpers;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -166,15 +168,20 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js
                                                                HoodieMultiTableServicesMain.Config cfg,
                                                                TypedProperties props) {
     TableServicePipeline pipeline = new TableServicePipeline();
+    HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, basePath, true);
+    TypedProperties propsWithTableConfig = new TypedProperties(metaClient.getTableConfig().getProps());
+    propsWithTableConfig.putAll(props);
+
     if (cfg.enableCompaction) {
       pipeline.add(CompactionTask.newBuilder()
           .withJsc(jsc)
           .withBasePath(basePath)
           .withParallelism(cfg.parallelism)
           .withCompactionRunningMode(cfg.compactionRunningMode)
           .withCompactionStrategyName(cfg.compactionStrategyClassName)
-          .withProps(props)
+          .withProps(propsWithTableConfig)
           .withRetry(cfg.retry)
+          .withMetaclient(metaClient)
           .build());
     }
     if (cfg.enableClustering) {
@@ -183,23 +190,24 @@ public static TableServicePipeline buildTableServicePipeline(JavaSparkContext js
           .withJsc(jsc)
           .withParallelism(cfg.parallelism)
           .withClusteringRunningMode(cfg.clusteringRunningMode)
-          .withProps(props)
+          .withProps(propsWithTableConfig)
           .withRetry(cfg.retry)
+          .withMetaclient(metaClient)
           .build());
     }
     if (cfg.enableClean) {
       pipeline.add(CleanTask.newBuilder()
           .withBasePath(basePath)
           .withJsc(jsc)
           .withRetry(cfg.retry)
-          .withProps(props)
+          .withProps(propsWithTableConfig)
           .build());
     }
     if (cfg.enableArchive) {
       pipeline.add(ArchiveTask.newBuilder()
           .withBasePath(basePath)
           .withJsc(jsc)
-          .withProps(props)
+          .withProps(propsWithTableConfig)
           .withRetry(cfg.retry)
           .build());
     }
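
Net effect: every task in the pipeline now shares one metaClient, and task properties are seeded from the table config, with job-level props winning on key collisions because putAll overwrites existing keys. A hedged driver sketch (the parameter order of buildTableServicePipeline and the execute() entry point are assumed from context, and the config values are illustrative):

  HoodieMultiTableServicesMain.Config cfg = new HoodieMultiTableServicesMain.Config();
  cfg.enableCompaction = true;
  cfg.enableClean = true;
  cfg.parallelism = 2;
  cfg.retry = 1;
  TableServicePipeline pipeline = buildTableServicePipeline(jsc, basePath, cfg, props);
  pipeline.execute();  // assumed to run each TableServiceTask in order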