diff --git a/continuous_integration/environment-3.10-jdk11-dev.yaml b/continuous_integration/environment-3.10-jdk11-dev.yaml index d6c45735e..59033ae3c 100644 --- a/continuous_integration/environment-3.10-jdk11-dev.yaml +++ b/continuous_integration/environment-3.10-jdk11-dev.yaml @@ -6,6 +6,7 @@ dependencies: - dask-ml>=2022.1.22 - dask>=2022.3.0 - fastapi>=0.69.0 +- fugue>=0.7.0 - intake>=0.6.0 - jpype1>=1.0.2 - jsonschema @@ -31,14 +32,3 @@ dependencies: - tpot - tzlocal>=2.1 - uvicorn>=0.11.3 -# fugue dependencies; remove when we conda install fugue -- adagio -- antlr4-python3-runtime<4.10 -- ciso8601 -- fs -- pip -- qpd -- triad -- pip: - # FIXME: tests are failing with fugue 0.7.0 - - fugue[sql]>=0.5.3,<0.7.0 diff --git a/continuous_integration/environment-3.10-jdk8-dev.yaml b/continuous_integration/environment-3.10-jdk8-dev.yaml index f28206297..742e4ba1d 100644 --- a/continuous_integration/environment-3.10-jdk8-dev.yaml +++ b/continuous_integration/environment-3.10-jdk8-dev.yaml @@ -6,6 +6,7 @@ dependencies: - dask-ml>=2022.1.22 - dask>=2022.3.0 - fastapi>=0.69.0 +- fugue>=0.7.0 - intake>=0.6.0 - jpype1>=1.0.2 - jsonschema @@ -31,14 +32,3 @@ dependencies: - tpot - tzlocal>=2.1 - uvicorn>=0.11.3 -# fugue dependencies; remove when we conda install fugue -- adagio -- antlr4-python3-runtime<4.10 -- ciso8601 -- fs -- pip -- qpd -- triad -- pip: - # FIXME: tests are failing with fugue 0.7.0 - - fugue[sql]>=0.5.3,<0.7.0 diff --git a/continuous_integration/environment-3.8-jdk11-dev.yaml b/continuous_integration/environment-3.8-jdk11-dev.yaml index b865bb82f..17e39266a 100644 --- a/continuous_integration/environment-3.8-jdk11-dev.yaml +++ b/continuous_integration/environment-3.8-jdk11-dev.yaml @@ -6,6 +6,7 @@ dependencies: - dask-ml=2022.1.22 - dask=2022.3.0 - fastapi=0.69.0 +- fugue=0.7.0 - intake=0.6.0 - jpype1=1.0.2 - jsonschema @@ -31,13 +32,3 @@ dependencies: - tpot - tzlocal=2.1 - uvicorn=0.11.3 -# fugue dependencies; remove when we conda install fugue -- 
adagio -- antlr4-python3-runtime<4.10 -- ciso8601 -- fs -- pip -- qpd -- triad -- pip: - - fugue[sql]==0.5.3 diff --git a/continuous_integration/environment-3.8-jdk8-dev.yaml b/continuous_integration/environment-3.8-jdk8-dev.yaml index b143fcc6c..b6cbf582c 100644 --- a/continuous_integration/environment-3.8-jdk8-dev.yaml +++ b/continuous_integration/environment-3.8-jdk8-dev.yaml @@ -6,6 +6,7 @@ dependencies: - dask-ml=2022.1.22 - dask=2022.3.0 - fastapi=0.69.0 +- fugue=0.7.0 - intake=0.6.0 - jpype1=1.0.2 - jsonschema @@ -31,13 +32,3 @@ dependencies: - tpot - tzlocal=2.1 - uvicorn=0.11.3 -# fugue dependencies; remove when we conda install fugue -- adagio -- antlr4-python3-runtime<4.10 -- ciso8601 -- fs -- pip -- qpd -- triad -- pip: - - fugue[sql]==0.5.3 diff --git a/continuous_integration/environment-3.9-jdk11-dev.yaml b/continuous_integration/environment-3.9-jdk11-dev.yaml index 5a1c19777..f38cafade 100644 --- a/continuous_integration/environment-3.9-jdk11-dev.yaml +++ b/continuous_integration/environment-3.9-jdk11-dev.yaml @@ -6,6 +6,7 @@ dependencies: - dask-ml>=2022.1.22 - dask>=2022.3.0 - fastapi>=0.69.0 +- fugue>=0.7.0 - intake>=0.6.0 - jpype1>=1.0.2 - jsonschema @@ -31,14 +32,3 @@ dependencies: - tpot - tzlocal>=2.1 - uvicorn>=0.11.3 -# fugue dependencies; remove when we conda install fugue -- adagio -- antlr4-python3-runtime<4.10 -- ciso8601 -- fs -- pip -- qpd -- triad -- pip: - # FIXME: tests are failing with fugue 0.7.0 - - fugue[sql]>=0.5.3,<0.7.0 diff --git a/continuous_integration/environment-3.9-jdk8-dev.yaml b/continuous_integration/environment-3.9-jdk8-dev.yaml index 6f8367e51..c16d95f01 100644 --- a/continuous_integration/environment-3.9-jdk8-dev.yaml +++ b/continuous_integration/environment-3.9-jdk8-dev.yaml @@ -6,6 +6,7 @@ dependencies: - dask-ml>=2022.1.22 - dask>=2022.3.0 - fastapi>=0.69.0 +- fugue>=0.7.0 - intake>=0.6.0 - jpype1>=1.0.2 - jsonschema @@ -31,14 +32,3 @@ dependencies: - tpot - tzlocal>=2.1 - uvicorn>=0.11.3 -# fugue dependencies; 
remove when we conda install fugue -- adagio -- antlr4-python3-runtime<4.10 -- ciso8601 -- fs -- pip -- qpd -- triad -- pip: - # FIXME: tests are failing with fugue 0.7.0 - - fugue[sql]>=0.5.3,<0.7.0 diff --git a/dask_sql/integrations/fugue.py b/dask_sql/integrations/fugue.py index ce685a1ee..11b0c4d3a 100644 --- a/dask_sql/integrations/fugue.py +++ b/dask_sql/integrations/fugue.py @@ -1,8 +1,10 @@ try: import fugue import fugue_dask + from dask.distributed import Client from fugue import WorkflowDataFrame, register_execution_engine from fugue_sql import FugueSQLWorkflow + from triad import run_at_def from triad.utils.convert import get_caller_global_local_vars except ImportError: # pragma: no cover raise ImportError( @@ -15,9 +17,25 @@ from dask_sql.context import Context -register_execution_engine( - "dask", lambda conf: DaskSQLExecutionEngine(conf), on_dup="overwrite" -) + +@run_at_def +def _register_engines() -> None: + """Register (overwrite) the default Dask execution engine of Fugue. This + function is invoked as an entrypoint, users don't need to call it explicitly. 
+ """ + register_execution_engine( + "dask", + lambda conf, **kwargs: DaskSQLExecutionEngine(conf=conf), + on_dup="overwrite", + ) + + register_execution_engine( + Client, + lambda engine, conf, **kwargs: DaskSQLExecutionEngine( + dask_client=engine, conf=conf + ), + on_dup="overwrite", + ) class DaskSQLEngine(fugue.execution.execution_engine.SQLEngine): diff --git a/docs/environment.yml b/docs/environment.yml index f8e0df2f8..5704e7e29 100644 --- a/docs/environment.yml +++ b/docs/environment.yml @@ -10,8 +10,7 @@ dependencies: - maven>=3.6.0 - dask>=2022.3.0 - pandas>=1.1.2 - # FIXME: tests are failing with fugue 0.7.0 - - fugue>=0.5.3,<0.7.0 + - fugue>=0.7.0 - jpype1>=1.0.2 - fastapi>=0.69.0 - uvicorn>=0.11.3 diff --git a/docs/requirements-docs.txt b/docs/requirements-docs.txt index 09870d22e..be37d0b83 100644 --- a/docs/requirements-docs.txt +++ b/docs/requirements-docs.txt @@ -3,8 +3,7 @@ sphinx-tabs dask-sphinx-theme>=3.0.0 dask>=2022.3.0 pandas>=1.1.2 -# FIXME: tests are failing with fugue 0.7.0 -fugue>=0.5.3,<0.7.0 +fugue>=0.7.0 jpype1>=1.0.2 fastapi>=0.69.0 uvicorn>=0.11.3 diff --git a/docs/source/fugue.rst b/docs/source/fugue.rst index 264d19fcd..972229c13 100644 --- a/docs/source/fugue.rst +++ b/docs/source/fugue.rst @@ -8,7 +8,7 @@ In order to offer a "best of both worlds" solution, dask-sql includes several op dask-sql as a FugueSQL engine ----------------------------- -FugueSQL users unfamiliar with dask-sql can take advantage of its functionality with minimal code changes by passing :class:`dask_sql.integrations.fugue.DaskSQLExecutionEngine` into the ``FugueSQLWorkflow`` being used to execute commands. +FugueSQL users unfamiliar with dask-sql can take advantage of its functionality by installing it in an environment alongside Fugue; this will automatically register :class:`dask_sql.integrations.fugue.DaskSQLExecutionEngine` as the default Dask execution engine for FugueSQL queries. 
For more information and sample usage, see `Fugue — dask-sql as a FugueSQL engine `_. Using FugueSQL on an existing ``Context`` diff --git a/notebooks/FugueSQL.ipynb b/notebooks/FugueSQL.ipynb new file mode 100644 index 000000000..1d59b8f78 --- /dev/null +++ b/notebooks/FugueSQL.ipynb @@ -0,0 +1,577 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "f39e2dbc-21a1-4d9a-bed7-e2bf2bd25bb8", + "metadata": {}, + "source": [ + "# FugueSQL Integrations\n", + "\n", + "[FugueSQL](https://fugue-tutorials.readthedocs.io/tutorials/fugue_sql/index.html) is a related project that aims to provide a unified SQL interface for a variety of different computing frameworks, including Dask.\n", + "While it offers a SQL engine with a larger set of supported commands, this comes at the cost of slower performance when using Dask in comparison to dask-sql.\n", + "In order to offer a \"best of both worlds\" solution, dask-sql can easily be integrated with FugueSQL, using its faster implementation of SQL commands when possible and falling back on FugueSQL's implementation when necessary." 
+ ] + }, + { + "cell_type": "markdown", + "id": "90e31400", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "FugueSQL offers the cell magic `%%fsql`, which can be used to define and execute queries entirely in SQL, with no need for external Python code!\n", + "\n", + "To use this cell magic, users must install [fugue-jupyter](https://pypi.org/project/fugue-jupyter/), which will additionally provide SQL syntax highlighting (note that the kernel must be restarted after installing):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "96c3ad1a", + "metadata": {}, + "outputs": [], + "source": [ + "!pip install fugue-jupyter" + ] + }, + { + "cell_type": "markdown", + "id": "ae79361a", + "metadata": {}, + "source": [ + "And run `fugue_jupyter.setup()` to register the magic:" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "2df05f5b", + "metadata": {}, + "outputs": [], + "source": [ + "from fugue_jupyter import setup\n", + "\n", + "setup()" + ] + }, + { + "cell_type": "markdown", + "id": "d3b8bfe5", + "metadata": {}, + "source": [ + "We will also start up a Dask client, which can be specified as an execution engine for FugueSQL queries:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "a35d98e6-f24e-46c4-a4e6-b64d649d8ba7", + "metadata": {}, + "outputs": [], + "source": [ + "from dask.distributed import Client\n", + "\n", + "client = Client()" + ] + }, + { + "cell_type": "markdown", + "id": "bcb96523", + "metadata": {}, + "source": [ + "## dask-sql as a FugueSQL execution engine\n", + "\n", + "When dask-sql is installed, its `DaskSQLExecutionEngine` is automatically registered as the default engine for FugueSQL queries run on Dask.\n", + "We can then use it to run queries with the `%%fsql` cell magic, specifying `dask` as the execution engine to run the query on:" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "ff633572-ad08-4de1-8678-a8fbd09effd1", + "metadata": {}, + "outputs": [ + 
{ + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
a
0xyz
\n", + "
" + ], + "text/plain": [ + " a\n", + "0 xyz" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "schema: a:str" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "%%fsql dask\n", + "\n", + "CREATE [[\"xyz\"], [\"xxx\"]] SCHEMA a:str\n", + "SELECT * WHERE a LIKE '%y%'\n", + "PRINT" + ] + }, + { + "cell_type": "markdown", + "id": "7f16b7d9-6b45-4caf-bbcb-63cc5d858556", + "metadata": {}, + "source": [ + "We can also use the `YIELD` keyword to register the results of our queries into Python objects:" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "521965bc-1a4c-49ab-b48f-789351cb24d4", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
b
0xyz
1xxx-
\n", + "
" + ], + "text/plain": [ + " b\n", + "0 xyz\n", + "1 xxx-" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "schema: b:str" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "%%fsql dask\n", + "src = CREATE [[\"xyz\"], [\"xxx\"]] SCHEMA a:str\n", + "\n", + "a = SELECT a AS b WHERE a LIKE '%y%'\n", + " YIELD DATAFRAME AS test\n", + "\n", + "b = SELECT CONCAT(a, '-') AS b FROM src WHERE a LIKE '%xx%'\n", + " YIELD DATAFRAME AS test1\n", + "\n", + "SELECT * FROM a UNION SELECT * FROM b\n", + "PRINT" + ] + }, + { + "cell_type": "markdown", + "id": "dfbb0a9a", + "metadata": {}, + "source": [ + "Which can then be interacted with outside of SQL:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "79a3e87a-2764-410c-b257-c710c4a6c6d4", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
Dask DataFrame Structure:
\n", + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
b
npartitions=2
object
...
...
\n", + "
\n", + "
Dask Name: rename, 16 tasks
" + ], + "text/plain": [ + "Dask DataFrame Structure:\n", + " b\n", + "npartitions=2 \n", + " object\n", + " ...\n", + " ...\n", + "Dask Name: rename, 16 tasks" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "test.native # a Dask DataFrame" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "c98cb652-06e2-444a-b70a-fdd3de9ecd15", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
b
1xxx-
\n", + "
" + ], + "text/plain": [ + " b\n", + "1 xxx-" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "test1.native.compute()" + ] + }, + { + "cell_type": "markdown", + "id": "932ede31-90b2-49e5-9f4d-7cf1b8d919d2", + "metadata": {}, + "source": [ + "We can also run the equivalent of these queries in python code using `fugue_sql.fsql`, passing the Dask client into its `run` method to specify Dask as an execution engine:" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "c265b170-de4d-4fab-aeae-9f94031e960d", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
a
0xyz
\n", + "
" + ], + "text/plain": [ + " a\n", + "0 xyz" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "schema: a:str" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "DataFrames()" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from fugue_sql import fsql\n", + "\n", + "fsql(\"\"\"\n", + "CREATE [[\"xyz\"], [\"xxx\"]] SCHEMA a:str\n", + "SELECT * WHERE a LIKE '%y%'\n", + "PRINT\n", + "\"\"\").run(client)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "77e3bf50-8c8b-4e2f-a5e7-28b1d86499d7", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
Dask DataFrame Structure:
\n", + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
a
npartitions=2
object
...
...
\n", + "
\n", + "
Dask Name: rename, 16 tasks
" + ], + "text/plain": [ + "Dask DataFrame Structure:\n", + " a\n", + "npartitions=2 \n", + " object\n", + " ...\n", + " ...\n", + "Dask Name: rename, 16 tasks" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "result = fsql(\"\"\"\n", + "CREATE [[\"xyz\"], [\"xxx\"]] SCHEMA a:str\n", + "SELECT * WHERE a LIKE '%y%'\n", + "YIELD DATAFRAME AS test2\n", + "\"\"\").run(client)\n", + "\n", + "result[\"test2\"].native # a Dask DataFrame" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7d4c71d4-238f-4c72-8609-dbbe0782aea9", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.13" + }, + "vscode": { + "interpreter": { + "hash": "656801d214ad98d4b301386b078628ce3ae2dbd81a59ed4deed7a5b13edfab09" + } + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/setup.py b/setup.py index 79f2247b3..1d52789d1 100755 --- a/setup.py +++ b/setup.py @@ -112,14 +112,16 @@ def build(self): "black==22.3.0", "isort==5.7.0", ], - # FIXME: tests are failing with fugue 0.7.0 - "fugue": ["fugue[sql]>=0.5.3,<0.7.0"], + "fugue": ["fugue>=0.7.0"], }, entry_points={ "console_scripts": [ "dask-sql-server = dask_sql.server.app:main", "dask-sql = dask_sql.cmd:main", - ] + ], + "fugue.plugins": [ + "dasksql = dask_sql.integrations.fugue:_register_engines[fugue]" + ], }, zip_safe=False, cmdclass=cmdclass, diff --git a/tests/integration/test_fugue.py b/tests/integration/test_fugue.py index 951bf7a48..d846b5559 100644 --- a/tests/integration/test_fugue.py +++ b/tests/integration/test_fugue.py @@ -13,7 +13,8 @@ from dask_sql.integrations.fugue 
import DaskSQLExecutionEngine, fsql_dask -def test_simple_statement(): +@skip_if_external_scheduler +def test_simple_statement(client): with fugue_sql.FugueSQLWorkflow(DaskSQLExecutionEngine) as dag: df = dag.df([[0, "hello"], [1, "world"]], "a:int64,b:str") dag("SELECT * FROM df WHERE a > 0 YIELD DATAFRAME AS result") @@ -32,11 +33,19 @@ def test_simple_statement(): return_df = result["result"].as_pandas() assert_eq(return_df, pd.DataFrame({"a": [1], "b": ["world"]})) + result = fugue_sql.fsql( + "SELECT * FROM df WHERE a > 0 YIELD DATAFRAME AS result", + df=pdf, + ).run(client) + + return_df = result["result"].as_pandas() + assert_eq(return_df, pd.DataFrame({"a": [1], "b": ["world"]})) + # TODO: Revisit fixing this on an independant cluster (without dask-sql) based on the # discussion in https://github.com/dask-contrib/dask-sql/issues/407 @skip_if_external_scheduler -def test_fsql(): +def test_fsql(client): def assert_fsql(df: pd.DataFrame) -> None: assert_eq(df, pd.DataFrame({"a": [1]}))