Skip to content

Commit e4dff8a

Browse files
committed
[HUDI-2951] Disable remote view storage config for flink
1 parent 6dab307 commit e4dff8a

File tree

8 files changed

+9
-204
lines changed

8 files changed

+9
-204
lines changed

hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public void open(Configuration parameters) throws Exception {
6262
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
6363
// do not use the remote filesystem view because the async cleaning service
6464
// local timeline is very probably to fall behind with the remote one.
65-
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext(), false);
65+
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
6666
this.executor = NonThrownExecutor.builder(LOG).build();
6767
}
6868
}

hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void initializeState(StateInitializationContext context) throws Exception
120120
}
121121

122122
this.hadoopConf = StreamerUtil.getHadoopConf();
123-
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
123+
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
124124
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
125125
this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
126126

hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public BucketAssignFunction(Configuration conf) {
113113
@Override
114114
public void open(Configuration parameters) throws Exception {
115115
super.open(parameters);
116-
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, false);
116+
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
117117
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
118118
new SerializableConfiguration(StreamerUtil.getHadoopConf()),
119119
new FlinkTaskContextSupplier(getRuntimeContext()));

hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public static HoodieFlinkTable<?> createTable(Configuration conf, RuntimeContext
4646
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
4747
new SerializableConfiguration(getHadoopConf()),
4848
new FlinkTaskContextSupplier(runtimeContext));
49-
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true);
49+
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf);
5050
return HoodieFlinkTable.create(writeConfig, context);
5151
}
5252

@@ -71,7 +71,7 @@ public static HoodieFlinkTable<?> createTable(
7171
* <p>This expects to be used by driver.
7272
*/
7373
public static HoodieFlinkTable<?> createTable(Configuration conf) {
74-
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf, true, false);
74+
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf);
7575
return HoodieFlinkTable.create(writeConfig, HoodieFlinkEngineContext.DEFAULT);
7676
}
7777
}

hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java

Lines changed: 4 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
3434
import org.apache.hudi.common.table.timeline.HoodieInstant;
3535
import org.apache.hudi.common.table.timeline.HoodieTimeline;
36-
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
3736
import org.apache.hudi.common.util.Option;
3837
import org.apache.hudi.common.util.ReflectionUtils;
3938
import org.apache.hudi.common.util.ValidationUtils;
@@ -145,21 +144,7 @@ public static org.apache.hadoop.conf.Configuration getHadoopConf() {
145144
return FlinkClientUtil.getHadoopConf();
146145
}
147146

148-
/**
149-
* Mainly used for tests.
150-
*/
151147
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
152-
return getHoodieClientConfig(conf, false, false);
153-
}
154-
155-
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf, boolean loadFsViewStorageConfig) {
156-
return getHoodieClientConfig(conf, false, loadFsViewStorageConfig);
157-
}
158-
159-
public static HoodieWriteConfig getHoodieClientConfig(
160-
Configuration conf,
161-
boolean enableEmbeddedTimelineService,
162-
boolean loadFsViewStorageConfig) {
163148
HoodieWriteConfig.Builder builder =
164149
HoodieWriteConfig.newBuilder()
165150
.withEngineType(EngineType.FLINK)
@@ -204,20 +189,13 @@ public static HoodieWriteConfig getHoodieClientConfig(
204189
.withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
205190
.withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
206191
.build())
207-
.withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
208192
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
209193
.withAutoCommit(false)
210194
.withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
211195
.withProps(flinkConf2TypedProperties(conf))
212196
.withSchema(getSourceSchema(conf).toString());
213197

214-
HoodieWriteConfig writeConfig = builder.build();
215-
if (loadFsViewStorageConfig) {
216-
// do not use the builder to give a change for recovering the original fs view storage config
217-
FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
218-
writeConfig.setViewStorageConfig(viewStorageConfig);
219-
}
220-
return writeConfig;
198+
return builder.build();
221199
}
222200

223201
/**
@@ -363,28 +341,15 @@ public static HoodieTableMetaClient createMetaClient(Configuration conf) {
363341

364342
/**
365343
* Creates the Flink write client.
366-
*
367-
* <p>This expects to be used by client, the driver should start an embedded timeline server.
368344
*/
369345
@SuppressWarnings("rawtypes")
370346
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
371-
return createWriteClient(conf, runtimeContext, true);
372-
}
373-
374-
/**
375-
* Creates the Flink write client.
376-
*
377-
* <p>This expects to be used by client, set flag {@code loadFsViewStorageConfig} to use
378-
* remote filesystem view storage config, or an in-memory filesystem view storage is used.
379-
*/
380-
@SuppressWarnings("rawtypes")
381-
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) {
382347
HoodieFlinkEngineContext context =
383348
new HoodieFlinkEngineContext(
384349
new SerializableConfiguration(getHadoopConf()),
385350
new FlinkTaskContextSupplier(runtimeContext));
386351

387-
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, loadFsViewStorageConfig);
352+
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf);
388353
return new HoodieFlinkWriteClient<>(context, writeConfig);
389354
}
390355

@@ -397,18 +362,9 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, Runti
397362
*/
398363
@SuppressWarnings("rawtypes")
399364
public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException {
400-
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
365+
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf);
401366
// build the write client to start the embedded timeline server
402-
final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
403-
// create the filesystem view storage properties for client
404-
final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();
405-
// rebuild the view storage config with simplified options.
406-
FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder()
407-
.withStorageType(viewStorageConfig.getStorageType())
408-
.withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
409-
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build();
410-
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt);
411-
return writeClient;
367+
return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
412368
}
413369

414370
/**

hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java

Lines changed: 0 additions & 83 deletions
This file was deleted.

hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,9 @@
1919
package org.apache.hudi.utils;
2020

2121
import org.apache.hudi.common.table.HoodieTableMetaClient;
22-
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
23-
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
2422
import org.apache.hudi.common.util.FileIOUtils;
2523
import org.apache.hudi.configuration.FlinkOptions;
2624
import org.apache.hudi.util.StreamerUtil;
27-
import org.apache.hudi.util.ViewStorageProperties;
2825

2926
import org.apache.flink.configuration.Configuration;
3027
import org.junit.jupiter.api.Test;
@@ -101,13 +98,5 @@ void testInstantTimeDiff() {
10198
long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower);
10299
assertThat(diff, is(75L));
103100
}
104-
105-
@Test
106-
void testDumpRemoteViewStorageConfig() throws IOException {
107-
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
108-
StreamerUtil.createWriteClient(conf);
109-
FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
110-
assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST));
111-
}
112101
}
113102

hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java

Lines changed: 0 additions & 57 deletions
This file was deleted.

0 commit comments

Comments
 (0)