-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Simplify compiled page projection #26531
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,7 +61,7 @@ | |
| import org.weakref.jmx.Managed; | ||
| import org.weakref.jmx.Nested; | ||
|
|
||
| import java.lang.invoke.MethodHandle; | ||
| import java.lang.reflect.Constructor; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
|
|
@@ -72,6 +72,7 @@ | |
|
|
||
| import static com.google.common.base.MoreObjects.toStringHelper; | ||
| import static com.google.common.base.Throwables.throwIfInstanceOf; | ||
| import static com.google.common.collect.ImmutableList.toImmutableList; | ||
| import static io.airlift.bytecode.Access.FINAL; | ||
| import static io.airlift.bytecode.Access.PRIVATE; | ||
| import static io.airlift.bytecode.Access.PUBLIC; | ||
|
|
@@ -97,7 +98,6 @@ | |
| import static io.trino.sql.relational.DeterminismEvaluator.isDeterministic; | ||
| import static io.trino.util.CompilerUtils.defineClass; | ||
| import static io.trino.util.CompilerUtils.makeClassName; | ||
| import static io.trino.util.Reflection.constructorMethodHandle; | ||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| public class PageFunctionCompiler | ||
|
|
@@ -198,9 +198,10 @@ private Supplier<PageProjection> compileProjectionInternal(RowExpression project | |
|
|
||
| ClassDefinition pageProjectionWorkDefinition = definePageProjectWorkClass(result.getRewrittenExpression(), callSiteBinder, classNameSuffix); | ||
|
|
||
| Class<?> pageProjectionWorkClass; | ||
| Constructor<? extends PageProjectionWork> pageProjectionWorkConstructor; | ||
| try { | ||
| pageProjectionWorkClass = defineClass(pageProjectionWorkDefinition, PageProjectionWork.class, callSiteBinder.getBindings(), getClass().getClassLoader()); | ||
| Class<? extends PageProjectionWork> pageProjectionWorkClass = defineClass(pageProjectionWorkDefinition, PageProjectionWork.class, callSiteBinder.getBindings(), getClass().getClassLoader()); | ||
| pageProjectionWorkConstructor = pageProjectionWorkClass.getConstructor(); | ||
| } | ||
| catch (Exception e) { | ||
| if (Throwables.getRootCause(e) instanceof MethodTooLargeException) { | ||
|
|
@@ -210,12 +211,19 @@ private Supplier<PageProjection> compileProjectionInternal(RowExpression project | |
| throw new TrinoException(COMPILER_ERROR, e); | ||
| } | ||
|
|
||
| MethodHandle pageProjectionConstructor = constructorMethodHandle(pageProjectionWorkClass, BlockBuilder.class, ConnectorSession.class, SourcePage.class, SelectedPositions.class); | ||
| return () -> new GeneratedPageProjection( | ||
| result.getRewrittenExpression(), | ||
| isExpressionDeterministic, | ||
| result.getInputChannels(), | ||
| pageProjectionConstructor); | ||
| return () -> { | ||
| try { | ||
| PageProjectionWork pageProjectionWork = pageProjectionWorkConstructor.newInstance(); | ||
| return new GeneratedPageProjection( | ||
| result.getRewrittenExpression(), | ||
| isExpressionDeterministic, | ||
| result.getInputChannels(), | ||
| pageProjectionWork); | ||
| } | ||
| catch (ReflectiveOperationException e) { | ||
| throw new TrinoException(COMPILER_ERROR, e); | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| private static ParameterizedType generateProjectionWorkClassName(Optional<String> classNameSuffix) | ||
|
|
@@ -232,89 +240,96 @@ private ClassDefinition definePageProjectWorkClass(RowExpression projection, Cal | |
| type(PageProjectionWork.class)); | ||
|
|
||
| FieldDefinition blockBuilderField = classDefinition.declareField(a(PRIVATE), "blockBuilder", BlockBuilder.class); | ||
| FieldDefinition sessionField = classDefinition.declareField(a(PRIVATE), "session", ConnectorSession.class); | ||
| FieldDefinition selectedPositionsField = classDefinition.declareField(a(PRIVATE), "selectedPositions", SelectedPositions.class); | ||
|
|
||
| CachedInstanceBinder cachedInstanceBinder = new CachedInstanceBinder(classDefinition, callSiteBinder); | ||
|
|
||
| List<Integer> inputChannels = getInputChannels(projection); | ||
| List<FieldDefinition> blockFields = inputChannels.stream() | ||
| .map(channel -> classDefinition.declareField(a(PRIVATE), "block_" + channel, Block.class)) | ||
| .collect(toImmutableList()); | ||
| // process | ||
| generateProcessMethod(classDefinition, blockBuilderField, sessionField, selectedPositionsField); | ||
| generateProcessMethod(classDefinition, blockBuilderField, blockFields, inputChannels); | ||
|
|
||
| // evaluate | ||
| Map<LambdaDefinitionExpression, CompiledLambda> compiledLambdaMap = generateMethodsForLambda(classDefinition, callSiteBinder, cachedInstanceBinder, projection); | ||
| generateEvaluateMethod(classDefinition, callSiteBinder, cachedInstanceBinder, compiledLambdaMap, projection, blockBuilderField); | ||
|
|
||
| // constructor | ||
| Parameter blockBuilder = arg("blockBuilder", BlockBuilder.class); | ||
| Parameter session = arg("session", ConnectorSession.class); | ||
| Parameter page = arg("page", SourcePage.class); | ||
| Parameter selectedPositions = arg("selectedPositions", SelectedPositions.class); | ||
|
|
||
| MethodDefinition constructorDefinition = classDefinition.declareConstructor(a(PUBLIC), blockBuilder, session, page, selectedPositions); | ||
| MethodDefinition constructorDefinition = classDefinition.declareConstructor(a(PUBLIC)); | ||
|
|
||
| BytecodeBlock body = constructorDefinition.getBody(); | ||
| Variable thisVariable = constructorDefinition.getThis(); | ||
|
|
||
| body.comment("super();") | ||
| .append(thisVariable) | ||
| .invokeConstructor(Object.class) | ||
| .append(thisVariable.setField(blockBuilderField, blockBuilder)) | ||
| .append(thisVariable.setField(sessionField, session)) | ||
| .append(thisVariable.setField(selectedPositionsField, selectedPositions)); | ||
|
|
||
| for (int channel : getInputChannels(projection)) { | ||
| FieldDefinition blockField = classDefinition.declareField(a(PRIVATE, FINAL), "block_" + channel, Block.class); | ||
| body.append(thisVariable.setField(blockField, page.invoke("getBlock", Block.class, constantInt(channel)))); | ||
| } | ||
| .invokeConstructor(Object.class); | ||
|
|
||
| cachedInstanceBinder.generateInitializations(thisVariable, body); | ||
| body.ret(); | ||
|
|
||
| return classDefinition; | ||
| } | ||
|
|
||
| private static MethodDefinition generateProcessMethod( | ||
| private static void generateProcessMethod( | ||
| ClassDefinition classDefinition, | ||
| FieldDefinition blockBuilder, | ||
| FieldDefinition session, | ||
| FieldDefinition selectedPositions) | ||
| FieldDefinition blockBuilderField, | ||
| List<FieldDefinition> blockFields, | ||
| List<Integer> inputChannels) | ||
| { | ||
| MethodDefinition method = classDefinition.declareMethod(a(PUBLIC), "process", type(Block.class), ImmutableList.of()); | ||
| Parameter session = arg("session", ConnectorSession.class); | ||
| Parameter page = arg("page", SourcePage.class); | ||
| Parameter selectedPositions = arg("selectedPositions", SelectedPositions.class); | ||
| Parameter blockBuilder = arg("blockBuilder", BlockBuilder.class); | ||
|
|
||
| MethodDefinition method = classDefinition.declareMethod( | ||
| a(PUBLIC), | ||
| "process", | ||
| type(Block.class), | ||
| ImmutableList.<Parameter>builder() | ||
| .add(session) | ||
| .add(page) | ||
| .add(selectedPositions) | ||
| .add(blockBuilder) | ||
| .build()); | ||
|
|
||
| Scope scope = method.getScope(); | ||
| Variable thisVariable = method.getThis(); | ||
| BytecodeBlock body = method.getBody(); | ||
|
|
||
| Variable from = scope.declareVariable("from", body, thisVariable.getField(selectedPositions).invoke("getOffset", int.class)); | ||
| Variable to = scope.declareVariable("to", body, add(thisVariable.getField(selectedPositions).invoke("getOffset", int.class), thisVariable.getField(selectedPositions).invoke("size", int.class))); | ||
| for (int i = 0; i < inputChannels.size(); i++) { | ||
| int channel = inputChannels.get(i); | ||
| body.append(thisVariable.setField(blockFields.get(i), page.invoke("getBlock", Block.class, constantInt(channel)))); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like those fields are never cleared, so between executions, the last used blocks will be referenced here and thus non-eligible for GC
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right, i was thinking that the PageProjection instance itself would go out of scope once the operator is done.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| } | ||
| body.append(thisVariable.setField(blockBuilderField, blockBuilder)); | ||
|
|
||
| Variable from = scope.declareVariable("from", body, selectedPositions.invoke("getOffset", int.class)); | ||
| Variable to = scope.declareVariable("to", body, add(selectedPositions.invoke("getOffset", int.class), selectedPositions.invoke("size", int.class))); | ||
| Variable positions = scope.declareVariable(int[].class, "positions"); | ||
| Variable index = scope.declareVariable(int.class, "index"); | ||
|
|
||
| IfStatement ifStatement = new IfStatement() | ||
| .condition(thisVariable.getField(selectedPositions).invoke("isList", boolean.class)); | ||
| .condition(selectedPositions.invoke("isList", boolean.class)); | ||
| body.append(ifStatement); | ||
|
|
||
| ifStatement.ifTrue(new BytecodeBlock() | ||
| .append(positions.set(thisVariable.getField(selectedPositions).invoke("getPositions", int[].class))) | ||
| .append(positions.set(selectedPositions.invoke("getPositions", int[].class))) | ||
| .append(new ForLoop("positions loop") | ||
| .initialize(index.set(from)) | ||
| .condition(lessThan(index, to)) | ||
| .update(index.increment()) | ||
| .body(new BytecodeBlock() | ||
| .append(thisVariable.invoke("evaluate", void.class, thisVariable.getField(session), positions.getElement(index)))))); | ||
| .append(thisVariable.invoke("evaluate", void.class, session, positions.getElement(index)))))); | ||
|
|
||
| ifStatement.ifFalse(new ForLoop("range based loop") | ||
| .initialize(index.set(from)) | ||
| .condition(lessThan(index, to)) | ||
| .update(index.increment()) | ||
| .body(new BytecodeBlock() | ||
| .append(thisVariable.invoke("evaluate", void.class, thisVariable.getField(session), index)))); | ||
| .append(thisVariable.invoke("evaluate", void.class, session, index)))); | ||
|
|
||
| body.comment("return this.blockBuilder.build();") | ||
| .append(thisVariable.getField(blockBuilder).invoke("build", Block.class)) | ||
| .append(thisVariable.getField(blockBuilderField).invoke("build", Block.class)) | ||
| .retObject(); | ||
|
|
||
| return method; | ||
| } | ||
|
|
||
| private MethodDefinition generateEvaluateMethod( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.