Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ public static Object[][] testSettings()
return new Object[][] {{1, true}, {2, false}, {2, true}};
}

// Flaky test: https://github.com/prestodb/presto/issues/20272
@Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT, enabled = false)
@Test(timeOut = TEST_TIMEOUT, dataProvider = "testSettings", invocationCount = INVOCATION_COUNT)
public void testCreateBucketedTable(int writerConcurrency, boolean optimizedPartitionUpdateSerializationEnabled)
throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,13 @@ public void addInput(Page page)
checkState(state == State.RUNNING, "Operator is %s", state);

TableCommitContext tableCommitContext = getTableCommitContext(page, tableCommitContextCodec);
lifespanAndStageStateTracker.update(page, tableCommitContext);
lifespanAndStageStateTracker.getStatisticsPagesToProcess(page, tableCommitContext).forEach(statisticsPage -> {
OperationTimer timer = new OperationTimer(statisticsCpuTimerEnabled);
statisticsAggregationOperator.addInput(statisticsPage);
timer.end(statisticsTiming);
});
if (lifespanAndStageStateTracker.update(page, tableCommitContext)) {
lifespanAndStageStateTracker.getStatisticsPagesToProcess(page, tableCommitContext).forEach(statisticsPage -> {
OperationTimer timer = new OperationTimer(statisticsCpuTimerEnabled);
statisticsAggregationOperator.addInput(statisticsPage);
timer.end(statisticsTiming);
});
}
if (memoryTrackingEnabled) {
systemMemoryContext.setBytes(operatorRetainedMemoryBytes.get());
}
Expand Down Expand Up @@ -394,7 +395,7 @@ public void commit()
}
}

public void update(Page page, TableCommitContext tableCommitContext)
public boolean update(Page page, TableCommitContext tableCommitContext)
{
LifespanAndStage lifespanAndStage = LifespanAndStage.fromTableCommitContext(tableCommitContext);
PageSinkCommitStrategy commitStrategy = tableCommitContext.getPageSinkCommitStrategy();
Expand All @@ -406,14 +407,14 @@ public void update(Page page, TableCommitContext tableCommitContext)
noCommitUnrecoverableLifespanAndStageStates.computeIfAbsent(
lifespanAndStage, ignored -> new LifespanAndStageState(
tableCommitContext.getTaskId(), operatorRetainedMemoryBytes, false)).update(page);
return;
return true;
}
case TASK_COMMIT: {
// Case 2: Commit is required, but partial recovery is not supported
taskCommitUnrecoverableLifespanAndStageStates.computeIfAbsent(
lifespanAndStage, ignored -> new LifespanAndStageState(
tableCommitContext.getTaskId(), operatorRetainedMemoryBytes, false)).update(page);
return;
return true;
}
case LIFESPAN_COMMIT: {
// Case 2: Lifespan commit is required
Expand All @@ -424,7 +425,7 @@ public void update(Page page, TableCommitContext tableCommitContext)
checkState(
!committedRecoverableLifespanAndStages.get(lifespanAndStage).getTaskId().equals(tableCommitContext.getTaskId()),
"Received page from same task of committed lifespan and stage combination");
return;
return false;
}

// Case 2b: Current (stage, lifespan) combination is not yet committed
Expand All @@ -440,7 +441,7 @@ public void update(Page page, TableCommitContext tableCommitContext)
uncommittedRecoverableLifespanAndStageStates.remove(lifespanAndStage);
commitFutures.add(pageSinkCommitter.commitAsync(lifespanAndStageState.getFragments()));
}
return;
return true;
}
default:
throw new IllegalArgumentException("unexpected commit strategy: " + commitStrategy);
Expand Down