Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def compile(
op: agg_ops.WindowOp,
column: typed_expr.TypedExpr,
*,
order_by: tuple[sge.Expression, ...],
order_by: tuple[sge.Expression, ...] = (),
) -> sge.Expression:
return ORDERED_UNARY_OP_REGISTRATION[op](op, column, order_by=order_by)

Expand Down
13 changes: 13 additions & 0 deletions bigframes/core/compile/sqlglot/aggregations/unary_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ def _(
return sge.func("IFNULL", result, sge.true())


@UNARY_OP_REGISTRATION.register(agg_ops.AnyOp)
def _(
    op: agg_ops.AnyOp,
    column: typed_expr.TypedExpr,
    window: typing.Optional[window_spec.WindowSpec] = None,
) -> sge.Expression:
    """Compile AnyOp to a LOGICAL_OR aggregate, optionally windowed."""
    logical_or = apply_window_if_present(sge.func("LOGICAL_OR", column.expr), window)

    # BQ will return null for empty column, result would be false in pandas.
    return sge.func("COALESCE", logical_or, sge.convert(False))


@UNARY_OP_REGISTRATION.register(agg_ops.ApproxQuartilesOp)
def _(
op: agg_ops.ApproxQuartilesOp,
Expand Down
65 changes: 57 additions & 8 deletions bigframes/core/compile/sqlglot/expressions/array_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,16 @@

import typing

import sqlglot
import sqlglot as sg
import sqlglot.expressions as sge

from bigframes import operations as ops
from bigframes.core.compile.sqlglot.expressions.typed_expr import TypedExpr
import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
import bigframes.dtypes as dtypes

register_unary_op = scalar_compiler.scalar_op_compiler.register_unary_op


@register_unary_op(ops.ArrayToStringOp, pass_op=True)
def _(expr: TypedExpr, op: ops.ArrayToStringOp) -> sge.Expression:
return sge.ArrayToString(this=expr.expr, expression=f"'{op.delimiter}'")
register_nary_op = scalar_compiler.scalar_op_compiler.register_nary_op


@register_unary_op(ops.ArrayIndexOp, pass_op=True)
Expand All @@ -41,17 +38,45 @@ def _(expr: TypedExpr, op: ops.ArrayIndexOp) -> sge.Expression:
)


@register_unary_op(ops.ArrayReduceOp, pass_op=True)
def _(expr: TypedExpr, op: ops.ArrayReduceOp) -> sge.Expression:
    """Compile ArrayReduceOp: aggregate an array's elements via a scalar subquery.

    The array is UNNESTed with a fixed element alias, the configured
    aggregation is applied over that alias, and the whole thing is wrapped
    as a subquery expression.
    """
    elem_id = sg.to_identifier("bf_arr_reduce_uid")
    elem_type = dtypes.get_array_inner_type(expr.dtype)
    elem = TypedExpr(elem_id, elem_type)

    # Imports are deferred here to avoid a circular dependency with the
    # aggregation compilers.
    if op.aggregation.order_independent:
        from bigframes.core.compile.sqlglot.aggregations import unary_compiler

        agg_expr = unary_compiler.compile(op.aggregation, elem)
    else:
        from bigframes.core.compile.sqlglot.aggregations import ordered_unary_compiler

        agg_expr = ordered_unary_compiler.compile(op.aggregation, elem)

    unnested = sge.Unnest(
        expressions=[expr.expr],
        alias=sge.TableAlias(columns=[elem_id]),
    )
    return sge.select(agg_expr).from_(unnested).subquery()


@register_unary_op(ops.ArraySliceOp, pass_op=True)
def _(expr: TypedExpr, op: ops.ArraySliceOp) -> sge.Expression:
slice_idx = sqlglot.to_identifier("slice_idx")
slice_idx = sg.to_identifier("slice_idx")

conditions: typing.List[sge.Predicate] = [slice_idx >= op.start]

if op.stop is not None:
conditions.append(slice_idx < op.stop)

# local name for each element in the array
el = sqlglot.to_identifier("el")
el = sg.to_identifier("el")

selected_elements = (
sge.select(el)
Expand All @@ -66,3 +91,27 @@ def _(expr: TypedExpr, op: ops.ArraySliceOp) -> sge.Expression:
)

return sge.array(selected_elements)


@register_unary_op(ops.ArrayToStringOp, pass_op=True)
def _(expr: TypedExpr, op: ops.ArrayToStringOp) -> sge.Expression:
    """Compile ArrayToStringOp to ARRAY_TO_STRING with the op's delimiter.

    The delimiter is passed through sge.convert so sqlglot renders it as a
    properly escaped SQL string literal; the previous manual quoting
    (f"'{op.delimiter}'") produced malformed SQL whenever the delimiter
    itself contained a single quote.
    """
    return sge.ArrayToString(this=expr.expr, expression=sge.convert(op.delimiter))


@register_nary_op(ops.ToArrayOp)
def _(*exprs: TypedExpr) -> sge.Expression:
    """Compile ToArrayOp: pack scalar expressions into a single ARRAY.

    When any input is numeric (bool excluded), booleans are upcast to INT64
    so the array has a uniform element type.
    """
    has_real_numeric = any(
        dtypes.is_numeric(e.dtype, include_bool=False) for e in exprs
    )
    elements = (
        [_coerce_bool_to_int(e) for e in exprs]
        if has_real_numeric
        else [e.expr for e in exprs]
    )
    return sge.Array(expressions=elements)


def _coerce_bool_to_int(typed_expr: TypedExpr) -> sge.Expression:
    """Coerce boolean expression to integer."""
    if typed_expr.dtype != dtypes.BOOL_DTYPE:
        return typed_expr.expr
    return sge.Cast(this=typed_expr.expr, to="INT64")
4 changes: 2 additions & 2 deletions tests/system/small/engines/test_array_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
REFERENCE_ENGINE = polars_executor.PolarsExecutor()


@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True)
def test_engines_to_array_op(scalars_array_value: array_value.ArrayValue, engine):
# Bigquery won't allow you to materialize arrays with null, so use non-nullable
int64_non_null = ops.coalesce_op.as_expr("int64_col", expression.const(0))
Expand All @@ -46,7 +46,7 @@ def test_engines_to_array_op(scalars_array_value: array_value.ArrayValue, engine
assert_equivalence_execution(arr.node, REFERENCE_ENGINE, engine)


@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
@pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True)
def test_engines_array_reduce_op(arrays_array_value: array_value.ArrayValue, engine):
arr, _ = arrays_array_value.compute_values(
[
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
WITH `bfcte_0` AS (
SELECT
`bool_col`
FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
), `bfcte_1` AS (
SELECT
COALESCE(LOGICAL_OR(`bool_col`), FALSE) AS `bfcol_1`
FROM `bfcte_0`
)
SELECT
`bfcol_1` AS `bool_col`
FROM `bfcte_1`
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
WITH `bfcte_0` AS (
SELECT
`bool_col`
FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
), `bfcte_1` AS (
SELECT
*,
CASE
WHEN `bool_col` IS NULL
THEN NULL
ELSE COALESCE(LOGICAL_OR(`bool_col`) OVER (), FALSE)
END AS `bfcol_1`
FROM `bfcte_0`
)
SELECT
`bfcol_1` AS `agg_bool`
FROM `bfcte_1`
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ def test_all(scalar_types_df: bpd.DataFrame, snapshot):
snapshot.assert_match(sql_window_partition, "window_partition_out.sql")


def test_any(scalar_types_df: bpd.DataFrame, snapshot):
    """Snapshot-test AnyOp compilation, both as a plain aggregate and windowed."""
    column = "bool_col"
    frame = scalar_types_df[[column]]
    any_expr = agg_ops.AnyOp().as_expr(column)

    snapshot.assert_match(
        _apply_unary_agg_ops(frame, [any_expr], [column]), "out.sql"
    )

    # Window tests
    spec = window_spec.WindowSpec(ordering=(ordering.ascending_over(column),))
    snapshot.assert_match(
        _apply_unary_window_op(frame, any_expr, spec, "agg_bool"), "window_out.sql"
    )


def test_approx_quartiles(scalar_types_df: bpd.DataFrame, snapshot):
col_name = "int64_col"
bf_df = scalar_types_df[[col_name]]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
WITH `bfcte_0` AS (
SELECT
`bool_list_col`,
`float_list_col`,
`string_list_col`
FROM `bigframes-dev`.`sqlglot_test`.`repeated_types`
), `bfcte_1` AS (
SELECT
*,
(
SELECT
COALESCE(SUM(bf_arr_reduce_uid), 0)
FROM UNNEST(`float_list_col`) AS bf_arr_reduce_uid
) AS `bfcol_3`,
(
SELECT
STDDEV(bf_arr_reduce_uid)
FROM UNNEST(`float_list_col`) AS bf_arr_reduce_uid
) AS `bfcol_4`,
(
SELECT
COUNT(bf_arr_reduce_uid)
FROM UNNEST(`string_list_col`) AS bf_arr_reduce_uid
) AS `bfcol_5`,
(
SELECT
COALESCE(LOGICAL_OR(bf_arr_reduce_uid), FALSE)
FROM UNNEST(`bool_list_col`) AS bf_arr_reduce_uid
) AS `bfcol_6`
FROM `bfcte_0`
)
SELECT
`bfcol_3` AS `sum_float`,
`bfcol_4` AS `std_float`,
`bfcol_5` AS `count_str`,
`bfcol_6` AS `any_bool`
FROM `bfcte_1`
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
WITH `bfcte_0` AS (
SELECT
`bool_col`,
`float64_col`,
`int64_col`,
`string_col`
FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
), `bfcte_1` AS (
SELECT
*,
[COALESCE(`bool_col`, FALSE)] AS `bfcol_8`,
[COALESCE(`int64_col`, 0)] AS `bfcol_9`,
[COALESCE(`string_col`, ''), COALESCE(`string_col`, '')] AS `bfcol_10`,
[
COALESCE(`int64_col`, 0),
CAST(COALESCE(`bool_col`, FALSE) AS INT64),
COALESCE(`float64_col`, 0.0)
] AS `bfcol_11`
FROM `bfcte_0`
)
SELECT
`bfcol_8` AS `bool_col`,
`bfcol_9` AS `int64_col`,
`bfcol_10` AS `strs_col`,
`bfcol_11` AS `numeric_col`
FROM `bfcte_1`
37 changes: 37 additions & 0 deletions tests/unit/core/compile/sqlglot/expressions/test_array_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import pytest

from bigframes import operations as ops
from bigframes.core import expression
from bigframes.operations._op_converters import convert_index, convert_slice
import bigframes.operations.aggregations as agg_ops
import bigframes.pandas as bpd
from bigframes.testing import utils

Expand All @@ -42,6 +44,20 @@ def test_array_index(repeated_types_df: bpd.DataFrame, snapshot):
snapshot.assert_match(sql, "out.sql")


def test_array_reduce_op(repeated_types_df: bpd.DataFrame, snapshot):
    """Snapshot-test ArrayReduceOp across several aggregations and element types."""
    labels = ["sum_float", "std_float", "count_str", "any_bool"]
    exprs = [
        ops.ArrayReduceOp(agg_ops.SumOp()).as_expr("float_list_col"),
        ops.ArrayReduceOp(agg_ops.StdOp()).as_expr("float_list_col"),
        ops.ArrayReduceOp(agg_ops.CountOp()).as_expr("string_list_col"),
        ops.ArrayReduceOp(agg_ops.AnyOp()).as_expr("bool_list_col"),
    ]

    sql = utils._apply_ops_to_sql(repeated_types_df, exprs, labels)
    snapshot.assert_match(sql, "out.sql")


def test_array_slice_with_only_start(repeated_types_df: bpd.DataFrame, snapshot):
col_name = "string_list_col"
bf_df = repeated_types_df[[col_name]]
Expand All @@ -60,3 +76,24 @@ def test_array_slice_with_start_and_stop(repeated_types_df: bpd.DataFrame, snaps
)

snapshot.assert_match(sql, "out.sql")


def test_to_array_op(scalar_types_df: bpd.DataFrame, snapshot):
    """Snapshot-test ToArrayOp packing one or more scalar columns into arrays."""
    frame = scalar_types_df[["int64_col", "bool_col", "float64_col", "string_col"]]
    # Bigquery won't allow you to materialize arrays with null, so use non-nullable
    non_null_int = ops.coalesce_op.as_expr("int64_col", expression.const(0))
    non_null_bool = ops.coalesce_op.as_expr("bool_col", expression.const(False))
    non_null_float = ops.coalesce_op.as_expr("float64_col", expression.const(0.0))
    non_null_str = ops.coalesce_op.as_expr("string_col", expression.const(""))

    labels = ["bool_col", "int64_col", "strs_col", "numeric_col"]
    exprs = [
        ops.ToArrayOp().as_expr(non_null_bool),
        ops.ToArrayOp().as_expr(non_null_int),
        ops.ToArrayOp().as_expr(non_null_str, non_null_str),
        ops.ToArrayOp().as_expr(non_null_int, non_null_bool, non_null_float),
    ]

    sql = utils._apply_ops_to_sql(frame, exprs, labels)
    snapshot.assert_match(sql, "out.sql")