Skip to content

Commit 9ec3d55

Browse files
committed
improve OncePer implementation
Address reviewer feedback, add more fixes and more tests, rename to add Once prefix.
1 parent b9638b7 commit 9ec3d55

File tree

7 files changed

+153
-107
lines changed

7 files changed

+153
-107
lines changed

NEWS.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ Multi-threading changes
6969
-----------------------
7070

7171
* New types are defined to handle the pattern of code that must run once per process, called
72-
a `PerProcess{T}` type, which allows defining a function that should be run exactly once
72+
a `OncePerProcess{T}` type, which allows defining a function that should be run exactly once
7373
the first time it is called, and then always return the same result value of type `T`
74-
every subsequent time afterwards. There are also `PerThread{T}` and `PerTask{T}` types for
74+
every subsequent time afterwards. There are also `OncePerThread{T}` and `OncePerTask{T}` types for
7575
similar usage with threads or tasks. ([#TBD])
7676

7777
Build system changes

base/docs/basedocs.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ runtime initialization functions of external C libraries and initializing global
153153
that involve pointers returned by external libraries.
154154
See the [manual section about modules](@ref modules) for more details.
155155
156-
See also: [`PerProcess`](@ref).
156+
See also: [`OncePerProcess`](@ref).
157157
158158
# Examples
159159
```julia

base/exports.jl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ export
7070
OrdinalRange,
7171
Pair,
7272
PartialQuickSort,
73-
PerProcess,
74-
PerTask,
75-
PerThread,
73+
OncePerProcess,
74+
OncePerTask,
75+
OncePerThread,
7676
PermutedDimsArray,
7777
QuickSort,
7878
Rational,

base/lock.jl

Lines changed: 78 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ Create a level-triggered event source. Tasks that call [`wait`](@ref) on an
507507
After `notify` is called, the `Event` remains in a signaled state and
508508
tasks will no longer block when waiting for it, until `reset` is called.
509509
510-
If `autoreset` is true, at most one task will be released from `wait` for)
510+
If `autoreset` is true, at most one task will be released from `wait` for
511511
each call to `notify`.
512512
513513
This provides an acquire & release memory ordering on notify/wait.
@@ -578,11 +578,15 @@ end
578578
export Event
579579
end
580580

581+
const PerStateInitial = 0x00
582+
const PerStateHasrun = 0x01
583+
const PerStateErrored = 0x02
584+
const PerStateConcurrent = 0x03
581585

582586
"""
583-
PerProcess{T}
587+
OncePerProcess{T}(init::Function)() -> T
584588
585-
Calling a `PerProcess` object returns a value of type `T` by running the
589+
Calling a `OncePerProcess` object returns a value of type `T` by running the
586590
function `initializer` exactly once per process. All concurrent and future
587591
calls in the same process will return exactly the same value. This is useful in
588592
code that will be precompiled, as it allows setting up caches or other state
@@ -591,13 +595,14 @@ which won't get serialized.
591595
## Example
592596
593597
```jldoctest
594-
julia> const global_state = Base.PerProcess{Vector{UInt32}}() do
598+
julia> const global_state = Base.OncePerProcess{Vector{UInt32}}() do
595599
println("Making lazy global value...done.")
596600
return [Libc.rand()]
597601
end;
598602
599-
julia> procstate = global_state();
603+
julia> (procstate = global_state()) |> typeof
600604
Making lazy global value...done.
605+
Vector{UInt32}
601606
602607
julia> procstate === global_state()
603608
true
@@ -606,51 +611,51 @@ julia> procstate === fetch(@async global_state())
606611
true
607612
```
608613
"""
609-
mutable struct PerProcess{T, F}
610-
x::Union{Nothing,T}
614+
mutable struct OncePerProcess{T, F}
615+
value::Union{Nothing,T}
611616
@atomic state::UInt8 # 0=initial, 1=hasrun, 2=error
612617
@atomic allow_compile_time::Bool
613618
const initializer::F
614619
const lock::ReentrantLock
615620

616-
function PerProcess{T,F}(initializer::F) where {T, F}
617-
once = new{T,F}(nothing, 0x00, true, initializer, ReentrantLock())
621+
function OncePerProcess{T,F}(initializer::F) where {T, F}
622+
once = new{T,F}(nothing, PerStateInitial, true, initializer, ReentrantLock())
618623
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
619-
once, :x, nothing)
624+
once, :value, nothing)
620625
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
621-
once, :state, 0x00)
626+
once, :state, PerStateInitial)
622627
return once
623628
end
624629
end
625-
PerProcess{T}(initializer::F) where {T, F} = PerProcess{T, F}(initializer)
626-
PerProcess(initializer) = PerProcess{Base.promote_op(initializer), typeof(initializer)}(initializer)
627-
@inline function (once::PerProcess{T})() where T
630+
OncePerProcess{T}(initializer::F) where {T, F} = OncePerProcess{T, F}(initializer)
631+
OncePerProcess(initializer) = OncePerProcess{Base.promote_op(initializer), typeof(initializer)}(initializer)
632+
@inline function (once::OncePerProcess{T})() where T
628633
state = (@atomic :acquire once.state)
629-
if state != 0x01
634+
if state != PerStateHasrun
630635
(@noinline function init_perprocesss(once, state)
631-
state == 0x02 && error("PerProcess initializer failed previously")
636+
state == PerStateErrored && error("OncePerProcess initializer failed previously")
632637
once.allow_compile_time || __precompile__(false)
633638
lock(once.lock)
634639
try
635640
state = @atomic :monotonic once.state
636-
if state == 0x00
637-
once.x = once.initializer()
638-
elseif state == 0x02
639-
error("PerProcess initializer failed previously")
640-
elseif state != 0x01
641-
error("invalid state for PerProcess")
641+
if state == PerStateInitial
642+
once.value = once.initializer()
643+
elseif state == PerStateErrored
644+
error("OncePerProcess initializer failed previously")
645+
elseif state != PerStateHasrun
646+
error("invalid state for OncePerProcess")
642647
end
643648
catch
644-
state == 0x02 || @atomic :release once.state = 0x02
649+
state == PerStateErrored || @atomic :release once.state = PerStateErrored
645650
unlock(once.lock)
646651
rethrow()
647652
end
648-
state == 0x01 || @atomic :release once.state = 0x01
653+
state == PerStateHasrun || @atomic :release once.state = PerStateHasrun
649654
unlock(once.lock)
650655
nothing
651656
end)(once, state)
652657
end
653-
return once.x::T
658+
return once.value::T
654659
end
655660

656661
function copyto_monotonic!(dest::AtomicMemory, src)
@@ -659,7 +664,7 @@ function copyto_monotonic!(dest::AtomicMemory, src)
659664
if isassigned(src, j)
660665
@atomic :monotonic dest[i] = src[j]
661666
#else
662-
# _unsafeindex_atomic!(dest, i, src[j], :monotonic)
667+
# _unsetindex_atomic!(dest, i, src[j], :monotonic)
663668
end
664669
i += 1
665670
end
@@ -674,36 +679,38 @@ function fill_monotonic!(dest::AtomicMemory, x)
674679
end
675680

676681

677-
# share a lock, since we just need it briefly, so some contention is okay
682+
# share a lock/condition, since we just need it briefly, so some contention is okay
678683
const PerThreadLock = ThreadSynchronizer()
679684
"""
680-
PerThread{T}
685+
OncePerThread{T}(init::Function)() -> T
681686
682-
Calling a `PerThread` object returns a value of type `T` by running the function
687+
Calling a `OncePerThread` object returns a value of type `T` by running the function
683688
`initializer` exactly once per thread. All future calls in the same thread, and
684689
concurrent or future calls with the same thread id, will return exactly the
685690
same value. The object can also be indexed by the threadid for any existing
686691
thread, to get (or initialize *on this thread*) the value stored for that
687692
thread. Incorrect usage can lead to data-races or memory corruption so use only
688693
if that behavior is correct within your library's threading-safety design.
689694
690-
Warning: it is not necessarily true that a Task only runs on one thread, therefore the value
691-
returned here may alias other values or change in the middle of your program. This type may
692-
get deprecated in the future. If initializer yields, the thread running the current task
693-
after the call might not be the same as the one at the start of the call.
695+
!!! warning
696+
It is not necessarily true that a Task only runs on one thread, therefore the value
697+
returned here may alias other values or change in the middle of your program. This function
698+
may get deprecated in the future. If initializer yields, the thread running the current
699+
task after the call might not be the same as the one at the start of the call.
694700
695-
See also: [`PerTask`](@ref).
701+
See also: [`OncePerTask`](@ref).
696702
697703
## Example
698704
699705
```jldoctest
700-
julia> const thread_state = Base.PerThread{Vector{UInt32}}() do
706+
julia> const thread_state = Base.OncePerThread{Vector{UInt32}}() do
701707
println("Making lazy thread value...done.")
702708
return [Libc.rand()]
703709
end;
704710
705-
julia> threadvec = thread_state();
711+
julia> (threadvec = thread_state()) |> typeof
706712
Making lazy thread value...done.
713+
Vector{UInt32}
707714
708715
julia> threadvec === fetch(@async thread_state())
709716
true
@@ -712,12 +719,12 @@ julia> threadvec === thread_state[Threads.threadid()]
712719
true
713720
```
714721
"""
715-
mutable struct PerThread{T, F}
722+
mutable struct OncePerThread{T, F}
716723
@atomic xs::AtomicMemory{T} # values
717724
@atomic ss::AtomicMemory{UInt8} # states: 0=initial, 1=hasrun, 2=error, 3==concurrent
718725
const initializer::F
719726

720-
function PerThread{T,F}(initializer::F) where {T, F}
727+
function OncePerThread{T,F}(initializer::F) where {T, F}
721728
xs, ss = AtomicMemory{T}(), AtomicMemory{UInt8}()
722729
once = new{T,F}(xs, ss, initializer)
723730
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
@@ -727,29 +734,30 @@ mutable struct PerThread{T, F}
727734
return once
728735
end
729736
end
730-
PerThread{T}(initializer::F) where {T, F} = PerThread{T,F}(initializer)
731-
PerThread(initializer) = PerThread{Base.promote_op(initializer), typeof(initializer)}(initializer)
732-
@inline function getindex(once::PerThread, tid::Integer)
737+
OncePerThread{T}(initializer::F) where {T, F} = OncePerThread{T,F}(initializer)
738+
OncePerThread(initializer) = OncePerThread{Base.promote_op(initializer), typeof(initializer)}(initializer)
739+
@inline (once::OncePerThread)() = once[Threads.threadid()]
740+
@inline function getindex(once::OncePerThread, tid::Integer)
733741
tid = Int(tid)
734742
ss = @atomic :acquire once.ss
735743
xs = @atomic :monotonic once.xs
736744
# n.b. length(xs) >= length(ss)
737-
if tid > length(ss) || (@atomic :acquire ss[tid]) != 0x01
745+
if tid <= 0 || tid > length(ss) || (@atomic :acquire ss[tid]) != PerStateHasrun
738746
(@noinline function init_perthread(once, tid)
739-
local xs = @atomic :acquire once.xs
740-
local ss = @atomic :monotonic once.ss
747+
local ss = @atomic :acquire once.ss
748+
local xs = @atomic :monotonic once.xs
741749
local len = length(ss)
742750
# slow path to allocate it
743751
nt = Threads.maxthreadid()
744-
0 < tid <= nt || ArgumentError("thread id outside of allocated range")
745-
if tid <= length(ss) && (@atomic :acquire ss[tid]) == 0x02
746-
error("PerThread initializer failed previously")
752+
0 < tid <= nt || throw(ArgumentError("thread id outside of allocated range"))
753+
if tid <= length(ss) && (@atomic :acquire ss[tid]) == PerStateErrored
754+
error("OncePerThread initializer failed previously")
747755
end
748756
newxs = xs
749757
newss = ss
750758
if tid > len
751759
# attempt to do all allocations outside of PerThreadLock for better scaling
752-
@assert length(xs) == length(ss) "logical constraint violation"
760+
@assert length(xs) >= length(ss) "logical constraint violation"
753761
newxs = typeof(xs)(undef, len + nt)
754762
newss = typeof(ss)(undef, len + nt)
755763
end
@@ -759,30 +767,30 @@ PerThread(initializer) = PerThread{Base.promote_op(initializer), typeof(initiali
759767
ss = @atomic :monotonic once.ss
760768
xs = @atomic :monotonic once.xs
761769
if tid > length(ss)
762-
@assert length(ss) >= len && newxs !== xs && newss != ss "logical constraint violation"
763-
fill_monotonic!(newss, 0x00)
770+
@assert len <= length(ss) <= length(newss) "logical constraint violation"
771+
fill_monotonic!(newss, PerStateInitial)
764772
xs = copyto_monotonic!(newxs, xs)
765773
ss = copyto_monotonic!(newss, ss)
766774
@atomic :release once.xs = xs
767775
@atomic :release once.ss = ss
768776
end
769777
state = @atomic :monotonic ss[tid]
770-
while state == 0x04
778+
while state == PerStateConcurrent
771779
# lost race, wait for notification this is done running elsewhere
772780
wait(PerThreadLock) # wait for initializer to finish without releasing this thread
773781
ss = @atomic :monotonic once.ss
774-
state = @atomic :monotonic ss[tid] == 0x04
782+
state = @atomic :monotonic ss[tid]
775783
end
776-
if state == 0x00
784+
if state == PerStateInitial
777785
# won the race, drop lock in exchange for state, and run user initializer
778-
@atomic :monotonic ss[tid] = 0x04
786+
@atomic :monotonic ss[tid] = PerStateConcurrent
779787
result = try
780788
unlock(PerThreadLock)
781789
once.initializer()
782790
catch
783791
lock(PerThreadLock)
784792
ss = @atomic :monotonic once.ss
785-
@atomic :release ss[tid] = 0x02
793+
@atomic :release ss[tid] = PerStateErrored
786794
notify(PerThreadLock)
787795
rethrow()
788796
end
@@ -791,12 +799,12 @@ PerThread(initializer) = PerThread{Base.promote_op(initializer), typeof(initiali
791799
xs = @atomic :monotonic once.xs
792800
@atomic :release xs[tid] = result
793801
ss = @atomic :monotonic once.ss
794-
@atomic :release ss[tid] = 0x01
802+
@atomic :release ss[tid] = PerStateHasrun
795803
notify(PerThreadLock)
796-
elseif state == 0x02
797-
error("PerThread initializer failed previously")
798-
elseif state != 0x01
799-
error("invalid state for PerThread")
804+
elseif state == PerStateErrored
805+
error("OncePerThread initializer failed previously")
806+
elseif state != PerStateHasrun
807+
error("invalid state for OncePerThread")
800808
end
801809
finally
802810
unlock(PerThreadLock)
@@ -807,26 +815,26 @@ PerThread(initializer) = PerThread{Base.promote_op(initializer), typeof(initiali
807815
end
808816
return xs[tid]
809817
end
810-
@inline (once::PerThread)() = once[Threads.threadid()]
811818

812819
"""
813-
PerTask{T}
820+
OncePerTask{T}
814821
815-
Calling a `PerTask` object returns a value of type `T` by running the function `initializer`
822+
Calling a `OncePerTask` object returns a value of type `T` by running the function `initializer`
816823
exactly once per Task. All future calls in the same Task will return exactly the same value.
817824
818825
See also: [`task_local_storage`](@ref).
819826
820827
## Example
821828
822829
```jldoctest
823-
julia> const task_state = Base.PerTask{Vector{UInt32}}() do
830+
julia> const task_state = Base.OncePerTask{Vector{UInt32}}() do
824831
println("Making lazy task value...done.")
825832
return [Libc.rand()]
826833
end;
827834
828-
julia> taskvec = task_state();
835+
julia> (taskvec = task_state()) |> typeof
829836
Making lazy task value...done.
837+
Vector{UInt32}
830838
831839
julia> taskvec === task_state()
832840
true
@@ -836,11 +844,11 @@ Making lazy task value...done.
836844
false
837845
```
838846
"""
839-
mutable struct PerTask{T, F}
847+
mutable struct OncePerTask{T, F}(init::Function)() -> T
840848
const initializer::F
841849

842-
PerTask{T}(initializer::F) where {T, F} = new{T,F}(initializer)
843-
PerTask{T,F}(initializer::F) where {T, F} = new{T,F}(initializer)
844-
PerTask(initializer) = new{Base.promote_op(initializer), typeof(initializer)}(initializer)
850+
OncePerTask{T}(initializer::F) where {T, F} = new{T,F}(initializer)
851+
OncePerTask{T,F}(initializer::F) where {T, F} = new{T,F}(initializer)
852+
OncePerTask(initializer) = new{Base.promote_op(initializer), typeof(initializer)}(initializer)
845853
end
846-
@inline (once::PerTask)() = get!(once.initializer, task_local_storage(), once)
854+
@inline (once::OncePerTask)() = get!(once.initializer, task_local_storage(), once)

doc/src/base/base.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ Main.include
3434
Base.include_string
3535
Base.include_dependency
3636
__init__
37-
Base.PerProcess
38-
Base.PerTask
39-
Base.PerThread
37+
Base.OncePerProcess
38+
Base.OncePerTask
39+
Base.OncePerThread
4040
Base.which(::Any, ::Any)
4141
Base.methods
4242
Base.@show

test/precompile.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ precompile_test_harness(false) do dir
9696
struct GAPGroupHomomorphism{A, B} <: AbstractAlgebraMap{GAPGroupHomomorphism{B, A}} end
9797
9898
global process_state_calls::Int = 0
99-
const process_state = Base.PerProcess{typeof(getpid())}() do
99+
const process_state = Base.OncePerProcess{typeof(getpid())}() do
100100
@assert (global process_state_calls += 1) == 1
101101
return getpid()
102102
end

0 commit comments

Comments
 (0)