Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@
import org.roaringbitmap.longlong.LongBitmapDataProvider;
import org.roaringbitmap.longlong.Roaring64Bitmap;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -170,7 +170,7 @@ public CompletableFuture<Collection<Slice>> finish()
return completedFuture(fragments);
}

// In spite of the name "Delta" Lake, we must rewrite the entire file to delete rows.
// In spite of the name "Delta" Lake, we must rewrite the entire file to delete rows.
private List<Slice> rewriteFile(Path sourcePath, FileDeletion deletion)
{
try {
Expand Down Expand Up @@ -210,10 +210,7 @@ private FileWriter createParquetFileWriter(FileSystem fileSystem, Path path, Lis
CompressionCodecName compressionCodecName = getCompressionCodec(session).getParquetCompressionCodec();

try {
Callable<Void> rollbackAction = () -> {
fileSystem.delete(path, false);
return null;
};
Closeable rollbackAction = () -> fileSystem.delete(path, false);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, making the rollback action a Closeable looks weird: there is no resource that we are releasing.
You could use Runnable instead if the problem is that you want to avoid extra return null.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use Runnable instead if the problem is that you want to avoid extra return null.

I actually use it in a try-with-resources statement, e.g.:

        try (rollbackAction) {
            avroWriter.close();
        }
        catch (Exception e) {
            throw new TrinoException(ICEBERG_WRITER_CLOSE_ERROR, "Error rolling back write to Avro file", e);
        }

so I don't have to deal with the syntactic sugar myself. I think using Java's native try-with-resources is actually better than relying on Guava's Closer.


List<Type> parquetTypes = dataColumns.stream()
.map(column -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -49,15 +49,16 @@
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTimeZone;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

import static com.google.common.base.Verify.verify;
Expand Down Expand Up @@ -117,7 +118,7 @@ public class DeltaLakePageSink
private long writtenBytes;
private long memoryUsage;

private final List<DeltaLakeWriter> closedWriters = new ArrayList<>();
private final List<Closeable> closedWriterRollbackActions = new ArrayList<>();
private final ImmutableList.Builder<Slice> dataFileInfos = ImmutableList.builder();

public DeltaLakePageSink(
Expand Down Expand Up @@ -229,16 +230,13 @@ public CompletableFuture<Collection<Slice>> finish()

private ListenableFuture<Collection<Slice>> doFinish()
{
for (DeltaLakeWriter writer : writers) {
closeWriter(writer);
for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) {
closeWriter(writerIndex);
}
writers.clear();

List<Slice> result = dataFileInfos.build();

writtenBytes = closedWriters.stream()
.mapToLong(DeltaLakeWriter::getWrittenBytes)
.sum();

return Futures.immediateFuture(result);
}

Expand All @@ -250,21 +248,27 @@ public void abort()

private void doAbort()
{
Optional<Exception> rollbackException = Optional.empty();
for (DeltaLakeWriter writer : Iterables.concat(writers, closedWriters)) {
// writers can contain nulls if an exception is thrown when doAppend expands the writer list
if (writer != null) {
try {
writer.rollback();
}
catch (Exception e) {
LOG.warn("exception '%s' while rollback on %s", e, writer);
rollbackException = Optional.of(e);
List<Closeable> rollbackActions = Streams.concat(
writers.stream()
// writers can contain nulls if an exception is thrown when doAppend expands the writer list
Comment thread
sopel39 marked this conversation as resolved.
Outdated
.filter(Objects::nonNull)
.map(writer -> writer::rollback),
closedWriterRollbackActions.stream())
.collect(toImmutableList());
RuntimeException rollbackException = null;
for (Closeable rollbackAction : rollbackActions) {
try {
rollbackAction.close();
}
catch (Throwable t) {
if (rollbackException == null) {
rollbackException = new TrinoException(DELTA_LAKE_BAD_WRITE, "Error rolling back write to Delta Lake");
}
rollbackException.addSuppressed(t);
}
}
if (rollbackException.isPresent()) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Error rolling back write to Delta Lake", rollbackException.get());
if (rollbackException != null) {
throw rollbackException;
}
}

Expand Down Expand Up @@ -363,7 +367,7 @@ private int[] getWriterIndexes(Page page)
if (deltaLakeWriter.getWrittenBytes() <= targetMaxFileSize) {
continue;
}
closeWriter(deltaLakeWriter);
closeWriter(writerIndex);
}
else {
continue;
Expand Down Expand Up @@ -403,6 +407,7 @@ private int[] getWriterIndexes(Page page)
dataColumnHandles);

writers.set(writerIndex, writer);
memoryUsage += writer.getMemoryUsage();
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Unable to create writer for location: " + outputPath, e);
Expand All @@ -414,13 +419,20 @@ private int[] getWriterIndexes(Page page)
return writerIndexes;
}

private void closeWriter(DeltaLakeWriter writer)
private void closeWriter(int writerIndex)
{
DeltaLakeWriter writer = writers.get(writerIndex);

long currentWritten = writer.getWrittenBytes();
long currentMemory = writer.getMemoryUsage();
writer.commit();

closedWriterRollbackActions.add(writer.commit());

writtenBytes += writer.getWrittenBytes() - currentWritten;
memoryUsage += writer.getMemoryUsage() - currentMemory;
memoryUsage -= currentMemory;

writers.set(writerIndex, null);

try {
DataFileInfo dataFileInfo = writer.getDataFileInfo();
dataFileInfos.add(wrappedBuffer(dataFileInfoCodec.toJsonBytes(dataFileInfo)));
Expand All @@ -429,7 +441,6 @@ private void closeWriter(DeltaLakeWriter writer)
LOG.warn("exception '%s' while finishing write on %s", e, writer);
throw new TrinoException(DELTA_LAKE_BAD_WRITE, "Error committing Parquet file to Delta Lake", e);
}
closedWriters.add(writer);
}

/**
Expand Down Expand Up @@ -469,10 +480,7 @@ private FileWriter createParquetFileWriter(Path path)

try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), path, conf);
Callable<Void> rollbackAction = () -> {
fileSystem.delete(path, false);
return null;
};
Closeable rollbackAction = () -> fileSystem.delete(path, false);

List<Type> parquetTypes = dataColumnTypes.stream()
.map(type -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.Collection;
Expand Down Expand Up @@ -139,9 +140,9 @@ public void appendRows(Page originalPage)
}

@Override
public void commit()
public Closeable commit()
{
fileWriter.commit();
return fileWriter.commit();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.trino.spi.Page;

import java.io.Closeable;
import java.util.Optional;

public interface FileWriter
Expand All @@ -25,7 +26,10 @@ public interface FileWriter

void appendRows(Page dataPage);

void commit();
/**
* Commits written data. Returns a rollback {@link Closeable} that can be used to clean up on failure.
*/
Closeable commit();

void rollback();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@
package io.trino.plugin.hive;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.concurrent.MoreFutures;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.hive.util.HiveBucketing.BucketingVersion;
Expand All @@ -38,11 +37,13 @@
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.Callable;
Expand All @@ -51,6 +52,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_TOO_MANY_OPEN_PARTITIONS;
Expand All @@ -63,8 +65,6 @@
public class HivePageSink
implements ConnectorPageSink, ConnectorMergeSink
{
private static final Logger log = Logger.get(HivePageSink.class);

private static final int MAX_PAGE_POSITIONS = 4096;

private final HiveWriterFactory writerFactory;
Expand All @@ -89,7 +89,7 @@ public class HivePageSink
private final ConnectorSession session;

private final long targetMaxFileSize;
private final List<HiveWriter> closedWriters = new ArrayList<>();
private final List<Closeable> closedWriterRollbackActions = new ArrayList<>();
private final List<Slice> partitionUpdates = new ArrayList<>();
private final List<Callable<Object>> verificationTasks = new ArrayList<>();

Expand Down Expand Up @@ -218,17 +218,12 @@ private ListenableFuture<Collection<Slice>> doMergeSinkFinish()

private ListenableFuture<Collection<Slice>> doInsertSinkFinish()
{
for (HiveWriter writer : writers) {
closeWriter(writer);
for (int writerIndex = 0; writerIndex < writers.size(); writerIndex++) {
closeWriter(writerIndex);
}
List<Slice> result = ImmutableList.copyOf(partitionUpdates);
writers.clear();

writtenBytes = closedWriters.stream()
.mapToLong(HiveWriter::getWrittenBytes)
.sum();
validationCpuNanos = closedWriters.stream()
.mapToLong(HiveWriter::getValidationCpuNanos)
.sum();
List<Slice> result = ImmutableList.copyOf(partitionUpdates);

if (verificationTasks.isEmpty()) {
return Futures.immediateFuture(result);
Expand Down Expand Up @@ -256,21 +251,27 @@ public void abort()

private void doAbort()
{
Optional<Exception> rollbackException = Optional.empty();
for (HiveWriter writer : Iterables.concat(writers, closedWriters)) {
// writers can contain nulls if an exception is thrown when doAppend expands the writer list
if (writer != null) {
try {
writer.rollback();
}
catch (Exception e) {
log.warn("exception '%s' while rollback on %s", e, writer);
rollbackException = Optional.of(e);
List<Closeable> rollbackActions = Streams.concat(
writers.stream()
// writers can contain nulls if an exception is thrown when doAppend expands the writer list
Comment thread
sopel39 marked this conversation as resolved.
Outdated
.filter(Objects::nonNull)
.map(writer -> writer::rollback),
closedWriterRollbackActions.stream())
.collect(toImmutableList());
RuntimeException rollbackException = null;
for (Closeable rollbackAction : rollbackActions) {
try {
rollbackAction.close();
}
catch (Throwable t) {
if (rollbackException == null) {
rollbackException = new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive");
}
rollbackException.addSuppressed(t);
}
}
if (rollbackException.isPresent()) {
throw new TrinoException(HIVE_WRITER_CLOSE_ERROR, "Error rolling back write to Hive", rollbackException.get());
if (rollbackException != null) {
throw rollbackException;
}
}

Expand Down Expand Up @@ -349,15 +350,20 @@ private void writePage(Page page)
}
}

private void closeWriter(HiveWriter writer)
private void closeWriter(int writerIndex)
{
HiveWriter writer = writers.get(writerIndex);
Comment thread
sopel39 marked this conversation as resolved.
Outdated

long currentWritten = writer.getWrittenBytes();
long currentMemory = writer.getMemoryUsage();
writer.commit();

closedWriterRollbackActions.add(writer.commit());

writtenBytes += (writer.getWrittenBytes() - currentWritten);
memoryUsage += (writer.getMemoryUsage() - currentMemory);
memoryUsage -= currentMemory;
validationCpuNanos += writer.getValidationCpuNanos();

closedWriters.add(writer);
writers.set(writerIndex, null);
Comment thread
sopel39 marked this conversation as resolved.
Outdated

PartitionUpdate partitionUpdate = writer.getPartitionUpdate();
partitionUpdates.add(wrappedBuffer(partitionUpdateCodec.toJsonBytes(partitionUpdate)));
Expand Down Expand Up @@ -392,7 +398,7 @@ private int[] getWriterIndexes(Page page)
continue;
}
// close current writer
closeWriter(writer);
closeWriter(writerIndex);
}

OptionalInt bucketNumber = OptionalInt.empty();
Expand All @@ -403,6 +409,7 @@ private int[] getWriterIndexes(Page page)
writer = writerFactory.createWriter(partitionColumns, position, bucketNumber);

writers.set(writerIndex, writer);
memoryUsage += writer.getMemoryUsage();
}
verify(writers.size() == pagePartitioner.getMaxIndex() + 1);
verify(!writers.contains(null));
Expand Down
Loading