Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.MapBlockBuilder;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
Expand Down Expand Up @@ -98,12 +99,13 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
BIGINT.writeLong(blockBuilders.get("failures"), stats.getFailures());
DOUBLE.writeDouble(blockBuilders.get("average_time"), stats.getTime().getAvg());

BlockBuilder mapWriter = blockBuilders.get("time_distribution_percentiles").beginBlockEntry();
for (Map.Entry<Double, Double> percentile : stats.getTime().getPercentiles().entrySet()) {
DOUBLE.writeDouble(mapWriter, percentile.getKey());
DOUBLE.writeDouble(mapWriter, percentile.getValue());
}
blockBuilders.get("time_distribution_percentiles").closeEntry();
MapBlockBuilder blockBuilder = (MapBlockBuilder) blockBuilders.get("time_distribution_percentiles");
blockBuilder.buildEntry((keyBuilder, valueBuilder) -> {
stats.getTime().getPercentiles().forEach((key, value) -> {
DOUBLE.writeDouble(keyBuilder, key);
DOUBLE.writeDouble(valueBuilder, value);
});
});
}

Block[] blocks = ruleStatsTable.getColumns().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.operator.aggregation.builder.InMemoryHashAggregationBuilder.toTypes;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.sql.planner.optimizations.HashGenerationOptimizer.INITIAL_HASH_VALUE;
import static io.trino.type.TypeUtils.NULL_HASH_CODE;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -572,7 +573,7 @@ private Page getGlobalAggregationOutput()

while (channel < groupByTypes.size()) {
if (channel == groupIdChannel.orElseThrow()) {
output.getBlockBuilder(channel).writeLong(groupId);
BIGINT.writeLong(output.getBlockBuilder(channel), groupId);
}
else {
output.getBlockBuilder(channel).appendNull();
Expand All @@ -582,7 +583,7 @@ private Page getGlobalAggregationOutput()

if (hashChannel.isPresent()) {
long hashValue = calculateDefaultOutputHash(groupByTypes, groupIdChannel.orElseThrow(), groupId);
output.getBlockBuilder(channel).writeLong(hashValue);
BIGINT.writeLong(output.getBlockBuilder(channel), hashValue);
channel++;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.ColumnarRow;
import io.trino.spi.block.RowBlockBuilder;
import io.trino.spi.block.RowValueBuilder;
import io.trino.spi.function.AccumulatorState;
import io.trino.spi.function.AccumulatorStateFactory;
import io.trino.spi.function.AccumulatorStateSerializer;
Expand Down Expand Up @@ -73,6 +75,7 @@
import static io.trino.sql.gen.Bootstrap.BOOTSTRAP_METHOD;
import static io.trino.sql.gen.BytecodeUtils.invoke;
import static io.trino.sql.gen.BytecodeUtils.loadConstant;
import static io.trino.sql.gen.LambdaMetafactoryGenerator.generateMetafactory;
import static io.trino.util.CompilerUtils.defineClass;
import static io.trino.util.CompilerUtils.makeClassName;
import static java.lang.String.format;
Expand Down Expand Up @@ -776,18 +779,13 @@ private static void generateGroupedEvaluateIntermediate(ClassDefinition definiti
.ret();
}
else {
Variable rowBuilder = method.getScope().declareVariable(BlockBuilder.class, "rowBuilder");
body.append(rowBuilder.set(out.invoke("beginBlockEntry", BlockBuilder.class)));

for (StateFieldAndDescriptor stateFieldAndDescriptor : stateFieldAndDescriptors) {
BytecodeExpression stateSerializer = thisVariable.getField(stateFieldAndDescriptor.getStateSerializerField());
BytecodeExpression state = thisVariable.getField(stateFieldAndDescriptor.getStateField());

body.append(state.invoke("setGroupId", void.class, groupId.cast(long.class)))
.append(stateSerializer.invoke("serialize", void.class, state.cast(AccumulatorState.class), rowBuilder));
body.append(state.invoke("setGroupId", void.class, groupId.cast(long.class)));
}
body.append(out.invoke("closeEntry", BlockBuilder.class).pop())
.ret();

generateSerializeState(definition, stateFieldAndDescriptors, out, thisVariable, body);
body.ret();
}
}

Expand Down Expand Up @@ -818,17 +816,36 @@ private static void generateEvaluateIntermediate(ClassDefinition definition, Lis
.ret();
}
else {
Variable rowBuilder = method.getScope().declareVariable(BlockBuilder.class, "rowBuilder");
body.append(rowBuilder.set(out.invoke("beginBlockEntry", BlockBuilder.class)));
generateSerializeState(definition, stateFieldAndDescriptors, out, thisVariable, body);
body.ret();
}
}

for (StateFieldAndDescriptor stateFieldAndDescriptor : stateFieldAndDescriptors) {
BytecodeExpression stateSerializer = thisVariable.getField(stateFieldAndDescriptor.getStateSerializerField());
BytecodeExpression state = thisVariable.getField(stateFieldAndDescriptor.getStateField());
body.append(stateSerializer.invoke("serialize", void.class, state.cast(AccumulatorState.class), rowBuilder));
}
body.append(out.invoke("closeEntry", BlockBuilder.class).pop())
.ret();
private static void generateSerializeState(ClassDefinition definition, List<StateFieldAndDescriptor> stateFieldAndDescriptors, Parameter out, Variable thisVariable, BytecodeBlock body)
{
MethodDefinition serializeState = generateSerializeStateMethod(definition, stateFieldAndDescriptors);

BytecodeExpression rowEntryBuilder = generateMetafactory(RowValueBuilder.class, serializeState, ImmutableList.of(thisVariable));
body.append(out.cast(RowBlockBuilder.class).invoke("buildEntry", void.class, rowEntryBuilder));
}

private static MethodDefinition generateSerializeStateMethod(ClassDefinition definition, List<StateFieldAndDescriptor> stateFieldAndDescriptors)
{
Parameter fieldBuilders = arg("fieldBuilders", type(List.class, BlockBuilder.class));
MethodDefinition method = definition.declareMethod(a(PRIVATE), "serializeState", type(void.class), fieldBuilders);

Variable thisVariable = method.getThis();
BytecodeBlock body = method.getBody();

for (int i = 0; i < stateFieldAndDescriptors.size(); i++) {
StateFieldAndDescriptor stateFieldAndDescriptor = stateFieldAndDescriptors.get(i);
BytecodeExpression stateSerializer = thisVariable.getField(stateFieldAndDescriptor.getStateSerializerField());
BytecodeExpression state = thisVariable.getField(stateFieldAndDescriptor.getStateField());
BytecodeExpression fieldBuilder = fieldBuilders.invoke("get", Object.class, constantInt(i)).cast(BlockBuilder.class);
body.append(stateSerializer.invoke("serialize", void.class, state.cast(AccumulatorState.class), fieldBuilder));
}
body.ret();
return method;
}

private static void generateGroupedEvaluateFinal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.primitives.Doubles;
import io.airlift.stats.TDigest;
import io.trino.operator.aggregation.state.TDigestAndPercentileArrayState;
import io.trino.spi.block.ArrayBlockBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.function.AggregationFunction;
Expand Down Expand Up @@ -95,14 +96,12 @@ public static void output(@AggregationState TDigestAndPercentileArrayState state
return;
}

BlockBuilder blockBuilder = out.beginBlockEntry();

List<Double> valuesAtPercentiles = valuesAtPercentiles(digest, percentiles);
for (double value : valuesAtPercentiles) {
DOUBLE.writeDouble(blockBuilder, value);
}

out.closeEntry();
((ArrayBlockBuilder) out).buildEntry(elementBuilder -> {
for (double value : valuesAtPercentiles) {
DOUBLE.writeDouble(elementBuilder, value);
}
});
}

public static List<Double> valuesAtPercentiles(TDigest digest, List<Double> percentiles)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.stats.TDigest;
import io.trino.operator.aggregation.state.TDigestAndPercentileArrayState;
import io.trino.spi.block.ArrayBlockBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.function.AggregationFunction;
Expand Down Expand Up @@ -65,13 +66,11 @@ public static void output(@AggregationState TDigestAndPercentileArrayState state
return;
}

BlockBuilder blockBuilder = out.beginBlockEntry();

List<Double> valuesAtPercentiles = valuesAtPercentiles(digest, percentiles);
for (double value : valuesAtPercentiles) {
BIGINT.writeLong(blockBuilder, Math.round(value));
}

out.closeEntry();
((ArrayBlockBuilder) out).buildEntry(elementBuilder -> {
List<Double> valuesAtPercentiles = valuesAtPercentiles(digest, percentiles);
for (double value : valuesAtPercentiles) {
BIGINT.writeLong(elementBuilder, Math.round(value));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.stats.TDigest;
import io.trino.operator.aggregation.state.TDigestAndPercentileArrayState;
import io.trino.spi.block.ArrayBlockBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.function.AggregationFunction;
Expand Down Expand Up @@ -66,13 +67,11 @@ public static void output(@AggregationState TDigestAndPercentileArrayState state
return;
}

BlockBuilder blockBuilder = out.beginBlockEntry();

List<Double> valuesAtPercentiles = valuesAtPercentiles(digest, percentiles);
for (double value : valuesAtPercentiles) {
REAL.writeLong(blockBuilder, floatToRawIntBits((float) value));
}

out.closeEntry();
((ArrayBlockBuilder) out).buildEntry(elementBuilder -> {
for (double value : valuesAtPercentiles) {
REAL.writeLong(elementBuilder, floatToRawIntBits((float) value));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.operator.aggregation;

import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.MapBlockBuilder;
import io.trino.spi.function.AccumulatorState;
import io.trino.spi.function.AccumulatorStateMetadata;
import io.trino.spi.function.AggregationFunction;
Expand Down Expand Up @@ -94,12 +95,10 @@ public static void output(@AggregationState State state, BlockBuilder out)
out.appendNull();
}
else {
BlockBuilder entryBuilder = out.beginBlockEntry();
state.get().forEachBucket((key, value) -> {
BigintType.BIGINT.writeLong(entryBuilder, key);
BigintType.BIGINT.writeLong(entryBuilder, value);
});
out.closeEntry();
((MapBlockBuilder) out).buildEntry((keyBuilder, valueBuilder) -> state.get().forEachBucket((key, value) -> {
BigintType.BIGINT.writeLong(keyBuilder, key);
BigintType.BIGINT.writeLong(valueBuilder, value);
}));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.trino.operator.aggregation.state.LongDecimalWithOverflowState;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.Int128ArrayBlockBuilder;
import io.trino.spi.function.AggregationFunction;
import io.trino.spi.function.AggregationState;
import io.trino.spi.function.BlockIndex;
Expand Down Expand Up @@ -129,9 +130,7 @@ public static void outputDecimal(@AggregationState LongDecimalWithOverflowState
long rawLow = decimal[offset + 1];

Decimals.throwIfOverflows(rawHigh, rawLow);
out.writeLong(rawHigh);
out.writeLong(rawLow);
out.closeEntry();
((Int128ArrayBlockBuilder) out).writeInt128(rawHigh, rawLow);
}
else {
out.appendNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.trino.operator.aggregation.state.DoubleHistogramStateSerializer;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.MapBlockBuilder;
import io.trino.spi.function.AccumulatorState;
import io.trino.spi.function.AccumulatorStateMetadata;
import io.trino.spi.function.AggregationFunction;
Expand Down Expand Up @@ -92,13 +93,12 @@ public static void output(@AggregationState State state, BlockBuilder out)
}
else {
Map<Double, Double> value = state.get().getBuckets();

BlockBuilder entryBuilder = out.beginBlockEntry();
for (Map.Entry<Double, Double> entry : value.entrySet()) {
DoubleType.DOUBLE.writeDouble(entryBuilder, entry.getKey());
DoubleType.DOUBLE.writeDouble(entryBuilder, entry.getValue());
}
out.closeEntry();
((MapBlockBuilder) out).buildEntry((keyBuilder, valueBuilder) -> {
for (Map.Entry<Double, Double> entry : value.entrySet()) {
DoubleType.DOUBLE.writeDouble(keyBuilder, entry.getKey());
DoubleType.DOUBLE.writeDouble(valueBuilder, entry.getValue());
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.MapBlockBuilder;
import io.trino.spi.type.Type;
import io.trino.type.BlockTypeOperators.BlockPositionEqual;
import io.trino.type.BlockTypeOperators.BlockPositionHashCode;
Expand Down Expand Up @@ -126,12 +127,12 @@ private void deserialize(Block block)

public void serialize(BlockBuilder out)
{
BlockBuilder mapBlockBuilder = out.beginBlockEntry();
for (int i = 0; i < keyBlockBuilder.getPositionCount(); i++) {
keyType.appendTo(keyBlockBuilder, i, mapBlockBuilder);
valueType.appendTo(valueBlockBuilder, i, mapBlockBuilder);
}
out.closeEntry();
((MapBlockBuilder) out).buildEntry((keyBuilder, valueBuilder) -> {
for (int i = 0; i < keyBlockBuilder.getPositionCount(); i++) {
keyType.appendTo(keyBlockBuilder, i, keyBuilder);
valueType.appendTo(valueBlockBuilder, i, valueBuilder);
}
});
}

public long estimatedInMemorySize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.operator.aggregation;

import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.MapBlockBuilder;
import io.trino.spi.function.AggregationFunction;
import io.trino.spi.function.AggregationState;
import io.trino.spi.function.CombineFunction;
Expand Down Expand Up @@ -59,12 +60,12 @@ public static void output(@AggregationState DoubleHistogramAggregation.State sta
}
else {
Map<Double, Double> value = state.get().getBuckets();
BlockBuilder entryBuilder = out.beginBlockEntry();
for (Map.Entry<Double, Double> entry : value.entrySet()) {
REAL.writeLong(entryBuilder, floatToRawIntBits(entry.getKey().floatValue()));
REAL.writeLong(entryBuilder, floatToRawIntBits(entry.getValue().floatValue()));
}
out.closeEntry();
((MapBlockBuilder) out).buildEntry((keyBuilder, valueBuilder) -> {
for (Map.Entry<Double, Double> entry : value.entrySet()) {
REAL.writeLong(keyBuilder, floatToRawIntBits(entry.getKey().floatValue()));
REAL.writeLong(valueBuilder, floatToRawIntBits(entry.getValue().floatValue()));
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.slice.Slice;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.MapBlockBuilder;
import io.trino.spi.function.AccumulatorState;
import io.trino.spi.function.AccumulatorStateMetadata;
import io.trino.spi.function.AggregationFunction;
Expand Down Expand Up @@ -97,12 +98,10 @@ public static void output(@AggregationState State state, BlockBuilder out)
out.appendNull();
}
else {
BlockBuilder entryBuilder = out.beginBlockEntry();
state.get().forEachBucket((key, value) -> {
VarcharType.VARCHAR.writeSlice(entryBuilder, key);
BigintType.BIGINT.writeLong(entryBuilder, value);
});
out.closeEntry();
((MapBlockBuilder) out).buildEntry((keyBuilder, valueBuilder) -> state.get().forEachBucket((key, value) -> {
VarcharType.VARCHAR.writeSlice(keyBuilder, key);
BigintType.BIGINT.writeLong(valueBuilder, value);
}));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.operator.aggregation.arrayagg;

import io.trino.operator.aggregation.NullablePosition;
import io.trino.spi.block.ArrayBlockBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.function.AggregationFunction;
Expand Down Expand Up @@ -62,9 +63,7 @@ public static void output(
out.appendNull();
}
else {
BlockBuilder entryBuilder = out.beginBlockEntry();
state.forEach((block, position) -> elementType.appendTo(block, position, entryBuilder));
out.closeEntry();
((ArrayBlockBuilder) out).buildEntry(elementBuilder -> state.forEach((block, position) -> elementType.appendTo(block, position, elementBuilder)));
}
}
}
Loading