diff --git a/spec/std/fiber/execution_context/global_queue_spec.cr b/spec/std/fiber/execution_context/global_queue_spec.cr index cc409c45efc6..f2f9023f1d83 100644 --- a/spec/std/fiber/execution_context/global_queue_spec.cr +++ b/spec/std/fiber/execution_context/global_queue_spec.cr @@ -132,85 +132,90 @@ describe Fiber::ExecutionContext::GlobalQueue do fibers.each { |fc| fc.counter.should eq(increments) } end - it "bulk operations" do - n = 7 - increments = 15 + {% if flag?(:darwin) %} + # FIXME: the spec regularly fails on macOS with "expected 15 got 0" + pending "bulk operations" + {% else %} + it "bulk operations" do + n = 7 + increments = 15 + + fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5 + Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}")) + end - fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5 - Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}")) - end + queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) + ready = Thread::WaitGroup.new(n) - queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new) - ready = Thread::WaitGroup.new(n) + threads = Array(Thread).new(n) do |i| + new_thread("BULK-#{i}") do + slept = 0 - threads = Array(Thread).new(n) do |i| - new_thread("BULK-#{i}") do - slept = 0 + r = Fiber::ExecutionContext::Runnables(3).new(queue) - r = Fiber::ExecutionContext::Runnables(3).new(queue) + batch = Fiber::List.new + size = 0 - batch = Fiber::List.new - size = 0 + reenqueue = -> { + if size > 0 + queue.bulk_push(pointerof(batch)) + names = [] of String? + batch.each { |f| names << f.name } + batch.clear + size = 0 + end + } - reenqueue = -> { - if size > 0 - queue.bulk_push(pointerof(batch)) - names = [] of String? - batch.each { |f| names << f.name } - batch.clear - size = 0 - end - } + execute = ->(fiber : Fiber) { + fc = fibers.find! { |x| x.@fiber == fiber } - execute = ->(fiber : Fiber) { - fc = fibers.find! { |x| x.@fiber == fiber } + if fc.increment < increments + batch.push(fc.@fiber) + size += 1 + end + } - if fc.increment < increments - batch.push(fc.@fiber) - size += 1 - end - } + ready.done - ready.done + loop do + if fiber = r.shift? + execute.call(fiber) + slept = 0 + next + end - loop do - if fiber = r.shift? - execute.call(fiber) - slept = 0 - next - end + if fiber = queue.grab?(r, 1) + reenqueue.call + execute.call(fiber) + slept = 0 + next + end - if fiber = queue.grab?(r, 1) - reenqueue.call - execute.call(fiber) - slept = 0 - next - end + if slept >= 100 + break + end - if slept >= 100 - break + reenqueue.call + slept += 1 + Thread.sleep(1.nanosecond) # don't burn CPU end - - reenqueue.call - slept += 1 - Thread.sleep(1.nanosecond) # don't burn CPU end end - end - ready.wait - - # enqueue in batches of 5 - 0.step(to: fibers.size - 1, by: 5) do |i| - list = Fiber::List.new - 5.times { |j| list.push(fibers[i + j].@fiber) } - queue.bulk_push(pointerof(list)) - Thread.sleep(10.nanoseconds) if i % 4 == 3 - end + ready.wait + + # enqueue in batches of 5 + 0.step(to: fibers.size - 1, by: 5) do |i| + list = Fiber::List.new + 5.times { |j| list.push(fibers[i + j].@fiber) } + queue.bulk_push(pointerof(list)) + Thread.sleep(10.nanoseconds) if i % 4 == 3 + end - threads.each(&.join) + threads.each(&.join) - # must have dequeued each fiber exactly X times (no less, no more) - fibers.each { |fc| fc.counter.should eq(increments) } - end + # must have dequeued each fiber exactly X times (no less, no more) + fibers.each { |fc| fc.counter.should eq(increments) } + end + {% end %} end end