dask async operation with XGBoost failed (with Rapids on GPU) #5639

Closed · mengdong opened this issue May 6, 2020 · 7 comments · Fixed by #5862

@mengdong (Contributor) commented May 6, 2020

Hi Folks,

Trying to launch a Dask cluster with the Python async APIs and run XGBoost training, I observed two issues:

1. xgb.dask.DaskDMatrix apparently cannot be awaited. How should I use it in async mode? (dask_xgboost doesn't have this issue, as it does not depend on xgb.dask.DaskDMatrix.)
2. Calling client.shutdown after XGBoost training raises an error, whereas without XGBoost the Dask cluster shuts down gracefully. What is the reason behind this? (This error occurs with dask_xgboost as well.)

Example code with xgboost:

import numba, socket, time, pprint
import dask, dask_cudf, asyncio
from dask.distributed import Client, wait
import xgboost as xgb

async def main():
    async with Client('[scheduler-ip]:8786', asynchronous=True) as client:
        dask.config.set({'distributed.scheduler.work-stealing': False})
        dask.config.get('distributed.scheduler.work-stealing')
        dask.config.set({'distributed.scheduler.bandwidth': 1})
        dask.config.get('distributed.scheduler.bandwidth')
        base_path = '/rapids/my_data/RAPIDS/s3-upload/'
        df = dask_cudf.read_csv(base_path+'*.part')
        y = df['fare_amount']
        x = df[df.columns.difference(['fare_amount'])]
        dtrain = await xgb.dask.DaskDMatrix(client, x, y)
        trained_model = await xgb.dask.train(client,
                                {'verbosity': 3,
                                 'learning_rate': 0.3,
                                 'max_depth': 8,
                                 'objective': 'reg:squarederror',
                                 'subsample': 0.6,
                                 'gamma': 1,
                                 'verbose_eval': True,
                                 'tree_method': 'gpu_hist',
                                },
                                dtrain,
                                num_boost_round=200, evals=[(dtrain, 'train')])
        print("training finished")
        await client.shutdown()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

Error log:

  FutureWarning,
/opt/conda/envs/rapids/lib/python3.6/site-packages/xgboost/dask.py:177: RuntimeWarning: coroutine 'DaskDMatrix.map_local_data' was never awaited
  client.sync(self.map_local_data, client, data, label, weight)
Traceback (most recent call last):
  File "test_client.py", line 34, in <module>
    asyncio.get_event_loop().run_until_complete(main())
  File "/opt/conda/envs/rapids/lib/python3.6/asyncio/base_events.py", line 488, in run_until_complete
    return future.result()
  File "test_client.py", line 17, in main
    dtrain = await xgb.dask.DaskDMatrix(client, x, y)
TypeError: object DaskDMatrix can't be used in 'await' expression

Example code with dask-xgboost:

import numba, socket, time, pprint
import dask, dask_cudf, asyncio
from dask.distributed import Client, wait
import dask_xgboost as dxgb

async def main():
    async with Client('[scheduler-ip]:8786', asynchronous=True) as client:
        dask.config.set({'distributed.scheduler.work-stealing': False})
        dask.config.get('distributed.scheduler.work-stealing')
        dask.config.set({'distributed.scheduler.bandwidth': 1})
        dask.config.get('distributed.scheduler.bandwidth')
        base_path = '/rapids/my_data/RAPIDS/s3-upload/'
        df = dask_cudf.read_csv(base_path+'*.part')
        y = df['fare_amount']
        x = df[df.columns.difference(['fare_amount'])]
        trained_model = await dxgb._train(client,
                                {'verbosity': 3,
                                 'learning_rate': 0.3,
                                 'max_depth': 8,
                                 'objective': 'reg:squarederror',
                                 'subsample': 0.6,
                                 'gamma': 1,
                                 'verbose_eval': True,
                                 'tree_method': 'gpu_hist',
                                 'num_round': 100,
                                },
                                x, y)
        print("training finished")
        await client.shutdown()

if __name__=='__main__':
    asyncio.get_event_loop().run_until_complete(main())

Error log:

[10:28:13] ======== Monitor: GBTree ========
[10:28:13] BoostNewTrees: 9.36862s, 100 calls @ 9368619us
[10:28:13] CommitModel: 0.004453s, 100 calls @ 4453us
[10:28:13] ======== Monitor:  ========
[10:28:13] PredictFromCache: 0.004156s, 199 calls @ 4156us
[10:28:13] ======== Monitor: ellpack_page ========
[10:28:13] BinningCompression: 0.417215s, 1 calls @ 417215us
[10:28:13] InitCompressedData: 0.003502s, 1 calls @ 3502us
[10:28:13] InitEllpackInfo: 6.1e-05s, 1 calls @ 61us
[10:28:13] Quantiles: 0.677472s, 1 calls @ 677472us
terminate called after throwing an instance of 'dmlc::Error'
  what():  [10:28:13] /conda/conda-bld/xgboost_1585067869426/work/src/c_api/../data/../common/device_helpers.cuh:347: Attempting to deallocate 2155399824 bytes on device 0 that was never allocated
Stack trace:
  [bt] (0) /opt/conda/envs/rapids/lib/libxgboost.so(+0xe762f) [0x7febebf5a62f]
  [bt] (1) /opt/conda/envs/rapids/lib/libxgboost.so(dh::detail::MemoryLogger::DeviceStats::RegisterDeallocation(void*, unsigned long, int)+0x1c9) [0x7febec134309]
  [bt] (2) /opt/conda/envs/rapids/lib/libxgboost.so(+0x2e9253) [0x7febec15c253]
  [bt] (3) /opt/conda/envs/rapids/lib/libxgboost.so(xgboost::HostDeviceVector<xgboost::Entry>::~HostDeviceVector()+0x79) [0x7febec161209]
  [bt] (4) /opt/conda/envs/rapids/lib/libxgboost.so(+0xe4d06) [0x7febebf57d06]
  [bt] (5) /opt/conda/envs/rapids/lib/libxgboost.so(+0x160eab) [0x7febebfd3eab]
  [bt] (6) /opt/conda/envs/rapids/lib/libxgboost.so(std::_Sp_counted_ptr_inplace<std::unordered_map<xgboost::DMatrix*, xgboost::PredictionCacheEntry, std::hash<xgboost::DMatrix*>, std::equal_to<xgboost::DMatrix*>, std::allocator<std::pair<xgboost::DMatrix* const, xgboost::PredictionCacheEntry> > >, std::allocator<std::unordered_map<xgboost::DMatrix*, xgboost::PredictionCacheEntry, std::hash<xgboost::DMatrix*>, std::equal_to<xgboost::DMatrix*>, std::allocator<std::pair<xgboost::DMatrix* const, xgboost::PredictionCacheEntry> > > >, (__gnu_cxx::_Lock_policy)2>::_M_dispose()+0xe3) [0x7febec00a3e3]
  [bt] (7) /opt/conda/envs/rapids/lib/libxgboost.so(std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release()+0x47) [0x7febebf5e977]
  [bt] (8) /opt/conda/envs/rapids/lib/libxgboost.so(+0x19c89f) [0x7febec00f89f]
@trivialfis self-assigned this May 6, 2020
@trivialfis (Member) commented:

Haven't tested it yet, but from a quick glance: for the first log, I'm not sure why you need to await DaskDMatrix. It should be used as a parameter of the train function (replacing X and y), and awaiting train should be sufficient.

For the second log, it's a bug in XGBoost's internal profiling code where verbosity is not available to DMatrix. A simple workaround right now is to not set verbosity to 3.
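
Roughly what I have in mind for the first point (an untested sketch, reusing the data setup from your example):

import dask_cudf
import xgboost as xgb
from dask.distributed import Client

async def main():
    async with Client('[scheduler-ip]:8786', asynchronous=True) as client:
        df = dask_cudf.read_csv('/rapids/my_data/RAPIDS/s3-upload/*.part')
        y = df['fare_amount']
        x = df[df.columns.difference(['fare_amount'])]
        # Construct DaskDMatrix as a plain call (no await) ...
        dtrain = xgb.dask.DaskDMatrix(client, x, y)
        # ... and await only the train call.
        trained_model = await xgb.dask.train(
            client,
            {'objective': 'reg:squarederror', 'tree_method': 'gpu_hist'},
            dtrain,
            num_boost_round=200,
            evals=[(dtrain, 'train')])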

@mengdong (Contributor, Author) commented May 6, 2020

Thanks @trivialfis! First, I made a mistake copying the code over: I cross-pasted the code between xgboost and dask-xgboost, and I have corrected it in the original issue. For the first issue, if I don't await DaskDMatrix, here is the error:

/opt/conda/envs/rapids/lib/python3.6/site-packages/xgboost/dask.py:177: RuntimeWarning: coroutine 'DaskDMatrix.map_local_data' was never awaited
  client.sync(self.map_local_data, client, data, label, weight)
Traceback (most recent call last):
  File "test_client.py", line 34, in <module>
    asyncio.get_event_loop().run_until_complete(main())
  File "/opt/conda/envs/rapids/lib/python3.6/asyncio/base_events.py", line 488, in run_until_complete
    return future.result()
  File "test_client.py", line 29, in main
    x, y) 
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/xgboost/dask.py", line 398, in train
    rabit_args = _get_rabit_args(workers, client)
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/xgboost/dask.py", line 354, in _get_rabit_args
    rabit_args = [('%s=%s' % item).encode() for item in env.items()]
AttributeError: 'coroutine' object has no attribute 'items'
sys:1: RuntimeWarning: coroutine 'Client._run_on_scheduler' was never awaited

@mengdong (Contributor, Author) commented May 6, 2020

On the second issue, I tried your workaround and changed verbosity to 2. This time the scheduler has a different error:

distributed.core - ERROR - 
Traceback (most recent call last):
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/distributed/core.py", line 456, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/distributed/comm/tcp.py", line 188, in read
    n_frames = await stream.read_bytes(8)
concurrent.futures._base.CancelledError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/distributed/utils.py", line 665, in log_errors
    yield
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/distributed/scheduler.py", line 1740, in add_worker
    await self.handle_worker(comm=comm, worker=address)
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/distributed/scheduler.py", line 2695, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/distributed/core.py", line 456, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/envs/rapids/lib/python3.6/site-packages/distributed/comm/tcp.py", line 188, in read
    n_frames = await stream.read_bytes(8)
concurrent.futures._base.CancelledError
/opt/conda/envs/rapids/lib/python3.6/site-packages/distributed/client.py:4648: RuntimeWarning: coroutine 'Client._close' was never awaited
  c.close(timeout=2)

@jakirkham (Contributor) commented May 6, 2020

Jumping in here briefly (hope that is ok): I think what Dong is looking for (please correct me if I'm wrong, Dong) is an option to have Futures returned here instead of syncing on them. Maybe this could be controlled by a flag to train?
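
Something along these lines, purely as a hypothetical API sketch (the asynchronous flag and the awaitable return value don't exist in xgboost.dask today):

# Hypothetical API sketch -- reuses client, x, y and the params dict from
# the examples above; the 'asynchronous' flag is not a real xgboost.dask option.
dtrain = xgb.dask.DaskDMatrix(client, x, y)
output = await xgb.dask.train(client, params, dtrain,
                              num_boost_round=200,
                              asynchronous=True)  # hypothetical flag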

@trivialfis (Member) commented May 6, 2020 via email

@trivialfis (Member) commented Jul 6, 2020

Added to 1.2 roadmap.

@mengdong (Contributor, Author) commented:

I have tested that #5862 fixes this issue.
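
For reference, a minimal sketch of the async flow that works after that change (assuming an XGBoost build that includes #5862, where both DaskDMatrix construction and train can be awaited with an asynchronous client):

import asyncio
import dask_cudf
import xgboost as xgb
from dask.distributed import Client

async def main():
    async with Client('[scheduler-ip]:8786', asynchronous=True) as client:
        df = dask_cudf.read_csv('/rapids/my_data/RAPIDS/s3-upload/*.part')
        y = df['fare_amount']
        x = df[df.columns.difference(['fare_amount'])]
        # With an asynchronous client, DaskDMatrix construction is awaitable.
        dtrain = await xgb.dask.DaskDMatrix(client, x, y)
        output = await xgb.dask.train(
            client,
            {'objective': 'reg:squarederror', 'tree_method': 'gpu_hist'},
            dtrain,
            num_boost_round=200,
            evals=[(dtrain, 'train')])
        print("training finished")

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())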
