diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index 313c1bcd3b93c..4c7ce8819d534 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -92,7 +92,7 @@ public void init() throws IOException {
 
     // archive
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
-    archiveLog.archiveIfRequired();
+    archiveLog.archiveIfRequired(jsc);
   }
 
   @AfterEach
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
index 45c340df2db8a..44e2b8097e5f8 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
@@ -176,7 +176,7 @@ public void testShowArchivedCommits() throws IOException {
     // archive
     metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, jsc.hadoopConfiguration());
-    archiveLog.archiveIfRequired();
+    archiveLog.archiveIfRequired(jsc);
 
     CommandResult cr = getShell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104"));
     assertTrue(cr.isSuccess());
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index b2ad315b599bd..9782b46b6ab12 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -337,7 +337,7 @@ protected void postCommit(HoodieTable<T> table, HoodieCommitMetadata metadata, S
 
     try {
       // Delete the marker directory for the instant.
-      new MarkerFiles(table, instantTime).quietDeleteMarkerDir();
+      new MarkerFiles(table, instantTime).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism());
 
       // Do an inline compaction if enabled
       if (config.isInlineCompaction()) {
@@ -349,7 +349,7 @@ protected void postCommit(HoodieTable<T> table, HoodieCommitMetadata metadata, S
       }
       // We cannot have unbounded commit files. Archive commits if we have to archive
       HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf);
-      archiveLog.archiveIfRequired();
+      archiveLog.archiveIfRequired(jsc);
       autoCleanOnCommit(instantTime);
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9aecdf7075849..69758f251cc4f 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -88,6 +88,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
   public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
   public static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
+  public static final String MARKERS_DELETE_PARALLELISM = "hoodie.markers.delete.parallelism";
+  public static final String DEFAULT_MARKERS_DELETE_PARALLELISM = "100";
   public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode";
   public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT
       .toString();
@@ -235,6 +237,10 @@ public int getFinalizeWriteParallelism() {
     return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
   }
 
+  public int getMarkersDeleteParallelism() {
+    return Integer.parseInt(props.getProperty(MARKERS_DELETE_PARALLELISM));
+  }
+
   public boolean isEmbeddedTimelineServerEnabled() {
     return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
   }
@@ -830,6 +836,11 @@ public Builder withFinalizeWriteParallelism(int parallelism) {
     return this;
   }
 
+  public Builder withMarkersDeleteParallelism(int parallelism) {
+    props.setProperty(MARKERS_DELETE_PARALLELISM, String.valueOf(parallelism));
+    return this;
+  }
+
   public Builder withEmbeddedTimelineServerEnabled(boolean enabled) {
     props.setProperty(EMBEDDED_TIMELINE_SERVER_ENABLED, String.valueOf(enabled));
     return this;
@@ -874,6 +885,8 @@ public HoodieWriteConfig build() {
         DEFAULT_HOODIE_WRITE_STATUS_CLASS);
     setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM,
         DEFAULT_FINALIZE_WRITE_PARALLELISM);
+    setDefaultOnCondition(props, !props.containsKey(MARKERS_DELETE_PARALLELISM), MARKERS_DELETE_PARALLELISM,
+        DEFAULT_MARKERS_DELETE_PARALLELISM);
    setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED), EMBEDDED_TIMELINE_SERVER_ENABLED,
         DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
     setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
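For context on the new hoodie.markers.delete.parallelism knob introduced above (default 100): a minimal usage sketch, not part of this patch, showing how a writer job could override it through the builder. The base path and table name below are hypothetical.

// Usage sketch (not part of the patch): tuning marker deletion parallelism.
import org.apache.hudi.config.HoodieWriteConfig;

public class MarkersParallelismExample {
  public static void main(String[] args) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table") // hypothetical base path
        .forTable("sample-table")             // hypothetical table name
        .withMarkersDeleteParallelism(200)    // raise from the default of 100
        .build();
    // The value reaches MarkerFiles.deleteMarkerDir via config.getMarkersDeleteParallelism()
    System.out.println(writeConfig.getMarkersDeleteParallelism()); // 200
  }
}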
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 748091e1d11d6..d8b0c6e9018e1 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -70,6 +70,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -428,21 +429,21 @@ protected void reconcileAgainstMarkers(JavaSparkContext jsc,
     }
 
     // we are not including log appends here, since they are already fail-safe.
-    List<String> invalidDataPaths = markers.createdAndMergedDataPaths();
-    List<String> validDataPaths = stats.stream()
+    Set<String> invalidDataPaths = markers.createdAndMergedDataPaths(jsc, config.getFinalizeWriteParallelism());
+    Set<String> validDataPaths = stats.stream()
        .map(HoodieWriteStat::getPath)
        .filter(p -> p.endsWith(this.getBaseFileExtension()))
-        .collect(Collectors.toList());
+        .collect(Collectors.toSet());
+
     // Contains list of partially created files. These needs to be cleaned up.
     invalidDataPaths.removeAll(validDataPaths);
+
     if (!invalidDataPaths.isEmpty()) {
       LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths);
-    }
-    Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
-        .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString()))
-        .collect(Collectors.groupingBy(Pair::getKey));
+      Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
+          .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString()))
+          .collect(Collectors.groupingBy(Pair::getKey));
 
-    if (!invalidPathsByPartition.isEmpty()) {
       // Ensure all files in delete list is actually present. This is mandatory for an eventually consistent FS.
       // Otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit
       if (consistencyCheckEnabled) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 98d3e05fcb7f0..4be00a3a58e97 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -54,6 +54,7 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -121,7 +122,7 @@ private void close() {
 
   /**
    * Check if commits need to be archived. If yes, archive commits.
    */
-  public boolean archiveIfRequired() throws IOException {
+  public boolean archiveIfRequired(JavaSparkContext jsc) throws IOException {
    try {
      List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
@@ -129,7 +130,7 @@ public boolean archiveIfRequired() throws IOException {
       if (!instantsToArchive.isEmpty()) {
         this.writer = openWriter();
         LOG.info("Archiving instants " + instantsToArchive);
-        archive(instantsToArchive);
+        archive(jsc, instantsToArchive);
         LOG.info("Deleting archived instants " + instantsToArchive);
         success = deleteArchivedInstants(instantsToArchive);
       } else {
@@ -267,7 +268,7 @@ private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thre
     return success;
   }
 
-  public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
+  public void archive(JavaSparkContext jsc, List<HoodieInstant> instants) throws HoodieCommitException {
     try {
       HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
       Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
@@ -275,7 +276,7 @@ public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         try {
-          deleteAnyLeftOverMarkerFiles(hoodieInstant);
+          deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant);
           records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
           if (records.size() >= this.config.getCommitArchivalBatchSize()) {
             writeToFile(wrapperSchema, records);
@@ -293,9 +294,9 @@
     }
   }
 
-  private void deleteAnyLeftOverMarkerFiles(HoodieInstant instant) {
+  private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant instant) {
     MarkerFiles markerFiles = new MarkerFiles(table, instant.getTimestamp());
-    if (markerFiles.deleteMarkerDir()) {
+    if (markerFiles.deleteMarkerDir(jsc, config.getMarkersDeleteParallelism())) {
       LOG.info("Cleaned up left over marker directory for instant :" + instant);
     }
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
index 00eb7df7ac096..8a310fd30b3ce 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
@@ -18,8 +18,12 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -28,26 +32,27 @@
 import org.apache.hudi.io.IOType;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
 import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Operates on marker files for a given write action (commit, delta commit, compaction).
  */
-public class MarkerFiles {
+public class MarkerFiles implements Serializable {
 
   private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);
 
-  public static String stripMarkerSuffix(String path) {
-    return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
-  }
-
   private final String instantTime;
-  private final FileSystem fs;
-  private final Path markerDirPath;
+  private final transient FileSystem fs;
+  private final transient Path markerDirPath;
   private final String basePath;
 
   public MarkerFiles(FileSystem fs, String basePath, String markerFolderPath, String instantTime) {
@@ -64,9 +69,9 @@ public MarkerFiles(HoodieTable table, String instantTime) {
        instantTime);
   }
 
-  public void quietDeleteMarkerDir() {
+  public void quietDeleteMarkerDir(JavaSparkContext jsc, int parallelism) {
     try {
-      deleteMarkerDir();
+      deleteMarkerDir(jsc, parallelism);
     } catch (HoodieIOException ioe) {
       LOG.warn("Error deleting marker directory for instant " + instantTime, ioe);
     }
@@ -74,34 +79,77 @@
   }
 
   /**
    * Delete Marker directory corresponding to an instant.
+   *
+   * @param jsc Java Spark Context.
+   * @param parallelism Spark parallelism for deletion.
    */
-  public boolean deleteMarkerDir() {
+  public boolean deleteMarkerDir(JavaSparkContext jsc, int parallelism) {
     try {
-      boolean result = fs.delete(markerDirPath, true);
-      if (result) {
+      if (fs.exists(markerDirPath)) {
+        FileStatus[] fileStatuses = fs.listStatus(markerDirPath);
+        List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+            .map(fileStatus -> fileStatus.getPath().toString())
+            .collect(Collectors.toList());
+
+        if (markerDirSubPaths.size() > 0) {
+          SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
+          parallelism = Math.min(markerDirSubPaths.size(), parallelism);
+          jsc.parallelize(markerDirSubPaths, parallelism).foreach(subPathStr -> {
+            Path subPath = new Path(subPathStr);
+            FileSystem fileSystem = subPath.getFileSystem(conf.get());
+            fileSystem.delete(subPath, true);
+          });
+        }
+
+        boolean result = fs.delete(markerDirPath, true);
         LOG.info("Removing marker directory at " + markerDirPath);
-      } else {
-        LOG.info("No marker directory to delete at " + markerDirPath);
+        return result;
       }
-      return result;
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
     }
+    return false;
   }
 
   public boolean doesMarkerDirExist() throws IOException {
     return fs.exists(markerDirPath);
   }
 
-  public List<String> createdAndMergedDataPaths() throws IOException {
-    List<String> dataFiles = new LinkedList<>();
-    FSUtils.processFiles(fs, markerDirPath.toString(), (status) -> {
-      String pathStr = status.getPath().toString();
-      if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
-        dataFiles.add(translateMarkerToDataPath(pathStr));
+  public Set<String> createdAndMergedDataPaths(JavaSparkContext jsc, int parallelism) throws IOException {
+    Set<String> dataFiles = new HashSet<>();
+
+    FileStatus[] topLevelStatuses = fs.listStatus(markerDirPath);
+    List<String> subDirectories = new ArrayList<>();
+    for (FileStatus topLevelStatus: topLevelStatuses) {
+      if (topLevelStatus.isFile()) {
+        String pathStr = topLevelStatus.getPath().toString();
+        if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
+          dataFiles.add(translateMarkerToDataPath(pathStr));
+        }
+      } else {
+        subDirectories.add(topLevelStatus.getPath().toString());
       }
-      return true;
-    }, false);
+    }
+
+    if (subDirectories.size() > 0) {
+      parallelism = Math.min(subDirectories.size(), parallelism);
+      SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
+      dataFiles.addAll(jsc.parallelize(subDirectories, parallelism).flatMap(directory -> {
+        Path path = new Path(directory);
+        FileSystem fileSystem = path.getFileSystem(serializedConf.get());
+        RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, true);
+        List<String> result = new ArrayList<>();
+        while (itr.hasNext()) {
+          FileStatus status = itr.next();
+          String pathStr = status.getPath().toString();
+          if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
+            result.add(translateMarkerToDataPath(pathStr));
+          }
+        }
+        return result.iterator();
+      }).collect());
+    }
+
     return dataFiles;
   }
@@ -110,6 +158,10 @@ private String translateMarkerToDataPath(String markerPath) {
     return MarkerFiles.stripMarkerSuffix(rPath);
   }
 
+  public static String stripMarkerSuffix(String path) {
+    return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
+  }
+
   public List<String> allMarkerFilePaths() throws IOException {
     List<String> markerFiles = new ArrayList<>();
     FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 846e8a8538e54..90b9bb387168b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -113,7 +113,7 @@ public HoodieRollbackMetadata execute() {
     }
 
     // Finally, remove the marker files post rollback.
-    new MarkerFiles(table, instantToRollback.getTimestamp()).quietDeleteMarkerDir();
+    new MarkerFiles(table, instantToRollback.getTimestamp()).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism());
 
     return rollbackMetadata;
   }
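The core pattern in the MarkerFiles changes above: list the marker directory's immediate children on the driver, then fan the recursive deletes out across Spark tasks, shipping the Hadoop configuration in Hudi's SerializableConfiguration wrapper (a plain Configuration is not Java-serializable). A self-contained sketch of that pattern, with illustrative names that are not part of the patch:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.spark.api.java.JavaSparkContext;

public class ParallelDeleteSketch {
  // Delete each child of 'dir' in a separate Spark task, then remove 'dir' itself.
  public static void deleteSubtrees(JavaSparkContext jsc, FileSystem fs, Path dir, int parallelism)
      throws IOException {
    List<String> children = Arrays.stream(fs.listStatus(dir))
        .map(status -> status.getPath().toString())
        .collect(Collectors.toList());
    if (!children.isEmpty()) {
      // Wrap the Configuration so executors can rebuild a FileSystem handle.
      SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
      jsc.parallelize(children, Math.min(children.size(), parallelism)).foreach(child -> {
        Path childPath = new Path(child);
        childPath.getFileSystem(conf.get()).delete(childPath, true);
      });
    }
    fs.delete(dir, true); // parent is now empty (or was empty to begin with)
  }
}

Capping the parallelism at the child count, as the patch does, avoids scheduling empty partitions when the marker directory holds only a handful of subdirectories.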
archiveLog.archiveIfRequired(); + boolean result = archiveLog.archiveIfRequired(jsc); assertTrue(result); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); assertEquals(4, timeline.countInstants(), "Should not archive commits when maxCommitsToKeep is 5"); @@ -289,7 +289,7 @@ public void testArchiveCommitSafety() throws IOException { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match"); - boolean result = archiveLog.archiveIfRequired(); + boolean result = archiveLog.archiveIfRequired(jsc); assertTrue(result); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); assertTrue(timeline.containsOrBeforeTimelineStarts("100"), "Archived commits should always be safe"); @@ -315,7 +315,7 @@ public void testArchiveCommitSavepointNoHole() throws IOException { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match"); - assertTrue(archiveLog.archiveIfRequired()); + assertTrue(archiveLog.archiveIfRequired(jsc)); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); assertEquals(5, timeline.countInstants(), "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)"); @@ -349,7 +349,7 @@ public void testArchiveCommitCompactionNoHole() throws IOException { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline(); assertEquals(8, timeline.countInstants(), "Loaded 6 commits and the count should match"); - boolean result = archiveLog.archiveIfRequired(); + boolean result = archiveLog.archiveIfRequired(jsc); assertTrue(result); timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline(); assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")), @@ -397,7 +397,7 @@ public void testArchiveCommitTimeline() throws IOException { HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, dfs.getConf()); - boolean result = archiveLog.archiveIfRequired(); + boolean result = archiveLog.archiveIfRequired(jsc); assertTrue(result); HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); List archivedInstants = Arrays.asList(instant1, instant2, instant3); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java index 723d9e17a6bf2..af679cee833db 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java @@ -28,6 +28,9 @@ import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.IOType; +import org.apache.hudi.testutils.HoodieClientTestUtils; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -45,16 +48,23 @@ public class TestMarkerFiles extends HoodieCommonTestHarness { private MarkerFiles markerFiles; private FileSystem fs; private Path markerFolderPath; + private JavaSparkContext jsc; @BeforeEach public void setup() throws IOException { initPath(); 
initMetaClient(); + this.jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(TestMarkerFiles.class.getName())); this.fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf()); this.markerFolderPath = new Path(metaClient.getMarkerFolderPath("000")); this.markerFiles = new MarkerFiles(fs, metaClient.getBasePath(), markerFolderPath.toString(), "000"); } + @AfterEach + public void cleanup() { + jsc.stop(); + } + private void createSomeMarkerFiles() { markerFiles.create("2020/06/01", "file1", IOType.MERGE); markerFiles.create("2020/06/02", "file2", IOType.APPEND); @@ -97,7 +107,7 @@ public void testDeletionWhenMarkerDirExists() throws IOException { // then assertTrue(markerFiles.doesMarkerDirExist()); - assertTrue(markerFiles.deleteMarkerDir()); + assertTrue(markerFiles.deleteMarkerDir(jsc, 2)); assertFalse(markerFiles.doesMarkerDirExist()); } @@ -105,7 +115,7 @@ public void testDeletionWhenMarkerDirExists() throws IOException { public void testDeletionWhenMarkerDirNotExists() throws IOException { // then assertFalse(markerFiles.doesMarkerDirExist()); - assertFalse(markerFiles.deleteMarkerDir()); + assertFalse(markerFiles.deleteMarkerDir(jsc, 2)); } @Test @@ -120,7 +130,7 @@ public void testDataPathsWhenCreatingOrMerging() throws IOException { // then assertIterableEquals(CollectionUtils.createImmutableList( "2020/06/01/file1", "2020/06/03/file3"), - markerFiles.createdAndMergedDataPaths().stream().sorted().collect(Collectors.toList()) + markerFiles.createdAndMergedDataPaths(jsc, 2).stream().sorted().collect(Collectors.toList()) ); }
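Because deleteMarkerDir and createdAndMergedDataPaths now require a live JavaSparkContext, the tests above gain a local Spark lifecycle. A minimal sketch of that lifecycle, assuming a plain local[*] SparkConf rather than the HoodieClientTestUtils helper the patch imports:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class LocalSparkLifecycleSketch {
  private JavaSparkContext jsc;

  // Mirrors the @BeforeEach addition: start a local context before each test.
  public void setup() {
    SparkConf conf = new SparkConf().setAppName("marker-files-test").setMaster("local[*]");
    jsc = new JavaSparkContext(conf);
  }

  // Mirrors the @AfterEach addition: always stop the context to free resources.
  public void cleanup() {
    if (jsc != null) {
      jsc.stop();
    }
  }
}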