diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveRecoverableExecution.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveRecoverableExecution.java index b73a483a646bc..3fa1507441d48 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveRecoverableExecution.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveRecoverableExecution.java @@ -141,8 +141,7 @@ public static Object[][] testSettings() return new Object[][] {{1, true}, {2, false}, {2, true}}; } - // Flaky test: https://github.com/prestodb/presto/issues/20272 - @Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT, enabled = false) + @Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT) public void testCreateBucketedTable(int writerConcurrency, boolean optimizedPartitionUpdateSerializationEnabled) throws Exception { diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TableFinishOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/TableFinishOperator.java index 00f71d244058d..838394b47e52e 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TableFinishOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TableFinishOperator.java @@ -246,12 +246,13 @@ public void addInput(Page page) checkState(state == State.RUNNING, "Operator is %s", state); TableCommitContext tableCommitContext = getTableCommitContext(page, tableCommitContextCodec); - lifespanAndStageStateTracker.update(page, tableCommitContext); - lifespanAndStageStateTracker.getStatisticsPagesToProcess(page, tableCommitContext).forEach(statisticsPage -> { - OperationTimer timer = new OperationTimer(statisticsCpuTimerEnabled); - statisticsAggregationOperator.addInput(statisticsPage); - timer.end(statisticsTiming); - }); + if (lifespanAndStageStateTracker.update(page, tableCommitContext)) { + lifespanAndStageStateTracker.getStatisticsPagesToProcess(page, tableCommitContext).forEach(statisticsPage -> { + OperationTimer timer = new OperationTimer(statisticsCpuTimerEnabled); + statisticsAggregationOperator.addInput(statisticsPage); + timer.end(statisticsTiming); + }); + } if (memoryTrackingEnabled) { systemMemoryContext.setBytes(operatorRetainedMemoryBytes.get()); } @@ -394,7 +395,7 @@ public void commit() } } - public void update(Page page, TableCommitContext tableCommitContext) + public boolean update(Page page, TableCommitContext tableCommitContext) { LifespanAndStage lifespanAndStage = LifespanAndStage.fromTableCommitContext(tableCommitContext); PageSinkCommitStrategy commitStrategy = tableCommitContext.getPageSinkCommitStrategy(); @@ -406,14 +407,14 @@ public void update(Page page, TableCommitContext tableCommitContext) noCommitUnrecoverableLifespanAndStageStates.computeIfAbsent( lifespanAndStage, ignored -> new LifespanAndStageState( tableCommitContext.getTaskId(), operatorRetainedMemoryBytes, false)).update(page); - return; + return true; } case TASK_COMMIT: { // Case 2: Commit is required, but partial recovery is not supported taskCommitUnrecoverableLifespanAndStageStates.computeIfAbsent( lifespanAndStage, ignored -> new LifespanAndStageState( tableCommitContext.getTaskId(), operatorRetainedMemoryBytes, false)).update(page); - return; + return true; } case LIFESPAN_COMMIT: { // Case 2: Lifespan commit is required @@ -424,7 +425,7 @@ public void update(Page page, TableCommitContext tableCommitContext) checkState( !committedRecoverableLifespanAndStages.get(lifespanAndStage).getTaskId().equals(tableCommitContext.getTaskId()), "Received page from same task of committed lifespan and stage combination"); - return; + return false; } // Case 2b: Current (stage, lifespan) combination is not yet committed @@ -440,7 +441,7 @@ public void update(Page page, TableCommitContext tableCommitContext) uncommittedRecoverableLifespanAndStageStates.remove(lifespanAndStage); commitFutures.add(pageSinkCommitter.commitAsync(lifespanAndStageState.getFragments())); } - return; + return true; } default: throw new IllegalArgumentException("unexpected commit strategy: " + commitStrategy);