feat(flink): implement nested schema support
chloeh13q authored and cpcloud committed Dec 12, 2023
1 parent bdde3a4 commit 057fabc
Showing 5 changed files with 44 additions and 12 deletions.
3 changes: 2 additions & 1 deletion ibis/backends/flink/ddl.py
@@ -15,7 +15,8 @@
     format_partition,
     is_fully_qualified,
 )
-from ibis.backends.base.sql.registry import quote_identifier, type_to_sql_string
+from ibis.backends.base.sql.registry import quote_identifier
+from ibis.backends.flink.registry import type_to_sql_string
 
 if TYPE_CHECKING:
     from ibis.api import Watermark
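For reviewers: after this import swap, Flink DDL renders column types through the backend-specific `type_to_sql_string` added in `registry.py` below, which wraps one level of `array<...>` around the element type before deferring to the base helper. A minimal sketch of the intended behavior, not a snippet from this commit:

```python
# Hedged sketch: exercises the Flink-specific type renderer introduced
# in registry.py below; assumes ibis's datatype API.
import ibis.expr.datatypes as dt

from ibis.backends.flink.registry import type_to_sql_string

# The array element type is rendered by the base helper, so a single
# level of nesting is handled here.
print(type_to_sql_string(dt.Array(dt.int64)))  # e.g. "array<bigint>"
print(type_to_sql_string(dt.int64))            # non-arrays fall through to the base helper
```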
31 changes: 25 additions & 6 deletions ibis/backends/flink/registry.py
@@ -4,7 +4,13 @@
 
 import ibis.common.exceptions as com
 import ibis.expr.operations as ops
-from ibis.backends.base.sql.registry import aggregate, fixed_arity, helpers, unary
+from ibis.backends.base.sql.registry import (
+    aggregate,
+    fixed_arity,
+    helpers,
+    quote_identifier,
+    unary,
+)
 from ibis.backends.base.sql.registry import (
     operation_registry as base_operation_registry,
 )
@@ -17,6 +23,12 @@
 operation_registry = base_operation_registry.copy()
 
 
+def type_to_sql_string(tval):
+    if tval.is_array():
+        return f"array<{helpers.type_to_sql_string(tval.value_type)}>"
+    return helpers.type_to_sql_string(tval)
+
+
 def _not(translator: ExprTranslator, op: ops.Node) -> str:
     formatted_arg = translator.translate(op.arg)
     if helpers.needs_parens(op.arg):
@@ -61,10 +73,11 @@ def _cast(translator: ExprTranslator, op: ops.generic.Cast) -> str:
         return f"CAST({arg_translated} AS date)"
     elif to.is_json():
         return arg_translated
-
-    from ibis.backends.base.sql.registry.main import cast
-
-    return cast(translator=translator, op=op)
+    elif op.arg.dtype.is_temporal() and op.to.is_int64():
+        return f"1000000 * unix_timestamp({arg_translated})"
+    else:
+        sql_type = type_to_sql_string(op.to)
+        return f"CAST({arg_translated} AS {sql_type})"
 
 
 def _left_op_right(translator: ExprTranslator, op_node: ops.Node, op_sign: str) -> str:
@@ -96,7 +109,7 @@ def _try_cast(translator: ExprTranslator, op: ops.Node) -> str:
         # It's recommended to use UNIX_TIMESTAMP(CAST(timestamp_col AS STRING)) instead.
         return f"UNIX_TIMESTAMP(TRY_CAST({arg_formatted} AS STRING))"
     else:
-        sql_type = helpers.type_to_sql_string(op.to)
+        sql_type = type_to_sql_string(op.to)
         return f"TRY_CAST({arg_formatted} AS {sql_type})"
 
 
@@ -382,6 +395,11 @@ def _timestamp_from_ymdhms(
     return f"CAST({concat_string} AS TIMESTAMP)"
 
 
+def _struct_field(translator, op):
+    arg = translator.translate(op.arg)
+    return f"{arg}.{quote_identifier(op.field, force=True)}"
+
+
 operation_registry.update(
     {
         # Unary operations
@@ -444,6 +462,7 @@ def _timestamp_from_ymdhms(
         ops.TimestampFromUNIX: _timestamp_from_unix,
         ops.TimestampFromYMDHMS: _timestamp_from_ymdhms,
         ops.TimestampSub: _timestamp_sub,
+        ops.StructField: _struct_field,
     }
 )
 
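Taken together with the registry entry, `ops.StructField` now compiles to Flink's backtick-quoted dotted field access, and temporal-to-int64 casts emit microseconds via `1000000 * unix_timestamp(...)` (Flink's `UNIX_TIMESTAMP` returns seconds). A hedged sketch of the struct-field compilation; the table and column names are made up, and it assumes the Flink backend can compile expressions without a live session:

```python
# Hedged sketch, not a test from this commit; names are hypothetical.
import ibis

t = ibis.table({"s": "struct<a: string, b: int64>"}, name="t")

# t.s.a builds an ops.StructField node; per _struct_field above, the
# generated Flink SQL should contain the fragment `s`.`a`.
print(ibis.flink.compile(t.s.a))
```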
4 changes: 2 additions & 2 deletions ibis/backends/flink/tests/conftest.py
@@ -10,7 +10,6 @@
 
 
 class TestConf(BackendTest):
-    supports_structs = False
     force_sort = True
     deps = "pandas", "pyflink"
 
@@ -50,13 +49,14 @@ def connect(*, tmpdir, worker_id, **kw: Any):
     def _load_data(self, **_: Any) -> None:
         import pandas as pd
 
-        from ibis.backends.tests.data import json_types
+        from ibis.backends.tests.data import json_types, struct_types
 
         for table_name in TEST_TABLES:
             path = self.data_dir / "parquet" / f"{table_name}.parquet"
             self.connection.create_table(table_name, pd.read_parquet(path))
 
         self.connection.create_table("json_t", json_types)
+        self.connection.create_table("struct", struct_types)
 
 
 class TestConfForStreaming(TestConf):
14 changes: 13 additions & 1 deletion ibis/backends/tests/test_struct.py
@@ -13,7 +13,7 @@
 pytestmark = [
     pytest.mark.never(["mysql", "sqlite", "mssql"], reason="No struct support"),
     pytest.mark.notyet(["impala"]),
-    pytest.mark.notimpl(["datafusion", "druid", "oracle", "flink"]),
+    pytest.mark.notimpl(["datafusion", "druid", "oracle"]),
 ]
 
 
@@ -55,6 +55,9 @@ def test_all_fields(struct, struct_df):
 
 @pytest.mark.notimpl(["postgres"])
 @pytest.mark.parametrize("field", ["a", "b", "c"])
+@pytest.mark.notyet(
+    ["flink"], reason="flink doesn't support creating struct columns from literals"
+)
 def test_literal(backend, con, field):
     query = _STRUCT_LITERAL[field]
     dtype = query.type().to_pandas()
@@ -69,6 +72,9 @@ def test_literal(backend, con, field):
 @pytest.mark.notyet(
     ["clickhouse"], reason="clickhouse doesn't support nullable nested types"
 )
+@pytest.mark.notyet(
+    ["flink"], reason="flink doesn't support creating struct columns from literals"
+)
 def test_null_literal(backend, con, field):
     query = _NULL_STRUCT_LITERAL[field]
     result = pd.Series([con.execute(query)])
@@ -78,6 +84,9 @@ def test_null_literal(backend, con, field):
 
 
 @pytest.mark.notimpl(["dask", "pandas", "postgres"])
+@pytest.mark.notyet(
+    ["flink"], reason="flink doesn't support creating struct columns from literals"
+)
 def test_struct_column(backend, alltypes, df):
     t = alltypes
     expr = ibis.struct(dict(a=t.string_col, b=1, c=t.bigint_col)).name("s")
@@ -91,6 +100,9 @@ def test_struct_column(backend, alltypes, df):
 
 
 @pytest.mark.notimpl(["dask", "pandas", "postgres", "polars"])
+@pytest.mark.notyet(
+    ["flink"], reason="flink doesn't support creating struct columns from collect"
+)
 def test_collect_into_struct(alltypes):
     from ibis import _
 
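The marker shuffle in this file tracks the actual support boundary: reading a field out of an existing struct column now works on Flink, while constructing structs (from literals, columns, or `collect`) remains unsupported. A hedged illustration of the two sides, with hypothetical names:

```python
# Hedged illustration of the boundary the pytest markers describe;
# "t" and its columns are made up for this example.
import ibis

t = ibis.table({"s": "struct<a: string, b: int64>"}, name="t")

t.s.a                            # field access: newly supported by this commit
ibis.struct({"a": 1, "b": "x"})  # struct construction: still marked notyet on Flink
```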
4 changes: 2 additions & 2 deletions ibis/backends/tests/test_timecontext.py
@@ -123,8 +123,8 @@ def test_context_adjustment_filter_before_window(
 @pytest.mark.notimpl(["duckdb", "pyspark"])
 @pytest.mark.notimpl(
     ["flink"],
-    raises=com.OperationNotDefinedError,
-    reason="No translation rule for <class 'ibis.expr.operations.structs.StructField'>",
+    raises=com.UnsupportedOperationError,
+    reason="Flink engine does not support generic window clause with no order by",
 )
 def test_context_adjustment_multi_col_udf_non_grouped(
     backend, alltypes, context, monkeypatch
