diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 2e25ab65506..c7224f0d376 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -1411,12 +1411,13 @@ void Context::createTMTContext(const std::vector & pd_addrs, const std::unordered_set & ignore_databases, const std::string & kvstore_path, const std::string & flash_service_address, - ::TiDB::StorageEngine engine) + ::TiDB::StorageEngine engine, + bool disable_bg_flush) { auto lock = getLock(); if (shared->tmt_context) throw Exception("TMTContext has already existed", ErrorCodes::LOGICAL_ERROR); - shared->tmt_context = std::make_shared(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_service_address, engine); + shared->tmt_context = std::make_shared(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_service_address, engine, disable_bg_flush); } void Context::initializePartPathSelector(std::vector && all_normal_path, std::vector && all_fast_path) diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index fd55387d8b4..03923893345 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -367,7 +367,8 @@ class Context const std::unordered_set & ignore_databases, const std::string & kvstore_path, const std::string & flash_service_address, - ::TiDB::StorageEngine engine); + ::TiDB::StorageEngine engine, + bool disable_bg_flush); RaftService & getRaftService(); void initializeSchemaSyncService(); diff --git a/dbms/src/Raft/RaftService.cpp b/dbms/src/Raft/RaftService.cpp index db9bcb5790c..5af5326d711 100644 --- a/dbms/src/Raft/RaftService.cpp +++ b/dbms/src/Raft/RaftService.cpp @@ -30,32 +30,40 @@ RaftService::RaftService(DB::Context & db_context_) }, false); - table_flush_handle = background_pool.addTask([this] { - auto & tmt = db_context.getTMTContext(); - RegionTable & region_table = tmt.getRegionTable(); + if
(!db_context.getTMTContext().disableBgFlush()) + { - // if all regions of table is removed, try to optimize data. - if (auto table_id = region_table.popOneTableToOptimize(); table_id != InvalidTableID) - { - LOG_INFO(log, "try to final optimize table " << table_id); - tryOptimizeStorageFinal(db_context, table_id); - } - return region_table.tryFlushRegions(); - }); + table_flush_handle = background_pool.addTask([this] { + auto & tmt = db_context.getTMTContext(); + RegionTable & region_table = tmt.getRegionTable(); - region_flush_handle = background_pool.addTask([this] { - RegionID region_id; - { - std::lock_guard lock(region_mutex); - if (regions_to_flush.empty()) - return false; - region_id = regions_to_flush.front(); - regions_to_flush.pop(); - } - RegionTable & region_table = db_context.getTMTContext().getRegionTable(); - region_table.tryFlushRegion(region_id); - return true; - }); + // if all regions of table is removed, try to optimize data. + if (auto table_id = region_table.popOneTableToOptimize(); table_id != InvalidTableID) + { + LOG_INFO(log, "try to final optimize table " << table_id); + tryOptimizeStorageFinal(db_context, table_id); + } + return region_table.tryFlushRegions(); + }); + + region_flush_handle = background_pool.addTask([this] { + RegionID region_id; + { + std::lock_guard lock(region_mutex); + if (regions_to_flush.empty()) + return false; + region_id = regions_to_flush.front(); + regions_to_flush.pop(); + } + RegionTable & region_table = db_context.getTMTContext().getRegionTable(); + region_table.tryFlushRegion(region_id); + return true; + }); + } + else + { + LOG_INFO(log, "Configuration raft.disable_bg_flush is set to true, background flush tasks are disabled."); + } region_decode_handle = background_pool.addTask([this] { RegionPtr region; @@ -85,11 +93,19 @@ RaftService::RaftService(DB::Context & db_context_) void RaftService::addRegionToFlush(const Region & region) { + if (!db_context.getTMTContext().disableBgFlush()) { - std::lock_guard 
lock(region_mutex); - regions_to_flush.push(region.id()); + { + std::lock_guard lock(region_mutex); + regions_to_flush.push(region.id()); + } + region_flush_handle->wake(); + } + else + { + auto & region_table = db_context.getTMTContext().getRegionTable(); + region_table.tryFlushRegion(region.id()); } - region_flush_handle->wake(); } void RaftService::addRegionToDecode(const RegionPtr & region) diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 8c056b19084..e73f51634a3 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -351,6 +351,8 @@ int Server::main(const std::vector & /*args*/) std::string kvstore_path = path + "kvstore/"; String flash_server_addr = config().getString("flash.service_addr", "0.0.0.0:3930"); + bool disable_bg_flush = false; + ::TiDB::StorageEngine engine_if_empty = ::TiDB::StorageEngine::TMT; ::TiDB::StorageEngine engine = engine_if_empty; @@ -422,12 +424,19 @@ int Server::main(const std::vector & /*args*/) else engine = engine_if_empty; } + + if (config().has("raft.disable_bg_flush")) + { + bool disable = config().getBool("raft.disable_bg_flush"); + if (disable) + disable_bg_flush = true; + } } { LOG_DEBUG(log, "Default storage engine: " << static_cast(engine)); /// create TMTContext - global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_server_addr, engine); + global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_server_addr, engine, disable_bg_flush); global_context->getTMTContext().getRegionTable().setTableCheckerThreshold(config().getDouble("flash.overlap_threshold", 0.9)); } diff --git a/dbms/src/Server/config.xml b/dbms/src/Server/config.xml index ced2c54b2c1..8507e1c412a 100644 --- a/dbms/src/Server/config.xml +++ b/dbms/src/Server/config.xml @@ -316,6 +316,7 @@ /var/lib/clickhouse/kvstore /var/lib/clickhouse/regmap http://127.0.0.1:13579 + false diff --git 
a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 07c86edf59c..09b2980f7e0 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -341,40 +341,44 @@ inline void doLearnerRead(const TiDB::TableID table_id, // LOG_DEBUG(log, "[Learner Read] wait index cost " << std::chrono::duration_cast(end_time - start_time).count() << " ms"); - // After raft index is satisfied, we flush region to StorageDeltaMerge so that we can read all data - start_time = Clock::now(); - std::set regions_flushing_in_bg_threads; - auto & region_table = tmt.getRegionTable(); - for (auto && [region_id, region] : kvstore_region) - { - (void)region; - bool is_flushed = region_table.tryFlushRegion(region_id, table_id, false); - // If region is flushing by other bg threads, we should mark those regions to wait. - if (!is_flushed) - { - regions_flushing_in_bg_threads.insert(region_id); - LOG_DEBUG(log, "[Learner Read] region " << region_id << " is flushing by other thread."); - } - } - end_time = Clock::now(); - LOG_DEBUG(log, - "[Learner Read] flush " << kvstore_region.size() - regions_flushing_in_bg_threads.size() << " regions of " << kvstore_region.size() - << " to StorageDeltaMerge cost " - << std::chrono::duration_cast(end_time - start_time).count() << " ms"); - - // Maybe there is some data not flush to store yet, we should wait till all regions is flushed. - if (!regions_flushing_in_bg_threads.empty()) + /// If `disable_bg_flush` is true, we need neither to flush KVStore here nor to wait for background flush tasks.
+ if (!tmt.disableBgFlush()) { + // After raft index is satisfied, we flush region to StorageDeltaMerge so that we can read all data start_time = Clock::now(); - for (const auto & region_id : regions_flushing_in_bg_threads) + std::set regions_flushing_in_bg_threads; + auto & region_table = tmt.getRegionTable(); + for (auto && [region_id, region] : kvstore_region) { - region_table.waitTillRegionFlushed(region_id); + (void)region; + bool is_flushed = region_table.tryFlushRegion(region_id, table_id, false); + // If region is flushing by other bg threads, we should mark those regions to wait. + if (!is_flushed) + { + regions_flushing_in_bg_threads.insert(region_id); + LOG_DEBUG(log, "[Learner Read] region " << region_id << " is flushing by other thread."); + } } end_time = Clock::now(); LOG_DEBUG(log, - "[Learner Read] wait bg flush " << regions_flushing_in_bg_threads.size() << " regions to StorageDeltaMerge cost " - << std::chrono::duration_cast(end_time - start_time).count() - << " ms"); + "[Learner Read] flush " << kvstore_region.size() - regions_flushing_in_bg_threads.size() << " regions of " + << kvstore_region.size() << " to StorageDeltaMerge cost " + << std::chrono::duration_cast(end_time - start_time).count() << " ms"); + + // Maybe there is some data not flush to store yet, we should wait till all regions is flushed. 
+ if (!regions_flushing_in_bg_threads.empty()) + { + start_time = Clock::now(); + for (const auto & region_id : regions_flushing_in_bg_threads) + { + region_table.waitTillRegionFlushed(region_id); + } + end_time = Clock::now(); + LOG_DEBUG(log, + "[Learner Read] wait bg flush " << regions_flushing_in_bg_threads.size() << " regions to StorageDeltaMerge cost " + << std::chrono::duration_cast(end_time - start_time).count() + << " ms"); + } } } diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index e460b825ae3..08b46513ba4 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -1,6 +1,9 @@ +#include + #include #include #include +#include #include #include #include @@ -148,6 +151,16 @@ bool KVStore::onSnapshot(RegionPtr new_region, Context * context, const RegionsA { context->getRaftService().addRegionToDecode(new_region); context->getTMTContext().getRegionTable().applySnapshotRegion(*new_region); + if (context->getTMTContext().disableBgFlush()) + { + auto s_time = Clock::now(); + context->getTMTContext().getRegionTable().tryFlushRegion(new_region->id()); + auto e_time = Clock::now(); + LOG_DEBUG(log, + "[syncFlush] Apply snapshot for region " << new_region->id() << ", cost " + << std::chrono::duration_cast(e_time - s_time).count() + << "ms"); + } } return true; @@ -170,6 +183,9 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex auto task_lock = genTaskLock(); + TableIDSet tables_to_flush; + std::unordered_set dirty_regions; + for (auto && cmd : *cmds.mutable_requests()) { const auto & header = cmd.header(); @@ -200,6 +216,17 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex curr_region.makeRaftCommandDelegate(task_lock).onCommand(std::move(cmd), *this, region_table, *raft_cmd_res); RaftCommandResult & result = *raft_cmd_res; + if (tmt_context != nullptr && tmt_context->disableBgFlush()) + { + for (auto 
id : result.table_ids) + { + tables_to_flush.emplace(id); + } + + if (!result.table_ids.empty()) + dirty_regions.emplace(curr_region_id); + } + const auto region_report = [&]() { *(responseBatch.add_responses()) = curr_region.toCommandResponse(); }; const auto report_sync_log = [&]() { @@ -314,6 +341,28 @@ void KVStore::onServiceCommand(enginepb::CommandRequestBatch && cmds, RaftContex } } + if (tmt_context != nullptr && tmt_context->disableBgFlush()) + { + auto & region_table = tmt_context->getRegionTable(); + for (auto table_id : tables_to_flush) + { + auto s_time = Clock::now(); + auto regions_to_flush = region_table.getRegionsByTable(table_id); + for (const auto & region : regions_to_flush) + { + if (dirty_regions.count(region.first) != 0) + { + region_table.tryFlushRegion(region.first, table_id, false); + } + } + auto e_time = Clock::now(); + LOG_DEBUG(log, + "[syncFlush]" + << " table_id " << table_id << ", cost " + << std::chrono::duration_cast(e_time - s_time).count() << "ms"); + } + } + if (responseBatch.responses_size()) raft_ctx.send(responseBatch); } diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index 28440ba76e2..ade09049bc1 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -14,7 +14,8 @@ namespace DB TMTContext::TMTContext(Context & context, const std::vector & addrs, const std::string & learner_key, const std::string & learner_value, const std::unordered_set & ignore_databases_, const std::string & kvstore_path, const std::string & flash_service_address_, - ::TiDB::StorageEngine engine_) + ::TiDB::StorageEngine engine_, + bool disable_bg_flush_) : kvstore(std::make_shared(kvstore_path)), region_table(context), pd_client(addrs.size() == 0 ? static_cast(new pingcap::pd::MockPDClient()) @@ -26,7 +27,8 @@ TMTContext::TMTContext(Context & context, const std::vector & addrs ?
std::static_pointer_cast(std::make_shared>(pd_client, region_cache, rpc_client)) : std::static_pointer_cast(std::make_shared>(pd_client, region_cache, rpc_client))), flash_service_address(flash_service_address_), - engine(engine_) + engine(engine_), + disable_bg_flush(disable_bg_flush_) {} void TMTContext::restore() diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h index b2744fbdae9..0236bd97046 100644 --- a/dbms/src/Storages/Transaction/TMTContext.h +++ b/dbms/src/Storages/Transaction/TMTContext.h @@ -31,11 +31,14 @@ class TMTContext : private boost::noncopyable bool isInitialized() const; + bool disableBgFlush() const { return disable_bg_flush; } + // TODO: get flusher args from config file explicit TMTContext(Context & context, const std::vector & addrs, const std::string & learner_key, const std::string & learner_value, const std::unordered_set & ignore_databases_, const std::string & kv_store_path, const std::string & flash_service_address_, - TiDB::StorageEngine engine_); + TiDB::StorageEngine engine_, + bool disable_bg_flush_); SchemaSyncerPtr getSchemaSyncer() const; void setSchemaSyncer(SchemaSyncerPtr); @@ -68,6 +71,8 @@ class TMTContext : private boost::noncopyable String flash_service_address; ::TiDB::StorageEngine engine; + + bool disable_bg_flush; }; } // namespace DB diff --git a/dbms/src/test_utils/TiflashTestBasic.h b/dbms/src/test_utils/TiflashTestBasic.h index 4960548c452..fb0a3986d3f 100644 --- a/dbms/src/test_utils/TiflashTestBasic.h +++ b/dbms/src/test_utils/TiflashTestBasic.h @@ -22,7 +22,7 @@ class TiFlashTestEnv } catch (Exception & e) { - context.createTMTContext({}, "", "", {"default"}, "./__tmp_data/kvstore", "", TiDB::StorageEngine::TMT); + context.createTMTContext({}, "", "", {"default"}, "./__tmp_data/kvstore", "", TiDB::StorageEngine::TMT, false); context.getTMTContext().restore(); } context.getSettingsRef() = settings; diff --git a/tests/docker/config/config.xml 
b/tests/docker/config/config.xml index fe9f016bb85..59e05284c4a 100644 --- a/tests/docker/config/config.xml +++ b/tests/docker/config/config.xml @@ -21,6 +21,7 @@ system engine tiflash + false diff --git a/tests/docker/config/tiflash.xml b/tests/docker/config/tiflash.xml index f30e4e50044..865dff2a341 100644 --- a/tests/docker/config/tiflash.xml +++ b/tests/docker/config/tiflash.xml @@ -23,6 +23,7 @@ tiflash tmt + false