diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index f1b184cba8a8d..839377998d624 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -129,12 +129,14 @@ import java.util.OptionalLong; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static com.facebook.airlift.concurrent.MoreFutures.toCompletableFuture; import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; import static com.facebook.presto.expressions.LogicalRowExpressions.and; import static com.facebook.presto.expressions.LogicalRowExpressions.binaryExpression; @@ -2418,17 +2420,17 @@ public List listTablePrivileges(ConnectorSession session, SchemaTable } @Override - public void commitPartition(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments) + public CompletableFuture commitPartitionAsync(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments) { HiveOutputTableHandle handle = (HiveOutputTableHandle) tableHandle; - stagingFileCommitter.commitFiles(session, handle.getSchemaName(), handle.getTableName(), getPartitionUpdates(fragments)); + return toCompletableFuture(stagingFileCommitter.commitFiles(session, handle.getSchemaName(), handle.getTableName(), getPartitionUpdates(fragments))); } @Override - public void commitPartition(ConnectorSession session, ConnectorInsertTableHandle tableHandle, Collection fragments) + public CompletableFuture commitPartitionAsync(ConnectorSession session, ConnectorInsertTableHandle tableHandle, Collection fragments) { HiveInsertTableHandle handle = (HiveInsertTableHandle) tableHandle; - stagingFileCommitter.commitFiles(session, handle.getSchemaName(), handle.getTableName(), getPartitionUpdates(fragments)); + return toCompletableFuture(stagingFileCommitter.commitFiles(session, handle.getSchemaName(), handle.getTableName(), getPartitionUpdates(fragments))); } private List buildGrants(SchemaTableName tableName, PrestoPrincipal principal) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveStagingFileCommitter.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveStagingFileCommitter.java index e7bf5386cd43e..4bf34d3931909 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveStagingFileCommitter.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveStagingFileCommitter.java @@ -16,7 +16,6 @@ import com.facebook.presto.hive.HdfsEnvironment.HdfsContext; import com.facebook.presto.hive.PartitionUpdate.FileWriteInfo; import com.facebook.presto.spi.ConnectorSession; -import com.facebook.presto.spi.PrestoException; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.fs.FileSystem; @@ -27,10 +26,10 @@ import java.util.ArrayList; import java.util.List; -import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue; import static com.facebook.presto.hive.metastore.MetastoreUtil.getFileSystem; import static com.facebook.presto.hive.metastore.MetastoreUtil.renameFile; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.util.concurrent.Futures.catching; import static com.google.common.util.concurrent.Futures.whenAllSucceed; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static java.util.Objects.requireNonNull; @@ -51,10 +50,10 @@ public HiveStagingFileCommitter( } @Override - public void commitFiles(ConnectorSession session, String schemaName, String tableName, List partitionUpdates) + public ListenableFuture commitFiles(ConnectorSession session, String schemaName, String tableName, List partitionUpdates) { HdfsContext context = new HdfsContext(session, schemaName, tableName); - List> commitFutures = new ArrayList<>(); + List> commitFutures = new ArrayList<>(); for (PartitionUpdate partitionUpdate : partitionUpdates) { Path path = partitionUpdate.getWritePath(); @@ -63,17 +62,22 @@ public void commitFiles(ConnectorSession session, String schemaName, String tabl checkState(!fileWriteInfo.getWriteFileName().equals(fileWriteInfo.getTargetFileName())); Path source = new Path(path, fileWriteInfo.getWriteFileName()); Path target = new Path(path, fileWriteInfo.getTargetFileName()); - commitFutures.add(fileRenameExecutor.submit(() -> renameFile(fileSystem, source, target))); + commitFutures.add(fileRenameExecutor.submit(() -> { + renameFile(fileSystem, source, target); + return null; + })); } } - ListenableFuture listenableFutureAggregate = whenAllSucceed(commitFutures).call(() -> null, directExecutor()); - try { - getFutureValue(listenableFutureAggregate, PrestoException.class); - } - catch (RuntimeException e) { - listenableFutureAggregate.cancel(true); - throw e; - } + ListenableFuture result = whenAllSucceed(commitFutures).call(() -> null, directExecutor()); + return catching( + result, + RuntimeException.class, + e -> { + checkState(e != null, "Null exception is caught during commitFiles"); + result.cancel(true); + throw e; + }, + directExecutor()); } } diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/StagingFileCommitter.java b/presto-hive/src/main/java/com/facebook/presto/hive/StagingFileCommitter.java index cbd26a287b75a..0addf9d87cde5 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/StagingFileCommitter.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/StagingFileCommitter.java @@ -14,10 +14,11 @@ package com.facebook.presto.hive; import com.facebook.presto.spi.ConnectorSession; +import com.google.common.util.concurrent.ListenableFuture; import java.util.List; public interface StagingFileCommitter { - void commitFiles(ConnectorSession session, String schemaName, String tableName, List partitionUpdates); + ListenableFuture commitFiles(ConnectorSession session, String schemaName, String tableName, List partitionUpdates); } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java index a6ba1f4d737cf..4c09cba02239f 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveClient.java @@ -3614,7 +3614,7 @@ protected void doCreateTable(SchemaTableName tableName, HiveStorageFormat storag if (pageSinkProperties.isPartitionCommitRequired()) { assertValidPartitionCommitFragments(fragments); - metadata.commitPartition(session, outputHandle, fragments); + metadata.commitPartitionAsync(session, outputHandle, fragments).get(); } // verify all new files start with the unique prefix @@ -3783,7 +3783,7 @@ private void doInsert(HiveStorageFormat storageFormat, SchemaTableName tableName Collection fragments = getFutureValue(sink.finish()); if (pageSinkProperties.isPartitionCommitRequired()) { assertValidPartitionCommitFragments(fragments); - metadata.commitPartition(session, insertTableHandle, fragments); + metadata.commitPartitionAsync(session, insertTableHandle, fragments).get(); } metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of()); @@ -4006,7 +4006,7 @@ private void doInsertIntoNewPartition(HiveStorageFormat storageFormat, SchemaTab Collection fragments = getFutureValue(sink.finish()); if (pageSinkProperties.isPartitionCommitRequired()) { assertValidPartitionCommitFragments(fragments); - metadata.commitPartition(session, insertTableHandle, fragments); + metadata.commitPartitionAsync(session, insertTableHandle, fragments).get(); } metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of()); @@ -4125,7 +4125,7 @@ private void doInsertIntoExistingPartition(HiveStorageFormat storageFormat, Sche Collection fragments = getFutureValue(sink.finish()); if (pageSinkProperties.isPartitionCommitRequired()) { assertValidPartitionCommitFragments(fragments); - metadata.commitPartition(session, insertTableHandle, fragments); + metadata.commitPartitionAsync(session, insertTableHandle, fragments).get(); } metadata.finishInsert(session, insertTableHandle, fragments, ImmutableList.of()); diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java b/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java index e9dd24fa64d67..e81b5f90a9d4c 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/Metadata.java @@ -43,6 +43,7 @@ import com.facebook.presto.spi.type.TypeManager; import com.facebook.presto.spi.type.TypeSignature; import com.facebook.presto.sql.planner.PartitioningHandle; +import com.google.common.util.concurrent.ListenableFuture; import io.airlift.slice.Slice; import java.util.Collection; @@ -418,13 +419,13 @@ public interface Metadata * Commits partition for table creation. */ @Experimental - void commitPartition(Session session, OutputTableHandle tableHandle, Collection fragments); + ListenableFuture commitPartitionAsync(Session session, OutputTableHandle tableHandle, Collection fragments); /** * Commits partition for table insertion. */ @Experimental - void commitPartition(Session session, InsertTableHandle tableHandle, Collection fragments); + ListenableFuture commitPartitionAsync(Session session, InsertTableHandle tableHandle, Collection fragments); FunctionManager getFunctionManager(); diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java index 9b686dd76235f..6b7e9bbf09692 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/MetadataManager.java @@ -73,6 +73,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.ListenableFuture; import io.airlift.slice.Slice; import javax.inject.Inject; @@ -92,6 +93,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import static com.facebook.airlift.concurrent.MoreFutures.toListenableFuture; import static com.facebook.presto.expressions.LogicalRowExpressions.FALSE_CONSTANT; import static com.facebook.presto.metadata.QualifiedObjectName.convertFromSchemaTableName; import static com.facebook.presto.metadata.TableLayout.fromConnectorLayout; @@ -1156,25 +1158,25 @@ public List listTablePrivileges(Session session, QualifiedTablePrefix } @Override - public void commitPartition(Session session, OutputTableHandle tableHandle, Collection fragments) + public ListenableFuture commitPartitionAsync(Session session, OutputTableHandle tableHandle, Collection fragments) { ConnectorId connectorId = tableHandle.getConnectorId(); CatalogMetadata catalogMetadata = getCatalogMetadata(session, connectorId); ConnectorMetadata metadata = catalogMetadata.getMetadata(); ConnectorSession connectorSession = session.toConnectorSession(connectorId); - metadata.commitPartition(connectorSession, tableHandle.getConnectorHandle(), fragments); + return toListenableFuture(metadata.commitPartitionAsync(connectorSession, tableHandle.getConnectorHandle(), fragments)); } @Override - public void commitPartition(Session session, InsertTableHandle tableHandle, Collection fragments) + public ListenableFuture commitPartitionAsync(Session session, InsertTableHandle tableHandle, Collection fragments) { ConnectorId connectorId = tableHandle.getConnectorId(); CatalogMetadata catalogMetadata = getCatalogMetadata(session, connectorId); ConnectorMetadata metadata = catalogMetadata.getMetadata(); ConnectorSession connectorSession = session.toConnectorSession(connectorId); - metadata.commitPartition(connectorSession, tableHandle.getConnectorHandle(), fragments); + return toListenableFuture(metadata.commitPartitionAsync(connectorSession, tableHandle.getConnectorHandle(), fragments)); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TableFinishOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/TableFinishOperator.java index 31bd78d8e833e..c89b99e4781f0 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TableFinishOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TableFinishOperator.java @@ -20,6 +20,7 @@ import com.facebook.presto.operator.OperationTimer.OperationTiming; import com.facebook.presto.spi.Page; import com.facebook.presto.spi.PageBuilder; +import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.connector.ConnectorOutputMetadata; import com.facebook.presto.spi.plan.PlanNodeId; @@ -31,12 +32,14 @@ import com.google.common.util.concurrent.ListenableFuture; import io.airlift.slice.Slice; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.stream.Stream; import static com.facebook.presto.SystemSessionProperties.isStatisticsCpuTimerEnabled; @@ -48,9 +51,13 @@ import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Throwables.propagateIfPossible; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.util.concurrent.Futures.whenAllSucceed; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.units.Duration.succinctNanos; +import static java.lang.Thread.currentThread; import static java.util.Objects.requireNonNull; public class TableFinishOperator @@ -241,6 +248,7 @@ public Page getOutput() } state = State.FINISHED; + lifespanAndStageStateTracker.waitForAllLifespanCommitted(); outputMetadata = tableFinisher.finishTable(lifespanAndStageStateTracker.getFinalFragments(), computedStatisticsBuilder.build()); // output page will only be constructed once, @@ -293,7 +301,7 @@ public interface TableFinisher public interface LifespanCommitter { - void commitLifespan(Collection fragments); + ListenableFuture commitLifespan(Collection fragments); } // A lifespan in a stage defines the unit for commit and recovery in recoverable grouped execution @@ -307,12 +315,31 @@ private static class LifespanAndStageStateTracker private final Map committedRecoverableLifespanAndStages = new HashMap<>(); private final LifespanCommitter lifespanCommitter; + private final List> commitFutures = new ArrayList<>(); LifespanAndStageStateTracker(LifespanCommitter lifespanCommitter) { this.lifespanCommitter = requireNonNull(lifespanCommitter, "lifespanCommitter is null"); } + public void waitForAllLifespanCommitted() + { + ListenableFuture future = whenAllSucceed(commitFutures).call(() -> null, directExecutor()); + try { + future.get(); + } + catch (InterruptedException e) { + future.cancel(true); + currentThread().interrupt(); + throw new RuntimeException(e); + } + catch (ExecutionException e) { + future.cancel(true); + propagateIfPossible(e.getCause(), PrestoException.class); + throw new RuntimeException(e.getCause()); + } + } + public void update(Page page, TableCommitContext tableCommitContext) { LifespanAndStage lifespanAndStage = LifespanAndStage.fromTableCommitContext(tableCommitContext); @@ -336,7 +363,7 @@ public void update(Page page, TableCommitContext tableCommitContext) LifespanAndStageState lifespanAndStageState = lifespanStageStatesPerTask.get(tableCommitContext.getTaskId()); committedRecoverableLifespanAndStages.put(lifespanAndStage, lifespanAndStageState); uncommittedRecoverableLifespanAndStageStates.remove(lifespanAndStage); - lifespanCommitter.commitLifespan(lifespanAndStageState.getFragments()); + commitFutures.add(lifespanCommitter.commitLifespan(lifespanAndStageState.getFragments())); } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index 415e230ea9b10..c6c9dbd3ff2db 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -2869,10 +2869,10 @@ private static LifespanCommitter createLifespanCommitter(Session session, Metada { return fragments -> { if (target instanceof CreateHandle) { - metadata.commitPartition(session, ((CreateHandle) target).getHandle(), fragments); + return metadata.commitPartitionAsync(session, ((CreateHandle) target).getHandle(), fragments); } else if (target instanceof InsertHandle) { - metadata.commitPartition(session, ((InsertHandle) target).getHandle(), fragments); + return metadata.commitPartitionAsync(session, ((InsertHandle) target).getHandle(), fragments); } else { throw new AssertionError("Unhandled target type: " + target.getClass().getName()); diff --git a/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java b/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java index 4a380ca74cedf..58efad33b52d5 100644 --- a/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java +++ b/presto-main/src/test/java/com/facebook/presto/metadata/AbstractMockMetadata.java @@ -39,6 +39,7 @@ import com.facebook.presto.spi.type.TypeManager; import com.facebook.presto.spi.type.TypeSignature; import com.facebook.presto.sql.planner.PartitioningHandle; +import com.google.common.util.concurrent.ListenableFuture; import io.airlift.slice.Slice; import java.util.Collection; @@ -471,13 +472,13 @@ public List listTablePrivileges(Session session, QualifiedTablePrefix } @Override - public void commitPartition(Session session, OutputTableHandle tableHandle, Collection fragments) + public ListenableFuture commitPartitionAsync(Session session, OutputTableHandle tableHandle, Collection fragments) { throw new UnsupportedOperationException(); } @Override - public void commitPartition(Session session, InsertTableHandle tableHandle, Collection fragments) + public ListenableFuture commitPartitionAsync(Session session, InsertTableHandle tableHandle, Collection fragments) { throw new UnsupportedOperationException(); } diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestTableFinishOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/TestTableFinishOperator.java index 99808a7fa6700..d3960e17d01a1 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestTableFinishOperator.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestTableFinishOperator.java @@ -33,6 +33,7 @@ import com.facebook.presto.sql.planner.plan.StatisticAggregationsDescriptor; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import org.testng.annotations.AfterClass; @@ -60,6 +61,7 @@ import static com.facebook.presto.testing.TestingTaskContext.createTaskContext; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Iterables.getOnlyElement; +import static com.google.common.util.concurrent.Futures.immediateFuture; import static java.util.concurrent.Executors.newScheduledThreadPool; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.assertj.core.api.Assertions.assertThat; @@ -304,9 +306,10 @@ private static class TestingLifespanCommitter private List> fragmentsList = new ArrayList<>(); @Override - public void commitLifespan(Collection fragments) + public ListenableFuture commitLifespan(Collection fragments) { fragmentsList.add(fragments); + return immediateFuture(null); } public List> getCommittedFragments() diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java index 449461acea96b..68b63fa2bc7c5 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java @@ -51,6 +51,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; @@ -661,7 +662,7 @@ default List listTablePrivileges(ConnectorSession session, SchemaTabl * This method is unstable and subject to change in the future. */ @Experimental - default void commitPartition(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments) + default CompletableFuture commitPartitionAsync(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments) { throw new PrestoException(NOT_SUPPORTED, "This connector does not support partition commit"); } @@ -672,7 +673,7 @@ default void commitPartition(ConnectorSession session, ConnectorOutputTableHandl * This method is unstable and subject to change in the future. */ @Experimental - default void commitPartition(ConnectorSession session, ConnectorInsertTableHandle tableHandle, Collection fragments) + default CompletableFuture commitPartitionAsync(ConnectorSession session, ConnectorInsertTableHandle tableHandle, Collection fragments) { throw new PrestoException(NOT_SUPPORTED, "This connector does not support partition commit"); } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java index 9aaa013427017..e935a45f6e009 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java @@ -54,6 +54,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.CompletableFuture; import static java.util.Objects.requireNonNull; @@ -594,18 +595,18 @@ public List listTablePrivileges(ConnectorSession session, SchemaTable } @Override - public void commitPartition(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments) + public CompletableFuture commitPartitionAsync(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { - delegate.commitPartition(session, tableHandle, fragments); + return delegate.commitPartitionAsync(session, tableHandle, fragments); } } @Override - public void commitPartition(ConnectorSession session, ConnectorInsertTableHandle tableHandle, Collection fragments) + public CompletableFuture commitPartitionAsync(ConnectorSession session, ConnectorInsertTableHandle tableHandle, Collection fragments) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { - delegate.commitPartition(session, tableHandle, fragments); + return delegate.commitPartitionAsync(session, tableHandle, fragments); } } }