From 879f1662ccc590bb9aaf6beff8fa15ec6a31e5f6 Mon Sep 17 00:00:00 2001
From: Karol Sobczak
Date: Fri, 28 Oct 2022 18:21:44 +0200
Subject: [PATCH 1/7] Fix formatting

---
 .../main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java | 2 +-
 .../src/main/java/io/trino/plugin/hive/MergeFileWriter.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
index 0f521597ce5a..7d2be8f778ed 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
@@ -170,7 +170,7 @@ public CompletableFuture<Collection<Slice>> finish()
         return completedFuture(fragments);
     }

-    // In spite of the name "Delta" Lake, we must rewrite the entire file to delete rows.
+    // In spite of the name "Delta" Lake, we must rewrite the entire file to delete rows.
     private List<Slice> rewriteFile(Path sourcePath, FileDeletion deletion)
     {
         try {
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java
index 12b88c9240fc..338eeac82a5d 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java
@@ -107,7 +107,7 @@ public static Page buildInsertPage(Page insertPage, long writeId, List<HiveColumnHandle> columns)
                 .filter(column -> !column.isPartitionKey() && !column.isHidden())
                 .map(column -> insertPage.getBlock(column.getBaseHiveColumnIndex()))
                 .collect(toImmutableList());
-        Block mergedColumnsBlock = RowBlock.fromFieldBlocks(positionCount, Optional.empty(), dataColumns.toArray(new Block[]{}));
+        Block mergedColumnsBlock = RowBlock.fromFieldBlocks(positionCount, Optional.empty(), dataColumns.toArray(new Block[] {}));
         Block currentTransactionBlock = RunLengthEncodedBlock.create(BIGINT, writeId, positionCount);
         Block[] blockArray = {
                 RunLengthEncodedBlock.create(INSERT_OPERATION_BLOCK, positionCount),

From 01fc0b19e58d8dfd4563ae4b7e4a6ce047b8bc46 Mon Sep 17 00:00:00 2001
From: Karol Sobczak
Date: Mon, 31 Oct 2022 10:26:18 +0100
Subject: [PATCH 2/7] Make rollbackAction Closeable

---
 .../plugin/deltalake/DeltaLakeMergeSink.java  |  7 ++-----
 .../plugin/deltalake/DeltaLakePageSink.java   |  7 ++-----
 .../io/trino/plugin/hive/RcFileFileWriter.java | 10 +++++-----
 .../plugin/hive/RcFileFileWriterFactory.java  |  7 ++-----
 .../io/trino/plugin/hive/orc/OrcFileWriter.java | 10 +++++-----
 .../plugin/hive/orc/OrcFileWriterFactory.java |  7 ++-----
 .../plugin/hive/parquet/ParquetFileWriter.java | 10 +++++-----
 .../hive/parquet/ParquetFileWriterFactory.java |  7 ++-----
 .../plugin/iceberg/IcebergAvroFileWriter.java | 10 +++++-----
 .../iceberg/IcebergFileWriterFactory.java     | 17 ++++-------------
 .../plugin/iceberg/IcebergOrcFileWriter.java  |  4 ++--
 .../iceberg/IcebergParquetFileWriter.java     |  4 ++--
 12 files changed, 38 insertions(+), 62 deletions(-)

diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
index 7d2be8f778ed..a976fcce7e16 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java
@@ -48,6 +48,7 @@ import org.roaringbitmap.longlong.LongBitmapDataProvider;
 import org.roaringbitmap.longlong.Roaring64Bitmap;

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -55,7 +56,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.IntStream;

@@ -210,10 +210,7 @@ private FileWriter createParquetFileWriter(FileSystem fileSystem, Path path, List<DeltaLakeColumnHandle> dataColumns)
         CompressionCodecName compressionCodecName = getCompressionCodec(session).getParquetCompressionCodec();
         try {
-            Callable<Void> rollbackAction = () -> {
-                fileSystem.delete(path, false);
-                return null;
-            };
+            Closeable rollbackAction = () -> fileSystem.delete(path, false);

             List<Type> parquetTypes = dataColumns.stream()
                     .map(column -> {
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
index b4d0cf0218c1..e73c0c3a6f52 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
@@ -49,6 +49,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.joda.time.DateTimeZone;

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -57,7 +58,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;

 import static com.google.common.base.Verify.verify;
@@ -469,10 +469,7 @@ private FileWriter createParquetFileWriter(Path path)
         try {
             FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), path, conf);

-            Callable<Void> rollbackAction = () -> {
-                fileSystem.delete(path, false);
-                return null;
-            };
+            Closeable rollbackAction = () -> fileSystem.delete(path, false);

             List<Type> parquetTypes = dataColumnTypes.stream()
                     .map(type -> {
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java
index e798818c4e12..bec99668ed04 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java
@@ -29,6 +29,7 @@ import io.trino.spi.type.Type;
 import org.openjdk.jol.info.ClassLayout;

+import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UncheckedIOException;
@@ -37,7 +38,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static com.google.common.base.MoreObjects.toStringHelper;
@@ -55,7 +55,7 @@ public class RcFileFileWriter
     private final CountingOutputStream outputStream;
     private final RcFileWriter rcFileWriter;
-    private final Callable<Void> rollbackAction;
+    private final Closeable rollbackAction;
     private final int[] fileInputColumnIndexes;
     private final List<Block> nullBlocks;
     private final Optional<Supplier<RcFileDataSource>> validationInputFactory;
@@ -64,7 +64,7 @@ public class RcFileFileWriter
     public RcFileFileWriter(
             OutputStream outputStream,
-            Callable<Void> rollbackAction,
+            Closeable rollbackAction,
             RcFileEncoding rcFileEncoding,
             List<Type> fileColumnTypes,
             Optional<CompressionKind> codecName,
@@ -138,7 +138,7 @@ public void commit()
         }
         catch (IOException | UncheckedIOException e) {
             try {
-                rollbackAction.call();
+                rollbackAction.close();
             }
             catch (Exception ignored) {
                 // ignore
@@ -168,7 +168,7 @@ public void rollback()
                 rcFileWriter.close();
             }
             finally {
-                rollbackAction.call();
+                rollbackAction.close();
             }
         }
         catch (Exception e) {
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriterFactory.java
index 49cb587c0454..a2e79414385b 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriterFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriterFactory.java
@@ -36,13 +36,13 @@ import javax.inject.Inject;

+import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Properties;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR;
@@ -152,10 +152,7 @@ else if (ColumnarSerDe.class.getName().equals(storageFormat.getSerde())) {
             });
         }

-        Callable<Void> rollbackAction = () -> {
-            fileSystem.delete(path, false);
-            return null;
-        };
+        Closeable rollbackAction = () -> fileSystem.delete(path, false);

         return Optional.of(new RcFileFileWriter(
                 outputStream,
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java
index e791d6c821ee..2c7d5dd9c134 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java
@@ -38,6 +38,7 @@ import io.trino.spi.type.Type;
 import org.openjdk.jol.info.ClassLayout;

+import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.lang.management.ManagementFactory;
@@ -48,7 +49,6 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static com.google.common.base.MoreObjects.toStringHelper;
@@ -74,7 +74,7 @@ public class OrcFileWriter
     private final AcidTransaction transaction;
     private final boolean useAcidSchema;
     private final OptionalInt bucketNumber;
-    private final Callable<Void> rollbackAction;
+    private final Closeable rollbackAction;
     private final int[] fileInputColumnIndexes;
     private final List<Block> nullBlocks;
     private final Optional<Supplier<OrcDataSource>> validationInputFactory;
@@ -89,7 +89,7 @@ public OrcFileWriter(
             AcidTransaction transaction,
             boolean useAcidSchema,
             OptionalInt bucketNumber,
-            Callable<Void> rollbackAction,
+            Closeable rollbackAction,
             List<String> columnNames,
             List<Type> fileColumnTypes,
             ColumnMetadata<OrcType> fileColumnOrcTypes,
@@ -190,7 +190,7 @@ public void commit()
         }
         catch (IOException | UncheckedIOException e) {
             try {
-                rollbackAction.call();
+                rollbackAction.close();
             }
             catch (Exception ignored) {
                 // ignore
@@ -245,7 +245,7 @@ public void rollback()
                 orcWriter.close();
             }
             finally {
-                rollbackAction.call();
+                rollbackAction.close();
             }
         }
         catch (Exception e) {
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriterFactory.java
index 37fbbfbd7178..dd0d97862849 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriterFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriterFactory.java
@@ -46,12 +46,12 @@ import javax.inject.Inject;

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Properties;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static io.trino.orc.metadata.OrcType.createRootOrcType;
@@ -183,10 +183,7 @@ public Optional<FileWriter> createFileWriter(
             });
         }

-        Callable<Void> rollbackAction = () -> {
-            fileSystem.deleteFile(stringPath);
-            return null;
-        };
+        Closeable rollbackAction = () -> fileSystem.deleteFile(stringPath);

         if (transaction.isInsert() && useAcidSchema) {
             // Only add the ACID columns if the request is for insert-type operations -- for delete operations,
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
index 37f460b7b0db..9f40c68c6fed 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
@@ -29,6 +29,7 @@ import org.joda.time.DateTimeZone;
 import org.openjdk.jol.info.ClassLayout;

+import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UncheckedIOException;
@@ -37,7 +38,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static com.google.common.base.MoreObjects.toStringHelper;
@@ -55,7 +55,7 @@ public class ParquetFileWriter
     private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();

     private final ParquetWriter parquetWriter;
-    private final Callable<Void> rollbackAction;
+    private final Closeable rollbackAction;
     private final int[] fileInputColumnIndexes;
     private final List<Block> nullBlocks;
     private final Optional<Supplier<ParquetDataSource>> validationInputFactory;
@@ -63,7 +63,7 @@ public class ParquetFileWriter
     public ParquetFileWriter(
             OutputStream outputStream,
-            Callable<Void> rollbackAction,
+            Closeable rollbackAction,
             List<Type> fileColumnTypes,
             List<String> fileColumnNames,
             MessageType messageType,
@@ -145,7 +145,7 @@ public void commit()
         }
         catch (IOException | UncheckedIOException e) {
             try {
-                rollbackAction.call();
+                rollbackAction.close();
             }
             catch (Exception ignored) {
                 // ignore
@@ -175,7 +175,7 @@ public void rollback()
                 parquetWriter.close();
             }
             finally {
-                rollbackAction.call();
+                rollbackAction.close();
             }
         }
         catch (Exception e) {
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java
index 5f6d54eaf282..1e228b7b892f 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriterFactory.java
@@ -44,12 +44,12 @@ import javax.inject.Inject;

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Properties;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static io.trino.parquet.writer.ParquetSchemaConverter.HIVE_PARQUET_USE_INT96_TIMESTAMP_ENCODING;
@@ -128,10 +128,7 @@ public Optional<FileWriter> createFileWriter(
         try {
             FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), path, conf);

-            Callable<Void> rollbackAction = () -> {
-                fileSystem.delete(path, false);
-                return null;
-            };
+            Closeable rollbackAction = () -> fileSystem.delete(path, false);

             ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(
                     fileColumnTypes,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
index 688912c2a538..185847d9f64a 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
@@ -27,9 +27,9 @@ import org.apache.iceberg.io.OutputFile;
 import org.openjdk.jol.info.ClassLayout;

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.Callable;

 import static io.trino.plugin.iceberg.IcebergAvroDataConversion.toIcebergRecords;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_CLOSE_ERROR;
@@ -50,11 +50,11 @@ public class IcebergAvroFileWriter
     private final Schema icebergSchema;
     private final List<Type> types;
     private final FileAppender<Record> avroWriter;
-    private final Callable<Void> rollbackAction;
+    private final Closeable rollbackAction;

     public IcebergAvroFileWriter(
             OutputFile file,
-            Callable<Void> rollbackAction,
+            Closeable rollbackAction,
             Schema icebergSchema,
             List<Type> types,
             HiveCompressionCodec hiveCompressionCodec)
@@ -104,7 +104,7 @@ public void commit()
         }
         catch (IOException e) {
             try {
-                rollbackAction.call();
+                rollbackAction.close();
             }
             catch (Exception ex) {
                 if (!e.equals(ex)) {
@@ -123,7 +123,7 @@ public void rollback()
                 avroWriter.close();
             }
             finally {
-                rollbackAction.call();
+                rollbackAction.close();
             }
         }
         catch (Exception e) {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
index 52c370983911..0b16a0bb0e10 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
@@ -41,12 +41,12 @@ import javax.inject.Inject;

+import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;

@@ -172,10 +172,7 @@ private IcebergFileWriter createParquetWriter(
         try {
             OutputStream outputStream = fileSystem.newOutputFile(outputPath).create();

-            Callable<Void> rollbackAction = () -> {
-                fileSystem.deleteFile(outputPath);
-                return null;
-            };
+            Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

             ParquetWriterOptions parquetWriterOptions = ParquetWriterOptions.builder()
                     .setMaxPageSize(getParquetWriterPageSize(session))
@@ -215,10 +212,7 @@ private IcebergFileWriter createOrcWriter(
         try {
             OrcDataSink orcDataSink = new OutputStreamOrcDataSink(fileSystem.newOutputFile(outputPath).create());

-            Callable<Void> rollbackAction = () -> {
-                fileSystem.deleteFile(outputPath);
-                return null;
-            };
+            Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

             List<Types.NestedField> columnFields = icebergSchema.columns();
             List<String> fileColumnNames = columnFields.stream()
@@ -297,10 +291,7 @@ private IcebergFileWriter createAvroWriter(
             Schema icebergSchema,
             ConnectorSession session)
     {
-        Callable<Void> rollbackAction = () -> {
-            fileIo.deleteFile(outputPath);
-            return null;
-        };
+        Closeable rollbackAction = () -> fileIo.deleteFile(outputPath);

         List<Type> columnTypes = icebergSchema.columns().stream()
                 .map(column -> toTrinoType(column.type(), typeManager))
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java
index 3a58e3711da6..2e44e34d2398 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.util.BinaryUtil;
 import org.apache.iceberg.util.UnicodeUtil;

+import java.io.Closeable;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -54,7 +55,6 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.function.Supplier;

 import static com.google.common.base.Verify.verify;
@@ -77,7 +77,7 @@ public IcebergOrcFileWriter(
             MetricsConfig metricsConfig,
             Schema icebergSchema,
             OrcDataSink orcDataSink,
-            Callable<Void> rollbackAction,
+            Closeable rollbackAction,
             List<String> columnNames,
             List<Type> fileColumnTypes,
             ColumnMetadata<OrcType> fileColumnOrcTypes,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
index c4d7ec0e34d2..6664a326341c 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
@@ -23,11 +23,11 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;

+import java.io.Closeable;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Callable;

 import static java.util.Objects.requireNonNull;
 import static org.apache.iceberg.parquet.ParquetUtil.fileMetrics;
@@ -43,7 +43,7 @@ public IcebergParquetFileWriter(
             MetricsConfig metricsConfig,
             OutputStream outputStream,
-            Callable<Void> rollbackAction,
+            Closeable rollbackAction,
             List<Type> fileColumnTypes,
             List<String> fileColumnNames,
             MessageType messageType,

From 09b5d7a5a83020a05c88cdac09185e957da83d55 Mon Sep 17 00:00:00 2001
From: Karol Sobczak
Date: Mon, 31 Oct 2022 11:08:55 +0100
Subject: [PATCH 3/7] Make sure rollback exceptions are not ignored

---
 .../io/trino/plugin/hive/MergeFileWriter.java | 11 +++++----
 .../trino/plugin/hive/RcFileFileWriter.java   |  9 ++------
 .../trino/plugin/hive/RecordFileWriter.java   | 17 +++++++-------
 .../trino/plugin/hive/SortingFileWriter.java  | 23 +++++++++++++++----
 .../trino/plugin/hive/orc/OrcFileWriter.java  |  9 ++------
 .../hive/parquet/ParquetFileWriter.java       |  9 ++------
 .../plugin/iceberg/IcebergAvroFileWriter.java |  9 ++------
 7 files changed, 42 insertions(+), 45 deletions(-)

diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java
index 338eeac82a5d..d7970de1ad37 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java
@@ -14,6 +14,7 @@ package io.trino.plugin.hive;

 import
com.google.common.annotations.VisibleForTesting; +import com.google.common.io.Closer; import io.trino.plugin.hive.HiveWriterFactory.RowIdSortingFileWriterMaker; import io.trino.plugin.hive.acid.AcidOperation; import io.trino.plugin.hive.acid.AcidTransaction; @@ -28,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import java.io.IOException; import java.util.List; import java.util.Optional; import java.util.OptionalInt; @@ -151,11 +153,12 @@ public void commit() public void rollback() { // Make sure both writers get rolled back - try { - deleteFileWriter.ifPresent(FileWriter::rollback); + try (Closer closer = Closer.create()) { + closer.register(() -> insertFileWriter.ifPresent(FileWriter::rollback)); + closer.register(() -> deleteFileWriter.ifPresent(FileWriter::rollback)); } - finally { - insertFileWriter.ifPresent(FileWriter::rollback); + catch (IOException e) { + throw new RuntimeException(e); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java index bec99668ed04..f64be8349d5c 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java @@ -163,13 +163,8 @@ public void commit() @Override public void rollback() { - try { - try { - rcFileWriter.close(); - } - finally { - rollbackAction.close(); - } + try (rollbackAction) { + rcFileWriter.close(); } catch (Exception e) { throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive", e); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RecordFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RecordFileWriter.java index 0aff366281de..d186679c01af 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RecordFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RecordFileWriter.java @@ -38,6 +38,7 @@ import org.joda.time.DateTimeZone; import org.openjdk.jol.info.ClassLayout; +import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; @@ -206,20 +207,20 @@ public void commit() @Override public void rollback() { - try { - try { - recordWriter.close(true); - } - finally { - // perform explicit deletion here as implementations of RecordWriter.close() often ignore the abort flag. 
-                path.getFileSystem(conf).delete(path, false);
-            }
+        Closeable rollbackAction = createRollbackAction(path, conf);
+        try (rollbackAction) {
+            recordWriter.close(true);
         }
         catch (IOException e) {
             throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive", e);
         }
     }

+    private static Closeable createRollbackAction(Path path, JobConf conf)
+    {
+        return () -> path.getFileSystem(conf).delete(path, false);
+    }
+
     @Override
     public long getValidationCpuNanos()
     {
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java
index daae0d42dd86..2ea0b63aeeae 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.openjdk.jol.info.ClassLayout;

+import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
@@ -153,11 +154,23 @@ public void commit()
     @Override
     public void rollback()
     {
-        for (TempFile file : tempFiles) {
-            cleanupFile(file.getPath());
+        Closeable rollbackAction = createRollbackAction(fileSystem, tempFiles);
+        try (Closer closer = Closer.create()) {
+            closer.register(outputWriter::rollback);
+            closer.register(rollbackAction);
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
         }
+    }

-        outputWriter.rollback();
+    private static Closeable createRollbackAction(TrinoFileSystem fileSystem, Queue<TempFile> tempFiles)
+    {
+        return () -> {
+            for (TempFile file : tempFiles) {
+                cleanupFile(fileSystem, file.getPath());
+            }
+        };
     }

     @Override
@@ -247,12 +260,12 @@ private void writeTempFile(Consumer<TempFileWriter> consumer)
             tempFiles.add(new TempFile(tempFile, writer.getWrittenBytes()));
         }
         catch (IOException | UncheckedIOException e) {
-            cleanupFile(tempFile);
+            cleanupFile(fileSystem, tempFile);
             throw new TrinoException(HIVE_WRITER_DATA_ERROR, "Failed to write temporary file: " + tempFile, e);
         }
     }

-    private void cleanupFile(String file)
+    private static void cleanupFile(TrinoFileSystem fileSystem, String file)
     {
         try {
             fileSystem.deleteFile(file);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java
index 2c7d5dd9c134..65cc59097317 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java
@@ -240,13 +240,8 @@ private void updateUserMetadata()
     @Override
     public void rollback()
     {
-        try {
-            try {
-                orcWriter.close();
-            }
-            finally {
-                rollbackAction.close();
-            }
+        try (rollbackAction) {
+            orcWriter.close();
         }
         catch (Exception e) {
             throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive", e);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
index 9f40c68c6fed..d9c503eba7a6 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
@@ -170,13 +170,8 @@ public void commit()
     @Override
     public void rollback()
     {
-        try {
-            try {
-                parquetWriter.close();
-            }
-            finally {
-                rollbackAction.close();
-            }
+        try (rollbackAction) {
+            parquetWriter.close();
         }
         catch (Exception e) {
            throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write parquet to Hive", e);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
index 185847d9f64a..0a991b7b48a5 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
@@ -118,13 +118,8 @@ public void commit()
     @Override
     public void rollback()
     {
-        try {
-            try {
-                avroWriter.close();
-            }
-            finally {
-                rollbackAction.close();
-            }
+        try (rollbackAction) {
+            avroWriter.close();
         }
         catch (Exception e) {
             throw new TrinoException(ICEBERG_WRITER_CLOSE_ERROR, "Error rolling back write to Avro file", e);

From 8c61c04d5c8596e31dc4b2f8683a945f45e4ba32 Mon Sep 17 00:00:00 2001
From: Karol Sobczak
Date: Mon, 31 Oct 2022 11:24:55 +0100
Subject: [PATCH 4/7] Make Hive and Delta abort suppress rollback exceptions

---
 .../plugin/deltalake/DeltaLakePageSink.java   | 20 ++++++++++---------
 .../io/trino/plugin/hive/HivePageSink.java    | 20 ++++++++++---------
 2 files changed, 22 insertions(+), 18 deletions(-)

diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
index e73c0c3a6f52..db1219d99f80 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
@@ -250,21 +250,23 @@ public void abort()

     private void doAbort()
     {
-        Optional<Exception> rollbackException = Optional.empty();
+        RuntimeException rollbackException = null;
         for (DeltaLakeWriter writer : Iterables.concat(writers, closedWriters)) {
-            // writers can contain nulls if an exception is thrown when doAppend expends the writer list
-            if (writer != null) {
-                try {
+            try {
+                // writers can contain nulls if an exception is thrown when doAppend expends the writer list
+                if (writer != null) {
                     writer.rollback();
                 }
-                catch (Exception e) {
-                    LOG.warn("exception '%s' while rollback on %s", e, writer);
-                    rollbackException = Optional.of(e);
+            }
+            catch (Throwable t) {
+                if (rollbackException == null) {
+                    rollbackException = new TrinoException(DELTA_LAKE_BAD_WRITE, "Error rolling back write to Delta Lake");
                 }
+                rollbackException.addSuppressed(t);
             }
         }
-        if (rollbackException.isPresent()) {
-            throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Error rolling back write to Delta Lake", rollbackException.get());
+        if (rollbackException != null) {
+            throw rollbackException;
         }
     }

diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java
index 1d2c669e9a9a..4e9617566e43 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java
@@ -256,21 +256,23 @@ public void abort()

     private void doAbort()
     {
-        Optional<Exception> rollbackException = Optional.empty();
+        RuntimeException rollbackException = null;
         for (HiveWriter writer : Iterables.concat(writers, closedWriters)) {
-            // writers can contain nulls if an exception is thrown when doAppend expends the writer list
-            if (writer != null) {
-                try {
+            try {
+                // writers can contain nulls if an exception is thrown when doAppend expends the writer list
+                if (writer != null) {
                    writer.rollback();
                }
-                catch (Exception e) {
-                    log.warn("exception '%s' while rollback on %s", e, writer);
-                    rollbackException = Optional.of(e);
+            }
+            catch (Throwable t) {
+                if (rollbackException == null) {
+                    rollbackException = new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive");
                 }
+                rollbackException.addSuppressed(t);
             }
         }
-        if (rollbackException.isPresent()) {
-            throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive", rollbackException.get());
+        if (rollbackException != null) {
+            throw rollbackException;
         }
     }

From 4a12e5d4c3f1711f7ce9dc9050e9b535526f6d97 Mon Sep 17 00:00:00 2001
From: Karol Sobczak
Date: Mon, 31 Oct 2022 18:20:36 +0100
Subject: [PATCH 5/7] Reduce estimated size of Orc and Parquet record writers

The instance size of writers was not accounted for by the Delta, Hive
and Iceberg page sinks. Once it is accounted for, the existing
estimated memory for the Orc and Parquet record writers turns out to
be excessive.
---
 .../src/main/java/io/trino/plugin/hive/HiveStorageFormat.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java
index d74856376e59..a91f5c97c050 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveStorageFormat.java
@@ -61,12 +61,12 @@ public enum HiveStorageFormat
             OrcSerde.class.getName(),
             OrcInputFormat.class.getName(),
             OrcOutputFormat.class.getName(),
-            DataSize.of(256, Unit.MEGABYTE)),
+            DataSize.of(64, Unit.MEGABYTE)),
     PARQUET(
             ParquetHiveSerDe.class.getName(),
             MapredParquetInputFormat.class.getName(),
             MapredParquetOutputFormat.class.getName(),
-            DataSize.of(128, Unit.MEGABYTE)),
+            DataSize.of(64, Unit.MEGABYTE)),
     AVRO(
             AvroSerDe.class.getName(),
             AvroContainerInputFormat.class.getName(),

From a8709b521bfad812895f2965f6709da96bc6bfc6 Mon Sep 17 00:00:00 2001
From: Karol Sobczak
Date: Mon, 31 Oct 2022 17:44:27 +0100
Subject: [PATCH 6/7] Account for writer instance size in page sinks

---
 .../main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java | 1 +
 .../src/main/java/io/trino/plugin/hive/HivePageSink.java | 1 +
 .../src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java | 1 +
 3 files changed, 3 insertions(+)

diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
index db1219d99f80..c7ea5a6fc19a 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
@@ -405,6 +405,7 @@ private int[] getWriterIndexes(Page page)
                         dataColumnHandles);

                 writers.set(writerIndex, writer);
+                memoryUsage += writer.getMemoryUsage();
             }
             catch (IOException e) {
                 throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Unable to create writer for location: " + outputPath, e);
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java
index 4e9617566e43..9753026ddfce 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java
@@ -405,6 +405,7 @@ private int[] getWriterIndexes(Page page)
             writer = writerFactory.createWriter(partitionColumns, position, bucketNumber);
            writers.set(writerIndex, writer);
+            memoryUsage += writer.getMemoryUsage();
         }
         verify(writers.size() == pagePartitioner.getMaxIndex() + 1);
         verify(!writers.contains(null));
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
index 6ac967b91b00..9778c071d213 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
@@ -289,6 +289,7 @@ private int[] getWriterIndexes(Page page)
             writer = createWriter(partitionData);

             writers.set(writerIndex, writer);
+            memoryUsage += writer.getWriter().getMemoryUsage();
         }
         verify(writers.size() == pagePartitioner.getMaxIndex() + 1);
         verify(!writers.contains(null));

From bde343f2bd5cbe835cc5a30b0f8690f099b03be5 Mon Sep 17 00:00:00 2001
From: Karol Sobczak
Date: Fri, 28 Oct 2022 18:13:41 +0200
Subject: [PATCH 7/7] Reduce writers retained memory utilization

Whenever a writer was closed, its instance was retained so that a
rollback could be performed in case of an error. However, this retained
an excessive amount of memory that was no longer needed. This commit
makes io.trino.plugin.hive.FileWriter#commit return a rollback action
reference so that page sinks don't have to keep references to the
writers themselves.
---
 .../plugin/deltalake/DeltaLakePageSink.java   | 44 ++++++++-----
 .../plugin/deltalake/DeltaLakeWriter.java     |  5 +-
 .../java/io/trino/plugin/hive/FileWriter.java |  6 +-
 .../io/trino/plugin/hive/HivePageSink.java    | 52 ++++++++-------
 .../java/io/trino/plugin/hive/HiveWriter.java |  6 +-
 .../io/trino/plugin/hive/MergeFileWriter.java | 13 +++-
 .../trino/plugin/hive/RcFileFileWriter.java   |  4 +-
 .../trino/plugin/hive/RecordFileWriter.java   |  4 +-
 .../trino/plugin/hive/SortingFileWriter.java  |  7 +-
 .../trino/plugin/hive/orc/OrcFileWriter.java  |  4 +-
 .../hive/parquet/ParquetFileWriter.java       |  4 +-
 .../plugin/iceberg/IcebergAvroFileWriter.java |  4 +-
 .../trino/plugin/iceberg/IcebergPageSink.java | 65 +++++++++++--------
 13 files changed, 134 insertions(+), 84 deletions(-)

diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
index c7ea5a6fc19a..ab99be045722 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSink.java
@@ -14,7 +14,7 @@ package io.trino.plugin.deltalake;

 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -56,6 +56,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
@@ -117,7 +118,7 @@ public class DeltaLakePageSink
     private long writtenBytes;
     private long memoryUsage;

-    private final List<DeltaLakeWriter> closedWriters = new ArrayList<>();
+    private final List<Closeable> closedWriterRollbackActions = new ArrayList<>();
     private final ImmutableList.Builder<Slice> dataFileInfos = ImmutableList.builder();

     public DeltaLakePageSink(
@@ -229,16 +230,13 @@ public CompletableFuture<Collection<Slice>> finish()
     private ListenableFuture<Collection<Slice>> doFinish()
     {
-        for (DeltaLakeWriter writer : writers) {
-            closeWriter(writer);
+        for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) {
+            closeWriter(writerIndex);
         }
+        writers.clear();

         List<Slice> result = dataFileInfos.build();

-        writtenBytes = closedWriters.stream()
-                .mapToLong(DeltaLakeWriter::getWrittenBytes)
-                .sum();
-
         return Futures.immediateFuture(result);
     }

@@ -250,13 +248,17 @@ public void abort()

     private void doAbort()
     {
+        List<Closeable> rollbackActions = Streams.concat(
+                writers.stream()
+                        // writers can contain nulls if an exception is thrown when doAppend expends the writer list
+                        .filter(Objects::nonNull)
+                        .map(writer -> writer::rollback),
+                closedWriterRollbackActions.stream())
+                .collect(toImmutableList());
         RuntimeException rollbackException = null;
-        for (DeltaLakeWriter writer : Iterables.concat(writers, closedWriters)) {
+        for (Closeable rollbackAction : rollbackActions) {
             try {
-                // writers can contain nulls if an exception is thrown when doAppend expends the writer list
-                if (writer != null) {
-                    writer.rollback();
-                }
+                rollbackAction.close();
             }
             catch (Throwable t) {
                 if (rollbackException == null) {
@@ -365,7 +367,7 @@ private int[] getWriterIndexes(Page page)
                 if (deltaLakeWriter.getWrittenBytes() <= targetMaxFileSize) {
                     continue;
                 }
-                closeWriter(deltaLakeWriter);
+                closeWriter(writerIndex);
             }
             else {
                 continue;
@@ -417,13 +419,20 @@ private int[] getWriterIndexes(Page page)
         return writerIndexes;
     }

-    private void closeWriter(DeltaLakeWriter writer)
+    private void closeWriter(int writerIndex)
     {
+        DeltaLakeWriter writer = writers.get(writerIndex);
+
         long currentWritten = writer.getWrittenBytes();
         long currentMemory = writer.getMemoryUsage();
-        writer.commit();
+
+        closedWriterRollbackActions.add(writer.commit());
+
         writtenBytes += writer.getWrittenBytes() - currentWritten;
-        memoryUsage += writer.getMemoryUsage() - currentMemory;
+        memoryUsage -= currentMemory;
+
+        writers.set(writerIndex, null);
+
         try {
             DataFileInfo dataFileInfo = writer.getDataFileInfo();
             dataFileInfos.add(wrappedBuffer(dataFileInfoCodec.toJsonBytes(dataFileInfo)));
@@ -432,7 +441,6 @@ private void closeWriter(DeltaLakeWriter writer)
             LOG.warn("exception '%s' while finishing write on %s", e, writer);
             throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Error committing Parquet file to Delta Lake", e);
         }
-        closedWriters.add(writer);
     }

     /**
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java
index 5a1bac849163..a09d0fd9a091 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java
@@ -35,6 +35,7 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.util.HadoopInputFile;

+import java.io.Closeable;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Collection;
@@ -139,9 +140,9 @@ public void appendRows(Page originalPage)
     }

     @Override
-    public void commit()
+    public Closeable commit()
     {
-        fileWriter.commit();
+        return fileWriter.commit();
     }

     @Override
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/FileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/FileWriter.java
index 89ed9c7a3aec..9a719a5bdd02 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/FileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/FileWriter.java
@@ -15,6 +15,7 @@ import io.trino.spi.Page;

+import java.io.Closeable;
 import java.util.Optional;

 public interface FileWriter
@@ -25,7 +26,10 @@ public interface FileWriter

     void appendRows(Page dataPage);

-    void commit();
+    /**
+     * Commits written data. Returns rollback {@link Closeable} which can be used to cleanup on failure.
+     */
+    Closeable commit();

     void rollback();

diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java
index 9753026ddfce..ee524bd925f1 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSink.java
@@ -14,14 +14,13 @@ package io.trino.plugin.hive;

 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import io.airlift.concurrent.MoreFutures;
 import io.airlift.json.JsonCodec;
-import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
 import io.trino.hdfs.HdfsEnvironment;
 import io.trino.plugin.hive.util.HiveBucketing.BucketingVersion;
@@ -38,11 +37,13 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;

+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.concurrent.Callable;
@@ -51,6 +52,7 @@ import java.util.concurrent.CompletableFuture;

 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static io.airlift.slice.Slices.wrappedBuffer;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_TOO_MANY_OPEN_PARTITIONS;
@@ -63,8 +65,6 @@ public class HivePageSink
         implements ConnectorPageSink, ConnectorMergeSink
 {
-    private static final Logger log = Logger.get(HivePageSink.class);
-
     private static final int MAX_PAGE_POSITIONS = 4096;

     private final HiveWriterFactory writerFactory;
@@ -89,7 +89,7 @@ public class HivePageSink
     private final ConnectorSession session;
     private final long targetMaxFileSize;

-    private final List<HiveWriter> closedWriters = new ArrayList<>();
+    private final List<Closeable> closedWriterRollbackActions = new ArrayList<>();
     private final List<Slice> partitionUpdates = new ArrayList<>();
     private final List<Callable<Object>> verificationTasks = new ArrayList<>();

@@ -218,17 +218,12 @@ private ListenableFuture<Collection<Slice>> doMergeSinkFinish()

     private ListenableFuture<Collection<Slice>> doInsertSinkFinish()
     {
-        for (HiveWriter writer : writers) {
-            closeWriter(writer);
+        for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) {
+            closeWriter(writerIndex);
         }
-        List<Slice> result = ImmutableList.copyOf(partitionUpdates);
+        writers.clear();

-        writtenBytes = closedWriters.stream()
-                .mapToLong(HiveWriter::getWrittenBytes)
-                .sum();
-        validationCpuNanos = closedWriters.stream()
-                .mapToLong(HiveWriter::getValidationCpuNanos)
-                .sum();
+        List<Slice> result = ImmutableList.copyOf(partitionUpdates);

         if (verificationTasks.isEmpty()) {
             return Futures.immediateFuture(result);
@@ -256,13 +251,17 @@ public void abort()

     private void doAbort()
     {
+        List<Closeable> rollbackActions = Streams.concat(
+                writers.stream()
+                        // writers can contain nulls if an exception is thrown when doAppend expends the writer list
+                        .filter(Objects::nonNull)
+                        .map(writer -> writer::rollback),
+                closedWriterRollbackActions.stream())
+                .collect(toImmutableList());
         RuntimeException rollbackException = null;
-        for (HiveWriter writer : Iterables.concat(writers, closedWriters)) {
+        for (Closeable rollbackAction : rollbackActions) {
             try {
-                // writers can contain nulls if an exception is thrown when doAppend expends the writer list
-                if (writer != null) {
-                    writer.rollback();
-                }
+                rollbackAction.close();
             }
             catch (Throwable t) {
                 if (rollbackException == null) {
@@ -351,15 +350,20 @@ private void writePage(Page page)
         }
     }

-    private void closeWriter(HiveWriter writer)
+    private void closeWriter(int writerIndex)
     {
+        HiveWriter writer = writers.get(writerIndex);
+
         long currentWritten = writer.getWrittenBytes();
         long currentMemory = writer.getMemoryUsage();
-        writer.commit();
+
+        closedWriterRollbackActions.add(writer.commit());
+
         writtenBytes += (writer.getWrittenBytes() - currentWritten);
-        memoryUsage += (writer.getMemoryUsage() - currentMemory);
+        memoryUsage -= currentMemory;
+
         validationCpuNanos += writer.getValidationCpuNanos();
-        closedWriters.add(writer);
+        writers.set(writerIndex, null);

         PartitionUpdate partitionUpdate = writer.getPartitionUpdate();
         partitionUpdates.add(wrappedBuffer(partitionUpdateCodec.toJsonBytes(partitionUpdate)));
@@ -394,7 +398,7 @@ private int[] getWriterIndexes(Page page)
                 continue;
             }
             // close current writer
-            closeWriter(writer);
+            closeWriter(writerIndex);
         }

         OptionalInt bucketNumber = OptionalInt.empty();
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriter.java
index c4e89fd130dd..7628b45362c5 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriter.java
@@ -17,6 +17,7 @@ import io.trino.plugin.hive.PartitionUpdate.UpdateMode;
 import io.trino.spi.Page;

+import java.io.Closeable;
 import java.util.Optional;
 import java.util.function.Consumer;

@@ -86,10 +87,11 @@ public void append(Page dataPage)
         inputSizeInBytes += dataPage.getSizeInBytes();
     }

-    public void commit()
+    public Closeable commit()
     {
-        fileWriter.commit();
+        Closeable rollbackAction = fileWriter.commit();
         onCommit.accept(this);
+        return rollbackAction;
     }

     long getValidationCpuNanos()
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java
index d7970de1ad37..ff591bc9d8e0 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/MergeFileWriter.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;

+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
@@ -143,10 +144,16 @@ public long getMemoryUsage()
     }

     @Override
-    public void commit()
+    public Closeable commit()
     {
-        deleteFileWriter.ifPresent(FileWriter::commit);
-        insertFileWriter.ifPresent(FileWriter::commit);
+        Optional<Closeable> deleteRollbackAction = deleteFileWriter.map(FileWriter::commit);
+        Optional<Closeable> insertRollbackAction =
insertFileWriter.map(FileWriter::commit); + return () -> { + try (Closer closer = Closer.create()) { + insertRollbackAction.ifPresent(closer::register); + deleteRollbackAction.ifPresent(closer::register); + } + }; } @Override diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java index f64be8349d5c..50ec569d9024 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RcFileFileWriter.java @@ -131,7 +131,7 @@ public void appendRows(Page dataPage) } @Override - public void commit() + public Closeable commit() { try { rcFileWriter.close(); @@ -158,6 +158,8 @@ public void commit() throw new TrinoException(HIVE_WRITE_VALIDATION_FAILED, e); } } + + return rollbackAction; } @Override diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RecordFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RecordFileWriter.java index d186679c01af..3d6345968400 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RecordFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/RecordFileWriter.java @@ -193,7 +193,7 @@ public void appendRow(Page dataPage, int position) } @Override - public void commit() + public Closeable commit() { try { recordWriter.close(false); @@ -202,6 +202,8 @@ public void commit() catch (IOException e) { throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error committing write to Hive", e); } + + return createRollbackAction(path, conf); } @Override diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java index 2ea0b63aeeae..f098c5d68b95 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java @@ -129,14 +129,15 @@ public void appendRows(Page page) } @Override - public void commit() + public Closeable commit() { + Closeable rollbackAction = createRollbackAction(fileSystem, tempFiles); if (!sortBuffer.isEmpty()) { // skip temporary files entirely if the total output size is small if (tempFiles.isEmpty()) { sortBuffer.flushTo(outputWriter::appendRows); outputWriter.commit(); - return; + return rollbackAction; } flushToTempFile(); @@ -149,6 +150,8 @@ public void commit() catch (UncheckedIOException e) { throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error committing write to Hive", e); } + + return rollbackAction; } @Override diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java index 65cc59097317..c0e32909e28d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriter.java @@ -180,7 +180,7 @@ public void appendRows(Page dataPage) } @Override - public void commit() + public Closeable commit() { try { if (transaction.isAcidTransactionRunning() && useAcidSchema) { @@ -211,6 +211,8 @@ public void commit() throw new TrinoException(HIVE_WRITE_VALIDATION_FAILED, e); } } + + return rollbackAction; } private void updateUserMetadata() diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java 
b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
index d9c503eba7a6..d31e47a7c0b7 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetFileWriter.java
@@ -138,7 +138,7 @@ public void appendRows(Page dataPage)
     }

     @Override
-    public void commit()
+    public Closeable commit()
     {
         try {
             parquetWriter.close();
@@ -165,6 +165,8 @@ public void commit()
                 throw new TrinoException(HIVE_WRITE_VALIDATION_FAILED, e);
             }
         }
+
+        return rollbackAction;
     }

     @Override
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
index 0a991b7b48a5..075bda00543f 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
@@ -97,7 +97,7 @@ public void appendRows(Page dataPage)
     }

     @Override
-    public void commit()
+    public Closeable commit()
     {
         try {
             avroWriter.close();
@@ -113,6 +113,8 @@ public void commit()
             }
             throw new TrinoException(ICEBERG_WRITER_CLOSE_ERROR, "Error closing Avro file", e);
         }
+
+        return rollbackAction;
     }

     @Override
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
index 9778c071d213..5c9bc83a8a6a 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
@@ -14,7 +14,7 @@ package io.trino.plugin.iceberg;

 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
 import io.airlift.json.JsonCodec;
 import io.airlift.slice.Slice;
 import io.trino.filesystem.TrinoFileSystem;
@@ -47,11 +47,13 @@ import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.transforms.Transform;

+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
@@ -98,7 +100,7 @@ public class IcebergPageSink
     private final Map<String, String> storageProperties;

     private final List<WriteContext> writers = new ArrayList<>();
-    private final List<WriteContext> closedWriters = new ArrayList<>();
+    private final List<Closeable> closedWriterRollbackActions = new ArrayList<>();
     private final Collection<Slice> commitTasks = new ArrayList<>();

     private long writtenBytes;
@@ -163,16 +165,10 @@ public CompletableFuture<?> appendPage(Page page)
     @Override
     public CompletableFuture<Collection<Slice>> finish()
     {
-        for (WriteContext context : writers) {
-            closeWriter(context);
+        for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) {
+            closeWriter(writerIndex);
         }
-
-        writtenBytes = closedWriters.stream()
-                .mapToLong(writer -> writer.getWriter().getWrittenBytes())
-                .sum();
-        validationCpuNanos = closedWriters.stream()
-                .mapToLong(writer -> writer.getWriter().getValidationCpuNanos())
-                .sum();
+        writers.clear();

         return completedFuture(commitTasks);
     }

@@ -180,12 +176,16 @@ public CompletableFuture<Collection<Slice>> finish()
     @Override
     public void abort()
     {
+        List<Closeable> rollbackActions = Streams.concat(
+                writers.stream()
+                        .filter(Objects::nonNull)
+                        .map(writer -> writer::rollback),
+                closedWriterRollbackActions.stream())
+                .collect(toImmutableList());
         RuntimeException error = null;
-        for (WriteContext context : Iterables.concat(writers, closedWriters)) {
+        for (Closeable rollbackAction : rollbackActions) {
             try {
-                if (context != null) {
-                    context.getWriter().rollback();
-                }
+                rollbackAction.close();
             }
             catch (Throwable t) {
                 if (error == null) {
@@ -282,7 +282,7 @@ private int[] getWriterIndexes(Page page)
                 if (writer.getWrittenBytes() <= targetMaxFileSize) {
                     continue;
                 }
-                closeWriter(writer);
+                closeWriter(writerIndex);
             }

             Optional<PartitionData> partitionData = getPartitionData(pagePartitioner.getColumns(), page, position);
@@ -297,19 +297,27 @@ private int[] getWriterIndexes(Page page)
         return writerIndexes;
     }

-    private void closeWriter(WriteContext writeContext)
+    private void closeWriter(int writerIndex)
     {
-        long currentWritten = writeContext.getWriter().getWrittenBytes();
-        long currentMemory = writeContext.getWriter().getMemoryUsage();
-        writeContext.getWriter().commit();
-        writtenBytes += (writeContext.getWriter().getWrittenBytes() - currentWritten);
-        memoryUsage += (writeContext.getWriter().getMemoryUsage() - currentMemory);
+        WriteContext writeContext = writers.get(writerIndex);
+        IcebergFileWriter writer = writeContext.getWriter();
+
+        long currentWritten = writer.getWrittenBytes();
+        long currentMemory = writer.getMemoryUsage();
+
+        closedWriterRollbackActions.add(writer.commit());
+
+        validationCpuNanos += writer.getValidationCpuNanos();
+        writtenBytes += (writer.getWrittenBytes() - currentWritten);
+        memoryUsage -= currentMemory;
+
+        writers.set(writerIndex, null);

         CommitTaskData task = new CommitTaskData(
-                writeContext.getPath().toString(),
+                writeContext.getPath(),
                 fileFormat,
-                writeContext.getWriter().getWrittenBytes(),
-                new MetricsWrapper(writeContext.getWriter().getMetrics()),
+                writer.getWrittenBytes(),
+                new MetricsWrapper(writer.getMetrics()),
                 PartitionSpecParser.toJson(partitionSpec),
                 writeContext.getPartitionData().map(PartitionData::toJson),
                 DATA,
@@ -318,8 +326,6 @@ private void closeWriter(int writerIndex)
                 Optional.empty());

         commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
-
-        closedWriters.add(writeContext);
     }

     private WriteContext createWriter(Optional<PartitionData> partitionData)
@@ -460,6 +466,11 @@ public long getWrittenBytes()
         {
             return writer.getWrittenBytes();
         }
+
+        public void rollback()
+        {
+            writer.rollback();
+        }
     }

     private static class PagePartitioner
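
The series converges on three small rollback idioms. The first is the Closer-based dual rollback that patch 3 introduces in MergeFileWriter.rollback(): Guava's Closer runs every registered action even when an earlier one throws, and attaches later failures to the first exception as suppressed exceptions rather than silently replacing it, which is what the previous try/finally chain could do. The sketch below is a minimal standalone illustration; the class and method names are invented stand-ins, not Trino APIs.

```java
import com.google.common.io.Closer;

import java.io.Closeable;
import java.io.IOException;

// Illustrative stand-in, not a Trino class: roll back two writers the way
// patch 3 does in MergeFileWriter.rollback().
final class MergeRollback
{
    private MergeRollback() {}

    static void rollbackBoth(Closeable insertRollback, Closeable deleteRollback)
    {
        // Closer closes registered resources in reverse registration order.
        // If more than one close() throws, the first exception is rethrown
        // and the others are recorded as suppressed, so no failure is lost.
        try (Closer closer = Closer.create()) {
            closer.register(insertRollback);
            closer.register(deleteRollback);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```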
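The second idiom is patch 4's doAbort() pattern, reduced to its core: attempt every rollback action, fold all failures into a single exception via addSuppressed, and only throw after every action has had its chance to run. A sketch with hypothetical names (the real code throws TrinoException with a connector-specific error code):

```java
import java.io.Closeable;
import java.util.List;

// Illustrative stand-in for the doAbort() logic in DeltaLakePageSink and
// HivePageSink after patch 4 (and, over List<Closeable>, after patch 7).
final class AbortSupport
{
    private AbortSupport() {}

    static void rollbackAll(List<Closeable> rollbackActions)
    {
        RuntimeException rollbackException = null;
        for (Closeable rollbackAction : rollbackActions) {
            try {
                rollbackAction.close();
            }
            catch (Throwable t) {
                // Keep going: one failed rollback must not prevent the others.
                if (rollbackException == null) {
                    rollbackException = new RuntimeException("Error rolling back write");
                }
                rollbackException.addSuppressed(t);
            }
        }
        if (rollbackException != null) {
            throw rollbackException;
        }
    }
}
```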
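The third idiom is the central change of patch 7: commit() returns a small Closeable that captures only what cleanup needs (a path, not the writer), so the sink can stash that action in closedWriterRollbackActions and null out the writer slot, letting the writer's buffers become garbage collectible. The sketch below uses invented names and java.nio file I/O purely for illustration; only the shape of the API mirrors the patch.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative writer, not Trino's: demonstrates commit() handing back a
// rollback action so the caller can drop the writer instance itself.
class SketchFileWriter
{
    private final Path target;
    private final StringBuilder buffer = new StringBuilder(); // stands in for heavy writer state

    SketchFileWriter(Path target)
    {
        this.target = target;
    }

    void appendRow(String row)
    {
        buffer.append(row).append('\n');
    }

    // Commits written data. The returned Closeable captures only the path,
    // so it stays cheap after this writer becomes unreachable.
    Closeable commit()
    {
        try {
            Files.writeString(target, buffer.toString());
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Path committedFile = target;
        return () -> Files.deleteIfExists(committedFile);
    }
}
```

A sink using such a writer would do `closedWriterRollbackActions.add(writer.commit()); writers.set(writerIndex, null);` — exactly the sequence the DeltaLakePageSink, HivePageSink and IcebergPageSink hunks above introduce.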