Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Missing resample() implementation for grouped data frame #1134

Open
dbalabka opened this issue Sep 16, 2024 · 10 comments
Open

Missing resample() implementation for grouped data frame #1134

dbalabka opened this issue Sep 16, 2024 · 10 comments

Comments

@dbalabka
Copy link

Describe the issue:

import pandas as pd
import dask.dataframe as dd

# Create a sample DataFrame with 'id' and 'date' columns
data = {
    'id': [1, 1, 1, 2, 2, 2],
    'date': pd.to_datetime(['2023-01-01', '2023-01-04', '2023-01-05', '2023-01-01', '2023-01-04', '2023-01-05'])
}
df = dd.DataFrame(data)

# Group by 'id' and resample to daily frequency
result = df.groupby(by=['id']).resample("D", on="date").compute()

# Print the resulting DataFrame
print(result)
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File ~/src/.venv/lib/python3.10/site-packages/dask_expr/_groupby.py:1593, in GroupBy.__getattr__(self, key)
   1592 try:
-> 1593     return self[key]
   1594 except KeyError as e:

File ~/src/.venv/lib/python3.10/site-packages/dask_expr/_groupby.py:1615, in GroupBy.__getitem__(self, key)
   1614 if is_scalar(key):
-> 1615     return SeriesGroupBy(
   1616         self.obj,
   1617         by=self.by,
   1618         slice=key,
   1619         sort=self.sort,
   1620         dropna=self.dropna,
   1621         observed=self.observed,
   1622     )
   1623 g = GroupBy(
   1624     self.obj,
   1625     by=self.by,
   (...)
   1630     group_keys=self.group_keys,
   1631 )

File ~/src/.venv/lib/python3.10/site-packages/dask_expr/_groupby.py:2214, in SeriesGroupBy.__init__(self, obj, by, sort, observed, dropna, slice)
   2212         obj._meta.groupby(by, **_as_dict("observed", observed))
-> 2214 super().__init__(
   2215     obj, by=by, slice=slice, observed=observed, dropna=dropna, sort=sort
   2216 )

File ~/src/.venv/lib/python3.10/site-packages/dask_expr/_groupby.py:1558, in GroupBy.__init__(self, obj, by, group_keys, sort, observed, dropna, slice)
   1557     slice = list(slice)
-> 1558 self._meta = self._meta[slice]

File ~/src/.venv/lib/python3.10/site-packages/pandas/core/groupby/generic.py:1951, in DataFrameGroupBy.__getitem__(self, key)
   1947     raise ValueError(
   1948         "Cannot subset columns with a tuple with more than one element. "
   1949         "Use a list instead."
   1950     )
-> 1951 return super().__getitem__(key)

File ~/src/.venv/lib/python3.10/site-packages/pandas/core/base.py:244, in SelectionMixin.__getitem__(self, key)
    243 if key not in self.obj:
--> 244     raise KeyError(f"Column not found: {key}")
    245 ndim = self.obj[key].ndim

KeyError: 'Column not found: resample'

Minimal Complete Verifiable Example:

# Put your MCVE code here

Environment:

  • Dask version: 2024.8.0
  • dask-expr==1.1.10
  • Python version: 3.10
  • Operating System: WSL
  • Install method (conda, pip, source): poetry
@dbalabka dbalabka changed the title resample() on group rises weired error Missing resample() implementation for grouped data frame Sep 16, 2024
@phofl
Copy link
Collaborator

phofl commented Sep 16, 2024

Thanks for the report.

I think resample isn't implemented for groupby in Dask. The error message could certainly be better, adding it would also be fine

@dbalabka
Copy link
Author

dbalabka commented Sep 16, 2024

@phofl , thanks for the quick reply. Is there any workaround to run arbitrary Pandas functions on groups, like map_partition? Due to the group being distributed over a cluster, I need to make something smarter. The resample needs to have all rows in one partition to fill the gaps.

@phofl
Copy link
Collaborator

phofl commented Sep 16, 2024

Do you want the whole group in a single partition? If yes, you can use groupby.apply / groupby.transform

@dbalabka
Copy link
Author

dbalabka commented Sep 17, 2024

@phofl, it works, thanks:

import pandas as pd
import dask.dataframe as dd
data = {
    'id': [1, 1, 1, 2, 2, 2],
    'date': pd.to_datetime(['2023-01-01', '2023-01-04', '2023-01-05', '2023-01-01', '2023-01-04', '2023-01-05']),
    'metric': [1,1,1,1,1,1]
}
df = dd.from_pandas(pd.DataFrame(data).astype({'id': 'int64[pyarrow]', 'metric': 'int64[pyarrow]', 'date': 'timestamp[ns][pyarrow]'}))

print(
    df
    .groupby(by=['id'])
    .apply(lambda x: x.resample("D", on="date").sum(), include_groups=False, meta={"metric": "int64[pyarrow]"})
    .reset_index()
)

@dbalabka
Copy link
Author

FYI, for those who came across this ticket. It was a bit unexpected for me that Dask keeps one group in a single partition, which means we can lead to OOM if a group is too large, and we should keep it in mind while grouping. Otherwise, we should do this:
https://stackoverflow.com/a/55881591/983577

@phofl
Copy link
Collaborator

phofl commented Sep 17, 2024

Apply and transform are doing this specifically, there is no way around that fwiw

@dbalabka
Copy link
Author

@phofl , I've spent a lot of time working around the missing resample method. Found another bug that prevents me from using apply on groups:
dask/dask#11394

@phofl
Copy link
Collaborator

phofl commented Sep 17, 2024

that's odd, thanks for digging these up, I'll try to take a look tomorrow

@dbalabka
Copy link
Author

dbalabka commented Sep 18, 2024

@phofl, it might be related to a bug in pandas that I spoted during this investigation: pandas-dev/pandas#59823
Pandas produces the empty column name that can affect Dask logic

@dbalabka
Copy link
Author

dbalabka commented Sep 19, 2024

Here is a work around that only work in my case:

import pandas as pd
import dask.dataframe as dd
data = {
    'id': [1, 1, 1, 2, 2, 2],
    'date': pd.to_datetime(['2023-01-01', '2023-01-04', '2023-01-05', '2023-01-01', '2023-01-04', '2023-01-05']),
    'metric': [1,1,1,1,1,1]
}
df = dd.from_pandas(pd.DataFrame(data).astype({'id': 'int64[pyarrow]', 'metric': 'int64[pyarrow]', 'date': 'timestamp[ns][pyarrow]'}))

print(
    df
    # Partitioning by id as a replacement for groupby
    .set_index('id')
    # See bug: https://github.com/pandas-dev/pandas/issues/59823
    .astype({'date': 'datetime64[ns]'})
    # Apply the required Pandas function on each partition. Previously, set index guarantee us that each partition has all required rows
    .map_partitions(
        lambda x: x.groupby('id').resample("D", on="date").sum().reset_index(), 
        meta={'id': 'int64[pyarrow]', 'date': 'timestamp[ns][pyarrow]', 'metric': 'int64[pyarrow]'},
    )
    # Remove unnecessary index
    .reset_index(drop=True)
    .compute()
)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants