@catch, retry, partition, asyncmap and pmap #15409
# This file is a part of Julia. License is MIT: http://julialang.org/license
This needs to be added to test/choosetests.jl to actually run.
true that 🙄
Note: this commit adds a small default delay to I'm happy to back this out if it's seen as adding too much at once. However, it seems to me that it is probably a good thing to have a small delay by default when retrying something that caused an exception (or at the very least to
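For comparison, in current Julia (1.x) Base.retry takes a delays keyword. That iterator controls both how many retries happen and how long to wait between them, and ExponentialBackOff provides a growing default-style schedule. The sketch below uses that Julia 1.x API, not the signature proposed in this PR. flaky and attempts are hypothetical names used only for illustration.

```julia
# Hypothetical flaky operation: fails on the first two calls, then succeeds.
const attempts = Ref(0)

function flaky()
    attempts[] += 1
    attempts[] < 3 && error("transient failure on attempt $(attempts[])")
    return "ok"
end

# Allow up to 3 retries. The first wait is 10 ms and later waits grow exponentially.
safe = retry(flaky; delays = ExponentialBackOff(n = 3, first_delay = 0.01))
result = safe()   # two failed attempts are retried after a short wait
```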
Force-pushed from acbe45e to 91bde90.
Rebased to ec6f886.
Tests now OK.
@JeffBezanson any suggestions on what this thing should be called if not
type WorkerPool
    channel::RemoteChannel{Channel{Int}}

    # Create a shared queue of available workers...
    WorkerPool() = new(RemoteChannel(()->Channel{Int}(128)))
This should be changed to RemoteChannel(()->Channel(typemax(Int))), else WorkerPool(workers::Vector{Int}) will block when trying to add more than 128 workers.
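The blocking behaviour described above comes from the underlying Channel, and a minimal Julia 1.x sketch can show it. Two assumptions apply here. First, RemoteChannel now lives in the Distributed standard library, whereas the excerpt above uses the older `type` syntax. Second, the unbounded buffer is written as Channel{Int} rather than Channel, so that its element type matches the channel::RemoteChannel{Channel{Int}} field declared above. No extra worker processes are needed.

```julia
using Distributed  # standard library in Julia 1.x; provides RemoteChannel

# A buffer with capacity 128: once it holds 128 ids, another put! blocks.
bounded = Channel{Int}(128)
for id in 1:128
    put!(bounded, id)
end
t = @async put!(bounded, 129)  # run in a task so this script does not hang
yield()                        # let the task run until it blocks
blocked = !istaskdone(t)       # true: the 129th put! is waiting for space
take!(bounded)                 # free one slot...
wait(t)                        # ...so the pending put! can complete

# The suggested fix: an effectively unbounded buffer, so registering any
# realistic number of worker ids never blocks.
pool = RemoteChannel(() -> Channel{Int}(typemax(Int)))
for id in 1:1000
    put!(pool, id)
end
```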
will do
There is some precedent for things that return an iterator having an Then there are a bunch of things that have no hint of iterator-ness in their names:
Force-pushed from 0483198 to 7281a03.
On reflection, @amitmurthy's suggestion of It would have been very handy for this JuliaCloud/AWSCore.jl@9330a7b#diff-c33c63b7ba6d9fce289488d1cf3dfcaeR64 just yesterday.
Isn't
Thanks for pointing that out @nalimilan, it's good to know about Iterators.jl.

julia> collect(partition("Hello World!", 5))
2-element Array{Tuple{Char,Char,Char,Char,Char},1}:
('H','e','l','l','o')
(' ','W','o','r','l')

julia> collect(split("Hello World!", 5))
3-element Array{ASCIIString,1}:
"Hello"
" Worl"
"d!"

julia> collect(partition([1,2,3,4,5], 3))
1-element Array{Tuple{Int64,Int64,Int64},1}:
(1,2,3)

julia> collect(split([1,2,3,4,5], 3))
2-element Array{Array{Int64,1},1}:
[1,2,3]
[4,5]
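The session above predates Base.Iterators. In Julia 1.x, Base.Iterators.partition keeps a shorter final chunk. It therefore behaves like the split shown above, not like the Iterators.jl partition, which dropped the remainder. Here is a short sketch of the current behaviour. full_chunks is a hypothetical helper that shows how to get the drop-the-remainder behaviour back.

```julia
# Base.Iterators.partition keeps the shorter final chunk.
chunks = collect(Iterators.partition([1, 2, 3, 4, 5], 3))   # [1, 2, 3] and [4, 5]

# Turn each chunk of a string back into a String with join.
words = [join(p) for p in Iterators.partition("Hello World!", 5)]  # "Hello", " Worl", "d!"

# Hypothetical helper: drop an incomplete final chunk, as Iterators.jl did.
full_chunks(c, n) = [p for p in Iterators.partition(c, n) if length(p) == n]
```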
Sorry for the delay in reviewing this. @samoconnor, I am thinking it will be easier to just have the complete new Avoiding new exports makes the PR less controversial, since we are not adding something that needs to be supported over the long term. I'll push a fix separately for the
Unless you would rather submit a PR for these corrections. I should have asked.
@samoconnor Looks like you should add a
@amitmurthy, please go ahead and make the I'll also add My current intention is for

asyncmap(f, c...) = collect(StreamMapIterator(f, c...))
pmap(f, c...) = vcat(asyncmap(remote(chunk -> asyncmap(a -> f(a...), chunk)), split(zip(c...), 100))...)

i.e. run batches of 100 (or whatever a good default is) async maps on each worker.
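The batching idea above can be sketched in Julia 1.x with asyncmap (Base), remote (Distributed) and Base.Iterators.partition standing in for split. batched_pmap is a hypothetical name. For simplicity it maps over a single collection instead of zipping several. On a real cluster, any named functions that f calls must also be defined on the workers, for example with @everywhere. With no workers added, remote falls back to running on the master process. The current Distributed.pmap also accepts a batch_size keyword for this purpose.

```julia
using Distributed

# Hypothetical helper mirroring the proposal: cut the input into batches,
# send each batch to an available worker via `remote`, and asyncmap over
# the batch on that worker.
function batched_pmap(f, xs; batch_size = 100)
    batches = [collect(b) for b in Iterators.partition(xs, batch_size)]
    per_batch = asyncmap(remote(batch -> asyncmap(f, batch)), batches)
    return reduce(vcat, per_batch)   # flatten batch results, preserving order
end

squares = batched_pmap(x -> x^2, 1:10; batch_size = 3)
```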
I've just committed a WIP pmap() implementation. I've also included
Force-pushed from f69b327 to 0cb8943.
Thanks. Could you squash the commits? Will merge in a day or two if there are no major concerns.
…g#15409 and JuliaLang#14843) Rename *MapIterator to *Generator
Force-pushed from 381c584 to 808e868.
Squashed.
end

function next(itr::AsyncGenerator, state::AsyncGeneratorState)

I don't think a line break is in order here, even if we allowed the exception of blocks starting with a comment.
Thanks @samoconnor for this, and everyone for reviewing it. I'll submit a PR for the remaining cosmetic changes discussed here.
Thanks Amit
Looking at #15661 made me notice the
@JeffBezanson I thought that someone might notice that I chose
I'm not sure that My preference would be to remove Also, why not make Then you would just have ... or even Note: Midori has
You could call it
Could also call it
+1 for
What is "except" supposed to mean? "exception" or "except"?
Taking this in a slightly different direction, it makes me nervous to have an API that returns either a value or an exception that was thrown, without leaving any indication of which it was.
Something more Nullable-like, à la ExceptionOr, would be easier to make type-stable and less ambiguous if the non-error branch happens to want to return an exception object.
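The ExceptionOr idea mentioned above is not defined in this thread. The struct and capture function below are hypothetical, a minimal sketch of how an explicit flag removes the ambiguity. The sketch uses value::Any for brevity, so it is not type-stable. A parametric field would be needed for that part of the suggestion.

```julia
# Hypothetical ExceptionOr-style result: `ok` records which branch ran, so a
# function that *returns* an exception object is never mistaken for one
# that threw.
struct ExceptionOr
    ok::Bool     # true if the call returned normally
    value::Any   # the return value, or the exception that was thrown
end

function capture(f, args...)
    try
        return ExceptionOr(true, f(args...))
    catch ex
        return ExceptionOr(false, ex)
    end
end

returned = capture(() -> ErrorException("returned, not thrown"))  # ok == true
thrown = capture(error, "boom")                                   # ok == false
```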
This is useful in situations where the programmer is OK with a certain number of errors. For example, a search engine implementing a distributed map-reduce may be OK displaying the results (instead of an error) if the number of failed calls is below a certain threshold. Or you would want to a really long running As another example, there is an open issue for always completing the entire CI test suite even when we encounter a failure. This is a convenience wrapper for functionality that users can always code into the mapping function.
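The threshold scenario above can be sketched with a hypothetical tolerant_map helper, which is not part of this PR. It records failures by index instead of aborting, and it raises an error only when more than max_failures calls threw. Because the failed indices are returned alongside the results, a caller can tell exceptions apart from ordinary values.

```julia
# Hypothetical helper: call f on every input, record failures instead of
# aborting, and only raise if more than max_failures calls threw.
function tolerant_map(f, xs; max_failures = 0)
    results = Vector{Any}(undef, length(xs))
    failed = Int[]                      # indices of inputs whose call threw
    for (i, x) in enumerate(xs)
        try
            results[i] = f(x)
        catch ex
            results[i] = ex             # keep the exception for inspection
            push!(failed, i)
        end
    end
    if length(failed) > max_failures
        error("$(length(failed)) of $(length(xs)) calls failed")
    end
    return results, failed
end

flaky(x) = x == 3 ? error("bad input $x") : 10x
results, failed = tolerant_map(flaky, 1:5; max_failures = 1)   # failed == [3]
```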
These cases probably shouldn't be returning exceptions inline among the results, though. Maybe a NullableArray for results with an auxiliary Any-array to hold any exceptions that occur would have more predictable behavior.
The exceptions array will have to be the same length as the NullableArray (in order to maintain the same indexes as the jobs array and easily track failed jobs), or we return an array of tuples Only if
Yes, while you can test
I don't think there is a compelling reason to have re-introduce If doing

witherr(f) = (args...) -> try (f(args...), nothing) catch ex; (nothing, ex) end

julia> v, err = witherr(string)(7)
("7",nothing)

julia> v, err = witherr(error)(7)
(nothing,ErrorException("7"))

pmap(witherr(f), c)
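For comparison, in Julia 1.x Distributed.pmap accepts an on_error keyword that fills the same role as wrapping f with witherr. The value returned by on_error is stored in place of the failed result. The sketch below assumes no extra workers are added. When remote workers are in use, the exception passed to on_error may arrive wrapped in a RemoteException.

```julia
using Distributed

# on_error's return value replaces the failed result, so the bad input
# yields an exception object instead of aborting the whole pmap.
results = pmap(s -> parse(Int, s), ["1", "two", "3"]; on_error = ex -> ex)
```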
The scenarios which I described above require
See #15975.
Third step in Simplifying and generalising pmap (as per #14843).
See also #15058 StreamMapIterator (was asyncmap) and #15073 WorkerPool and remote
@catch takes a function and returns a lambda that catches any exceptions thrown by the function and returns the exception object.
retry takes a function and returns a lambda that retries the function 3 times if an error occurs.
partition(collection, n) returns an iterator over collection that yields n elements at a time.
batchsplit(c; min_batch_count=1, max_batch_size=100) splits a collection into at least min_batch_count batches. Equivalent to partition(c, max_batch_size) when length(c) >> max_batch_size.
asyncgenerate and asyncmap: generate or map using parallel tasks.
pgenerate and pmap: generate or map using parallel workers and tasks.
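The batchsplit behaviour listed above can be sketched in Julia 1.x. This is a hypothetical illustration, not necessarily how the PR implements it. The sketch picks the largest batch size that still yields at least min_batch_count batches, caps that size at max_batch_size, and then delegates to Base.Iterators.partition.

```julia
# Sketch of batchsplit: round the batch size down so that at least
# min_batch_count batches are produced, but never exceed max_batch_size.
function batchsplit(c; min_batch_count = 1, max_batch_size = 100)
    min_batch_count >= 1 || throw(ArgumentError("min_batch_count must be >= 1"))
    max_batch_size >= 1 || throw(ArgumentError("max_batch_size must be >= 1"))
    # If length(c) < min_batch_count, this falls back to batches of size 1.
    batch_size = clamp(div(length(c), min_batch_count), 1, max_batch_size)
    return Iterators.partition(c, batch_size)
end
```

For 1:10 with min_batch_count = 3, this gives batches of sizes 3, 3, 3 and 1. For 1:1000 with the defaults, it reduces to partition(c, 100).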