Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

functions are serialised at every batch with pmap and pgenerate #16345

Closed
oxinabox opened this issue May 13, 2016 · 8 comments
Closed

functions are serialised at every batch with pmap and pgenerate #16345

oxinabox opened this issue May 13, 2016 · 8 comments
Labels
parallelism Parallel or distributed computation performance Must go faster

Comments

@oxinabox
Copy link
Contributor

oxinabox commented May 13, 2016

So I was trying to workout why my parallel code was taking so long.
After-all I only sent the big datastructures once, though a closure as the function that was mapped over.
That should happen, once (I thought), since the function is constant
Not so

MWE:

addprocs(5)

immutable Foo
    bar::Char
end

#HACK: lets debug what is being serialised by overloading the calls
function Base.serialize(s::Base.SerializationState, x::Foo)
    tic()
    Base.Serializer.serialize_any(s,x)
    tt=toq()
    open("ser_log.txt","a") do fp
        println(fp, tt)
    end
end

function test()
    st = Foo(rand('a':'z')) 
    pmap(r->string(st.bar)^r, 1:100)
    #Base.pgenerate(default_worker_pool(), r->string(st.bar)^r, 1:100) |> collect
end

Then running the function and counting the lines in the log:

test()
run(`wc -l ser_log.txt`)
OUT> 17

So it was serialized 17 times for pmap.
If is switch to pgenerate is it 18 times (so about the same).
I believe that after the batchsplit step is done that is once serialisation of the closure, per batch that was sent.
It only need to be serialized once.

(in my nonMWE, it is happening millions of times, and takes 6 seconds a piece...)


I suspect this is already known, but I can't find an issue for it, so maybe not.

see also:


versioninfo()
Julia Version 0.5.0-dev+3928
Commit fdc4a85 (2016-05-06 04:46 UTC)
Platform Info:
  System: Linux (x86_64-linux-gnu)
  CPU: AMD Opteron 63xx class CPU
  WORD_SIZE: 64
  BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Piledriver)
  LAPACK: libopenblas64_
  LIBM: libopenlibm
  LLVM: libLLVM-3.7.1 (ORCJIT, bdver2
@oxinabox
Copy link
Contributor Author

Work Arounds

So goal here is to cut the serialisation down to once per worker.
Ideally it would be once ever, but that requires a delving down deeper.

Presending (Does Not Work, and IDK why)

Idea is we should first send the function into remote Futures.
Then replace the function we send with pgenerate with a lightweight function that does a lookup for the right (local) Future.
Instead of each time sending the orginal function (which could be a closure haulling tons of data).
Just send a call to fetch the function.

I really thought this would work, but it does not.
And that might be a bug in and of itself; or the cause of the bug reported above.

function _pgenerate(f,c)
    worker_ids = workers()
    r_fs = Dict{Int64,Base.AbstractRemoteRef}()
    for id in worker_ids
        r_fs[id]=Future(id)
        put!(r_fs[id], f)                
    end

    worker_pool = WorkerPool(worker_ids)

    #Give everyone the dict of functions on each worker
    #get them to retieve theirs
    #and execute it    
    Base.pgenerate(worker_pool, x->fetch(r_fs[myid()])(x), c)  
end

Does not help. Infact makes it worse, cos one extra call initially, per worker.

Global Variable Hack (Works, but leaks memory and is and scary)

This is kinda scary.
Instead of sending the function intailly, in a remote Future, and then fetching it.
Send a command to create a global variable with the function it it.
Then just use the function that is now declared remotely.

function _pgenerate_gh(f,c, mname=Symbol("_func_genmagic_hack"*string(rand(1:1024)))::Symbol)
    #Reusing the `mname` in subsequent called can A.) Reclaim memory, B.) Violate certain concurrency expectations
    worker_ids = workers()
    for id in worker_ids
        remotecall_wait(id, mname,f) do mname_i, f_i
            eval(Expr(:global, Expr(Symbol("="), mname_i, f_i)))
        end
    end

    worker_pool = WorkerPool(worker_ids)

    #Give send a function telling them to look up the function locally
    Base.pgenerate(worker_pool, x->eval(mname)(x), c)  
end

I feel unclean.

@amitmurthy
Copy link
Contributor

For this particular scenario we could support a batch_size=:max in #15975 which would result in only a single call to each worker.

Other alternatives (along the lines you are exploring)

  • create a global variable with only the data and use that. You will need to remember to clear it after the map though.
  • https://github.com/amitmurthy/ClusterDicts.jl has a simple approach of replicating data on all workers and storing them in a global dict and use a pre-defined named function that accesses data off the global dict. Still need to remember to clear data off the dict though.

cc: @samoconnor

@oxinabox
Copy link
Contributor Author

oxinabox commented May 13, 2016

A very different work around is what I thought was a good idea a couple of days ago.
Before I found pgenerate

http://codereview.stackexchange.com/questions/128058/using-remotechannels-for-an-parallel-iterable-map
(That code has problems, like lost of small sends)

The idea being that when you start you create on each worker a task that infinite loops, while reading one channel and writing to another. The infinite loop ends when a channel is closed.
Then the constant data portion - i.e. the expensive closure is only serialised in the first step when the loops on the workers are created.


Problem with batch_size=:max is that it assumed you can hold the whole Iterable you are running over in memory. Or at least a very substantial portion of it.
And that is not generally true.
(Although as I have just found out. I personally can afford to whole the whole Gigaword Corpus in RAM. Most people can't.)

@JeffBezanson
Copy link
Member

We should probably remember which functions we have sent to which workers and automatically avoid duplicating.

@amitmurthy
Copy link
Contributor

Not a bad idea. The mapping function is typically an anonymous function. pmap should cache it on each worker only for the duration of the call. Will need to store in a remote reference and manage its lifetime efficiently.

@JeffBezanson
Copy link
Member

Thinking in more detail, closures might be a separate issue. Internal to the serializer, we could avoid re-sending the code for functions, which is not very big but is slow to serialize. Avoiding re-sending closure data is harder, since it can be arbitrarily big and will need a cache management strategy as you point out.

@JeffBezanson JeffBezanson added performance Must go faster parallelism Parallel or distributed computation labels May 13, 2016
@oxinabox
Copy link
Contributor Author

So I improved my hack around the issue, to actually only serialize once.
By doing the serialization entirely outside the remote_calls, and just sending the serialised data.
It is annoying that the serialization code is stateful (rather than pure), which makes it hard to logic around -- I was trying to do this at a lower, more general level.

This, as with the last, is twice as fast as pmap, in the above test.
I can't using the MWE above detect a significant difference.
But I suspect that when it comes to my actual code.
Which hauls >10MB of data around within the closures, and has to send to 32 workers,
this will be pretty huge.

function _pgenerate_gh_sc(f,c, mname=Symbol("_func_genmagic_hack"*string(rand(UInt64)))::Symbol)
    #Reusing the `mname` in subsequent called can A.) Reclaim memory, B.) Violate certain concurrency expectations
    worker_ids = workers()

    s_f_buff = IOBuffer()
    serialize(s_f_buff, f)
    s_f = s_f_buff.data

    function make_global(mname_i, s_f_i)
        eval(Expr(:global, 
                    Expr(Symbol("="), mname_i, 
                        Expr(:call,:deserialize,
                            Expr(:call, :IOBuffer, s_f_i)
                            )
                        )
                 )
            )
    end

    @sync for id in worker_ids
        @async remotecall_wait(make_global, id, mname, s_f)
    end

    worker_pool = WorkerPool(worker_ids)

    #Give send a function telling them to look up the function locally
    Base.pgenerate(worker_pool, x->eval(mname)(x), c)  
end

@amitmurthy
Copy link
Contributor

closed by #16808

oscardssmith pushed a commit that referenced this issue Jul 28, 2023
…ool (#33892)

Once upon a time, there was a young julia user first getting started
with parallelism.
And she found it fearsomely slow.
And so she did investigate, and she did illuminate upon her issue.
Her closures, they were being reserialized again and again.
And so this young woman, she openned an issue #16345.
Lo and behold, a noble soul did come and resolve it,
by making the glorious `CachingPool()` in #16808.

3 long years a later this julia user did bravely return to the world of
parallism, with many battle worn scars.
and once more she did face the demon that is `pmap` over closures.
But to her folly, she felt no fear, for she believed the demon to be
crippled and chained by the glorious `CachingPool`.
Fearlessly, she threw his closure over 2GB of data into the maw of the
demon `pmap`.
But alas, alas indeed, she was wrong.
The demon remained unbound, and it slew her, and slew her again.
100 times did it slay her for 101 items was the user iterating upon. 
For the glorious chains of the the `CachingPool()` remains unused, left
aside in the users tool chest, forgotten.
vchuravy pushed a commit to JuliaLang/Distributed.jl that referenced this issue Oct 6, 2023
…ool (JuliaLang/julia#33892)

Once upon a time, there was a young julia user first getting started
with parallelism.
And she found it fearsomely slow.
And so she did investigate, and she did illuminate upon her issue.
Her closures, they were being reserialized again and again.
And so this young woman, she openned an issue JuliaLang/julia#16345.
Lo and behold, a noble soul did come and resolve it,
by making the glorious `CachingPool()` in JuliaLang/julia#16808.

3 long years a later this julia user did bravely return to the world of
parallism, with many battle worn scars.
and once more she did face the demon that is `pmap` over closures.
But to her folly, she felt no fear, for she believed the demon to be
crippled and chained by the glorious `CachingPool`.
Fearlessly, she threw his closure over 2GB of data into the maw of the
demon `pmap`.
But alas, alas indeed, she was wrong.
The demon remained unbound, and it slew her, and slew her again.
100 times did it slay her for 101 items was the user iterating upon. 
For the glorious chains of the the `CachingPool()` remains unused, left
aside in the users tool chest, forgotten.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
parallelism Parallel or distributed computation performance Must go faster
Projects
None yet
Development

No branches or pull requests

3 participants