Skip to content

Commit

Permalink
api: fix INIT state stuck
Browse files Browse the repository at this point in the history
Sometimes, instance could enter the queue initialization
while still not running (for example, left in the orphan mode).
This resulted in "lazy start". But Tarantool does not call
`box.cfg {}` after leaving orphan mode, so queue could stuck in the
`INIT` state.

Now we wait in the background for instances, that are not running.
It is similar to lazy init for read-only instances.

Note that this fix works only for Tarantool versions >= 2.10.0.
This is because of used watchers.

Closes #226
  • Loading branch information
DerekBum committed Apr 9, 2024
1 parent 5f2b145 commit 5ebc059
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Fixed

- Stuck in `INIT` state if an instance failed to enter the `running` mode
in time (#226). This fix works only for Tarantool versions >= 2.10.0.

## [1.3.3] - 2023-09-13

### Fixed
Expand Down
31 changes: 27 additions & 4 deletions queue/init.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
local fiber = require('fiber')

local abstract = require('queue.abstract')
local queue_state = require('queue.abstract.queue_state')
local queue = nil
Expand All @@ -11,6 +13,10 @@ local core_drivers = {
limfifottl = require('queue.abstract.driver.limfifottl')
}

-- since:
-- https://github.com/locker/tarantool/commit/8cf5151cb4f05cee3fd0ea831add2b3187a01fe4
local watchers_supported = box.watch ~= nil

local function register_driver(driver_name, tube_ctr)
if type(tube_ctr.create_space) ~= 'function' or
type(tube_ctr.new) ~= 'function' then
Expand Down Expand Up @@ -62,6 +68,18 @@ local orig_call = nil

local wrapper_impl

local function running_waiter()
local wait_cond = fiber.cond()
local w = box.watch('box.status', function(_, new_status)
if new_status.status == 'running' then
wait_cond:signal()
end
end)
wait_cond:wait()
w:unregister()
return wrapper_impl()
end

local function cfg_wrapper(...)
box.cfg = orig_cfg
return wrapper_impl(...)
Expand All @@ -79,10 +97,15 @@ local function wrap_box_cfg()
orig_cfg = box.cfg
box.cfg = cfg_wrapper
elseif type(box.cfg) == 'table' then
-- box.cfg after the first box.cfg call
local cfg_mt = getmetatable(box.cfg)
orig_call = cfg_mt.__call
cfg_mt.__call = cfg_call_wrapper
if watchers_supported and box.info.status ~= 'running' then
-- Wait for the running state and initialize the queue.
fiber.new(running_waiter)
else
-- box.cfg after the first box.cfg call
local cfg_mt = getmetatable(box.cfg)
orig_call = cfg_mt.__call
cfg_mt.__call = cfg_call_wrapper
end
else
error('The box.cfg type is unexpected: ' .. type(box.cfg))
end
Expand Down

0 comments on commit 5ebc059

Please sign in to comment.