diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
index a79d1571afe73..bceea8e3671cf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -60,6 +61,8 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hudi.common.fs.StorageSchemes.HDFS;
+
 /**
  * HoodieWrapperFileSystem wraps the default file system. It holds state about the open streams in the file system to
  * support getting the written size to each of the open streams.
@@ -68,6 +71,8 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   public static final String HOODIE_SCHEME_PREFIX = "hoodie-";
 
+  private static final String TMP_PATH_POSTFIX = ".tmp";
+
   protected enum MetricName {
     create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles, read, write
   }
@@ -986,6 +991,73 @@ public long getBytesWritten(Path file) {
         file.toString() + " does not have a open stream. Cannot get the bytes written on the stream");
   }
 
+  /**
+   * Tells whether non-empty content has to be staged in a temp file and renamed into place.
+   *
+   * @return true when the wrapped file system reports the HDFS scheme
+   */
+  protected boolean needCreateTempFile() {
+    return HDFS.getScheme().equals(fileSystem.getScheme());
+  }
+
+  /**
+   * Creates a new file with overwrite set to false. This ensures files are created
+   * only once and never rewritten.
+   *
+   * <p>If the content is present and {@link #needCreateTempFile()} is true, the content
+   * is first written to a sibling temp file and the temp file is renamed to
+   * {@code fullPath} only after the write and the close have both succeeded. When any
+   * step fails, the temp file created by this call is deleted (best effort) so that a
+   * partially written file is never published and a retry is not blocked.
+   *
+   * @param fullPath File Path
+   * @param content Content to be stored; an empty file is created when it is absent
+   * @throws HoodieIOException if the file cannot be created, written, closed or renamed
+   */
+  public void createImmutableFileInPath(Path fullPath, Option<byte[]> content)
+      throws HoodieIOException {
+    // An empty file cannot be observed half-written, so only real content is staged.
+    boolean useTempFile = content.isPresent() && needCreateTempFile();
+    Path writePath = useTempFile
+        ? new Path(fullPath.getParent(), fullPath.getName() + TMP_PATH_POSTFIX)
+        : fullPath;
+
+    boolean tempFileCreated = false;
+    boolean published = false;
+    try {
+      // try-with-resources closes the stream before the rename below is attempted.
+      try (FSDataOutputStream fsout = fileSystem.create(writePath, false)) {
+        tempFileCreated = useTempFile;
+        if (content.isPresent()) {
+          fsout.write(content.get());
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to create file " + writePath, e);
+      }
+
+      if (useTempFile) {
+        try {
+          // rename() can signal failure by returning false instead of throwing.
+          if (!fileSystem.rename(writePath, fullPath)) {
+            throw new IOException("rename returned false");
+          }
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to rename " + writePath + " to the target " + fullPath, e);
+        }
+      }
+      published = true;
+    } finally {
+      // Only remove a temp file this call created; a pre-existing one was not written here.
+      if (tempFileCreated && !published) {
+        try {
+          fileSystem.delete(writePath, false);
+        } catch (IOException ignored) {
+          // Best-effort cleanup: the failure that led here is already being propagated.
+        }
+      }
+    }
+  }
+
   public FileSystem getFileSystem() {
     return fileSystem;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index a62068e655e5d..29b5a9fe9c849 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -30,7 +30,6 @@
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
@@ -557,7 +556,7 @@ protected void transitionState(HoodieInstant fromInstant, HoodieInstant toInstan
         if (allowRedundantTransitions) {
           FileIOUtils.createFileInPath(metaClient.getFs(), getInstantFileNamePath(toInstant.getFileName()), data);
         } else {
-          createImmutableFileInPath(getInstantFileNamePath(toInstant.getFileName()), data);
+          metaClient.getFs().createImmutableFileInPath(getInstantFileNamePath(toInstant.getFileName()), data);
         }
         LOG.info("Create new file for toInstant ?" + getInstantFileNamePath(toInstant.getFileName()));
       }
@@ -724,33 +723,7 @@ protected void createFileInMetaPath(String filename, Option<byte[]> content, boo
     if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {
       FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, content);
     } else {
-      createImmutableFileInPath(fullPath, content);
-    }
-  }
-
-  /**
-   * Creates a new file in timeline with overwrite set to false. This ensures
-   * files are created only once and never rewritten
-   * @param fullPath File Path
-   * @param content Content to be stored
-   */
-  private void createImmutableFileInPath(Path fullPath, Option<byte[]> content) {
-    FSDataOutputStream fsout = null;
-    try {
-      fsout = metaClient.getFs().create(fullPath, false);
-      if (content.isPresent()) {
-        fsout.write(content.get());
-      }
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to create file " + fullPath, e);
-    } finally {
-      try {
-        if (null != fsout) {
-          fsout.close();
-        }
-      } catch (IOException e) {
-        throw new HoodieIOException("Failed to close file " + fullPath, e);
-      }
+      metaClient.getFs().createImmutableFileInPath(fullPath, content);
     }
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 9ff17cdbd2688..adee1e820e3dc 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.common.table.timeline;
 
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.fs.NoOpConsistencyGuard;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -32,6 +34,7 @@
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -45,6 +48,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -199,6 +203,25 @@ public void testTimelineOperations() {
     assertTrue(activeCommitTimeline.isBeforeTimelineStarts("00"));
   }
 
+  @Test
+  public void testAllowTempCommit() {
+    shouldAllowTempCommit(true, hoodieMetaClient -> {
+      timeline = new HoodieActiveTimeline(hoodieMetaClient);
+
+      HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "1");
+      timeline.createNewInstant(instant1);
+
+      byte[] data = "commit".getBytes(StandardCharsets.UTF_8);
+      timeline.saveAsComplete(new HoodieInstant(true, instant1.getAction(),
+          instant1.getTimestamp()), Option.of(data));
+
+      timeline = timeline.reload();
+
+      assertTrue(timeline.getContiguousCompletedWriteTimeline().lastInstant().isPresent());
+      assertEquals(instant1.getTimestamp(), timeline.getContiguousCompletedWriteTimeline().lastInstant().get().getTimestamp());
+    });
+  }
+
   @Test
   public void testGetContiguousCompletedWriteTimeline() {
     // a mock timeline with holes
@@ -594,4 +617,25 @@ private List getAllInstants() {
     }
     return allInstants;
   }
+
+  private void shouldAllowTempCommit(boolean allowTempCommit, Consumer<HoodieTableMetaClient> fun) {
+    if (allowTempCommit) {
+      HoodieWrapperFileSystem fs = metaClient.getFs();
+      HoodieWrapperFileSystem newFs = new HoodieWrapperFileSystem(fs.getFileSystem(), new NoOpConsistencyGuard()) {
+        @Override
+        protected boolean needCreateTempFile() {
+          return true;
+        }
+      };
+      metaClient.setFs(newFs);
+      try {
+        fun.accept(metaClient);
+      } finally {
+        metaClient.setFs(fs);
+      }
+      return;
+    }
+    fun.accept(metaClient);
+  }
+
 }