Skip to content

Commit d56dfed

Browse files
authored
Enable manually adding some malfunctioned WN into blocklist (pingcap#308)
Signed-off-by: Calvin Neo <[email protected]>
1 parent 7bf8f41 commit d56dfed

File tree

7 files changed

+56
-3
lines changed

7 files changed

+56
-3
lines changed

Diff for: dbms/src/Interpreters/Context.cpp

+45
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ struct ContextShared
192192

193193
JointThreadInfoJeallocMapPtr joint_memory_allocation_map; /// Joint thread-wise alloc/dealloc map
194194

195+
std::unordered_set<uint64_t> store_id_blocklist; /// Those store id are blocked from batch cop request.
196+
195197
class SessionKeyHash
196198
{
197199
public:
@@ -2264,6 +2266,48 @@ void Context::setMockMPPServerInfo(MockMPPServerInfo & info)
22642266
mpp_server_info = info;
22652267
}
22662268

2269+
const std::unordered_set<uint64_t> * Context::getStoreIdBlockList() const
2270+
{
2271+
return &shared->store_id_blocklist;
2272+
}
2273+
2274+
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
2275+
bool Context::initializeStoreIdBlockList(const String & comma_sep_string)
2276+
{
2277+
#if SERVERLESS_PROXY == 1
2278+
std::istringstream iss(comma_sep_string);
2279+
std::string token;
2280+
2281+
while (std::getline(iss, token, ','))
2282+
{
2283+
try
2284+
{
2285+
uint64_t number = std::stoull(token);
2286+
shared->store_id_blocklist.insert(number);
2287+
}
2288+
catch (...)
2289+
{
2290+
// Keep empty
2291+
LOG_INFO(DB::Logger::get(), "Error disagg_blocklist_wn_store_id setting, {}", comma_sep_string);
2292+
shared->store_id_blocklist.clear();
2293+
return false;
2294+
}
2295+
}
2296+
2297+
if (!shared->store_id_blocklist.empty())
2298+
LOG_DEBUG(
2299+
DB::Logger::get(),
2300+
"Blocklisted {} stores, which are {}",
2301+
shared->store_id_blocklist.size(),
2302+
comma_sep_string);
2303+
2304+
return true;
2305+
#else
2306+
UNUSED(comma_sep_string);
2307+
return true;
2308+
#endif
2309+
}
2310+
22672311
SessionCleaner::~SessionCleaner()
22682312
{
22692313
try
@@ -2297,4 +2341,5 @@ void SessionCleaner::run()
22972341
break;
22982342
}
22992343
}
2344+
23002345
} // namespace DB

Diff for: dbms/src/Interpreters/Context.h

+3
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,9 @@ class Context
569569
bool isRegionInBlacklist(const RegionID region_id);
570570
bool isRegionsContainsInBlacklist(const std::vector<RegionID> regions);
571571

572+
bool initializeStoreIdBlockList(const String &);
573+
const std::unordered_set<uint64_t> * getStoreIdBlockList() const;
574+
572575
private:
573576
/** Check if the current client has access to the specified database.
574577
* If access is denied, throw an exception.

Diff for: dbms/src/Interpreters/Settings.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ struct Settings
5555
M(SettingUInt64, disagg_build_task_timeout, DEFAULT_DISAGG_TASK_BUILD_TIMEOUT_SEC, "disagg task establish timeout, unit is second.") \
5656
M(SettingUInt64, disagg_task_snapshot_timeout, DEFAULT_DISAGG_TASK_TIMEOUT_SEC, "disagg task snapshot max endurable time, unit is second.") \
5757
M(SettingUInt64, disagg_fetch_pages_timeout, DEFAULT_DISAGG_FETCH_PAGES_TIMEOUT_SEC, "fetch disagg pages timeout for one segment, unit is second.") \
58+
M(SettingString, disagg_blocklist_wn_store_id, "", "comma seperated unsigned integers representing `store_id`s of stores that are blacklisted.") \
5859
M(SettingInt64, safe_point_update_interval_seconds, 1, "The interval in seconds to update safe point from PD.") \
5960
M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value " \
6061
"and no less than the volume of data for one mark.") \
@@ -251,9 +252,8 @@ struct Settings
251252
M(SettingDouble, remote_gc_ratio, 0.5, "The files with valid rate less than this threshold will be compacted") \
252253
M(SettingInt64, remote_gc_small_size, 128 * 1024, "The files with total size less than this threshold will be compacted") \
253254
M(SettingDouble, disagg_read_concurrency_scale, 20.0, "Scale * logical cpu cores = disaggregated read IO concurrency.") \
254-
\
255255
M(SettingInt64, fap_wait_checkpoint_timeout_seconds, 80, "The max time wait for a usable checkpoint for FAP") \
256-
M(SettingUInt64, fap_task_timeout_seconds, 120, "The max time FAP can take before fallback") \
256+
M(SettingUInt64, fap_task_timeout_seconds, 120, "The max time FAP can take before fallback") \
257257
M(SettingUInt64, fap_handle_concurrency, 25, "The number of threads for handling FAP tasks") \
258258
\
259259
M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \

Diff for: dbms/src/Interpreters/SettingsCommon.h

+1
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,7 @@ struct SettingString
842842
void write(WriteBuffer & buf) const { writeBinary(value, buf); }
843843

844844
String get() const { return value; }
845+
const String & getRef() const { return value; }
845846

846847
private:
847848
String value;

Diff for: dbms/src/Server/Server.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -1560,6 +1560,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
15601560
}
15611561
}
15621562

1563+
// We do not support blocking store by id in OP mode currently.
1564+
global_context->initializeStoreIdBlockList(global_context->getSettingsRef().disagg_blocklist_wn_store_id);
1565+
15631566
LOG_INFO(log, "Loading metadata.");
15641567
loadMetadataSystem(*global_context); // Load "system" database. Its engine keeps as Ordinary.
15651568
/// After attaching system databases we can initialize system log.

Diff for: dbms/src/Storages/StorageDisaggregated.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ std::vector<pingcap::coprocessor::BatchCopTask> StorageDisaggregated::buildBatch
179179
table_scan.isPartitionTableScan(),
180180
physical_table_ids,
181181
ranges_for_each_physical_table,
182+
context.getStoreIdBlockList(),
182183
store_type,
183184
label_filter,
184185
&Poco::Logger::get("pingcap/coprocessor"),

0 commit comments

Comments
 (0)