Skip to content

Commit

Permalink
api: fix INIT state stuck
Browse files Browse the repository at this point in the history
Sometimes, instance could enter the queue initialization
while still not running (for example, left in the orphan mode).
This resulted in "lazy start". But Tarantool does not call
`box.cfg {}` after leaving orphan mode, so queue could stuck in the
`INIT` state.

Now we wait in the background for instances, that are not running.
It is similar to lazy init for read-only instances.

Note that this fix works only for Tarantool versions >= 2.10.0.
This is because of used watchers.

Closes #226
  • Loading branch information
DerekBum committed Apr 11, 2024
1 parent 5f2b145 commit 8a440ed
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 4 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Fixed

- Stuck in `INIT` state if an instance failed to enter the `running` mode
in time (#226). This fix works only for Tarantool versions >= 2.10.0.

## [1.3.3] - 2023-09-13

### Fixed
Expand Down
33 changes: 29 additions & 4 deletions queue/init.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
local fiber = require('fiber')

local abstract = require('queue.abstract')
local queue_state = require('queue.abstract.queue_state')
local qc = require('queue.compat')
local queue = nil

-- load all core drivers
Expand All @@ -11,6 +14,10 @@ local core_drivers = {
limfifottl = require('queue.abstract.driver.limfifottl')
}

-- since:
-- https://github.com/locker/tarantool/commit/8cf5151cb4f05cee3fd0ea831add2b3187a01fe4
local watchers_supported = qc.check_version({2, 10, 0})

local function register_driver(driver_name, tube_ctr)
if type(tube_ctr.create_space) ~= 'function' or
type(tube_ctr.new) ~= 'function' then
Expand Down Expand Up @@ -62,6 +69,19 @@ local orig_call = nil

local wrapper_impl

local function running_waiter()
fiber.name('queue running waiter')
local wait_cond = fiber.cond()
local w = box.watch('box.status', function(_, new_status)
if new_status.status == 'running' then
wait_cond:signal()
end
end)
wait_cond:wait()
w:unregister()
return wrapper_impl()
end

local function cfg_wrapper(...)
box.cfg = orig_cfg
return wrapper_impl(...)
Expand All @@ -79,10 +99,15 @@ local function wrap_box_cfg()
orig_cfg = box.cfg
box.cfg = cfg_wrapper
elseif type(box.cfg) == 'table' then
-- box.cfg after the first box.cfg call
local cfg_mt = getmetatable(box.cfg)
orig_call = cfg_mt.__call
cfg_mt.__call = cfg_call_wrapper
if watchers_supported and box.info.status ~= 'running' then
-- Wait for the running state and initialize the queue.
fiber.new(running_waiter)
else
-- box.cfg after the first box.cfg call
local cfg_mt = getmetatable(box.cfg)
orig_call = cfg_mt.__call
cfg_mt.__call = cfg_call_wrapper
end
else
error('The box.cfg type is unexpected: ' .. type(box.cfg))
end
Expand Down
102 changes: 102 additions & 0 deletions t/230-orphan-not-stalling-init.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#!/usr/bin/env tarantool

local test = require('tap').test('')
local queue = require('queue')
local tnt = require('t.tnt')
local fio = require('fio')
local fiber = require('fiber')

rawset(_G, 'queue', require('queue'))

local qc = require('queue.compat')
if not qc.check_version({2, 10, 0}) then
require('log').info('Tests skipped, tarantool version < 2.10.0')
return
end

local snapdir_optname = qc.snapdir_optname
local logger_optname = qc.logger_optname

test:plan(1)

test:test('Check orphan mode not stalling queue', function(test)
test:plan(4)
local engine = os.getenv('ENGINE') or 'memtx'
tnt.cluster.cfg{}

local dir_replica = fio.tempdir()
local cmd_replica = {
arg[-1],
'-e',
[[
box.cfg {
replication = {
'replicator:[email protected]:3399',
'replicator:[email protected]:3398',
},
listen = '127.0.0.1:3396',
wal_dir = ']] .. dir_replica .. '\'' ..
',' .. snapdir_optname() .. ' = \'' .. dir_replica .. '\'' ..
',' .. logger_optname() .. ' = \'' ..
fio.pathjoin(dir_replica, 'tarantool.log') .. '\'' ..
'}'
}

replica = require('popen').new(cmd_replica, {
stdin = 'devnull',
stdout = 'devnull',
stderr = 'devnull',
})

local attempts = 0
-- Wait for replica to connect.
while box.info.replication[3] == nil or box.info.replication[3].downstream.status ~= 'follow' do
attempts = attempts + 1
if attempts == 30 then
error('wait for replica connection')
end
fiber.sleep(0.1)
end

local conn = require('net.box').connect('127.0.0.1:3396')

conn:eval([[
box.cfg{
replication = {
'replicator:[email protected]:3399',
'replicator:[email protected]:3398',
'replicator:[email protected]:3396',
},
listen = '127.0.0.1:3397',
replication_connect_quorum = 4,
}
]])

conn:eval('rawset(_G, "queue", require("queue"))')

test:is(conn:call('queue.state'), 'INIT', 'check queue state')
test:is(conn:call('box.info').ro, true, 'check read only')
test:is(conn:call('box.info').ro_reason, 'orphan', 'check ro reason')

conn:eval('box.cfg{replication_connect_quorum = 2}')

local attempts = 0
while true do
if conn:call('queue.state') == 'RUNNING' then
test:is(conn:call('queue.state'), 'RUNNING',
'check queue state after orphan')
return
end
attempts = attempts + 1
if attempts == 10 then
break
end
fiber.sleep(0.1)
end
test:is(conn:call('queue.state'), 'RUNNING', 'check queue state after orphan')
end)

rawset(_G, 'queue', nil)
tnt.finish()
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua :

0 comments on commit 8a440ed

Please sign in to comment.