Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,26 +76,24 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
* incremental fashion.
*/
private transient Option<EmbeddedTimelineService> timelineServer;
private final boolean shouldStopTimelineServer;

protected BaseHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
this(context, clientConfig, Option.empty());
}
private boolean shouldCloseTxnManager; // flag used for resource releasing
private boolean shouldStopTimelineServer; // flag used for resource releasing

protected BaseHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineServer) {
protected BaseHoodieClient(HoodieEngineContext context,
HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineServer,
Option<TransactionManager> txnManager) {
this.hadoopConf = context.getHadoopConf().get();
this.fs = FSUtils.getFs(clientConfig.getBasePath(), hadoopConf);
this.context = context;
this.basePath = clientConfig.getBasePath();
this.config = clientConfig;
this.timelineServer = timelineServer;
shouldStopTimelineServer = !timelineServer.isPresent();
this.heartbeatClient = new HoodieHeartbeatClient(this.fs, this.basePath,
clientConfig.getHoodieClientHeartbeatIntervalInMs(), clientConfig.getHoodieClientHeartbeatTolerableMisses());
this.metrics = new HoodieMetrics(config);
this.txnManager = new TransactionManager(config, fs);
startEmbeddedServerView();
this.txnManager = initTxnManager(txnManager);
this.timelineServer = startEmbeddedServerView(timelineServer);
initWrapperFSMetrics();
runClientInitCallbacks();
}
Expand All @@ -108,7 +106,9 @@ public void close() {
stopEmbeddedServerView(true);
this.context.setJobStatus("", "");
this.heartbeatClient.close();
this.txnManager.close();
if (this.shouldCloseTxnManager) {
this.txnManager.close();
}
}

private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig) {
Expand All @@ -125,21 +125,31 @@ private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig)
}
}

private synchronized void startEmbeddedServerView() {
if (config.isEmbeddedTimelineServerEnabled()) {
if (!timelineServer.isPresent()) {
private synchronized Option<EmbeddedTimelineService> startEmbeddedServerView(Option<EmbeddedTimelineService> reused) {
if (reused.isPresent()) {
LOG.info("Timeline Server already running. Not restarting the service");
return reused;
} else {
if (config.isEmbeddedTimelineServerEnabled()) {
// Run Embedded Timeline Server
try {
timelineServer = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(context, config);
this.shouldStopTimelineServer = true;
return EmbeddedTimelineServerHelper.createEmbeddedTimelineService(context, config);
} catch (IOException e) {
LOG.warn("Unable to start timeline service. Proceeding as if embedded server is disabled", e);
stopEmbeddedServerView(false);
}
} else {
LOG.info("Timeline Server already running. Not restarting the service");
}
}
return Option.empty();
}

private TransactionManager initTxnManager(Option<TransactionManager> reused) {
if (reused.isPresent()) {
return reused.get();
} else {
LOG.info("Embedded Timeline Server is disabled. Not starting timeline service");
this.shouldCloseTxnManager = true;
return new TransactionManager(config, fs);
}
}

Expand Down Expand Up @@ -186,6 +196,10 @@ public HoodieHeartbeatClient getHeartbeatClient() {
return heartbeatClient;
}

public Option<TransactionManager> getTxnManager() {
return Option.of(txnManager);
}

/**
* Resolve write conflicts before commit.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
Expand Down Expand Up @@ -93,8 +94,9 @@ public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient i

protected BaseHoodieTableServiceClient(HoodieEngineContext context,
HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, clientConfig, timelineService);
Option<EmbeddedTimelineService> timelineService,
Option<TransactionManager> txnManager) {
super(context, clientConfig, timelineService, txnManager);
}

protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) {
Expand Down Expand Up @@ -172,7 +174,7 @@ private void inlineCompaction(HoodieTable table, Option<Map<String, String>> ext
}

/**
* Ensures compaction instant is in expected state and performs Log Compaction for the workload stored in instant-time.s
* Ensures compaction instant is in expected state and performs Log Compaction for the workload stored in instant-time.
*
* @param compactionInstantTime Compaction Instant Time
* @return Collection of Write Status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,32 +136,18 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient

protected BaseHoodieTableServiceClient<O> tableServiceClient;

/**
* Create a write client, with new hudi index.
* @param context HoodieEngineContext
* @param writeConfig instance of HoodieWriteConfig
* @param upgradeDowngradeHelper engine-specific instance of {@link SupportsUpgradeDowngrade}
*/
@Deprecated
public BaseHoodieWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeConfig,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
this(context, writeConfig, Option.empty(), upgradeDowngradeHelper);
}

/**
* Create a write client, allows to specify all parameters.
*
* @param context HoodieEngineContext
* @param writeConfig instance of HoodieWriteConfig
* @param timelineService Timeline Service that runs as part of write client.
*/
@Deprecated
public BaseHoodieWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
super(context, writeConfig, timelineService);
super(context, writeConfig, timelineService, Option.empty());
this.index = createIndex(writeConfig);
this.upgradeDowngradeHelper = upgradeDowngradeHelper;
}
Expand Down Expand Up @@ -1311,12 +1297,13 @@ protected void releaseResources(String instantTime) {

@Override
public void close() {
// Close table service client first
this.tableServiceClient.close();
// Stop timeline-server if running
super.close();
// Calling this here releases any resources used by your index, so make sure to finish any related operations
// before this point
this.index.close();
this.tableServiceClient.close();
}

private void setWriteTimer(HoodieTable table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class CompactionAdminClient extends BaseHoodieClient {
private static final Logger LOG = LoggerFactory.getLogger(CompactionAdminClient.class);

public CompactionAdminClient(HoodieEngineContext context, String basePath) {
super(context, HoodieWriteConfig.newBuilder().withPath(basePath).build());
super(context, HoodieWriteConfig.newBuilder().withPath(basePath).build(), Option.empty(), Option.empty());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,8 @@ protected synchronized boolean reset(Option<HoodieInstant> callerInstant,
}

public void close() {
if (isLockRequired) {
lockManager.close();
LOG.info("Transaction manager closed");
}
lockManager.close();
LOG.info("Transaction manager closed");
}

public LockManager getLockManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
Expand Down Expand Up @@ -68,8 +69,9 @@ public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClie

protected HoodieFlinkTableServiceClient(HoodieEngineContext context,
HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, clientConfig, timelineService);
Option<EmbeddedTimelineService> timelineService,
Option<TransactionManager> txnManager) {
super(context, clientConfig, timelineService, txnManager);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ public class HoodieFlinkWriteClient<T> extends
private final Map<String, Path> bucketToHandles;

public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
super(context, writeConfig, Option.empty(), FlinkUpgradeDowngradeHelper.getInstance());
this.bucketToHandles = new HashMap<>();
this.tableServiceClient = new HoodieFlinkTableServiceClient<>(context, writeConfig, getTimelineServer());
this.tableServiceClient = new HoodieFlinkTableServiceClient<>(context, writeConfig, getTimelineServer(), getTxnManager());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class TestFlinkWriteClient extends HoodieFlinkClientTestHarness {

Expand Down Expand Up @@ -61,4 +63,22 @@ public void testWriteClientAndTableServiceClientWithTimelineServer(

writeClient.close();
}

@Test
public void testWriteClientAndTableServiceClientWithTxnManager() {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(metaClient.getBasePathV2().toString())
.build();

HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(context, writeConfig);

assertNotNull(writeClient.getTxnManager());

assertEquals(
writeClient.getTxnManager(),
writeClient.getTableServiceClient().getTxnManager()
);

writeClient.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.client;

import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.Option;
Expand All @@ -37,8 +38,9 @@ public class HoodieJavaTableServiceClient extends BaseHoodieTableServiceClient<L

protected HoodieJavaTableServiceClient(HoodieEngineContext context,
HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, clientConfig, timelineService);
Option<EmbeddedTimelineService> timelineService,
Option<TransactionManager> txnManager) {
super(context, clientConfig, timelineService, txnManager);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.hudi.client;

import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
Expand Down Expand Up @@ -52,22 +51,20 @@ public class HoodieJavaWriteClient<T> extends
BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {

public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
super(context, writeConfig, JavaUpgradeDowngradeHelper.getInstance());
this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig, getTimelineServer());
this(context, writeConfig, Option.empty());
}

public HoodieJavaWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeConfig,
boolean rollbackPending,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService, JavaUpgradeDowngradeHelper.getInstance());
this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig, getTimelineServer());
this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig, getTimelineServer(), getTxnManager());
}

@Override
public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieJavaTable<T> table = HoodieJavaTable.create(config, (HoodieJavaEngineContext) context);
HoodieJavaTable<T> table = HoodieJavaTable.create(config, context);
Timer.Context indexTimer = metrics.getIndexCtx();
List<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(HoodieListData.eager(hoodieRecords), context, table).collectAsList();
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
Expand Down Expand Up @@ -239,7 +236,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata,

@Override
protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime,
boolean shouldComplete) {
boolean shouldComplete) {
throw new HoodieNotSupportedException("Compact is not supported in HoodieJavaClient");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestHoodieJavaWriteClientInsert extends HoodieJavaClientTestHarness {
Expand Down Expand Up @@ -120,7 +121,7 @@ public void testWriteClientAndTableServiceClientWithTimelineServer(
new EmbeddedTimelineService(context, null, writeConfig);
timelineService.startServer();
writeConfig.setViewStorageConfig(timelineService.getRemoteFileSystemViewConfig());
writeClient = new HoodieJavaWriteClient(context, writeConfig, true, Option.of(timelineService));
writeClient = new HoodieJavaWriteClient(context, writeConfig, Option.of(timelineService));
// Both the write client and the table service client should use the same passed-in
// timeline server instance.
assertEquals(timelineService, writeClient.getTimelineServer().get());
Expand All @@ -142,6 +143,25 @@ public void testWriteClientAndTableServiceClientWithTimelineServer(
writeClient.close();
}

@Test
public void testWriteClientAndTableServiceClientWithTxnManager() {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath(metaClient.getBasePathV2().toString())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(INMEMORY).build())
.build();

HoodieJavaWriteClient writeClient = new HoodieJavaWriteClient(context, writeConfig);

assertNotNull(writeClient.getTxnManager());

assertEquals(
writeClient.getTxnManager(),
writeClient.getTableServiceClient().getTxnManager()
);

writeClient.close();
}

@Test
public void testInsert() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfigBuilder(basePath).withMergeAllowDuplicateOnInserts(true).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
Expand Down Expand Up @@ -67,8 +68,9 @@ public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<

protected SparkRDDTableServiceClient(HoodieEngineContext context,
HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, clientConfig, timelineService);
Option<EmbeddedTimelineService> timelineService,
Option<TransactionManager> txnManager) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's a good idea to expose this in the constructor, as the client can escape the transaction manager and cause correctness issues in concurrency control.

super(context, clientConfig, timelineService, txnManager);
}

@Override
Expand Down
Loading