Skip to content

Commit

Permalink
fix: specify how function arguments are to be bound (#231)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: function argument bindings were open to interpretation
before, and were often produced incorrectly; therefore, this change
semantically shifts some responsibilities from the consumers to the
producers.
  • Loading branch information
jvanstraten authored Jul 26, 2022
1 parent d83d566 commit d4cfbe0
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 21 deletions.
187 changes: 172 additions & 15 deletions proto/substrait/algebra.proto
Original file line number Diff line number Diff line change
Expand Up @@ -517,47 +517,146 @@ message Expression {
}
}

// A scalar function call.
message ScalarFunction {
// points to a function_anchor defined in this plan
// Points to a function_anchor defined in this plan, which must refer
// to a scalar function in the associated YAML file. Required; avoid
// using anchor/reference zero.
uint32 function_reference = 1;

// The arguments to be bound to the function. This must have exactly the
// number of arguments specified in the function definition, and the
// argument types must also match exactly:
//
// - Value arguments must be bound using FunctionArgument.value, and
// the expression in that must yield a value of a type that a function
// overload is defined for.
// - Type arguments must be bound using FunctionArgument.type.
// - Required enum arguments must be bound using FunctionArgument.enum
// followed by Enum.specified, with a string that case-insensitively
// matches one of the allowed options.
// - Optional enum arguments must be bound using FunctionArgument.enum
// followed by either Enum.specified or Enum.unspecified. If specified,
// the string must case-insensitively match one of the allowed options.
repeated FunctionArgument arguments = 4;

// Must be set to the return type of the function, exactly as derived
// using the declaration in the extension.
Type output_type = 3;

// deprecated; use args instead
// Deprecated; use arguments instead.
repeated Expression args = 2 [deprecated = true];
}

// A window function call.
message WindowFunction {
// points to a function_anchor defined in this plan
// Points to a function_anchor defined in this plan, which must refer
// to a window function in the associated YAML file. Required; 0 is
// considered to be a valid anchor/reference.
uint32 function_reference = 1;
repeated Expression partitions = 2;

// The arguments to be bound to the function. This must have exactly the
// number of arguments specified in the function definition, and the
// argument types must also match exactly:
//
// - Value arguments must be bound using FunctionArgument.value, and
// the expression in that must yield a value of a type that a function
// overload is defined for.
// - Type arguments must be bound using FunctionArgument.type, and a
// function overload must be defined for that type.
// - Required enum arguments must be bound using FunctionArgument.enum
// followed by Enum.specified, with a string that case-insensitively
// matches one of the allowed options.
// - Optional enum arguments must be bound using FunctionArgument.enum
// followed by either Enum.specified or Enum.unspecified. If specified,
// the string must case-insensitively match one of the allowed options.
repeated FunctionArgument arguments = 9;

// Must be set to the return type of the function, exactly as derived
// using the declaration in the extension.
Type output_type = 7;

// Describes which part of the window function to perform within the
// context of distributed algorithms. Required. Must be set to
// INITIAL_TO_RESULT for window functions that are not decomposable.
AggregationPhase phase = 6;

// If specified, the records that are part of the window defined by
// upper_bound and lower_bound are ordered according to this list
// before they are aggregated. The first sort field has the highest
// priority; only if a sort field determines two records to be equivalent
// is the next field queried. This field is optional, and is only allowed
// if the window function is defined to support sorting.
repeated SortField sorts = 3;
Bound upper_bound = 4;

// Specifies whether equivalent records are merged before being aggregated.
// Optional, defaults to AGGREGATION_INVOCATION_ALL.
AggregateFunction.AggregationInvocation invocation = 10;

// When one or more partition expressions are specified, two records are
// considered to be in the same partition if and only if these expressions
// yield an equal tuple of values for both. When computing the window
// function, only the subset of records within the bounds that are also in
// the same partition as the current record are aggregated.
repeated Expression partitions = 2;

// Defines the record relative to the current record from which the window
// extends. The bound is inclusive. If the lower bound indexes a record
// greater than the upper bound, TODO (null range/no records passed?
// wrapping around as if lower/upper were swapped? error? null?).
// Optional; defaults to the start of the partition.
Bound lower_bound = 5;
AggregationPhase phase = 6;
Type output_type = 7;
repeated FunctionArgument arguments = 9;

// deprecated; use args instead
// Defines the record relative to the current record up to which the window
// extends. The bound is inclusive. If the upper bound indexes a record
// less than the lower bound, TODO (null range/no records passed?
// wrapping around as if lower/upper were swapped? error? null?).
// Optional; defaults to the end of the partition.
Bound upper_bound = 4;

// Deprecated; use arguments instead.
repeated Expression args = 8 [deprecated = true];

// Defines one of the two boundaries for the window of a window function.
message Bound {
// Defines that the bound extends this far back from the current record.
message Preceding {
// A strictly positive integer specifying the number of records that
// the window extends back from the current record. Required. Use
// CurrentRow for offset zero and Following for negative offsets.
int64 offset = 1;
}

// Defines that the bound extends this far ahead of the current record.
message Following {
// A strictly positive integer specifying the number of records that
// the window extends ahead of the current record. Required. Use
// CurrentRow for offset zero and Preceding for negative offsets.
int64 offset = 1;
}

// Defines that the bound extends to or from the current record.
message CurrentRow {}

// Defines an "unbounded bound": for lower bounds this means the start
// of the partition, and for upper bounds this means the end of the
// partition.
message Unbounded {}

oneof kind {
// The bound extends some number of records behind the current record.
Preceding preceding = 1;

// The bound extends some number of records ahead of the current
// record.
Following following = 2;

// The bound extends to the current record.
CurrentRow current_row = 3;

// The bound extends to the start of the partition or the end of the
// partition, depending on whether this represents the upper or lower
// bound.
Unbounded unbounded = 4;
}
}
Expand Down Expand Up @@ -866,33 +965,91 @@ message SortField {
}
}

// Describes which part of an aggregation or window function to perform within
// the context of distributed algorithms.
enum AggregationPhase {
// Implies `INTERMEDIATE_TO_RESULT`.
AGGREGATION_PHASE_UNSPECIFIED = 0;

// Specifies that the function should be run only up to the point of
// generating an intermediate value, to be further aggregated later using
// INTERMEDIATE_TO_INTERMEDIATE or INTERMEDIATE_TO_RESULT.
AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE = 1;

// Specifies that the inputs of the aggregate or window function are the
// intermediate values of the function, and that the output should also be
// an intermediate value, to be further aggregated later using
// INTERMEDIATE_TO_INTERMEDIATE or INTERMEDIATE_TO_RESULT.
AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE = 2;

// A complete invocation: the function should aggregate the given set of
// inputs to yield a single return value. This style must be used for
// aggregate or window functions that are not decomposable.
AGGREGATION_PHASE_INITIAL_TO_RESULT = 3;

// Specifies that the inputs of the aggregate or window function are the
// intermediate values of the function, generated previously using
// INITIAL_TO_INTERMEDIATE and possibly INTERMEDIATE_TO_INTERMEDIATE calls.
// This call should combine the intermediate values to yield the final
// return value.
AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT = 4;
}

// An aggregate function.
message AggregateFunction {
// points to a function_anchor defined in this plan
// Points to a function_anchor defined in this plan, which must refer
// to an aggregate function in the associated YAML file. Required; 0 is
// considered to be a valid anchor/reference.
uint32 function_reference = 1;

// The arguments to be bound to the function. This must have exactly the
// number of arguments specified in the function definition, and the
// argument types must also match exactly:
//
// - Value arguments must be bound using FunctionArgument.value, and
// the expression in that must yield a value of a type that a function
// overload is defined for.
// - Type arguments must be bound using FunctionArgument.type, and a
// function overload must be defined for that type.
// - Required enum arguments must be bound using FunctionArgument.enum
// followed by Enum.specified, with a string that case-insensitively
// matches one of the allowed options.
// - Optional enum arguments must be bound using FunctionArgument.enum
// followed by either Enum.specified or Enum.unspecified. If specified,
// the string must case-insensitively match one of the allowed options.
repeated FunctionArgument arguments = 7;
repeated SortField sorts = 3;
AggregationPhase phase = 4;

// Must be set to the return type of the function, exactly as derived
// using the declaration in the extension.
Type output_type = 5;

// Describes which part of the aggregation to perform within the context of
// distributed algorithms. Required. Must be set to INITIAL_TO_RESULT for
// aggregate functions that are not decomposable.
AggregationPhase phase = 4;

// If specified, the aggregated records are ordered according to this list
// before they are aggregated. The first sort field has the highest
// priority; only if a sort field determines two records to be equivalent is
// the next field queried. This field is optional.
repeated SortField sorts = 3;

// Specifies whether equivalent records are merged before being aggregated.
// Optional, defaults to AGGREGATION_INVOCATION_ALL.
AggregationInvocation invocation = 6;

// deprecated; use args instead
// deprecated; use arguments instead
repeated Expression args = 2 [deprecated = true];

// Method in which equivalent records are merged before being aggregated.
enum AggregationInvocation {
// This default value implies AGGREGATION_INVOCATION_ALL.
AGGREGATION_INVOCATION_UNSPECIFIED = 0;

// Use all values in aggregation calculation
// Use all values in the aggregation calculation.
AGGREGATION_INVOCATION_ALL = 1;

// Use only distinct values in aggregation calculation
// Use only distinct values in the aggregation calculation.
AGGREGATION_INVOCATION_DISTINCT = 2;
}
}
12 changes: 6 additions & 6 deletions site/docs/expressions/aggregate_functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ Aggregate functions are functions that define an operation which consumes values

Aggregate function signatures contain all the properties defined for [scalar functions](scalar_functions.md). Additionally, they contain the properties below:

| Property | Description | Required |
| ------------------------ | ------------------------------------------------------------- | ------------------------------- |
| Inherits | All properties defined for scalar function. | N/A |
| Ordered | Whether this aggregation function should allow user ordering. | Optional, defaults to false |
| Maximum set size | Maximum allowed set size as an unsigned integer. | Optional, defaults to unlimited |
| Property | Description | Required |
| ------------------------ | --------------------------------------------------------------- | ------------------------------- |
| Inherits | All properties defined for scalar function. | N/A |
| Ordered | Whether the result of this function is sensitive to sort order. | Optional, defaults to false |
| Maximum set size | Maximum allowed set size as an unsigned integer. | Optional, defaults to unlimited |
| Decomposable | Whether the function can be executed in one or more intermediate steps. Valid options are: `NONE`, `ONE`, `MANY`, describing how intermediate steps can be taken. | Optional, defaults to `NONE` |
| Intermediate Output Type | If the function is decomposable, represents the intermediate output type that is used, if the function is defined as either `ONE` or `MANY` decomposable. Will be a struct in many cases. | Required for `ONE` and `MANY`. |
| Invocation | Whether the function uses all or only distinct values in the aggregation calculation. Valid options are: `ALL`, `DISTINCT`. | Optional, defaults to `ALL` |
Expand All @@ -22,5 +22,5 @@ When binding an aggregate function, the binding must include the following addit
| Property | Description |
| -------- | ------------------------------------------------------------ |
| Phase | Describes the input type of the data: [INITIAL_TO_INTERMEDIATE, INTERMEDIATE_TO_INTERMEDIATE, INITIAL_TO_RESULT, INTERMEDIATE_TO_RESULT] describing what portion of the operation is required. For functions that are NOT decomposable, the only valid option will be INITIAL_TO_RESULT. |
| Ordering | One or more ordering keys along with key order (ASC\|DESC\|NULL FIRST, etc.), declared similar to the sort keys in an `ORDER BY` relational operation. Only allowed in cases where the function signature supports ordering. |
| Ordering | Zero or more ordering keys along with key order (ASC\|DESC\|NULL FIRST, etc.), declared similar to the sort keys in an `ORDER BY` relational operation. If no sorts are specified, the records are not sorted prior to being passed to the aggregate function. |

0 comments on commit d4cfbe0

Please sign in to comment.