-
-
Notifications
You must be signed in to change notification settings - Fork 8.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RFC] harde rabit framework #4250
Comments
@chenqin Hi, first of all, I'd like to say thank you. Your work in distributed training has been instrumental in brining XGBoost to industrial-grade standard. I'd like to receives your suggestions about improving the CI infrastructure, i.e. how we should test distributed training. See discussion at #4234. For example, we should maybe test dmlc/rabit together in dmlc/xgboost, so that no future change of dmlc/rabit would break distributed XGBoost. |
Thanks for all the work @chenqin ! Regarding rabbit's future, I was also wondering if it would be possible to introduce a scatter-reduce algorithm like LightGBM uses. The benefits when user 2^x workers seem to be significant. For job restores, what is the state that we need to be restoring? I'm not familiar with the term peer restore, does that mean communicating data and models from other nodes instead of a single node reading from HDFS? Do you think there are ideas we can use from Flink/Spark job recovery? |
Sounds good, split build/test , adding OSX, LINUX, WINDOWS should be straight forward.
In term of distributed XGBoost, we can start with including XGBoost4j-Spark tests as start point. Possibly next adding simulate rabit failures tests in XGBoost4j-Spark using mock engine to verify item 2) |
yes, that should be item 3) once we can verify framework failure recovery works as expected. We can add more functions with good failure resilient recovery support.
That's mostly correct to start a job. item 2) is mostly around the case when you have single worker failure. How distributed XGBoost deal with entire worker fleet. What we have is to drop all works since last checkpoint and restart job.(entire load data -> restore external checkpoint -> continue train) What item 2) trying to solve is only restore worker that lost (load only partial data assigned to lost worker(s)-> restore last checkpoint from other peers ->catch up on next allreduce) The catch is there will be situation where XGBoost lost too many workers in tree-reduce (most likely) topology, so we need to basically fallback to what we have right now.
AFAIK underlaying Flink/Spark is continuous processing and map-reduce paradigms. In map-reduce, we can retry mapper in case they lost, sync (barrier) happens after a map stage complete. In continuous processing, the sync might never happens hence flink introduced control messages to coordinate workers with different roles. In distributed XGBoost, global sync(ALLREDUCE/BRAODCAST) happens frequently to update histogram on fixed topology (network topology is fixed after all workers registered in tracker). AllReduce -> BroadCast->AllReduce ... Unless we could prove in math we can upper bound error ratio based on partial histogram, rabit still have to block till MPI func complete each operation. |
OK if I understand correctly then when a worker fails in this scenario, the others continue with the iteration while we launch a new one and it reads its data partition. We complete the current iteration using the limited information from the live workers, and in the next iteration, the new worker gets the model state and we continue as before.
OK I see what you are saying. We have discussed early termination of histogram creation/communication with @hcho3 in the past, where concentration bounds like the Hoeffding bound can be used to prove convergence, similar to the VFDT tree or this recent work by @arapat and Freund (code here). |
Linking Rabit's paper here that focuses on the fault tolerance mechanism. I'll try to go through this and find correspondences in the code, and hopefully annotate the code with comments to make it more readable. |
I agree we should trace back to rabit paper and see how they designed failure recovery initially. At same time, binary tree wise tree reduce might not be most performant network topology considering time cost of {allreduce-> broadcast} e.g when we try to grow tree depth on large worker cluster total number of {allreduce-> broadcast} per iteration goes up exponentially (2^number_of_depth) so as number of network hops (log (number_of_workers) ).
Thanks! Let me dig in a bit and will come back to you latter. |
Rabit2 design proposal keyword FSM, EPOLL, C10K, data linerage, map-reduce, allreduce, fast recovery, large scale, MPI |
@chenqin Is this task complete? |
I think we can say hardening part is complete, improving is still undergoing. |
Improvement I have been working on enable native rabit checkpoint in YARN/SPARK deployment. https://github.com/chenqin/xgboost/blob/master/tests/cli/runsingle.sh so far I identified two issues with current rabit based recovery in xgb.
In order to construct a Partitioned DMatrix and build learner (which will get injected with loadcheckpoint) we need to sync number of columns in all workers before loadcheckpoint.
AFAIK, a simple way is to keep this configuration in tracker (each worker just call once when it runs or reruns) and expose via rabit.h with following changes https://github.com/chenqin/rabit/commit/11adddc83fa8ec0041b547fb2fe77721cc072eae then it pass failure recovery tests with rabit. |
@chenqin Thanks. Honestly I don't quite understand what's happening ... So I will stay out of this. |
follow your github repo @chenqin , dmlc/rabit#63 is solved. but there is another issue when one worker stopped and restart. I add some logging, the error occured when
|
can you share tests case you were running?
|
my test as follow:
after training started for a few round, one worker killed. then restart the worker. the conf:
the above error was but, with so it seems there is still some problem with |
Thanks for sharing, will take a look! update: update 4th July: |
Thank you, @chenqin for all the work. We are experimenting with XGBoost on a large scale and I am very interested in working on making xgboost-spark training reliable. I read this thread and I am a bit confused. As far as I understand xgboost-spark is not reliable. At least it has TaskFailedListener which kills the job if one of tasks fail. On another hand you have a test showing that distributed xgboost without Spark can recover from a node failure (great achievement!) Do I understand correctly that distributed training in xgboost-spark is still not reliable (in 0.9)? |
@trams xgb-spark is reliable and able to do checkpoint restore, we daily runs large number of jobs with hundreds of containers and thousands of cores deployment. This doc is tracking works around rabit (worker synchronization layer). As we speak right now, XGBSpark works fine with remote checkpoint and restart / resume job. The goal of improvement is to allow distributed xgb tolerate a few failures without restart/resume entire training job. It can also help moving larger xgb workers on cheaper preemptive instances which can only last a few iterations with much cheaper cost.
|
@trams fail-all-and-make-a-checkpoint on any error is a very typical strategy for machine learning systems. The very original reason to fail everything on any error in spark layer is that the rabit fault tolerance is a bit off here and there leading to the stuck training process. Fixing those "offs" is the topic in this thread. In future, we should provide two types of fault tolerance strategy, auto task recovery functionality from this thread, and checkpoint in spark (to handle situation that the whole job is down for some reason) |
I am glad to hear about your plans on making distributed xgboost more reliable. And thank you for a quick response As for checkpointing it works but #3946 prevents us to use to full extend |
@wstian can you try my latest master and see if it can pass your previous tests next week? |
rabit one off recovery cache |
sorry for late response. try with your latest master:
and repeat the previous tests, which is
after the shutdown
worker-1 also come across this error and exit. Do I miss any dependency update, or using wrong commits? |
@wstian I added more logging https://github.com/chenqin/rabit/commit/2bd993a8d0483ddb8a4661561cf2bfd16bcd8a91 The error you saw might be a rabit long existing rabit bug. in my test, it means checkpoint/loadcheckpoint collective calls sometimes unexpectedly interact with allreduce/broadcast calls from healthy nodes. Which causes trygetresult fail due to lack of necessary roles to operate (KRequestData, KHaveData) |
try with [chenqin/rabit@2bd993a] worker-0 shut down when restart, log as below:
worker-1 is still running (not shut down this time), log as below:
|
@wstian quick look into log you posted, I think it was same as I saw before. wha happens is worker-1 calls into checkpoint procedure while worker-0 is still in bootstrap/allreduce. it force worker1 and worker0 into collective checkpoint state I guess the initial intention of this code is to see if any nodes are missing latest checkpoint payload before collective checkpointing. The problem you saw was that worker 0 caller give a buffer pointer and size that suppose to do allreduce/broadcast. When actual checkpoint payload recover starts. payload size mismatch. working on fixing. update: ` [0]@@@hit Mock Error:CheckPoint |
try chenqin/rabit@1e82c33
|
Thanks you quick response! This is also a known issue where I put an assert to avoid unexpected state Also, I tried to avoid seq number overflow by reducing specialOp value. Can you help give another try on this as well? |
try chenqin/rabit@aa2153f
|
humm, last try. adding assertion of negative seq no as "recv ack - |-55050240|" as well as flag logging. passing multiple run of https://github.com/chenqin/xgboost/blob/master/tests/cli/runxgbtests.sh @wstian will you be able to help run one last time against https://github.com/chenqin/rabit/commit/0a522a77829162f2dbea8c99487819b8b50b47c2 |
Let's document what is "expected" fault recovery behavior in current rabit design so we are all on same page with reasonable expectation.
example of supported use case:
example of not supported use case:
if node fail and recovered node run allreduce before loadcheckpoint, other nodes will not be able to recover results. A successful checkpoint also instruments all nodes clear allreduce/boradcast results from corresponding iteration. Hence recovery node will not be able to pick right results from other nodes. It also apply to histogram init within first iteration after loadcheckpoint-0. When nodes surpass first iteration, those results will be lost. |
So I found root cause of negative overflow, it was caused by introduce additional bit and update left shift 5 bits. It cause kSpecialOp << 5 + flags greater than signed int32 range. I submitted a fix which will use unsigned_int32 instead of signed one. https://github.com/chenqin/rabit/pull/4/files#diff-5404c32553915533fabb6d48e626e98fR274 |
@chenqin Can we close this one? |
Yes please |
Over the course of last half, I have been working on some of largest xgboost jobs.
Often we face some of weird issues while running thousands of workers over billion rows of training data-set. Minor set of PRs submitted dmlc/dmlc-core#512 dmlc/rabit#81 dmlc/rabit#73
As distributed training moves into production with hundreds of production models running on top of XGB, this lead to the need of harden rabit framework with better code layout/documentation as well as improved function test coverage. Here is list of work I am planning to execute and request for feedbacks from community.
increase test coverages with split of _robust implementations, add test coverage and harden on ring based restore (haven't able to find where that code path were used). There should be a waterfall flow where filesystem based checkpoint and job restart will be executed only when ring based recovery can't restore.
switch distributed xgb checkpoint restore from external checkpoint(HDFS/S3) to peer restore. Redistribute training dataset can take 30 - 40 mins pulling from HDFS, restore single worker failure (increase with cluster getting larger) without restart entire training stage can save significant amount of time. This is particularly interesting while workers running on top of preemptive instances. If small percentage of tasks might get rescheduled due to executor host get preempted, whether job can restore fast. AFAIK, not of other MPI frameworks has build such recovery mechanism to run on top of preemptive instances.
more MPI features, primitive metrics and profiling capability. Investigate job failure can be burdensome, primitive metrics report aggregation could improve user experience a lot.
Thanks,
Chen
The text was updated successfully, but these errors were encountered: