[DO NOT MERGE] Added error handling in MXNet #19
base: mxnet_feature_fp16
Conversation
horovod/mxnet/handle_manager.cc (outdated):

```cpp
}

void HandleManager::AttachCallback(int handle, Callback cb) {
  std::unique_lock<std::mutex> lock(mutex_);
```
We can use `lock_guard` here.
updated
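The `lock_guard` suggestion above can be sketched as follows. This is a minimal standalone sketch with assumed members, not the real class in horovod/mxnet/handle_manager.cc: `std::lock_guard` suffices because the lock is held for the full scope and never unlocked early, which is the only extra capability `std::unique_lock` would add here.

```cpp
#include <functional>
#include <mutex>
#include <unordered_map>

using Callback = std::function<void()>;

// Simplified sketch of a handle manager (hypothetical members).
class HandleManagerSketch {
 public:
  void AttachCallback(int handle, Callback cb) {
    // RAII lock for the whole scope; no early unlock needed.
    std::lock_guard<std::mutex> lock(mutex_);
    callbacks_[handle] = std::move(cb);
  }

  bool HasCallback(int handle) {
    std::lock_guard<std::mutex> lock(mutex_);
    return callbacks_.count(handle) > 0;
  }

 private:
  std::mutex mutex_;
  std::unordered_map<int, Callback> callbacks_;
};
```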
horovod/mxnet/mpi_ops.cc (outdated):

```cpp
  auto device = TensorUtil::GetDevice(tensor);
  auto hvd_tensor = std::make_shared<MXTensor<NDArray>>(tensor);
  auto hvd_context = std::make_shared<MXOpContext<NDArray>>(device, output);
  auto hvd_output = std::make_shared<MXTensor<NDArray>>(output);
  handle_manager.AttachCallback(handle, cb);
```
nit: let's keep the formatting of this line consistent across the different functions. How about having one empty line before L52 and removing the empty line at L53?
horovod/mxnet/mpi_ops.cc (outdated):

```diff
@@ -72,37 +73,41 @@ void DoAllreduceCudaOnCPU(NDArray* tensor, NDArray* output, std::string& name,

   auto hvd_context = std::make_shared<MXOpContext<NDArray>>(
       CPU_DEVICE_ID, hvd_cpu_buffer->tensor());
   handle_manager.AttachCallback(handle, cb);
```
same as above
horovod/mxnet/mpi_ops.cc (outdated):

```diff
@@ -142,50 +149,57 @@ void DoBroadcast(NDArray* tensor, NDArray* output, int root_rank,
     hvd_output = std::make_shared<MXTensor<NDArray>>(output);
   }

   handle_manager.AttachCallback(handle, cb);
```
nit: remove L153?
horovod/mxnet/mpi_ops.cc (outdated):

```cpp
  // Make async copy of input tensor to CPU tensor and record completion event.
  auto hvd_context = std::make_shared<MXOpContext<NDArray>>(
      CPU_DEVICE_ID, hvd_cpu_buffer->tensor());
  auto ready_event =
      std::make_shared<MXReadyEvent<NDArray>>(hvd_cpu_buffer->tensor());

  handle_manager.AttachCallback(handle, cb);
```
nit: remove L175?
test/test_mxnet.py (outdated):

```python
    hvd.broadcast(tensor, 0)
    assert False, 'hvd.broadcast did not throw error'
except (MXNetError, RuntimeError) as e:
    print(e)
```
remove print or keep it?
test/test_mxnet.py (outdated):

```python
    hvd.broadcast(tensor, 0)
    assert False, 'hvd.broadcast did not throw error'
except (MXNetError, RuntimeError) as e:
    print(e)
```
remove print or keep it?
```python
    check_call(MPI_MXNET_LIB_CTYPES.horovod_mxnet_wait_and_clear(handle))
    output = _handle_map.pop(handle)
    return output
```
Is this output useful for the users? If not, we may not need to store the output in the `_handle_map`; we could even use a set to store the handles.
The output is useful because `allreduce()` and `broadcast()` need to return the tensor, and they return it by calling `synchronize()`.
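The point above can be illustrated with a hypothetical sketch (names like `Synchronize` and `handle_map` are stand-ins, not the real Horovod code): a plain set of handles would lose the handle-to-output association that `synchronize()` needs in order to return the tensor.

```cpp
#include <stdexcept>
#include <unordered_map>

// Stand-in for an output tensor.
struct Tensor { int id; };

// Map, not set: each handle must stay associated with its output.
std::unordered_map<int, Tensor*> handle_map;

Tensor* Synchronize(int handle) {
  auto it = handle_map.find(handle);
  if (it == handle_map.end()) {
    throw std::invalid_argument("unknown handle");
  }
  Tensor* output = it->second;  // the tensor allreduce()/broadcast() returns
  handle_map.erase(it);         // "pop": the entry is cleared after waiting
  return output;
}
```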
horovod/mxnet/mpi_ops.py (outdated):

```python
    handle = MPI_MXNET_LIB_CTYPES.horovod_mxnet_allreduce_async(
        c_in, c_out, name, ctypes.c_bool(average))

    _handle_map[handle] = (tensor, tensor)
```
_handle_map[handle] = tensor?
Good catch, this is a bug.
Let's discuss the PR in more detail.
```cpp
extern "C" int horovod_mxnet_wait_and_clear(int handle) {
  API_BEGIN();
  while (!handle_manager.PollHandle(handle)) {
```
I have concerns about this since it will introduce contention on the mutex.
Updated the sequence of `callback` and `markdone` as we discussed, so this should no longer introduce a race condition. Please review again.
Thanks. I will review after you push your changes.
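The ordering fix discussed above can be sketched as follows (assumed semantics, not Horovod's exact implementation): the engine runs the completion callback first and marks the handle done last, so once `PollHandle()` observes `true`, the waiter in `wait_and_clear()` knows the callback has fully finished and the handle can be cleared safely.

```cpp
#include <mutex>
#include <unordered_map>

// Tracks which handles have completed (hypothetical simplified class).
class DoneTracker {
 public:
  void MarkDone(int handle) {
    std::lock_guard<std::mutex> lock(mutex_);
    done_[handle] = true;
  }
  bool PollHandle(int handle) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = done_.find(handle);
    return it != done_.end() && it->second;
  }
 private:
  std::mutex mutex_;
  std::unordered_map<int, bool> done_;
};

// Engine-side completion: callback first, MarkDone last.
template <typename Fn>
void Complete(DoneTracker& tracker, int handle, Fn callback) {
  callback();                // 1. deliver the result/status to the framework
  tracker.MarkDone(handle);  // 2. only now let the polling waiter proceed
}
```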
```python
        ctypes.byref(mx_handle)))

    _handle_map[mx_handle.value] = output
    return synchronize(mx_handle.value)
```
Is the plan to introduce true async functions later that could be used in `DistributedOptimizer` to improve performance? Would the current formulation still perform better than the reference parameter server?
Referring to these numbers:

| # GPUs | Without HA  | With HA     |
|-------:|------------:|------------:|
| 8      | 3072 (N/A)  | 3078 (N/A)  |
| 16     | 6027 (98%)  | 5859 (95%)  |
| 32     | 12030 (98%) | 11675 (95%) |
| 64     | 22346 (83%) | 23166 (94%) |
| 128    | 40938 (84%) | 45972 (93%) |
| 256    | 64998 (66%) | 89858 (91%) |
Yes, it is possible to remove the `synchronize()` call and rely on the MXNet engine to handle the dependencies between tasks. We plan to further improve performance after we merge the current stable PR into Horovod. Does that sound good to you?
Yep, sounds good. Do you have a sense of the scaling efficiency we'll see with the current version? I hope to include MXNet support in the upcoming 0.16.0 release next week, and I wanted to see if we can publish good scaling numbers with it.
@alsrgv We found that throughput is affected when we use `synchronize()`. I am reverting to the original implementation now. However, we need a better mechanism to catch the error status returned by Horovod. I am currently trying to introduce a `context` variable to store the `Status` inside the callback, just like what TensorFlow does. If you have a better suggestion, it would be greatly appreciated.
I think if MXNet has a mechanism to notify the framework about an op failure, which it then propagates to the user, that would be the best option to use.
@alsrgv I have been trying to leverage MXNet to catch the exception and propagate it to the user at the Python level for the past two days. However, I always hit `libc++abi.dylib: terminating with uncaught exception of type dmlc::Error`. I suspect there is a bug on the MXNet side in handling exceptions thrown from the engine callback.
In the meantime, do you think it would be okay to just log the error in Horovod for now? We will continue to improve this after MXNet support is merged into Horovod. Please let us know your thoughts. Thanks!
```diff
-                                        char* name, bool average) {
-  auto handle = handle_manager.AllocateHandle();
+                                        const char* name, bool average,
+                                        int* handle) {
```
Why do we use `int* handle` here? Can we not return the handle to the caller? If you are not returning it, the function's return type should be changed to `void`. The same applies to the other functions.
This function returns the status through `MX_API_END()` instead of the handle. The returned status is needed at the Python level when you call `check_call()`.
Also added unit tests.
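The out-parameter convention explained above can be sketched like this (an assumed shape, not the real Horovod signature): the C function's `int` return value is the status produced by MXNet's `MX_API_BEGIN()`/`MX_API_END()` macros, which Python's `check_call()` inspects, so the allocated handle must travel back through the `int*` parameter instead.

```cpp
// Hypothetical simplified version of the C API function discussed above.
extern "C" int horovod_mxnet_allreduce_async_sketch(int* handle) {
  // MX_API_BEGIN();  // in the real code this opens a try block
  *handle = 42;       // stand-in for handle_manager.AllocateHandle()
  return 0;           // MX_API_END() yields 0 on success, nonzero on error
}
```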