feat: TVF Part 7/X Final PR of adaptation changes #26445
mohsaka merged 11 commits into prestodb:master
Conversation
Sorry @mohsaka, your pull request is larger than the review limit of 150000 diff characters
aditi-pandit left a comment
Thanks @mohsaka.
It might be easier to review the code if we split it as follows:
i) Changes in TestingTableFunctions.java
ii) Add the TableFunctionNode changes and the basic structural wiring (in Printer, GraphVizPrinter, PlanBuilder, PlanMatcher)
iii) Similar changes for TableFunctionProcessorNode: the PlanNode definition and the basic structural wiring like above.
iv) There are a bunch of changes, like those in Field.java and StatementAnalyzer, which can be one-off changes made independently of this PR.
v) Basic Planner rule for ImplementTableFunctionSource
vi) PruneTableFunction* rules + TestPrune*
vii) Remove* rules + TestRemove*
Move this change to a separate PR.
Don't understand the reason for the changes in this file. Why did you have to make this class public?
Coerce function needed for RelationPlanner
QueryPlanner.PlanAndMappings copartitionCoercions = partitionQueryPlanner.coerce(sourcePlanBuilder, partitioningColumns, analysis, idAllocator, variableAllocator, metadata);
sourcePlanBuilder = copartitionCoercions.getSubPlan();
partitionBy = partitioningColumns.stream()
        .map(copartitionCoercions::get)
        .collect(toImmutableList());
  Scope inputScope = analysis.getScope(tableArgumentsByName.get(name).getRelation());
  columns.stream()
-         .filter(column -> column < 0 || column >= inputScope.getRelationType().getVisibleFieldCount())
+         .filter(column -> column < 0 || column >= inputScope.getRelationType().getAllFieldCount()) // hidden columns can be required as well as visible columns
This can be added as a single change without the others.
aditi-pandit left a comment
This code looks good. Will do one more read.
// for processing or for pass-through. null value in the marker column indicates that the value at the same
// position in the source column should not be processed or passed-through.
// the mapping is only present if there are two or more sources.
private final Optional<Map<VariableReferenceExpression, VariableReferenceExpression>> markerVariables;
Can you add a comment with an example for this.
@aditi-pandit Thanks. I've added an example in the comment.
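For readers following the thread, the marker-column semantics quoted above can be illustrated outside of Presto. This is a hypothetical sketch, not the PR's code: the names `applyMarkers`, `sourceColumn`, and `markerColumn` are illustrative. A null in the marker column at position i means the value at position i in the source column should not be processed or passed through.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MarkerColumnSketch
{
    // Keep only the source values whose marker at the same position is non-null,
    // mirroring the javadoc above: a null marker means "do not process / pass through".
    public static List<String> applyMarkers(List<String> sourceColumn, List<Integer> markerColumn)
    {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < sourceColumn.size(); i++) {
            if (markerColumn.get(i) != null) {
                result.add(sourceColumn.get(i));
            }
        }
        return result;
    }

    public static void main(String[] args)
    {
        // rows 0 and 2 are marked for processing; row 1 is not
        List<String> source = Arrays.asList("a", "b", "c");
        List<Integer> markers = Arrays.asList(0, null, 2);
        System.out.println(applyMarkers(source, markers)); // [a, c]
    }
}
```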
return orderBy;
}

static void addPassthroughColumns(ImmutableList.Builder<VariableReferenceExpression> outputVariables,
Can you add a comment about the usage of this function and an explanation for each parameter?
@aditi-pandit Thanks. I've added a comment for the explanation.
@jaystarshot : PTAL. This code has the main planner changes for Table function.
 * - source T2(a2, b2)
 * </pre>
 */
public class ImplementTableFunctionSource
The naming of this rule is non-intuitive; can it be improved? e.g. TransformTableFunctionToProcessorNodeRule
@jaystarshot Thanks. I've renamed it to TransformTableFunctionToTableFunctionProcessor. What do you think of this name?
jaystarshot left a comment
I think this PR is missing end-to-end integration (execution) tests; without those it's hard to say whether the addExchanges etc. is correct. Can you please add them?
jaystarshot left a comment
I have reviewed most of the planner changes; only "SymbolMapper.java" is left, which I will review.
return Optional.empty();
});

return translatedProperties.unordered(true);
nit: maybe add a comment here explaining why we are conservative in marking this unordered
Added a comment. The main reason is that the user can do pretty much anything with the rows they are provided, so we have no guarantee that they are ordered after table function application.
PlanNode newSource = node.getSources().get(i).accept(this, context);
newSources.add(newSource);

SymbolMapper inputMapper = new SymbolMapper(new HashMap<>(), warningCollector);
This input mapper is empty, this doesn't look correct
This was pretty wrong. Fixed a few things:
- The mapper should be used from the Rewriter mapping.
- We should have been using context.rewrite instead of accept, following the convention of the other nodes.
Thanks!
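As a minimal sketch of why the empty mapping was a bug (generic Java, not the actual SymbolMapper API; the class name `MapperSketch` is illustrative): a mapper built over an empty map degenerates to the identity, so any renames recorded by earlier rewrites are silently dropped.

```java
import java.util.HashMap;
import java.util.Map;

public class MapperSketch
{
    private final Map<String, String> mapping;

    public MapperSketch(Map<String, String> mapping)
    {
        this.mapping = mapping;
    }

    // With an empty map this always returns the input unchanged,
    // so renames produced by the rewriter are lost.
    public String map(String symbol)
    {
        return mapping.getOrDefault(symbol, symbol);
    }

    public static void main(String[] args)
    {
        MapperSketch empty = new MapperSketch(new HashMap<>());

        Map<String, String> rewriterMapping = new HashMap<>();
        rewriterMapping.put("col_old", "col_new");
        MapperSketch fromRewriter = new MapperSketch(rewriterMapping);

        System.out.println(empty.map("col_old"));        // col_old (identity: rename lost)
        System.out.println(fromRewriter.map("col_old")); // col_new
    }
}
```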
Thank you for the review! I will take a look today or Monday.
final class Processed
        implements TableFunctionProcessorState
{
    private final boolean usedInput;
Can you document what usedInput signifies so folks can understand the semantics?
 * @param input a tuple of {@link Page} including one page for each table function's input table.
 * Pages list is ordered according to the corresponding argument specifications in {@link ConnectorTableFunction}.
 * A page for an argument consists of columns requested during analysis (see {@link TableFunctionAnalysis#getRequiredColumns()}).
 * If any of the sources is fully processed, {@code Optional.empty)()} is returned for that source.
Suggested change:
-  * If any of the sources is fully processed, {@code Optional.empty)()} is returned for that source.
+  * If any of the sources is fully processed, {@code Optional.empty()} is returned for that source.
/**
 * This method processes a split. It is called multiple times until the whole output for the split is produced.
 *
 * @param split a {@link ConnectorSplit} representing a subtask.
Let's document that this is Nullable and when it's expected to be null.
Added a description for when the table function is labeled KEEP WHEN EMPTY and has no input.
public interface TableFunctionSplitProcessor
{
    /**
     * This method processes a split. It is called multiple times until the whole output for the split is produced.
Should we make it clear this is 1:1 with a split?
Added a comment explaining that the split processor is one-to-one with a split.
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;

public class Sequence
Let's add documentation for this. This probably entails a new section for TVFs, next to our existing functions.
Agreed, we plan on doing something similar to
https://trino.io/docs/current/functions/table.html
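As a hypothetical illustration of what such documentation would describe (modeled on the linked Trino page; the parameter names `start`, `stop`, and `step` are assumptions here, not confirmed as this PR's API), a sequence table function emits a single-column table of values:

```java
import java.util.ArrayList;
import java.util.List;

public class SequenceSketch
{
    // Produces the values a sequence(start, stop, step) table function would emit
    // as the rows of its single bigint column.
    public static List<Long> sequence(long start, long stop, long step)
    {
        if (step == 0) {
            throw new IllegalArgumentException("step must not be zero");
        }
        List<Long> rows = new ArrayList<>();
        if (step > 0) {
            for (long v = start; v <= stop; v += step) {
                rows.add(v);
            }
        }
        else {
            for (long v = start; v >= stop; v += step) {
                rows.add(v);
            }
        }
        return rows;
    }

    public static void main(String[] args)
    {
        System.out.println(sequence(1, 10, 3)); // [1, 4, 7, 10]
        System.out.println(sequence(5, 1, -2)); // [5, 3, 1]
    }
}
```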
import static java.util.Locale.ENGLISH;
import static java.util.stream.Collectors.joining;

public class ExcludeColumns
We should add documentation for this as well.
Ditto to above.
https://trino.io/docs/current/functions/table.html
private final OperatorContext operatorContext;

private final PageBuffer pageBuffer = new PageBuffer();
My IDE is showing this is unused.
I'm not sure why this was changed from the old implementation, but I switched back to the old one, which was simpler.
// Fallback if needed
return getFunctionId(split, tableFunctionSplitResolvers);
Can you explain why we need this Exception-based fallback? Is there any way to refactor to avoid the need to do this? It's far preferable to just let Exceptions bubble up.
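One common way to refactor away an exception-based fallback is to let each resolver report non-recognition through its return type instead of throwing. This is a hypothetical sketch, not the PR's code: `ResolverChainSketch`, `resolve`, and the split/id strings are all illustrative.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class ResolverChainSketch
{
    // Hypothetical resolver shape: each resolver either recognizes the split and
    // returns an id, or returns Optional.empty() so the next resolver is tried.
    // No exception is needed to trigger the fallback.
    public static Optional<String> resolve(String split, List<Function<String, Optional<String>>> resolvers)
    {
        return resolvers.stream()
                .map(resolver -> resolver.apply(split))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst();
    }

    public static void main(String[] args)
    {
        List<Function<String, Optional<String>>> resolvers = List.of(
                s -> s.startsWith("seq") ? Optional.of("sequence") : Optional.empty(),
                s -> s.startsWith("excl") ? Optional.of("exclude_columns") : Optional.empty());

        System.out.println(resolve("seq-split-1", resolvers)); // Optional[sequence]
        System.out.println(resolve("unknown", resolvers));     // Optional.empty
    }
}
```

With this shape, a genuine failure inside a resolver can still throw and bubble up, while "not my split" is an ordinary empty result.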
.map(RowType.Field::getName)
.filter(Optional::isPresent)
.map(Optional::get)
.map(name -> name.toLowerCase(ENGLISH))
I think this will break any connector that has case sensitive identifiers.
Agreed. There's a comment left by kasiafi acknowledging this issue and a TODO above.
// column names in DescriptorArgument are canonical wrt SQL identifier semantics.
// column names in TableArgument are not canonical wrt SQL identifier semantics, as they are taken from the corresponding RelationType.
// because of that, we match the excluded columns names case-insensitive
// TODO: apply proper identifier semantics
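A small sketch of the pitfall under discussion (generic Java, not the ExcludeColumns implementation; `normalize` and the column names are illustrative): lower-casing identifiers with Locale.ENGLISH makes distinct case-sensitive column names collide, so a connector exposing both could have the wrong column matched for exclusion.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

public class CaseSensitivitySketch
{
    // Mirrors the name.toLowerCase(ENGLISH) normalization quoted above.
    public static Set<String> normalize(List<String> columnNames)
    {
        return columnNames.stream()
                .map(name -> name.toLowerCase(Locale.ENGLISH))
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    public static void main(String[] args)
    {
        // A case-sensitive connector may expose both "ID" and "id" as distinct columns;
        // after normalization they collapse into one entry.
        Set<String> normalized = normalize(List.of("ID", "id", "name"));
        System.out.println(normalized);        // [id, name]
        System.out.println(normalized.size()); // 2, although 3 distinct columns went in
    }
}
```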
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

public class PageBuffer
This looks completely unused.
Used now. Not sure why it was not used before; I'm guessing I missed something when bringing code in by component.
@jaystarshot @tdcmeehan Thank you for the thorough reviews. I really appreciate the time you take to do them. I have addressed the comments, so please take a second look when you have the chance. Thanks again!
…nPlanner and ExcludeColumns optimizer rule. Co-authored-by: kasiafi <30203062+kasiafi@users.noreply.github.com> Co-authored-by: Xin Zhang <desertsxin@gmail.com>
Co-authored-by: kasiafi <30203062+kasiafi@users.noreply.github.com> Co-authored-by: mohsaka <135669458+mohsaka@users.noreply.github.com>
Co-authored-by: kasiafi <30203062+kasiafi@users.noreply.github.com> Co-authored-by: mohsaka <135669458+mohsaka@users.noreply.github.com>
Changes adapted from trino/PR#16584 Author: kasiafi Co-authored-by: kasiafi <30203062+kasiafi@users.noreply.github.com> Co-authored-by: Xin Zhang <desertsxin@gmail.com>
Changes adapted from trino/PR#16716 Author: kasiafi Co-authored-by: kasiafi <30203062+kasiafi@users.noreply.github.com>
Changes adapted from trino/PR#25493 Author: kasiafi Co-authored-by: kasiafi <30203062+kasiafi@users.noreply.github.com>
// as if it was a single partition. Alternatively, it could be split into smaller partitions of arbitrary size.
DataOrganizationSpecification specification = argumentProperties.getSpecification().orElse(UNORDERED_SINGLE_PARTITION);

PlanNode innerWindow = new WindowNode(
@mohsaka : We don't need to do window over window, as a window node can have two window functions that share the same partition by and order by. Can you try combining the two windows with a single function list and see what happens?
@mohsaka : I'm fine with doing this as a follow up PR.
@aditi-pandit We originally had it as a single window function, which can be viewed in one of our really old PRs:
https://github.com/mohsaka/presto/blob/c2d64577387f41bd0f1270c55b0fa851920eb6bb/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/ImplementTableFunctionSource.java#L358
However, it caused us to modify the WindowFilterPushDown rule and remove this check. We didn't think this was proper, so we had to split it in two.
@mohsaka imported this issue as lakehouse/presto #26445
Description
This PR contains all final changes to TVF functionality.
Motivation and Context
Completes the addition of TVF support in Presto.
Impact
Test Plan
Added new test cases.
Rule test cases:
TestTransformTableFunctionToTableFunctionProcessor
TestPruneTableFunctionProcessorColumns
TestPruneTableFunctionProcessorSourceColumns
TestRemoveRedundantTableFunction
TestRewriteExcludeColumnsFunctionToProjection
Planner test case:
planner/TestTableFunctionInvocation
System TVF Test Cases:
TestExcludeColumnsFunction
TestSequenceFunction
Re-ran previous test cases to check for regressions:
test/TestTableFunctionInvocation
TestTableFunctionRegistry
TestAnalyzer
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.