-
-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
deprecate produce, consume and task iteration #19841
Conversation
if topdown | ||
produce(root, dirs, files) | ||
put!(chnl, (root, dirs, files)) | ||
yield() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't put!
yield automatically since the channel is unbuffered?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
put!
only had a notify
and not an explicit yield. Have added a yield
to put!
now.
Looks pretty good. Having a method of |
85e5c66
to
250bcb8
Compare
Updates:
Please do review and provide feedback. I'll update NEWS and add a couple of code examples before a final merge. |
@@ -950,6 +950,7 @@ export | |||
sizeof, | |||
|
|||
# tasks and conditions | |||
channeled_tasks, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this going to need to be called often? does it need to be exported?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a convenience function, Yes and Yes. Makes it easy to link related tasks with channels in a single call. Effectively performs
- create a set of Channels
- create a set of tasks
- bind tasks to channels
- schedule tasks
in a single call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think starting N tasks with N channels is the common case. It would be simpler to have more of a drop-in replacement for reading values from a single task like produce and consume do now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was inspired by the usage in test/worlds.jl
which used produce/consume in a bi-directional communication way (which would need to be simulated by 2 channels). I think a common case would actually be starting a bunch of worker tasks reading of an input channel and writing to an output channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can remove the export for now. Can revisit it later based on usage. For now I'll add a channel(T, size, function) -> (Channel, Task)
convenience function which creates and binds a channel to a 1-arg (the created channel object) function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that lowercase channel
will be a new export. Will need a new function as we have to return a tuple of channel and task. This need prevents us from adding this functionality to either channel/task constructors, or even adding a keyword option to schedule
isbuffered(c) ? put_buffered(c,v) : put_unbuffered(c,v) | ||
yield() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe should only do this if notify
woke up any tasks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK.
@@ -20,7 +20,7 @@ Other constructors: | |||
type Channel{T} <: AbstractChannel | |||
cond_take::Condition # waiting for data to become available | |||
cond_put::Condition # waiting for a writeable slot | |||
state::Symbol | |||
state::Tuple |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be cleaner to have a separate field for the exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK.
127e541
to
b289d1c
Compare
Updated.
A new Channel constructor Docs and tests have been updated. |
Is that really true? Couldn't there be a task blocked on something else, that hasn't gotten to its |
For unbuffered there is no intermediate storage in the Channel. The Currently implemented as
Only when we have at least one other task on |
Ok, I see. |
bc6941c
to
c3c2273
Compare
Segfault in Will keep the PR open for a couple more days for feedback before a final conflict resolution, CI run and merge. |
c3c2273
to
47962ae
Compare
@@ -187,6 +189,9 @@ Deprecated or removed | |||
functions (`airyai`, `airybi`, `airyaiprime`, `airybiprimex`, `airyaix`, `airybix`, | |||
`airyaiprimex`, `airybiprimex`) ([#18050]). | |||
|
|||
* `produce`, `consume` and iteration over a Task object has been deprecated in favor of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have been deprecated
Channel(func::Function; ctype=Any, csize=0, taskref=nothing) | ||
|
||
Creates a new task from `func`, binds it to a new channel of type | ||
`ctype` and size `csize`, schedules the task, all in a single call. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and schedules the task
|
||
julia> chnl = Channel(c->(@show take!(c)); taskref=taskref); | ||
|
||
julia> task = taskref[]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a really awkward API
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think of it is pass-by-reference in C. Most times since the channel is bound to the task, the user will not need the task object. But if required, can be retrieved.
The alternative would be to export a new function which would return a tuple of (Channel, Task)
, which in the common case would require a [1]
to retrieve the Channel. A little inconvenient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Julia isn't C, I can't think of any Julia APIs that do this. A separate function for this would make more sense, might not even need to be exported if it isn't used very often. Is there any other way of identifying whether or not a Channel has a task bound to it? Something like being able to ask for boundtasks(c::Channel)
might be useful if possible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, no Julia APIs do this yet, but it may catch on.
If a need is felt for boundtasks(c::Channel)
, it will need additional housekeeping in the Channel object. Not currently done (task local storage in the bound task is used to record a WeakRef to the channel).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but it may catch on.
I seriously hope not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would Channel(t::Task)
work? we really shouldn't be trying to imitate varargout here when there's already another clearer way of accomplishing the same result
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, since Task(f)
expects a 0-arg function and Channel(f)
expects a 1-arg function which is the created Channel itself. As this replaces task-iteration with a bound channel iteration, it is important that the task get early access to the channel and any early errors raised in the task are propagated to the channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
calling bind separately is sufficient though?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but you typically need to ensure that the Task is bound before it is scheduled to handle early errors in the task. For example,
c = Channel()
t = @schedule (foo(); while(some_condition); put!(c,data); end)
bind(c,t)
for x in c
bar(x)
end
can lead to the for-loop missing any exception thrown by foo
. Actually, it is currently not an issue since @chedule
does not yield, but it is safe to assume that this behavior can change in the future.
A safer way is
c = Channel()
t = Task(()->(foo(); while(some_condition); put!(c,data); end))
bind(c,t)
schedule(t)
for x in c
bar(x)
end
With the special Channel constructor, the above code block is equivalent to
for x in Channel(c->(foo(); while(some_condition); put!(c,data); end))
bar(x)
end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not saying the latter isn't useful, but if needing to use the task afterwards is uncommon I think the former pattern is clearer and more idiomatic than passing a reference argument.
schedule(task) | ||
yield() | ||
|
||
isa(taskref, Ref{Task}) && (taskref.x = task) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't taskref[] = task
a more recommended way of writing this, in case the representation of Ref
ever changes?
|
||
When a channel is bound to multiple tasks, the first task to terminate will | ||
close the channel. When multiple channels are bound to the same task, | ||
termination of the task will close all channels. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all of the bound channels, not absolutely all channels that might exist
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is obvious it refers to the "multiple channels" mentioned before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it is not obvious, it sounds confusing since it's missing an article or qualifier.
the consuming task is what makes [`produce()`](@ref) easier to use than [`yieldto()`](@ref). | ||
of switching back, can require considerable coordination. For example, [`put!()`](@ref) and [`take!()`](@ref) | ||
are blocking operations, which, when used in the context of channels maintain state to remember | ||
who the consumers is. Not needing to manually keep track of the consuming task is what makes [`put!()`](@ref) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"consumer is" or "consumers are"
@test_throws ErrorException fetch(cs[i]) | ||
end | ||
|
||
# Multiple tasks, first one to terminate, closes the channel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the sentence shouldn't be interrupted with the second comma
cond = Condition() | ||
tf3(i) = begin | ||
if i == nth | ||
ref.x = i |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wouldn't ref[] = i
be the documented api for this?
tasks = [Task(()->tf3(i)) for i in 1:5] | ||
c = Channel(N) | ||
foreach(t->bind(c,t), tasks) | ||
foreach(t->schedule(t), tasks) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
t->schedule(t)
redundant
foreach(t->schedule(t), tasks) | ||
@test_throws InvalidStateException wait(c) | ||
@test !isopen(c) | ||
@test ref.x == nth |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ref[] == nth
Hi
|
You can get the previous behavior with a pair of channels. See Lines 96 to 113 in 1303dfb
Base.channeled_tasks is a documented (REPL help, not HTML docs), but unexported convenience function for setting things up. Both the behavior and name of Base.channeled_tasks will change in the future when an equivalent may be exported.
You should Your requirement of a direct replacement for |
I see. I will open an issue. |
I don't know if your use case allows for this, but task switches can be reduced by using a buffered Channel.
Performance of the new model i.e. unbuffered channels and task switching should be looked into. You could mention this in the issue you intend to open. |
My use case does not allow the use of a buffered |
Fixes deprecation warnings introduced in: JuliaLang/julia#19841 Changes an API interface: -function Graph(nvg::Int, neg::Int, edgestream::Task) +function Graph(nvg::Int, neg::Int, edgestream::Channel) Iteration over Tasks is deprecated so now we iterate over the Channel.
Fixes deprecation warnings introduced in: JuliaLang/julia#19841 Changes an API interface: -function Graph(nvg::Int, neg::Int, edgestream::Task) +function Graph(nvg::Int, neg::Int, edgestream::Channel) Iteration over Tasks is deprecated so now we iterate over the Channel.
* benchmarks * add edgetype benchmark katz centrality is broken. * simplegraphs abstraction * Edge is no longer a Pair * pkgbenchmarks * f * remove data files from benchmarks * simplegraphs, take 2 * more changes * reshuffle * fix tests * more tests * abstractions * more tests * tests and fixes * trait fixes and tests - unrolling * persistence and floyd-warshall * make(di)graphs, through spanningtrees * moved cliques, testing through connectivity.jl * @jpfairbanks first round of review * another fix * all tests * new simpletraits * first cut at 0.6 compat * squash * update randgraphs.jl to use Channels over Tasks Fixes deprecation warnings introduced in: JuliaLang/julia#19841 Changes an API interface: -function Graph(nvg::Int, neg::Int, edgestream::Task) +function Graph(nvg::Int, neg::Int, edgestream::Channel) Iteration over Tasks is deprecated so now we iterate over the Channel. * got rid of tasks in randgraphs * graph -> g * Add tutorials to section on docs (#547) * Update README.md * Update README.md Made tutorials separate line and consistent with the other lines. * type -> mutable struct * more type -> mutable struct, plus OF detection for add_vertex! * foo{T}(x::T) -> foo(x::T) where T * test negative cycles * test coverage * manual cherry-pick of #551 * simplegraph/ -> simplegraphs, optimization for is_connected, some type simplifications * re-add b-f tests * Inferred (#554) * core * @inferred wherever possible * empty -> zero * test grid periodic=true * oops * redo graphmatrices tests * linalg test fix * loosen type restrictions in randgraphs functions * readall -> readstring, and comment rationalization in randgraphs * Fixes #555: graphmatrices convert incorrect on CA (#560) CombinatorialAdjacency(CombinatorialAdjacency(g)) was returning the backing storage. Fix includes tests. * fixes #564 * one more test * removed nv() and vertices() (#565) * simpleedge tests * test coverage * short circuit B-F negative cycles, plus tests * more test coverage * more test coverage * Docs (#567) * docs, plus some various fixes (see blocking_flow). * nodes -> vertices, node -> vertex, and more doc consistency * doc fixes * 1.0 -> 0.8 * docfix and benchmarks * doc fixes
Closes #17699
This deprecates
produce
,consume
and Task iteration. These are not always directly replaceable withput!
andtake!
on Channels (As can be seen in the modified test files). This is becauseproduce
is effectively a put-and-take! operationconsume
is a take-and-put! operationconsume
In order to maintain an equivalence for the last point above, the PR introduces a
close(c::Channel, t::Task)
which closes a Channel automatically when the task exits, and also propagates any errors in the Task to waiters on the Channel. Currently it is done via extending the task_done_hook to also check for registered hook functions which are recorded in the TLS. We can look at a different mechanism when we fully remove the produce/consume machinery in the next cycle.Note that
close(c::Channel, t::Task)
does not close the channel immediately. It effectively links the lifetime of the channel to that of the task. We can consider a different name (will have to be exported) if this is found confusing.walkdir
which used to return an iterable task now returns a channel instead.Pending items are: