[dask] Object store fills up too quickly in simple processing script #11565

Open
richardliaw opened this issue Oct 23, 2020 · 23 comments
Labels
bug: Something that is supposed to be working; but isn't
P2: Important issue, but not time-critical

Comments

@richardliaw
Contributor

What is the problem?

We get an OOM (object store full) error even though the working set is much smaller than expected.

ray.exceptions.ObjectStoreFullError: Failed to put object ad6a921e910a8533ffffffff0100000001000000 in object store because it is full. Object size is 993654894 bytes.
The local object store is full of objects that are still in scope and cannot be evicted. Tip: Use the `ray memory` command to list active objects in the cluster.
Traceback (most recent call last):
  File "preprocess.py", line 68, in <module>
    res.compute(scheduler=ray_dask_get)
  File "/Users/X/ludwig/env/lib/python3.7/site-packages/dask/base.py", line 167, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/X/ludwig/env/lib/python3.7/site-packages/dask/base.py", line 452, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/X/ludwig/env/lib/python3.7/site-packages/ray/util/dask/scheduler.py", line 118, in ray_dask_get
    result = ray_get_unpack(object_refs)
  File "/Users/X/ludwig/env/lib/python3.7/site-packages/ray/util/dask/scheduler.py", line 357, in ray_get_unpack
    computed_result = ray.get(object_refs)
  File "/Users/X/ludwig/env/lib/python3.7/site-packages/ray/worker.py", line 1428, in get
    raise value.as_instanceof_cause()

Reproduction (REQUIRED)

import os
from collections import deque
import pandas as pd
import ray
from ray.util.dask import ray_dask_get
import dask.dataframe as dd
from dask import delayed
ray.init()
movie_titles = dd.read_csv('./movie_titles.csv', 
                           encoding = 'ISO-8859-1', 
                           header = None, 
                           names=['Movie', 'Year', 'Name']).set_index('Movie')
def read_and_label_csv(filename):
    # Load single data-file
    df_raw = pd.read_csv(filename, header=None, names=['User', 'Rating', 'Date'], usecols=[0, 1, 2])
    # Find empty rows to slice dataframe for each movie
    tmp_movies = df_raw[df_raw['Rating'].isna()]['User'].reset_index()
    movie_indices = [[index, int(movie[:-1])] for index, movie in tmp_movies.values]
    # Shift the movie_indices by one to get start and endpoints of all movies
    shifted_movie_indices = deque(movie_indices)
    shifted_movie_indices.rotate(-1)
    # Gather all dataframes
    user_data = []
    # Iterate over all movies
    for [df_id_1, movie_id], [df_id_2, next_movie_id] in zip(movie_indices, shifted_movie_indices):
        # Check if it is the last movie in the file
        if df_id_1<df_id_2:
            tmp_df = df_raw.loc[df_id_1+1:df_id_2-1].copy()
        else:
            tmp_df = df_raw.loc[df_id_1+1:].copy()
        # Create movie_id column
        tmp_df['Movie'] = movie_id
        # Append dataframe to list
        user_data.append(tmp_df)
    # Combine all dataframes
    df = pd.concat(user_data)
    del user_data, df_raw, tmp_movies, tmp_df, shifted_movie_indices, movie_indices, df_id_1, movie_id, df_id_2, next_movie_id
    return df
# create a list of functions ready to return a pandas.DataFrame
file_list = [f'./combined_data_{i+1}.txt' for i in range(4)]
dfs = [delayed(read_and_label_csv)(fname) for fname in file_list]
# using delayed, assemble the pandas.DataFrames into a dask.DataFrame
ratings = dd.from_delayed(dfs)
ratings = ratings.repartition(npartitions=12)
movie_titles = movie_titles.repartition(npartitions=1)
dataset = ratings.merge(movie_titles, how='inner', left_on='Movie', right_on='Movie')
os.makedirs('dataset.parquet', exist_ok=True)
res = dataset.to_parquet('dataset.parquet', compute=False)
res.compute(scheduler=ray_dask_get)

Using the Netflix Prize dataset: https://www.kaggle.com/netflix-inc/netflix-prize-data

cc @tgaddair

@richardliaw richardliaw added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Oct 23, 2020
@richardliaw richardliaw changed the title [dask] Object store fills up too quickly when [dask] Object store fills up too quickly in simple processing script Oct 23, 2020
@richardliaw
Contributor Author

cc @clarkzinzow

@richardliaw richardliaw added P2 Important issue, but not time-critical and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Oct 23, 2020
@tgaddair
Contributor

tgaddair commented Oct 23, 2020

Thanks for raising this issue @richardliaw.

For more context @clarkzinzow, I'm finding that even with a dataset 1/12 the size (157M), this error is occurring when attempting to write to Parquet (but not, interestingly, when calling compute() with the Ray scheduler). So it appears to be something related to the serialization / parquet writing procedure when using the Ray scheduler.

Let me know if you need more info from me. This is currently a blocker for me, so happy to help in any way I can.
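A minimal sketch of the two code paths being compared here, reusing `dataset` and `ray_dask_get` from the reproduction above (illustration only):

# Variant 1: materialize the merged dataframe directly (no Parquet write).
df = dataset.compute(scheduler=ray_dask_get)

# Variant 2: write the merged dataframe to Parquet through the Dask-on-Ray scheduler.
res = dataset.to_parquet('dataset.parquet', compute=False)
res.compute(scheduler=ray_dask_get)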

@tgaddair
Contributor

tgaddair commented Oct 23, 2020

I take that back; I'm actually seeing this for compute() calls with Ray as well. I'll keep digging into this to find a simpler repro for you.

@clarkzinzow
Contributor

clarkzinzow commented Oct 23, 2020

In my attempt to reproduce, I'm getting a lot of segfaults; not sure what the cause is yet. I'll keep investigating.

Let me know if you are able to create a simpler reproduction; that might help me see through a lot of the noise here.

@clarkzinzow
Contributor

Hmm interesting, computing this without using the Dask-on-Ray scheduler crashed my dev VM! Maybe there's an underlying Dask or Parquet issue at work here. Are you able to run this workload to completion with a non-Ray scheduler? How large of a machine are you running this on?

@tgaddair
Contributor

Yes, I can run without the Ray scheduler. I'm using a MacBook Pro with 16GB of RAM.

@richardliaw
Contributor Author

richardliaw commented Oct 23, 2020

I'm able to run this script on an MBP with 64GB of RAM. I can reliably reproduce the failure with:

GB = 1024 ** 3
ray.init(_memory=4*GB, object_store_memory=2*GB)

This produces a ray.cluster_resources() of:

{'CPU': 16.0, 'memory': 81.0, 'object_store_memory': 28.0, 'node:192.168.1.115': 1.0}

Which is similar to Travis' output of:

{'object_store_memory': 33.0,
 'CPU': 12.0,
 'node:192.168.4.54': 1.0,
 'memory': 97.0}

@clarkzinzow
Contributor

clarkzinzow commented Oct 23, 2020

@tgaddair @richardliaw, which versions of Ray, Dask, and Pandas are y'all on?

@tgaddair
Contributor

ray==1.0.0
dask==2.28.0
pandas==1.1.2

Thanks @clarkzinzow for continuing to look into this!

@richardliaw
Contributor Author

ray==master
dask==2.28
pandas==1.0.5

thanks a bunch @clarkzinzow !

@clarkzinzow
Contributor

I'm still working on getting to the bottom of the segfaults on Parquet table writing that I keep hitting, which is concerning in and of itself. I'll try to poke at this a bit more tomorrow.

@clarkzinzow
Contributor

I'm assuming that both of y'all have fastparquet installed, and that Dask is using fastparquet and not pyarrow as the Parquet table writer? I get consistent segfaults with pyarrow as the Parquet I/O implementation, and the object store fills up with fastparquet.
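One way to take the auto-detection out of the picture is to pin the writer explicitly via the standard `engine` argument of `dask.dataframe.to_parquet` (a sketch, assuming the `dataset` from the reproduction above):

# Force the fastparquet writer instead of whatever Dask auto-detects.
res = dataset.to_parquet('dataset.parquet', engine='fastparquet', compute=False)
# ...or force the Arrow writer:
# res = dataset.to_parquet('dataset.parquet', engine='pyarrow', compute=False)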

@richardliaw
Contributor Author

@clarkzinzow nope didn't know that was a thing! trying it out now.

@clarkzinzow
Contributor

@richardliaw Interesting, not sure why the pyarrow Parquet implementation is segfaulting for me and not you. What version of pyarrow are you using?
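For reference, a quick way to report the installed version:

import pyarrow
print(pyarrow.__version__)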

@richardliaw
Contributor Author

pyarrow==1.0.1

though maybe we bundle it internally?

@clarkzinzow
Contributor

though maybe we bundle it internally?

Not anymore! 😁

@richardliaw
Contributor Author

ah nice! Let me know what other information you need to help debug your segfault.

@clarkzinzow
Contributor

I'll probably end up opening a separate issue for the segfaults since it seems to be specific to using the pyarrow Parquet writer on Linux. Given that I have a workaround (using the fastparquet writer instead), I'm going to ignore the segfaults and keep investigating the object store OOMs so we can hopefully unblock @tgaddair's work.

@tgaddair
Contributor

Hey @clarkzinzow, sorry for the delayed response. I'm actually using pyarrow; I don't have fastparquet installed. In my case, we use pyarrow to read the data after Dask writes it, so we may need to stick with pyarrow.
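For what it's worth, a dataset written with the fastparquet engine should generally still be readable with pyarrow on the consuming side, since both produce standard Parquet (a sketch, using the 'dataset.parquet' directory from the reproduction; compatibility corner cases are possible):

import pyarrow.parquet as pq

# Read the Parquet dataset directory back with pyarrow, regardless of which
# engine Dask used to write it.
table = pq.read_table('dataset.parquet')
print(table.num_rows)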

@tgaddair
Contributor

I'm using pyarrow 2.0.0 by the way.

@clarkzinzow
Contributor

clarkzinzow commented Oct 27, 2020

@tgaddair Sweet, thanks! And if pyarrow isn't segfaulting for you, then no need to switch to fastparquet! Given that the OOMs are happening even without writing to Parquet files (just computing dataset), I'm considering it safe for me to debug the bloated object store while using fastparquet as the writer, with the hope that the solution will transfer.

So, despite the source dataset being only 2GiB, I think that the working set for this load + repartition + merge is fundamentally large due to Dask's partitioned dataframe implementation (not due to the implementation of the Dask-on-Ray scheduler), and that this results in a failure when using the Ray scheduler because the working set is limited to the allocated object store memory instead of the memory capacity of the entire machine. Running this sans the Ray scheduler yields a peak memory utilization of 20GiB, a lot of which is a working set of intermediate task outputs that must be materialized in memory concurrently. If you have allocated less than 6GiB to the object store, then you can expect to see OOMs since that object store memory budget can't meet the peak demand.

Until the underlying bloated memory utilization is fixed, you could try allocating e.g. 14GiB of memory to Ray and 7GiB to the object store in particular via

GB = 1024 ** 3
ray.init(_memory=14*GB, object_store_memory=7*GB)

to see if you have better luck, or try running this on a cloud instance or a set of cloud instances with more memory available.

If I'm able to confirm that there are some unnecessary dataframe copies happening in Dask's dataframe implementation, then I'll open an issue on the Dask repo, and I'll double-check to make sure that the object store is freeing unused intermediate task outputs at the expected times.

@richardliaw
Contributor Author

@clarkzinzow thanks a bunch for this thorough evaluation!

So from what I understand, the same memory utilization occurs with both schedulers, but the difference is that Dask does not enforce "memory limits" while Ray does. Is that correct?

@clarkzinzow
Contributor

@richardliaw That's my current theory, yes! I do want to find out why the working set is so large during repartitioning and merging, and whether intermediate objects are being garbage collected in a timely manner (which I currently think they are, just want to double-check). Hopefully I'll have some time tomorrow to dig further and open up some issues.
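A rough sketch of the comparison being described, reusing `dataset` and `ray_dask_get` from the reproduction (assumption: Dask's built-in schedulers are bounded only by the machine's RAM, while Dask-on-Ray keeps intermediate task outputs in the fixed-size object store configured via ray.init(object_store_memory=...)):

# Peak usage limited only by machine memory (Dask's threaded scheduler).
dataset.compute(scheduler='threads')

# Peak usage limited by Ray's object store budget (Dask-on-Ray scheduler).
dataset.compute(scheduler=ray_dask_get)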

@rkooo567 rkooo567 self-assigned this Feb 2, 2021
@rkooo567 rkooo567 removed their assignment Dec 8, 2022