-
Notifications
You must be signed in to change notification settings - Fork 6.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Auto-balance big parts in JBOD array #16481
Conversation
c28def2
to
e10faae
Compare
e10faae
to
e184235
Compare
9bc3ec3
to
a36c60c
Compare
@azat Another uncaught exception from dtor. Please take a look. https://clickhouse-test-reports.s3.yandex.net/16481/5d150cce4778dd14f58dcff67435bdec1efa155b/stress_test_(thread).html#fail1 |
Here is another patch - #20981 |
5d150cc
to
605dd9a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I'm a little confused, but we just have three types of parts:
- Existing parts.
- Currently merging source parts that will become outdated (included in 1).
- Currently merging parts that will be added as a result of the merge.
For each new merged/fetched part we are trying to find a disk that has a minimum size of data in the same partition. So we just summing sizes by a disk of all existing parts from this same partition excluding parts 2) and adding parts 3). After that, we choose the smallest occupied disk.
The code looks slightly more complicated :)
Btw, how it will work with background moves? Also without tests merge is impossible.
|
||
// Pre-define a reasonable minimum size for the JBOD rebalancer | ||
static constexpr size_t MIN_BYTES_TO_REBALANCE_OVER_JBOD = 100 * 1024 * 1024; | ||
if (storage_settings->min_bytes_to_rebalance_partition_over_jbod > 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
for (const auto & part : parts) | ||
{ | ||
if (!storage.currently_submerging_parts.count(part)) | ||
LOG_WARNING(log, "currently_submerging_parts is missing parts. JBOD might lose balance"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unclear message. Which part is missing? What does it mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
|
||
for (const auto & [name, emerging_part] : currently_emerging_parts) | ||
{ | ||
// It's possible that the emerging parts are committed and get added twice. Thus a set is used to deduplicate. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is it possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because currently_emerging_parts
is also maintained separately. We do commit before removing parts from it. Thus the previous getDataPartsStateRange(MergeTreeData::DataPartState::Committed)
might contain (rarely) the same part.
{ | ||
auto name = part->volume->getDisk()->getName(); | ||
auto it = disk_occupation.find(name); | ||
if (it != disk_occupation.end()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Otherwise, it's a LOGICAL_ERROR, isn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's possible that parts are stored on different volumes. We ignore them here.
{ | ||
it->second += part->getBytesOnDisk(); | ||
disk_parts[name].push_back(formatReadableSizeWithBinarySuffix(part->getBytesOnDisk())); | ||
big_parts.insert(part->name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
big_not_merging_parts
?
|
||
// Also include current submerging parts | ||
for (const auto & part : covered_parts) | ||
merging_parts.insert(part->name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why they are already not here? Ho we can have covered parts which are not in currently_submerging_parts
? Also why we don't check?
part->isStoredOnDisk() && part->getBytesOnDisk() >= min_bytes_to_rebalance_partition_over_jbod
&& part_info.partition_id == part->info.partition_id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currently_submerging_parts
only records parts in previous balanced reservation.
for (const auto & [name, emerging_part] : currently_emerging_parts) | ||
{ | ||
// It's possible that the emerging parts are committed and get added twice. Thus a set is used to deduplicate. | ||
if (big_parts.find(name) == big_parts.end()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, no checks -- maybe this part from a different partition or small?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, we should check partition here. It will be alway big though.
{ | ||
const auto & disks = getStoragePolicy()->getVolume(max_volume_index)->getDisks(); | ||
std::map<String, size_t> disk_occupation; | ||
std::map<String, std::vector<String>> disk_parts; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment: "Used only for logging"
if (part->isStoredOnDisk() && part->getBytesOnDisk() >= min_bytes_to_rebalance_partition_over_jbod | ||
&& part_info.partition_id == part->info.partition_id) | ||
{ | ||
merging_parts.insert(part->name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
big_submerging_parst_from_partition
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The set is used to remove some big valid parts out of calculation. It doesn't have currently submerging parts because those are not yet inserted into currently_submerging_parts
, which only contains parts that participate in some other balanced activity.
try | ||
{ | ||
const auto & disks = getStoragePolicy()->getVolume(max_volume_index)->getDisks(); | ||
std::map<String, size_t> disk_occupation; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why map
and not unordered_map
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Disk names are usually sorted.
for _ in range(10): | ||
try: | ||
print("Syncing replica") | ||
node2.query("SYSTEM SYNC REPLICA tbl") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This query ignores timeout, which exception do we expect?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added timeout = 10s. We can safely ignore any exceptions. The test checks the final state of disk balance.
"insert into tmp2 select randConstant() % 2, randomPrintableASCII(16) from numbers(50)" | ||
) | ||
|
||
time.sleep(2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we need it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't.
WriteBufferFromOwnString log_str; | ||
writeCString("\nbalancer: \n", log_str); | ||
for (const auto & [disk_name, per_disk_parts] : disk_parts_for_logging) | ||
writeString(fmt::format(" {}: [{}]\n", disk_name, boost::algorithm::join(per_disk_parts, ", ")), log_str); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw, fmt::join
looks prettier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, quite isolated and useful code. Also, we have a setting that allows disabling this behavior.
Internal documentation ticket: DOCSUP-12425 |
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Introduce a new merge tree setting
min_bytes_to_rebalance_partition_over_jbod
which allows assigning new parts to different disks of a JBOD volume in a balanced way.Detailed description / Documentation draft:
JBOD volume is notorious in that only a small subset of disks can be fully utilized for a given query. This PR focuses on a particular common scenario: partition-wise queries. Now big new parts are assigned to different disks of a JBOD volume in a balanced way.
It might be useful for #16300.