Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions presto-docs/src/main/sphinx/admin/verifier.rst
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ Name Description
are emitted to ``stdout``.
``control.table-prefix`` The table prefix to be appended to the control target table.
``test.table-prefix`` The table prefix to be appended to the test target table.
``control.reuse-table`` If ``true``, reuse the output table of the control source Insert and CreateTableAsSelect
query. Otherwise, run the control source query and write to a temporary table.
``test.reuse-table`` If ``true``, reuse the output table of the test source Insert and CreateTableAsSelect
query. Otherwise, run the test source query and write to a temporary table.
``test-id`` A string to be attached to output events.
``max-concurrency`` Maximum number of concurrent verifications.
``suite-repetition`` How many times a suite is verified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,11 @@ public class QueryInfo
private final String bucketChecksumQuery;
private final String jsonPlan;
private final String outputTableName;

private final boolean isReuseTable;
private final Double cpuTimeSecs;
private final Double wallTimeSecs;
private final Long peakTotalMemoryBytes;
private final Long peakTaskTotalMemoryBytes;

private final String extraStats;

public QueryInfo(
Expand All @@ -79,7 +78,8 @@ public QueryInfo(
Optional<String> bucketChecksumQuery,
Optional<String> jsonPlan,
Optional<QueryActionStats> queryActionStats,
Optional<String> outputTableName)
Optional<String> outputTableName,
boolean isReuseTable)
{
Optional<QueryStats> stats = queryActionStats.flatMap(QueryActionStats::getQueryStats);
this.catalog = requireNonNull(catalog, "catalog is null");
Expand All @@ -105,6 +105,7 @@ public QueryInfo(
this.peakTaskTotalMemoryBytes = stats.map(QueryStats::getPeakTaskTotalMemoryBytes).orElse(null);
this.extraStats = queryActionStats.flatMap(QueryActionStats::getExtraStats).orElse(null);
this.outputTableName = outputTableName.orElse(null);
this.isReuseTable = isReuseTable;
}

private static double millisToSeconds(long millis)
Expand Down Expand Up @@ -220,6 +221,12 @@ public String getOutputTableName()
return outputTableName;
}

@EventField
public boolean getIsReuseTable()
{
return isReuseTable;
}

@EventField
public Double getCpuTimeSecs()
{
Expand Down Expand Up @@ -279,6 +286,7 @@ public static class Builder
private Optional<String> jsonPlan = Optional.empty();
private Optional<QueryActionStats> queryActionStats = Optional.empty();
private Optional<String> outputTableName = Optional.empty();
private boolean isReuseTable;

private Builder(
String catalog,
Expand Down Expand Up @@ -376,6 +384,12 @@ public Builder setOutputTableName(Optional<String> outputTableName)
return this;
}

public Builder setIsReuseTable(boolean isReuseTable)
{
this.isReuseTable = isReuseTable;
return this;
}

public QueryInfo build()
{
return new QueryInfo(
Expand All @@ -396,7 +410,8 @@ public QueryInfo build()
bucketChecksumQuery,
jsonPlan,
queryActionStats,
outputTableName);
outputTableName,
isReuseTable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum EventStatus
FAILED,
FAILED_RESOLVED,
SKIPPED,
RESUBMITTED,
}

private final String suite;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ public VerificationResult run()
ChecksumQueryContext testChecksumQueryContext = new ChecksumQueryContext();
Optional<R> matchResult = Optional.empty();
Optional<DeterminismAnalysisDetails> determinismAnalysisDetails = Optional.empty();
boolean controlReuseTable = false;
boolean testReuseTable = false;

Optional<PartialVerificationResult> partialResult = Optional.empty();
Optional<Throwable> throwable = Optional.empty();
Expand All @@ -204,52 +206,74 @@ public VerificationResult run()
// Rewrite queries
if (isControlEnabled()) {
control = Optional.of(getQueryRewrite(CONTROL));
controlReuseTable = control.isPresent() && control.get() instanceof QueryObjectBundle && ((QueryObjectBundle) control.get()).isReuseTable();
if (controlReuseTable) {
controlQueryContext.setState(QueryState.REUSE);
controlQueryContext.setMainQueryStats(QueryActionStats.queryIdStats(sourceQuery.getQueryId(CONTROL).get()));
}
Comment on lines +209 to +213
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be clearer to put the logic inside the getQueryRewrite()

}

test = Optional.of(getQueryRewrite(TEST));
testReuseTable = test.isPresent() && test.get() instanceof QueryObjectBundle && ((QueryObjectBundle) test.get()).isReuseTable();
if (testReuseTable) {
testQueryContext.setState(QueryState.REUSE);
testQueryContext.setMainQueryStats(QueryActionStats.queryIdStats(sourceQuery.getQueryId(TEST).get()));
}
Comment on lines +217 to +221
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above


// First run setup queries
if (isControlEnabled()) {
if (isControlEnabled() && !controlReuseTable) {
QueryBundle controlQueryBundle = control.get();
QueryAction controlSetupAction = setupOnMainClusters ? queryActions.getControlAction() : queryActions.getHelperAction();
controlQueryBundle.getSetupQueries().forEach(query -> runAndConsume(
() -> controlSetupAction.execute(query, CONTROL_SETUP),
controlQueryContext::addSetupQuery,
controlQueryContext::setException));
}
QueryBundle testQueryBundle = test.get();
QueryAction testSetupAction = setupOnMainClusters ? queryActions.getTestAction() : queryActions.getHelperAction();
testQueryBundle.getSetupQueries().forEach(query -> runAndConsume(
() -> testSetupAction.execute(query, TEST_SETUP),
testQueryContext::addSetupQuery,
testQueryContext::setException));
if (!testReuseTable) {
QueryBundle testQueryBundle = test.get();
QueryAction testSetupAction = setupOnMainClusters ? queryActions.getTestAction() : queryActions.getHelperAction();
testQueryBundle.getSetupQueries().forEach(query -> runAndConsume(
() -> testSetupAction.execute(query, TEST_SETUP),
testQueryContext::addSetupQuery,
testQueryContext::setException));
}

ListenableFuture<Optional<QueryResult<V>>> controlQueryFuture = immediateFuture(Optional.empty());
ListenableFuture<Optional<QueryResult<V>>> testQueryFuture = immediateFuture(Optional.empty());
// Start control query
if (isControlEnabled()) {
if (isControlEnabled() && !controlReuseTable) {
QueryBundle controlQueryBundle = control.get();
controlQueryFuture = executor.submit(() -> runMainQuery(controlQueryBundle.getQuery(), CONTROL, controlQueryContext));
}

if (!concurrentControlAndTest) {
getFutureValue(controlQueryFuture);
}

// Run test queries
ListenableFuture<Optional<QueryResult<V>>> testQueryFuture = executor.submit(() -> runMainQuery(testQueryBundle.getQuery(), TEST, testQueryContext));
if (!testReuseTable) {
QueryBundle testQueryBundle = test.get();
testQueryFuture = executor.submit(() -> runMainQuery(testQueryBundle.getQuery(), TEST, testQueryContext));
}
controlQueryResult = getFutureValue(controlQueryFuture);

if (QUERY_BANK_MODE.equals(runningMode) && !saveSnapshot) {
controlQueryContext.setState(QueryState.SUCCEEDED);
controlQueryContext.setMainQueryStats(EMPTY_STATS);
}
else if (!skipControl || QUERY_BANK_MODE.equals(runningMode)) {
// saveSnapshot or regular run with skipControl = false
controlQueryContext.setState(QueryState.SUCCEEDED);
if (!controlReuseTable) {
controlQueryContext.setState(QueryState.SUCCEEDED);
}
}
else {
controlQueryContext.setState(NOT_RUN);
}

testQueryResult = getFutureValue(testQueryFuture);
testQueryContext.setState(QueryState.SUCCEEDED);
if (!testReuseTable) {
testQueryContext.setState(QueryState.SUCCEEDED);
}

// Verify results
if (QUERY_BANK_MODE.equals(runningMode) && !saveSnapshot && !skipChecksum) {
Expand All @@ -262,8 +286,13 @@ else if ((isControlEnabled()) && !skipChecksum) {
matchResult = Optional.of(verify(control.get(), test.get(), controlQueryResult, testQueryResult, controlChecksumQueryContext, testChecksumQueryContext));

// Determinism analysis
if (!QUERY_BANK_MODE.equals(runningMode) && matchResult.get().isMismatchPossiblyCausedByNonDeterminism()) {
determinismAnalysisDetails = Optional.of(analyzeDeterminism(control.get(), matchResult.get()));
if (!QUERY_BANK_MODE.equals(runningMode)) {
if ((controlReuseTable || testReuseTable) && matchResult.get().isMismatchPossiblyCausedByReuseOutdatedTable() && verificationContext.getResubmissionCount() < verificationResubmissionLimit) {
return new VerificationResult(this, true, Optional.empty());
}
else if (matchResult.get().isMismatchPossiblyCausedByNonDeterminism()) {
determinismAnalysisDetails = Optional.of(analyzeDeterminism(control.get(), matchResult.get()));
}
}
}

Expand All @@ -287,7 +316,7 @@ else if ((isControlEnabled()) && !skipChecksum) {
}
finally {
if (!smartTeardown
|| testQueryContext.getState() != QueryState.SUCCEEDED
|| !ImmutableList.of(QueryState.SUCCEEDED, QueryState.REUSE).contains(testQueryContext.getState())
|| (partialResult.isPresent() && partialResult.get().getStatus().equals(SUCCEEDED))) {
QueryAction controlTeardownAction = teardownOnMainClusters ? queryActions.getControlAction() : queryActions.getHelperAction();
QueryAction testTeardownAction = teardownOnMainClusters ? queryActions.getTestAction() : queryActions.getHelperAction();
Expand Down Expand Up @@ -347,13 +376,14 @@ private EventStatus getEventStatus(
}

if (skipControl) {
if (testQueryContext.getState() == QueryState.SUCCEEDED) {
if (ImmutableList.of(QueryState.SUCCEEDED, QueryState.REUSE).contains(testQueryContext.getState())) {
return SUCCEEDED;
}
}
else {
if (skipChecksum) {
if (controlQueryContext.getState() == QueryState.SUCCEEDED && testQueryContext.getState() == QueryState.SUCCEEDED) {
if (ImmutableList.of(QueryState.SUCCEEDED, QueryState.REUSE).contains(testQueryContext.getState()) &&
ImmutableList.of(QueryState.SUCCEEDED, QueryState.REUSE).contains(controlQueryContext.getState())) {
return SUCCEEDED;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public CreateTableVerification(
@Override
protected QueryObjectBundle getQueryRewrite(ClusterType clusterType)
{
return queryRewriter.rewriteQuery(getSourceQuery().getQuery(clusterType), clusterType);
return queryRewriter.rewriteQuery(getSourceQuery().getQuery(clusterType), getSourceQuery().getQueryConfiguration(clusterType), clusterType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public CreateViewVerification(
@Override
protected QueryObjectBundle getQueryRewrite(ClusterType clusterType)
{
return queryRewriter.rewriteQuery(getSourceQuery().getQuery(clusterType), clusterType);
return queryRewriter.rewriteQuery(getSourceQuery().getQuery(clusterType), getSourceQuery().getQueryConfiguration(clusterType), clusterType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ public boolean isMismatchPossiblyCausedByNonDeterminism()
return matchType == ROW_COUNT_MISMATCH || matchType == COLUMN_MISMATCH;
}

@Override
public boolean isMismatchPossiblyCausedByReuseOutdatedTable()
{
return matchType == SCHEMA_MISMATCH || matchType == ROW_COUNT_MISMATCH || matchType == COLUMN_MISMATCH;
}

public MatchType getMatchType()
{
return matchType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,17 @@ public DataVerification(
@Override
protected QueryObjectBundle getQueryRewrite(ClusterType clusterType)
{
return queryRewriter.rewriteQuery(getSourceQuery().getQuery(clusterType), clusterType);
return queryRewriter.rewriteQuery(getSourceQuery().getQuery(clusterType), getSourceQuery().getQueryConfiguration(clusterType), clusterType,
getVerificationContext().getResubmissionCount() == 0);
}

@Override
protected void updateQueryInfoWithQueryBundle(QueryInfo.Builder queryInfo, Optional<QueryObjectBundle> queryBundle)
{
super.updateQueryInfoWithQueryBundle(queryInfo, queryBundle);
queryInfo.setQuery(queryBundle.map(bundle -> formatSql(bundle.getQuery(), bundle.getRewrittenFunctionCalls())));
queryInfo.setOutputTableName(queryBundle.map(QueryObjectBundle::getObjectName).map(QualifiedName::toString));
queryInfo.setQuery(queryBundle.map(bundle -> formatSql(bundle.getQuery(), bundle.getRewrittenFunctionCalls())))
.setOutputTableName(queryBundle.map(QueryObjectBundle::getObjectName).map(QualifiedName::toString))
.setIsReuseTable(queryBundle.map(QueryObjectBundle::isReuseTable).orElse(false));
}

@Override
Expand Down Expand Up @@ -174,7 +176,7 @@ protected Optional<String> resolveFailure(
checkState(control.isPresent(), "control is missing");
return failureResolverManager.resolveResultMismatch((DataMatchResult) matchResult.get(), control.get());
}
if (throwable.isPresent() && controlQueryContext.getState() == QueryState.SUCCEEDED) {
if (throwable.isPresent() && ImmutableList.of(QueryState.SUCCEEDED, QueryState.REUSE).contains(controlQueryContext.getState())) {
checkState(controlQueryContext.getMainQueryStats().isPresent(), "controlQueryStats is missing");
return failureResolverManager.resolveException(controlQueryContext.getMainQueryStats().get(), throwable.get(), test);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public boolean isMismatchPossiblyCausedByNonDeterminism()
return false;
}

@Override
public boolean isMismatchPossiblyCausedByReuseOutdatedTable()
{
return false;
}

@Override
public String getReport()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private DeterminismAnalysis analyze(QueryObjectBundle control, ChecksumResult co
Map<QueryBundle, DeterminismAnalysisRun.Builder> queryRuns = new HashMap<>();
try {
for (int i = 0; i < maxAnalysisRuns; i++) {
QueryObjectBundle queryBundle = queryRewriter.rewriteQuery(sourceQuery.getQuery(CONTROL), CONTROL);
QueryObjectBundle queryBundle = queryRewriter.rewriteQuery(sourceQuery.getQuery(CONTROL), sourceQuery.getQueryConfiguration(CONTROL), CONTROL, false);
DeterminismAnalysisRun.Builder run = determinismAnalysisDetails.addRun().setTableName(queryBundle.getObjectName().toString());
queryRuns.put(queryBundle, run);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ public boolean isMismatchPossiblyCausedByNonDeterminism()
return false;
}

@Override
public boolean isMismatchPossiblyCausedByReuseOutdatedTable()
{
return false;
}

@Override
public String getReport()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ public interface MatchResult

boolean isMismatchPossiblyCausedByNonDeterminism();

boolean isMismatchPossiblyCausedByReuseOutdatedTable();

String getReport();
}
Loading