feat(flink): support ArrayValue.collect
ArtnerC authored and jcrist committed Aug 9, 2024
1 parent 9881edb commit eb857e6
Showing 9 changed files with 19 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .env
@@ -1,4 +1,4 @@
-FLINK_VERSION=1.19.1
+FLINK_VERSION=1.20.0
 CLOUDSDK_ACTIVE_CONFIG_NAME=ibis-gbq
 GOOGLE_CLOUD_PROJECT="$CLOUDSDK_ACTIVE_CONFIG_NAME"
 PGPASSWORD="postgres"
8 changes: 4 additions & 4 deletions .github/workflows/ibis-backends.yml
@@ -227,7 +227,7 @@ jobs:
     extras:
       - flink
     additional_deps:
-      - "'apache-flink==1.19.1'"
+      - "'apache-flink==1.20.0'"
       - "'pandas<2.2'"
      - setuptools
     services:
@@ -249,7 +249,7 @@ jobs:
     extras:
       - flink
     additional_deps:
-      - "'apache-flink==1.19.1'"
+      - "'apache-flink==1.20.0'"
       - "'pandas<2.2'"
       - setuptools
     services:
@@ -400,7 +400,7 @@ jobs:
     extras:
       - flink
     additional_deps:
-      - "'apache-flink==1.19.1'"
+      - "'apache-flink==1.20.0'"
       - "'pandas<2.2'"
       - setuptools
     services:
@@ -413,7 +413,7 @@ jobs:
     extras:
       - flink
     additional_deps:
-      - "'apache-flink==1.19.1'"
+      - "'apache-flink==1.20.0'"
       - "'pandas<2.2'"
       - setuptools
     services:
2 changes: 1 addition & 1 deletion conda/environment-arm64-flink.yml
@@ -96,4 +96,4 @@ dependencies:
   - py4j =0.10.9.7
   - pip
   - pip:
-      - apache-flink =1.19.1
+      - apache-flink =1.20.0
2 changes: 1 addition & 1 deletion conda/environment.yml
@@ -3,7 +3,7 @@ channels:
   - conda-forge
 dependencies:
   # runtime dependencies
-  - apache-flink
+  - apache-flink =1.20.0
   - atpublic >=2.3
   - black >=22.1.0,<25
   - clickhouse-connect >=0.5.23
4 changes: 2 additions & 2 deletions docker/flink/Dockerfile
@@ -1,8 +1,8 @@
-ARG FLINK_VERSION=1.19.1
+ARG FLINK_VERSION=1.20.0
 FROM flink:${FLINK_VERSION}
 
 # ibis-flink requires PyFlink dependency
-ARG FLINK_VERSION=1.19.1
+ARG FLINK_VERSION=1.20.0
 RUN wget -nv -P $FLINK_HOME/lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-python/${FLINK_VERSION}/flink-python-${FLINK_VERSION}.jar
 
 # install python3 and pip3
6 changes: 6 additions & 0 deletions ibis/backends/sql/compilers/flink.py
@@ -575,5 +575,11 @@ def visit_MapMerge(self, op: ops.MapMerge, *, left, right):
     def visit_StructColumn(self, op, *, names, values):
         return self.cast(sge.Struct(expressions=list(values)), op.dtype)
 
+    def visit_ArrayCollect(self, op, *, arg, where, order_by, include_null):
+        if not include_null:
+            cond = arg.is_(sg.not_(NULL, copy=False))
+            where = cond if where is None else sge.And(this=cond, expression=where)
+        return self.agg.array_agg(arg, where=where, order_by=order_by)
+
 
 compiler = FlinkCompiler()
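
A quick note on the new visitor: visit_ArrayCollect lowers ibis's .collect() aggregation (the ops.ArrayCollect node) to an ARRAY_AGG call, folding an "arg IS NOT NULL" predicate into the aggregate filter unless include_null is set. A minimal usage sketch, assuming an existing Flink backend connection named con and illustrative table/column names (none of these names come from this commit):

import ibis

t = con.table("functional_alltypes")  # hypothetical table name
expr = t.group_by("string_col").agg(vals=t.int_col.collect())

# With the visitor above, the Flink compiler should emit an ARRAY_AGG
# aggregation that skips NULL elements, since include_null defaults to False.
print(ibis.to_sql(expr, dialect="flink"))
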
1 change: 1 addition & 0 deletions ibis/backends/sql/dialects.py
@@ -211,6 +211,7 @@ class Generator(Hive.Generator):
             sge.VariancePop: rename_func("var_pop"),
             sge.ArrayConcat: rename_func("array_concat"),
             sge.ArraySize: rename_func("cardinality"),
+            sge.ArrayAgg: rename_func("array_agg"),
             sge.Length: rename_func("char_length"),
             sge.TryCast: lambda self,
             e: f"TRY_CAST({e.this.sql(self.dialect)} AS {e.to.sql(self.dialect)})",
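
Why the extra Generator mapping is needed: the Flink Generator here extends Hive.Generator (visible in the hunk header above), and, to my knowledge, sqlglot's stock Hive generator renders the ArrayAgg node under a different function name (COLLECT_LIST), which Flink SQL does not provide; rename_func("array_agg") pins the Flink spelling instead. A rough stand-alone comparison using plain sqlglot (the Flink dialect itself is registered inside ibis, so Hive stands in for the inherited behaviour):

import sqlglot

# Parse a standard ARRAY_AGG call and regenerate it with the Hive dialect;
# the renamed output (typically COLLECT_LIST) is what the Flink generator
# would inherit without the override added above.
print(sqlglot.transpile("SELECT ARRAY_AGG(x) FROM t", write="hive")[0])
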
20 changes: 3 additions & 17 deletions ibis/backends/tests/test_aggregation.py
@@ -1394,25 +1394,11 @@ def test_group_concat_ordered(alltypes, df, filtered):
 
 
 @pytest.mark.notimpl(
-    [
-        "druid",
-        "exasol",
-        "flink",
-        "impala",
-        "mssql",
-        "mysql",
-        "oracle",
-        "sqlite",
-    ],
+    ["druid", "exasol", "impala", "mssql", "mysql", "oracle", "sqlite"],
     raises=com.OperationNotDefinedError,
 )
 @pytest.mark.notimpl(
-    [
-        "clickhouse",
-        "dask",
-        "pandas",
-        "pyspark",
-    ],
+    ["clickhouse", "dask", "pandas", "pyspark", "flink"],
     raises=com.UnsupportedOperationError,
 )
 @pytest.mark.parametrize(
@@ -1447,7 +1433,7 @@ def test_collect_ordered(alltypes, df, filtered):
 
 
 @pytest.mark.notimpl(
-    ["druid", "exasol", "flink", "impala", "mssql", "mysql", "oracle", "sqlite"],
+    ["druid", "exasol", "impala", "mssql", "mysql", "oracle", "sqlite"],
     raises=com.OperationNotDefinedError,
 )
 @pytest.mark.notimpl(
2 changes: 0 additions & 2 deletions ibis/backends/tests/test_array.py
@@ -303,7 +303,6 @@ def test_unnest_complex(backend):
 
 
 @builtin_array
-@pytest.mark.notimpl(["flink"], raises=com.OperationNotDefinedError)
 @pytest.mark.notyet(
     ["datafusion"],
     raises=Exception,
@@ -331,7 +330,6 @@ def test_unnest_idempotent(backend):
 
 
 @builtin_array
-@pytest.mark.notimpl(["flink"], raises=com.OperationNotDefinedError)
 @pytest.mark.notyet(
     ["datafusion"],
     raises=Exception,
