diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
index 9ad07dfafbf60..dc368a2e08214 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
@@ -19,7 +19,9 @@
 package org.apache.hudi.io.storage;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.avro.Schema;
@@ -30,14 +32,17 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.ParquetReaderIterator;
+
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.hadoop.ParquetReader;
 
 public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileReader<R> {
+
   private final Path path;
   private final Configuration conf;
   private final BaseFileUtils parquetUtils;
+  private List<ParquetReaderIterator> readerIterators = new ArrayList<>();
 
   public HoodieParquetReader(Configuration configuration, Path path) {
     this.conf = configuration;
@@ -64,7 +69,9 @@ public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
   public Iterator<R> getRecordIterator(Schema schema) throws IOException {
     AvroReadSupport.setAvroReadSchema(conf, schema);
     ParquetReader<IndexedRecord> reader = AvroParquetReader.<IndexedRecord>builder(path).withConf(conf).build();
-    return new ParquetReaderIterator<>(reader);
+    ParquetReaderIterator parquetReaderIterator = new ParquetReaderIterator<>(reader);
+    readerIterators.add(parquetReaderIterator);
+    return parquetReaderIterator;
   }
 
   @Override
@@ -74,6 +81,7 @@ public Schema getSchema() {
 
   @Override
   public void close() {
+    readerIterators.forEach(entry -> entry.close());
   }
 
   @Override
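The Java side of the change is a track-and-close pattern: `HoodieParquetReader` remembers every `ParquetReaderIterator` it hands out and closes them all in its own `close()`, so callers only need to close the reader. A minimal, self-contained sketch of the same idea (the `ResourceTrackingReader` name and the stub iterator are illustrative, not Hudi code):

```scala
import java.io.Closeable
import scala.collection.mutable.ArrayBuffer

// Sketch of the track-and-close pattern: every iterator handed out is
// remembered, and closing the owner closes them all.
class ResourceTrackingReader extends Closeable {
  private val openIterators = ArrayBuffer.empty[Closeable]

  def newRecordIterator(): Iterator[String] = {
    // Stand-in for the Parquet-backed iterator; a real one wraps a file handle.
    val iter = new Iterator[String] with Closeable {
      private val records = Iterator("r1", "r2")
      override def hasNext: Boolean = records.hasNext
      override def next(): String = records.next()
      override def close(): Unit = println("released underlying file handle")
    }
    openIterators += iter // remember it so close() can reach it
    iter
  }

  override def close(): Unit = openIterators.foreach(_.close())
}
```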
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index ac2937e57ea16..4fc86f729b056 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -35,6 +35,7 @@ import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.io.storage.HoodieHFileReader
 import org.apache.hudi.metadata.HoodieTableMetadata
+import org.apache.spark.TaskContext
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -47,6 +48,7 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext, SparkSession}
 
+import java.io.Closeable
 import scala.collection.JavaConverters._
 import scala.util.Try
@@ -333,7 +335,13 @@
     partitionedFile => {
       val extension = FSUtils.getFileExtension(partitionedFile.filePath)
       if (HoodieFileFormat.PARQUET.getFileExtension.equals(extension)) {
-        parquetReader.apply(partitionedFile)
+        val iter = parquetReader.apply(partitionedFile)
+        if (iter.isInstanceOf[Closeable]) {
+          // Register a callback to close the iterator, to be executed on task completion.
+          // When the task finishes, the callback fires and releases the reader's resources.
+          Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.asInstanceOf[Closeable].close()))
+        }
+        iter
       } else if (HoodieFileFormat.HFILE.getFileExtension.equals(extension)) {
         hfileReader.apply(partitionedFile)
       } else {
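On the Spark side, the returned record iterator may never be fully consumed (for example under a LIMIT), so exhausting it cannot be relied on to release the file handle; instead, when the iterator is `Closeable`, its `close()` is tied to the task lifecycle. A minimal sketch of that pattern, assuming a Spark runtime (the `closeWhenTaskFinishes` helper is illustrative, not part of Hudi):

```scala
import java.io.Closeable
import org.apache.spark.TaskContext

// If the iterator owns resources, release them when the Spark task
// finishes, even if the iterator is abandoned mid-stream.
def closeWhenTaskFinishes[T](iter: Iterator[T]): Iterator[T] = {
  iter match {
    case closeable: Closeable =>
      // TaskContext.get() returns null on the driver, hence the Option guard.
      Option(TaskContext.get())
        .foreach(_.addTaskCompletionListener[Unit](_ => closeable.close()))
    case _ => // plain iterator, nothing to release
  }
  iter
}
```

Task-completion listeners run whether the task succeeds or fails, which makes them a safer release point than waiting for `hasNext` to return false.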