diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 301ed61bf4e5..5245c4bba214 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -26,6 +26,7 @@
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HeartbeatUtils;
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -87,8 +88,10 @@ public abstract class BaseHoodieTableServiceClient extends BaseHoodieClient i

   protected Set pendingInflightAndRequestedInstants;

-  protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
-    super(context, clientConfig, Option.empty());
+  protected BaseHoodieTableServiceClient(HoodieEngineContext context,
+                                         HoodieWriteConfig clientConfig,
+                                         Option<EmbeddedTimelineService> timelineService) {
+    super(context, clientConfig, timelineService);
   }

   protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index faf27d2ce824..89e3d78e0d3d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client;

 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
@@ -62,8 +63,10 @@ public class HoodieFlinkTableServiceClient extends BaseHoodieTableServiceClie
    */
   private HoodieBackedTableMetadataWriter metadataWriter;

-  protected HoodieFlinkTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
-    super(context, clientConfig);
+  protected HoodieFlinkTableServiceClient(HoodieEngineContext context,
+                                          HoodieWriteConfig clientConfig,
+                                          Option<EmbeddedTimelineService> timelineService) {
+    super(context, clientConfig, timelineService);
   }

   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 1d4697d709d0..28327192a89b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -87,7 +87,7 @@ public class HoodieFlinkWriteClient extends
   public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
     super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
     this.bucketToHandles = new HashMap<>();
-    this.tableServiceClient = new HoodieFlinkTableServiceClient<>(context, writeConfig);
+    this.tableServiceClient = new HoodieFlinkTableServiceClient<>(context, writeConfig, getTimelineServer());
   }

   /**
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/TestFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/TestFlinkWriteClient.java
new file mode 100644
index 000000000000..0752e9e785c7
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/client/TestFlinkWriteClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class TestFlinkWriteClient extends HoodieFlinkClientTestHarness {
+
+  @BeforeEach
+  private void setup() throws IOException {
+    initPath();
+    initFileSystem();
+    initMetaClient();
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testWriteClientAndTableServiceClientWithTimelineServer(
+      boolean enableEmbeddedTimelineServer) throws IOException {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(metaClient.getBasePathV2().toString())
+        .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineServer)
+        .build();
+
+    HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(context, writeConfig);
+    // Only one timeline server should be instantiated, and the same timeline server
+    // should be used by both the write client and the table service client.
+    assertEquals(
+        writeClient.getTimelineServer(),
+        writeClient.getTableServiceClient().getTimelineServer());
+    if (!enableEmbeddedTimelineServer) {
+      assertFalse(writeClient.getTimelineServer().isPresent());
+    }
+
+    writeClient.close();
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
index 2d823aa7f577..bcbd7dac918f 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaTableServiceClient.java
@@ -18,6 +18,7 @@

 package org.apache.hudi.client;

+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.util.Option;
@@ -34,8 +35,10 @@ public class HoodieJavaTableServiceClient extends BaseHoodieTableServiceClient>

-  protected HoodieJavaTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
-    super(context, clientConfig);
+  protected HoodieJavaTableServiceClient(HoodieEngineContext context,
+                                         HoodieWriteConfig clientConfig,
+                                         Option<EmbeddedTimelineService> timelineService) {
+    super(context, clientConfig, timelineService);
   }

   @Override
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index af35078b9a98..997dd5d84e36 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -53,7 +53,7 @@ public class HoodieJavaWriteClient extends
   public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
     super(context, writeConfig, JavaUpgradeDowngradeHelper.getInstance());
-    this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig);
+    this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig, getTimelineServer());
   }

   public HoodieJavaWriteClient(HoodieEngineContext context,
@@ -61,7 +61,7 @@ public HoodieJavaWriteClient(HoodieEngineContext context,
                                boolean rollbackPending,
                                Option<EmbeddedTimelineService> timelineService) {
     super(context, writeConfig, timelineService, JavaUpgradeDowngradeHelper.getInstance());
-    this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig);
+    this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig, getTimelineServer());
   }

   @Override
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
index ae73b0a65d79..02c407ba02db 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
@@ -19,6 +19,7 @@

 package org.apache.hudi.client;

+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
@@ -31,6 +32,8 @@
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
@@ -44,8 +47,10 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;

+import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
@@ -53,7 +58,9 @@
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
 import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;

 public class TestHoodieJavaWriteClientInsert extends HoodieJavaClientTestHarness {
@@ -96,6 +103,45 @@ private void setupIncremental(JobConf jobConf, String startCommit, int numberOfC
     jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
   }

+  @ParameterizedTest
+  @CsvSource({"true,true", "true,false", "false,true", "false,false"})
+  public void testWriteClientAndTableServiceClientWithTimelineServer(
+      boolean enableEmbeddedTimelineServer, boolean passInTimelineServer) throws IOException {
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(INMEMORY).build())
+        .withPath(metaClient.getBasePathV2().toString())
+        .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineServer)
+        .build();
+
+    HoodieJavaWriteClient writeClient;
+    if (passInTimelineServer) {
+      EmbeddedTimelineService timelineService =
+          new EmbeddedTimelineService(context, null, writeConfig);
+      timelineService.startServer();
+      writeConfig.setViewStorageConfig(timelineService.getRemoteFileSystemViewConfig());
+      writeClient = new HoodieJavaWriteClient(context, writeConfig, true, Option.of(timelineService));
+      // Both the write client and the table service client should use the same passed-in
+      // timeline server instance.
+      assertEquals(timelineService, writeClient.getTimelineServer().get());
+      assertEquals(timelineService, writeClient.getTableServiceClient().getTimelineServer().get());
+      // Write config should not be changed
+      assertEquals(writeConfig, writeClient.getConfig());
+      timelineService.stop();
+    } else {
+      writeClient = new HoodieJavaWriteClient(context, writeConfig);
+      // Only one timeline server should be instantiated, and the same timeline server
+      // should be used by both the write client and the table service client.
+      assertEquals(
+          writeClient.getTimelineServer(),
+          writeClient.getTableServiceClient().getTimelineServer());
+      if (!enableEmbeddedTimelineServer) {
+        assertFalse(writeClient.getTimelineServer().isPresent());
+      }
+    }
+    writeClient.close();
+  }
+
   @Test
   public void testInsert() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder(basePath).withMergeAllowDuplicateOnInserts(true).build();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
index 1fcd6acb12de..de203a0aa5b6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
@@ -20,6 +20,7 @@
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -58,8 +59,10 @@ public class SparkRDDTableServiceClient extends BaseHoodieTableServiceClient<

   private static final Logger LOG = LoggerFactory.getLogger(SparkRDDTableServiceClient.class);

-  protected SparkRDDTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
-    super(context, clientConfig);
+  protected SparkRDDTableServiceClient(HoodieEngineContext context,
+                                       HoodieWriteConfig clientConfig,
+                                       Option<EmbeddedTimelineService> timelineService) {
+    super(context, clientConfig, timelineService);
   }

   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index ac3355d06fcb..643a042442ba 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -83,7 +83,7 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeC

   public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
     super(context, writeConfig, timelineService, SparkUpgradeDowngradeHelper.getInstance());
-    this.tableServiceClient = new SparkRDDTableServiceClient<>(context, writeConfig);
+    this.tableServiceClient = new SparkRDDTableServiceClient<>(context, writeConfig, getTimelineServer());
   }

   @Override
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
index 0d8eda4912d2..96a519cc11f2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
@@ -19,11 +19,13 @@

 package org.apache.hudi.client;

+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -34,6 +36,7 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;

 import java.io.IOException;
@@ -61,6 +64,47 @@ static Stream testWriteClientReleaseResourcesShouldOnlyUnpersistRelev
     );
   }

+  @ParameterizedTest
+  @CsvSource({"true,true", "true,false", "false,true", "false,false"})
+  public void testWriteClientAndTableServiceClientWithTimelineServer(
+      boolean enableEmbeddedTimelineServer, boolean passInTimelineServer) throws IOException {
+    HoodieTableMetaClient metaClient =
+        getHoodieMetaClient(hadoopConf(), URI.create(basePath()).getPath(), new Properties());
+    HoodieWriteConfig writeConfig = getConfigBuilder(true)
+        .withPath(metaClient.getBasePathV2().toString())
+        .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineServer)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withRemoteServerPort(incrementTimelineServicePortToUse()).build())
+        .build();
+
+    SparkRDDWriteClient writeClient;
+    if (passInTimelineServer) {
+      EmbeddedTimelineService timelineService =
+          new EmbeddedTimelineService(context(), null, writeConfig);
+      timelineService.startServer();
+      writeConfig.setViewStorageConfig(timelineService.getRemoteFileSystemViewConfig());
+      writeClient = new SparkRDDWriteClient(context(), writeConfig, Option.of(timelineService));
+      // Both the write client and the table service client should use the same passed-in
+      // timeline server instance.
+      assertEquals(timelineService, writeClient.getTimelineServer().get());
+      assertEquals(timelineService, writeClient.getTableServiceClient().getTimelineServer().get());
+      // Write config should not be changed
+      assertEquals(writeConfig, writeClient.getConfig());
+      timelineService.stop();
+    } else {
+      writeClient = new SparkRDDWriteClient(context(), writeConfig);
+      // Only one timeline server should be instantiated, and the same timeline server
+      // should be used by both the write client and the table service client.
+      assertEquals(
+          writeClient.getTimelineServer(),
+          writeClient.getTableServiceClient().getTimelineServer());
+      if (!enableEmbeddedTimelineServer) {
+        assertFalse(writeClient.getTimelineServer().isPresent());
+      }
+    }
+    writeClient.close();
+  }
+
   @ParameterizedTest
   @MethodSource
   void testWriteClientReleaseResourcesShouldOnlyUnpersistRelevantRdds(HoodieTableType tableType, boolean shouldReleaseResource) throws IOException {