diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java index 5e2a9fdf87957..ff44c7cef017b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java @@ -239,6 +239,7 @@ public void close() { public void reset() { preferredView.reset(); secondaryView.reset(); + errorOnPreferredView = false; } @Override @@ -255,6 +256,7 @@ public HoodieTimeline getTimeline() { public void sync() { preferredView.sync(); secondaryView.sync(); + errorOnPreferredView = false; } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/NetworkUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/NetworkUtils.java index d6a56fe39ca97..35c77590ebd95 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/NetworkUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/NetworkUtils.java @@ -21,7 +21,8 @@ import org.apache.hudi.exception.HoodieException; import java.io.IOException; -import java.net.ServerSocket; +import java.net.InetSocketAddress; +import java.net.Socket; /** * A utility class for network. @@ -29,10 +30,13 @@ public class NetworkUtils { public static synchronized String getHostname() { - ServerSocket s = null; + Socket s = null; try { - s = new ServerSocket(0); - return s.getInetAddress().getHostAddress(); + s = new Socket(); + // see https://stackoverflow.com/questions/9481865/getting-the-ip-address-of-the-current-machine-using-java + // for details. 
+ s.connect(new InetSocketAddress("google.com", 80)); + return s.getLocalAddress().getHostAddress(); } catch (IOException e) { throw new HoodieException("Unable to find server port", e); } finally { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index 77d6630044670..13154b217575c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -62,7 +62,7 @@ public void open(Configuration parameters) throws Exception { if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { // do not use the remote filesystem view because the async cleaning service // local timeline is very probably to fall behind with the remote one. - this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); + this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext(), false); this.executor = NonThrownExecutor.builder(LOG).build(); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index 513179bc51f6c..f6055ba11d2fc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -120,7 +120,7 @@ public void initializeState(StateInitializationContext context) throws Exception } this.hadoopConf = StreamerUtil.getHadoopConf(); - this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); + this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true); this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext()); this.aggregateManager = getRuntimeContext().getGlobalAggregateManager(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 67040c1870852..73fd6685539fb 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -113,7 +113,7 @@ public BucketAssignFunction(Configuration conf) { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); + HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true); HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( new SerializableConfiguration(StreamerUtil.getHadoopConf()), new FlinkTaskContextSupplier(getRuntimeContext())); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java index 9be47752be715..6918a06b186b8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java @@ -46,7 +46,7 @@ public static HoodieFlinkTable createTable(Configuration conf, RuntimeContext HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( new SerializableConfiguration(getHadoopConf()), new FlinkTaskContextSupplier(runtimeContext)); - HoodieWriteConfig writeConfig = getHoodieClientConfig(conf); + HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true); return HoodieFlinkTable.create(writeConfig, context); } @@ -71,7 +71,7 @@ public static HoodieFlinkTable createTable( *

This expects to be used by driver. */ public static HoodieFlinkTable createTable(Configuration conf) { - HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf); + HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf, true, false); return HoodieFlinkTable.create(writeConfig, HoodieFlinkEngineContext.DEFAULT); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 516c75b8708fc..98df0bbcfd868 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; @@ -144,7 +145,21 @@ public static org.apache.hadoop.conf.Configuration getHadoopConf() { return FlinkClientUtil.getHadoopConf(); } + /** + * Mainly used for tests. 
+ */ public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { + return getHoodieClientConfig(conf, false, false); + } + + public static HoodieWriteConfig getHoodieClientConfig(Configuration conf, boolean loadFsViewStorageConfig) { + return getHoodieClientConfig(conf, false, loadFsViewStorageConfig); + } + + public static HoodieWriteConfig getHoodieClientConfig( + Configuration conf, + boolean enableEmbeddedTimelineService, + boolean loadFsViewStorageConfig) { HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder() .withEngineType(EngineType.FLINK) @@ -189,13 +204,20 @@ public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)) .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD)) .build()) + .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService) .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton .withAutoCommit(false) .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) .withProps(flinkConf2TypedProperties(conf)) .withSchema(getSourceSchema(conf).toString()); - return builder.build(); + HoodieWriteConfig writeConfig = builder.build(); + if (loadFsViewStorageConfig) { + // do not use the builder to give a chance for recovering the original fs view storage config + FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH)); + writeConfig.setViewStorageConfig(viewStorageConfig); + } + return writeConfig; } /** @@ -341,15 +363,28 @@ public static HoodieTableMetaClient createMetaClient(Configuration conf) { /** * Creates the Flink write client. + + *

This expects to be used by client, the driver should start an embedded timeline server. */ @SuppressWarnings("rawtypes") public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) { + return createWriteClient(conf, runtimeContext, true); + } + + /** + * Creates the Flink write client. + * + *

This expects to be used by client, set flag {@code loadFsViewStorageConfig} to use + * remote filesystem view storage config, or an in-memory filesystem view storage is used. + */ + @SuppressWarnings("rawtypes") + public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) { HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( new SerializableConfiguration(getHadoopConf()), new FlinkTaskContextSupplier(runtimeContext)); - HoodieWriteConfig writeConfig = getHoodieClientConfig(conf); + HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, loadFsViewStorageConfig); return new HoodieFlinkWriteClient<>(context, writeConfig); } @@ -362,9 +397,18 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, Runti */ @SuppressWarnings("rawtypes") public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException { - HoodieWriteConfig writeConfig = getHoodieClientConfig(conf); + HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false); // build the write client to start the embedded timeline server - return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig); + final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig); + // create the filesystem view storage properties for client + final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig(); + // rebuild the view storage config with simplified options. 
+ FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder() + .withStorageType(viewStorageConfig.getStorageType()) + .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost()) + .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build(); + ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt); + return writeClient; } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java b/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java new file mode 100644 index 0000000000000..da55e27f0c03b --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Date; +import java.util.Properties; + +import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME; + +/** + * Helper class to read/write {@link FileSystemViewStorageConfig}. + */ +public class ViewStorageProperties { + private static final Logger LOG = LoggerFactory.getLogger(ViewStorageProperties.class); + + private static final String FILE_NAME = "view_storage_conf.properties"; + + /** + * Initialize the {@link #FILE_NAME} meta file. + */ + public static void createProperties( + String basePath, + FileSystemViewStorageConfig config) throws IOException { + Path propertyPath = getPropertiesFilePath(basePath); + FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()); + fs.delete(propertyPath, false); + try (FSDataOutputStream outputStream = fs.create(propertyPath)) { + config.getProps().store(outputStream, + "Filesystem view storage properties saved on " + new Date(System.currentTimeMillis())); + } + } + + /** + * Read the {@link FileSystemViewStorageConfig} with given table base path. 
+ */ + public static FileSystemViewStorageConfig loadFromProperties(String basePath) { + Path propertyPath = getPropertiesFilePath(basePath); + LOG.info("Loading filesystem view storage properties from " + propertyPath); + FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()); + Properties props = new Properties(); + try { + try (FSDataInputStream inputStream = fs.open(propertyPath)) { + props.load(inputStream); + } + return FileSystemViewStorageConfig.newBuilder().fromProperties(props).build(); + } catch (IOException e) { + throw new HoodieIOException("Could not load filesystem view storage properties from " + propertyPath, e); + } + } + + private static Path getPropertiesFilePath(String basePath) { + String auxPath = basePath + Path.SEPARATOR + AUXILIARYFOLDER_NAME; + return new Path(auxPath, FILE_NAME); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java index e00fbfac50ce3..57297c50ee82b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java @@ -19,9 +19,12 @@ package org.apache.hudi.utils; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.ViewStorageProperties; import org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.Test; @@ -98,5 +101,13 @@ void testInstantTimeDiff() { long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower); assertThat(diff, is(75L)); } + + @Test + void testDumpRemoteViewStorageConfig() throws IOException { + Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + StreamerUtil.createWriteClient(conf); + FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH)); + assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST)); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java new file mode 100644 index 0000000000000..f80760bf1fd85 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utils; + +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; +import org.apache.hudi.util.ViewStorageProperties; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test cases for {@link ViewStorageProperties}. 
+ */ +public class TestViewStorageProperties { + @TempDir + File tempFile; + + @Test + void testReadWriteProperties() throws IOException { + String basePath = tempFile.getAbsolutePath(); + FileSystemViewStorageConfig config = FileSystemViewStorageConfig.newBuilder() + .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK) + .withRemoteServerHost("host1") + .withRemoteServerPort(1234).build(); + ViewStorageProperties.createProperties(basePath, config); + ViewStorageProperties.createProperties(basePath, config); + ViewStorageProperties.createProperties(basePath, config); + + FileSystemViewStorageConfig readConfig = ViewStorageProperties.loadFromProperties(basePath); + assertThat(readConfig.getStorageType(), is(FileSystemViewStorageType.SPILLABLE_DISK)); + assertThat(readConfig.getRemoteViewServerHost(), is("host1")); + assertThat(readConfig.getRemoteViewServerPort(), is(1234)); + } +}