diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
index 0f521597ce5a..a976fcce7e16 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
@@ -48,6 +48,7 @@
 import org.roaringbitmap.longlong.LongBitmapDataProvider;
 import org.roaringbitmap.longlong.Roaring64Bitmap;

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -55,7 +56,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.IntStream;

@@ -170,7 +170,7 @@ public CompletableFuture<Collection<Slice>> finish()
         return completedFuture(fragments);
     }

-    // In spite of the name "Delta" Lake, we must rewrite the entire file to delete rows.
+    // In spite of the name "Delta" Lake, we must rewrite the entire file to delete rows.
     private List<Slice> rewriteFile(Path sourcePath, FileDeletion deletion)
     {
         try {
@@ -210,10 +210,7 @@ private FileWriter createParquetFileWriter(FileSystem fileSystem, Path path, List<DeltaLakeColumnHandle> dataColumns)
         CompressionCodecName compressionCodecName = getCompressionCodec(session).getParquetCompressionCodec();

         try {
-            Callable<Void> rollbackAction = () -> {
-                fileSystem.delete(path, false);
-                return null;
-            };
+            Closeable rollbackAction = () -> fileSystem.delete(path, false);

             List<Type> parquetTypes = dataColumns.stream()
                     .map(column -> {
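A side note on the lambda above, as a minimal hypothetical sketch (MockFileSystem and rollbackActionFor are illustrative names, not part of this patch): java.io.Closeable declares a single abstract method, void close() throws IOException, so it can be the target of a lambda. The old Callable<Void> form needed a block body with an explicit return null because Callable.call() must produce a value.

    import java.io.Closeable;
    import java.io.IOException;

    class RollbackActionSketch
    {
        // Stand-in for the Hadoop/Trino file system APIs used in the patch.
        interface MockFileSystem
        {
            void delete(String path) throws IOException;
        }

        // Closeable.close() may throw IOException, so the lambda body may too.
        static Closeable rollbackActionFor(MockFileSystem fileSystem, String path)
        {
            return () -> fileSystem.delete(path);
        }
    }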
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
index b4d0cf0218c1..ab99be045722 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
@@ -14,7 +14,7 @@
 package io.trino.plugin.deltalake;

 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -49,15 +49,16 @@
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.joda.time.DateTimeZone;

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;

 import static com.google.common.base.Verify.verify;
@@ -117,7 +118,7 @@ public class DeltaLakePageSink
     private long writtenBytes;
     private long memoryUsage;

-    private final List<DeltaLakeWriter> closedWriters = new ArrayList<>();
+    private final List<Closeable> closedWriterRollbackActions = new ArrayList<>();
     private final ImmutableList.Builder<Slice> dataFileInfos = ImmutableList.builder();

     public DeltaLakePageSink(
@@ -229,16 +230,13 @@ public CompletableFuture<Collection<Slice>> finish()

     private ListenableFuture<Collection<Slice>> doFinish()
     {
-        for (DeltaLakeWriter writer : writers) {
-            closeWriter(writer);
+        for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) {
+            closeWriter(writerIndex);
         }
+        writers.clear();

         List<Slice> result = dataFileInfos.build();

-        writtenBytes = closedWriters.stream()
-                .mapToLong(DeltaLakeWriter::getWrittenBytes)
-                .sum();
-
         return Futures.immediateFuture(result);
     }

@@ -250,21 +248,27 @@ public void abort()

     private void doAbort()
     {
-        Optional<Exception> rollbackException = Optional.empty();
-        for (DeltaLakeWriter writer : Iterables.concat(writers, closedWriters)) {
-            // writers can contain nulls if an exception is thrown when doAppend expands the writer list
-            if (writer != null) {
-                try {
-                    writer.rollback();
-                }
-                catch (Exception e) {
-                    LOG.warn("exception '%s' while rollback on %s", e, writer);
-                    rollbackException = Optional.of(e);
+        List<Closeable> rollbackActions = Streams.concat(
+                writers.stream()
+                        // writers can contain nulls if an exception is thrown when doAppend expands the writer list
+                        .filter(Objects::nonNull)
+                        .map(writer -> writer::rollback),
+                closedWriterRollbackActions.stream())
+                .collect(toImmutableList());
+        RuntimeException rollbackException = null;
+        for (Closeable rollbackAction : rollbackActions) {
+            try {
+                rollbackAction.close();
+            }
+            catch (Throwable t) {
+                if (rollbackException == null) {
+                    rollbackException = new TrinoException(DELTA_LAKE_BAD_WRITE, "Error rolling back write to Delta Lake");
                 }
+                rollbackException.addSuppressed(t);
             }
         }
-        if (rollbackException.isPresent()) {
-            throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Error rolling back write to Delta Lake", rollbackException.get());
+        if (rollbackException != null) {
+            throw rollbackException;
         }
     }

@@ -363,7 +367,7 @@ private int[] getWriterIndexes(Page page)
                 if (deltaLakeWriter.getWrittenBytes() <= targetMaxFileSize) {
                     continue;
                 }
-                closeWriter(deltaLakeWriter);
+                closeWriter(writerIndex);
             }
             else {
                 continue;
@@ -403,6 +407,7 @@
                         dataColumnHandles);

                 writers.set(writerIndex, writer);
+                memoryUsage += writer.getMemoryUsage();
             }
             catch (IOException e) {
                 throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Unable to create writer for location: " + outputPath, e);
@@ -414,13 +419,20 @@
         return writerIndexes;
     }

-    private void closeWriter(DeltaLakeWriter writer)
+    private void closeWriter(int writerIndex)
     {
+        DeltaLakeWriter writer = writers.get(writerIndex);
+
         long currentWritten = writer.getWrittenBytes();
         long currentMemory = writer.getMemoryUsage();
-        writer.commit();
+
+        closedWriterRollbackActions.add(writer.commit());
+
         writtenBytes += writer.getWrittenBytes() - currentWritten;
-        memoryUsage += writer.getMemoryUsage() - currentMemory;
+        memoryUsage -= currentMemory;
+
+        writers.set(writerIndex, null);
+
         try {
             DataFileInfo dataFileInfo = writer.getDataFileInfo();
             dataFileInfos.add(wrappedBuffer(dataFileInfoCodec.toJsonBytes(dataFileInfo)));
@@ -429,7 +441,6 @@
             LOG.warn("exception '%s' while finishing write on %s", e, writer);
             throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Error committing Parquet file to Delta Lake", e);
         }
-        closedWriters.add(writer);
     }

     /**
@@ -469,10 +480,7 @@ private FileWriter createParquetFileWriter(Path path)
         try {
             FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), path, conf);

-            Callable<Void> rollbackAction = () -> {
-                fileSystem.delete(path, false);
-                return null;
-            };
+            Closeable rollbackAction = () -> fileSystem.delete(path, false);

             List<Type> parquetTypes = dataColumnTypes.stream()
                     .map(type -> {
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java
index 5a1bac849163..a09d0fd9a091 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java
@@ -35,6 +35,7 @@
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.util.HadoopInputFile;

+import java.io.Closeable;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Collection;
@@ -139,9 +140,9 @@ public void appendRows(Page originalPage)
     }

     @Override
-    public void commit()
+    public Closeable commit()
     {
-        fileWriter.commit();
+        return fileWriter.commit();
     }

     @Override
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/FileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/FileWriter.java
index 89ed9c7a3aec..9a719a5bdd02 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/FileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/FileWriter.java
@@ -15,6 +15,7 @@

 import io.trino.spi.Page;

+import java.io.Closeable;
 import java.util.Optional;

 public interface FileWriter
@@ -25,7 +26,10 @@

     void appendRows(Page dataPage);

-    void commit();
+    /**
+     * Commits written data. Returns a rollback {@link Closeable} that can be used to clean up on failure.
+     */
+    Closeable commit();

     void rollback();
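To make the interface change concrete, here is a minimal self-contained sketch of the new contract (SketchFileWriter and SketchSink are illustrative names, not Trino types): commit() flushes and closes the output, but also hands back the action that would delete it, so the sink can keep committing subsequent writers and still undo everything if one of them fails later.

    import java.io.Closeable;
    import java.util.ArrayList;
    import java.util.List;

    class CommitContractSketch
    {
        // Minimal analogue of io.trino.plugin.hive.FileWriter after this change.
        interface SketchFileWriter
        {
            Closeable commit(); // returns the rollback action, e.g. () -> fs.delete(path)
        }

        static class SketchSink
        {
            private final List<Closeable> closedWriterRollbackActions = new ArrayList<>();

            void closeWriter(SketchFileWriter writer)
            {
                // Retain the rollback action after a successful commit so an
                // abort can still delete files that were already committed.
                closedWriterRollbackActions.add(writer.commit());
            }
        }
    }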
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java
index 1d2c669e9a9a..ee524bd925f1 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java
@@ -14,14 +14,13 @@
 package io.trino.plugin.hive;

 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import io.airlift.concurrent.MoreFutures;
 import io.airlift.json.JsonCodec;
-import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.hive.util.HiveBucketing.BucketingVersion;
@@ -38,11 +37,13 @@
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;

+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.concurrent.Callable;
@@ -51,6 +52,7 @@

 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static io.airlift.slice.Slices.wrappedBuffer;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_TOO_MANY_OPEN_PARTITIONS;
@@ -63,8 +65,6 @@ public class HivePageSink
         implements ConnectorPageSink, ConnectorMergeSink
 {
-    private static final Logger log = Logger.get(HivePageSink.class);
-
     private static final int MAX_PAGE_POSITIONS = 4096;

     private final HiveWriterFactory writerFactory;
@@ -89,7 +89,7 @@ public class HivePageSink
     private final ConnectorSession session;
     private final long targetMaxFileSize;

-    private final List<HiveWriter> closedWriters = new ArrayList<>();
+    private final List<Closeable> closedWriterRollbackActions = new ArrayList<>();
     private final List<Slice> partitionUpdates = new ArrayList<>();
     private final List<Callable<?>> verificationTasks = new ArrayList<>();
@@ -218,17 +218,12 @@ private ListenableFuture<Collection<Slice>> doMergeSinkFinish()

     private ListenableFuture<Collection<Slice>> doInsertSinkFinish()
     {
-        for (HiveWriter writer : writers) {
-            closeWriter(writer);
+        for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) {
+            closeWriter(writerIndex);
         }
-        List<Slice> result = ImmutableList.copyOf(partitionUpdates);
+        writers.clear();

-        writtenBytes = closedWriters.stream()
-                .mapToLong(HiveWriter::getWrittenBytes)
-                .sum();
-        validationCpuNanos = closedWriters.stream()
-                .mapToLong(HiveWriter::getValidationCpuNanos)
-                .sum();
+        List<Slice> result = ImmutableList.copyOf(partitionUpdates);

         if (verificationTasks.isEmpty()) {
             return Futures.immediateFuture(result);
@@ -256,21 +251,27 @@ public void abort()

     private void doAbort()
     {
-        Optional<Exception> rollbackException = Optional.empty();
-        for (HiveWriter writer : Iterables.concat(writers, closedWriters)) {
-            // writers can contain nulls if an exception is thrown when doAppend expands the writer list
-            if (writer != null) {
-                try {
-                    writer.rollback();
-                }
-                catch (Exception e) {
-                    log.warn("exception '%s' while rollback on %s", e, writer);
-                    rollbackException = Optional.of(e);
+        List<Closeable> rollbackActions = Streams.concat(
+                writers.stream()
+                        // writers can contain nulls if an exception is thrown when doAppend expands the writer list
+                        .filter(Objects::nonNull)
+                        .map(writer -> writer::rollback),
+                closedWriterRollbackActions.stream())
+                .collect(toImmutableList());
+        RuntimeException rollbackException = null;
+        for (Closeable rollbackAction : rollbackActions) {
+            try {
+                rollbackAction.close();
+            }
+            catch (Throwable t) {
+                if (rollbackException == null) {
+                    rollbackException = new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive");
                 }
+                rollbackException.addSuppressed(t);
             }
         }
-        if (rollbackException.isPresent()) {
-            throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive", rollbackException.get());
+        if (rollbackException != null) {
+            throw rollbackException;
         }
     }

@@ -349,15 +350,20 @@ private void writePage(Page page)
         }
     }

-    private void closeWriter(HiveWriter writer)
+    private void closeWriter(int writerIndex)
     {
+        HiveWriter writer = writers.get(writerIndex);
+
         long currentWritten = writer.getWrittenBytes();
         long currentMemory = writer.getMemoryUsage();
-        writer.commit();
+
+        closedWriterRollbackActions.add(writer.commit());
+
         writtenBytes += (writer.getWrittenBytes() - currentWritten);
-        memoryUsage += (writer.getMemoryUsage() - currentMemory);
+        memoryUsage -= currentMemory;
+
         validationCpuNanos += writer.getValidationCpuNanos();
-        closedWriters.add(writer);
+
+        writers.set(writerIndex, null);

         PartitionUpdate partitionUpdate = writer.getPartitionUpdate();
         partitionUpdates.add(wrappedBuffer(partitionUpdateCodec.toJsonBytes(partitionUpdate)));
@@ -392,7 +398,7 @@ private int[] getWriterIndexes(Page page)
                 continue;
             }
             // close current writer
-            closeWriter(writer);
+            closeWriter(writerIndex);
         }

         OptionalInt bucketNumber = OptionalInt.empty();
@@ -403,6 +409,7 @@
             writer = writerFactory.createWriter(partitionColumns, position, bucketNumber);

             writers.set(writerIndex, writer);
+            memoryUsage += writer.getMemoryUsage();
         }

         verify(writers.size() == pagePartitioner.getMaxIndex() + 1);
         verify(!writers.contains(null));
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java
index d74856376e59..a91f5c97c050 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java
@@ -61,12 +61,12 @@ public enum HiveStorageFormat
             OrcSerde.class.getName(),
             OrcInputFormat.class.getName(),
             OrcOutputFormat.class.getName(),
-            DataSize.of(256, Unit.MEGABYTE)),
+            DataSize.of(64, Unit.MEGABYTE)),
     PARQUET(
             ParquetHiveSerDe.class.getName(),
             MapredParquetInputFormat.class.getName(),
             MapredParquetOutputFormat.class.getName(),
-            DataSize.of(128, Unit.MEGABYTE)),
+            DataSize.of(64, Unit.MEGABYTE)),
     AVRO(
             AvroSerDe.class.getName(),
             AvroContainerInputFormat.class.getName(),
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriter.java
index c4e89fd130dd..7628b45362c5 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriter.java
@@ -17,6 +17,7 @@
 import io.trino.plugin.hive.PartitionUpdate.UpdateMode;
 import io.trino.spi.Page;

+import java.io.Closeable;
 import java.util.Optional;
 import java.util.function.Consumer;

@@ -86,10 +87,11 @@ public void append(Page dataPage)
         inputSizeInBytes += dataPage.getSizeInBytes();
     }

-    public void commit()
+    public Closeable commit()
     {
-        fileWriter.commit();
+        Closeable rollbackAction = fileWriter.commit();
         onCommit.accept(this);
+        return rollbackAction;
     }

     long getValidationCpuNanos()
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java
index 12b88c9240fc..ff591bc9d8e0 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java
@@ -14,6 +14,7 @@
 package io.trino.plugin.hive;

 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.Closer;
 import io.trino.plugin.hive.HiveWriterFactory.RowIdSortingFileWriterMaker;
 import io.trino.plugin.hive.acid.AcidOperation;
 import io.trino.plugin.hive.acid.AcidTransaction;
@@ -28,6 +29,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;

+import java.io.Closeable;
+import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
@@ -107,7 +110,7 @@ public static Page buildInsertPage(Page insertPage, long writeId, List<HiveColumnHandle> columns)
                 .filter(column -> !column.isPartitionKey() && !column.isHidden())
                 .map(column -> insertPage.getBlock(column.getBaseHiveColumnIndex()))
                 .collect(toImmutableList());
-        Block mergedColumnsBlock = RowBlock.fromFieldBlocks(positionCount, Optional.empty(), dataColumns.toArray(new Block[]{}));
+        Block mergedColumnsBlock = RowBlock.fromFieldBlocks(positionCount, Optional.empty(), dataColumns.toArray(new Block[] {}));
         Block currentTransactionBlock = RunLengthEncodedBlock.create(BIGINT, writeId, positionCount);
         Block[] blockArray = {
                 RunLengthEncodedBlock.create(INSERT_OPERATION_BLOCK, positionCount),
@@ -141,21 +144,28 @@ public long getMemoryUsage()
     }

     @Override
-    public void commit()
+    public Closeable commit()
     {
-        deleteFileWriter.ifPresent(FileWriter::commit);
-        insertFileWriter.ifPresent(FileWriter::commit);
+        Optional<Closeable> deleteRollbackAction = deleteFileWriter.map(FileWriter::commit);
+        Optional<Closeable> insertRollbackAction = insertFileWriter.map(FileWriter::commit);
+        return () -> {
+            try (Closer closer = Closer.create()) {
+                insertRollbackAction.ifPresent(closer::register);
+                deleteRollbackAction.ifPresent(closer::register);
+            }
+        };
     }

     @Override
     public void rollback()
     {
         // Make sure both writers get rolled back
-        try {
-            deleteFileWriter.ifPresent(FileWriter::rollback);
+        try (Closer closer = Closer.create()) {
+            closer.register(() -> insertFileWriter.ifPresent(FileWriter::rollback));
+            closer.register(() -> deleteFileWriter.ifPresent(FileWriter::rollback));
         }
-        finally {
-            insertFileWriter.ifPresent(FileWriter::rollback);
+        catch (IOException e) {
+            throw new RuntimeException(e);
         }
     }
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java
index e798818c4e12..50ec569d9024 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java
@@ -29,6 +29,7 @@
 import io.trino.spi.type.Type;
 import org.openjdk.jol.info.ClassLayout;

+import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UncheckedIOException;
@@ -37,7 +38,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static com.google.common.base.MoreObjects.toStringHelper;
@@ -55,7 +55,7 @@ public class RcFileFileWriter

     private final CountingOutputStream outputStream;
     private final RcFileWriter rcFileWriter;
-    private final Callable<Void> rollbackAction;
+    private final Closeable rollbackAction;
     private final int[] fileInputColumnIndexes;
     private final List<Block> nullBlocks;
     private final Optional<Supplier<RcFileDataSource>> validationInputFactory;
@@ -64,7 +64,7 @@ public class RcFileFileWriter

     public RcFileFileWriter(
             OutputStream outputStream,
-            Callable<Void> rollbackAction,
+            Closeable rollbackAction,
             RcFileEncoding rcFileEncoding,
             List<Type> fileColumnTypes,
             Optional<String> codecName,
@@ -131,14 +131,14 @@ public void appendRows(Page dataPage)
     }

     @Override
-    public void commit()
+    public Closeable commit()
     {
         try {
             rcFileWriter.close();
         }
         catch (IOException | UncheckedIOException e) {
             try {
-                rollbackAction.call();
+                rollbackAction.close();
             }
             catch (Exception ignored) {
                 // ignore
@@ -158,18 +158,15 @@ public void commit()
                 throw new TrinoException(HIVE_WRITE_VALIDATION_FAILED, e);
             }
         }
+
+        return rollbackAction;
     }

     @Override
     public void rollback()
     {
-        try {
-            try {
-                rcFileWriter.close();
-            }
-            finally {
-                rollbackAction.call();
-            }
+        try (rollbackAction) {
+            rcFileWriter.close();
         }
         catch (Exception e) {
             throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive", e);
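The two rollback idioms used above, restated as a hedged standalone sketch (class and method names are illustrative): MergeFileWriter.commit() folds two rollback actions into one Closeable via Guava's Closer, which closes registered resources in reverse registration order and keeps secondary failures as suppressed exceptions; RcFileFileWriter.rollback() relies on the Java 9+ form of try-with-resources that accepts an existing effectively-final variable, so the rollback action runs even when closing the writer throws.

    import com.google.common.io.Closer;

    import java.io.Closeable;

    class RollbackCompositionSketch
    {
        // Combine two rollback actions into one, in the style of MergeFileWriter.commit().
        // Closer.close() closes registered Closeables in reverse order and rethrows the
        // first failure while attaching later ones as suppressed exceptions.
        static Closeable combine(Closeable insertRollback, Closeable deleteRollback)
        {
            return () -> {
                try (Closer closer = Closer.create()) {
                    closer.register(insertRollback);
                    closer.register(deleteRollback);
                }
            };
        }

        // Java 9+ try-with-resources on an effectively final variable:
        // rollbackAction.close() runs even if writer.close() throws,
        // replacing the nested try/finally blocks removed by this patch.
        static void rollback(Closeable rollbackAction, AutoCloseable writer)
        {
            try (rollbackAction) {
                writer.close();
            }
            catch (Exception e) {
                throw new RuntimeException("Error rolling back write", e);
            }
        }
    }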
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriterFactory.java
index 49cb587c0454..a2e79414385b 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriterFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriterFactory.java
@@ -36,13 +36,13 @@

 import javax.inject.Inject;

+import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Properties;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR;
@@ -152,10 +152,7 @@ else if (ColumnarSerDe.class.getName().equals(storageFormat.getSerde())) {
                 });
             }

-            Callable<Void> rollbackAction = () -> {
-                fileSystem.delete(path, false);
-                return null;
-            };
+            Closeable rollbackAction = () -> fileSystem.delete(path, false);

             return Optional.of(new RcFileFileWriter(
                     outputStream,
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RecordFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RecordFileWriter.java
index 0aff366281de..3d6345968400 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RecordFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RecordFileWriter.java
@@ -38,6 +38,7 @@
 import org.joda.time.DateTimeZone;
 import org.openjdk.jol.info.ClassLayout;

+import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
@@ -192,7 +193,7 @@ public void appendRow(Page dataPage, int position)
     }

     @Override
-    public void commit()
+    public Closeable commit()
     {
         try {
             recordWriter.close(false);
@@ -201,25 +202,27 @@
         }
         catch (IOException e) {
             throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error committing write to Hive", e);
         }
+
+        return createRollbackAction(path, conf);
     }

     @Override
     public void rollback()
     {
-        try {
-            try {
-                recordWriter.close(true);
-            }
-            finally {
-                // perform explicit deletion here as implementations of RecordWriter.close() often ignore the abort flag.
-                path.getFileSystem(conf).delete(path, false);
-            }
+        Closeable rollbackAction = createRollbackAction(path, conf);
+        try (rollbackAction) {
+            recordWriter.close(true);
         }
         catch (IOException e) {
             throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive", e);
         }
     }

+    private static Closeable createRollbackAction(Path path, JobConf conf)
+    {
+        return () -> path.getFileSystem(conf).delete(path, false);
+    }
+
     @Override
     public long getValidationCpuNanos()
     {
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java
index daae0d42dd86..f098c5d68b95 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.fs.Path;
 import org.openjdk.jol.info.ClassLayout;

+import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
@@ -128,14 +129,15 @@ public void appendRows(Page page)
     }

     @Override
-    public void commit()
+    public Closeable commit()
     {
+        Closeable rollbackAction = createRollbackAction(fileSystem, tempFiles);
         if (!sortBuffer.isEmpty()) {
             // skip temporary files entirely if the total output size is small
             if (tempFiles.isEmpty()) {
                 sortBuffer.flushTo(outputWriter::appendRows);
                 outputWriter.commit();
-                return;
+                return rollbackAction;
             }

             flushToTempFile();
@@ -148,16 +150,30 @@ public void commit()
         catch (UncheckedIOException e) {
             throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error committing write to Hive", e);
         }
+
+        return rollbackAction;
     }

     @Override
     public void rollback()
     {
-        for (TempFile file : tempFiles) {
-            cleanupFile(file.getPath());
+        Closeable rollbackAction = createRollbackAction(fileSystem, tempFiles);
+        try (Closer closer = Closer.create()) {
+            closer.register(outputWriter::rollback);
+            closer.register(rollbackAction);
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
         }
+    }

-        outputWriter.rollback();
+    private static Closeable createRollbackAction(TrinoFileSystem fileSystem, Queue<TempFile> tempFiles)
+    {
+        return () -> {
+            for (TempFile file : tempFiles) {
+                cleanupFile(fileSystem, file.getPath());
+            }
+        };
     }

     @Override
@@ -247,12 +263,12 @@ private void writeTempFile(Consumer<TempFileWriter> consumer)
             tempFiles.add(new TempFile(tempFile, writer.getWrittenBytes()));
         }
         catch (IOException | UncheckedIOException e) {
-            cleanupFile(tempFile);
+            cleanupFile(fileSystem, tempFile);
             throw new TrinoException(HIVE_WRITER_DATA_ERROR, "Failed to write temporary file: " + tempFile, e);
         }
     }

-    private void cleanupFile(String file)
+    private static void cleanupFile(TrinoFileSystem fileSystem, String file)
     {
         try {
             fileSystem.deleteFile(file);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java
index e791d6c821ee..c0e32909e28d 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java
@@ -38,6 +38,7 @@
 import io.trino.spi.type.Type;
 import org.openjdk.jol.info.ClassLayout;

+import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.lang.management.ManagementFactory;
@@ -48,7 +49,6 @@
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static com.google.common.base.MoreObjects.toStringHelper;
@@ -74,7 +74,7 @@ public class OrcFileWriter
     private final AcidTransaction transaction;
     private final boolean useAcidSchema;
     private final OptionalInt bucketNumber;
-    private final Callable<Void> rollbackAction;
+    private final Closeable rollbackAction;
     private final int[] fileInputColumnIndexes;
     private final List<Block> nullBlocks;
     private final Optional<Supplier<OrcDataSource>> validationInputFactory;
@@ -89,7 +89,7 @@ public OrcFileWriter(
             AcidTransaction transaction,
             boolean useAcidSchema,
             OptionalInt bucketNumber,
-            Callable<Void> rollbackAction,
+            Closeable rollbackAction,
             List<String> columnNames,
             List<Type> fileColumnTypes,
             ColumnMetadata<OrcType> fileColumnOrcTypes,
@@ -180,7 +180,7 @@ public void appendRows(Page dataPage)
     }

     @Override
-    public void commit()
+    public Closeable commit()
     {
         try {
             if (transaction.isAcidTransactionRunning() && useAcidSchema) {
@@ -190,7 +190,7 @@
         }
         catch (IOException | UncheckedIOException e) {
             try {
-                rollbackAction.call();
+                rollbackAction.close();
             }
             catch (Exception ignored) {
                 // ignore
@@ -211,6 +211,8 @@
                 throw new TrinoException(HIVE_WRITE_VALIDATION_FAILED, e);
             }
         }
+
+        return rollbackAction;
     }

     private void updateUserMetadata()
@@ -240,13 +242,8 @@
     @Override
     public void rollback()
     {
-        try {
-            try {
-                orcWriter.close();
-            }
-            finally {
-                rollbackAction.call();
-            }
+        try (rollbackAction) {
+            orcWriter.close();
         }
         catch (Exception e) {
             throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive", e);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriterFactory.java
index 37fbbfbd7178..dd0d97862849 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriterFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriterFactory.java
@@ -46,12 +46,12 @@

 import javax.inject.Inject;

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Properties;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static io.trino.orc.metadata.OrcType.createRootOrcType;
@@ -183,10 +183,7 @@ public Optional<FileWriter> createFileWriter(
                 });
             }

-            Callable<Void> rollbackAction = () -> {
-                fileSystem.deleteFile(stringPath);
-                return null;
-            };
+            Closeable rollbackAction = () -> fileSystem.deleteFile(stringPath);

             if (transaction.isInsert() && useAcidSchema) {
                 // Only add the ACID columns if the request is for insert-type operations - - for delete operations,
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
index 37f460b7b0db..d31e47a7c0b7 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
@@ -29,6 +29,7 @@
 import org.joda.time.DateTimeZone;
 import org.openjdk.jol.info.ClassLayout;

+import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UncheckedIOException;
@@ -37,7 +38,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static com.google.common.base.MoreObjects.toStringHelper;
@@ -55,7 +55,7 @@ public class ParquetFileWriter
     private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();

     private final ParquetWriter parquetWriter;
-    private final Callable<Void> rollbackAction;
+    private final Closeable rollbackAction;
     private final int[] fileInputColumnIndexes;
     private final List<Block> nullBlocks;
     private final Optional<Supplier<ParquetDataSource>> validationInputFactory;
@@ -63,7 +63,7 @@ public class ParquetFileWriter

     public ParquetFileWriter(
             OutputStream outputStream,
-            Callable<Void> rollbackAction,
+            Closeable rollbackAction,
             List<Type> fileColumnTypes,
             List<String> fileColumnNames,
             MessageType messageType,
@@ -138,14 +138,14 @@ public void appendRows(Page dataPage)
     }

     @Override
-    public void commit()
+    public Closeable commit()
     {
         try {
             parquetWriter.close();
         }
         catch (IOException | UncheckedIOException e) {
             try {
-                rollbackAction.call();
+                rollbackAction.close();
             }
             catch (Exception ignored) {
                 // ignore
@@ -165,18 +165,15 @@
                 throw new TrinoException(HIVE_WRITE_VALIDATION_FAILED, e);
             }
         }
+
+        return rollbackAction;
     }

     @Override
     public void rollback()
     {
-        try {
-            try {
-                parquetWriter.close();
-            }
-            finally {
-                rollbackAction.call();
-            }
+        try (rollbackAction) {
+            parquetWriter.close();
         }
         catch (Exception e) {
             throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write parquet to Hive", e);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java
index 5f6d54eaf282..1e228b7b892f 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java
@@ -44,12 +44,12 @@

 import javax.inject.Inject;

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Properties;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static io.trino.parquet.writer.ParquetSchemaConverter.HIVE_PARQUET_USE_INT96_TIMESTAMP_ENCODING;
@@ -128,10 +128,7 @@ public Optional<FileWriter> createFileWriter(
         try {
             FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), path, conf);

-            Callable<Void> rollbackAction = () -> {
-                fileSystem.delete(path, false);
-                return null;
-            };
+            Closeable rollbackAction = () -> fileSystem.delete(path, false);

             ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(
                     fileColumnTypes,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
index 688912c2a538..075bda00543f 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
@@ -27,9 +27,9 @@
 import org.apache.iceberg.io.OutputFile;
 import org.openjdk.jol.info.ClassLayout;

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.Callable;

 import static io.trino.plugin.iceberg.IcebergAvroDataConversion.toIcebergRecords;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_CLOSE_ERROR;
@@ -50,11 +50,11 @@ public class IcebergAvroFileWriter
     private final Schema icebergSchema;
     private final List<Type> types;
     private final FileAppender<Record> avroWriter;
-    private final Callable<Void> rollbackAction;
+    private final Closeable rollbackAction;

     public IcebergAvroFileWriter(
             OutputFile file,
-            Callable<Void> rollbackAction,
+            Closeable rollbackAction,
             Schema icebergSchema,
             List<Type> types,
             HiveCompressionCodec hiveCompressionCodec)
@@ -97,14 +97,14 @@ public void appendRows(Page dataPage)
     }

     @Override
-    public void commit()
+    public Closeable commit()
     {
         try {
             avroWriter.close();
         }
         catch (IOException e) {
             try {
-                rollbackAction.call();
+                rollbackAction.close();
             }
             catch (Exception ex) {
                 if (!e.equals(ex)) {
@@ -113,18 +113,15 @@
             }
             throw new TrinoException(ICEBERG_WRITER_CLOSE_ERROR, "Error closing Avro file", e);
         }
+
+        return rollbackAction;
     }

     @Override
     public void rollback()
     {
-        try {
-            try {
-                avroWriter.close();
-            }
-            finally {
-                rollbackAction.call();
-            }
+        try (rollbackAction) {
+            avroWriter.close();
         }
         catch (Exception e) {
             throw new TrinoException(ICEBERG_WRITER_CLOSE_ERROR, "Error rolling back write to Avro file", e);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
index 52c370983911..0b16a0bb0e10 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
@@ -41,12 +41,12 @@

 import javax.inject.Inject;

+import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;

@@ -172,10 +172,7 @@ private IcebergFileWriter createParquetWriter(
         try {
             OutputStream outputStream = fileSystem.newOutputFile(outputPath).create();

-            Callable<Void> rollbackAction = () -> {
-                fileSystem.deleteFile(outputPath);
-                return null;
-            };
+            Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

             ParquetWriterOptions parquetWriterOptions = ParquetWriterOptions.builder()
                     .setMaxPageSize(getParquetWriterPageSize(session))
@@ -215,10 +212,7 @@ private IcebergFileWriter createOrcWriter(
         try {
             OrcDataSink orcDataSink = new OutputStreamOrcDataSink(fileSystem.newOutputFile(outputPath).create());

-            Callable<Void> rollbackAction = () -> {
-                fileSystem.deleteFile(outputPath);
-                return null;
-            };
+            Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

             List<Types.NestedField> columnFields = icebergSchema.columns();
             List<String> fileColumnNames = columnFields.stream()
@@ -297,10 +291,7 @@ private IcebergFileWriter createAvroWriter(
             Schema icebergSchema,
             ConnectorSession session)
     {
-        Callable<Void> rollbackAction = () -> {
-            fileIo.deleteFile(outputPath);
-            return null;
-        };
+        Closeable rollbackAction = () -> fileIo.deleteFile(outputPath);

        List<Type> columnTypes = icebergSchema.columns().stream()
                 .map(column -> toTrinoType(column.type(), typeManager))
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java
index 3a58e3711da6..2e44e34d2398 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java
@@ -47,6 +47,7 @@
 import org.apache.iceberg.util.BinaryUtil;
 import org.apache.iceberg.util.UnicodeUtil;

+import java.io.Closeable;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -54,7 +55,6 @@
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static com.google.common.base.Verify.verify;
@@ -77,7 +77,7 @@ public IcebergOrcFileWriter(
             MetricsConfig metricsConfig,
             Schema icebergSchema,
             OrcDataSink orcDataSink,
-            Callable<Void> rollbackAction,
+            Closeable rollbackAction,
             List<String> columnNames,
             List<Type> fileColumnTypes,
             ColumnMetadata<OrcType> fileColumnOrcTypes,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
index 6ac967b91b00..5c9bc83a8a6a 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
@@ -14,7 +14,7 @@
 package io.trino.plugin.iceberg;

 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
 import io.airlift.json.JsonCodec;
 import io.airlift.slice.Slice;
 import io.trino.filesystem.TrinoFileSystem;
@@ -47,11 +47,13 @@
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.transforms.Transform;

+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
@@ -98,7 +100,7 @@ public class IcebergPageSink
     private final Map<String, String> storageProperties;

     private final List<WriteContext> writers = new ArrayList<>();
-    private final List<WriteContext> closedWriters = new ArrayList<>();
+    private final List<Closeable> closedWriterRollbackActions = new ArrayList<>();
     private final Collection<Slice> commitTasks = new ArrayList<>();

     private long writtenBytes;
@@ -163,16 +165,10 @@ public CompletableFuture<?> appendPage(Page page)
     @Override
     public CompletableFuture<Collection<Slice>> finish()
     {
-        for (WriteContext context : writers) {
-            closeWriter(context);
+        for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) {
+            closeWriter(writerIndex);
         }
-
-        writtenBytes = closedWriters.stream()
-                .mapToLong(writer -> writer.getWriter().getWrittenBytes())
-                .sum();
-        validationCpuNanos = closedWriters.stream()
-                .mapToLong(writer -> writer.getWriter().getValidationCpuNanos())
-                .sum();
+        writers.clear();

         return completedFuture(commitTasks);
     }
@@ -180,12 +176,16 @@ public CompletableFuture<Collection<Slice>> finish()
     @Override
     public void abort()
     {
+        List<Closeable> rollbackActions = Streams.concat(
+                writers.stream()
+                        .filter(Objects::nonNull)
+                        .map(writer -> writer::rollback),
+                closedWriterRollbackActions.stream())
+                .collect(toImmutableList());
         RuntimeException error = null;
-        for (WriteContext context : Iterables.concat(writers, closedWriters)) {
+        for (Closeable rollbackAction : rollbackActions) {
             try {
-                if (context != null) {
-                    context.getWriter().rollback();
-                }
+                rollbackAction.close();
             }
             catch (Throwable t) {
                 if (error == null) {
@@ -282,13 +282,14 @@ private int[] getWriterIndexes(Page page)
                 if (writer.getWrittenBytes() <= targetMaxFileSize) {
                     continue;
                 }
-                closeWriter(writer);
+                closeWriter(writerIndex);
             }

             Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), page, position);
             writer = createWriter(partitionData);

             writers.set(writerIndex, writer);
+            memoryUsage += writer.getWriter().getMemoryUsage();
         }
         verify(writers.size() == pagePartitioner.getMaxIndex() + 1);
         verify(!writers.contains(null));
@@ -296,19 +297,27 @@
         return writerIndexes;
     }

-    private void closeWriter(WriteContext writeContext)
+    private void closeWriter(int writerIndex)
     {
-        long currentWritten = writeContext.getWriter().getWrittenBytes();
-        long currentMemory = writeContext.getWriter().getMemoryUsage();
-        writeContext.getWriter().commit();
-        writtenBytes += (writeContext.getWriter().getWrittenBytes() - currentWritten);
-        memoryUsage += (writeContext.getWriter().getMemoryUsage() - currentMemory);
+        WriteContext writeContext = writers.get(writerIndex);
+        IcebergFileWriter writer = writeContext.getWriter();
+
+        long currentWritten = writer.getWrittenBytes();
+        long currentMemory = writer.getMemoryUsage();
+
+        closedWriterRollbackActions.add(writer.commit());
+
+        validationCpuNanos += writer.getValidationCpuNanos();
+        writtenBytes += (writer.getWrittenBytes() - currentWritten);
+        memoryUsage -= currentMemory;
+
+        writers.set(writerIndex, null);

         CommitTaskData task = new CommitTaskData(
-                writeContext.getPath().toString(),
+                writeContext.getPath(),
                 fileFormat,
-                writeContext.getWriter().getWrittenBytes(),
-                new MetricsWrapper(writeContext.getWriter().getMetrics()),
+                writer.getWrittenBytes(),
+                new MetricsWrapper(writer.getMetrics()),
                 PartitionSpecParser.toJson(partitionSpec),
                 writeContext.getPartitionData().map(PartitionData::toJson),
                 DATA,
@@ -317,8 +326,6 @@
                 Optional.empty());

         commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
-
-        closedWriters.add(writeContext);
     }

     private WriteContext createWriter(Optional<PartitionData> partitionData)
@@ -459,6 +466,11 @@ public long getWrittenBytes()
         {
             return writer.getWrittenBytes();
         }
+
+        public void rollback()
+        {
+            writer.rollback();
+        }
     }

     private static class PagePartitioner
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
index c4d7ec0e34d2..6664a326341c 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
@@ -23,11 +23,11 @@
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;

+import java.io.Closeable;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Callable;

 import static java.util.Objects.requireNonNull;
 import static org.apache.iceberg.parquet.ParquetUtil.fileMetrics;
@@ -43,7 +43,7 @@ public class IcebergParquetFileWriter
     public IcebergParquetFileWriter(
             MetricsConfig metricsConfig,
             OutputStream outputStream,
-            Callable<Void> rollbackAction,
+            Closeable rollbackAction,
             List<Type> fileColumnTypes,
             List<String> fileColumnNames,
             MessageType messageType,
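Taken together, the page-sink changes converge on one abort shape; here is a minimal self-contained sketch of it (Writer, abort, and the parameter names are illustrative). Open writers still expose rollback(); writers that already committed have handed their rollback Closeable to the sink, and writer::rollback satisfies Closeable because rollback() declares no checked exceptions. Every action is attempted, with secondary failures attached via addSuppressed.

    import com.google.common.collect.Streams;

    import java.io.Closeable;
    import java.util.List;
    import java.util.Objects;

    import static com.google.common.collect.ImmutableList.toImmutableList;

    class AbortSketch
    {
        interface Writer
        {
            void rollback();
        }

        static void abort(List<Writer> openWriters, List<Closeable> closedWriterRollbackActions)
        {
            List<Closeable> rollbackActions = Streams.concat(
                    openWriters.stream()
                            // writer slots may be null if appending failed mid-expansion
                            .filter(Objects::nonNull)
                            .map(writer -> (Closeable) writer::rollback),
                    closedWriterRollbackActions.stream())
                    .collect(toImmutableList());

            RuntimeException error = null;
            for (Closeable rollbackAction : rollbackActions) {
                try {
                    rollbackAction.close();
                }
                catch (Throwable t) {
                    if (error == null) {
                        error = new RuntimeException("Error rolling back write");
                    }
                    // keep attempting the remaining rollbacks; record this failure
                    error.addSuppressed(t);
                }
            }
            if (error != null) {
                throw error;
            }
        }
    }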