Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deprecate produce, consume and task iteration #19841

Merged
merged 1 commit into from
Jan 12, 2017
Merged

Conversation

amitmurthy
Copy link
Contributor

Closes #17699

This deprecates produce, consume and Task iteration. These are not always directly replaceable with put! and take! on Channels (As can be seen in the modified test files). This is because

  • produce is effectively a put-and-take! operation
  • consume is a take-and-put! operation
  • Errors in tasks are propagated to waiters on a consume

In order to maintain an equivalence for the last point above, the PR introduces a close(c::Channel, t::Task) which closes a Channel automatically when the task exits, and also propagates any errors in the Task to waiters on the Channel. Currently it is done via extending the task_done_hook to also check for registered hook functions which are recorded in the TLS. We can look at a different mechanism when we fully remove the produce/consume machinery in the next cycle.

Note that close(c::Channel, t::Task) does not close the channel immediately. It effectively links the lifetime of the channel to that of the task. We can consider a different name (will have to be exported) if this is found confusing.

walkdir which used to return an iterable task now returns a channel instead.

Pending items are:

  • tests specific to the changes introduced.
  • doc and NEWS updates

if topdown
produce(root, dirs, files)
put!(chnl, (root, dirs, files))
yield()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't put! yield automatically since the channel is unbuffered?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put! only had a notify and not an explicit yield. Have added a yield to put! now.

@JeffBezanson
Copy link
Member

Looks pretty good. Having a method of close that doesn't close something immediately is a bit confusing; we should think about a better spelling for that. Maybe it should be a constructor of Task or Channel, or something else that lets you make a task and a channel for talking to it in one step.

@tkelman tkelman added the needs news A NEWS entry is required for this change label Jan 4, 2017
@kshyatt kshyatt added io Involving the I/O subsystem: libuv, read, write, etc. parallelism Parallel or distributed computation labels Jan 4, 2017
@amitmurthy
Copy link
Contributor Author

Updates:

  • Reusing the verb bind, to bind a Channel to a Task now. This links the open lifetime of a Channel to that of the Task.

  • New convenience function channeled_tasks to bind a set of Channels to a set of newly created Task objects from functions in a single call.

Please do review and provide feedback.

I'll update NEWS and add a couple of code examples before a final merge.

@@ -950,6 +950,7 @@ export
sizeof,

# tasks and conditions
channeled_tasks,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this going to need to be called often? does it need to be exported?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a convenience function, Yes and Yes. Makes it easy to link related tasks with channels in a single call. Effectively performs

  • create a set of Channels
  • create a set of tasks
  • bind tasks to channels
  • schedule tasks

in a single call.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think starting N tasks with N channels is the common case. It would be simpler to have more of a drop-in replacement for reading values from a single task like produce and consume do now.

Copy link
Contributor Author

@amitmurthy amitmurthy Jan 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was inspired by the usage in test/worlds.jl which used produce/consume in a bi-directional communication way (which would need to be simulated by 2 channels). I think a common case would actually be starting a bunch of worker tasks reading of an input channel and writing to an output channel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can remove the export for now. Can revisit it later based on usage. For now I'll add a channel(T, size, function) -> (Channel, Task) convenience function which creates and binds a channel to a 1-arg (the created channel object) function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that lowercase channel will be a new export. Will need a new function as we have to return a tuple of channel and task. This need prevents us from adding this functionality to either channel/task constructors, or even adding a keyword option to schedule

isbuffered(c) ? put_buffered(c,v) : put_unbuffered(c,v)
yield()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe should only do this if notify woke up any tasks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

@@ -20,7 +20,7 @@ Other constructors:
type Channel{T} <: AbstractChannel
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot
state::Symbol
state::Tuple
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be cleaner to have a separate field for the exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

@amitmurthy amitmurthy force-pushed the amitm/deppc2 branch 2 times, most recently from 127e541 to b289d1c Compare January 6, 2017 14:12
@amitmurthy
Copy link
Contributor Author

Updated.

notify now returns the number of tasks woken up and this is used to check if put! on unbuffered channels should yield. I'll change the check to an assertion since on unbuffered channels, there has to be at least one waiter woken up anyway. I had the same check (and yield) for buffered channels but that was causing an issue with InterruptException handling in the REPL. Have removed it for now.

A new Channel constructor Channel(func::Function; ctype=Any, csize=0, taskref=nothing) has been added which creates, binds and schedules a channel bound task. A reference to the created task can be retrieved by specifying keyword arg taskref. If specified with a Ref{Task}(), the task object is returned via the Ref object. This does away the need for any new exports.

Docs and tests have been updated.

@JeffBezanson
Copy link
Member

on unbuffered channels, there has to be at least one waiter woken up

Is that really true? Couldn't there be a task blocked on something else, that hasn't gotten to its take! call yet?

@amitmurthy
Copy link
Contributor Author

For unbuffered there is no intermediate storage in the Channel. The put! waits till there is a taker and sends the value via a Condition variable.

Currently implemented as

function put_unbuffered(c::Channel, v)
    while length(c.takers) == 0
        notify(c.cond_take, nothing, true, false)  # Required to handle wait() on 0-sized channels
        wait(c.cond_put)
    end
    cond_taker = shift!(c.takers)
    notify(cond_taker, v, false, false) > 0 && yield()
    v
end

Only when we have at least one other task on take! does the loop exit.

@JeffBezanson
Copy link
Member

Ok, I see.

@kshyatt kshyatt added the deprecation This change introduces or involves a deprecation label Jan 8, 2017
@amitmurthy amitmurthy force-pushed the amitm/deppc2 branch 2 times, most recently from bc6941c to c3c2273 Compare January 9, 2017 14:08
@amitmurthy amitmurthy changed the title RFC/WIP: deprecate produce, consume and task iteration deprecate produce, consume and task iteration Jan 9, 2017
@amitmurthy
Copy link
Contributor Author

Segfault in cmdlineargs appears unrelated to this PR. Anyone come across it recently?

Will keep the PR open for a couple more days for feedback before a final conflict resolution, CI run and merge.

@amitmurthy amitmurthy merged commit 7b7b4e1 into master Jan 12, 2017
@amitmurthy amitmurthy deleted the amitm/deppc2 branch January 12, 2017 10:07
@@ -187,6 +189,9 @@ Deprecated or removed
functions (`airyai`, `airybi`, `airyaiprime`, `airybiprimex`, `airyaix`, `airybix`,
`airyaiprimex`, `airybiprimex`) ([#18050]).

* `produce`, `consume` and iteration over a Task object has been deprecated in favor of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have been deprecated

Channel(func::Function; ctype=Any, csize=0, taskref=nothing)

Creates a new task from `func`, binds it to a new channel of type
`ctype` and size `csize`, schedules the task, all in a single call.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and schedules the task


julia> chnl = Channel(c->(@show take!(c)); taskref=taskref);

julia> task = taskref[];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a really awkward API

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think of it is pass-by-reference in C. Most times since the channel is bound to the task, the user will not need the task object. But if required, can be retrieved.

The alternative would be to export a new function which would return a tuple of (Channel, Task), which in the common case would require a [1] to retrieve the Channel. A little inconvenient.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Julia isn't C, I can't think of any Julia APIs that do this. A separate function for this would make more sense, might not even need to be exported if it isn't used very often. Is there any other way of identifying whether or not a Channel has a task bound to it? Something like being able to ask for boundtasks(c::Channel) might be useful if possible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, no Julia APIs do this yet, but it may catch on.

If a need is felt for boundtasks(c::Channel), it will need additional housekeeping in the Channel object. Not currently done (task local storage in the bound task is used to record a WeakRef to the channel).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but it may catch on.

I seriously hope not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would Channel(t::Task) work? we really shouldn't be trying to imitate varargout here when there's already another clearer way of accomplishing the same result

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, since Task(f) expects a 0-arg function and Channel(f) expects a 1-arg function which is the created Channel itself. As this replaces task-iteration with a bound channel iteration, it is important that the task get early access to the channel and any early errors raised in the task are propagated to the channel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calling bind separately is sufficient though?

Copy link
Contributor Author

@amitmurthy amitmurthy Jan 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but you typically need to ensure that the Task is bound before it is scheduled to handle early errors in the task. For example,

c = Channel()
t = @schedule (foo(); while(some_condition); put!(c,data); end)
bind(c,t)
for x in c
  bar(x)
end 

can lead to the for-loop missing any exception thrown by foo. Actually, it is currently not an issue since @chedule does not yield, but it is safe to assume that this behavior can change in the future.

A safer way is

c = Channel()
t = Task(()->(foo(); while(some_condition); put!(c,data); end))
bind(c,t)
schedule(t)
for x in c
  bar(x)
end 

With the special Channel constructor, the above code block is equivalent to

for x in Channel(c->(foo(); while(some_condition); put!(c,data); end))
  bar(x)
end

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not saying the latter isn't useful, but if needing to use the task afterwards is uncommon I think the former pattern is clearer and more idiomatic than passing a reference argument.

schedule(task)
yield()

isa(taskref, Ref{Task}) && (taskref.x = task)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't taskref[] = task a more recommended way of writing this, in case the representation of Ref ever changes?


When a channel is bound to multiple tasks, the first task to terminate will
close the channel. When multiple channels are bound to the same task,
termination of the task will close all channels.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all of the bound channels, not absolutely all channels that might exist

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is obvious it refers to the "multiple channels" mentioned before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it is not obvious, it sounds confusing since it's missing an article or qualifier.

the consuming task is what makes [`produce()`](@ref) easier to use than [`yieldto()`](@ref).
of switching back, can require considerable coordination. For example, [`put!()`](@ref) and [`take!()`](@ref)
are blocking operations, which, when used in the context of channels maintain state to remember
who the consumers is. Not needing to manually keep track of the consuming task is what makes [`put!()`](@ref)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"consumer is" or "consumers are"

@test_throws ErrorException fetch(cs[i])
end

# Multiple tasks, first one to terminate, closes the channel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the sentence shouldn't be interrupted with the second comma

cond = Condition()
tf3(i) = begin
if i == nth
ref.x = i
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't ref[] = i be the documented api for this?

tasks = [Task(()->tf3(i)) for i in 1:5]
c = Channel(N)
foreach(t->bind(c,t), tasks)
foreach(t->schedule(t), tasks)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t->schedule(t) redundant

foreach(t->schedule(t), tasks)
@test_throws InvalidStateException wait(c)
@test !isopen(c)
@test ref.x == nth
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ref[] == nth

amitmurthy added a commit that referenced this pull request Jan 13, 2017
amitmurthy added a commit that referenced this pull request Jan 13, 2017
@BenLauwens
Copy link

Hi

consume and produce are heavily used in SimJulia. As @amitmurthy has stated they are not always directly replaceable with take! and put!. I really need take_and_put! and put_and_take!. Do you already have best practices to show how I can recover with channels the old behaviour? e.g. if a Task stops with a return I like to recover the return value ... I have not found how I can do this with a channel.
Thanks

@amitmurthy
Copy link
Contributor Author

You can get the previous behavior with a pair of channels. See

julia/test/worlds.jl

Lines 96 to 113 in 1303dfb

# test oldworld call / inference
function wfunc(c1,c2)
while true
(f, args) = take!(c1)
put!(c2, f(args...))
end
end
function put_n_take!(v...)
put!(chnls[1], v)
take!(chnls[2])
end
g265() = [f265(x) for x in 1:3.]
wc265 = world_counter()
f265(::Any) = 1.0
@test wc265 + 1 == world_counter()
chnls, tasks = Base.channeled_tasks(2, wfunc)
t265 = tasks[1]
for an example. Base.channeled_tasks is a documented (REPL help, not HTML docs), but unexported convenience function for setting things up. Both the behavior and name of Base.channeled_tasks will change in the future when an equivalent may be exported.

if a Task stops with a return I like to recover the return value ...

You should put! to the bound channel before returning. Any exceptions thrown by the task will be propagated and rethrown in the task waiting on the bound channel.

Your requirement of a direct replacement for consume and produce via an equivalent take_and_put! and put_and_take! using Channels will probably be a good feature addition. Would you care to open an issue as a feature request for that?

@BenLauwens
Copy link

I see. I will open an issue.
I have been playing with the new construct and I have done some benchmarking (see gist for the code).
Results on OS X
Julia v0.5:
yieldto
0.003023 seconds (10.13 k allocations: 165.266 KB)
schedule_and_wait
0.003329 seconds (10.01 k allocations: 157.781 KB)
produce
0.004892 seconds (20.01 k allocations: 314.547 KB)
statemachine
0.000151 seconds (10.01 k allocations: 156.438 KB)
Julia v0.6-dev:
yieldto
0.002697 seconds (10.11 k allocations: 164.781 KiB)
schedule_and_wait
0.003329 seconds (10.01 k allocations: 157.781 KiB)
channel
0.061945 seconds (60.03 k allocations: 1.835 MiB)
statemachine
0.000147 seconds (10.01 k allocations: 156.438 KiB)
put! and take! on a channel do 3 times more allocations and are more than one order of magnitude slower than produce and consume.
yieldto gives the best result when using Tasks but compared to a transformation to a state machine it is still very slow. I understand the slowness is inherent by doing the context switch and copying the stack. I have already implemented a macro that translates a coroutine (a normal function with a yield) to a state machine as shown in the last test. This is also the way that coroutines are implemented in c#. If this sounds interesting I will make a package that implements very lightweight coroutines.

@amitmurthy
Copy link
Contributor Author

I don't know if your use case allows for this, but task switches can be reduced by using a buffered Channel.

fib = Channel(fibonnaci_channel; ctype=Float64, csize=20) results in a fairly large speedup. 0.08 to 0.002 seconds for your channel test case on my machine.

Performance of the new model i.e. unbuffered channels and task switching should be looked into. You could mention this in the issue you intend to open.

@BenLauwens
Copy link

My use case does not allow the use of a buffered Channel. Using a pair of Channels makes the performance even worse. I will definitely open an issue ... (beginning of next week ... I am running out of time for other stuff;)

jpfairbanks added a commit to sbromberger/LightGraphs.jl that referenced this pull request Mar 13, 2017
Fixes deprecation warnings introduced in:

  JuliaLang/julia#19841

Changes an API interface:

  -function Graph(nvg::Int, neg::Int, edgestream::Task)
  +function Graph(nvg::Int, neg::Int, edgestream::Channel)

Iteration over Tasks is deprecated so now we iterate over the Channel.
sbromberger pushed a commit to sbromberger/LightGraphs.jl that referenced this pull request Mar 19, 2017
Fixes deprecation warnings introduced in:

  JuliaLang/julia#19841

Changes an API interface:

  -function Graph(nvg::Int, neg::Int, edgestream::Task)
  +function Graph(nvg::Int, neg::Int, edgestream::Channel)

Iteration over Tasks is deprecated so now we iterate over the Channel.
sbromberger added a commit to sbromberger/LightGraphs.jl that referenced this pull request Mar 29, 2017
* benchmarks

* add edgetype benchmark

katz centrality is broken.

* simplegraphs abstraction

* Edge is no longer a Pair

* pkgbenchmarks

* f

* remove data files from benchmarks

* simplegraphs, take 2

* more changes

* reshuffle

* fix tests

* more tests

* abstractions

* more tests

* tests and fixes

* trait fixes and tests - unrolling

* persistence and floyd-warshall

* make(di)graphs, through spanningtrees

* moved cliques, testing through connectivity.jl

* @jpfairbanks first round of review

* another fix

* all tests

* new simpletraits

* first cut at 0.6 compat

* squash

* update randgraphs.jl to use Channels over Tasks

Fixes deprecation warnings introduced in:

  JuliaLang/julia#19841

Changes an API interface:

  -function Graph(nvg::Int, neg::Int, edgestream::Task)
  +function Graph(nvg::Int, neg::Int, edgestream::Channel)

Iteration over Tasks is deprecated so now we iterate over the Channel.

* got rid of tasks in randgraphs

* graph -> g

* Add tutorials to section on docs (#547)

* Update README.md

* Update README.md

Made tutorials separate line and consistent with the other lines.

* type -> mutable struct

* more type -> mutable struct, plus OF detection for add_vertex!

* foo{T}(x::T) -> foo(x::T) where T

* test negative cycles

* test coverage

* manual cherry-pick of #551

* simplegraph/ -> simplegraphs, optimization for is_connected, some type simplifications

* re-add b-f tests

* Inferred (#554)

* core

* @inferred wherever possible

* empty -> zero

* test grid periodic=true

* oops

* redo graphmatrices tests

* linalg test fix

* loosen type restrictions in randgraphs functions

* readall -> readstring, and comment rationalization in randgraphs

* Fixes #555: graphmatrices convert incorrect on CA (#560)

CombinatorialAdjacency(CombinatorialAdjacency(g)) was returning the backing storage.
Fix includes tests.

* fixes #564

* one more test

* removed nv() and vertices() (#565)

* simpleedge tests

* test coverage

* short circuit B-F negative cycles, plus tests

* more test coverage

* more test coverage

* Docs (#567)

* docs, plus some various fixes (see blocking_flow).

* nodes -> vertices, node -> vertex, and more doc consistency

* doc fixes

* 1.0 -> 0.8

* docfix and benchmarks

* doc fixes
@Sacha0 Sacha0 removed the needs news A NEWS entry is required for this change label May 12, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
deprecation This change introduces or involves a deprecation io Involving the I/O subsystem: libuv, read, write, etc. parallelism Parallel or distributed computation
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Deprecate produce-consume
6 participants