diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java index 9732ce72b913..091c4b4173ae 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkTable; @@ -72,6 +73,8 @@ public void init() throws Exception { HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) .forTable("test-trip-table").build(); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java index d71e7ec8d987..959346e15007 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.NumericUtils; @@ -209,6 +210,8 @@ public void testShowArchivedCommits() throws Exception { HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1) .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) .forTable("test-trip-table").build(); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java index de305f404455..21841a576945 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.CompactionTestUtils; 
import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; @@ -159,6 +160,8 @@ private void generateArchive() throws IOException { HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) .forTable("test-trip-table").build(); // archive diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java index a39329e018bf..cba6d901b956 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java @@ -28,8 +28,10 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -62,6 +64,8 @@ public void init() throws Exception { new TableCommand().createTable( tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); + timelineService = HoodieClientTestUtils.initTimelineService( + context, basePath(), FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue()); metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); //Create some commits files and base files HoodieTestTable.of(metaClient) diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java index b2b9f86b0207..7a12a6692a2b 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java @@ -22,7 +22,10 @@ import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.testutils.providers.SparkProvider; +import org.apache.hudi.timeline.service.TimelineService; import org.apache.hadoop.conf.Configuration; import org.apache.spark.SparkConf; @@ -39,10 +42,13 @@ public class CLIFunctionalTestHarness implements SparkProvider { + protected static int timelineServicePort = + FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue(); + protected static transient TimelineService timelineService; + protected static transient HoodieSparkEngineContext context; private static transient SparkSession spark; private static transient SQLContext sqlContext; private static transient JavaSparkContext jsc; - private 
static transient HoodieSparkEngineContext context; private static transient JLineShellComponent shell; /** * An indicator of the initialization status. @@ -107,6 +113,9 @@ public synchronized void runBeforeEach() { jsc = new JavaSparkContext(spark.sparkContext()); context = new HoodieSparkEngineContext(jsc); shell = new Bootstrap().getJLineShellComponent(); + timelineService = HoodieClientTestUtils.initTimelineService( + context, basePath(), incrementTimelineServicePortToUse()); + timelineServicePort = timelineService.getServerPort(); } } @@ -120,14 +129,25 @@ public static synchronized void cleanUpAfterAll() { shell.stop(); shell = null; } + if (timelineService != null) { + timelineService.close(); + } } /** * Helper to prepare string for matching. + * * @param str Input string. * @return pruned string with non word characters removed. */ protected static String removeNonWordAndStripSpace(String str) { return str.replaceAll("[\\s]+", ",").replaceAll("[\\W]+", ","); } + + protected int incrementTimelineServicePortToUse() { + // Increment the timeline service port for each individual test + // to avoid port reuse causing failures + timelineServicePort = (timelineServicePort + 1 - 1024) % (65536 - 1024) + 1024; + return timelineServicePort; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 6652f5e0ae5f..44a703f224ee 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -45,6 +45,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig; import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig; import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig; +import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.SimpleAvroKeyGenerator; @@ -248,14 +249,17 @@ public class HoodieWriteConfig extends HoodieConfig { public static final ConfigProperty MARKERS_TYPE = ConfigProperty .key("hoodie.write.markers.type") - .defaultValue(MarkerType.DIRECT.toString()) + .defaultValue(MarkerType.TIMELINE_SERVER_BASED.toString()) .sinceVersion("0.9.0") .withDocumentation("Marker type to use. Two modes are supported: " + "- DIRECT: individual marker file corresponding to each data file is directly " + "created by the writer. " + "- TIMELINE_SERVER_BASED: marker operations are all handled at the timeline service " + "which serves as a proxy. New marker entries are batch processed and stored " - + "in a limited number of underlying files for efficiency."); + + "in a limited number of underlying files for efficiency. If HDFS is used or " + + "timeline server is disabled, DIRECT markers are used as fallback even if this " + + "is configured. 
For Spark structured streaming, this configuration does not " + + "take effect, i.e., DIRECT markers are always used for Spark structured streaming."); public static final ConfigProperty MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS = ConfigProperty .key("hoodie.markers.timeline_server_based.batch.num_threads") @@ -2159,6 +2163,7 @@ public Builder withProperties(Properties properties) { } protected void setDefaults() { + writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType)); // Check for mandatory properties writeConfig.setDefaults(HoodieWriteConfig.class.getName()); // Make sure the props is propagated @@ -2213,5 +2218,18 @@ public HoodieWriteConfig build() { // Build WriteConfig at the end return new HoodieWriteConfig(engineType, writeConfig.getProps()); } + + private String getDefaultMarkersType(EngineType engineType) { + switch (engineType) { + case SPARK: + return MarkerType.TIMELINE_SERVER_BASED.toString(); + case FLINK: + case JAVA: + // Timeline-server-based marker is not supported for Flink and Java engines + return MarkerType.DIRECT.toString(); + default: + throw new HoodieNotSupportedException("Unsupported engine " + engineType); + } + } } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java index 7ba6cb6bef1d..4879e0bc60c9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java @@ -19,9 +19,11 @@ package org.apache.hudi.table.marker; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieRemoteException; import org.apache.hudi.table.HoodieTable; @@ -132,18 +134,24 @@ protected Option create(String partitionPath, String dataFileName, IOType Map paramsMap = new HashMap<>(); paramsMap.put(MARKER_DIR_PATH_PARAM, markerDirPath.toString()); - paramsMap.put(MARKER_NAME_PARAM, partitionPath + "/" + markerFileName); + if (StringUtils.isNullOrEmpty(partitionPath)) { + paramsMap.put(MARKER_NAME_PARAM, markerFileName); + } else { + paramsMap.put(MARKER_NAME_PARAM, partitionPath + "/" + markerFileName); + } + boolean success; try { success = executeRequestToTimelineServer( - CREATE_MARKER_URL, paramsMap, new TypeReference() {}, RequestMethod.POST); + CREATE_MARKER_URL, paramsMap, new TypeReference() { + }, RequestMethod.POST); } catch (IOException e) { throw new HoodieRemoteException("Failed to create marker file " + partitionPath + "/" + markerFileName, e); } LOG.info("[timeline-server-based] Created marker file " + partitionPath + "/" + markerFileName + " in " + timer.endTimer() + " ms"); if (success) { - return Option.of(new Path(new Path(markerDirPath, partitionPath), markerFileName)); + return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, partitionPath), markerFileName)); } else { return Option.empty(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java index 
e7d31f3a07b2..dfd55f295812 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java @@ -18,10 +18,13 @@ package org.apache.hudi.table.marker; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.StorageSchemes; import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.HoodieTable; +import com.esotericsoftware.minlog.Log; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -43,6 +46,18 @@ public static WriteMarkers get(MarkerType markerType, HoodieTable table, String case DIRECT: return new DirectWriteMarkers(table, instantTime); case TIMELINE_SERVER_BASED: + if (!table.getConfig().isEmbeddedTimelineServerEnabled()) { + Log.warn("Timeline-server-based markers are configured as the marker type " + + "but embedded timeline server is not enabled. Falling back to direct markers."); + return new DirectWriteMarkers(table, instantTime); + } + String basePath = table.getMetaClient().getBasePath(); + if (StorageSchemes.HDFS.getScheme().equals( + FSUtils.getFs(basePath, table.getContext().getHadoopConf().newCopy()).getScheme())) { + Log.warn("Timeline-server-based markers are not supported for HDFS: " + + "base path " + basePath + ". Falling back to direct markers."); + return new DirectWriteMarkers(table, instantTime); + } return new TimelineServerBasedWriteMarkers(table, instantTime); default: throw new HoodieException("The marker type \"" + markerType.name() + "\" is not supported."); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java index 1df71e812d6b..59b7a9274c94 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java @@ -19,8 +19,10 @@ package org.apache.hudi.config; import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.config.HoodieWriteConfig.Builder; import org.apache.hudi.index.HoodieIndex; + import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -32,6 +34,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.function.Function; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -67,63 +70,38 @@ public void testPropertyLoading(boolean withAlternative) throws IOException { @Test public void testDefaultIndexAccordingToEngineType() { - // default bloom - HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build(); - assertEquals(HoodieIndex.IndexType.BLOOM, writeConfig.getIndexType()); - - // spark default bloom - writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build(); - assertEquals(HoodieIndex.IndexType.BLOOM, writeConfig.getIndexType()); - - // flink default in-memory - writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath("/tmp").build(); - assertEquals(HoodieIndex.IndexType.INMEMORY, writeConfig.getIndexType()); + 
testEngineSpecificConfig(HoodieWriteConfig::getIndexType, + constructConfigMap( + EngineType.SPARK, HoodieIndex.IndexType.BLOOM, + EngineType.FLINK, HoodieIndex.IndexType.INMEMORY, + EngineType.JAVA, HoodieIndex.IndexType.INMEMORY)); } @Test public void testDefaultClusteringPlanStrategyClassAccordingToEngineType() { - // Default (as Spark) - HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build(); - assertEquals( - HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY, - writeConfig.getClusteringPlanStrategyClass()); - - // Spark - writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build(); - assertEquals( - HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY, - writeConfig.getClusteringPlanStrategyClass()); - - // Flink and Java - for (EngineType engineType : new EngineType[] {EngineType.FLINK, EngineType.JAVA}) { - writeConfig = HoodieWriteConfig.newBuilder().withEngineType(engineType).withPath("/tmp").build(); - assertEquals( - HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY, - writeConfig.getClusteringPlanStrategyClass()); - } + testEngineSpecificConfig(HoodieWriteConfig::getClusteringPlanStrategyClass, + constructConfigMap( + EngineType.SPARK, HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY, + EngineType.FLINK, HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY, + EngineType.JAVA, HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY)); } @Test public void testDefaultClusteringExecutionStrategyClassAccordingToEngineType() { - // Default (as Spark) - HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build(); - assertEquals( - HoodieClusteringConfig.SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY, - writeConfig.getClusteringExecutionStrategyClass()); - - // Spark - writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build(); - assertEquals( - HoodieClusteringConfig.SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY, - writeConfig.getClusteringExecutionStrategyClass()); + testEngineSpecificConfig(HoodieWriteConfig::getClusteringExecutionStrategyClass, + constructConfigMap( + EngineType.SPARK, HoodieClusteringConfig.SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY, + EngineType.FLINK, HoodieClusteringConfig.JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY, + EngineType.JAVA, HoodieClusteringConfig.JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY)); + } - // Flink and Java - for (EngineType engineType : new EngineType[] {EngineType.FLINK, EngineType.JAVA}) { - writeConfig = HoodieWriteConfig.newBuilder().withEngineType(engineType).withPath("/tmp").build(); - assertEquals( - HoodieClusteringConfig.JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY, - writeConfig.getClusteringExecutionStrategyClass()); - } + @Test + public void testDefaultMarkersTypeAccordingToEngineType() { + testEngineSpecificConfig(HoodieWriteConfig::getMarkersType, + constructConfigMap( + EngineType.SPARK, MarkerType.TIMELINE_SERVER_BASED, + EngineType.FLINK, MarkerType.DIRECT, + EngineType.JAVA, MarkerType.DIRECT)); } private ByteArrayOutputStream saveParamsIntoOutputStream(Map params) throws IOException { @@ -133,4 +111,44 @@ private ByteArrayOutputStream saveParamsIntoOutputStream(Map par properties.store(outStream, "Saved on " + new Date(System.currentTimeMillis())); return outStream; } + + /** + * Tests the engine-specific configuration values for one configuration key . + * + * @param getConfigFunc Function to get the config value. 
+ * @param expectedConfigMap Expected config map, with key as the engine type + * and value as the corresponding config value for the engine. + */ + private void testEngineSpecificConfig(Function getConfigFunc, + Map expectedConfigMap) { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build(); + assertEquals(expectedConfigMap.get(EngineType.SPARK), getConfigFunc.apply(writeConfig)); + + for (EngineType engineType : expectedConfigMap.keySet()) { + writeConfig = HoodieWriteConfig.newBuilder() + .withEngineType(engineType).withPath("/tmp").build(); + assertEquals(expectedConfigMap.get(engineType), getConfigFunc.apply(writeConfig)); + } + } + + /** + * Constructs the map. + * + * @param k1 First engine type. + * @param v1 Config value for the first engine type. + * @param k2 Second engine type. + * @param v2 Config value for the second engine type. + * @param k3 Third engine type. + * @param v3 Config value for the third engine type. + * @return {@link Map} instance, with key as the engine type + * and value as the corresponding config value for the engine. + */ + private Map constructConfigMap( + EngineType k1, Object v1, EngineType k2, Object v2, EngineType k3, Object v3) { + Map mapping = new HashMap<>(); + mapping.put(k1, v1); + mapping.put(k2, v2); + mapping.put(k3, v3); + return mapping; + } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java new file mode 100644 index 000000000000..21c0e8108a53 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersFactory.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.table.marker; + +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.marker.MarkerType; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; + +public class TestWriteMarkersFactory extends HoodieCommonTestHarness { + private static final String NON_HDFS_BASE_PATH = "/tmp/dir"; + private static final String HDFS_BASE_PATH = "hdfs://localhost/dir"; + private final HoodieWriteConfig writeConfig = Mockito.mock(HoodieWriteConfig.class); + private final HoodieTableMetaClient metaClient = Mockito.mock(HoodieTableMetaClient.class); + private final HoodieWrapperFileSystem fileSystem = Mockito.mock(HoodieWrapperFileSystem.class); + private final HoodieEngineContext context = Mockito.mock(HoodieEngineContext.class); + private final HoodieTable table = Mockito.mock(HoodieTable.class); + + @BeforeEach + public void init() throws IOException { + initMetaClient(); + } + + public static Stream configParams() { + Object[][] data = new Object[][] { + {NON_HDFS_BASE_PATH, true}, {HDFS_BASE_PATH, false}, + {NON_HDFS_BASE_PATH, true}, {HDFS_BASE_PATH, false}, + }; + return Stream.of(data).map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("configParams") + public void testDirectMarkers(String basePath, boolean isTimelineServerEnabled) { + testWriteMarkersFactory( + MarkerType.DIRECT, basePath, isTimelineServerEnabled, DirectWriteMarkers.class); + } + + @Test + public void testTimelineServerBasedMarkersWithTimelineServerEnabled() { + testWriteMarkersFactory( + MarkerType.TIMELINE_SERVER_BASED, NON_HDFS_BASE_PATH, true, + TimelineServerBasedWriteMarkers.class); + } + + @Test + public void testTimelineServerBasedMarkersWithTimelineServerDisabled() { + // Fallback to direct markers should happen + testWriteMarkersFactory( + MarkerType.TIMELINE_SERVER_BASED, NON_HDFS_BASE_PATH, false, + DirectWriteMarkers.class); + } + + @Test + public void testTimelineServerBasedMarkersWithHDFS() { + // Fallback to direct markers should happen + testWriteMarkersFactory( + MarkerType.TIMELINE_SERVER_BASED, HDFS_BASE_PATH, true, + DirectWriteMarkers.class); + } + + private void testWriteMarkersFactory( + MarkerType markerTypeConfig, String basePath, boolean isTimelineServerEnabled, + Class expectedWriteMarkersClass) { + String instantTime = "001"; + Mockito.when(table.getConfig()).thenReturn(writeConfig); + Mockito.when(writeConfig.isEmbeddedTimelineServerEnabled()) + .thenReturn(isTimelineServerEnabled); + Mockito.when(table.getMetaClient()).thenReturn(metaClient); + Mockito.when(metaClient.getFs()).thenReturn(fileSystem); + Mockito.when(metaClient.getBasePath()).thenReturn(basePath); + 
Mockito.when(metaClient.getMarkerFolderPath(any())).thenReturn(basePath + ".hoodie/.temp"); + Mockito.when(table.getContext()).thenReturn(context); + Mockito.when(context.getHadoopConf()).thenReturn(new SerializableConfiguration(new Configuration())); + Mockito.when(writeConfig.getViewStorageConfig()) + .thenReturn(FileSystemViewStorageConfig.newBuilder().build()); + assertEquals(expectedWriteMarkersClass, + WriteMarkersFactory.get(markerTypeConfig, table, instantTime).getClass()); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index afc4caea2d51..1d2db2b67073 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteConcurrencyMode; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; @@ -92,6 +93,8 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + // Timeline-server-based markers are not used for multi-writer tests + .withMarkersType(MarkerType.DIRECT.name()) .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class) .build()).withAutoCommit(false).withProperties(properties).build(); // Create the first commit @@ -163,6 +166,8 @@ public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType table .withLockProvider(FileSystemBasedLockProviderTestClass.class) .build()) .withAutoCommit(false) + // Timeline-server-based markers are not used for multi-writer tests + .withMarkersType(MarkerType.DIRECT.name()) .withProperties(properties) .build(); @@ -209,11 +214,13 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); // Disabling embedded timeline server, it doesn't work with multiwriter HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder() - .withCompactionConfig(HoodieCompactionConfig.newBuilder().withAutoClean(false) - .withInlineCompaction(false).withAsyncClean(true) - .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) - .withMaxNumDeltaCommitsBeforeCompaction(2).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().withAutoClean(false) + .withInlineCompaction(false).withAsyncClean(true) + .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) + .withMaxNumDeltaCommitsBeforeCompaction(2).build()) .withEmbeddedTimelineServerEnabled(false) + // Timeline-server-based markers are not used for multi-writer tests + .withMarkersType(MarkerType.DIRECT.name()) .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType( FileSystemViewStorageType.MEMORY).build()) 
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) @@ -327,6 +334,8 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType) .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) .withAutoClean(false).build()) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) + // Timeline-server-based markers are not used for multi-writer tests + .withMarkersType(MarkerType.DIRECT.name()) .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class) .build()).withAutoCommit(false).withProperties(properties); HoodieWriteConfig cfg = writeConfigBuilder.build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java index 96782f49428c..00e65a67c08e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.util.BaseFileUtils; @@ -64,6 +65,7 @@ public void setUp() throws Exception { HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath); initSparkContexts("TestUpdateSchemaEvolution"); initFileSystem(); + initTimelineService(); } @AfterEach @@ -228,6 +230,9 @@ public void testSchemaEvolutionOnUpdateMisMatchWithChangeColumnType() throws Exc private HoodieWriteConfig makeHoodieClientConfig(String name) { Schema schema = getSchemaFromResource(getClass(), name); - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schema.toString()).build(); + return HoodieWriteConfig.newBuilder().withPath(basePath) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) + .withSchema(schema.toString()).build(); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 005d031cb9df..e3dfa4bb5d91 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -1286,6 +1286,7 @@ public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, HoodieIndex. 
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server + .withRemoteServerPort(timelineServicePort) .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 30d59baece09..d1575374e99c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -47,11 +47,13 @@ import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; @@ -61,6 +63,7 @@ import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.MarkerUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; @@ -2336,13 +2339,22 @@ private Pair> testConsistencyCheck(HoodieTableMetaCli // Create a dummy marker file to simulate the case that a marker file was created without data file. 
// This should fail the commit - String partitionPath = Arrays - .stream(fs.globStatus(new Path(String.format("%s/*/*/*/*", metaClient.getMarkerFolderPath(instantTime))), - path -> path.toString().contains(HoodieTableMetaClient.MARKER_EXTN))) - .limit(1).map(status -> status.getPath().getParent().toString()).collect(Collectors.toList()).get(0); + String partitionPath; + String markerFolderPath = metaClient.getMarkerFolderPath(instantTime); + if (cfg.getMarkersType() == MarkerType.TIMELINE_SERVER_BASED) { + String markerName = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem( + markerFolderPath, fs, context, 1).values().stream() + .flatMap(Collection::stream).findFirst().get(); + partitionPath = new Path(markerFolderPath, markerName).getParent().toString(); + } else { + partitionPath = Arrays + .stream(fs.globStatus(new Path(String.format("%s/*/*/*/*", markerFolderPath)), + path -> path.toString().contains(HoodieTableMetaClient.MARKER_EXTN))) + .limit(1).map(status -> status.getPath().getParent().toString()).collect(Collectors.toList()).get(0); + } Option markerFilePath = WriteMarkersFactory.get( - cfg.getMarkersType(), getHoodieTable(metaClient, cfg), instantTime) + cfg.getMarkersType(), getHoodieTable(metaClient, cfg), instantTime) .create(partitionPath, FSUtils.makeDataFileName(instantTime, "1-0-1", UUID.randomUUID().toString()), IOType.MERGE); @@ -2489,6 +2501,8 @@ private HoodieWriteConfig getParallelWritingWriteConfig(HoodieFailedWritesCleani .withAutoClean(false).build()) .withTimelineLayoutVersion(1) .withHeartbeatIntervalInMs(3 * 1000) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) .withAutoCommit(false) .withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen()).build(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 5617058bb8af..110b2b95a76b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -84,6 +84,7 @@ public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean initSparkContexts("TestHoodieMetadata"); initFileSystem(); fs.mkdirs(new Path(basePath)); + initTimelineService(); initMetaClient(tableType); initTestDataGenerator(); metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java index 12c8410c35e0..39a24f05a4fa 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; 
import org.apache.hudi.common.testutils.HoodieTestTable; @@ -241,6 +242,8 @@ private HoodieWriteConfig getWriteConfig(int minArchivalCommits, int maxArchival return HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) .forTable("test-trip-table").build(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java index 7cb9740a8c6c..aeef69e53dfd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java @@ -36,6 +36,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; @@ -94,6 +95,7 @@ public void init() throws Exception { public void init(HoodieTableType tableType) throws Exception { initPath(); initSparkContexts(); + initTimelineService(); initMetaClient(); hadoopConf = context.getHadoopConf().get(); metaClient.getFs().mkdirs(new Path(basePath)); @@ -126,6 +128,8 @@ private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata, HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata) .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsMetadataTable).build()) .forTable("test-trip-table").build(); @@ -210,6 +214,8 @@ public void testArchiveCommitSavepointNoHole() throws Exception { HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table") .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) .build(); @@ -328,6 +334,8 @@ public void testArchiveCommitTimeline() throws Exception { HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2).forTable("test-trip-table") .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 
3).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) .build(); metaClient = HoodieTableMetaClient.reload(metaClient); @@ -484,6 +492,8 @@ public void testArchiveCompletedRollbackAndClean() throws Exception { HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2).forTable("test-trip-table") .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstantsToKeep, maxInstantsToKeep).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) .build(); metaClient = HoodieTableMetaClient.reload(metaClient); @@ -519,6 +529,8 @@ public void testArchiveInflightClean() throws Exception { HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2).forTable("test-trip-table") .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) .build(); metaClient = HoodieTableMetaClient.reload(metaClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java index de5555543465..dd509a86b4e9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java @@ -18,8 +18,6 @@ package org.apache.hudi.io.storage.row; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; @@ -27,6 +25,9 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.SparkDatasetTestUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -66,7 +67,8 @@ public void tearDown() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) public void endToEndTest(boolean parquetWriteLegacyFormatEnabled) throws Exception { - HoodieWriteConfig.Builder writeConfigBuilder = SparkDatasetTestUtils.getConfigBuilder(basePath); + HoodieWriteConfig.Builder writeConfigBuilder = + SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort); for (int i = 0; i < 5; i++) { // init write support and parquet config HoodieRowParquetWriteSupport writeSupport = getWriteSupport(writeConfigBuilder, hadoopConf, parquetWriteLegacyFormatEnabled); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java index 76a91ef124bb..0f308425bc1c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java @@ -28,8 +28,8 @@ import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestHarness; - import org.apache.hudi.testutils.SparkDatasetTestUtils; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; @@ -65,6 +65,7 @@ public void setUp() throws Exception { initFileSystem(); initTestDataGenerator(); initMetaClient(); + initTimelineService(); } @AfterEach @@ -75,7 +76,8 @@ public void tearDown() throws Exception { @Test public void testRowCreateHandle() throws Exception { // init config and table - HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build(); + HoodieWriteConfig cfg = + SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort).build(); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); List fileNames = new ArrayList<>(); List fileAbsPaths = new ArrayList<>(); @@ -116,7 +118,8 @@ public void testRowCreateHandle() throws Exception { @Test public void testGlobalFailure() throws Exception { // init config and table - HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).build(); + HoodieWriteConfig cfg = + SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort).build(); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0]; @@ -124,7 +127,8 @@ public void testGlobalFailure() throws Exception { String fileId = UUID.randomUUID().toString(); String instantTime = "000"; - HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE); + HoodieRowCreateHandle handle = + new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE); int size = 10 + RANDOM.nextInt(1000); int totalFailures = 5; // Generate first batch of valid rows @@ -169,7 +173,8 @@ public void testGlobalFailure() throws Exception { @Test public void testInstantiationFailure() throws IOException { // init config and table - HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + HoodieWriteConfig cfg = SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) .withPath("/dummypath/abc/").build(); HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 2305d7bdeb4d..765427928fc3 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -18,8 +18,6 @@ package org.apache.hudi.table; -import 
org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieActionInstant; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; @@ -62,6 +60,7 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator; import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator; import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestTable; @@ -81,6 +80,9 @@ import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.table.action.clean.CleanPlanner; import org.apache.hudi.testutils.HoodieClientTestBase; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -89,7 +91,6 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; -import scala.Tuple3; import java.io.File; import java.io.IOException; @@ -111,6 +112,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import scala.Tuple3; + import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes; import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime; import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS; @@ -1300,7 +1303,10 @@ public void testCleanMarkerDataFilesOnRollback() throws Exception { final int numTempFilesBefore = testTable.listAllFilesInTempFolder().length; assertEquals(10, numTempFilesBefore, "Some marker files are created."); - HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) + .withPath(basePath).build(); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieSparkTable.create(config, context, metaClient); table.getActiveTimeline().transitionRequestedToInflight( diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index 40df1af898ea..91836bcadb92 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.testutils.Transformations; @@ -112,7 +113,9 @@ private HoodieWriteConfig makeHoodieClientConfig() { 
private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() { // Prepare the AvroParquetIO - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(SCHEMA.toString()); + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(SCHEMA.toString()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()); } // TODO (weiy): Add testcases for crossing file writing. @@ -405,8 +408,10 @@ public void testFileSizeUpsertRecords() throws Exception { public void testInsertUpsertWithHoodieAvroPayload() throws Exception { Schema schema = getSchemaFromResource(TestCopyOnWriteActionExecutor.class, "/testDataGeneratorSchema.txt"); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schema.toString()) - .withStorageConfig(HoodieStorageConfig.newBuilder() - .parquetMaxFileSize(1000 * 1024).hfileMaxFileSize(1000 * 1024).build()).build(); + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder() + .parquetMaxFileSize(1000 * 1024).hfileMaxFileSize(1000 * 1024).build()).build(); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient); String instantTime = "000"; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index 1e68a94588f0..1d0177cb0c52 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -29,8 +29,10 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; @@ -296,6 +298,8 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { boolean populateMetaFields = true; HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false) + // Timeline-server-based markers are not used for multi-rollback tests + .withMarkersType(MarkerType.DIRECT.name()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig cfg = cfgBuilder.build(); @@ -346,6 +350,8 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { newCommitTime = "002"; // WriteClient with custom config (disable small file handling) HoodieWriteConfig smallFileWriteConfig = getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields) + // Timeline-server-based markers are not used for multi-rollback tests + 
.withMarkersType(MarkerType.DIRECT.name()) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build(); try (SparkRDDWriteClient nClient = getHoodieWriteClient(smallFileWriteConfig)) { nClient.startCommitWithTime(newCommitTime); @@ -467,6 +473,8 @@ private HoodieWriteConfig.Builder getHoodieWriteConfigWithSmallFileHandlingOffBu .withAutoCommit(false) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024) .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) .withEmbeddedTimelineServerEnabled(true) .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024).parquetMaxFileSize(1024).build()).forTable("test-trip-table"); @@ -561,7 +569,7 @@ void testInsertsGeneratedIntoLogFilesRollback(boolean rollbackUsingMarkers) thro } } if (rollbackUsingMarkers) { - metaClient.getFs().copyFromLocalFile(markerDir, + metaClient.getFs().copyFromLocalFile(new Path(markerDir, lastCommitTime), new Path(metaClient.getMarkerFolderPath(lastCommitTime))); } Thread.sleep(1000); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java index 0817d6317b15..fa6df3ba73df 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestDirectWriteMarkers.java @@ -59,15 +59,18 @@ public void cleanup() { } @Override - void verifyMarkersInFileSystem() throws IOException { + void verifyMarkersInFileSystem(boolean isTablePartitioned) throws IOException { List markerFiles = FileSystemTestUtils.listRecursive(fs, markerFolderPath) .stream().filter(status -> status.getPath().getName().contains(".marker")) .sorted().collect(Collectors.toList()); assertEquals(3, markerFiles.size()); assertIterableEquals(CollectionUtils.createImmutableList( - "file:" + markerFolderPath.toString() + "/2020/06/01/file1.marker.MERGE", - "file:" + markerFolderPath.toString() + "/2020/06/02/file2.marker.APPEND", - "file:" + markerFolderPath.toString() + "/2020/06/03/file3.marker.CREATE"), + "file:" + markerFolderPath.toString() + + (isTablePartitioned ? "/2020/06/01" : "") + "/file1.marker.MERGE", + "file:" + markerFolderPath.toString() + + (isTablePartitioned ? "/2020/06/02" : "") + "/file2.marker.APPEND", + "file:" + markerFolderPath.toString() + + (isTablePartitioned ? 
"/2020/06/03" : "") + "/file3.marker.CREATE"), markerFiles.stream().map(m -> m.getPath().toString()).collect(Collectors.toList()) ); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java index 583883ccb493..61ee844b1917 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestTimelineServerBasedWriteMarkers.java @@ -93,18 +93,20 @@ public void cleanup() { } @Override - void verifyMarkersInFileSystem() throws IOException { + void verifyMarkersInFileSystem(boolean isTablePartitioned) throws IOException { // Verifies the markers List allMarkers = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem( - markerFolderPath.toString(), fs, context, 1) + markerFolderPath.toString(), fs, context, 1) .values().stream().flatMap(Collection::stream).sorted() .collect(Collectors.toList()); assertEquals(3, allMarkers.size()); - assertIterableEquals(CollectionUtils.createImmutableList( - "2020/06/01/file1.marker.MERGE", - "2020/06/02/file2.marker.APPEND", - "2020/06/03/file3.marker.CREATE"), - allMarkers); + List expectedMarkers = isTablePartitioned + ? CollectionUtils.createImmutableList( + "2020/06/01/file1.marker.MERGE", "2020/06/02/file2.marker.APPEND", + "2020/06/03/file3.marker.CREATE") + : CollectionUtils.createImmutableList( + "file1.marker.MERGE", "file2.marker.APPEND", "file3.marker.CREATE"); + assertIterableEquals(expectedMarkers, allMarkers); // Verifies the marker type file Path markerTypeFilePath = new Path(markerFolderPath, MarkerUtils.MARKER_TYPE_FILENAME); assertTrue(MarkerUtils.doesMarkerTypeFileExist(fs, markerFolderPath.toString())); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java index 0298ed37a638..5f96041b372d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/marker/TestWriteMarkersBase.java @@ -31,8 +31,11 @@ import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; +import java.util.List; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -48,10 +51,10 @@ public abstract class TestWriteMarkersBase extends HoodieCommonTestHarness { protected JavaSparkContext jsc; protected HoodieSparkEngineContext context; - private void createSomeMarkers() { - writeMarkers.create("2020/06/01", "file1", IOType.MERGE); - writeMarkers.create("2020/06/02", "file2", IOType.APPEND); - writeMarkers.create("2020/06/03", "file3", IOType.CREATE); + private void createSomeMarkers(boolean isTablePartitioned) { + writeMarkers.create(isTablePartitioned ? "2020/06/01" : "", "file1", IOType.MERGE); + writeMarkers.create(isTablePartitioned ? "2020/06/02" : "", "file2", IOType.APPEND); + writeMarkers.create(isTablePartitioned ? 
"2020/06/03" : "", "file3", IOType.CREATE); } private void createInvalidFile(String partitionPath, String invalidFileName) { @@ -64,22 +67,24 @@ private void createInvalidFile(String partitionPath, String invalidFileName) { } } - abstract void verifyMarkersInFileSystem() throws IOException; + abstract void verifyMarkersInFileSystem(boolean isTablePartitioned) throws IOException; - @Test - public void testCreation() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testCreation(boolean isTablePartitioned) throws Exception { // when - createSomeMarkers(); + createSomeMarkers(isTablePartitioned); // then assertTrue(fs.exists(markerFolderPath)); - verifyMarkersInFileSystem(); + verifyMarkersInFileSystem(isTablePartitioned); } - @Test - public void testDeletionWhenMarkerDirExists() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDeletionWhenMarkerDirExists(boolean isTablePartitioned) throws IOException { //when - writeMarkers.create("2020/06/01", "file1", IOType.MERGE); + writeMarkers.create(isTablePartitioned ? "2020/06/01" : "", "file1", IOType.MERGE); // then assertTrue(writeMarkers.doesMarkerDirExist()); @@ -95,32 +100,40 @@ public void testDeletionWhenMarkerDirNotExists() throws IOException { assertFalse(writeMarkers.deleteMarkerDir(context, 2)); } - @Test - public void testDataPathsWhenCreatingOrMerging() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testDataPathsWhenCreatingOrMerging(boolean isTablePartitioned) throws IOException { // add markfiles - createSomeMarkers(); + createSomeMarkers(isTablePartitioned); // add invalid file - createInvalidFile("2020/06/01", "invalid_file3"); + createInvalidFile(isTablePartitioned ? "2020/06/01" : "", "invalid_file3"); long fileSize = FileSystemTestUtils.listRecursive(fs, markerFolderPath).stream() .filter(fileStatus -> !fileStatus.getPath().getName().contains(MarkerUtils.MARKER_TYPE_FILENAME)) .count(); assertEquals(fileSize, 4); + List expectedPaths = isTablePartitioned + ? CollectionUtils.createImmutableList("2020/06/01/file1", "2020/06/03/file3") + : CollectionUtils.createImmutableList("file1", "file3"); // then - assertIterableEquals(CollectionUtils.createImmutableList( - "2020/06/01/file1", "2020/06/03/file3"), + assertIterableEquals(expectedPaths, writeMarkers.createdAndMergedDataPaths(context, 2).stream().sorted().collect(Collectors.toList()) ); } - @Test - public void testAllMarkerPaths() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testAllMarkerPaths(boolean isTablePartitioned) throws IOException { // given - createSomeMarkers(); + createSomeMarkers(isTablePartitioned); + List expectedPaths = isTablePartitioned + ? 
CollectionUtils.createImmutableList("2020/06/01/file1.marker.MERGE", + "2020/06/02/file2.marker.APPEND", "2020/06/03/file3.marker.CREATE") + : CollectionUtils.createImmutableList( + "file1.marker.MERGE", "file2.marker.APPEND", "file3.marker.CREATE"); // then - assertIterableEquals(CollectionUtils.createImmutableList("2020/06/01/file1.marker.MERGE", - "2020/06/02/file2.marker.APPEND", "2020/06/03/file3.marker.CREATE"), + assertIterableEquals(expectedPaths, writeMarkers.allMarkerFilePaths().stream() .filter(path -> !path.contains(MarkerUtils.MARKER_TYPE_FILENAME)) .sorted().collect(Collectors.toList()) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index 19ec4e6d0654..5f5dfdec5dce 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -326,9 +326,11 @@ public void testDowngrade(boolean deletePartialMarkerFiles, HoodieTableType tabl new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance()) .run(toVersion, null); - // assert marker files - assertMarkerFilesForDowngrade(table, commitInstant, toVersion == HoodieTableVersion.ONE); - + if (fromVersion == HoodieTableVersion.TWO) { + // assert marker files + assertMarkerFilesForDowngrade(table, commitInstant, toVersion == HoodieTableVersion.ONE); + } + // verify hoodie.table.version got downgraded metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath()) .setLayoutVersion(Option.of(new TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java index 05d4b5c7df55..ee3c309b30f2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java @@ -152,6 +152,7 @@ public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType in .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server + .withRemoteServerPort(timelineServicePort) .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 1df77b8b17f6..bdea909e093d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import 
org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; @@ -56,6 +57,7 @@ import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadStat; +import org.apache.hudi.timeline.service.TimelineService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -102,6 +104,8 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im private static final Logger LOG = LogManager.getLogger(HoodieClientTestHarness.class); + protected static int timelineServicePort = + FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue(); private String testMethodName; protected transient JavaSparkContext jsc = null; protected transient HoodieSparkEngineContext context = null; @@ -113,6 +117,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im protected transient SparkRDDWriteClient writeClient; protected transient HoodieReadClient readClient; protected transient HoodieTableFileSystemView tableView; + protected transient TimelineService timelineService; protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier(); @@ -145,12 +150,14 @@ public void initResources() throws IOException { initTestDataGenerator(); initFileSystem(); initMetaClient(); + initTimelineService(); } /** * Cleanups resource group for the subclasses of {@link HoodieClientTestBase}. */ public void cleanupResources() throws IOException { + cleanupTimelineService(); cleanupClients(); cleanupSparkContexts(); cleanupTestDataGenerator(); @@ -245,6 +252,7 @@ protected void cleanupFileSystem() throws IOException { * * @throws IOException */ + @Override protected void initMetaClient() throws IOException { initMetaClient(getTableType()); } @@ -272,6 +280,28 @@ protected void initMetaClient(HoodieTableType tableType, Properties properties) metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType, properties); } + /** + * Initializes timeline service based on the write config. 
+ */ + protected void initTimelineService() { + timelineService = HoodieClientTestUtils.initTimelineService( + context, basePath, incrementTimelineServicePortToUse()); + timelineServicePort = timelineService.getServerPort(); + } + + protected void cleanupTimelineService() { + if (timelineService != null) { + timelineService.close(); + } + } + + protected int incrementTimelineServicePortToUse() { + // Increment the timeline service port for each individual test + // to avoid port reuse causing failures + timelineServicePort = (timelineServicePort + 1 - 1024) % (65536 - 1024) + 1024; + return timelineServicePort; + } + protected Properties getPropertiesForKeyGen() { Properties properties = new Properties(); properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index 59172c55a806..6dffd535b914 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -30,13 +31,18 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.FileSystemViewManager; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.timeline.service.TimelineService; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -257,6 +263,36 @@ public static Stream readHFile(JavaSparkContext jsc, String[] pat return valuesAsList.stream(); } + /** + * Initializes timeline service based on the write config. + * + * @param context {@link HoodieEngineContext} instance to use. + * @param basePath Base path of the table. + * @param timelineServicePort Port number to use for timeline service. + * @return started {@link TimelineService} instance. 
+ */ + public static TimelineService initTimelineService( + HoodieEngineContext context, String basePath, int timelineServicePort) { + try { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) + .build(); + TimelineService timelineService = new TimelineService(context, new Configuration(), + TimelineService.Config.builder().enableMarkerRequests(true) + .serverPort(config.getViewStorageConfig().getRemoteViewServerPort()).build(), + FileSystem.get(new Configuration()), + FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), + config.getViewStorageConfig(), config.getCommonConfig())); + timelineService.startService(); + LOG.info("Timeline service server port: " + timelineServicePort); + return timelineService; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + public static Option getCommitMetadataForLatestInstant(HoodieTableMetaClient metaClient) { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); if (timeline.lastInstant().isPresent()) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index 40ef54b14cb6..88c4c13d974d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -49,6 +49,7 @@ import org.apache.hudi.testutils.providers.HoodieMetaClientProvider; import org.apache.hudi.testutils.providers.HoodieWriteClientProvider; import org.apache.hudi.testutils.providers.SparkProvider; +import org.apache.hudi.timeline.service.TimelineService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -81,10 +82,13 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMetaClientProvider, HoodieWriteClientProvider { + protected static int timelineServicePort = + FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue(); private static transient SparkSession spark; private static transient SQLContext sqlContext; private static transient JavaSparkContext jsc; private static transient HoodieSparkEngineContext context; + private static transient TimelineService timelineService; /** * An indicator of the initialization status. 
@@ -174,6 +178,9 @@ public synchronized void runBeforeEach() { sqlContext = spark.sqlContext(); jsc = new JavaSparkContext(spark.sparkContext()); context = new HoodieSparkEngineContext(jsc); + timelineService = HoodieClientTestUtils.initTimelineService( + context, basePath(), incrementTimelineServicePortToUse()); + timelineServicePort = timelineService.getServerPort(); } } @@ -189,6 +196,9 @@ public static synchronized void resetSpark() { spark.close(); spark = null; } + if (timelineService != null) { + timelineService.close(); + } } protected JavaRDD tagLocation( @@ -312,9 +322,17 @@ protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build()) .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table") .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() + .withRemoteServerPort(timelineServicePort) .withEnableBackupForRemoteFileSystemView(false).build()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) .withClusteringConfig(clusteringConfig) .withRollbackUsingMarkers(rollbackUsingMarkers); } + + protected int incrementTimelineServicePortToUse() { + // Increment the timeline service port for each individual test + // to avoid port reuse causing failures + timelineServicePort = (timelineServicePort + 1 - 1024) % (65536 - 1024) + 1024; + return timelineServicePort; + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java index 2e3b546de403..fd8ece1e06b8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java @@ -19,6 +19,7 @@ package org.apache.hudi.testutils; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; @@ -176,12 +177,14 @@ public static InternalRow getInternalRowWithError(String partitionPath) { return new GenericInternalRow(values); } - public static HoodieWriteConfig.Builder getConfigBuilder(String basePath) { + public static HoodieWriteConfig.Builder getConfigBuilder(String basePath, int timelineServicePort) { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2) .withDeleteParallelism(2) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build()) + .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withRemoteServerPort(timelineServicePort).build()) .forTable("test-trip-table") .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withBulkInsertParallelism(2); diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java 
index b19b098443ba..95a023abb687 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java +++ b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java @@ -77,7 +77,7 @@ protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields) { properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME); properties.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false"); } - return getConfigBuilder(basePath).withProperties(properties).build(); + return getConfigBuilder(basePath, timelineServicePort).withProperties(properties).build(); } protected void assertWriteStatuses(List writeStatuses, int batches, int size, diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala index 6e736d225a52..8d8ebfa7e54f 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala @@ -16,18 +16,16 @@ */ package org.apache.hudi -import java.lang -import java.util.function.Function - import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService} import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.model.HoodieRecordPayload -import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.marker.MarkerType import org.apache.hudi.common.table.timeline.HoodieInstant.State import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} -import org.apache.hudi.common.util.CompactionUtils -import org.apache.hudi.common.util.ClusteringUtils +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.util.{ClusteringUtils, CompactionUtils} +import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieCorruptedDataException import org.apache.log4j.LogManager import org.apache.spark.api.java.JavaSparkContext @@ -35,8 +33,10 @@ import org.apache.spark.sql.execution.streaming.Sink import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} -import scala.util.{Failure, Success, Try} +import java.lang +import java.util.function.Function import scala.collection.JavaConversions._ +import scala.util.{Failure, Success, Try} class HoodieStreamingSink(sqlContext: SQLContext, options: Map[String, String], @@ -71,25 +71,29 @@ class HoodieStreamingSink(sqlContext: SQLContext, private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty override def addBatch(batchId: Long, data: DataFrame): Unit = this.synchronized { - if (isAsyncCompactorServiceShutdownAbnormally) { + if (isAsyncCompactorServiceShutdownAbnormally) { throw new IllegalStateException("Async Compactor shutdown unexpectedly") } - if (isAsyncClusteringServiceShutdownAbnormally) { + if (isAsyncClusteringServiceShutdownAbnormally) { log.error("Async clustering service shutdown unexpectedly") throw new IllegalStateException("Async clustering service shutdown unexpectedly") } + // Override to use direct markers. 
In Structured streaming, timeline server is closed after + // first micro-batch and subsequent micro-batches do not have timeline server running. + // Thus, we can't use timeline-server-based markers. + val updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name()) retry(retryCnt, retryIntervalMs)( Try( HoodieSparkSqlWriter.write( - sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering)) + sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering)) ) match { case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) => log.info(s"Micro batch id=$batchId succeeded" + (commitOps.isPresent match { - case true => s" for commit=${commitOps.get()}" - case _ => s" with no new commits" - })) + case true => s" for commit=${commitOps.get()}" + case _ => s" with no new commits" + })) writeClient = Some(client) hoodieTableConfig = Some(tableConfig) if (compactionInstantOps.isPresent) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 68b630be5d5e..85c64f826541 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -59,9 +59,11 @@ class TestStructuredStreaming extends HoodieClientTestBase { spark = sqlContext.sparkSession initTestDataGenerator() initFileSystem() + initTimelineService() } @AfterEach override def tearDown() = { + cleanupTimelineService() cleanupSparkContexts() cleanupTestDataGenerator() cleanupFileSystem() diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java index 77c7870f5f3a..7b8257705146 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java @@ -321,8 +321,10 @@ public void close() { if (requestHandler != null) { this.requestHandler.stop(); } - this.app.stop(); - this.app = null; + if (this.app != null) { + this.app.stop(); + this.app = null; + } this.fsViewsManager.close(); LOG.info("Closed Timeline Service"); }
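For reference, the pattern this patch repeats across the test harnesses is: start one TimelineService, read back the port it actually bound, and point the remote file-system view of every HoodieWriteConfig at that port. A minimal sketch in Java, assuming the harness already provides a HoodieSparkEngineContext and a base path (only calls that appear in the diff are used):

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.timeline.service.TimelineService;

class TimelineServiceWiringSketch {

  // Start one timeline service for the suite, on the default remote view port.
  static TimelineService startSharedService(HoodieSparkEngineContext context, String basePath) {
    return HoodieClientTestUtils.initTimelineService(
        context, basePath, FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue());
  }

  // Build a write config whose remote file-system view talks to that service
  // instead of whatever happens to listen on the default port.
  static HoodieWriteConfig configFor(String basePath, int timelineServicePort) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
            .withRemoteServerPort(timelineServicePort).build())
        .build();
  }
}

A test would call startSharedService(...) during setup, feed timelineService.getServerPort() into configFor(...), and close() the service during teardown, mirroring initResources()/cleanupResources() in HoodieClientTestHarness.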
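The port bump in incrementTimelineServicePortToUse() keeps successive tests on distinct ports while staying inside the non-privileged range, wrapping back to 1024 after 65535. A self-contained sketch of that arithmetic; the 26754 starting value is an assumption about FileSystemViewStorageConfig's default remote port, not something this diff states:

public class PortRotationSketch {

  // Same expression as the harness: shift into a 0-based range, step forward by one
  // modulo the number of non-privileged ports (65536 - 1024), then shift back.
  static int nextPort(int current) {
    return (current + 1 - 1024) % (65536 - 1024) + 1024;
  }

  public static void main(String[] args) {
    System.out.println(nextPort(26754)); // 26755: the usual case, just the next port
    System.out.println(nextPort(65535)); // 1024: wraps around instead of leaving the range
  }
}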
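The parameterized marker tests encode one layout rule: a marker sits under the partition path only when the table is partitioned, and its name is the base file name plus a .marker.<IOType> suffix. The helper below is hypothetical (it exists only to spell out that expectation); the partition and file names come from the test fixtures:

class MarkerPathSketch {

  // Relative path of a marker under the marker folder of an instant.
  // ioType is the enum name the tests use: MERGE, APPEND or CREATE.
  static String relativeMarkerPath(boolean isTablePartitioned, String partitionPath,
                                   String fileName, String ioType) {
    String prefix = isTablePartitioned ? partitionPath + "/" : "";
    return prefix + fileName + ".marker." + ioType;
  }

  public static void main(String[] args) {
    System.out.println(relativeMarkerPath(true, "2020/06/01", "file1", "MERGE"));  // 2020/06/01/file1.marker.MERGE
    System.out.println(relativeMarkerPath(false, "", "file1", "MERGE"));           // file1.marker.MERGE
  }
}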
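Two places above switch writers to direct markers: the multi-rollback test configs and the streaming sink, which has no timeline server left after the first micro-batch. In a write config builder that is a single option; the snippet is illustrative and leaves every other setting at its default:

import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.config.HoodieWriteConfig;

class DirectMarkersSketch {

  static HoodieWriteConfig directMarkerConfig(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        // No embedded or standalone timeline server is around to serve marker
        // requests here, so markers are written straight to storage.
        .withMarkersType(MarkerType.DIRECT.name())
        .build();
  }
}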