Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.CloseableIteratorListener;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieException;
Expand Down Expand Up @@ -335,7 +336,7 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext
};
suppliers.add(iteratorSupplier);
});
return new LazyConcatenatingIterator<>(suppliers);
return CloseableIteratorListener.addListener(new LazyConcatenatingIterator<>(suppliers));
}));
}

Expand All @@ -357,7 +358,7 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContex
iteratorGettersForPartition.add(recordIteratorGetter);
});

return new LazyConcatenatingIterator<>(iteratorGettersForPartition);
return CloseableIteratorListener.addListener(new LazyConcatenatingIterator<>(iteratorGettersForPartition));
}));
}

Expand Down Expand Up @@ -477,7 +478,7 @@ public Iterator<InternalRow> call(ClusteringOperation clusteringOperation) throw
0, Long.MAX_VALUE, usePosition, false);
fileGroupReader.initRecordIterators();
// read records from the FG reader
return fileGroupReader.getClosableIterator();
return CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator());
}
}).rdd();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.data;

import org.apache.spark.TaskContext;
import org.apache.spark.util.TaskCompletionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;

/**
 * Spark task completion listener that closes an iterator once the task finishes, provided the iterator
 * implements {@link AutoCloseable}.
 * This is commonly used with {@link org.apache.hudi.common.util.collection.ClosableIterator} to ensure the resources are closed after the task completes.
 *
 * <p>Instances are only created through the static {@code addListener} helpers.
 */
public class CloseableIteratorListener implements TaskCompletionListener {
  private static final Logger LOG = LoggerFactory.getLogger(CloseableIteratorListener.class);
  private final Object iterator;

  private CloseableIteratorListener(Object iterator) {
    this.iterator = iterator;
  }

  /**
   * Registers a completion listener for the given Java iterator on the currently running Spark task.
   *
   * @param iterator iterator that should be closed when the task completes
   * @param <T>      element type of the iterator
   * @return the same iterator instance that was passed in
   */
  public static <T> Iterator<T> addListener(Iterator<T> iterator) {
    register(iterator);
    return iterator;
  }

  /**
   * Registers a completion listener for the given Scala iterator on the currently running Spark task.
   *
   * @param iterator iterator that should be closed when the task completes
   * @param <T>      element type of the iterator
   * @return the same iterator instance that was passed in
   */
  public static <T> scala.collection.Iterator<T> addListener(scala.collection.Iterator<T> iterator) {
    register(iterator);
    return iterator;
  }

  /**
   * Attaches a listener to the active task, skipping the cases where a listener could never do anything.
   */
  private static void register(Object iterator) {
    // A listener for a non-closeable iterator would be a no-op that still keeps the iterator reachable
    // until the task ends; callers may invoke this once per record, so avoid accumulating them.
    if (!(iterator instanceof AutoCloseable)) {
      return;
    }
    // TaskContext.get() yields null when the calling thread is not executing a Spark task.
    TaskContext taskContext = TaskContext.get();
    if (taskContext == null) {
      return;
    }
    taskContext.addTaskCompletionListener(new CloseableIteratorListener(iterator));
  }

  /**
   * Closes the iterator if it also implements {@link AutoCloseable}, otherwise it is a no-op.
   * A failure while closing is logged and not rethrown.
   *
   * @param context the context of the task that completed
   */
  @Override
  public void onTaskCompletion(TaskContext context) {
    if (iterator instanceof AutoCloseable) {
      try {
        ((AutoCloseable) iterator).close();
      } catch (Exception ex) {
        LOG.warn("Failed to properly close iterator", ex);
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public <W> HoodiePairData<K, W> mapValues(SerializableFunction<V, W> func) {
}

/**
 * Expands every value of the pair RDD into zero or more values using {@code func}.
 * Each iterator produced by {@code func} is handed to {@link CloseableIteratorListener#addListener}
 * before being returned to Spark.
 *
 * @param func function producing an iterator of output values for one input value
 * @param <W>  output value type
 * @return pair data wrapping the flat-mapped RDD
 */
public <W> HoodiePairData<K, W> flatMapValues(SerializableFunction<V, Iterator<W>> func) {
  return HoodieJavaPairRDD.of(pairRDDData.flatMapValues(value -> CloseableIteratorListener.addListener(func.apply(value))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,21 @@ public <O> HoodieData<O> map(SerializableFunction<T, O> func) {

/**
 * Applies {@code func} to the iterator of each RDD partition.
 * The iterator returned by {@code func} is handed to {@link CloseableIteratorListener#addListener}
 * before being returned to Spark.
 *
 * @param func                   function mapping a partition's input iterator to its output iterator
 * @param preservesPartitioning  forwarded unchanged to the underlying RDD {@code mapPartitions} call
 * @param <O>                    output element type
 * @return data wrapping the mapped RDD
 */
@Override
public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>, Iterator<O>> func, boolean preservesPartitioning) {
  return HoodieJavaRDD.of(rddData.mapPartitions(iter -> CloseableIteratorListener.addListener(func.apply(iter)), preservesPartitioning));
}

/**
 * Expands every element of the RDD into zero or more elements using {@code func}.
 * Each iterator produced by {@code func} is handed to {@link CloseableIteratorListener#addListener}
 * before being returned to Spark.
 *
 * @param func function producing an iterator of outputs for one input element
 * @param <O>  output element type
 * @return data wrapping the flat-mapped RDD
 */
@Override
public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
  // NOTE: Unrolling this lambda into a method reference results in [[ClassCastException]]
  //       due to weird interop b/w Scala and Java
  return HoodieJavaRDD.of(rddData.flatMap(e -> CloseableIteratorListener.addListener(func.apply(e))));
}

@Override
public <K, V> HoodiePairData<K, V> flatMapToPair(SerializableFunction<T, Iterator<? extends Pair<K, V>>> func) {
return HoodieJavaPairRDD.of(
rddData.flatMapToPair(e ->
new MappingIterator<>(func.apply(e), p -> new Tuple2<>(p.getKey(), p.getValue()))));
new MappingIterator<>(CloseableIteratorListener.addListener(func.apply(e)), p -> new Tuple2<>(p.getKey(), p.getValue()))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import scala.Tuple2;
Expand Down Expand Up @@ -107,4 +108,15 @@ public void testLeftOuterJoinOperation() {
assertEquals(Option.of("value1"), item.getRight().getRight());
});
}

@Test
void testFlatMapValuesWithCloseable() {
  // Each value doubles as the tracking id of the iterator created for it.
  String firstId = "partition1";
  String secondId = "partition2";
  List<Tuple2<Integer, String>> rows = Arrays.asList(Tuple2.apply(1, firstId), Tuple2.apply(2, secondId));
  HoodiePairData<Integer, String> pairs = HoodieJavaPairRDD.of(jsc.parallelizePairs(rows, 2));

  // Run the job to completion so that the task completion listeners fire.
  pairs.flatMapValues(id -> new TrackingCloseableIterator<>(id, Collections.singletonList(1).iterator()))
      .collectAsList();

  for (String id : Arrays.asList(firstId, secondId)) {
    assertTrue(TrackingCloseableIterator.isClosed(id));
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.apache.spark.sql.internal.SQLConf;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestHoodieJavaRDD extends HoodieClientTestBase {
@Test
Expand Down Expand Up @@ -65,4 +68,37 @@ public void testDeduceNumPartitions() {
.reduceByKey((p1, p2) -> p1, 11);
assertEquals(11, shuffleRDD.deduceNumPartitions());
}

@Test
void testMapPartitionsWithCloseable() {
  String firstId = "partition1";
  String secondId = "partition2";
  HoodieData<String> source = HoodieJavaRDD.of(Arrays.asList(firstId, secondId), context, 2);

  // The first record of each partition is used as the tracking id of that partition's output iterator.
  source.mapPartitions(records -> new TrackingCloseableIterator<>(records.next(), Collections.singletonList("a").iterator()), true)
      .collectAsList();

  for (String id : Arrays.asList(firstId, secondId)) {
    assertTrue(TrackingCloseableIterator.isClosed(id));
  }
}

@Test
void testFlatMapWithCloseable() {
  String firstId = "partition1";
  String secondId = "partition2";
  HoodieData<String> source = HoodieJavaRDD.of(Arrays.asList(firstId, secondId), context, 2);

  // Every input record yields one tracked iterator keyed by the record itself.
  source.flatMap(id -> new TrackingCloseableIterator<>(id, Collections.singletonList("a").iterator()))
      .collectAsList();

  for (String id : Arrays.asList(firstId, secondId)) {
    assertTrue(TrackingCloseableIterator.isClosed(id));
  }
}

@Test
void testFlatMapToPairWithCloseable() {
  String firstId = "partition1";
  String secondId = "partition2";
  HoodieData<String> source = HoodieJavaRDD.of(Arrays.asList(firstId, secondId), context, 2);

  // Every input record yields one tracked iterator of pairs keyed by the record itself.
  source.flatMapToPair(id -> new TrackingCloseableIterator<>(id, Collections.singletonList(Pair.of(1, "1")).iterator()))
      .collectAsList();

  for (String id : Arrays.asList(firstId, secondId)) {
    assertTrue(TrackingCloseableIterator.isClosed(id));
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.data;

import org.apache.hudi.common.util.collection.ClosableIterator;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Closeable iterator to use in Spark related testing to ensure that the close method is properly called after transformations.
 *
 * <p>The closed state is kept in a static map keyed by the id given at construction, so a test can query it
 * through {@link #isClosed(String)} without holding a reference to the instance. Creating a new instance
 * with an existing id resets that id to "not closed".
 *
 * @param <T> type of record within the iterator
 */
class TrackingCloseableIterator<T> implements ClosableIterator<T>, Serializable {
  // Synchronized wrapper: instances may be created and closed by concurrently running tasks
  // while the test thread reads the result.
  private static final Map<String, Boolean> IS_CLOSED_BY_ID = Collections.synchronizedMap(new HashMap<>());
  private final String id;
  private final Iterator<T> inner;

  /**
   * @param id    key under which the closed state of this iterator is tracked
   * @param inner iterator that supplies the actual elements
   */
  public TrackingCloseableIterator(String id, Iterator<T> inner) {
    this.id = id;
    this.inner = inner;
    IS_CLOSED_BY_ID.put(id, false);
  }

  /**
   * Reports whether the most recently created iterator with the given id has been closed.
   *
   * @param id tracking id passed to the constructor
   * @return {@code true} if {@link #close()} was called for that id; {@code false} if it was not,
   *         or if no iterator with that id was ever created
   */
  public static boolean isClosed(String id) {
    // Null-safe comparison: an unknown id must not fail with an unboxing NullPointerException.
    return Boolean.TRUE.equals(IS_CLOSED_BY_ID.get(id));
  }

  @Override
  public void close() {
    IS_CLOSED_BY_ID.put(id, true);
  }

  @Override
  public boolean hasNext() {
    return inner.hasNext();
  }

  @Override
  public T next() {
    return inner.next();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@

import org.apache.hudi.common.util.Either;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -43,7 +47,12 @@ protected HoodieBaseListData(List<T> data, boolean lazy) {
/**
 * Creates the container from a stream.
 *
 * @param dataStream source of the elements
 * @param lazy       when {@code true} the stream is stored as-is; when {@code false} it is collected
 *                   into a list immediately and then closed
 */
protected HoodieBaseListData(Stream<T> dataStream, boolean lazy) {
  if (lazy) {
    this.data = Either.left(dataStream);
  } else {
    // NOTE: In case this container is being instantiated by an eager parent, we have to
    //       pre-materialize the stream
    List<T> materialized;
    // try-with-resources so the stream's close handlers also run when collecting fails
    try (Stream<T> stream = dataStream) {
      materialized = stream.collect(Collectors.toList());
    }
    this.data = Either.right(materialized);
  }
  this.lazy = lazy;
}

Expand All @@ -69,9 +78,31 @@ protected long count() {

/**
 * Returns the elements of this container as a list.
 *
 * @return the stored list when the container is eager; otherwise the result of collecting the
 *         stored stream, which is closed afterwards so that its close handlers run
 */
protected List<T> collectAsList() {
  if (!lazy) {
    return data.asRight();
  }
  try (Stream<T> stream = data.asLeft()) {
    return stream.collect(Collectors.toList());
  }
}

/**
 * {@link Runnable} that closes the wrapped iterator when run, provided the iterator implements
 * {@link AutoCloseable}; for any other iterator running it does nothing.
 */
static class IteratorCloser implements Runnable {
  private static final Logger LOG = LoggerFactory.getLogger(IteratorCloser.class);
  private final Iterator<?> iterator;

  IteratorCloser(Iterator<?> iterator) {
    this.iterator = iterator;
  }

  /**
   * Closes the iterator if possible. A failure while closing is logged and not rethrown.
   */
  @Override
  public void run() {
    if (!(iterator instanceof AutoCloseable)) {
      return;
    }
    AutoCloseable closeable = (AutoCloseable) iterator;
    try {
      closeable.close();
    } catch (Exception ex) {
      LOG.warn("Failed to properly close iterator", ex);
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,29 +119,35 @@ public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
@Override
public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>, Iterator<O>> func, boolean preservesPartitioning) {
Function<Iterator<T>, Iterator<O>> mapper = throwingMapWrapper(func);
Iterator<T> iterator = asStream().iterator();
Iterator<O> newIterator = mapper.apply(iterator);
return new HoodieListData<>(
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
mapper.apply(asStream().iterator()), Spliterator.ORDERED), true),
newIterator, Spliterator.ORDERED), true).onClose(new IteratorCloser(newIterator)),
lazy
);
}

/**
 * Expands every element into zero or more elements using {@code func}.
 * Each iterator produced by {@code func} is wrapped in its own stream with an {@code IteratorCloser}
 * registered as close handler.
 *
 * @param func function producing an iterator of outputs for one input element
 * @param <O>  output element type
 * @return list data backed by the flat-mapped stream, with the same laziness as this instance
 */
@Override
public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
  Function<T, Iterator<O>> mapper = throwingMapWrapper(func);
  Stream<O> mappedStream = asStream().flatMap(e -> {
    Iterator<O> iterator = mapper.apply(e);
    // Stream#flatMap closes each mapped stream once its contents have been consumed,
    // which triggers the close handler registered below.
    return StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), true).onClose(new IteratorCloser(iterator));
  });
  return new HoodieListData<>(mappedStream, lazy);
}

/**
 * Expands every element into zero or more key/value pairs using {@code func}.
 * Each iterator produced by {@code func} is wrapped in its own stream with an {@code IteratorCloser}
 * registered as close handler.
 *
 * @param func function producing an iterator of pairs for one input element
 * @param <K>  key type
 * @param <V>  value type
 * @return pair data backed by the flat-mapped stream, with the same laziness as this instance
 */
@Override
public <K, V> HoodiePairData<K, V> flatMapToPair(SerializableFunction<T, Iterator<? extends Pair<K, V>>> func) {
  Function<T, Iterator<? extends Pair<K, V>>> mapper = throwingMapWrapper(func);
  Stream<Pair<K, V>> mappedStream = asStream().flatMap(e -> {
    Iterator<? extends Pair<K, V>> iterator = mapper.apply(e);
    // Stream#flatMap closes each mapped stream once its contents have been consumed,
    // which triggers the close handler registered below.
    return StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), true).onClose(new IteratorCloser(iterator));
  });

  return new HoodieListPairData<>(mappedStream, lazy);
}
Expand Down
Loading
Loading