feature request: allow updating of progress bar from functions launched on separate workers. #125

Open
affans opened this issue Jan 5, 2019 · 8 comments · May be fixed by #157

Comments

affans commented Jan 5, 2019

Consider the following MWE, which runs but does not behave as intended.

using Distributed
addprocs(2)
@everywhere using ProgressMeter

p = Progress(1000, barglyphs=BarGlyphs("[=> ]"))

@everywhere function simulation(s)
    for i = 1:100
        sleep(0.01)    # stand-in for one unit of long-running work
    end
end

progress_pmap(x -> simulation(x), 1:10, progress=p, channel_bufflen=1000)

The intention: suppose simulation is a long-running function that can report its progress via its internal for loop, and suppose simulation is launched independently on multiple processes by pmap. The goal is a more granular progress bar that is updated at each iteration of the for loop in every simulation. By a simple calculation, the progress bar has a total length of 1000 (10 calls to simulation × 100 inner iterations each), with each unit advanced by the for loop inside simulation. In other words, 10 simulation processes all update the progress bar through their internal for loops, one unit at a time. This makes it possible to monitor the progress of long-running computations.

PmapProgressMeters.jl did this by passing simulation a callback function that updated the Progress bar on the main process.

However, I am trying to think of a solution involving Channels and RemoteChannel, although I don't yet have the expertise in how these work.

affans commented Jan 5, 2019

I tried

@everywhere function main(s)
    for i = 1:100
        remotecall(put!(channel, true), 1)
        sleep(0.01)
    end
end

but I get

On worker 3:
UndefVarError: channel not defined

which doesn't make sense to me. According to the source code,

vals = mapfun(other_args...; kwargs...) do x...
    val = f(x...)
    put!(channel, true)
    return val
end

the line put!(channel, true) is indeed part of a function that is launched on separate workers. Am I seeing a scoping issue here?

zsunberg commented Jan 6, 2019

The reason for the error is that channel is not a defined symbol on the other worker processes. @everywhere simply executes the code on all the processes, and channel does not exist on those processes. Also note that

@everywhere channel = RemoteChannel(()->Channel{Bool}(10), 1)

will not work because it will create a new channel for each process.

I haven't been able to find documentation on this, but apparently anonymous functions, like those created with the do or -> syntax, behave differently with respect to serialization and availability on multiple processes.

using Distributed
addprocs(2)
f(x) = x^2      # named function, defined only on the main process
pmap(f, 1:10)

will throw an error, but

using Distributed
addprocs(2)
f = x -> x^2    # anonymous function bound to a variable
pmap(f, 1:10)

will work fine.

Anyways, workarounds are to (1) pass the channel as an argument to the function, or (2) use

# assumes channel is a RemoteChannel hosted on process 1, created once on the main process
main = s -> begin
    for i = 1:100
        put!(channel, true)   # a RemoteChannel hosted on pid 1 accepts put! directly from any worker
        sleep(0.01)
    end
end

Sorry I don't know more about why anonymous functions work differently.
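
For completeness, here is a minimal sketch of workaround (1), passing the channel explicitly as an argument. The names and the 1000-unit budget mirror the MWE at the top and are illustrative, not ProgressMeter API:

using Distributed
addprocs(2)
@everywhere using ProgressMeter

# one RemoteChannel, hosted on process 1; workers only hold references to it
const channel = RemoteChannel(() -> Channel{Bool}(1000), 1)

@everywhere function simulation(s, channel)
    for i = 1:100
        sleep(0.01)           # stand-in for one unit of work
        put!(channel, true)   # report one unit of progress to process 1
    end
end

p = Progress(1000)
@async while take!(channel)   # consumer task on process 1 advances the bar
    next!(p)
end

pmap(s -> simulation(s, channel), 1:10)
put!(channel, false)          # stop the consumer task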

affans commented Jan 6, 2019

This might be a bug... let's see if someone replies on Discourse. You should clarify in your post that both functions work fine when defined with @everywhere, but the anonymous one works on all processes even when it is not @everywhere'd.
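
A minimal sketch of that observation (session setup assumed, not quoted from the thread):

using Distributed
addprocs(2)

# named function: needs @everywhere so it is defined on every process
@everywhere f(x) = x^2
pmap(f, 1:10)

# anonymous function: works even without @everywhere, because the closure
# itself is serialized and shipped to the workers
g = x -> x^2
pmap(g, 1:10)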

affans commented Jun 28, 2019

I'll see if I can figure this out over the next few days. This feature would be great for my workflow.

marius311 commented Jun 13, 2021

I have the same feature request. The code in the README is basically perfect; it would just be nice if all that boilerplate could be hidden inside a parallel progress bar type, like

p = DistributedProgress(100)

pmap(1:10) do i
    for j=1:10
        ...
        next!(p)
    end
end

finish!(p)

and the whole RemoteChannel setup would be handled for you (additionally, it would be nice if it worked with showvalues).

marius311 commented

Actually, I hacked together the following, which allows next! and update! from any worker. Is this approach something that you'd be interested in having as a PR?

using Distributed, ProgressMeter

struct DistributedProgress <: ProgressMeter.AbstractProgress
    channel :: RemoteChannel{Channel{Any}}
    pbar :: Progress
end

function DistributedProgress(args...; kwargs...)
    pbar = Progress(args...; kwargs...)
    channel = RemoteChannel(() -> Channel(), 1)
    # consumer task on process 1: applies forwarded next!/update! calls to the
    # underlying Progress until a `nothing` sentinel arrives
    @async begin
        while (x = take!(channel)) !== nothing
            func, args, kwargs = x
            func(pbar, args...; kwargs...)
        end
        finish!(pbar)
    end
    DistributedProgress(channel, pbar)
end

# calls from any worker are forwarded over the RemoteChannel to process 1
ProgressMeter.next!(pbar::DistributedProgress, args...; kwargs...) = put!(pbar.channel, (ProgressMeter.next!, args, kwargs))
ProgressMeter.update!(pbar::DistributedProgress, args...; kwargs...) = put!(pbar.channel, (ProgressMeter.update!, args, kwargs))
ProgressMeter.finish!(pbar::DistributedProgress, args...; kwargs...) = (put!(pbar.channel, nothing); close(pbar.channel))
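
For reference, a hypothetical usage sketch of the type above (the file name distributed_progress.jl is illustrative; the definitions would have to be evaluated on every process, e.g. via @everywhere, so that next! can be called from the workers):

using Distributed
addprocs(2)
@everywhere using ProgressMeter
@everywhere include("distributed_progress.jl")   # the definitions above, loaded on every process

p = DistributedProgress(1000)   # 10 tasks × 100 inner iterations
pmap(1:10) do s
    for i = 1:100
        sleep(0.01)   # stand-in for real work
        next!(p)      # forwarded to process 1 over the RemoteChannel
    end
end
finish!(p)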

IanButterworth commented

Absolutely. Please PR and add tests if you can
