P2P shuffling loses annotations when dataframe is later transformed to array with df.values #7715

Closed
hendrikmakait opened this issue Mar 28, 2023 · 2 comments
Labels: bug (Something is broken), shuffle

Comments

@hendrikmakait
Member

This issue was originally mentioned in #7615 (comment).

from __future__ import annotations

import dask
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster


def foo():
    cluster = LocalCluster()
    client = Client(cluster)
    df = dask.datasets.timeseries(
        start="2000-01-01",
        end="2000-03-01",
        dtypes={"x": float, "y": str},
        freq="10 s",
    )
    df = dd.shuffle.shuffle(df, "x")
    df = df.values
    dd.compute(df)


if __name__ == "__main__":
    foo()
2023-03-28 14:55:04,649 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '10ae5cb829e2dc40ee50f5c15a8b6533'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'
2023-03-28 14:55:04,651 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '10ae5cb829e2dc40ee50f5c15a8b6533'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'
2023-03-28 14:55:04,652 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '10ae5cb829e2dc40ee50f5c15a8b6533'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'
2023-03-28 14:55:04,653 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '10ae5cb829e2dc40ee50f5c15a8b6533'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'
2023-03-28 14:55:04,654 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '10ae5cb829e2dc40ee50f5c15a8b6533'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'
2023-03-28 14:55:04,655 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '10ae5cb829e2dc40ee50f5c15a8b6533'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'
2023-03-28 14:55:04,656 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '10ae5cb829e2dc40ee50f5c15a8b6533'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'
2023-03-28 14:55:04,657 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '10ae5cb829e2dc40ee50f5c15a8b6533'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'
2023-03-28 14:55:04,658 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '10ae5cb829e2dc40ee50f5c15a8b6533'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'
2023-03-28 14:55:04,661 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '10ae5cb829e2dc40ee50f5c15a8b6533'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'
2023-03-28 14:55:04,847 - distributed.worker - WARNING - Compute Failed
Key:       ('assign-shuffle-transfer-10ae5cb829e2dc40ee50f5c15a8b6533', 13)
Function:  execute_task
args:      ((<function shuffle_transfer at 0x118853430>, (subgraph_callable-db1f36fe-7100-4a2d-b6d9-586b20a3b6b0, '_partitions', 'getitem-57f5ac6201341b78464c5b49b7eb3b97', ['x'], ([Timestamp('2000-01-14 00:00:00', freq='D'), Timestamp('2000-01-15 00:00:00', freq='D')], 1463794479)), '10ae5cb829e2dc40ee50f5c15a8b6533', 13, 60, '_partitions'))
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533')"

2023-03-28 14:55:04,849 - distributed.worker - WARNING - Compute Failed
Key:       ('assign-shuffle-transfer-10ae5cb829e2dc40ee50f5c15a8b6533', 10)
Function:  execute_task
args:      ((<function shuffle_transfer at 0x11e203430>, (subgraph_callable-db1f36fe-7100-4a2d-b6d9-586b20a3b6b0, '_partitions', 'getitem-57f5ac6201341b78464c5b49b7eb3b97', ['x'], ([Timestamp('2000-01-11 00:00:00', freq='D'), Timestamp('2000-01-12 00:00:00', freq='D')], 1899652886)), '10ae5cb829e2dc40ee50f5c15a8b6533', 10, 60, '_partitions'))
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533')"

2023-03-28 14:55:04,852 - distributed.worker - WARNING - Compute Failed
Key:       ('assign-shuffle-transfer-10ae5cb829e2dc40ee50f5c15a8b6533', 0)
Function:  execute_task
args:      ((<function shuffle_transfer at 0x118853430>, (subgraph_callable-db1f36fe-7100-4a2d-b6d9-586b20a3b6b0, '_partitions', 'getitem-57f5ac6201341b78464c5b49b7eb3b97', ['x'], ([Timestamp('2000-01-01 00:00:00', freq='D'), Timestamp('2000-01-02 00:00:00', freq='D')], 1560189670)), '10ae5cb829e2dc40ee50f5c15a8b6533', 0, 60, '_partitions'))
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533')"

2023-03-28 14:55:04,852 - distributed.worker - WARNING - Compute Failed
Key:       ('assign-shuffle-transfer-10ae5cb829e2dc40ee50f5c15a8b6533', 16)
Function:  execute_task
args:      ((<function shuffle_transfer at 0x119d43430>, (subgraph_callable-db1f36fe-7100-4a2d-b6d9-586b20a3b6b0, '_partitions', 'getitem-57f5ac6201341b78464c5b49b7eb3b97', ['x'], ([Timestamp('2000-01-17 00:00:00', freq='D'), Timestamp('2000-01-18 00:00:00', freq='D')], 81871171)), '10ae5cb829e2dc40ee50f5c15a8b6533', 16, 60, '_partitions'))
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533')"

2023-03-28 14:55:04,853 - distributed.worker - WARNING - Compute Failed
Key:       ('assign-shuffle-transfer-10ae5cb829e2dc40ee50f5c15a8b6533', 15)
Function:  execute_task
args:      ((<function shuffle_transfer at 0x11e203430>, (subgraph_callable-db1f36fe-7100-4a2d-b6d9-586b20a3b6b0, '_partitions', 'getitem-57f5ac6201341b78464c5b49b7eb3b97', ['x'], ([Timestamp('2000-01-16 00:00:00', freq='D'), Timestamp('2000-01-17 00:00:00', freq='D')], 1135040044)), '10ae5cb829e2dc40ee50f5c15a8b6533', 15, 60, '_partitions'))
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533')"

2023-03-28 14:55:04,867 - distributed.worker - WARNING - Compute Failed
Key:       ('assign-shuffle-transfer-10ae5cb829e2dc40ee50f5c15a8b6533', 11)
Function:  execute_task
args:      ((<function shuffle_transfer at 0x119d43430>, (subgraph_callable-db1f36fe-7100-4a2d-b6d9-586b20a3b6b0, '_partitions', 'getitem-57f5ac6201341b78464c5b49b7eb3b97', ['x'], ([Timestamp('2000-01-12 00:00:00', freq='D'), Timestamp('2000-01-13 00:00:00', freq='D')], 486093178)), '10ae5cb829e2dc40ee50f5c15a8b6533', 11, 60, '_partitions'))
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533')"

2023-03-28 14:55:04,868 - distributed.worker - WARNING - Compute Failed
Key:       ('assign-shuffle-transfer-10ae5cb829e2dc40ee50f5c15a8b6533', 12)
Function:  execute_task
args:      ((<function shuffle_transfer at 0x168bc3430>, (subgraph_callable-db1f36fe-7100-4a2d-b6d9-586b20a3b6b0, '_partitions', 'getitem-57f5ac6201341b78464c5b49b7eb3b97', ['x'], ([Timestamp('2000-01-13 00:00:00', freq='D'), Timestamp('2000-01-14 00:00:00', freq='D')], 1385589376)), '10ae5cb829e2dc40ee50f5c15a8b6533', 12, 60, '_partitions'))
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533')"

2023-03-28 14:55:04,869 - distributed.worker - WARNING - Compute Failed
Key:       ('assign-shuffle-transfer-10ae5cb829e2dc40ee50f5c15a8b6533', 17)
Function:  execute_task
args:      ((<function shuffle_transfer at 0x168bc3430>, (subgraph_callable-db1f36fe-7100-4a2d-b6d9-586b20a3b6b0, '_partitions', 'getitem-57f5ac6201341b78464c5b49b7eb3b97', ['x'], ([Timestamp('2000-01-18 00:00:00', freq='D'), Timestamp('2000-01-19 00:00:00', freq='D')], 944641070)), '10ae5cb829e2dc40ee50f5c15a8b6533', 17, 60, '_partitions'))
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533')"

2023-03-28 14:55:04,871 - distributed.worker - WARNING - Compute Failed
Key:       ('assign-shuffle-transfer-10ae5cb829e2dc40ee50f5c15a8b6533', 1)
Function:  execute_task
args:      ((<function shuffle_transfer at 0x1226c4430>, (subgraph_callable-db1f36fe-7100-4a2d-b6d9-586b20a3b6b0, '_partitions', 'getitem-57f5ac6201341b78464c5b49b7eb3b97', ['x'], ([Timestamp('2000-01-02 00:00:00', freq='D'), Timestamp('2000-01-03 00:00:00', freq='D')], 865865470)), '10ae5cb829e2dc40ee50f5c15a8b6533', 1, 60, '_partitions'))
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533')"

2023-03-28 14:55:04,871 - distributed.worker - WARNING - Compute Failed
Key:       ('assign-shuffle-transfer-10ae5cb829e2dc40ee50f5c15a8b6533', 14)
Function:  execute_task
args:      ((<function shuffle_transfer at 0x1226c4430>, (subgraph_callable-db1f36fe-7100-4a2d-b6d9-586b20a3b6b0, '_partitions', 'getitem-57f5ac6201341b78464c5b49b7eb3b97', ['x'], ([Timestamp('2000-01-15 00:00:00', freq='D'), Timestamp('2000-01-16 00:00:00', freq='D')], 1387517202)), '10ae5cb829e2dc40ee50f5c15a8b6533', 14, 60, '_partitions'))
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533')"

2023-03-28 14:55:04,888 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '10ae5cb829e2dc40ee50f5c15a8b6533'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'
2023-03-28 14:55:04,889 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '10ae5cb829e2dc40ee50f5c15a8b6533'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'
2023-03-28 14:55:04,889 - distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '10ae5cb829e2dc40ee50f5c15a8b6533'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'
2023-03-28 14:55:04,891 - distributed.worker - WARNING - Compute Failed
Key:       ('assign-shuffle-transfer-10ae5cb829e2dc40ee50f5c15a8b6533', 18)
Function:  execute_task
args:      ((<function shuffle_transfer at 0x118853430>, (subgraph_callable-db1f36fe-7100-4a2d-b6d9-586b20a3b6b0, '_partitions', 'getitem-57f5ac6201341b78464c5b49b7eb3b97', ['x'], ([Timestamp('2000-01-19 00:00:00', freq='D'), Timestamp('2000-01-20 00:00:00', freq='D')], 671510192)), '10ae5cb829e2dc40ee50f5c15a8b6533', 18, 60, '_partitions'))
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533')"

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_shuffle.py", line 61, in shuffle_transfer
2023-03-28 14:55:04,892 - distributed.worker - WARNING - Compute Failed
Key:       ('assign-shuffle-transfer-10ae5cb829e2dc40ee50f5c15a8b6533', 20)
Function:  execute_task
args:      ((<function shuffle_transfer at 0x119d43430>, (subgraph_callable-db1f36fe-7100-4a2d-b6d9-586b20a3b6b0, '_partitions', 'getitem-57f5ac6201341b78464c5b49b7eb3b97', ['x'], ([Timestamp('2000-01-21 00:00:00', freq='D'), Timestamp('2000-01-22 00:00:00', freq='D')], 1583989252)), '10ae5cb829e2dc40ee50f5c15a8b6533', 20, 60, '_partitions'))
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533')"

    return _get_worker_extension().add_partition(
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_worker_extension.py", line 631, in add_partition
    shuffle = self.get_or_create_shuffle(shuffle_id, type=type, **kwargs)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_worker_extension.py", line 886, in get_or_create_shuffle
2023-03-28 14:55:04,893 - distributed.worker - WARNING - Compute Failed
Key:       ('assign-shuffle-transfer-10ae5cb829e2dc40ee50f5c15a8b6533', 2)
Function:  execute_task
args:      ((<function shuffle_transfer at 0x11e203430>, (subgraph_callable-db1f36fe-7100-4a2d-b6d9-586b20a3b6b0, '_partitions', 'getitem-57f5ac6201341b78464c5b49b7eb3b97', ['x'], ([Timestamp('2000-01-03 00:00:00', freq='D'), Timestamp('2000-01-04 00:00:00', freq='D')], 1938189528)), '10ae5cb829e2dc40ee50f5c15a8b6533', 2, 60, '_partitions'))
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533')"

    return sync(
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/utils.py", line 416, in sync
    raise exc.with_traceback(tb)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/utils.py", line 389, in f
    result = yield future
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed-3.8/lib/python3.8/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_worker_extension.py", line 712, in _get_or_create_shuffle
    shuffle = await self._refresh_shuffle(
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_worker_extension.py", line 757, in _refresh_shuffle
    result = await self.worker.scheduler.shuffle_get_or_create(
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 1265, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 1049, in send_recv
    raise exc.with_traceback(tb)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 828, in _handle_comm
    result = handler(**msg)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 143, in get_or_create
    state = self._create_dataframe_shuffle_state(id, spec)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_scheduler_extension.py", line 169, in _create_dataframe_shuffle_state
    part = ts.annotations["shuffle"]
KeyError: 'shuffle'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "foo.py", line 23, in <module>
    foo()
  File "foo.py", line 19, in foo
    dd.compute(df)
  File "/Users/hendrikmakait/projects/dask/dask/dask/base.py", line 599, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/client.py", line 3198, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/client.py", line 2345, in gather
    return self.sync(
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/utils.py", line 349, in sync
    return sync(
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/utils.py", line 416, in sync
    raise exc.with_traceback(tb)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/utils.py", line 389, in f
    result = yield future
  File "/opt/homebrew/Caskroom/mambaforge/base/envs/dask-distributed-3.8/lib/python3.8/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/client.py", line 2208, in _gather
    raise exception.with_traceback(traceback)
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/shuffle/_shuffle.py", line 70, in shuffle_transfer
    raise RuntimeError(f"shuffle_transfer failed during shuffle {id}") from e
RuntimeError: shuffle_transfer failed during shuffle 10ae5cb829e2dc40ee50f5c15a8b6533
@hendrikmakait hendrikmakait added bug Something is broken shuffle labels Mar 28, 2023
@hendrikmakait hendrikmakait changed the title from "P2P shuffling loses annotations when followed by dd.compute(df.values)" to "P2P shuffling loses annotations when dataframe is later transformed to array with df.values" Mar 28, 2023
@hendrikmakait
Member Author

@lixfz: This issue seems to be another incarnation of dask/dask#7036. Calling .values transforms the dataframe into a dask.array, which activates the low-level fusion optimization; the fused graph then seems to lose the "shuffle" annotation that the scheduler looks up in the traceback above. For now, you can avoid this behaviour by deactivating low-level fusion manually through the config option optimization.fuse.active.

The above example can be fixed like this:

from __future__ import annotations

import dask
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster


def foo():
    cluster = LocalCluster()
    client = Client(cluster)
    df = dask.datasets.timeseries(
        start="2000-01-01",
        end="2000-03-01",
        dtypes={"x": float, "y": str},
        freq="10 s",
    )
    df = dd.shuffle.shuffle(df, "x")
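    # Deactivate low-level fusion. Graph optimization runs at compute time,
    # so the compute call has to stay inside this block.
    with dask.config.set({"optimization.fuse.active": False}):
        arr = df.values
        dd.compute(arr)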


if __name__ == "__main__":
    foo()
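
As a side note on the workaround above, here is a minimal sketch of how the config override is scoped. It only uses dask.config.get and dask.config.set; the names KEY, before, inside and after are made up for this illustration. The value read before the override depends on your dask version and local configuration, so the sketch does not assume a particular default.

```python
import dask

# Config key used in the workaround above.
KEY = "optimization.fuse.active"

# Whatever is configured before the override (version and setup dependent).
before = dask.config.get(KEY, default=None)

with dask.config.set({KEY: False}):
    # Inside the block, low-level fusion is explicitly switched off.
    # Any compute call that should run without fusion belongs here.
    inside = dask.config.get(KEY)

# On leaving the block, the previous setting is restored.
after = dask.config.get(KEY, default=None)

print(f"before={before!r} inside={inside!r} after={after!r}")
```

Because the override ends with the block, calling compute after the with statement would run with the original setting again, which is presumably why the fixed example keeps dd.compute(arr) inside it.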

cc @fjetter

@jrbourbeau
Member

Closing as #7801 resolves this issue by bypassing annotations altogether. Just checked the original code snippet on main and it no longer errors. Thanks @hendrikmakait.
