Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-463]Flush regions into DM as soon as raft commands processed #316

Merged
merged 9 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1411,12 +1411,13 @@ void Context::createTMTContext(const std::vector<std::string> & pd_addrs,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path,
const std::string & flash_service_address,
::TiDB::StorageEngine engine)
::TiDB::StorageEngine engine,
bool disable_bg_flush)
{
auto lock = getLock();
if (shared->tmt_context)
throw Exception("TMTContext has already existed", ErrorCodes::LOGICAL_ERROR);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_service_address, engine);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_service_address, engine, disable_bg_flush);
}

void Context::initializePartPathSelector(std::vector<std::string> && all_normal_path, std::vector<std::string> && all_fast_path)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ class Context
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path,
const std::string & flash_service_address,
::TiDB::StorageEngine engine);
::TiDB::StorageEngine engine,
bool disable_bg_tasks);
RaftService & getRaftService();

void initializeSchemaSyncService();
Expand Down
70 changes: 43 additions & 27 deletions dbms/src/Raft/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,40 @@ RaftService::RaftService(DB::Context & db_context_)
},
false);

table_flush_handle = background_pool.addTask([this] {
auto & tmt = db_context.getTMTContext();
RegionTable & region_table = tmt.getRegionTable();
if (!db_context.getTMTContext().disableBgFlush())
{

// if all regions of table is removed, try to optimize data.
if (auto table_id = region_table.popOneTableToOptimize(); table_id != InvalidTableID)
{
LOG_INFO(log, "try to final optimize table " << table_id);
tryOptimizeStorageFinal(db_context, table_id);
}
return region_table.tryFlushRegions();
});
table_flush_handle = background_pool.addTask([this] {
auto & tmt = db_context.getTMTContext();
RegionTable & region_table = tmt.getRegionTable();

region_flush_handle = background_pool.addTask([this] {
RegionID region_id;
{
std::lock_guard<std::mutex> lock(region_mutex);
if (regions_to_flush.empty())
return false;
region_id = regions_to_flush.front();
regions_to_flush.pop();
}
RegionTable & region_table = db_context.getTMTContext().getRegionTable();
region_table.tryFlushRegion(region_id);
return true;
});
// if all regions of table is removed, try to optimize data.
if (auto table_id = region_table.popOneTableToOptimize(); table_id != InvalidTableID)
{
LOG_INFO(log, "try to final optimize table " << table_id);
tryOptimizeStorageFinal(db_context, table_id);
}
return region_table.tryFlushRegions();
});

region_flush_handle = background_pool.addTask([this] {
RegionID region_id;
{
std::lock_guard<std::mutex> lock(region_mutex);
if (regions_to_flush.empty())
return false;
region_id = regions_to_flush.front();
regions_to_flush.pop();
}
RegionTable & region_table = db_context.getTMTContext().getRegionTable();
region_table.tryFlushRegion(region_id);
return true;
});
}
else
{
LOG_INFO(log, "Configuration raft.disable_bg_flush is set to true, background flush tasks are disabled.");
}

region_decode_handle = background_pool.addTask([this] {
RegionPtr region;
Expand Down Expand Up @@ -85,11 +93,19 @@ RaftService::RaftService(DB::Context & db_context_)

void RaftService::addRegionToFlush(const Region & region)
{
if (!db_context.getTMTContext().disableBgFlush())
{
std::lock_guard<std::mutex> lock(region_mutex);
regions_to_flush.push(region.id());
{
std::lock_guard<std::mutex> lock(region_mutex);
regions_to_flush.push(region.id());
}
region_flush_handle->wake();
}
else
{
auto & region_table = db_context.getTMTContext().getRegionTable();
region_table.tryFlushRegion(region.id());
}
region_flush_handle->wake();
}

void RaftService::addRegionToDecode(const RegionPtr & region)
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::string kvstore_path = path + "kvstore/";
String flash_server_addr = config().getString("flash.service_addr", "0.0.0.0:3930");

bool disable_bg_flush = false;

::TiDB::StorageEngine engine_if_empty = ::TiDB::StorageEngine::TMT;
::TiDB::StorageEngine engine = engine_if_empty;

Expand Down Expand Up @@ -422,12 +424,19 @@ int Server::main(const std::vector<std::string> & /*args*/)
else
engine = engine_if_empty;
}

if (config().has("raft.disable_bg_flush"))
leiysky marked this conversation as resolved.
Show resolved Hide resolved
{
bool disable = config().getBool("raft.disable_bg_flush");
if (disable)
disable_bg_flush = true;
}
}

{
LOG_DEBUG(log, "Default storage engine: " << static_cast<Int64>(engine));
/// create TMTContext
global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_server_addr, engine);
global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_server_addr, engine, disable_bg_flush);
global_context->getTMTContext().getRegionTable().setTableCheckerThreshold(config().getDouble("flash.overlap_threshold", 0.9));
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@
<kvstore_path>/var/lib/clickhouse/kvstore</kvstore_path>
<regmap>/var/lib/clickhouse/regmap</regmap>
<pd_addr>http://127.0.0.1:13579</pd_addr>
<disable_bg_flush>false</disable_bg_flush>
</raft>

<flash>
Expand Down
60 changes: 32 additions & 28 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,40 +341,44 @@ inline void doLearnerRead(const TiDB::TableID table_id, //
LOG_DEBUG(log,
"[Learner Read] wait index cost " << std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << " ms");

// After raft index is satisfied, we flush region to StorageDeltaMerge so that we can read all data
start_time = Clock::now();
std::set<RegionID> regions_flushing_in_bg_threads;
auto & region_table = tmt.getRegionTable();
for (auto && [region_id, region] : kvstore_region)
{
(void)region;
bool is_flushed = region_table.tryFlushRegion(region_id, table_id, false);
// If region is flushing by other bg threads, we should mark those regions to wait.
if (!is_flushed)
{
regions_flushing_in_bg_threads.insert(region_id);
LOG_DEBUG(log, "[Learner Read] region " << region_id << " is flushing by other thread.");
}
}
end_time = Clock::now();
LOG_DEBUG(log,
"[Learner Read] flush " << kvstore_region.size() - regions_flushing_in_bg_threads.size() << " regions of " << kvstore_region.size()
<< " to StorageDeltaMerge cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << " ms");

// Maybe there is some data not flush to store yet, we should wait till all regions is flushed.
if (!regions_flushing_in_bg_threads.empty())
/// If `disable_bg_flush` is true, we don't need to do both flushing KVStore or waiting for background tasks.
if (!tmt.disableBgFlush())
{
// After raft index is satisfied, we flush region to StorageDeltaMerge so that we can read all data
start_time = Clock::now();
for (const auto & region_id : regions_flushing_in_bg_threads)
std::set<RegionID> regions_flushing_in_bg_threads;
auto & region_table = tmt.getRegionTable();
for (auto && [region_id, region] : kvstore_region)
{
region_table.waitTillRegionFlushed(region_id);
(void)region;
bool is_flushed = region_table.tryFlushRegion(region_id, table_id, false);
// If region is flushing by other bg threads, we should mark those regions to wait.
if (!is_flushed)
{
regions_flushing_in_bg_threads.insert(region_id);
LOG_DEBUG(log, "[Learner Read] region " << region_id << " is flushing by other thread.");
}
}
end_time = Clock::now();
LOG_DEBUG(log,
"[Learner Read] wait bg flush " << regions_flushing_in_bg_threads.size() << " regions to StorageDeltaMerge cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count()
<< " ms");
"[Learner Read] flush " << kvstore_region.size() - regions_flushing_in_bg_threads.size() << " regions of "
<< kvstore_region.size() << " to StorageDeltaMerge cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << " ms");

// Maybe there is some data not flush to store yet, we should wait till all regions is flushed.
if (!regions_flushing_in_bg_threads.empty())
{
start_time = Clock::now();
for (const auto & region_id : regions_flushing_in_bg_threads)
{
region_table.waitTillRegionFlushed(region_id);
}
end_time = Clock::now();
LOG_DEBUG(log,
"[Learner Read] wait bg flush " << regions_flushing_in_bg_threads.size() << " regions to StorageDeltaMerge cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count()
<< " ms");
}
}
}

Expand Down
49 changes: 49 additions & 0 deletions dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include <chrono>

#include <Interpreters/Context.h>
#include <Raft/RaftContext.h>
#include <Raft/RaftService.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/RaftCommandResult.h>
#include <Storages/Transaction/Region.h>
Expand Down Expand Up @@ -148,6 +151,16 @@ bool KVStore::onSnapshot(RegionPtr new_region, Context * context, const RegionsA
{
context->getRaftService().addRegionToDecode(new_region);
context->getTMTContext().getRegionTable().applySnapshotRegion(*new_region);
if (context->getTMTContext().disableBgFlush())
{
auto s_time = Clock::now();
context->getTMTContext().getRegionTable().tryFlushRegion(new_region->id());
auto e_time = Clock::now();
LOG_DEBUG(log,
"[syncFlush] Apply snapshot for region " << new_region->id() << ", cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(e_time - s_time).count()
<< "ms");
}
}

return true;
Expand All @@ -170,6 +183,9 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex

auto task_lock = genTaskLock();

TableIDSet tables_to_flush;
std::unordered_set<RegionID> dirty_regions;

for (auto && cmd : *cmds.mutable_requests())
{
const auto & header = cmd.header();
Expand Down Expand Up @@ -200,6 +216,17 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex
curr_region.makeRaftCommandDelegate(task_lock).onCommand(std::move(cmd), *this, region_table, *raft_cmd_res);
RaftCommandResult & result = *raft_cmd_res;

if (tmt_context != nullptr && tmt_context->disableBgFlush())
{
for (auto id : result.table_ids)
{
tables_to_flush.emplace(id);
}

if (!result.table_ids.empty())
dirty_regions.emplace(curr_region_id);
}

const auto region_report = [&]() { *(responseBatch.add_responses()) = curr_region.toCommandResponse(); };

const auto report_sync_log = [&]() {
Expand Down Expand Up @@ -314,6 +341,28 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex
}
}

if (tmt_context != nullptr && tmt_context->disableBgFlush())
{
auto & region_table = tmt_context->getRegionTable();
for (auto table_id : tables_to_flush)
{
auto s_time = Clock::now();
auto regions_to_flush = region_table.getRegionsByTable(table_id);
for (auto region : regions_to_flush)
{
if (auto && itr = dirty_regions.find(region.first); itr != dirty_regions.end())
{
region_table.tryFlushRegion(region.first, table_id, false);
}
}
auto e_time = Clock::now();
LOG_DEBUG(log,
"[syncFlush]"
<< " table_id " << table_id << ", cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(e_time - s_time).count() << "ms");
}
}

if (responseBatch.responses_size())
raft_ctx.send(responseBatch);
}
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Storages/Transaction/TMTContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ namespace DB
TMTContext::TMTContext(Context & context, const std::vector<std::string> & addrs, const std::string & learner_key,
const std::string & learner_value, const std::unordered_set<std::string> & ignore_databases_, const std::string & kvstore_path,
const std::string & flash_service_address_,
::TiDB::StorageEngine engine_)
::TiDB::StorageEngine engine_,
bool disable_bg_flush_)
: kvstore(std::make_shared<KVStore>(kvstore_path)),
region_table(context),
pd_client(addrs.size() == 0 ? static_cast<pingcap::pd::IClient *>(new pingcap::pd::MockPDClient())
Expand All @@ -26,7 +27,8 @@ TMTContext::TMTContext(Context & context, const std::vector<std::string> & addrs
? std::static_pointer_cast<SchemaSyncer>(std::make_shared<TiDBSchemaSyncer<true>>(pd_client, region_cache, rpc_client))
: std::static_pointer_cast<SchemaSyncer>(std::make_shared<TiDBSchemaSyncer<false>>(pd_client, region_cache, rpc_client))),
flash_service_address(flash_service_address_),
engine(engine_)
engine(engine_),
disable_bg_flush(disable_bg_flush_)
{}

void TMTContext::restore()
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/Transaction/TMTContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ class TMTContext : private boost::noncopyable

bool isInitialized() const;

bool disableBgFlush() const { return disable_bg_flush; }

// TODO: get flusher args from config file
explicit TMTContext(Context & context, const std::vector<std::string> & addrs, const std::string & learner_key,
const std::string & learner_value, const std::unordered_set<std::string> & ignore_databases_, const std::string & kv_store_path,
const std::string & flash_service_address_,
TiDB::StorageEngine engine_);
TiDB::StorageEngine engine_,
bool disable_bg_flush_);

SchemaSyncerPtr getSchemaSyncer() const;
void setSchemaSyncer(SchemaSyncerPtr);
Expand Down Expand Up @@ -68,6 +71,8 @@ class TMTContext : private boost::noncopyable

String flash_service_address;
::TiDB::StorageEngine engine;

bool disable_bg_flush;
};

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/test_utils/TiflashTestBasic.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class TiFlashTestEnv
}
catch (Exception & e)
{
context.createTMTContext({}, "", "", {"default"}, "./__tmp_data/kvstore", "", TiDB::StorageEngine::TMT);
context.createTMTContext({}, "", "", {"default"}, "./__tmp_data/kvstore", "", TiDB::StorageEngine::TMT, false);
context.getTMTContext().restore();
}
context.getSettingsRef() = settings;
Expand Down
1 change: 1 addition & 0 deletions tests/docker/config/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
<ignore_databases>system</ignore_databases>
<learner_key>engine</learner_key>
<learner_value>tiflash</learner_value>
<disable_bg_flush>false</disable_bg_flush>
</raft>

<flash>
Expand Down
1 change: 1 addition & 0 deletions tests/docker/config/tiflash.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<learner_value>tiflash</learner_value>
<!--specify what engine we use. tmt or dm -->
<storage_engine>tmt</storage_engine>
<disable_bg_flush>false</disable_bg_flush>
</raft>

<flash>
Expand Down