Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 68 additions & 63 deletions spec/std/fiber/execution_context/global_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -132,85 +132,90 @@ describe Fiber::ExecutionContext::GlobalQueue do
fibers.each { |fc| fc.counter.should eq(increments) }
end

it "bulk operations" do
n = 7
increments = 15
{% if flag?(:darwin) %}
# FIXME: the spec regularly fails on macOS with "expected 15 got 0"
pending "bulk operations"
{% else %}
it "bulk operations" do
n = 7
increments = 15

fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5
Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}"))
end

fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5
Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}"))
end
queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
ready = Thread::WaitGroup.new(n)

queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
ready = Thread::WaitGroup.new(n)
threads = Array(Thread).new(n) do |i|
new_thread("BULK-#{i}") do
slept = 0

threads = Array(Thread).new(n) do |i|
new_thread("BULK-#{i}") do
slept = 0
r = Fiber::ExecutionContext::Runnables(3).new(queue)

r = Fiber::ExecutionContext::Runnables(3).new(queue)
batch = Fiber::List.new
size = 0

batch = Fiber::List.new
size = 0
reenqueue = -> {
if size > 0
queue.bulk_push(pointerof(batch))
names = [] of String?
batch.each { |f| names << f.name }
batch.clear
size = 0
end
}

reenqueue = -> {
if size > 0
queue.bulk_push(pointerof(batch))
names = [] of String?
batch.each { |f| names << f.name }
batch.clear
size = 0
end
}
execute = ->(fiber : Fiber) {
fc = fibers.find! { |x| x.@fiber == fiber }

execute = ->(fiber : Fiber) {
fc = fibers.find! { |x| x.@fiber == fiber }
if fc.increment < increments
batch.push(fc.@fiber)
size += 1
end
}

if fc.increment < increments
batch.push(fc.@fiber)
size += 1
end
}
ready.done

ready.done
loop do
if fiber = r.shift?
execute.call(fiber)
slept = 0
next
end

loop do
if fiber = r.shift?
execute.call(fiber)
slept = 0
next
end
if fiber = queue.grab?(r, 1)
reenqueue.call
execute.call(fiber)
slept = 0
next
end

if fiber = queue.grab?(r, 1)
reenqueue.call
execute.call(fiber)
slept = 0
next
end
if slept >= 100
break
end

if slept >= 100
break
reenqueue.call
slept += 1
Thread.sleep(1.nanosecond) # don't burn CPU
end

reenqueue.call
slept += 1
Thread.sleep(1.nanosecond) # don't burn CPU
end
end
end
ready.wait

# enqueue in batches of 5
0.step(to: fibers.size - 1, by: 5) do |i|
list = Fiber::List.new
5.times { |j| list.push(fibers[i + j].@fiber) }
queue.bulk_push(pointerof(list))
Thread.sleep(10.nanoseconds) if i % 4 == 3
end
ready.wait

# enqueue in batches of 5
0.step(to: fibers.size - 1, by: 5) do |i|
list = Fiber::List.new
5.times { |j| list.push(fibers[i + j].@fiber) }
queue.bulk_push(pointerof(list))
Thread.sleep(10.nanoseconds) if i % 4 == 3
end

threads.each(&.join)
threads.each(&.join)

# must have dequeued each fiber exactly X times (no less, no more)
fibers.each { |fc| fc.counter.should eq(increments) }
end
# must have dequeued each fiber exactly X times (no less, no more)
fibers.each { |fc| fc.counter.should eq(increments) }
end
{% end %}
end
end