Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,14 @@
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.facebook.airlift.concurrent.MoreFutures.toCompletableFuture;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.expressions.LogicalRowExpressions.and;
import static com.facebook.presto.expressions.LogicalRowExpressions.binaryExpression;
Expand Down Expand Up @@ -2418,17 +2420,17 @@ public List<GrantInfo> listTablePrivileges(ConnectorSession session, SchemaTable
}

@Override
public void commitPartition(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments)
public CompletableFuture<Void> commitPartitionAsync(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments)
{
HiveOutputTableHandle handle = (HiveOutputTableHandle) tableHandle;
stagingFileCommitter.commitFiles(session, handle.getSchemaName(), handle.getTableName(), getPartitionUpdates(fragments));
return toCompletableFuture(stagingFileCommitter.commitFiles(session, handle.getSchemaName(), handle.getTableName(), getPartitionUpdates(fragments)));
}

@Override
public void commitPartition(ConnectorSession session, ConnectorInsertTableHandle tableHandle, Collection<Slice> fragments)
public CompletableFuture<Void> commitPartitionAsync(ConnectorSession session, ConnectorInsertTableHandle tableHandle, Collection<Slice> fragments)
{
HiveInsertTableHandle handle = (HiveInsertTableHandle) tableHandle;
stagingFileCommitter.commitFiles(session, handle.getSchemaName(), handle.getTableName(), getPartitionUpdates(fragments));
return toCompletableFuture(stagingFileCommitter.commitFiles(session, handle.getSchemaName(), handle.getTableName(), getPartitionUpdates(fragments)));
}

private List<GrantInfo> buildGrants(SchemaTableName tableName, PrestoPrincipal principal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.facebook.presto.hive.HdfsEnvironment.HdfsContext;
import com.facebook.presto.hive.PartitionUpdate.FileWriteInfo;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -27,10 +26,10 @@
import java.util.ArrayList;
import java.util.List;

import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getFileSystem;
import static com.facebook.presto.hive.metastore.MetastoreUtil.renameFile;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.catching;
import static com.google.common.util.concurrent.Futures.whenAllSucceed;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;
Expand All @@ -51,10 +50,10 @@ public HiveStagingFileCommitter(
}

@Override
public void commitFiles(ConnectorSession session, String schemaName, String tableName, List<PartitionUpdate> partitionUpdates)
public ListenableFuture<Void> commitFiles(ConnectorSession session, String schemaName, String tableName, List<PartitionUpdate> partitionUpdates)
{
HdfsContext context = new HdfsContext(session, schemaName, tableName);
List<ListenableFuture<?>> commitFutures = new ArrayList<>();
List<ListenableFuture<Void>> commitFutures = new ArrayList<>();

for (PartitionUpdate partitionUpdate : partitionUpdates) {
Path path = partitionUpdate.getWritePath();
Expand All @@ -63,17 +62,22 @@ public void commitFiles(ConnectorSession session, String schemaName, String tabl
checkState(!fileWriteInfo.getWriteFileName().equals(fileWriteInfo.getTargetFileName()));
Path source = new Path(path, fileWriteInfo.getWriteFileName());
Path target = new Path(path, fileWriteInfo.getTargetFileName());
commitFutures.add(fileRenameExecutor.submit(() -> renameFile(fileSystem, source, target)));
commitFutures.add(fileRenameExecutor.submit(() -> {
renameFile(fileSystem, source, target);
return null;
}));
}
}

ListenableFuture<?> listenableFutureAggregate = whenAllSucceed(commitFutures).call(() -> null, directExecutor());
try {
getFutureValue(listenableFutureAggregate, PrestoException.class);
}
catch (RuntimeException e) {
listenableFutureAggregate.cancel(true);
throw e;
}
ListenableFuture<Void> result = whenAllSucceed(commitFutures).call(() -> null, directExecutor());
return catching(
Comment thread
shixuan-fan marked this conversation as resolved.
Outdated
result,
RuntimeException.class,
e -> {
checkState(e != null, "Null exception is caught during commitFiles");
result.cancel(true);
throw e;
},
directExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
package com.facebook.presto.hive;

import com.facebook.presto.spi.ConnectorSession;
import com.google.common.util.concurrent.ListenableFuture;

import java.util.List;

public interface StagingFileCommitter
{
void commitFiles(ConnectorSession session, String schemaName, String tableName, List<PartitionUpdate> partitionUpdates);
ListenableFuture<Void> commitFiles(ConnectorSession session, String schemaName, String tableName, List<PartitionUpdate> partitionUpdates);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3614,7 +3614,7 @@ protected void doCreateTable(SchemaTableName tableName, HiveStorageFormat storag

if (pageSinkProperties.isPartitionCommitRequired()) {
assertValidPartitionCommitFragments(fragments);
metadata.commitPartition(session, outputHandle, fragments);
metadata.commitPartitionAsync(session, outputHandle, fragments).get();
}

// verify all new files start with the unique prefix
Expand Down Expand Up @@ -3783,7 +3783,7 @@ private void doInsert(HiveStorageFormat storageFormat, SchemaTableName tableName
Collection<Slice> fragments = getFutureValue(sink.finish());
if (pageSinkProperties.isPartitionCommitRequired()) {
assertValidPartitionCommitFragments(fragments);
metadata.commitPartition(session, insertTableHandle, fragments);
metadata.commitPartitionAsync(session, insertTableHandle, fragments).get();
}
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());

Expand Down Expand Up @@ -4006,7 +4006,7 @@ private void doInsertIntoNewPartition(HiveStorageFormat storageFormat, SchemaTab
Collection<Slice> fragments = getFutureValue(sink.finish());
if (pageSinkProperties.isPartitionCommitRequired()) {
assertValidPartitionCommitFragments(fragments);
metadata.commitPartition(session, insertTableHandle, fragments);
metadata.commitPartitionAsync(session, insertTableHandle, fragments).get();
}
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());

Expand Down Expand Up @@ -4125,7 +4125,7 @@ private void doInsertIntoExistingPartition(HiveStorageFormat storageFormat, Sche
Collection<Slice> fragments = getFutureValue(sink.finish());
if (pageSinkProperties.isPartitionCommitRequired()) {
assertValidPartitionCommitFragments(fragments);
metadata.commitPartition(session, insertTableHandle, fragments);
metadata.commitPartitionAsync(session, insertTableHandle, fragments).get();
}
metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spi.type.TypeSignature;
import com.facebook.presto.sql.planner.PartitioningHandle;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;

import java.util.Collection;
Expand Down Expand Up @@ -418,13 +419,13 @@ public interface Metadata
* Commits partition for table creation.
*/
@Experimental
void commitPartition(Session session, OutputTableHandle tableHandle, Collection<Slice> fragments);
ListenableFuture<Void> commitPartitionAsync(Session session, OutputTableHandle tableHandle, Collection<Slice> fragments);

/**
* Commits partition for table insertion.
*/
@Experimental
void commitPartition(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments);
ListenableFuture<Void> commitPartitionAsync(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments);

FunctionManager getFunctionManager();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;

import javax.inject.Inject;
Expand All @@ -92,6 +93,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.facebook.airlift.concurrent.MoreFutures.toListenableFuture;
import static com.facebook.presto.expressions.LogicalRowExpressions.FALSE_CONSTANT;
import static com.facebook.presto.metadata.QualifiedObjectName.convertFromSchemaTableName;
import static com.facebook.presto.metadata.TableLayout.fromConnectorLayout;
Expand Down Expand Up @@ -1156,25 +1158,25 @@ public List<GrantInfo> listTablePrivileges(Session session, QualifiedTablePrefix
}

@Override
public void commitPartition(Session session, OutputTableHandle tableHandle, Collection<Slice> fragments)
public ListenableFuture<Void> commitPartitionAsync(Session session, OutputTableHandle tableHandle, Collection<Slice> fragments)
{
ConnectorId connectorId = tableHandle.getConnectorId();
CatalogMetadata catalogMetadata = getCatalogMetadata(session, connectorId);
ConnectorMetadata metadata = catalogMetadata.getMetadata();
ConnectorSession connectorSession = session.toConnectorSession(connectorId);

metadata.commitPartition(connectorSession, tableHandle.getConnectorHandle(), fragments);
return toListenableFuture(metadata.commitPartitionAsync(connectorSession, tableHandle.getConnectorHandle(), fragments));
}

@Override
public void commitPartition(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments)
public ListenableFuture<Void> commitPartitionAsync(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments)
{
ConnectorId connectorId = tableHandle.getConnectorId();
CatalogMetadata catalogMetadata = getCatalogMetadata(session, connectorId);
ConnectorMetadata metadata = catalogMetadata.getMetadata();
ConnectorSession connectorSession = session.toConnectorSession(connectorId);

metadata.commitPartition(connectorSession, tableHandle.getConnectorHandle(), fragments);
return toListenableFuture(metadata.commitPartitionAsync(connectorSession, tableHandle.getConnectorHandle(), fragments));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.operator.OperationTimer.OperationTiming;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PageBuilder;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
import com.facebook.presto.spi.plan.PlanNodeId;
Expand All @@ -31,12 +32,14 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;

import static com.facebook.presto.SystemSessionProperties.isStatisticsCpuTimerEnabled;
Expand All @@ -48,9 +51,13 @@
import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.propagateIfPossible;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.whenAllSucceed;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.Duration.succinctNanos;
import static java.lang.Thread.currentThread;
import static java.util.Objects.requireNonNull;

public class TableFinishOperator
Expand Down Expand Up @@ -241,6 +248,7 @@ public Page getOutput()
}
state = State.FINISHED;

lifespanAndStageStateTracker.waitForAllLifespanCommitted();
outputMetadata = tableFinisher.finishTable(lifespanAndStageStateTracker.getFinalFragments(), computedStatisticsBuilder.build());

// output page will only be constructed once,
Expand Down Expand Up @@ -293,7 +301,7 @@ public interface TableFinisher

public interface LifespanCommitter
{
void commitLifespan(Collection<Slice> fragments);
ListenableFuture<Void> commitLifespan(Collection<Slice> fragments);
}

// A lifespan in a stage defines the unit for commit and recovery in recoverable grouped execution
Expand All @@ -307,12 +315,31 @@ private static class LifespanAndStageStateTracker
private final Map<LifespanAndStage, LifespanAndStageState> committedRecoverableLifespanAndStages = new HashMap<>();

private final LifespanCommitter lifespanCommitter;
private final List<ListenableFuture<Void>> commitFutures = new ArrayList<>();

LifespanAndStageStateTracker(LifespanCommitter lifespanCommitter)
{
this.lifespanCommitter = requireNonNull(lifespanCommitter, "lifespanCommitter is null");
}

public void waitForAllLifespanCommitted()
{
ListenableFuture<Void> future = whenAllSucceed(commitFutures).call(() -> null, directExecutor());
try {
future.get();
}
catch (InterruptedException e) {
future.cancel(true);
currentThread().interrupt();
throw new RuntimeException(e);
}
catch (ExecutionException e) {
future.cancel(true);
propagateIfPossible(e.getCause(), PrestoException.class);
throw new RuntimeException(e.getCause());
}
}

public void update(Page page, TableCommitContext tableCommitContext)
{
LifespanAndStage lifespanAndStage = LifespanAndStage.fromTableCommitContext(tableCommitContext);
Expand All @@ -336,7 +363,7 @@ public void update(Page page, TableCommitContext tableCommitContext)
LifespanAndStageState lifespanAndStageState = lifespanStageStatesPerTask.get(tableCommitContext.getTaskId());
committedRecoverableLifespanAndStages.put(lifespanAndStage, lifespanAndStageState);
uncommittedRecoverableLifespanAndStageStates.remove(lifespanAndStage);
lifespanCommitter.commitLifespan(lifespanAndStageState.getFragments());
commitFutures.add(lifespanCommitter.commitLifespan(lifespanAndStageState.getFragments()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2869,10 +2869,10 @@ private static LifespanCommitter createLifespanCommitter(Session session, Metada
{
return fragments -> {
if (target instanceof CreateHandle) {
metadata.commitPartition(session, ((CreateHandle) target).getHandle(), fragments);
return metadata.commitPartitionAsync(session, ((CreateHandle) target).getHandle(), fragments);
}
else if (target instanceof InsertHandle) {
metadata.commitPartition(session, ((InsertHandle) target).getHandle(), fragments);
return metadata.commitPartitionAsync(session, ((InsertHandle) target).getHandle(), fragments);
}
else {
throw new AssertionError("Unhandled target type: " + target.getClass().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spi.type.TypeSignature;
import com.facebook.presto.sql.planner.PartitioningHandle;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;

import java.util.Collection;
Expand Down Expand Up @@ -471,13 +472,13 @@ public List<GrantInfo> listTablePrivileges(Session session, QualifiedTablePrefix
}

@Override
public void commitPartition(Session session, OutputTableHandle tableHandle, Collection<Slice> fragments)
public ListenableFuture<Void> commitPartitionAsync(Session session, OutputTableHandle tableHandle, Collection<Slice> fragments)
{
throw new UnsupportedOperationException();
}

@Override
public void commitPartition(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments)
public ListenableFuture<Void> commitPartitionAsync(Session session, InsertTableHandle tableHandle, Collection<Slice> fragments)
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.facebook.presto.sql.planner.plan.StatisticAggregationsDescriptor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -60,6 +61,7 @@
import static com.facebook.presto.testing.TestingTaskContext.createTaskContext;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -304,9 +306,10 @@ private static class TestingLifespanCommitter
private List<Collection<Slice>> fragmentsList = new ArrayList<>();

@Override
public void commitLifespan(Collection<Slice> fragments)
public ListenableFuture<Void> commitLifespan(Collection<Slice> fragments)
{
fragmentsList.add(fragments);
return immediateFuture(null);
}

public List<Collection<Slice>> getCommittedFragments()
Expand Down
Loading