Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.engine.HoodieEngineContext;
Expand Down Expand Up @@ -87,8 +88,10 @@ public abstract class BaseHoodieTableServiceClient<O> extends BaseHoodieClient i

protected Set<String> pendingInflightAndRequestedInstants;

protected BaseHoodieTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
super(context, clientConfig, Option.empty());
/**
 * Creates the base table service client.
 *
 * @param context         the engine context used to run table services.
 * @param clientConfig    the write configuration for this client.
 * @param timelineService the embedded timeline service to reuse, if present;
 *                        {@code Option.empty()} otherwise. Passing it through
 *                        lets this client share one timeline server with the
 *                        owning write client instead of spinning up its own.
 */
protected BaseHoodieTableServiceClient(HoodieEngineContext context,
HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, clientConfig, timelineService);
}

protected void startAsyncCleanerService(BaseHoodieWriteClient writeClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.client;

import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
Expand Down Expand Up @@ -62,8 +63,10 @@ public class HoodieFlinkTableServiceClient<T> extends BaseHoodieTableServiceClie
*/
private HoodieBackedTableMetadataWriter metadataWriter;

protected HoodieFlinkTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
super(context, clientConfig);
/**
 * Creates the Flink table service client.
 *
 * @param context         the Flink engine context.
 * @param clientConfig    the write configuration for this client.
 * @param timelineService the embedded timeline service shared with the owning
 *                        write client, if present; {@code Option.empty()} otherwise.
 */
protected HoodieFlinkTableServiceClient(HoodieEngineContext context,
HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, clientConfig, timelineService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class HoodieFlinkWriteClient<T> extends
/**
 * Creates the Flink write client.
 *
 * @param context     the Flink engine context.
 * @param writeConfig the write configuration.
 */
public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
this.bucketToHandles = new HashMap<>();
// Reuse this client's timeline server (created by the super constructor, if
// enabled) so the table service client does not instantiate a second one.
this.tableServiceClient = new HoodieFlinkTableServiceClient<>(context, writeConfig, getTimelineServer());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.client;

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieFlinkClientTestHarness;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

/**
 * Tests around {@link HoodieFlinkWriteClient} and its embedded table service client.
 */
public class TestFlinkWriteClient extends HoodieFlinkClientTestHarness {

  /**
   * Initializes the path, file system and meta client backing each test case.
   *
   * <p>NOTE: must NOT be {@code private} — JUnit 5 requires lifecycle methods
   * annotated with {@code @BeforeEach} to be non-private, otherwise the
   * platform fails with a {@code JUnitException} before any test runs.
   */
  @BeforeEach
  void setup() throws IOException {
    initPath();
    initFileSystem();
    initMetaClient();
  }

  /**
   * Verifies that the write client and its table service client share a single
   * timeline server instance, both when the embedded timeline server is enabled
   * and when it is disabled.
   *
   * @param enableEmbeddedTimelineServer whether the embedded timeline server is enabled.
   */
  @ParameterizedTest
  @ValueSource(booleans = {true, false})
  public void testWriteClientAndTableServiceClientWithTimelineServer(
      boolean enableEmbeddedTimelineServer) throws IOException {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(metaClient.getBasePathV2().toString())
        .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineServer)
        .build();

    HoodieFlinkWriteClient<?> writeClient = new HoodieFlinkWriteClient<>(context, writeConfig);
    try {
      // Only one timeline server should be instantiated, and the same timeline server
      // should be used by both the write client and the table service client.
      assertEquals(
          writeClient.getTimelineServer(),
          writeClient.getTableServiceClient().getTimelineServer());
      if (!enableEmbeddedTimelineServer) {
        assertFalse(writeClient.getTimelineServer().isPresent());
      }
    } finally {
      // Close in a finally block so a failed assertion does not leak the
      // embedded timeline server or file system resources.
      writeClient.close();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.client;

import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.Option;
Expand All @@ -34,8 +35,10 @@

public class HoodieJavaTableServiceClient extends BaseHoodieTableServiceClient<List<WriteStatus>> {

protected HoodieJavaTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
super(context, clientConfig);
/**
 * Creates the Java engine table service client.
 *
 * @param context         the engine context.
 * @param clientConfig    the write configuration for this client.
 * @param timelineService the embedded timeline service shared with the owning
 *                        write client, if present; {@code Option.empty()} otherwise.
 */
protected HoodieJavaTableServiceClient(HoodieEngineContext context,
HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, clientConfig, timelineService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ public class HoodieJavaWriteClient<T> extends

/**
 * Creates the Java engine write client.
 *
 * @param context     the engine context.
 * @param writeConfig the write configuration.
 */
public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
super(context, writeConfig, JavaUpgradeDowngradeHelper.getInstance());
// Reuse this client's timeline server (set up by the super constructor) so the
// table service client does not instantiate a second one.
this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig, getTimelineServer());
}

/**
 * Creates the Java engine write client with an externally managed timeline service.
 *
 * @param context         the engine context.
 * @param writeConfig     the write configuration.
 * @param rollbackPending whether to roll back pending commits.
 *                        NOTE(review): this flag is not referenced anywhere in
 *                        this visible constructor body — confirm whether it is
 *                        still needed or is dead weight in the signature.
 * @param timelineService the caller-provided embedded timeline service, if present.
 */
public HoodieJavaWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeConfig,
boolean rollbackPending,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService, JavaUpgradeDowngradeHelper.getInstance());
// Propagate the (possibly caller-provided) timeline server to the table
// service client so both clients share the same instance.
this.tableServiceClient = new HoodieJavaTableServiceClient(context, writeConfig, getTimelineServer());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.hudi.client;

import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
Expand All @@ -31,6 +32,8 @@
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
Expand All @@ -44,16 +47,20 @@
import org.apache.hadoop.mapred.JobConf;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestHoodieJavaWriteClientInsert extends HoodieJavaClientTestHarness {
Expand Down Expand Up @@ -96,6 +103,45 @@ private void setupIncremental(JobConf jobConf, String startCommit, int numberOfC
jobConf.setInt(maxCommitPulls, numberOfCommitsToPull);
}

/**
 * Verifies that the write client and its table service client share the same
 * timeline server, both when the server is created internally by the write
 * client and when an externally started server is passed in.
 *
 * @param enableEmbeddedTimelineServer whether the embedded timeline server is enabled.
 * @param passInTimelineServer         whether an externally created timeline server is passed in.
 */
@ParameterizedTest
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
public void testWriteClientAndTableServiceClientWithTimelineServer(
    boolean enableEmbeddedTimelineServer, boolean passInTimelineServer) throws IOException {
  metaClient = HoodieTableMetaClient.reload(metaClient);
  HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
      .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(INMEMORY).build())
      .withPath(metaClient.getBasePathV2().toString())
      .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineServer)
      .build();

  HoodieJavaWriteClient writeClient = null;
  EmbeddedTimelineService timelineService = null;
  try {
    if (passInTimelineServer) {
      timelineService = new EmbeddedTimelineService(context, null, writeConfig);
      timelineService.startServer();
      writeConfig.setViewStorageConfig(timelineService.getRemoteFileSystemViewConfig());
      writeClient = new HoodieJavaWriteClient(context, writeConfig, true, Option.of(timelineService));
      // Both the write client and the table service client should use the same
      // passed-in timeline server instance.
      assertEquals(timelineService, writeClient.getTimelineServer().get());
      assertEquals(timelineService, writeClient.getTableServiceClient().getTimelineServer().get());
      // Write config should not be changed.
      assertEquals(writeConfig, writeClient.getConfig());
    } else {
      writeClient = new HoodieJavaWriteClient(context, writeConfig);
      // Only one timeline server should be instantiated, and the same timeline server
      // should be used by both the write client and the table service client.
      assertEquals(
          writeClient.getTimelineServer(),
          writeClient.getTableServiceClient().getTimelineServer());
      if (!enableEmbeddedTimelineServer) {
        assertFalse(writeClient.getTimelineServer().isPresent());
      }
    }
  } finally {
    // Clean up in a finally block so a failed assertion does not leak the
    // externally started timeline server or the write client's resources.
    if (timelineService != null) {
      timelineService.stop();
    }
    if (writeClient != null) {
      writeClient.close();
    }
  }
}

@Test
public void testInsert() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfigBuilder(basePath).withMergeAllowDuplicateOnInserts(true).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
Expand Down Expand Up @@ -58,8 +59,10 @@ public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<

private static final Logger LOG = LoggerFactory.getLogger(SparkRDDTableServiceClient.class);

protected SparkRDDTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
super(context, clientConfig);
/**
 * Creates the Spark RDD table service client.
 *
 * @param context         the Spark engine context.
 * @param clientConfig    the write configuration for this client.
 * @param timelineService the embedded timeline service shared with the owning
 *                        write client, if present; {@code Option.empty()} otherwise.
 */
protected SparkRDDTableServiceClient(HoodieEngineContext context,
HoodieWriteConfig clientConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, clientConfig, timelineService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeC
/**
 * Creates the Spark RDD write client with an externally managed timeline service.
 *
 * @param context         the Spark engine context.
 * @param writeConfig     the write configuration.
 * @param timelineService the caller-provided embedded timeline service, if present.
 */
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService, SparkUpgradeDowngradeHelper.getInstance());
// Propagate the (possibly caller-provided) timeline server to the table
// service client so both clients share the same instance.
this.tableServiceClient = new SparkRDDTableServiceClient<>(context, writeConfig, getTimelineServer());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

package org.apache.hudi.client;

import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
Expand All @@ -34,6 +36,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
Expand Down Expand Up @@ -61,6 +64,47 @@ static Stream<Arguments> testWriteClientReleaseResourcesShouldOnlyUnpersistRelev
);
}

/**
 * Verifies that the Spark write client and its table service client share the
 * same timeline server, both when the server is created internally by the write
 * client and when an externally started server is passed in.
 *
 * @param enableEmbeddedTimelineServer whether the embedded timeline server is enabled.
 * @param passInTimelineServer         whether an externally created timeline server is passed in.
 */
@ParameterizedTest
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
public void testWriteClientAndTableServiceClientWithTimelineServer(
    boolean enableEmbeddedTimelineServer, boolean passInTimelineServer) throws IOException {
  HoodieTableMetaClient metaClient =
      getHoodieMetaClient(hadoopConf(), URI.create(basePath()).getPath(), new Properties());
  HoodieWriteConfig writeConfig = getConfigBuilder(true)
      .withPath(metaClient.getBasePathV2().toString())
      .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineServer)
      .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
          .withRemoteServerPort(incrementTimelineServicePortToUse()).build())
      .build();

  SparkRDDWriteClient writeClient = null;
  EmbeddedTimelineService timelineService = null;
  try {
    if (passInTimelineServer) {
      timelineService = new EmbeddedTimelineService(context(), null, writeConfig);
      timelineService.startServer();
      writeConfig.setViewStorageConfig(timelineService.getRemoteFileSystemViewConfig());
      writeClient = new SparkRDDWriteClient(context(), writeConfig, Option.of(timelineService));
      // Both the write client and the table service client should use the same
      // passed-in timeline server instance.
      assertEquals(timelineService, writeClient.getTimelineServer().get());
      assertEquals(timelineService, writeClient.getTableServiceClient().getTimelineServer().get());
      // Write config should not be changed.
      assertEquals(writeConfig, writeClient.getConfig());
    } else {
      writeClient = new SparkRDDWriteClient(context(), writeConfig);
      // Only one timeline server should be instantiated, and the same timeline server
      // should be used by both the write client and the table service client.
      assertEquals(
          writeClient.getTimelineServer(),
          writeClient.getTableServiceClient().getTimelineServer());
      if (!enableEmbeddedTimelineServer) {
        assertFalse(writeClient.getTimelineServer().isPresent());
      }
    }
  } finally {
    // Clean up in a finally block so a failed assertion does not leak the
    // externally started timeline server or the write client's resources.
    if (timelineService != null) {
      timelineService.stop();
    }
    if (writeClient != null) {
      writeClient.close();
    }
  }
}

@ParameterizedTest
@MethodSource
void testWriteClientReleaseResourcesShouldOnlyUnpersistRelevantRdds(HoodieTableType tableType, boolean shouldReleaseResource) throws IOException {
Expand Down