diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index ed5b71d96b1e8..2e719d256a96e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -76,26 +76,24 @@ public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
    * incremental fashion.
    */
   private transient Option<EmbeddedTimelineService> timelineServer;
-  private final boolean shouldStopTimelineServer;
-
-  protected BaseHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
-    this(context, clientConfig, Option.empty());
-  }
+  private boolean shouldCloseTxnManager; // flag used for resource releasing
+  private boolean shouldStopTimelineServer; // flag used for resource releasing
 
-  protected BaseHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig,
-      Option<EmbeddedTimelineService> timelineServer) {
+  protected BaseHoodieClient(HoodieEngineContext context,
+                             HoodieWriteConfig clientConfig,
+                             Option<EmbeddedTimelineService> timelineServer,
+                             Option<TransactionManager> txnManager) {
     this.hadoopConf = context.getHadoopConf().get();
     this.fs = FSUtils.getFs(clientConfig.getBasePath(), hadoopConf);
     this.context = context;
     this.basePath = clientConfig.getBasePath();
     this.config = clientConfig;
-    this.timelineServer = timelineServer;
-    shouldStopTimelineServer = !timelineServer.isPresent();
     this.heartbeatClient = new HoodieHeartbeatClient(this.fs, this.basePath,
         clientConfig.getHoodieClientHeartbeatIntervalInMs(), clientConfig.getHoodieClientHeartbeatTolerableMisses());
     this.metrics = new HoodieMetrics(config);
-    this.txnManager = new TransactionManager(config, fs);
-    startEmbeddedServerView();
+    this.txnManager = initTxnManager(txnManager);
+    this.timelineServer = startEmbeddedServerView(timelineServer);
     initWrapperFSMetrics();
     runClientInitCallbacks();
   }
@@ -108,7 +106,9 @@ public void close() {
     stopEmbeddedServerView(true);
     this.context.setJobStatus("", "");
     this.heartbeatClient.close();
-    this.txnManager.close();
+    if (this.shouldCloseTxnManager) {
+      this.txnManager.close();
+    }
   }
 
   private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig) {
@@ -125,21 +125,31 @@ private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig)
     }
   }
 
-  private synchronized void startEmbeddedServerView() {
-    if (config.isEmbeddedTimelineServerEnabled()) {
-      if (!timelineServer.isPresent()) {
+  private synchronized Option<EmbeddedTimelineService> startEmbeddedServerView(Option<EmbeddedTimelineService> reused) {
+    if (reused.isPresent()) {
+      LOG.info("Timeline Server already running. Not restarting the service");
+      return reused;
+    } else {
+      if (config.isEmbeddedTimelineServerEnabled()) {
         // Run Embedded Timeline Server
         try {
-          timelineServer = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(context, config);
+          this.shouldStopTimelineServer = true;
+          return EmbeddedTimelineServerHelper.createEmbeddedTimelineService(context, config);
         } catch (IOException e) {
           LOG.warn("Unable to start timeline service. Proceeding as if embedded server is disabled", e);
           stopEmbeddedServerView(false);
         }
-      } else {
-        LOG.info("Timeline Server already running. Not restarting the service");
       }
+    }
+    return Option.empty();
+  }
+
+  private TransactionManager initTxnManager(Option<TransactionManager> reused) {
+    if (reused.isPresent()) {
+      return reused.get();
     } else {
-      LOG.info("Embedded Timeline Server is disabled. Not starting timeline service");
+      this.shouldCloseTxnManager = true;
+      return new TransactionManager(config, fs);
     }
   }
 
@@ -186,6 +196,10 @@ public HoodieHeartbeatClient getHeartbeatClient() {
     return heartbeatClient;
   }
 
+  public Option<TransactionManager> getTxnManager() {
+    return Option.of(txnManager);
+  }
+
   /**
    * Resolve write conflicts before commit.
    *
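Note on the pattern above (an aside, not part of the patch): each client now either adopts a caller-supplied resource or creates its own, and the boolean flags record which case applies so close() only releases what this instance created. A minimal, self-contained sketch of that reuse-or-create idiom, using hypothetical names (Owner), not Hudi API:

    import java.util.Optional;

    // Sketch of the reuse-or-create idiom; hypothetical type, not Hudi API.
    class Owner implements AutoCloseable {
      private final AutoCloseable resource;
      private final boolean ownsResource; // mirrors shouldCloseTxnManager / shouldStopTimelineServer

      Owner(Optional<AutoCloseable> reused) {
        if (reused.isPresent()) {
          resource = reused.get(); // adopted: whoever created it closes it
          ownsResource = false;
        } else {
          resource = () -> { };    // created here, so this instance must release it
          ownsResource = true;
        }
      }

      @Override
      public void close() throws Exception {
        if (ownsResource) {
          resource.close();        // never close a resource we merely borrowed
        }
      }
    }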
Not starting timeline service"); + this.shouldCloseTxnManager = true; + return new TransactionManager(config, fs); } } @@ -186,6 +196,10 @@ public HoodieHeartbeatClient getHeartbeatClient() { return heartbeatClient; } + public Option getTxnManager() { + return Option.of(txnManager); + } + /** * Resolve write conflicts before commit. * diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index 05944e7171183..1d2ccb459387b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -28,6 +28,7 @@ import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.heartbeat.HeartbeatUtils; +import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.common.HoodiePendingRollbackInfo; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -93,8 +94,9 @@ public abstract class BaseHoodieTableServiceClient extends BaseHoodieClient i protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, - Option timelineService) { - super(context, clientConfig, timelineService); + Option timelineService, + Option txnManager) { + super(context, clientConfig, timelineService, txnManager); } protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) { @@ -172,7 +174,7 @@ private void inlineCompaction(HoodieTable table, Option> ext } /** - * Ensures compaction instant is in expected state and performs Log Compaction for the workload stored in instant-time.s + * Ensures compaction instant is in expected state and performs Log Compaction for the workload stored in instant-time. * * @param compactionInstantTime Compaction Instant Time * @return Collection of Write Status diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 050ad0070dab2..21fcd248442a0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -136,19 +136,6 @@ public abstract class BaseHoodieWriteClient extends BaseHoodieClient protected BaseHoodieTableServiceClient tableServiceClient; - /** - * Create a write client, with new hudi index. - * @param context HoodieEngineContext - * @param writeConfig instance of HoodieWriteConfig - * @param upgradeDowngradeHelper engine-specific instance of {@link SupportsUpgradeDowngrade} - */ - @Deprecated - public BaseHoodieWriteClient(HoodieEngineContext context, - HoodieWriteConfig writeConfig, - SupportsUpgradeDowngrade upgradeDowngradeHelper) { - this(context, writeConfig, Option.empty(), upgradeDowngradeHelper); - } - /** * Create a write client, allows to specify all parameters. * @@ -156,12 +143,11 @@ public BaseHoodieWriteClient(HoodieEngineContext context, * @param writeConfig instance of HoodieWriteConfig * @param timelineService Timeline Service that runs as part of write client. 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index 8ff562c2070ec..0bca1103554f9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -67,7 +67,7 @@ public class CompactionAdminClient extends BaseHoodieClient {
   private static final Logger LOG = LoggerFactory.getLogger(CompactionAdminClient.class);
 
   public CompactionAdminClient(HoodieEngineContext context, String basePath) {
-    super(context, HoodieWriteConfig.newBuilder().withPath(basePath).build());
+    super(context, HoodieWriteConfig.newBuilder().withPath(basePath).build(), Option.empty(), Option.empty());
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
index b3e9abc7a3a13..f94e17669e4aa 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java
@@ -84,10 +84,8 @@ protected synchronized boolean reset(Option<HoodieInstant> callerInstant,
   }
 
   public void close() {
-    if (isLockRequired) {
-      lockManager.close();
-      LOG.info("Transaction manager closed");
-    }
+    lockManager.close();
+    LOG.info("Transaction manager closed");
   }
 
   public LockManager getLockManager() {
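On TransactionManager.close() above: the isLockRequired guard can go because ownership is now tracked one level up (shouldCloseTxnManager), and — assuming LockManager creates its lock provider lazily — closing a manager that never took a lock is a harmless no-op. A sketch of that assumption, using a hypothetical LazyLockManager rather than the real LockManager:

    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical sketch: the provider is created on first use, so close() is safe
    // even if no lock was ever taken (an assumption about LockManager's behavior).
    class LazyLockManager implements AutoCloseable {
      private final AtomicReference<AutoCloseable> provider = new AtomicReference<>();

      void lock() {
        provider.compareAndSet(null, () -> { }); // stand-in lock provider, created lazily
      }

      @Override
      public void close() throws Exception {
        AutoCloseable p = provider.getAndSet(null);
        if (p != null) {
          p.close(); // nothing to release when lock() was never called
        }
      }
    }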
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index 13105251acf7b..ba976e78aebcc 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -68,8 +69,9 @@ public class HoodieFlinkTableServiceClient extends BaseHoodieTableServiceClient
 
   protected HoodieFlinkTableServiceClient(HoodieEngineContext context,
                                           HoodieWriteConfig clientConfig,
-                                          Option<EmbeddedTimelineService> timelineService) {
-    super(context, clientConfig, timelineService);
+                                          Option<EmbeddedTimelineService> timelineService,
+                                          Option<TransactionManager> txnManager) {
+    super(context, clientConfig, timelineService, txnManager);
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index a320aad62fc06..5f03bf4a8cbb6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -86,9 +86,9 @@ public class HoodieFlinkWriteClient<T> extends
   private final Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
 
   public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
-    super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
+    super(context, writeConfig, Option.empty(), FlinkUpgradeDowngradeHelper.getInstance());
     this.bucketToHandles = new HashMap<>();
-    this.tableServiceClient = new HoodieFlinkTableServiceClient<>(context, writeConfig, getTimelineServer());
+    this.tableServiceClient = new HoodieFlinkTableServiceClient<>(context, writeConfig, getTimelineServer(), getTxnManager());
   }
 
   /**
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/TestFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/TestFlinkWriteClient.java
index 0752e9e785c73..eb0bdc5995838 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/TestFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/TestFlinkWriteClient.java
@@ -23,6 +23,7 @@
 import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;
 
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -30,6 +31,7 @@
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public class TestFlinkWriteClient extends HoodieFlinkClientTestHarness {
@@ -61,4 +63,22 @@ public void testWriteClientAndTableServiceClientWithTimelineServer(
     writeClient.close();
   }
+
+  @Test
+  public void testWriteClientAndTableServiceClientWithTxnManager() {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(metaClient.getBasePathV2().toString())
+        .build();
+
+    HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(context, writeConfig);
+
+    assertNotNull(writeClient.getTxnManager());
+
+    assertEquals(
+        writeClient.getTxnManager(),
+        writeClient.getTableServiceClient().getTxnManager()
+    );
+
+    writeClient.close();
+  }
 }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
index bcbd7dac918f8..69f4bd4eebd2b 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.util.Option;
@@ -37,8 +38,9 @@ public class HoodieJavaTableServiceClient extends BaseHoodieTableServiceClient
 
   protected HoodieJavaTableServiceClient(HoodieEngineContext context,
                                          HoodieWriteConfig clientConfig,
-                                         Option<EmbeddedTimelineService> timelineService) {
-    super(context, clientConfig, timelineService);
+                                         Option<EmbeddedTimelineService> timelineService,
+                                         Option<TransactionManager> txnManager) {
+    super(context, clientConfig, timelineService, txnManager);
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 54129fda16c58..360f9738b3d11 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.client;
 
-import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -52,22 +51,20 @@ public class HoodieJavaWriteClient<T> extends BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<WriteStatus>, List<WriteStatus>> {
 
   public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
-    super(context, writeConfig, JavaUpgradeDowngradeHelper.getInstance());
-    this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig, getTimelineServer());
+    this(context, writeConfig, Option.empty());
   }
 
   public HoodieJavaWriteClient(HoodieEngineContext context,
                                HoodieWriteConfig writeConfig,
-                               boolean rollbackPending,
                                Option<EmbeddedTimelineService> timelineService) {
     super(context, writeConfig, timelineService, JavaUpgradeDowngradeHelper.getInstance());
-    this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig, getTimelineServer());
+    this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig, getTimelineServer(), getTxnManager());
   }
 
   @Override
   public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieJavaTable table = HoodieJavaTable.create(config, (HoodieJavaEngineContext) context);
+    HoodieJavaTable table = HoodieJavaTable.create(config, context);
     Timer.Context indexTimer = metrics.getIndexCtx();
     List<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(HoodieListData.eager(hoodieRecords), context, table).collectAsList();
     metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
@@ -239,7 +236,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata,
 
   @Override
   protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime,
-                                                          boolean shouldComplete) {
+                                                           boolean shouldComplete) {
     throw new HoodieNotSupportedException("Compact is not supported in HoodieJavaClient");
   }
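With the boolean rollbackPending parameter gone, the Java write client keeps two constructors. A hedged usage sketch (the base path is illustrative; the caller is assumed to supply the engine context and, optionally, a timeline service to share):

    import org.apache.hudi.client.HoodieJavaWriteClient;
    import org.apache.hudi.client.embedded.EmbeddedTimelineService;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.model.HoodieAvroPayload;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieWriteConfig;

    class JavaClientUsage {
      // Build a client that starts its own services, or one that adopts a shared timeline service.
      static HoodieJavaWriteClient<HoodieAvroPayload> create(HoodieEngineContext context,
                                                             Option<EmbeddedTimelineService> timelineService) {
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hoodie/sample-table") // illustrative base path
            .build();
        return timelineService.isPresent()
            ? new HoodieJavaWriteClient<>(context, writeConfig, timelineService)
            : new HoodieJavaWriteClient<>(context, writeConfig);
      }
    }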
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
index 02c407ba02db3..c89237cd9353a 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
@@ -61,6 +61,7 @@
 import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieJavaWriteClientInsert extends HoodieJavaClientTestHarness {
@@ -120,7 +121,7 @@ public void testWriteClientAndTableServiceClientWithTimelineServer(
         new EmbeddedTimelineService(context, null, writeConfig);
     timelineService.startServer();
     writeConfig.setViewStorageConfig(timelineService.getRemoteFileSystemViewConfig());
-    writeClient = new HoodieJavaWriteClient(context, writeConfig, true, Option.of(timelineService));
+    writeClient = new HoodieJavaWriteClient(context, writeConfig, Option.of(timelineService));
 
     // Both the write client and the table service client should use the same passed-in
     // timeline server instance.
     assertEquals(timelineService, writeClient.getTimelineServer().get());
@@ -142,6 +143,25 @@ public void testWriteClientAndTableServiceClientWithTimelineServer(
     writeClient.close();
   }
 
+  @Test
+  public void testWriteClientAndTableServiceClientWithTxnManager() {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(metaClient.getBasePathV2().toString())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(INMEMORY).build())
+        .build();
+
+    HoodieJavaWriteClient writeClient = new HoodieJavaWriteClient(context, writeConfig);
+
+    assertNotNull(writeClient.getTxnManager());
+
+    assertEquals(
+        writeClient.getTxnManager(),
+        writeClient.getTableServiceClient().getTxnManager()
+    );
+
+    writeClient.close();
+  }
+
   @Test
   public void testInsert() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder(basePath).withMergeAllowDuplicateOnInserts(true).build();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
index d2fb78e08c696..0db076a8df3d9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -67,8 +68,9 @@ public class SparkRDDTableServiceClient extends BaseHoodieTableServiceClient
 
   protected SparkRDDTableServiceClient(HoodieEngineContext context,
                                        HoodieWriteConfig clientConfig,
-                                       Option<EmbeddedTimelineService> timelineService) {
-    super(context, clientConfig, timelineService);
+                                       Option<EmbeddedTimelineService> timelineService,
+                                       Option<TransactionManager> txnManager) {
+    super(context, clientConfig, timelineService, txnManager);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 711f5708d9b32..5396fb69eb061 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -71,21 +71,10 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
     this(context, clientConfig, Option.empty());
   }
 
-  @Deprecated
-  public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
-    this(context, writeConfig, Option.empty());
-  }
-
-  @Deprecated
-  public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
-                             Option<EmbeddedTimelineService> timelineService) {
-    this(context, writeConfig, timelineService);
-  }
-
   public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
                              Option<EmbeddedTimelineService> timelineService) {
     super(context, writeConfig, timelineService, SparkUpgradeDowngradeHelper.getInstance());
-    this.tableServiceClient = new SparkRDDTableServiceClient<>(context, writeConfig, getTimelineServer());
+    this.tableServiceClient = new SparkRDDTableServiceClient<>(context, writeConfig, getTimelineServer(), getTxnManager());
   }
 
   @Override
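With the deprecated boolean overloads removed above, Spark callers are down to two constructors; call sites that passed rollbackPending simply drop the flag, as the metadata-writer changes below show. A hedged migration sketch (hypothetical helper; the raw SparkRDDWriteClient type is used here as elsewhere in this patch):

    import org.apache.hudi.client.SparkRDDWriteClient;
    import org.apache.hudi.client.embedded.EmbeddedTimelineService;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieWriteConfig;

    class SparkClientUsage {
      // Before: new SparkRDDWriteClient(context, writeConfig, true) -- boolean overload removed.
      // After: the flag is dropped; an Option carries the (optionally shared) timeline service.
      static SparkRDDWriteClient create(HoodieEngineContext context,
                                        HoodieWriteConfig writeConfig,
                                        Option<EmbeddedTimelineService> timelineService) {
        return new SparkRDDWriteClient(context, writeConfig, timelineService);
      }
    }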
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 320ebb1576339..1e75545d9dd8d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -188,7 +188,7 @@ public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
     List<String> partitionsToDrop = partitions.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toList());
     LOG.info("Deleting Metadata Table partitions: " + partitionsToDrop);
 
-    try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
+    try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig)) {
       String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ);
       writeClient.startCommitWithTime(instantTime, actionType);
       writeClient.deletePartitions(partitionsToDrop, instantTime);
@@ -199,7 +199,7 @@ public void deletePartitions(String instantTime, List<MetadataPartitionType> partitions) {
   @Override
   public BaseHoodieWriteClient getWriteClient() {
     if (writeClient == null) {
-      writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true);
+      writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig);
     }
     return writeClient;
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index c2de0595dc7f3..68ab2313a2576 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -481,6 +481,7 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType)
     final SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
 
     // Create upserts, schedule cleaning, schedule compaction in parallel
+    String fourthInstantTime = HoodieActiveTimeline.createNewInstantTime();
     Future future1 = executors.submit(() -> {
       final String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
       final int numRecords = 100;
@@ -492,34 +493,35 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType)
       if (tableType == HoodieTableType.MERGE_ON_READ) {
         // Since the compaction already went in, this upsert has
         // to fail
+        // Note: the start time of upsert must be smaller than the compaction scheduling time
         assertThrows(IllegalArgumentException.class, () -> {
-          createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+          createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, fourthInstantTime, numRecords);
         });
       } else {
         // We don't have the compaction for COW and so this upsert
         // has to pass
         assertDoesNotThrow(() -> {
-          createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+          createCommitWithUpserts(cfg, client1, thirdCommitTime, commitTimeBetweenPrevAndNew, fourthInstantTime, numRecords);
         });
-        validInstants.add(newCommitTime);
+        validInstants.add(fourthInstantTime);
       }
     });
 
+    String fifthInstantTime = HoodieActiveTimeline.createNewInstantTime();
     Future future2 = executors.submit(() -> {
       if (tableType == HoodieTableType.MERGE_ON_READ) {
         assertDoesNotThrow(() -> {
-          String compactionTimeStamp = HoodieActiveTimeline.createNewInstantTime();
-          client2.scheduleTableService(compactionTimeStamp, Option.empty(), TableServiceType.COMPACT);
+          client2.scheduleTableService(fifthInstantTime, Option.empty(), TableServiceType.COMPACT);
         });
       }
       latchCountDownAndWait(scheduleCountDownLatch, 30000);
     });
 
+    String sixthInstantTime = HoodieActiveTimeline.createNewInstantTime();
     Future future3 = executors.submit(() -> {
       assertDoesNotThrow(() -> {
         latchCountDownAndWait(scheduleCountDownLatch, 30000);
-        String cleanCommitTime = HoodieActiveTimeline.createNewInstantTime();
-        client3.scheduleTableService(cleanCommitTime, Option.empty(), TableServiceType.CLEAN);
+        client3.scheduleTableService(sixthInstantTime, Option.empty(), TableServiceType.CLEAN);
       });
     });
     future1.get();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
index 9cffce2b07bbe..0d4eb7d1a0ef0 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
@@ -35,6 +35,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
@@ -52,6 +53,7 @@
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class TestSparkRDDWriteClient extends SparkClientFunctionalTestHarness {
@@ -110,6 +112,26 @@ public void testWriteClientAndTableServiceClientWithTimelineServer(
     writeClient.close();
   }
 
+  @Test
+  public void testWriteClientAndTableServiceClientWithTxnManager() throws IOException {
+    HoodieTableMetaClient metaClient =
+        getHoodieMetaClient(hadoopConf(), URI.create(basePath()).getPath(), new Properties());
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(metaClient.getBasePathV2().toString())
+        .build();
+
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig);
+
+    assertNotNull(writeClient.getTxnManager());
+
+    assertEquals(
+        writeClient.getTxnManager(),
+        writeClient.getTableServiceClient().getTxnManager()
+    );
+
+    writeClient.close();
+  }
+
   @ParameterizedTest
   @MethodSource
   void testWriteClientReleaseResourcesShouldOnlyUnpersistRelevantRdds(
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 56fabb362b60a..e50960cd3c2d9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -2456,8 +2456,7 @@ public void testRollbackOfPartiallyFailedCommitWithNewPartitions() throws Exception {
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
-        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, false, false).build(),
-        true)) {
+        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, false, false).build())) {
       String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
       client.startCommitWithTime(newCommitTime);
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
@@ -2487,8 +2486,7 @@ public void testRollbackOfPartiallyFailedCommitWithNewPartitions() throws Exception {
     }
 
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
-        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, false, false).build(),
-        true)) {
+        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, false, false).build())) {
       String newCommitTime = client.startCommit();
       // Next insert
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
@@ -2682,8 +2680,7 @@ public void testErrorCases() throws Exception {
     // TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table
     // should be rolled back to last valid commit.
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
-        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, false, false).build(),
-        true)) {
+        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, false, false).build())) {
       String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
       client.startCommitWithTime(newCommitTime);
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
@@ -2706,8 +2703,7 @@ public void testErrorCases() throws Exception {
     }
 
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
-        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, false, false).build(),
-        true)) {
+        getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, false, false).build())) {
       String newCommitTime = client.startCommit();
       // Next insert
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 5);
@@ -3070,7 +3066,7 @@ public void testOutOfOrderCommits() throws Exception {
     metadataProps.setProperty(INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "3");
     HoodieWriteConfig metadataWriteConfig = HoodieWriteConfig.newBuilder()
         .withProperties(metadataProps).build();
-    SparkRDDWriteClient metadataWriteClient = new SparkRDDWriteClient(context, metadataWriteConfig, true);
+    SparkRDDWriteClient metadataWriteClient = new SparkRDDWriteClient(context, metadataWriteConfig);
     final String compactionInstantTime = HoodieTableMetadataUtil.createCompactionTimestamp(commitTime);
     assertTrue(metadataWriteClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty()));
     metadataWriteClient.compact(compactionInstantTime);