diff --git a/bigframes/core/compile/sqlglot/aggregations/ordered_unary_compiler.py b/bigframes/core/compile/sqlglot/aggregations/ordered_unary_compiler.py
index 9024a9ec89..594d75fd3c 100644
--- a/bigframes/core/compile/sqlglot/aggregations/ordered_unary_compiler.py
+++ b/bigframes/core/compile/sqlglot/aggregations/ordered_unary_compiler.py
@@ -27,7 +27,7 @@ def compile(
     op: agg_ops.WindowOp,
     column: typed_expr.TypedExpr,
     *,
-    order_by: tuple[sge.Expression, ...],
+    order_by: tuple[sge.Expression, ...] = (),
 ) -> sge.Expression:
     return ORDERED_UNARY_OP_REGISTRATION[op](op, column, order_by=order_by)
 
diff --git a/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py b/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py
index d0d887588c..603e8a096c 100644
--- a/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py
+++ b/bigframes/core/compile/sqlglot/aggregations/unary_compiler.py
@@ -49,6 +49,19 @@ def _(
     return sge.func("IFNULL", result, sge.true())
 
 
+@UNARY_OP_REGISTRATION.register(agg_ops.AnyOp)
+def _(
+    op: agg_ops.AnyOp,
+    column: typed_expr.TypedExpr,
+    window: typing.Optional[window_spec.WindowSpec] = None,
+) -> sge.Expression:
+    expr = column.expr
+    expr = apply_window_if_present(sge.func("LOGICAL_OR", expr), window)
+
+    # BQ will return null for empty column, result would be false in pandas.
+    return sge.func("COALESCE", expr, sge.convert(False))
+
+
 @UNARY_OP_REGISTRATION.register(agg_ops.ApproxQuartilesOp)
 def _(
     op: agg_ops.ApproxQuartilesOp,
diff --git a/bigframes/core/compile/sqlglot/expressions/array_ops.py b/bigframes/core/compile/sqlglot/expressions/array_ops.py
index 57ff2ee459..2758178beb 100644
--- a/bigframes/core/compile/sqlglot/expressions/array_ops.py
+++ b/bigframes/core/compile/sqlglot/expressions/array_ops.py
@@ -16,19 +16,16 @@
 
 import typing
 
-import sqlglot
+import sqlglot as sg
 import sqlglot.expressions as sge
 
 from bigframes import operations as ops
 from bigframes.core.compile.sqlglot.expressions.typed_expr import TypedExpr
 import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
+import bigframes.dtypes as dtypes
 
 register_unary_op = scalar_compiler.scalar_op_compiler.register_unary_op
-
-
-@register_unary_op(ops.ArrayToStringOp, pass_op=True)
-def _(expr: TypedExpr, op: ops.ArrayToStringOp) -> sge.Expression:
-    return sge.ArrayToString(this=expr.expr, expression=f"'{op.delimiter}'")
+register_nary_op = scalar_compiler.scalar_op_compiler.register_nary_op
 
 
 @register_unary_op(ops.ArrayIndexOp, pass_op=True)
@@ -41,9 +38,37 @@ def _(expr: TypedExpr, op: ops.ArrayIndexOp) -> sge.Expression:
     )
 
 
+@register_unary_op(ops.ArrayReduceOp, pass_op=True)
+def _(expr: TypedExpr, op: ops.ArrayReduceOp) -> sge.Expression:
+    sub_expr = sg.to_identifier("bf_arr_reduce_uid")
+    sub_type = dtypes.get_array_inner_type(expr.dtype)
+
+    if op.aggregation.order_independent:
+        from bigframes.core.compile.sqlglot.aggregations import unary_compiler
+
+        agg_expr = unary_compiler.compile(op.aggregation, TypedExpr(sub_expr, sub_type))
+    else:
+        from bigframes.core.compile.sqlglot.aggregations import ordered_unary_compiler
+
+        agg_expr = ordered_unary_compiler.compile(
+            op.aggregation, TypedExpr(sub_expr, sub_type)
+        )
+
+    return (
+        sge.select(agg_expr)
+        .from_(
+            sge.Unnest(
+                expressions=[expr.expr],
+                alias=sge.TableAlias(columns=[sub_expr]),
+            )
+        )
+        .subquery()
+    )
+
+
 @register_unary_op(ops.ArraySliceOp, pass_op=True)
 def _(expr: TypedExpr, op: ops.ArraySliceOp) -> sge.Expression:
-    slice_idx = sqlglot.to_identifier("slice_idx")
+    slice_idx = sg.to_identifier("slice_idx")
 
     conditions: typing.List[sge.Predicate] = [slice_idx >= op.start]
 
@@ -51,7 +76,7 @@ def _(expr: TypedExpr, op: ops.ArraySliceOp) -> sge.Expression:
         conditions.append(slice_idx < op.stop)
 
     # local name for each element in the array
-    el = sqlglot.to_identifier("el")
+    el = sg.to_identifier("el")
 
     selected_elements = (
         sge.select(el)
@@ -66,3 +91,27 @@ def _(expr: TypedExpr, op: ops.ArraySliceOp) -> sge.Expression:
     )
 
     return sge.array(selected_elements)
+
+
+@register_unary_op(ops.ArrayToStringOp, pass_op=True)
+def _(expr: TypedExpr, op: ops.ArrayToStringOp) -> sge.Expression:
+    return sge.ArrayToString(this=expr.expr, expression=f"'{op.delimiter}'")
+
+
+@register_nary_op(ops.ToArrayOp)
+def _(*exprs: TypedExpr) -> sge.Expression:
+    do_upcast_bool = any(
+        dtypes.is_numeric(expr.dtype, include_bool=False) for expr in exprs
+    )
+    if do_upcast_bool:
+        sg_exprs = [_coerce_bool_to_int(expr) for expr in exprs]
+    else:
+        sg_exprs = [expr.expr for expr in exprs]
+    return sge.Array(expressions=sg_exprs)
+
+
+def _coerce_bool_to_int(typed_expr: TypedExpr) -> sge.Expression:
+    """Coerce boolean expression to integer."""
+    if typed_expr.dtype == dtypes.BOOL_DTYPE:
+        return sge.Cast(this=typed_expr.expr, to="INT64")
+    return typed_expr.expr
diff --git a/tests/system/small/engines/test_array_ops.py b/tests/system/small/engines/test_array_ops.py
index c53b9e9dc1..3b80cb8854 100644
--- a/tests/system/small/engines/test_array_ops.py
+++ b/tests/system/small/engines/test_array_ops.py
@@ -26,7 +26,7 @@
 REFERENCE_ENGINE = polars_executor.PolarsExecutor()
 
 
-@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
+@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True)
 def test_engines_to_array_op(scalars_array_value: array_value.ArrayValue, engine):
     # Bigquery won't allow you to materialize arrays with null, so use non-nullable
     int64_non_null = ops.coalesce_op.as_expr("int64_col", expression.const(0))
@@ -46,7 +46,7 @@ def test_engines_to_array_op(scalars_array_value: array_value.ArrayValue, engine
     assert_equivalence_execution(arr.node, REFERENCE_ENGINE, engine)
 
 
-@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
+@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True)
 def test_engines_array_reduce_op(arrays_array_value: array_value.ArrayValue, engine):
     arr, _ = arrays_array_value.compute_values(
         [
diff --git a/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_any/out.sql b/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_any/out.sql
new file mode 100644
index 0000000000..03b0d5c151
--- /dev/null
+++ b/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_any/out.sql
@@ -0,0 +1,12 @@
+WITH `bfcte_0` AS (
+  SELECT
+    `bool_col`
+  FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
+), `bfcte_1` AS (
+  SELECT
+    COALESCE(LOGICAL_OR(`bool_col`), FALSE) AS `bfcol_1`
+  FROM `bfcte_0`
+)
+SELECT
+  `bfcol_1` AS `bool_col`
+FROM `bfcte_1`
\ No newline at end of file
diff --git a/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_any/window_out.sql b/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_any/window_out.sql
new file mode 100644
index 0000000000..970349a4f5
--- /dev/null
+++ b/tests/unit/core/compile/sqlglot/aggregations/snapshots/test_unary_compiler/test_any/window_out.sql
@@ -0,0 +1,17 @@
+WITH `bfcte_0` AS (
+  SELECT
+    `bool_col`
+  FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
+), `bfcte_1` AS (
+  SELECT
+    *,
+    CASE
+      WHEN `bool_col` IS NULL
+      THEN NULL
+      ELSE COALESCE(LOGICAL_OR(`bool_col`) OVER (), FALSE)
+    END AS `bfcol_1`
+  FROM `bfcte_0`
+)
+SELECT
+  `bfcol_1` AS `agg_bool`
+FROM `bfcte_1`
\ No newline at end of file
diff --git a/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py b/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py
index 478368393a..a21c753896 100644
--- a/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py
+++ b/tests/unit/core/compile/sqlglot/aggregations/test_unary_compiler.py
@@ -88,6 +88,20 @@ def test_all(scalar_types_df: bpd.DataFrame, snapshot):
     snapshot.assert_match(sql_window_partition, "window_partition_out.sql")
 
 
+def test_any(scalar_types_df: bpd.DataFrame, snapshot):
+    col_name = "bool_col"
+    bf_df = scalar_types_df[[col_name]]
+    agg_expr = agg_ops.AnyOp().as_expr(col_name)
+    sql = _apply_unary_agg_ops(bf_df, [agg_expr], [col_name])
+
+    snapshot.assert_match(sql, "out.sql")
+
+    # Window tests
+    window = window_spec.WindowSpec(ordering=(ordering.ascending_over(col_name),))
+    sql_window = _apply_unary_window_op(bf_df, agg_expr, window, "agg_bool")
+    snapshot.assert_match(sql_window, "window_out.sql")
+
+
 def test_approx_quartiles(scalar_types_df: bpd.DataFrame, snapshot):
     col_name = "int64_col"
     bf_df = scalar_types_df[[col_name]]
diff --git a/tests/unit/core/compile/sqlglot/expressions/snapshots/test_array_ops/test_array_reduce_op/out.sql b/tests/unit/core/compile/sqlglot/expressions/snapshots/test_array_ops/test_array_reduce_op/out.sql
new file mode 100644
index 0000000000..b9f87bfd1e
--- /dev/null
+++ b/tests/unit/core/compile/sqlglot/expressions/snapshots/test_array_ops/test_array_reduce_op/out.sql
@@ -0,0 +1,37 @@
+WITH `bfcte_0` AS (
+  SELECT
+    `bool_list_col`,
+    `float_list_col`,
+    `string_list_col`
+  FROM `bigframes-dev`.`sqlglot_test`.`repeated_types`
+), `bfcte_1` AS (
+  SELECT
+    *,
+    (
+      SELECT
+        COALESCE(SUM(bf_arr_reduce_uid), 0)
+      FROM UNNEST(`float_list_col`) AS bf_arr_reduce_uid
+    ) AS `bfcol_3`,
+    (
+      SELECT
+        STDDEV(bf_arr_reduce_uid)
+      FROM UNNEST(`float_list_col`) AS bf_arr_reduce_uid
+    ) AS `bfcol_4`,
+    (
+      SELECT
+        COUNT(bf_arr_reduce_uid)
+      FROM UNNEST(`string_list_col`) AS bf_arr_reduce_uid
+    ) AS `bfcol_5`,
+    (
+      SELECT
+        COALESCE(LOGICAL_OR(bf_arr_reduce_uid), FALSE)
+      FROM UNNEST(`bool_list_col`) AS bf_arr_reduce_uid
+    ) AS `bfcol_6`
+  FROM `bfcte_0`
+)
+SELECT
+  `bfcol_3` AS `sum_float`,
+  `bfcol_4` AS `std_float`,
+  `bfcol_5` AS `count_str`,
+  `bfcol_6` AS `any_bool`
+FROM `bfcte_1`
\ No newline at end of file
diff --git a/tests/unit/core/compile/sqlglot/expressions/snapshots/test_array_ops/test_to_array_op/out.sql b/tests/unit/core/compile/sqlglot/expressions/snapshots/test_array_ops/test_to_array_op/out.sql
new file mode 100644
index 0000000000..3e29701658
--- /dev/null
+++ b/tests/unit/core/compile/sqlglot/expressions/snapshots/test_array_ops/test_to_array_op/out.sql
@@ -0,0 +1,26 @@
+WITH `bfcte_0` AS (
+  SELECT
+    `bool_col`,
+    `float64_col`,
+    `int64_col`,
+    `string_col`
+  FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
+), `bfcte_1` AS (
+  SELECT
+    *,
+    [COALESCE(`bool_col`, FALSE)] AS `bfcol_8`,
+    [COALESCE(`int64_col`, 0)] AS `bfcol_9`,
+    [COALESCE(`string_col`, ''), COALESCE(`string_col`, '')] AS `bfcol_10`,
+    [
+      COALESCE(`int64_col`, 0),
+      CAST(COALESCE(`bool_col`, FALSE) AS INT64),
+      COALESCE(`float64_col`, 0.0)
+    ] AS `bfcol_11`
+  FROM `bfcte_0`
+)
+SELECT
+  `bfcol_8` AS `bool_col`,
+  `bfcol_9` AS `int64_col`,
+  `bfcol_10` AS `strs_col`,
+  `bfcol_11` AS `numeric_col`
+FROM `bfcte_1`
\ No newline at end of file
diff --git a/tests/unit/core/compile/sqlglot/expressions/test_array_ops.py b/tests/unit/core/compile/sqlglot/expressions/test_array_ops.py
index 61b8b99479..67c8bb0e5c 100644
--- a/tests/unit/core/compile/sqlglot/expressions/test_array_ops.py
+++ b/tests/unit/core/compile/sqlglot/expressions/test_array_ops.py
@@ -15,7 +15,9 @@
 import pytest
 
 from bigframes import operations as ops
+from bigframes.core import expression
 from bigframes.operations._op_converters import convert_index, convert_slice
+import bigframes.operations.aggregations as agg_ops
 import bigframes.pandas as bpd
 from bigframes.testing import utils
 
@@ -42,6 +44,20 @@ def test_array_index(repeated_types_df: bpd.DataFrame, snapshot):
     snapshot.assert_match(sql, "out.sql")
 
 
+def test_array_reduce_op(repeated_types_df: bpd.DataFrame, snapshot):
+    ops_map = {
+        "sum_float": ops.ArrayReduceOp(agg_ops.SumOp()).as_expr("float_list_col"),
+        "std_float": ops.ArrayReduceOp(agg_ops.StdOp()).as_expr("float_list_col"),
+        "count_str": ops.ArrayReduceOp(agg_ops.CountOp()).as_expr("string_list_col"),
+        "any_bool": ops.ArrayReduceOp(agg_ops.AnyOp()).as_expr("bool_list_col"),
+    }
+
+    sql = utils._apply_ops_to_sql(
+        repeated_types_df, list(ops_map.values()), list(ops_map.keys())
+    )
+    snapshot.assert_match(sql, "out.sql")
+
+
 def test_array_slice_with_only_start(repeated_types_df: bpd.DataFrame, snapshot):
     col_name = "string_list_col"
     bf_df = repeated_types_df[[col_name]]
@@ -60,3 +76,24 @@ def test_array_slice_with_start_and_stop(repeated_types_df: bpd.DataFrame, snaps
     )
 
     snapshot.assert_match(sql, "out.sql")
+
+
+def test_to_array_op(scalar_types_df: bpd.DataFrame, snapshot):
+    bf_df = scalar_types_df[["int64_col", "bool_col", "float64_col", "string_col"]]
+    # Bigquery won't allow you to materialize arrays with null, so use non-nullable
+    int64_non_null = ops.coalesce_op.as_expr("int64_col", expression.const(0))
+    bool_col_non_null = ops.coalesce_op.as_expr("bool_col", expression.const(False))
+    float_col_non_null = ops.coalesce_op.as_expr("float64_col", expression.const(0.0))
+    string_col_non_null = ops.coalesce_op.as_expr("string_col", expression.const(""))
+
+    ops_map = {
+        "bool_col": ops.ToArrayOp().as_expr(bool_col_non_null),
+        "int64_col": ops.ToArrayOp().as_expr(int64_non_null),
+        "strs_col": ops.ToArrayOp().as_expr(string_col_non_null, string_col_non_null),
+        "numeric_col": ops.ToArrayOp().as_expr(
+            int64_non_null, bool_col_non_null, float_col_non_null
+        ),
+    }
+
+    sql = utils._apply_ops_to_sql(bf_df, list(ops_map.values()), list(ops_map.keys()))
+    snapshot.assert_match(sql, "out.sql")