Skip to content

Commit

Permalink
fix(flink): fix recreating table/view issue on flink backend
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenzhongxu authored and gforsyth committed Dec 6, 2023
1 parent 9006642 commit 0c9791f
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 19 deletions.
78 changes: 60 additions & 18 deletions ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
DropTable,
InsertSelect,
)
from ibis.backends.flink.utils import ibis_schema_to_flink_schema

if TYPE_CHECKING:
from collections.abc import Mapping
Expand Down Expand Up @@ -145,6 +146,34 @@ def list_tables(
# but executing the SQL string directly yields a `TableResult` object
return self._filter_with_like(tables, like)

def list_views(
self,
like: str | None = None,
temporary: bool = True,
) -> list[str]:
"""Return the list of view names.
Return the list of view names in the specified database and catalog.
or the default one if no database/catalog is specified.
Parameters
----------
like : str, optional
A pattern in Python's regex format.
temporary : bool, optional
Whether to list temporary views or permanent views.
Returns
-------
list[str]
The list of the view names that match the pattern `like`.
"""
if temporary:
views = self._table_env.list_temporary_views()
else:
views = self._table_env.list_views()
return self._filter_with_like(views, like)

def _fully_qualified_name(
self,
name: str,
Expand Down Expand Up @@ -319,26 +348,29 @@ def create_table(
if obj is None and schema is None:
raise exc.IbisError("The schema or obj parameter is required")

if overwrite:
self.drop_table(name=name, catalog=catalog, database=database, force=True)
# in-memory data is created as views in `pyflink`
elif obj is not None:
if isinstance(obj, pa.Table):
obj = obj.to_pandas()
if isinstance(obj, pd.DataFrame):
table = self._table_env.from_pandas(
obj, ibis_schema_to_flink_schema(schema)
)
if isinstance(obj, ir.Table):
table = obj
return self.create_view(name, table, database=database, overwrite=overwrite)

if isinstance(obj, pa.Table):
obj = obj.to_pandas()
if isinstance(obj, pd.DataFrame):
qualified_name = self._fully_qualified_name(name, database, catalog)
table = self._table_env.from_pandas(obj)
# in-memory data is created as views in `pyflink`
# TODO(chloeh13q): alternatively, we can do CREATE TABLE and then INSERT
# INTO ... VALUES to keep things consistent
self._table_env.create_temporary_view(qualified_name, table)
if isinstance(obj, ir.Table):
# TODO(chloeh13q): implement CREATE TABLE for expressions
raise NotImplementedError
if schema is not None:
# external data is created as tables in `pyflink`
else:
if not tbl_properties:
raise exc.IbisError(
"tbl_properties is required when creating table with schema"
)
if overwrite:
self.drop_table(
name=name, catalog=catalog, database=database, force=True
)

statement = CreateTableFromConnector(
table_name=name,
schema=schema,
Expand All @@ -349,8 +381,7 @@ def create_table(
catalog=catalog,
)
self._exec_sql(statement.compile())

return self.table(name, database=database)
return self.table(name, database=database)

def drop_table(
self,
Expand Down Expand Up @@ -391,6 +422,7 @@ def create_view(
obj: ir.Table,
*,
database: str | None = None,
catalog: str | None = None,
overwrite: bool = False,
) -> ir.Table:
"""Create a new view from an expression.
Expand All @@ -404,6 +436,8 @@ def create_view(
database
Name of the database where the view will be created, if not
provided the database's default is used.
catalog
Name of the catalog where the table exists, if not the default.
overwrite
Whether to clobber an existing view with the same name.
Expand All @@ -412,7 +446,15 @@ def create_view(
Table
The view that was created.
"""
raise NotImplementedError
if obj is None:
raise exc.IbisError("The obj parameter is required")

if overwrite and self.list_views(name):
self.drop_view(name=name, catalog=catalog, database=database, force=True)

qualified_name = self._fully_qualified_name(name, database, catalog)
self._table_env.create_temporary_view(qualified_name, obj)
return self.table(name, database=database)

def drop_view(
self,
Expand Down
14 changes: 13 additions & 1 deletion ibis/backends/flink/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from abc import ABC, abstractmethod
from collections import defaultdict

from pyflink.table.types import DataTypes
from pyflink.table.types import DataTypes, RowType

import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
import ibis.expr.schema as sch
from ibis.common.temporal import IntervalUnit
from ibis.util import convert_unit

Expand Down Expand Up @@ -326,3 +327,14 @@ def translate_literal(op: ops.Literal) -> str:
return f"ARRAY{list(value)}"

raise NotImplementedError(f"No translation rule for {dtype}")


def ibis_schema_to_flink_schema(schema: sch.Schema) -> RowType:
if schema is None:
return None
return DataTypes.ROW(
[
DataTypes.FIELD(key, _to_pyflink_types[type(value)])
for key, value in schema.fields.items()
]
)

0 comments on commit 0c9791f

Please sign in to comment.