Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,33 @@

import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieTable;

import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Set;

/**
* Abstract class taking care of holding common member variables (FileSystem, SparkContext, HoodieConfigs) Also, manages
Expand All @@ -47,9 +59,11 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
protected final transient FileSystem fs;
protected final transient HoodieEngineContext context;
protected final transient Configuration hadoopConf;
protected final transient HoodieMetrics metrics;
protected final HoodieWriteConfig config;
protected final String basePath;
protected final HoodieHeartbeatClient heartbeatClient;
protected final TransactionManager txnManager;

/**
* Timeline Server has the same lifetime as that of Client. Any operations done on the same timeline service will be
Expand All @@ -74,6 +88,8 @@ protected BaseHoodieClient(HoodieEngineContext context, HoodieWriteConfig client
shouldStopTimelineServer = !timelineServer.isPresent();
this.heartbeatClient = new HoodieHeartbeatClient(this.fs, this.basePath,
clientConfig.getHoodieClientHeartbeatIntervalInMs(), clientConfig.getHoodieClientHeartbeatTolerableMisses());
this.metrics = new HoodieMetrics(config);
this.txnManager = new TransactionManager(config, fs);
startEmbeddedServerView();
initWrapperFSMetrics();
}
Expand All @@ -85,6 +101,8 @@ protected BaseHoodieClient(HoodieEngineContext context, HoodieWriteConfig client
public void close() {
stopEmbeddedServerView(true);
this.context.setJobStatus("", "");
this.heartbeatClient.stop();
this.txnManager.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remark: this is common to all clients, so moved here.

}

private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig) {
Expand Down Expand Up @@ -146,4 +164,52 @@ public Option<EmbeddedTimelineService> getTimelineServer() {
public HoodieHeartbeatClient getHeartbeatClient() {
return heartbeatClient;
}

/**
* Resolve write conflicts before commit.
*
* @param table A hoodie table instance created after transaction starts so that the latest commits and files are captured.
* @param metadata Current committing instant's metadata
* @param pendingInflightAndRequestedInstants
* @see {@link BaseHoodieWriteClient#preCommit}
* @see {@link BaseHoodieTableServiceClient#preCommit}
*/
protected void resolveWriteConflict(HoodieTable table, HoodieCommitMetadata metadata, Set<String> pendingInflightAndRequestedInstants) {
Timer.Context conflictResolutionTimer = metrics.getConflictResolutionCtx();
try {
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner(), false, pendingInflightAndRequestedInstants);
metrics.emitConflictResolutionSuccessful();
} catch (HoodieWriteConflictException e) {
metrics.emitConflictResolutionFailed();
throw e;
} finally {
if (conflictResolutionTimer != null) {
conflictResolutionTimer.stop();
}
}
}

/**
* Finalize Write operation.
*
* @param table HoodieTable
* @param instantTime Instant Time
* @param stats Hoodie Write Stat
*/
protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
try {
final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
table.finalizeWrite(context, instantTime, stats);
if (finalizeCtx != null) {
Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
durationInMs.ifPresent(duration -> {
LOG.info("Finalize write elapsed time (milliseconds): " + duration);
metrics.updateFinalizeWriteMetrics(duration, stats.size());
});
}
} catch (HoodieIOException ioe) {
throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
}
}
}
Loading