Skip to content

Commit

Permalink
utube: fix slow take on busy utubes
Browse files Browse the repository at this point in the history
If some of the utube for tasks at the top of the queue were busy
most of the time, `take` would slow down for every other task.
This problem is fixed by creating a new space `space_ready`. It
contains first task with `READY` status from each utube.

This solution shows great results for the stated problem, with the cost
of slowing the `put` method (it is ~3 times slower). Thus, this workaround is
disabled by default. To enable it, user should set the `v2 = true` as an
option while creating the tube. As example:
```lua
local test_queue = queue.create_tube('test_queue', 'utube',
        {temporary = true, v2 = true})
```

Part of #228
  • Loading branch information
DerekBum committed Apr 28, 2024
1 parent aa7c092 commit 7827b24
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 0 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Added
- `v2` boolean option for creating a `utube` tube (#228). It enables the
workaround for slow takes while working with busy tubes.

### Fixed

- Stuck in `INIT` state if an instance failed to enter the `running` mode
in time (#226). This fix works only for Tarantool versions >= 2.10.0.
- Slow takes on busy `utube` tubes (#228). The workaround could be enabled by
passing the `v2 = true` option while creating the tube.

## [1.3.3] - 2023-09-13

Expand Down
89 changes: 89 additions & 0 deletions queue/abstract/driver/utube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,35 @@ function tube.create_space(space_name, opts)
type = 'tree',
parts = {2, str_type(), 3, str_type(), 1, num_type()}
})
space.v2 = opts.v2
return space
end

-- start tube on space
function tube.new(space, on_task_change)
validate_space(space)

local space_ready_name = space.name .. "_utube_ready"
local space_ready = box.space[space_ready_name]
if space.v2 and not space_ready then
-- Create a space for first ready tasks from each utube.
space_ready = box.schema.create_space(space_ready_name, space_opts)
space_ready:create_index('task_id', {
type = 'tree',
parts = {1, num_type()}
})
space_ready:create_index('utube', {
type = 'tree',
parts = {2, str_type()}
})
end

on_task_change = on_task_change or (function() end)
local self = setmetatable({
space = space,
space_ready = space_ready,
on_task_change = on_task_change,
v2 = space.v2 or false,
}, { __index = method })
return self
end
Expand All @@ -73,6 +91,13 @@ function method.normalize_task(self, task)
return task and task:transform(3, 1)
end

local function put_ready(self, id, utube)
local added = self.space_ready.index.utube:get{utube}
if added == nil then
self.space_ready:insert{id, utube}
end
end

-- put task in space
function method.put(self, data, opts)
local max
Expand All @@ -98,12 +123,61 @@ function method.put(self, data, opts)

local id = max and max[1] + 1 or 0
local task = self.space:insert{id, state.READY, tostring(opts.utube), data}
if self.v2 then
put_ready(self, id, task[3])
end
self.on_task_change(task, 'put')
return task
end

local function take_ready(self)
for s, task_ready in self.space_ready:pairs({}, { iterator = 'GE' }) do
local id = task_ready[1]
local commit_requirements = box.cfg.memtx_use_mvcc_engine and
(not box.is_in_txn())
local task

if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
task = self.space:get(id)
box.commit()
else
task = self.space:get(id)
end

if task[2] == state.READY then
local taken

if commit_requirements then
box.begin({txn_isolation = 'read-committed'})
taken = self.space.index.utube:min{state.TAKEN, task[3]}
box.commit()
else
taken = self.space.index.utube:min{state.TAKEN, task[3]}
end

if taken == nil or taken[2] ~= state.TAKEN then
task = self.space:update(id, { { '=', 2, state.TAKEN } })

self.space_ready:delete(id)
local ready = self.space.index.utube:min{state.READY, task[3]}
if ready ~= nil and ready[2] == state.READY then
self.space_ready:insert{ready.task_id, tostring(task[3])}
end

self.on_task_change(task, 'take')
return task
end
end
end
end

-- take task
function method.take(self)
if self.v2 then
return take_ready(self)
end

for s, task in self.space.index.status:pairs(state.READY,
{ iterator = 'GE' }) do
if task[2] ~= state.READY then
Expand Down Expand Up @@ -141,11 +215,26 @@ function method.touch(self, id, ttr)
error('utube queue does not support touch')
end

local function delete_ready(self, id, utube)
self.space_ready:delete(id)
local next_task = self.space.index.utube:min{state.READY, utube}
if next_task ~= nil then
local added = self.space_ready.index.utube:get{next_task[3]}
if added == nil then
self.space_ready:insert{next_task[1], next_task[3]}
end
end
end

-- delete task
function method.delete(self, id)
local task = self.space:get(id)
self.space:delete(id)
if task ~= nil then
if self.v2 then
delete_ready(self, id, task[3])
end

task = task:transform(2, 1, state.DONE)

local neighbour = self.space.index.utube:min{state.READY, task[3]}
Expand Down
94 changes: 94 additions & 0 deletions t/benchmark/busy_utubes.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#!/usr/bin/env tarantool

local clock = require('clock')
local os = require('os')
local fiber = require('fiber')
local queue = require('queue')

-- Set the number of consumers.
local consumers_count = 10
-- Set the number of tasks processed by one consumer per iteration.
local batch_size = 150000

local barrier = fiber.cond()
local wait_count = 0

box.cfg()

local test_queue = queue.create_tube('test_queue', 'utube',
{temporary = true, v2 = true})

local function prepare_tasks()
local test_data = 'test data'

for i = 1, consumers_count do
for _ = 1, batch_size do
test_queue:put(test_data, {utube = tostring(i)})
end
end
end

local function prepare_consumers()
local consumers = {}

-- Make half the utubes busy.
for _ = 1, consumers_count / 2 do
test_queue:take()
end

for i = 1, consumers_count / 2 do
consumers[i] = fiber.create(function()
wait_count = wait_count + 1
-- Wait for all consumers to start.
barrier:wait()

-- Ack the tasks.
for _ = 1, batch_size do
local task = test_queue:take()
test_queue:ack(task[1])
end

wait_count = wait_count + 1
end)
end

return consumers
end

local function multi_consumer_bench()
--- Wait for all consumer fibers.
local wait_all = function()
while (wait_count ~= consumers_count / 2) do
fiber.yield()
end
wait_count = 0
end

fiber.set_max_slice(100)

prepare_tasks()

-- Wait for all consumers to start.
local consumers = prepare_consumers()
wait_all()

-- Start timing of task confirmation.
local start_ack_time = clock.proc64()
barrier:broadcast()
-- Wait for all tasks to be acked.
wait_all()
-- Complete the timing of task confirmation.
local complete_time = clock.proc64()

-- Print the result in milliseconds.
print(string.format("Time it takes to confirm the tasks: %i",
tonumber((complete_time - start_ack_time) / 10^6)))
end

-- Start benchmark.
multi_consumer_bench()

-- Cleanup.
test_queue:drop()

os.exit(0)
86 changes: 86 additions & 0 deletions t/benchmark/many_utubes.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#!/usr/bin/env tarantool

local clock = require('clock')
local os = require('os')
local fiber = require('fiber')
local queue = require('queue')

-- Set the number of consumers.
local consumers_count = 30000

local barrier = fiber.cond()
local wait_count = 0

box.cfg()

local test_queue = queue.create_tube('test_queue', 'utube',
{temporary = true, v2 = true})

local function prepare_tasks()
local test_data = 'test data'

for i = 1, consumers_count do
test_queue:put(test_data, {utube = tostring(i)})
end
end

local function prepare_consumers()
local consumers = {}

for i = 1, consumers_count do
consumers[i] = fiber.create(function()
wait_count = wait_count + 1
-- Wait for all consumers to start.
barrier:wait()

-- Ack the task.
local task = test_queue:take()
test_queue:ack(task[1])

wait_count = wait_count + 1
end)
end

return consumers
end

local function multi_consumer_bench()
--- Wait for all consumer fibers.
local wait_all = function()
while (wait_count ~= consumers_count) do
fiber.yield()
end
wait_count = 0
end

fiber.set_max_slice(100)

-- Wait for all consumers to start.
local consumers = prepare_consumers()
wait_all()

-- Start timing creation of tasks.
local start_put_time = clock.proc64()
prepare_tasks()
-- Start timing of task confirmation.
local start_ack_time = clock.proc64()
barrier:broadcast()
-- Wait for all tasks to be acked.
wait_all()
-- Complete the timing of task confirmation.
local complete_time = clock.proc64()

-- Print results in milliseconds.
print(string.format("Time it takes to fill the queue: %i",
tonumber((start_ack_time - start_put_time) / 10^6)))
print(string.format("Time it takes to confirm the tasks: %i",
tonumber((complete_time - start_ack_time) / 10^6)))
end

-- Start benchmark.
multi_consumer_bench()

-- Cleanup.
test_queue:drop()

os.exit(0)

0 comments on commit 7827b24

Please sign in to comment.