diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index faa7ff67b2693..be324a3c5081a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -273,7 +273,8 @@ public WriteStatus close() { insertRecordsWritten++; } } - keyToNewRecords.clear(); + + ((ExternalSpillableMap) keyToNewRecords).close(); writtenRecordKeys.clear(); if (fileWriter != null) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java index 56ef5c3d75074..fe4666305d3f6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java @@ -74,6 +74,8 @@ public final class DiskBasedMap private ThreadLocal randomAccessFile = new ThreadLocal<>(); private Queue openedAccessFiles = new ConcurrentLinkedQueue<>(); + private transient Thread shutdownThread = null; + public DiskBasedMap(String baseFilePath) throws IOException { this.valueMetadataMap = new ConcurrentHashMap<>(); this.writeOnlyFile = new File(baseFilePath, UUID.randomUUID().toString()); @@ -126,33 +128,8 @@ private void initFile(File writeOnlyFile) throws IOException { * (typically 4 KB) to disk. */ private void addShutDownHook() { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - if (writeOnlyFileHandle != null) { - writeOnlyFileHandle.flush(); - fileOutputStream.getChannel().force(false); - writeOnlyFileHandle.close(); - } - - while (!openedAccessFiles.isEmpty()) { - BufferedRandomAccessFile file = openedAccessFiles.poll(); - if (null != file) { - try { - file.close(); - } catch (IOException ioe) { - // skip exception - } - } - } - writeOnlyFile.delete(); - } catch (Exception e) { - // delete the file for any sort of exception - writeOnlyFile.delete(); - } - } - }); + shutdownThread = new Thread(this::cleanup); + Runtime.getRuntime().addShutdownHook(shutdownThread); } private void flushToDisk() { @@ -267,6 +244,39 @@ public void clear() { // reducing concurrency). Instead, just clear the pointer map. The file will be removed on exit. } + public void close() { + cleanup(); + if (shutdownThread != null) { + Runtime.getRuntime().removeShutdownHook(shutdownThread); + } + } + + private void cleanup() { + valueMetadataMap.clear(); + try { + if (writeOnlyFileHandle != null) { + writeOnlyFileHandle.flush(); + fileOutputStream.getChannel().force(false); + writeOnlyFileHandle.close(); + } + + while (!openedAccessFiles.isEmpty()) { + BufferedRandomAccessFile file = openedAccessFiles.poll(); + if (null != file) { + try { + file.close(); + } catch (IOException ioe) { + // skip exception + } + } + } + writeOnlyFile.delete(); + } catch (Exception e) { + // delete the file for any sort of exception + writeOnlyFile.delete(); + } + } + @Override public Set keySet() { return valueMetadataMap.keySet(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java index 32c41f7ada09d..003d525b66d5e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java @@ -225,6 +225,10 @@ public void clear() { currentInMemoryMapSize = 0L; } + public void close() { + getDiskBasedMap().close(); + } + @Override public Set keySet() { Set keySet = new HashSet(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java index 927c992a85b0c..95b1ac2b37b36 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java @@ -38,6 +38,8 @@ public class LazyFileIterable implements Iterable { // Stores the key and corresponding value's latest metadata spilled to disk private final Map inMemoryMetadataOfSpilledData; + private transient Thread shutdownThread = null; + public LazyFileIterable(String filePath, Map map) { this.filePath = filePath; this.inMemoryMetadataOfSpilledData = map; @@ -103,6 +105,11 @@ public void forEachRemaining(Consumer action) { } private void close() { + closeHandle(); + Runtime.getRuntime().removeShutdownHook(shutdownThread); + } + + private void closeHandle() { if (readOnlyFileHandle != null) { try { readOnlyFileHandle.close(); @@ -114,12 +121,8 @@ private void close() { } private void addShutdownHook() { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - close(); - } - }); + shutdownThread = new Thread(this::closeHandle); + Runtime.getRuntime().addShutdownHook(shutdownThread); } } }