diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 5c53aa8494c7a..523bd50b564bb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -55,6 +55,7 @@ import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.CloseableIteratorListener; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieException; @@ -335,7 +336,7 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext }; suppliers.add(iteratorSupplier); }); - return new LazyConcatenatingIterator<>(suppliers); + return CloseableIteratorListener.addListener(new LazyConcatenatingIterator<>(suppliers)); })); } @@ -357,7 +358,7 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContex iteratorGettersForPartition.add(recordIteratorGetter); }); - return new LazyConcatenatingIterator<>(iteratorGettersForPartition); + return CloseableIteratorListener.addListener(new LazyConcatenatingIterator<>(iteratorGettersForPartition)); })); } @@ -477,7 +478,7 @@ public Iterator call(ClusteringOperation clusteringOperation) throw 0, Long.MAX_VALUE, usePosition, false); fileGroupReader.initRecordIterators(); // read records from the FG reader - return fileGroupReader.getClosableIterator(); + return CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator()); } }).rdd(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/CloseableIteratorListener.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/CloseableIteratorListener.java new file mode 100644 index 0000000000000..c018aeccfa315 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/CloseableIteratorListener.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.data; + +import org.apache.spark.TaskContext; +import org.apache.spark.util.TaskCompletionListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; + +/** + * Helper class for adding a Spark task completion listener that will ensure the iterator is closed if it is an instance of {@link AutoCloseable}.
+ * This is commonly used with {@link org.apache.hudi.common.util.collection.ClosableIterator} to ensure the resources are closed after the task completes. + */ +public class CloseableIteratorListener implements TaskCompletionListener { + private static final Logger LOG = LoggerFactory.getLogger(CloseableIteratorListener.class); + private final Object iterator; + + private CloseableIteratorListener(Object iterator) { + this.iterator = iterator; + } + + public static <T> Iterator<T> addListener(Iterator<T> iterator) { + TaskContext.get().addTaskCompletionListener(new CloseableIteratorListener(iterator)); + return iterator; + } + + public static <T> scala.collection.Iterator<T> addListener(scala.collection.Iterator<T> iterator) { + TaskContext.get().addTaskCompletionListener(new CloseableIteratorListener(iterator)); + return iterator; + } + + /** + * Closes the iterator if it also implements {@link AutoCloseable}, otherwise it is a no-op. + * + * @param context the Spark task context + */ + @Override + public void onTaskCompletion(TaskContext context) { + if (iterator instanceof AutoCloseable) { + try { + ((AutoCloseable) iterator).close(); + } catch (Exception ex) { + LOG.warn("Failed to properly close iterator", ex); + } + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java index fbcab6b575e42..2dbbb1880bfe0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java @@ -131,7 +131,7 @@ public <W> HoodiePairData<K, W> mapValues(SerializableFunction<V, W> func) { } public <W> HoodiePairData<K, W> flatMapValues(SerializableFunction<V, Iterator<W>> func) { - return HoodieJavaPairRDD.of(pairRDDData.flatMapValues(func::apply)); + return HoodieJavaPairRDD.of(pairRDDData.flatMapValues(iter -> CloseableIteratorListener.addListener(func.apply(iter)))); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java index faec42368ca87..0eca77693db8e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java @@ -150,21 +150,21 @@ public <O> HoodieData<O> map(SerializableFunction<T, O> func) { @Override public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>, Iterator<O>> func, boolean preservesPartitioning) { - return HoodieJavaRDD.of(rddData.mapPartitions(func::apply, preservesPartitioning)); + return HoodieJavaRDD.of(rddData.mapPartitions(iter -> CloseableIteratorListener.addListener(func.apply(iter)), preservesPartitioning)); } @Override public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) { // NOTE: Unrolling this lambda into a method reference results in [[ClassCastException]] // due to weird interop b/w Scala and Java - return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e))); + return HoodieJavaRDD.of(rddData.flatMap(e -> CloseableIteratorListener.addListener(func.apply(e)))); } @Override public <K, V> HoodiePairData<K, V> flatMapToPair(SerializableFunction<T, Iterator<? extends Pair<K, V>>> func) { return HoodieJavaPairRDD.of( rddData.flatMapToPair(e -> - new MappingIterator<>(func.apply(e), p -> new Tuple2<>(p.getKey(), p.getValue())))); + new MappingIterator<>(CloseableIteratorListener.addListener(func.apply(e)), p -> new Tuple2<>(p.getKey(), p.getValue())))); } @Override
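Note (illustration, not part of the patch): the wrappers above register the listener at the moment the user-supplied function hands back its iterator. Registering a task-completion listener, rather than closing when the iterator is exhausted, matters because Spark can abandon an iterator early (for example under take()/limit), so hasNext() may never return false. Below is a minimal sketch of the same pattern against the public Spark Java API, assuming a local-mode context; ResourceIterator is a hypothetical stand-in for an iterator that owns a resource.

import org.apache.hudi.data.CloseableIteratorListener;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

public class CloseOnTaskEndDemo {

  // Hypothetical stand-in for an iterator backed by a resource (file handle, reader, ...).
  public static class ResourceIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> inner;
    public ResourceIterator(Iterator<T> inner) { this.inner = inner; }
    @Override public boolean hasNext() { return inner.hasNext(); }
    @Override public T next() { return inner.next(); }
    @Override public void close() { System.out.println("closed on task completion"); }
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[1]").setAppName("close-on-task-end");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      // take(1) stops consuming before the iterators are exhausted; the
      // registered task-completion listener still closes them at task end.
      System.out.println(jsc.parallelize(Arrays.asList("p1", "p2"), 1)
          .flatMap(v -> CloseableIteratorListener.addListener(
              new ResourceIterator<>(Collections.singletonList(v).iterator())))
          .take(1));
    }
  }
}

TaskContext.get() is non-null only inside a running task, which is why the production wrappers call addListener from within the RDD functions rather than on the driver.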
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDD.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDD.java index 75bc888a71d10..3c11b1279e400 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDD.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDD.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.Collections; import java.util.List; import scala.Tuple2; @@ -107,4 +108,15 @@ public void testLeftOuterJoinOperation() { assertEquals(Option.of("value1"), item.getRight().getRight()); }); } + + @Test + void testFlatMapValuesWithCloseable() { + String partition1 = "partition1"; + String partition2 = "partition2"; + HoodiePairData<Integer, String> input = HoodieJavaPairRDD.of(jsc.parallelizePairs(Arrays.asList(Tuple2.apply(1, partition1), Tuple2.apply(2, partition2)), 2)); + input.flatMapValues(partition -> new TrackingCloseableIterator<>(partition, Collections.singletonList(1).iterator())) + .collectAsList(); + assertTrue(TrackingCloseableIterator.isClosed(partition1)); + assertTrue(TrackingCloseableIterator.isClosed(partition2)); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java index a2617b592d6e3..cbaf7fa604d0e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java @@ -27,10 +27,13 @@ import org.apache.spark.sql.internal.SQLConf; import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.Collections; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieJavaRDD extends HoodieClientTestBase { @Test @@ -65,4 +68,37 @@ public void testDeduceNumPartitions() { .reduceByKey((p1, p2) -> p1, 11); assertEquals(11, shuffleRDD.deduceNumPartitions()); } + + @Test + void testMapPartitionsWithCloseable() { + String partition1 = "partition1"; + String partition2 = "partition2"; + HoodieData<String> input = HoodieJavaRDD.of(Arrays.asList(partition1, partition2), context, 2); + input.mapPartitions(partition -> new TrackingCloseableIterator<>(partition.next(), Collections.singletonList("a").iterator()), true) + .collectAsList(); + assertTrue(TrackingCloseableIterator.isClosed(partition1)); + assertTrue(TrackingCloseableIterator.isClosed(partition2)); + } + + @Test + void testFlatMapWithCloseable() { + String partition1 = "partition1"; + String partition2 = "partition2"; + HoodieData<String> input = HoodieJavaRDD.of(Arrays.asList(partition1, partition2), context, 2); + input.flatMap(partition -> new TrackingCloseableIterator<>(partition, Collections.singletonList("a").iterator())) + .collectAsList(); + assertTrue(TrackingCloseableIterator.isClosed(partition1)); + assertTrue(TrackingCloseableIterator.isClosed(partition2)); + } + + @Test + void testFlatMapToPairWithCloseable() { + String partition1 = "partition1"; + String partition2 = "partition2"; + HoodieData<String> input = HoodieJavaRDD.of(Arrays.asList(partition1, partition2), context, 2); + input.flatMapToPair(partition -> new TrackingCloseableIterator<>(partition, Collections.singletonList(Pair.of(1, "1")).iterator())) + .collectAsList(); + assertTrue(TrackingCloseableIterator.isClosed(partition1)); + assertTrue(TrackingCloseableIterator.isClosed(partition2)); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TrackingCloseableIterator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TrackingCloseableIterator.java new file mode 100644 index 0000000000000..d06ec7766f370 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TrackingCloseableIterator.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.data; + +import org.apache.hudi.common.util.collection.ClosableIterator; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Closeable iterator to use in Spark related testing to ensure that the close method is properly called after transformations. + * @param <T> type of record within the iterator + */ +class TrackingCloseableIterator<T> implements ClosableIterator<T>, Serializable { + private static final Map<String, Boolean> IS_CLOSED_BY_ID = new HashMap<>(); + private final String id; + private final Iterator<T> inner; + + public TrackingCloseableIterator(String id, Iterator<T> inner) { + this.id = id; + this.inner = inner; + IS_CLOSED_BY_ID.put(id, false); + } + + public static boolean isClosed(String id) { + return IS_CLOSED_BY_ID.get(id); + } + + @Override + public void close() { + IS_CLOSED_BY_ID.put(id, true); + } + + @Override + public boolean hasNext() { + return inner.hasNext(); + } + + @Override + public T next() { + return inner.next(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java index 6f3dbfcef9939..b603b99d93022 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java @@ -21,6 +21,10 @@ import org.apache.hudi.common.util.Either; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -43,7 +47,12 @@ protected HoodieBaseListData(List<T> data, boolean lazy) { protected HoodieBaseListData(Stream<T> dataStream, boolean lazy) { // NOTE: In case this container is being instantiated by an eager parent, we have to // pre-materialize the stream - this.data = lazy ?
Either.left(dataStream) : Either.right(dataStream.collect(Collectors.toList())); + if (lazy) { + this.data = Either.left(dataStream); + } else { + this.data = Either.right(dataStream.collect(Collectors.toList())); + dataStream.close(); + } this.lazy = lazy; } @@ -69,9 +78,31 @@ protected long count() { protected List collectAsList() { if (lazy) { - return data.asLeft().collect(Collectors.toList()); + try (Stream stream = data.asLeft()) { + return stream.collect(Collectors.toList()); + } } else { return data.asRight(); } } + + static class IteratorCloser implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(IteratorCloser.class); + private final Iterator iterator; + + IteratorCloser(Iterator iterator) { + this.iterator = iterator; + } + + @Override + public void run() { + if (iterator instanceof AutoCloseable) { + try { + ((AutoCloseable) iterator).close(); + } catch (Exception ex) { + LOG.warn("Failed to properly close iterator", ex); + } + } + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java index 690ab71c090e7..5eebf2a240141 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java @@ -119,10 +119,12 @@ public HoodieData map(SerializableFunction func) { @Override public HoodieData mapPartitions(SerializableFunction, Iterator> func, boolean preservesPartitioning) { Function, Iterator> mapper = throwingMapWrapper(func); + Iterator iterator = asStream().iterator(); + Iterator newIterator = mapper.apply(iterator); return new HoodieListData<>( StreamSupport.stream( Spliterators.spliteratorUnknownSize( - mapper.apply(asStream().iterator()), Spliterator.ORDERED), true), + newIterator, Spliterator.ORDERED), true).onClose(new IteratorCloser(newIterator)), lazy ); } @@ -130,18 +132,22 @@ public HoodieData mapPartitions(SerializableFunction, Iterato @Override public HoodieData flatMap(SerializableFunction> func) { Function> mapper = throwingMapWrapper(func); - Stream mappedStream = asStream().flatMap(e -> - StreamSupport.stream( - Spliterators.spliteratorUnknownSize(mapper.apply(e), Spliterator.ORDERED), true)); + Stream mappedStream = asStream().flatMap(e -> { + Iterator iterator = mapper.apply(e); + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), true).onClose(new IteratorCloser(iterator)); + }); return new HoodieListData<>(mappedStream, lazy); } @Override public HoodiePairData flatMapToPair(SerializableFunction>> func) { Function>> mapper = throwingMapWrapper(func); - Stream> mappedStream = asStream().flatMap(e -> - StreamSupport.stream( - Spliterators.spliteratorUnknownSize(mapper.apply(e), Spliterator.ORDERED), true)); + Stream> mappedStream = asStream().flatMap(e -> { + Iterator> iterator = mapper.apply(e); + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), true).onClose(new IteratorCloser(iterator)); + }); return new HoodieListPairData<>(mappedStream, lazy); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java index ebf7207c84e36..dd5bb2249622b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java @@ 
-105,7 +105,9 @@ public HoodieData values() { @Override public Map countByKey() { - return asStream().collect(Collectors.groupingBy(Pair::getKey, Collectors.counting())); + try (Stream> stream = asStream()) { + return stream.collect(Collectors.groupingBy(Pair::getKey, Collectors.counting())); + } } @Override @@ -114,27 +116,31 @@ public HoodiePairData> groupByKey() { Collector, ?, Map>> groupingCollector = Collectors.groupingBy(Pair::getKey, mappingCollector); - Map> groupedByKey = asStream().collect(groupingCollector); - return new HoodieListPairData<>( - groupedByKey.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())), - lazy - ); + try (Stream> s = asStream()) { + Map> groupedByKey = s.collect(groupingCollector); + return new HoodieListPairData<>( + groupedByKey.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())), + lazy + ); + } } @Override public HoodiePairData reduceByKey(SerializableBiFunction combiner, int parallelism) { - Map> reducedMap = asStream().collect( - Collectors.groupingBy( - Pair::getKey, - HashMap::new, - Collectors.mapping(Pair::getValue, Collectors.reducing(combiner::apply)))); - - return new HoodieListPairData<>( - reducedMap.entrySet() - .stream() - .map(e -> Pair.of(e.getKey(), e.getValue().orElse(null))), - lazy - ); + try (Stream> stream = asStream()) { + Map> reducedMap = stream.collect( + Collectors.groupingBy( + Pair::getKey, + HashMap::new, + Collectors.mapping(Pair::getValue, Collectors.reducing(combiner::apply)))); + + return new HoodieListPairData<>( + reducedMap.entrySet() + .stream() + .map(e -> Pair.of(e.getKey(), e.getValue().orElse(null))), + lazy + ); + } } @Override @@ -158,7 +164,7 @@ public HoodiePairData flatMapValues(SerializableFunction(mappedValuesIterator, w -> Pair.of(p.getKey(), w)); return StreamSupport.stream( - Spliterators.spliteratorUnknownSize(mappedPairsIterator, Spliterator.ORDERED), true); + Spliterators.spliteratorUnknownSize(mappedPairsIterator, Spliterator.ORDERED), true).onClose(new IteratorCloser(mappedValuesIterator)); }), lazy); } @@ -172,26 +178,28 @@ public HoodiePairData>> leftOuterJoin(HoodiePairData]] values - HashMap> rightStreamMap = ((HoodieListPairData) other).asStream().collect( - Collectors.groupingBy( - Pair::getKey, - HashMap::new, - Collectors.mapping(Pair::getValue, Collectors.toList()))); - - Stream>>> leftOuterJoined = asStream().flatMap(pair -> { - K key = pair.getKey(); - V leftValue = pair.getValue(); - List rightValues = rightStreamMap.get(key); - - if (rightValues == null) { - return Stream.of(Pair.of(key, Pair.of(leftValue, Option.empty()))); - } else { - return rightValues.stream().map(rightValue -> - Pair.of(key, Pair.of(leftValue, Option.of(rightValue)))); - } - }); - - return new HoodieListPairData<>(leftOuterJoined, lazy); + try (Stream> stream = ((HoodieListPairData) other).asStream()) { + HashMap> rightStreamMap = stream.collect( + Collectors.groupingBy( + Pair::getKey, + HashMap::new, + Collectors.mapping(Pair::getValue, Collectors.toList()))); + + Stream>>> leftOuterJoined = asStream().flatMap(pair -> { + K key = pair.getKey(); + V leftValue = pair.getValue(); + List rightValues = rightStreamMap.get(key); + + if (rightValues == null) { + return Stream.of(Pair.of(key, Pair.of(leftValue, Option.empty()))); + } else { + return rightValues.stream().map(rightValue -> + Pair.of(key, Pair.of(leftValue, Option.of(rightValue)))); + } + }); + + return new HoodieListPairData<>(leftOuterJoined, lazy); + } } @Override @@ -206,24 +214,26 @@ public HoodiePairData> join(HoodiePairData 
other) { ValidationUtils.checkArgument(other instanceof HoodieListPairData); // Transform right-side container to a multi-map of [[K]] to [[List]] values - HashMap> rightStreamMap = ((HoodieListPairData) other).asStream().collect( - Collectors.groupingBy( - Pair::getKey, - HashMap::new, - Collectors.mapping(Pair::getValue, Collectors.toList()))); - - List>> joinResult = new ArrayList<>(); - asStream().forEach(pair -> { - K key = pair.getKey(); - V leftValue = pair.getValue(); - List rightValues = rightStreamMap.getOrDefault(key, Collections.emptyList()); - - for (W rightValue : rightValues) { - joinResult.add(Pair.of(key, Pair.of(leftValue, rightValue))); - } - }); - - return new HoodieListPairData<>(joinResult, lazy); + try (Stream> stream = ((HoodieListPairData) other).asStream()) { + HashMap> rightStreamMap = stream.collect( + Collectors.groupingBy( + Pair::getKey, + HashMap::new, + Collectors.mapping(Pair::getValue, Collectors.toList()))); + + List>> joinResult = new ArrayList<>(); + asStream().forEach(pair -> { + K key = pair.getKey(); + V leftValue = pair.getValue(); + List rightValues = rightStreamMap.getOrDefault(key, Collections.emptyList()); + + for (W rightValue : rightValues) { + joinResult.add(Pair.of(key, Pair.of(leftValue, rightValue))); + } + }); + + return new HoodieListPairData<>(joinResult, lazy); + } } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java index a57fb777fbdc6..570d9580ca0df 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java @@ -348,12 +348,14 @@ public T next() { @Override public void close() { - try { - reader.close(); - } catch (IOException e) { - throw new HoodieIOException("Failed to close the reader", e); - } finally { - this.reader = null; + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + throw new HoodieIOException("Failed to close the reader", e); + } finally { + this.reader = null; + } } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java index 143d3ab01681c..4dba4d840f07c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java @@ -54,8 +54,9 @@ public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReader public static List readAllRecords(HoodieAvroFileReader reader) throws IOException { Schema schema = reader.getSchema(); - return toStream(reader.getIndexedRecordIterator(schema)) - .collect(Collectors.toList()); + try (ClosableIterator indexedRecordIterator = reader.getIndexedRecordIterator(schema)) { + return toStream(indexedRecordIterator).collect(Collectors.toList()); + } } /** @@ -77,8 +78,9 @@ public static List readRecords(HoodieAvroHFileReaderImplBase read List keys, Schema schema) throws IOException { Collections.sort(keys); - return toStream(reader.getIndexedRecordsByKeysIterator(keys, schema)) - .collect(Collectors.toList()); + try (ClosableIterator indexedRecordsByKeysIterator = reader.getIndexedRecordsByKeysIterator(keys, schema)) { + return toStream(indexedRecordsByKeysIterator).collect(Collectors.toList()); + } } public 
abstract ClosableIterator<IndexedRecord> getIndexedRecordsByKeysIterator(List<String> keys, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/data/CloseValidationIterator.java b/hudi-common/src/test/java/org/apache/hudi/common/data/CloseValidationIterator.java new file mode 100644 index 0000000000000..b61bdd6536f1d --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/data/CloseValidationIterator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.data; + +import org.apache.hudi.common.util.collection.ClosableIterator; + +import java.util.Iterator; + +/** + * Implementation of a {@link ClosableIterator} to help validate that the close method is properly called. + * @param <T> type of record within the iterator + */ +class CloseValidationIterator<T> implements ClosableIterator<T> { + private final Iterator<T> inner; + private boolean isClosed = false; + + public CloseValidationIterator(Iterator<T> inner) { + this.inner = inner; + } + + public boolean isClosed() { + return isClosed; + } + + @Override + public void close() { + isClosed = true; + } + + @Override + public boolean hasNext() { + return inner.hasNext(); + } + + @Override + public T next() { + return inner.next(); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java index 795318f5e01be..24bd9a909493f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java @@ -25,6 +25,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.util.Arrays; import java.util.Collections; @@ -94,4 +95,41 @@ public void testIsEmpty() { emptyListData = HoodieListData.lazy(Collections.emptyList()); assertTrue(emptyListData.isEmpty()); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testMapPartitionsWithCloseable(boolean isLazy) { + String partition1 = "partition1"; + String partition2 = "partition2"; + HoodieData<String> input = new HoodieListData<>(Stream.of(partition1, partition2), isLazy); + CloseValidationIterator<String> iterator = new CloseValidationIterator<>(Collections.singletonList("value").iterator()); + assertEquals(1, input.mapPartitions(partition -> iterator, true).collectAsList().size()); + assertTrue(iterator.isClosed()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testFlatMapWithCloseable(boolean isLazy) { + String partition1 = "partition1"; + String partition2 = "partition2"; +
CloseValidationIterator iterator1 = new CloseValidationIterator<>(Collections.singletonList("value").iterator()); + CloseValidationIterator iterator2 = new CloseValidationIterator<>(Collections.singletonList("value").iterator()); + HoodieData input = new HoodieListData<>(Stream.of(partition1, partition2), isLazy); + assertEquals(2, input.flatMap(partition -> partition.equals(partition1) ? iterator1 : iterator2).collectAsList().size()); + assertTrue(iterator1.isClosed()); + assertTrue(iterator2.isClosed()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testFlatMapToPairWithCloseable(boolean isLazy) { + String partition1 = "partition1"; + String partition2 = "partition2"; + HoodieData input = new HoodieListData<>(Stream.of(partition1, partition2), isLazy); + CloseValidationIterator> iterator1 = new CloseValidationIterator<>(Collections.singletonList(Pair.of("1", "value")).iterator()); + CloseValidationIterator> iterator2 = new CloseValidationIterator<>(Collections.singletonList(Pair.of("2", "value")).iterator()); + assertEquals(2, input.flatMapToPair(partition -> partition.equals(partition1) ? iterator1 : iterator2).collectAsList().size()); + assertTrue(iterator1.isClosed()); + assertTrue(iterator2.isClosed()); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java index 8355a5f30edd9..d0bda7715a6a2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -42,6 +43,7 @@ import static org.apache.hudi.common.util.CollectionUtils.createImmutableList; import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Tests {@link HoodieListPairData}. 
@@ -149,6 +151,50 @@ public void testReduceByKey(Map> expected, Map>> createdIterators = new ArrayList<>(); + HoodiePairData data = HoodieListData.lazy(Arrays.asList(1, 1, 1)) + .flatMapToPair(key -> { + CloseValidationIterator> iter = new CloseValidationIterator<>(Collections.singletonList(Pair.of(key, 1)).iterator()); + createdIterators.add(iter); + return iter; + }); + List> result = data.reduceByKey(Integer::sum, 1).collectAsList(); + assertEquals(Collections.singletonList(Pair.of(1, 3)), result); + createdIterators.forEach(iter -> assertTrue(iter.isClosed())); + } + + @Test + void testLeftOuterJoinWithCloseableInput() { + List>> createdIterators = new ArrayList<>(); + HoodiePairData dataToJoin = HoodieListData.lazy(Arrays.asList(1, 2, 3)) + .flatMapToPair(key -> { + CloseValidationIterator> iter = new CloseValidationIterator<>(Collections.singletonList(Pair.of(key, 1)).iterator()); + createdIterators.add(iter); + return iter; + }); + HoodiePairData data = HoodieListPairData.lazy(Arrays.asList(Pair.of(1, 1), Pair.of(4, 2))); + List>>> result = data.leftOuterJoin(dataToJoin).collectAsList(); + assertEquals(2, result.size()); + createdIterators.forEach(iter -> assertTrue(iter.isClosed())); + } + + @Test + void testJoinWithCloseableInput() { + List>> createdIterators = new ArrayList<>(); + HoodiePairData dataToJoin = HoodieListData.lazy(Arrays.asList(1, 2, 3)) + .flatMapToPair(key -> { + CloseValidationIterator> iter = new CloseValidationIterator<>(Collections.singletonList(Pair.of(key, 1)).iterator()); + createdIterators.add(iter); + return iter; + }); + HoodiePairData data = HoodieListPairData.lazy(Arrays.asList(Pair.of(1, 1), Pair.of(4, 2))); + List>> result = data.join(dataToJoin).collectAsList(); + assertEquals(1, result.size()); + createdIterators.forEach(iter -> assertTrue(iter.isClosed())); + } + @Test public void testLeftOuterJoinSingleValuePerKey() { HoodiePairData pairData1 = HoodieListPairData.lazy(Arrays.asList( diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java index f34034d0a3570..e0d307e2f0344 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase; @@ -42,7 +43,6 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.Spliterator; import java.util.Spliterators; @@ -107,15 +107,16 @@ public void testReaderGetRecordIteratorByKeysWithBackwardSeek() throws Exception (entry.get("_row_key").toString()).contains("key05") || (entry.get("_row_key").toString()).contains("key24") || (entry.get("_row_key").toString()).contains("key31"))).collect(Collectors.toList()); - Iterator iterator = + try (ClosableIterator iterator = hfileReader.getIndexedRecordsByKeysIterator( Arrays.asList("key00001", "key05", "key24", "key16", "key31", "key61"), - avroSchema); - List recordsByKeys = - StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), 
false) - .map(r -> (GenericRecord) r) - .collect(Collectors.toList()); - assertEquals(expectedKey1s, recordsByKeys); + avroSchema)) { + List recordsByKeys = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .map(r -> (GenericRecord) r) + .collect(Collectors.toList()); + assertEquals(expectedKey1s, recordsByKeys); + } } } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java index 362f58a00cf34..1044604419ace 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase; import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader; @@ -32,7 +33,6 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Iterator; import java.util.Spliterator; import java.util.Spliterators; import java.util.stream.Collectors; @@ -78,15 +78,16 @@ public void testReaderGetRecordIteratorByKeysWithBackwardSeek() throws Exception // Even though key16 exists, it's a backward seek not in order. // Our native HFile reader does not allow backward seek, and throws an exception // Note that backward seek is not expected to happen in production code - Iterator iterator = + try (ClosableIterator iterator = hfileReader.getIndexedRecordsByKeysIterator( Arrays.asList("key00001", "key05", "key24", "key16", "key31", "key61"), - avroSchema); - assertThrows( - IllegalStateException.class, - () -> StreamSupport.stream( - Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) - .collect(Collectors.toList())); + avroSchema)) { + assertThrows( + IllegalStateException.class, + () -> StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .collect(Collectors.toList())); + } } } } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestOrcReaderIterator.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestOrcReaderIterator.java index 4cf6f7c27c743..698f44597ce75 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestOrcReaderIterator.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestOrcReaderIterator.java @@ -20,6 +20,7 @@ package org.apache.hudi.io.hadoop; import org.apache.hudi.common.util.AvroOrcUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -39,7 +40,6 @@ import org.junit.jupiter.api.Test; import java.io.File; -import java.util.Iterator; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; @@ -66,33 +66,34 @@ public void testOrcIteratorReadData() throws Exception { Schema avroSchema = getSchemaFromResource(TestOrcReaderIterator.class, "/simple-test.avsc"); TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema); OrcFile.WriterOptions options = OrcFile.writerOptions(conf).setSchema(orcSchema).compress(CompressionKind.ZLIB); - 
Writer writer = OrcFile.createWriter(filePath, options); - VectorizedRowBatch batch = orcSchema.createRowBatch(); - BytesColumnVector nameColumns = (BytesColumnVector) batch.cols[0]; - LongColumnVector numberColumns = (LongColumnVector) batch.cols[1]; - BytesColumnVector colorColumns = (BytesColumnVector) batch.cols[2]; - for (int r = 0; r < 5; ++r) { - int row = batch.size++; - byte[] name = getUTF8Bytes("name" + r); - nameColumns.setVal(row, name); - byte[] color = getUTF8Bytes("color" + r); - colorColumns.setVal(row, color); - numberColumns.vector[row] = r; + try (Writer writer = OrcFile.createWriter(filePath, options)) { + VectorizedRowBatch batch = orcSchema.createRowBatch(); + BytesColumnVector nameColumns = (BytesColumnVector) batch.cols[0]; + LongColumnVector numberColumns = (LongColumnVector) batch.cols[1]; + BytesColumnVector colorColumns = (BytesColumnVector) batch.cols[2]; + for (int r = 0; r < 5; ++r) { + int row = batch.size++; + byte[] name = getUTF8Bytes("name" + r); + nameColumns.setVal(row, name); + byte[] color = getUTF8Bytes("color" + r); + colorColumns.setVal(row, color); + numberColumns.vector[row] = r; + } + writer.addRowBatch(batch); } - writer.addRowBatch(batch); - writer.close(); Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf)); RecordReader recordReader = reader.rows(new Reader.Options(conf).schema(orcSchema)); - Iterator iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema); - int recordCount = 0; - while (iterator.hasNext()) { - GenericRecord record = iterator.next(); - assertEquals("name" + recordCount, record.get("name").toString()); - assertEquals("color" + recordCount, record.get("favorite_color").toString()); - assertEquals(recordCount, record.get("favorite_number")); - recordCount++; + try (ClosableIterator iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema)) { + int recordCount = 0; + while (iterator.hasNext()) { + GenericRecord record = iterator.next(); + assertEquals("name" + recordCount, record.get("name").toString()); + assertEquals("color" + recordCount, record.get("favorite_color").toString()); + assertEquals(recordCount, record.get("favorite_number")); + recordCount++; + } + assertEquals(5, recordCount); } - assertEquals(5, recordCount); } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala index c1b4106199210..e54faaac1da82 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala @@ -24,6 +24,7 @@ import org.apache.hudi.DataSourceReadOptions.{REALTIME_PAYLOAD_COMBINE_OPT_VAL, import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.{FileSlice, HoodieLogFile} +import org.apache.hudi.data.CloseableIteratorListener import org.apache.hudi.storage.StoragePath import org.apache.hadoop.conf.Configuration @@ -134,7 +135,7 @@ class HoodieMultipleBaseFileFormat(tableState: Broadcast[HoodieTableState], (file: PartitionedFile) => { val filePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file) val fileFormat = 
detectFileFormat(filePath.toString) - file.partitionValues match { + val iter = file.partitionValues match { case fileSliceMapping: HoodiePartitionFileSliceMapping => if (FSUtils.isLogFile(filePath)) { // no base file @@ -192,6 +193,7 @@ class HoodieMultipleBaseFileFormat(tableState: Broadcast[HoodieTableState], case _ => throw new UnsupportedOperationException(s"Base file format $fileFormat is not supported.") } } + CloseableIteratorListener.addListener(iter) } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala index 3bceb6eb4b84b..8f83f6c830fe8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala @@ -26,6 +26,7 @@ import org.apache.hudi.common.config.{HoodieMemoryConfig, TypedProperties} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.table.read.HoodieFileGroupReader +import org.apache.hudi.data.CloseableIteratorListener import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.io.IOUtils import org.apache.hudi.storage.StorageConfiguration @@ -163,7 +164,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String, (file: PartitionedFile) => { val storageConf = new HadoopStorageConfiguration(broadcastedStorageConf.value.value) - file.partitionValues match { + val iter = file.partitionValues match { // Snapshot or incremental queries. 
case fileSliceMapping: HoodiePartitionFileSliceMapping => val filegroupName = FSUtils.getFileIdFromFilePath(sparkAdapter @@ -200,6 +201,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String, readBaseFile(file, parquetFileReader.value, requestedSchema, remainingPartitionSchema, fixedPartitionIndexes, requiredSchema, partitionSchema, outputSchema, filters, storageConf) } + CloseableIteratorListener.addListener(iter) } } @@ -265,6 +267,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String, private def makeCloseableFileGroupMappingRecordIterator(closeableFileGroupRecordIterator: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow], mappingFunction: Function[InternalRow, InternalRow]): Iterator[InternalRow] = { + CloseableIteratorListener.addListener(closeableFileGroupRecordIterator) new Iterator[InternalRow] with Closeable { override def hasNext: Boolean = closeableFileGroupRecordIterator.hasNext diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala index 24c4663583ea0..a1b39d035842e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala @@ -31,6 +31,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.util.{Option, ValidationUtils} import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig} import org.apache.hudi.config.HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE +import org.apache.hudi.data.CloseableIteratorListener import org.apache.hudi.exception.HoodieException import org.apache.hudi.index.bucket.partition.{PartitionBucketIndexCalculator, PartitionBucketIndexUtils} import org.apache.hudi.internal.schema.InternalSchema @@ -242,6 +243,7 @@ class PartitionBucketIndexManager extends BaseProcedure false) fileGroupReader.initRecordIterators() val iterator = fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]] + CloseableIteratorListener.addListener(iterator) iterator.asScala }) }
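Note (illustration, not part of the patch): the HoodieListData/HoodieBaseListData changes lean on a java.util.stream subtlety — terminal operations such as collect() do not run onClose handlers; only Stream.close() does. That is why each mapped iterator is attached with onClose(new IteratorCloser(iterator)) and why collectAsList() now drains the lazy stream inside try-with-resources. Below is a self-contained, JDK-only sketch of that mechanism; ResourceIterator is again a hypothetical stand-in for Hudi's ClosableIterator.

import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class OnCloseDemo {

  // Hypothetical iterator that owns a resource and tracks whether it was closed.
  static class ResourceIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> inner;
    boolean closed = false;
    ResourceIterator(Iterator<T> inner) { this.inner = inner; }
    @Override public boolean hasNext() { return inner.hasNext(); }
    @Override public T next() { return inner.next(); }
    @Override public void close() { closed = true; }
  }

  public static void main(String[] args) {
    ResourceIterator<String> it = new ResourceIterator<>(Arrays.asList("a", "b").iterator());

    // Attach the iterator's cleanup as an onClose handler, as HoodieListData
    // does with IteratorCloser.
    Stream<String> stream = StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false)
        .onClose(() -> {
          try {
            it.close();
          } catch (Exception e) {
            // best effort, mirroring the LOG.warn in IteratorCloser
          }
        });

    // collect() alone leaves `closed` false: terminal operations do not run
    // onClose handlers. Explicitly closing the stream is what triggers them,
    // hence the try-with-resources now used in collectAsList().
    try (Stream<String> s = stream) {
      System.out.println(s.collect(Collectors.toList())); // [a, b]
    }
    System.out.println("closed=" + it.closed);            // closed=true
  }
}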