From df9cf33bacfab75947fdf6d6a50974fc0ad08421 Mon Sep 17 00:00:00 2001
From: Y Ethan Guo
Date: Wed, 8 Mar 2023 10:00:39 -0800
Subject: [PATCH] [DO NOT MERGE] Showcase HoodieMetadataFileSystemView issue
 causing inconsistent view and data loss in 0.13.0

---
 ...RemoteFileSystemViewWithMetadataTable.java | 276 ++++++++++++++++++
 .../metadata/HoodieBackedTableMetadata.java   |   7 +
 .../HoodieBackedTestDelayedTableMetadata.java |  55 ++++
 3 files changed, 338 insertions(+)
 create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
 create mode 100644 hudi-common/src/test/java/org/apache/hudi/metadata/HoodieBackedTestDelayedTableMetadata.java

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
new file mode 100644
index 0000000000000..c5f7f4218d542
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieBackedTestDelayedTableMetadata;
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.timeline.service.TimelineService;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.view.FileSystemViewStorageConfig.REMOTE_PORT_NUM;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests the {@link RemoteHoodieTableFileSystemView} with metadata table enabled, using
+ * {@link HoodieMetadataFileSystemView} on the timeline server.
+ */
+public class TestRemoteFileSystemViewWithMetadataTable extends HoodieClientTestHarness {
+
+  private static final Logger LOG = LogManager.getLogger(TestRemoteFileSystemViewWithMetadataTable.class);
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    initFileSystem();
+    initMetaClient();
+    initTimelineService();
+    dataGen = new HoodieTestDataGenerator(0x1f86);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupTimelineService();
+    cleanupClients();
+    cleanupSparkContexts();
+    cleanupFileSystem();
+    cleanupExecutorService();
+    dataGen = null;
+    System.gc();
+  }
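+
+  /**
+   * Starts a standalone timeline server whose file system view is backed by
+   * {@link HoodieBackedTestDelayedTableMetadata}, so that each view sync sleeps in
+   * {@code reset()} and widens the race window between sync and concurrent lookups.
+   */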
+  @Override
+  public void initTimelineService() {
+    // Start a timeline server that keeps running across multiple commits
+    HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
+
+    try {
+      HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+          .withPath(basePath)
+          .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+              .withRemoteServerPort(incrementTimelineServicePortToUse()).build())
+          .build();
+      timelineService = new TimelineService(localEngineContext, new Configuration(),
+          TimelineService.Config.builder().enableMarkerRequests(true)
+              .serverPort(config.getViewStorageConfig().getRemoteViewServerPort()).build(),
+          FileSystem.get(new Configuration()),
+          FileSystemViewManager.createViewManager(
+              context, config.getMetadataConfig(), config.getViewStorageConfig(),
+              config.getCommonConfig(),
+              () -> new HoodieBackedTestDelayedTableMetadata(
+                  context, config.getMetadataConfig(), basePath,
+                  config.getViewStorageConfig().getSpillableDir(), true)));
+      timelineService.startService();
+      timelineServicePort = timelineService.getServerPort();
+      LOG.info("Started timeline server on port: " + timelineServicePort);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
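+
+  /**
+   * Fires many concurrent getLatestFileSlice lookups at the timeline server right after
+   * a compaction commit, while the server's view sync is still sleeping in reset(), and
+   * asserts that every lookup returns the file slice written by the compaction commit.
+   */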
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testMORGetLatestFileSliceWithMetadataTable(boolean useExistingTimelineServer) throws IOException {
+    // This test relies on the `HoodieBackedTestDelayedTableMetadata` to delay the file
+    // system view sync on the timeline server, to verify that a fully synced file system
+    // view is always served
+    SparkRDDWriteClient writeClient = createWriteClient(
+        useExistingTimelineServer ? Option.of(timelineService) : Option.empty());
+
+    for (int i = 0; i < 3; i++) {
+      writeToTable(i, writeClient);
+    }
+
+    // At this point, there are three deltacommits and one compaction commit in the Hudi
+    // timeline, and the file system view of the timeline server is not yet synced
+    HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.builder()
+        .setConf(metaClient.getHadoopConf())
+        .setBasePath(basePath)
+        .build();
+    HoodieActiveTimeline timeline = newMetaClient.getActiveTimeline();
+    HoodieInstant compactionCommit = timeline.lastInstant().get();
+    assertTrue(timeline.lastInstant().get().getAction().equals(COMMIT_ACTION));
+
+    // For all the file groups compacted by the compaction commit, the file system view
+    // should return the latest file slices, which are written by the compaction commit
+    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+        timeline.getInstantDetails(compactionCommit).get(), HoodieCommitMetadata.class);
+    List<Pair<String, String>> partitionFileIdPairList =
+        commitMetadata.getPartitionToWriteStats().entrySet().stream().flatMap(
+            entry -> {
+              String partitionPath = entry.getKey();
+              return entry.getValue().stream().map(
+                  writeStat -> Pair.of(partitionPath, writeStat.getFileId()));
+            }
+        ).collect(Collectors.toList());
+    List<Pair<String, String>> lookupList = new ArrayList<>();
+    // Accumulate enough lookup requests to exercise read-write concurrency while the
+    // file system view is being synced at the timeline server
+    while (lookupList.size() < 128) {
+      lookupList.addAll(partitionFileIdPairList);
+    }
+
+    int timelineServerPort = useExistingTimelineServer
+        ? timelineService.getServerPort()
+        : writeClient.getTableServiceClient().getTimelineServer().get().getRemoteFileSystemViewConfig().getRemoteViewServerPort();
+
+    LOG.info("Connecting to Timeline Server: " + timelineServerPort);
+    RemoteHoodieTableFileSystemView view =
+        new RemoteHoodieTableFileSystemView("localhost", timelineServerPort, metaClient);
+
+    List<TestViewLookUpCallable> callableList = lookupList.stream()
+        .map(pair -> new TestViewLookUpCallable(view, pair, compactionCommit.getTimestamp()))
+        .collect(Collectors.toList());
+    List<Future<Boolean>> resultList = new ArrayList<>();
+
+    ExecutorService pool = Executors.newCachedThreadPool();
+    callableList.forEach(callable -> {
+      resultList.add(pool.submit(callable));
+    });
+
+    assertTrue(resultList.stream().map(future -> {
+      try {
+        return future.get();
+      } catch (Exception e) {
+        LOG.error(e);
+        return false;
+      }
+    }).reduce((a, b) -> a && b).get());
+  }
+
+  @Override
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
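+
+  /**
+   * Builds a write client whose file system view is REMOTE_ONLY, served by the passed-in
+   * timeline server (or the default remote port if absent), with compaction scheduled
+   * after every three delta commits.
+   */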
+  private SparkRDDWriteClient createWriteClient(Option<TimelineService> timelineService) {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withBulkInsertParallelism(2)
+        .withFinalizeWriteParallelism(2)
+        .withDeleteParallelism(2)
+        .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+        .withMergeSmallFileGroupCandidatesLimit(0)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(3)
+            .build())
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.REMOTE_ONLY)
+            .withRemoteServerPort(timelineService.isPresent()
+                ? timelineService.get().getServerPort() : REMOTE_PORT_NUM.defaultValue())
+            .build())
+        .withAutoCommit(false)
+        .forTable("test_mor_table")
+        .build();
+    return new SparkRDDWriteClient(context, writeConfig, timelineService);
+  }
+
+  private void writeToTable(int round, SparkRDDWriteClient writeClient) throws IOException {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    writeClient.startCommitWithTime(instantTime);
+    List<HoodieRecord> records = round == 0
+        ? dataGen.generateInserts(instantTime, 100)
+        : dataGen.generateUpdates(instantTime, 100);
+
+    JavaRDD<WriteStatus> writeStatusRDD = writeClient.upsert(jsc.parallelize(records, 1), instantTime);
+    writeClient.commit(instantTime, writeStatusRDD, Option.empty(), DELTA_COMMIT_ACTION, Collections.emptyMap());
+    // Triggers compaction once enough delta commits have accumulated
+    writeClient.scheduleCompaction(Option.empty());
+    writeClient.runAnyPendingCompactions();
+  }
+
+  /**
+   * Test callable to send a lookup request to the timeline server for the latest file slice
+   * based on the partition path and file ID.
+   */
+  class TestViewLookUpCallable implements Callable<Boolean> {
+    private final RemoteHoodieTableFileSystemView view;
+    private final Pair<String, String> partitionFileIdPair;
+    private final String expectedCommitTime;
+
+    public TestViewLookUpCallable(
+        RemoteHoodieTableFileSystemView view,
+        Pair<String, String> partitionFileIdPair,
+        String expectedCommitTime) {
+      this.view = view;
+      this.partitionFileIdPair = partitionFileIdPair;
+      this.expectedCommitTime = expectedCommitTime;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      Option<FileSlice> latestFileSlice = view.getLatestFileSlice(
+          partitionFileIdPair.getLeft(), partitionFileIdPair.getRight());
+      boolean result = latestFileSlice.isPresent() && expectedCommitTime.equals(
+          FSUtils.getCommitTime(new Path(latestFileSlice.get().getBaseFile().get().getPath()).getName()));
+      if (!result) {
+        LOG.error("The timeline server does not return the correct result: latestFileSliceReturned="
+            + latestFileSlice + " expectedCommitTime=" + expectedCommitTime);
+      }
+      return result;
+    }
+  }
+}

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index f3538127955f8..4b439e570e2f3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -683,6 +683,13 @@ public Option<String> getLatestCompactionTime() {
 
   @Override
   public void reset() {
+    LOG.info("Sleeping for 5 seconds in reset() to simulate processing ...");
+    try {
+      Thread.sleep(5000);
+    } catch (InterruptedException e) {
+      LOG.warn("Sleep is interrupted", e);
+    }
+    LOG.info("Sleep in reset() is finished.");
     initIfNeeded();
     dataMetaClient.reloadActiveTimeline();
     if (metadataMetaClient != null) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/metadata/HoodieBackedTestDelayedTableMetadata.java b/hudi-common/src/test/java/org/apache/hudi/metadata/HoodieBackedTestDelayedTableMetadata.java
new file mode 100644
index 0000000000000..fb8d515610c78
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/metadata/HoodieBackedTestDelayedTableMetadata.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Table metadata provided by an internal DFS-backed Hudi metadata table,
+ * with an intentional delay in `reset()` to test concurrent reads and writes.
+ */
+public class HoodieBackedTestDelayedTableMetadata extends HoodieBackedTableMetadata {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBackedTestDelayedTableMetadata.class);
+
+  public HoodieBackedTestDelayedTableMetadata(HoodieEngineContext engineContext,
+                                              HoodieMetadataConfig metadataConfig,
+                                              String datasetBasePath,
+                                              String spillableMapDirectory,
+                                              boolean reuse) {
+    super(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory, reuse);
+  }
+
+  @Override
+  public void reset() {
+    LOG.info("Sleeping for 5 seconds in reset() to simulate processing ...");
+    try {
+      Thread.sleep(5000);
+    } catch (InterruptedException e) {
+      LOG.warn("Sleep is interrupted", e);
+    }
+    LOG.info("Sleep in reset() is finished.");
+    super.reset();
+  }
+}