diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index bad5e1982e0fe..58898e72206ed 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -189,6 +190,9 @@ public long getPos() throws IOException {
   @Override
   public void close() throws IOException {
     parquetReader.close();
+    // Clean up the tmp files created by the logScanner.
+    // Otherwise, for a long-running process such as Presto, the /tmp directory will fill up.
+    ((ExternalSpillableMap) deltaRecordMap).close();
   }
 
   @Override
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 9f6e77bd1f4d0..ede76dc3490fa 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -82,6 +82,7 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -235,6 +236,7 @@ private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
     jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), isCompressionEnabled);
 
     // validate record reader compaction
+    long logTmpFileStartTime = System.currentTimeMillis();
     HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
 
     // use reader to read base Parquet File and log file, merge in flight and return latest commit
@@ -255,6 +257,8 @@ private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
       assertEquals(1.0, recordReader.getProgress(), 0.05);
       assertEquals(120, recordCnt);
       recordReader.close();
+      // The temp file produced by the logScanner should have been deleted by close().
+      assertTrue(!getLogTempFile(logTmpFileStartTime, System.currentTimeMillis(), diskMapType.toString()).exists());
     } catch (Exception ioe) {
       throw new HoodieException(ioe.getMessage(), ioe);
     }
@@ -264,6 +268,13 @@ private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
 
   }
 
+  private File getLogTempFile(long startTime, long endTime, String diskType) {
+    return Arrays.stream(new File("/tmp").listFiles())
+        .filter(f -> f.isDirectory() && f.getName().startsWith("hudi-" + diskType) && f.lastModified() > startTime && f.lastModified() < endTime)
+        .findFirst()
+        .orElse(new File(""));
+  }
+
   @Test
   public void testUnMergedReader() throws Exception {
     // initial commit
@@ -473,6 +484,7 @@ public void testReaderWithNestedAndComplexSchema(ExternalSpillableMap.DiskMapTyp
       assertEquals("stringArray" + i + recordCommitTimeSuffix, arrayValues[i].toString(),
           "test value for field: stringArray");
     }
+    reader.close();
   }
 }
@@ -552,6 +564,7 @@ public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMa
     while (recordReader.next(key, value)) {
       // keep reading
     }
+    reader.close();
   }
 
   private static Stream<Arguments> testArguments() {