diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ClosableMergingIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ClosableMergingIterator.java new file mode 100644 index 000000000000..bbd51d8b95d4 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ClosableMergingIterator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client.utils; + +import org.apache.hudi.common.util.collection.ClosableIterator; + +import java.util.function.BiFunction; + +/** + * Closeable counterpart of {@link MergingIterator} + */ +public class ClosableMergingIterator extends MergingIterator implements ClosableIterator { + + public ClosableMergingIterator(ClosableIterator leftIterator, + ClosableIterator rightIterator, + BiFunction mergeFunction) { + super(leftIterator, rightIterator, mergeFunction); + } + + @Override + public void close() { + ((ClosableIterator) leftIterator).close(); + ((ClosableIterator) rightIterator).close(); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java index 2dd7c44c4df1..28389100b377 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MergingIterator.java @@ -18,19 +18,27 @@ package org.apache.hudi.client.utils; -import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.ValidationUtils; import java.util.Iterator; import java.util.function.BiFunction; -public class MergingIterator implements Iterator { +/** + * Iterator providing for the semantic of simultaneously iterating over 2 other iterators + * and combining their respective output + * + * @param type returned by the first iterator + * @param type returned by the second iterator + * @param type returned by this iterator + */ +public class MergingIterator implements Iterator { + + protected final Iterator leftIterator; + protected final Iterator rightIterator; - private final Iterator leftIterator; - private final Iterator rightIterator; - private final BiFunction mergeFunction; + private final BiFunction mergeFunction; - public MergingIterator(Iterator leftIterator, Iterator 
rightIterator, BiFunction mergeFunction) { + public MergingIterator(Iterator leftIterator, Iterator rightIterator, BiFunction mergeFunction) { this.leftIterator = leftIterator; this.rightIterator = rightIterator; this.mergeFunction = mergeFunction; @@ -45,7 +53,7 @@ public boolean hasNext() { } @Override - public T next() { + public R next() { return mergeFunction.apply(leftIterator.next(), rightIterator.next()); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index de16ea17c911..f7df0816e312 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -65,6 +65,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -498,21 +499,26 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props @Override public List close() { try { + if (isClosed()) { + // Handle has already been closed + return Collections.emptyList(); + } + + markClosed(); // flush any remaining records to disk appendDataAndDeleteBlocks(header, true); recordItr = null; - if (writer != null) { - writer.close(); - writer = null; - - // update final size, once for all log files - // TODO we can actually deduce file size purely from AppendResult (based on offset and size - // of the appended block) - for (WriteStatus status : statuses) { - long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath())); - status.getStat().setFileSizeInBytes(logFileSize); - } + + writer.close(); + + // update final size, once for all log files + // TODO we can actually deduce file size purely from AppendResult (based on offset and size + // of the appended block) + 
for (WriteStatus status : statuses) { + long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath())); + status.getStat().setFileSizeInBytes(logFileSize); } + return statuses; } catch (IOException e) { throw new HoodieUpsertException("Failed to close UpdateHandle", e); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 8b20df3f1a5c..97fd9878b4ff 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -203,6 +203,12 @@ public IOType getIOType() { public List close() { LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); try { + if (isClosed()) { + // Handle has already been closed + return Collections.emptyList(); + } + + markClosed(); if (fileWriter != null) { fileWriter.close(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index d83237cb9253..416e3ebfe5e0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -407,6 +407,12 @@ protected void writeIncomingRecords() throws IOException { @Override public List close() { try { + if (isClosed()) { + // Handle has already been closed + return Collections.emptyList(); + } + + markClosed(); writeIncomingRecords(); if (keyToNewRecords instanceof ExternalSpillableMap) { @@ -416,10 +422,8 @@ public List close() { keyToNewRecords = null; writtenRecordKeys = null; - if (fileWriter != null) { - fileWriter.close(); - fileWriter = null; - } + fileWriter.close(); + 
fileWriter = null; long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath); HoodieWriteStat stat = writeStatus.getStat(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 3a053e6439e6..676e407f2c1a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -76,6 +76,8 @@ public abstract class HoodieWriteHandle extends HoodieIOHandle hoodieTable, TaskContextSupplier taskContextSupplier) { this(config, instantTime, partitionPath, fileId, hoodieTable, @@ -175,6 +177,14 @@ public void write(HoodieRecord record, Schema schema, TypedProperties props) { doWrite(record, schema, props); } + protected boolean isClosed() { + return closed; + } + + protected void markClosed() { + this.closed = true; + } + public abstract List close(); public List writeStatuses() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java index f9b85679fbbe..b3496ad3dc03 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java @@ -41,6 +41,7 @@ public void consume(HoodieRecord record) { @Override public Void finish() { + bootstrapHandle.close(); return null; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java index 17b8620da63f..138e6a840d4b 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java @@ -56,6 +56,7 @@ public void consume(HoodieRecord record) { @Override public Void finish() { + upsertHandle.close(); return null; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index b7668a347969..893ee3fc0321 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -18,14 +18,15 @@ package org.apache.hudi.table.action.commit; +import org.apache.hudi.client.utils.ClosableMergingIterator; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; -import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.InternalSchemaCache; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -49,7 +50,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -105,10 +105,10 @@ public void runMerge(HoodieTable table, || !isPureProjection || baseFile.getBootstrapBaseFile().isPresent(); - HoodieExecutor wrapper = null; + HoodieExecutor executor = null; try { - 
Iterator recordIterator; + ClosableIterator recordIterator; // In case writer's schema is simply a projection of the reader's one we can read // the records in the projected schema directly @@ -123,8 +123,11 @@ public void runMerge(HoodieTable table, HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(bootstrapFileConfig, bootstrapFilePath), mergeHandle.getPartitionFields(), mergeHandle.getPartitionValues()); - recordIterator = bootstrapFileReader.getRecordIterator(mergeHandle.getWriterSchemaWithMetaFields()); recordSchema = mergeHandle.getWriterSchemaWithMetaFields(); + recordIterator = new ClosableMergingIterator<>( + baseFileRecordIterator, + (ClosableIterator) bootstrapFileReader.getRecordIterator(recordSchema), + (left, right) -> left.joinWith(right, recordSchema)); } else { recordIterator = baseFileRecordIterator; recordSchema = isPureProjection ? writerSchema : readerSchema; @@ -132,7 +135,7 @@ public void runMerge(HoodieTable table, boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig); - wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> { + executor = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> { HoodieRecord newRecord; if (schemaEvolutionTransformerOpt.isPresent()) { newRecord = schemaEvolutionTransformerOpt.get().apply(record); @@ -148,21 +151,19 @@ public void runMerge(HoodieTable table, return isBufferingRecords ? newRecord.copy() : newRecord; }, table.getPreExecuteRunnable()); - wrapper.execute(); + executor.execute(); } catch (Exception e) { throw new HoodieException(e); } finally { - // HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting - // and executor firstly and then close mergeHandle. 
- baseFileReader.close(); - if (bootstrapFileReader != null) { - bootstrapFileReader.close(); - } - if (null != wrapper) { - wrapper.shutdownNow(); - wrapper.awaitTermination(); + // NOTE: If executor is initialized it's responsible for gracefully shutting down + // both producer and consumer + if (executor != null) { + executor.shutdownNow(); + executor.awaitTermination(); + } else { + baseFileReader.close(); + mergeHandle.close(); } - mergeHandle.close(); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java index 6e573ec9432b..3a088cc51d21 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java @@ -56,20 +56,20 @@ public FlinkLazyInsertIterable(Iterator> recordItr, @Override protected List computeNext() { // Executor service used for launching writer thread. 
- HoodieExecutor> bufferedIteratorExecutor = null; + HoodieExecutor> executor = null; try { - final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); - bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(), + Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); + executor = ExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(), getTransformer(schema, hoodieConfig)); - final List result = bufferedIteratorExecutor.execute(); + final List result = executor.execute(); checkState(result != null && !result.isEmpty()); return result; } catch (Exception e) { throw new HoodieException(e); } finally { - if (null != bufferedIteratorExecutor) { - bufferedIteratorExecutor.shutdownNow(); - bufferedIteratorExecutor.awaitTermination(); + if (executor != null) { + executor.shutdownNow(); + executor.awaitTermination(); } } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java index 3f19534d08cd..5fc6d8a807aa 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeletePartitionCommitActionExecutor.java @@ -66,7 +66,7 @@ public HoodieWriteMetadata> execute() { DeletePartitionUtils.checkForPendingTableServiceActions(table, partitions); try { - HoodieTimer timer = new HoodieTimer().startTimer(); + HoodieTimer timer = HoodieTimer.start(); context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions."); Map> partitionToReplaceFileIds = context.parallelize(partitions).distinct().collectAsList() diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java index d2e813a506bc..d3612fbf8686 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java @@ -59,21 +59,20 @@ public JavaLazyInsertIterable(Iterator> recordItr, @Override protected List computeNext() { // Executor service used for launching writer thread. - HoodieExecutor> bufferedIteratorExecutor = + HoodieExecutor> executor = null; try { final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); - bufferedIteratorExecutor = - ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getTransformer(schema, hoodieConfig)); - final List result = bufferedIteratorExecutor.execute(); + executor = ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getTransformer(schema, hoodieConfig)); + final List result = executor.execute(); checkState(result != null && !result.isEmpty()); return result; } catch (Exception e) { throw new HoodieException(e); } finally { - if (null != bufferedIteratorExecutor) { - bufferedIteratorExecutor.shutdownNow(); - bufferedIteratorExecutor.awaitTermination(); + if (executor != null) { + executor.shutdownNow(); + executor.awaitTermination(); } } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java index 6c7f70cc58ea..fa60148ea10b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java +++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java @@ -70,11 +70,12 @@ void executeBootstrap(HoodieBootstrapHandle bootstrapHandle, Path so if (config.getRecordMerger().getRecordType() == HoodieRecordType.SPARK) { throw new UnsupportedOperationException(); } - HoodieExecutor wrapper = null; Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf())); TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema); - try (RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema))) { - wrapper = ExecutorFactory.create(config, new OrcReaderIterator(reader, avroSchema, orcSchema), + HoodieExecutor executor = null; + RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema)); + try { + executor = ExecutorFactory.create(config, new OrcReaderIterator(reader, avroSchema, orcSchema), new BootstrapRecordConsumer(bootstrapHandle), inp -> { String recKey = keyGenerator.getKey(inp).getRecordKey(); GenericRecord gr = new GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA); @@ -82,16 +83,20 @@ void executeBootstrap(HoodieBootstrapHandle bootstrapHandle, Path so BootstrapRecordPayload payload = new BootstrapRecordPayload(gr); HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload); return rec; - }, table.getPreExecuteRunnable()); - wrapper.execute(); + }, table.getPreExecuteRunnable()); + executor.execute(); } catch (Exception e) { throw new HoodieException(e); } finally { - if (null != wrapper) { - wrapper.shutdownNow(); - wrapper.awaitTermination(); + // NOTE: If executor is initialized it's responsible for gracefully shutting down + // both producer and consumer + if (executor != null) { + executor.shutdownNow(); + executor.awaitTermination(); + } else { + reader.close(); + bootstrapHandle.close(); } - bootstrapHandle.close(); } } } diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java index 8f412a39f3cd..8ef6ab8f5cff 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java @@ -78,11 +78,12 @@ protected void executeBootstrap(HoodieBootstrapHandle bootstrapHandl KeyGeneratorInterface keyGenerator, String partitionPath, Schema schema) throws Exception { - HoodieExecutor wrapper = null; HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger(); HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(recordMerger.getRecordType()) .getFileReader(table.getHadoopConf(), sourceFilePath); + + HoodieExecutor executor = null; try { Function transformer = record -> { String recordKey = record.getRecordKey(schema, Option.of(keyGenerator)); @@ -92,21 +93,22 @@ protected void executeBootstrap(HoodieBootstrapHandle bootstrapHandl // it since these records will be inserted into the queue later. 
.copy(); }; - ClosableIterator recordIterator = reader.getRecordIterator(schema); - wrapper = ExecutorFactory.create(config, recordIterator, + executor = ExecutorFactory.create(config, recordIterator, new BootstrapRecordConsumer(bootstrapHandle), transformer, table.getPreExecuteRunnable()); - - wrapper.execute(); + executor.execute(); } catch (Exception e) { throw new HoodieException(e); } finally { - reader.close(); - if (null != wrapper) { - wrapper.shutdownNow(); - wrapper.awaitTermination(); + // NOTE: If executor is initialized it's responsible for gracefully shutting down + // both producer and consumer + if (executor != null) { + executor.shutdownNow(); + executor.awaitTermination(); + } else { + reader.close(); + bootstrapHandle.close(); } - bootstrapHandle.close(); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java index eb61cb433120..bca0764b8760 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java @@ -29,7 +29,6 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.testutils.HoodieClientTestHarness; -import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.spark.TaskContext; import org.apache.spark.TaskContext$; @@ -154,15 +153,15 @@ public Integer finish() { @Test public void testExecutorTermination() { - Iterator unboundedRecordIter = new Iterator() { + Iterator unboundedRecordIter = new Iterator() { @Override public boolean hasNext() { return true; } @Override - public GenericRecord next() { - return dataGen.generateGenericRecord(); + public HoodieRecord next() { + return dataGen.generateInserts(instantTime, 
1).get(0); } }; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java index 4c8e0dac27db..b2fd31fd3296 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java @@ -281,16 +281,6 @@ public Integer finish() { @Timeout(value = 60) public void testException() throws Exception { final int numRecords = 1000; - final int numProducers = 40; - - final DisruptorMessageQueue queue = - new DisruptorMessageQueue(1024, getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), - "BLOCKING_WAIT", numProducers, new Runnable() { - @Override - public void run() { - // do nothing. - } - }); List pRecs = dataGen.generateInserts(instantTime, numRecords); @@ -307,8 +297,7 @@ public void run() { })); } } - - + HoodieConsumer, Integer> consumer = new HoodieConsumer, Integer>() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 71a4d19fe474..bf220ca78471 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -76,7 +76,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader private static final Logger LOG = LoggerFactory.getLogger(HoodieMergedLogRecordScanner.class); // A timer for calculating elapsed time in millis - public final HoodieTimer timer = new HoodieTimer(); + public final HoodieTimer timer = HoodieTimer.create(); // Map of compacted/merged records private final ExternalSpillableMap records; // Set of already scanned 
prefixes allowing us to avoid scanning same prefixes again diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FutureUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FutureUtils.java index d9c1dee6e817..bb9819df8bf7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/FutureUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FutureUtils.java @@ -18,8 +18,6 @@ package org.apache.hudi.common.util; -import javax.annotation.Nonnull; - import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -30,18 +28,38 @@ public class FutureUtils { /** - * Parallel CompletableFutures + * Similar to {@link CompletableFuture#allOf(CompletableFuture[])} with a few important + * differences: + * + *
<ol> + * <li>Completes successfully as soon as *all* of the futures complete successfully</li> + * <li>Completes exceptionally as soon as *any* of the futures complete exceptionally</li> + * <li>In case it's completed exceptionally all the other futures not completed yet, will be + * cancelled</li> + * </ol>
* - * @param futures CompletableFuture list - * @return a new CompletableFuture which will completed when all of the given CompletableFutures complete. + * @param futures list of {@link CompletableFuture}s */ - public static CompletableFuture> allOf(@Nonnull List> futures) { - return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .thenApply(aVoid -> - futures.stream() - // NOTE: This join wouldn't block, since all the - // futures are completed at this point. - .map(CompletableFuture::join) - .collect(Collectors.toList())); + public static CompletableFuture> allOf(List> futures) { + CompletableFuture union = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + + futures.forEach(future -> { + // NOTE: We add a callback to every future, to cancel all the other not yet completed futures, + // which will be providing for an early termination semantic: whenever any of the futures + // fail other futures will be cancelled and the exception will be returned as a result + future.whenComplete((ignored, throwable) -> { + if (throwable != null) { + futures.forEach(f -> f.cancel(true)); + union.completeExceptionally(throwable); + } + }); + }); + + return union.thenApply(aVoid -> + futures.stream() + // NOTE: This join wouldn't block, since all the + // futures are completed at this point. 
+ .map(CompletableFuture::join) + .collect(Collectors.toList())); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java index a0a8ca0867e9..a35cd60933f8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java @@ -30,7 +30,7 @@ public class HoodieTimer { // Ordered stack of TimeInfo's to make sure stopping the timer returns the correct elapsed time - private final Deque timeInfoDeque = new ArrayDeque<>(); + private final Deque timeInfoDeque = new ArrayDeque<>(1); /** * @deprecated please use either {@link HoodieTimer#start} or {@link HoodieTimer#create} APIs @@ -47,7 +47,6 @@ private HoodieTimer(boolean shouldStart) { } static class TimeInfo { - // captures the startTime of the code block long startTime; // is the timing still running for the last started timer @@ -84,10 +83,16 @@ public long endTimer() { return timeInfoDeque.pop().stop(); } + /** + * Creates an instance of {@link HoodieTimer} already started + */ public static HoodieTimer start() { return new HoodieTimer(true); } + /** + * Creates an instance of {@link HoodieTimer} that is NOT started + */ public static HoodieTimer create() { return new HoodieTimer(false); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableIterator.java index ac0335cd3eb2..f91aede5f3e4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ClosableIterator.java @@ -24,8 +24,6 @@ * An iterator that give a chance to release resources. 
* * @param The return type - * - * TODO move under common.util.collection */ public interface ClosableIterator extends Iterator, AutoCloseable { @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java index 2bd01bdd3349..86011e865dc0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BaseHoodieQueueBasedExecutor.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -66,6 +67,9 @@ public abstract class BaseHoodieQueueBasedExecutor implements HoodieExe private final List> producers; // Consumer protected final Option> consumer; + // Futures corresponding to producing/consuming processes + private CompletableFuture consumingFuture; + private CompletableFuture producingFuture; public BaseHoodieQueueBasedExecutor(List> producers, Option> consumer, @@ -152,7 +156,28 @@ public final boolean awaitTermination() { } @Override - public void shutdownNow() { + public final void shutdownNow() { + // NOTE: PLEASE READ CAREFULLY + // Graceful shutdown sequence have been a source of multiple issues in the + // past (for ex HUDI-2875, HUDI-5238). To handle it appropriately in a graceful + // fashion we're consolidating shutdown sequence w/in the Executor itself (in + // this method) shutting down in following order + // + // 1. We shut down producing/consuming pipeline (by interrupting + // corresponding futures), then + // 2. We shut down producer and consumer (if present), and after that + // 3. 
We shut down the executors + // + if (producingFuture != null) { + producingFuture.cancel(true); + } + if (consumingFuture != null) { + consumingFuture.cancel(true); + } + // Clean up resources associated w/ producers/consumers + producers.forEach(HoodieProducer::close); + consumer.ifPresent(HoodieConsumer::finish); + // Shutdown executor-services producerExecutorService.shutdownNow(); consumerExecutorService.shutdownNow(); } @@ -170,12 +195,12 @@ public E execute() { checkState(this.consumer.isPresent()); setUp(); // Start consuming/producing asynchronously - CompletableFuture consuming = startConsumingAsync(); - CompletableFuture producing = startProducingAsync(); + this.consumingFuture = startConsumingAsync(); + this.producingFuture = startProducingAsync(); // NOTE: To properly support mode when there's no consumer, we have to fall back // to producing future as the trigger for us to shut down the queue - return producing.thenCombine(consuming, (aVoid, anotherVoid) -> null) + return allOf(Arrays.asList(producingFuture, consumingFuture)) .whenComplete((ignored, throwable) -> { // Close the queue to release the resources queue.close(); @@ -190,6 +215,11 @@ public E execute() { // to be interrupted as well Thread.currentThread().interrupt(); } + // throw if we have any other exception seen already. There is a chance that cancellation/closing of producers with CompletableFuture wins before the actual exception + // is thrown.
+ if (this.queue.getThrowable() != null) { + throw new HoodieException(queue.getThrowable()); + } throw new HoodieException(e); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java index fa22efec2410..e9d13b10dca2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java @@ -270,6 +270,11 @@ public void markAsFailed(Throwable e) { this.rateLimiter.release(RECORD_CACHING_LIMIT + 1); } + @Override + public Throwable getThrowable() { + return this.hasFailed.get(); + } + @Override public boolean isEmpty() { return this.queue.size() == 0; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java index 1c91b8123994..ea0efab5386c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; /** @@ -47,6 +48,7 @@ public class DisruptorMessageQueue implements HoodieMessageQueue { private final Disruptor queue; private final Function transformFunction; private final RingBuffer ringBuffer; + private AtomicReference throwable = new AtomicReference<>(null); private boolean isShutdown = false; private boolean isStarted = false; @@ -89,9 +91,15 @@ public Option readNextRecord() { @Override public void markAsFailed(Throwable e) { + this.throwable.compareAndSet(null, e); // no-op } + @Override + public Throwable getThrowable() { + return this.throwable.get(); + } + @Override public 
boolean isEmpty() { return ringBuffer.getBufferSize() == ringBuffer.remainingCapacity(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java index 185cdea022e7..79baf23e97a5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java @@ -50,6 +50,8 @@ public interface HoodieMessageQueue extends Closeable { */ void markAsFailed(Throwable e); + Throwable getThrowable(); + boolean isEmpty(); /**