Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.units.DataSize.succinctBytes;
import static java.lang.String.format;
import static java.util.Arrays.stream;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -375,6 +376,21 @@ private static String formatJsonFragmentList(List<PlanFragment> fragments, Optio
return new JsonRenderer(functionAndTypeManager).render(fragmentJsonMap.build());
}

public static String formattedFragmentString(StageExecutionStats stageExecutionStats,
double avgPositionsPerTask, double sdAmongTasks, int taskSize)
{
return format("CPU: %s, Scheduled: %s, Input: %s (%s); per task: avg.: %s std.dev.: %s, Output: %s (%s), %s tasks%n",
stageExecutionStats.getTotalCpuTime().convertToMostSuccinctTimeUnit(),
stageExecutionStats.getTotalScheduledTime().convertToMostSuccinctTimeUnit(),
formatPositions(stageExecutionStats.getProcessedInputPositions()),
succinctBytes(stageExecutionStats.getProcessedInputDataSizeInBytes()),
formatDouble(avgPositionsPerTask),
formatDouble(sdAmongTasks),
formatPositions(stageExecutionStats.getOutputPositions()),
succinctBytes(stageExecutionStats.getOutputDataSizeInBytes()),
taskSize);
}

private static String formatFragment(
FunctionAndTypeManager functionAndTypeManager,
Session session,
Expand All @@ -397,16 +413,7 @@ private static String formatFragment(
double sdAmongTasks = Math.sqrt(squaredDifferences / tasks.size());

builder.append(indentString(1))
.append(format("CPU: %s, Scheduled: %s, Input: %s (%s); per task: avg.: %s std.dev.: %s, Output: %s (%s), %s tasks%n",
stageExecutionStats.getTotalCpuTime().convertToMostSuccinctTimeUnit(),
stageExecutionStats.getTotalScheduledTime().convertToMostSuccinctTimeUnit(),
formatPositions(stageExecutionStats.getProcessedInputPositions()),
stageExecutionStats.getProcessedInputDataSizeInBytes(),
formatDouble(avgPositionsPerTask),
formatDouble(sdAmongTasks),
formatPositions(stageExecutionStats.getOutputPositions()),
stageExecutionStats.getOutputDataSizeInBytes(),
tasks.size()));
.append(formattedFragmentString(stageExecutionStats, avgPositionsPerTask, sdAmongTasks, tasks.size()));
}

PartitioningScheme partitioningScheme = fragment.getPartitioningScheme();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,28 @@
*/
package com.facebook.presto.sql.planner.planPrinter;

import com.facebook.airlift.stats.Distribution;
import com.facebook.presto.common.RuntimeMetric;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import com.facebook.presto.cost.StatsAndCosts;
import com.facebook.presto.execution.StageExecutionStats;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.operator.DynamicFilterStats;
import com.facebook.presto.operator.ExchangeOperator;
import com.facebook.presto.operator.OperatorStats;
import com.facebook.presto.operator.TaskOutputOperator;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.eventlistener.StageGcStatistics;
import com.facebook.presto.spi.plan.Partitioning;
import com.facebook.presto.spi.plan.PartitioningScheme;
import com.facebook.presto.spi.plan.PlanFragmentId;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.StageExecutionDescriptor;
import com.facebook.presto.spi.plan.TableScanNode;
Expand All @@ -37,21 +47,31 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.Duration;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Optional;

import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.common.RuntimeUnit.NONE;
import static com.facebook.presto.common.predicate.TupleDomain.withColumnDomains;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.metadata.AbstractMockMetadata.dummyMetadata;
import static com.facebook.presto.metadata.FunctionAndTypeManager.createTestFunctionAndTypeManager;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.planPrinter.PlanPrinter.formattedFragmentString;
import static io.airlift.slice.Slices.utf8Slice;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class TestPlanPrinter
{
private static final String TEST_METRIC_NAME = "test_metric";
private static final RuntimeMetric TEST_RUNTIME_METRIC_1 = new RuntimeMetric(TEST_METRIC_NAME, NONE, 10, 2, 9, 1);
private static final DynamicFilterStats TEST_DYNAMIC_FILTER_STATS_1 = new DynamicFilterStats(ImmutableSet.of(new PlanNodeId("1"), new PlanNodeId("2")));

private static final PlanBuilder PLAN_BUILDER = new PlanBuilder(TEST_SESSION, new PlanNodeIdAllocator(), dummyMetadata());
private static final FunctionAndTypeManager FUNCTION_AND_TYPE_MANAGER = createTestFunctionAndTypeManager();
private static final VariableReferenceExpression COLUMN_VARIABLE = new VariableReferenceExpression(Optional.empty(), "column", VARCHAR);
Expand Down Expand Up @@ -186,4 +206,152 @@ public void testDomainTextFormatting()
Domain.onlyNull(VARCHAR),
"[NULL]");
}

@Test
public void assertFragmentStringReadability()
{
int stageId0 = 0;
int stageExecutionId0 = 1;
List<OperatorStats> pipeline00 = ImmutableList.of(
createOperatorStats(stageId0, stageExecutionId0, 0, 0, new PlanNodeId("101"),
ExchangeOperator.class,
5384L, 100L,
5040L, 100L,
5040L, 100L),
createOperatorStats(stageId0, stageExecutionId0, 0, 1, new PlanNodeId("102"),
TaskOutputOperator.class,
0L, 0L,
5040L, 100L,
5040L, 100L));
StageExecutionStats stageExecutionStats = createStageStats(stageId0, stageExecutionId0,
5384L, 100L,
5040L, 100L,
5040L, 100L,
pipeline00);

String expected = "CPU: 0.00ns, Scheduled: 0.00ns, Input: 100 rows (4.92kB); per task: avg.: 10.10 std.dev.: 21.20, Output: 100 rows (4.92kB), 3 tasks\n";
assertEquals(formattedFragmentString(stageExecutionStats, 10.1, 21.2, 3), expected);
}

private static OperatorStats createOperatorStats(int stageId, int stageExecutionId, int pipelineId,
int operatorId, PlanNodeId planNodeId, Class operatorCls,
long rawInputDataSize, long rawInputPositions,
long inputDataSize, long inputPositions,
long outputDataSize, long outputPositions)
{
return new OperatorStats(
stageId,
stageExecutionId,
pipelineId,
operatorId,
planNodeId,
operatorCls.getSimpleName(),
0L,
0L,
new Duration(0, NANOSECONDS),
new Duration(0, NANOSECONDS),
0,
0L,
new Duration(0, NANOSECONDS),
new Duration(0, NANOSECONDS),
0,
rawInputDataSize,
rawInputPositions,
inputDataSize,
inputPositions,
0.0,
0L,
new Duration(0, NANOSECONDS),
new Duration(0, NANOSECONDS),
0,
outputDataSize,
outputPositions,
0L,
new Duration(0, NANOSECONDS),
new Duration(0, NANOSECONDS),
0L,
new Duration(0, NANOSECONDS),
new Duration(0, NANOSECONDS),
0,
0L,
0L,
0L,
0L,
0L,
0L,
0L,
Optional.empty(),
null,
new RuntimeStats(ImmutableMap.of(TEST_METRIC_NAME, RuntimeMetric.copyOf(TEST_RUNTIME_METRIC_1))),
DynamicFilterStats.copyOf(TEST_DYNAMIC_FILTER_STATS_1),
0,
0,
0,
0);
}

private static StageExecutionStats createStageStats(int stageId, int stageExecutionId, long rawInputDataSize, long rawInputPositions,
long inputDataSize, long inputPositions,
long outputDataSize, long outputPositions,
List<OperatorStats> operatorSummaries)
{
return new StageExecutionStats(
0L,

new Distribution(0).snapshot(),

1,
0,
1,

0,
0,

0,
0,
0,
0,
0,

0.0,
0.0,
0L,
0L,
0L,
0L,

new Duration(0, NANOSECONDS),
new Duration(0, NANOSECONDS),
new Duration(0, NANOSECONDS),
new Duration(0, NANOSECONDS),
false,
ImmutableSet.of(),

0L,

rawInputDataSize,
rawInputPositions,

inputDataSize,
inputPositions,

0L,
outputDataSize,
outputPositions,

0L,

new StageGcStatistics(
stageId,
stageExecutionId,
102,
103,
104,
105,
106,
107),

operatorSummaries,
new RuntimeStats());
}
}
Loading