BUG: Modin on dask is throwing errors on initialization, #6870

Closed
3 tasks done
jan876 opened this issue Jan 19, 2024 · 3 comments
Labels
bug 🦗 Something isn't working Dask ⚡ Issues related to the Dask engine External Pull requests and issues from people who do not regularly contribute to modin Needs more information ❔ Issues that require more information from the reporter

Comments

jan876 commented Jan 19, 2024

Modin version checks

  • I have checked that this issue has not already been reported.

  • I have confirmed this bug exists on the latest released version of Modin.

  • I have confirmed this bug exists on the main branch of Modin. (In order to do this you can follow this guide.)

Reproducible Example

# modin version 0.26.0
# dask version 2024.1

from modin.config import Engine
Engine.put("dask")
from dask.distributed import Client
client = Client('localhost:8786') # this port forwards to dask cluster

import modin.pandas as mpd

df2 = mpd.DataFrame({'a': [1, 2], 'b': [3, 4]})
df2

Issue Description

It seems that Modin is calling an API on the distributed client that is no longer supported.

Expected Behavior

It should create a simple test modin dataframe.

Error Logs

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
Cell In[26], line 1
----> 1 df2 = mpd.DataFrame({'a': [1, 2], 'b': [3, 4]})
      2 df2

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/modin/logging/logger_decorator.py:129, in enable_logging.<locals>.decorator.<locals>.run_and_log(*args, **kwargs)
    114 """
    115 Compute function with logging if Modin logging is enabled.
    116 
   (...)
    126 Any
    127 """
    128 if LogMode.get() == "disable":
--> 129     return obj(*args, **kwargs)
    131 logger = get_logger()
    132 logger_level = getattr(logger, log_level)

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/modin/pandas/dataframe.py:179, in DataFrame.__init__(self, data, index, columns, dtype, copy, query_compiler)
    177 # Check type of data and use appropriate constructor
    178 elif query_compiler is None:
--> 179     distributed_frame = from_non_pandas(data, index, columns, dtype)
    180     if distributed_frame is not None:
    181         self._query_compiler = distributed_frame._query_compiler

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/modin/pandas/io.py:970, in from_non_pandas(df, index, columns, dtype)
    949 """
    950 Convert a non-pandas DataFrame into Modin DataFrame.
    951 
   (...)
    966     Converted DataFrame.
    967 """
    968 from modin.core.execution.dispatching.factories.dispatcher import FactoryDispatcher
--> 970 new_qc = FactoryDispatcher.from_non_pandas(df, index, columns, dtype)
    971 if new_qc is not None:
    972     return ModinObjects.DataFrame(query_compiler=new_qc)

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/modin/core/execution/dispatching/factories/dispatcher.py:177, in FactoryDispatcher.from_non_pandas(cls, *args, **kwargs)
    174 @classmethod
    175 @_inherit_docstrings(factories.BaseFactory._from_non_pandas)
    176 def from_non_pandas(cls, *args, **kwargs):
--> 177     return cls.get_factory()._from_non_pandas(*args, **kwargs)

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/modin/core/execution/dispatching/factories/dispatcher.py:115, in FactoryDispatcher.get_factory(cls)
    112 if cls.__factory is None:
    113     from modin.pandas import _update_engine
--> 115     Engine.subscribe(_update_engine)
    116     Engine.subscribe(cls._update_factory)
    117     StorageFormat.subscribe(cls._update_factory)

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/modin/config/pubsub.py:291, in Parameter.subscribe(cls, callback)
    282 """
    283 Add `callback` to the `_subs` list and then execute it.
    284 
   (...)
    288     Callable to execute.
    289 """
    290 cls._subs.append(callback)
--> 291 callback(cls)

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/modin/pandas/__init__.py:154, in _update_engine(publisher)
    151     if _is_first_update.get("Dask", True):
    152         from modin.core.execution.dask.common import initialize_dask
--> 154         initialize_dask()
    155 elif publisher.get() == "Unidist":
    156     if _is_first_update.get("Unidist", True):

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/modin/core/execution/dask/common/utils.py:42, in initialize_dask()
     38         import warnings
     40         warnings.simplefilter("ignore", category=FutureWarning)
---> 42     client.run(_disable_warnings)
     44 except ValueError:
     45     from distributed import Client

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/distributed/client.py:2998, in Client.run(self, function, workers, wait, nanny, on_error, *args, **kwargs)
   2915 def run(
   2916     self,
   2917     function,
   (...)
   2923     **kwargs,
   2924 ):
   2925     """
   2926     Run a function on all workers outside of task scheduling system
   2927 
   (...)
   2996     >>> c.run(print_state, wait=False)  # doctest: +SKIP
   2997     """
-> 2998     return self.sync(
   2999         self._run,
   3000         function,
   3001         *args,
   3002         workers=workers,
   3003         wait=wait,
   3004         nanny=nanny,
   3005         on_error=on_error,
   3006         **kwargs,
   3007     )

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/distributed/utils.py:358, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    356     return future
    357 else:
--> 358     return sync(
    359         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    360     )

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/distributed/utils.py:434, in sync(loop, func, callback_timeout, *args, **kwargs)
    431         wait(10)
    433 if error is not None:
--> 434     raise error
    435 else:
    436     return result

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/distributed/utils.py:408, in sync.<locals>.f()
    406         awaitable = wait_for(awaitable, timeout)
    407     future = asyncio.ensure_future(awaitable)
--> 408     result = yield future
    409 except Exception as exception:
    410     error = exception

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/tornado/gen.py:767, in Runner.run(self)
    765 try:
    766     try:
--> 767         value = future.result()
    768     except Exception as e:
    769         # Save the exception for later. It's important that
    770         # gen.throw() not be called inside this try/except block
    771         # because that makes sys.exc_info behave unexpectedly.
    772         exc: Optional[Exception] = e

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/distributed/client.py:2903, in Client._run(self, function, nanny, workers, wait, on_error, *args, **kwargs)
   2900     continue
   2902 if on_error == "raise":
-> 2903     raise exc
   2904 elif on_error == "return":
   2905     results[key] = exc

File /opt/conda/lib/python3.10/site-packages/distributed/scheduler.py:6258, in send_message()

File /opt/conda/lib/python3.10/site-packages/distributed/core.py:1180, in send_recv()

Exception: TypeError('code expected at most 16 arguments, got 18')

Installed Versions

INSTALLED VERSIONS

commit : 47a9a4a
python : 3.11.1.final.0
python-bits : 64
OS : Darwin
OS-release : 23.2.0
Version : Darwin Kernel Version 23.2.0: Wed Nov 15 21:55:06 PST 2023; root:xnu-10002.61.3~2/RELEASE_ARM64_T6020
machine : arm64
processor : arm
byteorder : little
LC_ALL : None
LANG : en_US.UTF-8
LOCALE : en_US.UTF-8

Modin dependencies

modin : 0.26.0
ray : 2.9.0
dask : 2024.1.0
distributed : 2024.1.0
hdk : None

pandas dependencies

pandas : 2.1.4
numpy : 1.26.1
pytz : 2023.3.post1
dateutil : 2.8.2
setuptools : 68.2.2
pip : 23.3.2
Cython : None
pytest : 7.1.2
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : 4.9.3
html5lib : None
pymysql : None
psycopg2 : 2.9.5
jinja2 : 3.1.2
IPython : 8.17.2
pandas_datareader : None
bs4 : 4.12.2
bottleneck : None
dataframe-api-compat: None
fastparquet : None
fsspec : 2023.10.0
gcsfs : 2023.10.0
matplotlib : 3.6.2
numba : 0.58.1
numexpr : None
odfpy : None
openpyxl : 3.1.2
pandas_gbq : None
pyarrow : 14.0.1
pyreadstat : None
pyxlsb : None
s3fs : 0.4.2
scipy : 1.11.4
sqlalchemy : 1.4.49
tables : None
tabulate : None
xarray : 2023.11.0
xlrd : 2.0.1
zstandard : None
tzdata : 2023.3
qtpy : None
pyqt5 : None

@jan876 jan876 added bug 🦗 Something isn't working Triage 🩹 Issues that need triage labels Jan 19, 2024
@YarShev YarShev added Dask ⚡ Issues related to the Dask engine and removed Triage 🩹 Issues that need triage labels Jan 23, 2024

YarShev (Collaborator) commented Jan 23, 2024

Hi @jan876, could you elaborate on how you set up a Dask Distributed cluster? Is it a single-node case or multi-node?

anmyachev (Collaborator) commented

@jan876 you most likely have a Python version mismatch. Locally you use Python 3.11.1, while the Dask cluster runs Python 3.10. This can be seen in this part of the stack trace:

File ~/.pyenv/versions/venv/lib/python3.11/site-packages/distributed/client.py:2903, in Client._run(self, function, nanny, workers, wait, on_error, *args, **kwargs)
   2900     continue
   2902 if on_error == "raise":
-> 2903     raise exc
   2904 elif on_error == "return":
   2905     results[key] = exc

File /opt/conda/lib/python3.10/site-packages/distributed/scheduler.py:6258, in send_message()

File /opt/conda/lib/python3.10/site-packages/distributed/core.py:1180, in send_recv()

Exception: TypeError('code expected at most 16 arguments, got 18')

I saw a similar error in cloudpipe/cloudpickle#451, although that issue is not related to Dask.
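
For anyone hitting the same message: the "16 vs 18 arguments" wording is consistent with a function serialized under Python 3.11 being rebuilt under Python 3.10, since the 3.11 code-object constructor takes two more arguments than the 3.10 one. One way to confirm this, sketched below, is to compare the Python versions reported by `Client.get_versions()` from `dask.distributed`. The helper names `major_minor` and `find_python_mismatches` are hypothetical, the sample dictionary is hand-written to mirror the versions in this issue rather than real cluster output, and the nested `host` / `python` layout is an assumption about what `get_versions()` returns.

```python
def major_minor(version: str) -> tuple[int, int]:
    """Reduce a version string such as '3.11.1.final.0' to (3, 11)."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def find_python_mismatches(versions: dict) -> dict:
    """Return {node: python_version} for every scheduler/worker entry whose
    major.minor Python version differs from the client's.

    Assumes the layout {'client': {...}, 'scheduler': {...}, 'workers': {addr: {...}}}
    where each entry holds ['host']['python'].
    """
    client_py = major_minor(versions["client"]["host"]["python"])
    nodes = {"scheduler": versions["scheduler"], **versions.get("workers", {})}
    return {
        name: info["host"]["python"]
        for name, info in nodes.items()
        if major_minor(info["host"]["python"]) != client_py
    }


# Hand-written sample (illustrative only; the worker address and the 3.10
# patch version are made up).
sample = {
    "client": {"host": {"python": "3.11.1.final.0"}},
    "scheduler": {"host": {"python": "3.10.0.final.0"}},
    "workers": {"tcp://10.0.0.2:40000": {"host": {"python": "3.10.0.final.0"}}},
}
mismatches = find_python_mismatches(sample)
print(mismatches)

# Against a live cluster, this would look roughly like:
#   from dask.distributed import Client
#   client = Client("localhost:8786")
#   print(find_python_mismatches(client.get_versions()))
```

If the result is non-empty, aligning the local Python version with the one used on the cluster (or the other way round) is the likely fix, since the check only reads version metadata and does not send any function to the workers.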

@anmyachev anmyachev added External Pull requests and issues from people who do not regularly contribute to modin Needs more information ❔ Issues that require more information from the reporter labels Jan 26, 2024
anmyachev (Collaborator) commented

@jan876 feel free to reopen if needed
