
JIT-unspill: support spilling to/from disk #708

Merged: 21 commits merged into rapidsai:branch-21.10 on Sep 22, 2021

Conversation

@madsbk (Member) commented Aug 20, 2021

Closes #657 by implementing support for spilling to/from disk.
In this first iteration we still only track CUDA objects, so regular CPU objects, such as ndarrays, are not spilled.

Spilling to disk is enabled by default and has the same parameters as DeviceHostFile. A new parameter, shared_filesystem, specifies whether the local_directory is shared between all workers or not. Normally this defaults to False, but in the case of LocalCUDACluster it defaults to True.

"""
    Parameters
    ----------
    device_memory_limit: int
        Number of bytes of CUDA device memory used before spilling to host.
    host_memory_limit: int
        Number of bytes of host memory used before spilling to disk.
    local_directory: str or None, default None
        Path on local machine to store temporary files. Can be a string (like
        ``"path/to/files"``) or ``None`` to fall back on the value of
        ``dask.temporary-directory`` in the local Dask configuration, using the
        current working directory if this is not set.
        WARNING: this **cannot** change while running, thus all serialization
        to disk uses the same directory.
    shared_filesystem: bool or None, default None
        Whether the `local_directory` above is shared between all workers or not.
        If ``None``, the "jit-unspill-shared-fs" config value is used, which
        defaults to False.
        Note that a shared filesystem must support the `os.link()` operation.
"""

@github-actions bot added the 'python' label Aug 20, 2021
@madsbk added the '2 - In Progress', 'improvement', and 'non-breaking' labels Aug 20, 2021
@codecov-commenter commented Aug 20, 2021

Codecov Report

Merging #708 (6f849e1) into branch-21.10 (8e6ab70) will increase coverage by 1.52%.
The diff coverage is 90.77%.

Impacted file tree graph

@@               Coverage Diff                @@
##           branch-21.10     #708      +/-   ##
================================================
+ Coverage         87.63%   89.16%   +1.52%     
================================================
  Files                15       15              
  Lines              1658     1901     +243     
================================================
+ Hits               1453     1695     +242     
- Misses              205      206       +1     
Impacted Files Coverage Δ
dask_cuda/cuda_worker.py 77.64% <ø> (ø)
dask_cuda/get_device_memory_objects.py 90.00% <0.00%> (+21.94%) ⬆️
dask_cuda/utils.py 81.74% <65.95%> (-5.53%) ⬇️
dask_cuda/local_cuda_cluster.py 78.30% <80.00%> (+0.41%) ⬆️
dask_cuda/proxify_device_objects.py 95.45% <80.00%> (+6.56%) ⬆️
dask_cuda/initialize.py 90.24% <84.00%> (+1.35%) ⬆️
dask_cuda/proxify_host_file.py 93.55% <93.63%> (-5.84%) ⬇️
dask_cuda/proxy_object.py 91.31% <98.82%> (+1.66%) ⬆️
dask_cuda/cli/dask_cuda_worker.py 97.18% <100.00%> (+1.46%) ⬆️
dask_cuda/device_host_file.py 71.66% <100.00%> (+1.50%) ⬆️
... and 12 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 859c86d...6f849e1.

rapids-bot bot pushed a commit that referenced this pull request Sep 8, 2021
This PR introduces a `ProxyManager` that replaces the current implementation of proxy tracking:
```python
class ProxyManager:
    """
    This class together with Proxies, ProxiesOnHost, and ProxiesOnDevice
    implements the tracking of all known proxies and their total host/device
    memory usage. It turns out having to re-calculate memory usage continuously
    is too expensive.

    The idea is to have the ProxifyHostFile or the proxies themselves update
    their location (device or host). The manager then tallies the total memory usage.

    Note that the manager only keeps weak references to the proxies.
    """
```
Additionally, this PR fixes a rare deadlock by having all proxies and the `ProxyManager` use the same lock. Finally, this PR will make it much easier to implement spilling to disk: #708.

Note that from the user's perspective, this PR shouldn't change anything.
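To illustrate the weak-reference tracking idea described in the docstring above, here is a minimal, hypothetical sketch (not the dask-cuda implementation); all names in it are made up.

```python
import threading
import weakref


class TallySketch:
    """Hypothetical tracker: keeps only weak references to registered objects
    and maintains a running total of their reported sizes, so the total never
    has to be re-calculated by walking all objects."""

    def __init__(self):
        self._lock = threading.RLock()
        self._refs = {}   # id(obj) -> weakref.ref
        self._sizes = {}  # id(obj) -> last reported size in bytes
        self._total = 0

    def register(self, obj, nbytes: int) -> None:
        with self._lock:
            key = id(obj)
            # When the object is garbage collected, drop its contribution.
            self._refs[key] = weakref.ref(obj, lambda _ref, key=key: self._drop(key))
            self._total += nbytes - self._sizes.get(key, 0)
            self._sizes[key] = nbytes

    def _drop(self, key: int) -> None:
        with self._lock:
            self._refs.pop(key, None)
            self._total -= self._sizes.pop(key, 0)

    def total_nbytes(self) -> int:
        with self._lock:
            return self._total
```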

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #712
@madsbk (Member Author) commented Sep 10, 2021

@VibhuJawa @randerzander @ChrisJar, FYI this PR is ready for testing. I am currently debugging a `ValueError: recursion limit exceeded` error that happens sometimes on shutdown, but other than that everything should be working.

@madsbk madsbk marked this pull request as ready for review September 14, 2021 12:23
@madsbk madsbk requested a review from a team as a code owner September 14, 2021 12:23
@madsbk changed the title from "[WIP] JIT-unspill: support spilling to/from disk" to "JIT-unspill: support spilling to/from disk" Sep 14, 2021
@madsbk added the '3 - Ready for Review' label and removed the '2 - In Progress' label Sep 14, 2021
@madsbk (Member Author) commented Sep 14, 2021

> @VibhuJawa @randerzander @ChrisJar, FYI this PR is ready for testing. I am currently debugging a `ValueError: recursion limit exceeded` error that happens sometimes on shutdown, but other than that everything should be working.

After much debugging, I think this is caused by the performance_report and not by JIT-unspill; I sometimes see the error even when JIT-unspill is disabled.

@pentschev (Member) left a comment

Thanks @madsbk for the work here. This looks great, and a very clever solution! I've added a few comments and suggestions.

Review comments (resolved) on: dask_cuda/proxify_host_file.py, dask_cuda/proxy_object.py, dask_cuda/tests/test_proxy.py, dask_cuda/cli/dask_cuda_worker.py
@VibhuJawa (Member) commented Sep 16, 2021

On this PR I sometimes got the error below when running the workflow at #657 (comment) on 8 GPUs of a DGX-2 with n_parts=1500.

Full Trace:

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/dask_cudf/core.py:312: UserWarning: Rearranging data by column hash. Divisions will lost. Set ignore_index=False to preserve Index values.
  warnings.warn(
distributed.worker - WARNING - Compute Failed
Function:  <dask.layers.CallableLazyImport object at 0x7f07ca
args:      ([<dask_cuda.proxify_device_objects.FrameProxyObject at 0x7f07c13c5c00 of cudf.core.dataframe.DataFrame (serialized='dask')>, <dask_cuda.proxify_device_objects.FrameProxyObject at 0x7f07c8d4b240 of cudf.core.dataframe.DataFrame at 0x7f07c1897b80>, <dask_cuda.proxify_device_objects.FrameProxyObject at 0x7f07c96ae540 of cudf.core.dataframe.DataFrame (serialized='cuda')>, <dask_cuda.proxify_device_objects.FrameProxyObject at 0x7f07bee35f80 of cudf.core.dataframe.DataFrame (serialized='cuda')>, <dask_cuda.proxify_device_objects.FrameProxyObject at 0x7f07c1690f40 of cudf.core.dataframe.DataFrame (serialized='cuda')>, <dask_cuda.proxify_device_objects.FrameProxyObject at 0x7f07bee21380 of cudf.core.dataframe.DataFrame (serialized='cuda')>, <dask_cuda.proxify_device_objects.FrameProxyObject at 0x7f07c1d277c0 of cudf.core.dataframe.DataFrame (serialized='cuda')>, <dask_cuda.proxify_device_objects.FrameProxyObject at 0x7f07bed14f80 of cudf.core.dataframe.DataFrame (serialized='cuda')>, <dask_cu
kwargs:    {}
Exception: RuntimeError('dictionary changed size during iteration')

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
/tmp/ipykernel_96220/4026179359.py in <module>
----> 1 main(client,data_path,n_input_parts=n_parts)

/tmp/ipykernel_96220/344834194.py in main(client, data_path, n_input_parts)
     93     reviewed_sales = reviewed_sales.persist()
     94     wait(reviewed_sales)
---> 95     print(len(reviewed_sales))
     96 
     97     return

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/core.py in __len__(self)
   3957             return super().__len__()
   3958         else:
-> 3959             return len(s)
   3960 
   3961     def __contains__(self, key):

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/core.py in __len__(self)
    590 
    591     def __len__(self):
--> 592         return self.reduction(
    593             len, np.sum, token="len", meta=int, split_every=False
    594         ).compute()

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    286         dask.base.compute
    287         """
--> 288         (result,) = compute(self, traverse=False, **kwargs)
    289         return result
    290 

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    568         postcomputes.append(x.__dask_postcompute__())
    569 
--> 570     results = schedule(dsk, keys, **kwargs)
    571     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    572 

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2687                     should_rejoin = False
   2688             try:
-> 2689                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2690             finally:
   2691                 for f in futures.values():

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1964             else:
   1965                 local_worker = None
-> 1966             return self.sync(
   1967                 self._gather,
   1968                 futures,

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    858             return future
    859         else:
--> 860             return sync(
    861                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    862             )

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    324     if error[0]:
    325         typ, exc, tb = error[0]
--> 326         raise exc.with_traceback(tb)
    327     else:
    328         return result[0]

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/distributed/utils.py in f()
    307             if callback_timeout is not None:
    308                 future = asyncio.wait_for(future, callback_timeout)
--> 309             result[0] = yield future
    310         except Exception:
    311             error[0] = sys.exc_info()

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1829                             exc = CancelledError(key)
   1830                         else:
-> 1831                             raise exception.with_traceback(traceback)
   1832                         raise exc
   1833                     if errors == "skip":

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/dask/layers.py in __call__()
     34         from distributed.utils import import_term
     35 
---> 36         return import_term(self.function_path)(*args, **kwargs)
     37 
     38 

~/dask-sql/dask-cuda/dask_cuda/proxify_device_objects.py in wrapper()
    156     @functools.wraps(func)
    157     def wrapper(*args, **kwargs):
--> 158         ret = func(*args, **kwargs)
    159         if dask.config.get("jit-unspill-compatibility-mode", default=False):
    160             ret = unproxify_device_objects(ret, skip_explicit_proxies=False)

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/core.py in _concat()
    122         args[0]
    123         if not args2
--> 124         else methods.concat(args2, uniform=True, ignore_index=ignore_index)
    125     )
    126 

/raid/vjawa/conda/envs/rapids-21.10-sep-10-dask-sql-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/dispatch.py in concat()
     59     else:
     60         func = concat_dispatch.dispatch(type(dfs[0]))
---> 61         return func(
     62             dfs,
     63             axis=axis,

~/dask-sql/dask-cuda/dask_cuda/proxy_object.py in wrapper()
    801     @functools.wraps(func)
    802     def wrapper(*args, **kwargs):
--> 803         args = [unproxy(d) for d in args]
    804         kwargs = {k: unproxy(v) for k, v in kwargs.items()}
    805         return func(*args, **kwargs)

~/dask-sql/dask-cuda/dask_cuda/proxy_object.py in <listcomp>()
    801     @functools.wraps(func)
    802     def wrapper(*args, **kwargs):
--> 803         args = [unproxy(d) for d in args]
    804         kwargs = {k: unproxy(v) for k, v in kwargs.items()}
    805         return func(*args, **kwargs)

~/dask-sql/dask-cuda/dask_cuda/proxy_object.py in unproxy()
    121     except AttributeError:
    122         if type(obj) in (list, tuple, set, frozenset):
--> 123             return type(obj)(unproxy(o) for o in obj)
    124     return obj
    125 

~/dask-sql/dask-cuda/dask_cuda/proxy_object.py in <genexpr>()
    121     except AttributeError:
    122         if type(obj) in (list, tuple, set, frozenset):
--> 123             return type(obj)(unproxy(o) for o in obj)
    124     return obj
    125 

~/dask-sql/dask-cuda/dask_cuda/proxy_object.py in unproxy()
    118     """
    119     try:
--> 120         obj = obj._obj_pxy_deserialize()
    121     except AttributeError:
    122         if type(obj) in (list, tuple, set, frozenset):

~/dask-sql/dask-cuda/dask_cuda/proxy_object.py in _obj_pxy_deserialize()
    368                         and self._obj_pxy["serializer"] != "cuda"
    369                     ):
--> 370                         manager.maybe_evict(self.__sizeof__())
    371 
    372                     # Deserialize the proxied object

~/dask-sql/dask-cuda/dask_cuda/proxify_host_file.py in maybe_evict()
    346 
    347     def maybe_evict(self, extra_dev_mem=0) -> None:
--> 348         self.maybe_evict_from_device(extra_dev_mem)
    349         self.maybe_evict_from_host()
    350 

~/dask-sql/dask-cuda/dask_cuda/proxify_host_file.py in maybe_evict_from_device()
    307 
    308         with self.lock:
--> 309             total_dev_mem_usage, dev_buf_access = self.get_dev_access_info()
    310             total_dev_mem_usage += extra_dev_mem
    311             if total_dev_mem_usage > self._device_memory_limit:

~/dask-sql/dask-cuda/dask_cuda/proxify_host_file.py in get_dev_access_info()
    282             total_dev_mem_usage = 0
    283             dev_buf_access = []
--> 284             for dev_buf, proxies in self.get_dev_buffer_to_proxies().items():
    285                 last_access = max(p._obj_pxy.get("last_access", 0) for p in proxies)
    286                 size = sizeof(dev_buf)

~/dask-sql/dask-cuda/dask_cuda/proxify_host_file.py in get_dev_buffer_to_proxies()
    271             # parts of the same device buffer.
    272             ret = defaultdict(list)
--> 273             for proxy in self._dev:
    274                 for dev_buffer in proxy._obj_pxy_get_device_memory_objects():
    275                     ret[dev_buffer].append(proxy)

~/dask-sql/dask-cuda/dask_cuda/proxify_host_file.py in __iter__()
     78 
     79     def __iter__(self) -> Iterator[ProxyObject]:
---> 80         for p in self._proxy_id_to_proxy.values():
     81             ret = p()
     82             if ret is not None:

RuntimeError: dictionary changed size during iteration
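The `RuntimeError` above appears to come from iterating over a dict of weak references while entries are being removed. As a small, self-contained illustration (not code from this PR) of that failure mode, and the usual mitigation of iterating over a snapshot:

```python
# Illustrative only: mutating a dict while iterating over it raises
# "RuntimeError: dictionary changed size during iteration".
d = {1: "a", 2: "b", 3: "c"}
try:
    for key in d:
        del d[key]  # removal during iteration
except RuntimeError as exc:
    print(exc)  # dictionary changed size during iteration

# Common mitigation: iterate over a snapshot of the values instead.
for value in list(d.values()):
    print(value)
```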

Review comment (resolved, outdated) on: dask_cuda/proxify_host_file.py
@pentschev (Member)

Overall looks good to me @madsbk, I've added one last suggestion. We probably want to address #708 (comment) here before merging too?

Co-authored-by: Peter Andreas Entschev <[email protected]>
@madsbk (Member Author) commented Sep 17, 2021

> Overall looks good to me @madsbk, I've added one last suggestion. We probably want to address #708 (comment) here before merging too?

Thanks for the review and suggestions @pentschev!
Yes, I will begin debugging the #708 (comment) issue.

@madsbk (Member Author) commented Sep 17, 2021

> On this PR I sometimes got the error below when running the workflow at #657 (comment) on 8 GPUs of a DGX-2 with n_parts=1500.

I am not able to reproduce the error on exp01 :/
@VibhuJawa, should we try debugging together if you are able to reproduce the error fairly consistently?

@VibhuJawa (Member)

> On this PR I sometimes got the error below when running the workflow at #657 (comment) on 8 GPUs of a DGX-2 with n_parts=1500.

> I am not able to reproduce the error on exp01 :/

After creating a fresh environment, I was not able to reproduce the error either.

Also, I wanted to let you know that, based on testing, this PR enables some workflows to run on machines with fewer resources (GPU memory and host memory) where they previously failed.

Thanks again @madsbk for working through the issue. Please feel free to merge the PR from my end.

@pentschev (Member)

Overall this looks good to me, but we also had a report of the error below:

Traceback (most recent call last):
  File "/raid1/cjarrett/miniconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/nanny.py", line 844, in _run
    worker = Worker(**worker_kwargs)
  File "/raid1/cjarrett/miniconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/worker.py", line 633, in __init__
    self.data = data[0](**data[1])
TypeError: __init__() got an unexpected keyword argument 'memory_limit'

Should we address this here before merging, @madsbk?
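For context, a minimal sketch of the pattern the trace points at: distributed instantiates the worker's data store as ``data[0](**data[1])`` (see the trace above), so whatever class is passed as ``data[0]`` must accept every keyword in ``data[1]``, here ``memory_limit``. The class and keyword values below are hypothetical, not the ProxifyHostFile API.

```python
# Hypothetical data-store class illustrating the constraint: it must accept
# (or at least tolerate) every keyword the worker passes, otherwise
# __init__() raises a TypeError like the one reported above.
class SpillStoreSketch(dict):
    def __init__(self, device_memory_limit=None, memory_limit=None, **kwargs):
        super().__init__()
        self.device_memory_limit = device_memory_limit
        self.memory_limit = memory_limit  # accepted instead of raising TypeError


# Pattern used by distributed's Worker (per the trace): data = (cls, kwargs)
data = (SpillStoreSketch, {"device_memory_limit": 10**9, "memory_limit": 2 * 10**9})
store = data[0](**data[1])
```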

@madsbk (Member Author) commented Sep 21, 2021 via email

@pentschev (Member) left a comment

LGTM, thanks a lot for all the work here Mads!

@pentschev (Member)

@gpucibot merge

@rapids-bot rapids-bot bot merged commit a38d0b8 into rapidsai:branch-21.10 Sep 22, 2021
@jakirkham (Member)

Thanks Mads for the PR and Peter for reviewing! Also thanks Vibhu for testing 😄

@madsbk madsbk deleted the jit_unspill_from_disk branch September 23, 2021 08:29
Labels
3 - Ready for Review · improvement · non-breaking · python
Projects
None yet
Development
Successfully merging this pull request may close these issues:
[FEA] Add support for disk JIT-unspill (#657)
5 participants