Skip to content

Commit

Permalink
Merge branch 'master' into cop
Browse files Browse the repository at this point in the history
  • Loading branch information
zanmato1984 authored Oct 11, 2019
2 parents 7fc53ad + a3f9370 commit 22ad2d3
Show file tree
Hide file tree
Showing 18 changed files with 206 additions and 68 deletions.
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ if (COMPILER_GCC OR COMPILER_CLANG)
set (CXX_WARNING_FLAGS "${CXX_WARNING_FLAGS} -Wnon-virtual-dtor")
endif ()

if (COMPILER_CLANG)
# Clang doesn't have int128 predefined macros, workaround by manually defining them
# Reference: https://stackoverflow.com/questions/41198673/uint128-t-not-working-with-clang-and-libstdc
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D__GLIBCXX_BITSIZE_INT_N_0=128 -D__GLIBCXX_TYPE_INT_N_0=__int128")
endif ()

if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
# clang: warning: argument unused during compilation: '-stdlib=libc++'
# clang: warning: argument unused during compilation: '-specs=/usr/share/dpkg/no-pie-compile.specs' [-Wunused-command-line-argument]
Expand Down
17 changes: 2 additions & 15 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ struct ContextShared
String interserver_io_host; /// The host name by which this server is available for other servers.
UInt16 interserver_io_port = 0; /// and port.

std::vector<String> paths; /// Path to all candidate data directory
String path; /// Path to the primary data directory, with a slash at the end.
String tmp_path; /// The path to the temporary files that occur when processing the request.
String flags_path; /// Path to the directory with some control flags for server maintenance.
Expand Down Expand Up @@ -482,12 +481,6 @@ DatabasePtr Context::tryGetDatabase(const String & database_name)
return it->second;
}

const std::vector<String> & Context::getAllPath() const
{
auto lock = getLock();
return shared->paths;
}

String Context::getPath() const
{
auto lock = getLock();
Expand All @@ -512,12 +505,6 @@ String Context::getUserFilesPath() const
return shared->user_files_path;
}

void Context::setAllPath(const std::vector<String> & paths_)
{
auto lock = getLock();
shared->paths = paths_;
}

void Context::setPath(const String & path)
{
auto lock = getLock();
Expand Down Expand Up @@ -1430,12 +1417,12 @@ void Context::createTMTContext(const std::vector<std::string> & pd_addrs,
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path);
}

void Context::initializePartPathSelector(const std::vector<std::string> & all_path)
void Context::initializePartPathSelector(std::vector<std::string> && all_normal_path, std::vector<std::string> && all_fast_path)
{
auto lock = getLock();
if (shared->part_path_selector_ptr)
throw Exception("PartPathSelector instance has already existed", ErrorCodes::LOGICAL_ERROR);
shared->part_path_selector_ptr = std::make_shared<PartPathSelector>(all_path);
shared->part_path_selector_ptr = std::make_shared<PartPathSelector>(std::move(all_normal_path), std::move(all_fast_path));
}

PartPathSelector & Context::getPartPathSelector()
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,11 @@ class Context

~Context();

const std::vector<String> & getAllPath() const;
String getPath() const;
String getTemporaryPath() const;
String getFlagsPath() const;
String getUserFilesPath() const;

void setAllPath(const std::vector<String> & paths);
void setPath(const String & path);
void setTemporaryPath(const String & path);
void setFlagsPath(const String & path);
Expand Down Expand Up @@ -372,7 +370,7 @@ class Context
void initializeSchemaSyncService();
SchemaSyncServicePtr & getSchemaSyncService();

void initializePartPathSelector(const std::vector<std::string> & all_path);
void initializePartPathSelector(std::vector<std::string> && all_path, std::vector<std::string> && all_fast_path);
PartPathSelector & getPartPathSelector();

Clusters & getClusters() const;
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Raft/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <Raft/RaftService.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionDataMover.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/applySnapshot.h>

Expand All @@ -20,7 +21,15 @@ RaftService::RaftService(DB::Context & db_context_)
persist_handle = background_pool.addTask([this] { return kvstore->tryPersist(); }, false);

table_flush_handle = background_pool.addTask([this] {
RegionTable & region_table = db_context.getTMTContext().getRegionTable();
auto & tmt = db_context.getTMTContext();
RegionTable & region_table = tmt.getRegionTable();

// if all regions of table is removed, try to optimize data.
if (auto table_id = region_table.popOneTableToClean(); table_id != InvalidTableID)
{
LOG_INFO(log, "try to final optimize table " << table_id);
tryOptimizeStorageFinal(db_context, table_id);
}
return region_table.tryFlushRegions();
});

Expand Down
29 changes: 20 additions & 9 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,29 +121,40 @@ int Server::main(const std::vector<std::string> & /*args*/)
config().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);
}

std::vector<String> all_fast_path;
if (config().has("fast_path"))
{
String fast_paths = config().getString("fast_path");
Poco::trimInPlace(fast_paths);
if (!fast_paths.empty())
{
Poco::StringTokenizer string_tokens(fast_paths, ";");
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++)
{
all_fast_path.emplace_back(getCanonicalPath(std::string(*it)));
LOG_DEBUG(log, "Fast data part candidate path: " << getCanonicalPath(std::string(*it)));
}
}
}
String paths = config().getString("path");
std::vector<String> all_path;
std::vector<String> all_normal_path;
Poco::trimInPlace(paths);
if (paths.empty())
throw Exception("path configuration parameter is empty");
Poco::StringTokenizer string_tokens(paths, ";");
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++)
{
all_path.push_back(getCanonicalPath(std::string(*it)));
all_normal_path.emplace_back(getCanonicalPath(std::string(*it)));
LOG_DEBUG(log, "Data part candidate path: " << std::string(*it));
}
global_context->setAllPath(all_path);
{
global_context->initializePartPathSelector(global_context->getAllPath());
}

std::string path = global_context->getAllPath()[0];
std::string path = all_normal_path[0];
std::string default_database = config().getString("default_database", "default");

global_context->setPath(path);
global_context->initializePartPathSelector(std::move(all_normal_path), std::move(all_fast_path));

/// Create directories for 'path' and for default database, if not exist.
for (const String & candidate_path : global_context->getAllPath())
for (const String & candidate_path : global_context->getPartPathSelector().getAllPath())
{
Poco::File(candidate_path + "data/" + default_database).createDirectories();
}
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ MergeTreeData::MergeTreeData(

auto path_exists = Poco::File(full_path).exists();
/// Creating directories, if not exist.
for (const String & path : context.getAllPath())
for (const String & path : context.getPartPathSelector().getAllPath())
{
String candidate_path = getDataPartsPath(path);
Poco::File(candidate_path).createDirectories();
Expand Down Expand Up @@ -445,7 +445,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
Strings part_file_names;
Strings part_file_parent_paths;
Poco::DirectoryIterator end;
for (const auto & path : context.getAllPath())
for (const auto & path : context.getPartPathSelector().getAllPath())
{
String data_path = getDataPartsPath(path);
LOG_DEBUG(log, "Loading data parts from path: " << data_path);
Expand Down Expand Up @@ -653,7 +653,7 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life
: current_time - settings.temporary_directories_lifetime.totalSeconds();

/// Delete temporary directories older than a day.
for (auto & path : context.getAllPath())
for (auto & path : context.getPartPathSelector().getAllPath())
{
Poco::DirectoryIterator end;
String storage_path = getDataPartsPath(path);
Expand Down Expand Up @@ -812,7 +812,7 @@ void MergeTreeData::dropAllData()

LOG_TRACE(log, "dropAllData: removing data from filesystem.");

for (const auto & path : context.getAllPath())
for (const auto & path : context.getPartPathSelector().getAllPath())
{
Poco::File(getDataPartsPath(path)).remove(true);
}
Expand Down Expand Up @@ -2065,7 +2065,7 @@ size_t MergeTreeData::getPartitionSize(const std::string & partition_id) const

Poco::DirectoryIterator end;

for (const auto & path : context.getAllPath())
for (const auto & path : context.getPartPathSelector().getAllPath())
{
String parts_path = getDataPartsPath(path);
for (Poco::DirectoryIterator it(parts_path); it != end; ++it)
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Core/SortDescription.h>
#include <Common/SimpleIncrement.h>
#include <Common/escapeForFileName.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/ITableDeclaration.h>
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void MergeTreeDataMerger::FuturePart::assign(MergeTreeData::DataPartsVector part
}
else
name = part_info.getPartName();
path = parts[0]->storage.context.getPartPathSelector().getPathForPart(parts[0]->storage, name);
path = parts[0]->storage.context.getPartPathSelector().getPathForPart(parts[0]->storage, name, part_info);
}

MergeTreeDataMerger::MergeTreeDataMerger(MergeTreeData & data_, const BackgroundProcessingPool & pool_)
Expand Down Expand Up @@ -153,7 +153,7 @@ size_t MergeTreeDataMerger::getMaxPartsSizeForMerge(size_t pool_size, size_t poo
static_cast<double>(free_entries) / data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge);

size_t max_parts_size = max_size;
for (auto & path : data.context.getAllPath())
for (auto & path : data.context.getPartPathSelector().getAllPath())
{
auto s = static_cast<size_t>(DiskSpaceMonitor::getUnreservedFreeSpace(path) / DISK_USAGE_COEFFICIENT_TO_SELECT);
if (s < max_parts_size)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,15 @@ MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & na
{
if (storage.merging_params.mode == MergeTreeData::MergingParams::Txn)
tmt_property = std::make_unique<TMTDataPartProperty>();
full_path_prefix = storage.context.getPartPathSelector().getPathForPart(storage, name);
full_path_prefix = storage.context.getPartPathSelector().getPathForPart(storage, name, info);
}

MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & name_)
: storage(storage_), name(name_), info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
{
if (storage.merging_params.mode == MergeTreeData::MergingParams::Txn)
tmt_property = std::make_unique<TMTDataPartProperty>();
full_path_prefix = storage.context.getPartPathSelector().getPathForPart(storage, name);
full_path_prefix = storage.context.getPartPathSelector().getPathForPart(storage, name, info);
}

/// Takes into account the fact that several columns can e.g. share their .size substreams.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa

/// The name could be non-unique in case of stale files from previous runs.
String full_path = new_data_part->getFullPath();
LOG_DEBUG(log, "Data part: " << new_data_part->relative_path << " will be in path " << full_path);
LOG_TRACE(log, "Data part: " << new_data_part->relative_path << " will be in path " << full_path);
Poco::File dir(full_path);

if (dir.exists())
Expand Down
69 changes: 52 additions & 17 deletions dbms/src/Storages/PartPathSelector.cpp
Original file line number Diff line number Diff line change
@@ -1,41 +1,76 @@
#include <Storages/MergeTree/DiskSpaceMonitor.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/PartPathSelector.h>
#include <common/likely.h>
#include <map>

namespace DB
{
const String PartPathSelector::getPathForPart(MergeTreeData & data, const String & part_name) const
const String PartPathSelector::getPathForPart(
MergeTreeData & data, const String & part_name, const MergeTreePartInfo & info, size_t part_size) const
{
if (all_path.size() == 1)
// test whether this part can be put on fast path and return the path if it can
if (hasFastPath())
{
return all_path[0];
if (info.level == 0)
{
return normal_and_fast_path[getRandomFastPathIndex()];
}
size_t max_available_space = DiskSpaceMonitor::getUnreservedFreeSpace(normal_and_fast_path[fast_path_start_index]);
size_t path_index = fast_path_start_index;
for (size_t i = fast_path_start_index + 1; i < normal_and_fast_path.size(); i++)
{
size_t s = DiskSpaceMonitor::getUnreservedFreeSpace(normal_and_fast_path[i]);
if (s > max_available_space)
{
max_available_space = s;
path_index = i;
}
}

if (max_available_space >= settings.min_space_reserved_for_level_zero_parts
&& part_size <= settings.part_other_than_level_zero_max_ratio * max_available_space)
{
return normal_and_fast_path[path_index];
}
}

// there is only one normal path, just return it
if (fast_path_start_index == 1)
{
return normal_and_fast_path[0];
}
std::unordered_map<String, UInt64> path_size_map;
for (const auto & path : all_path)
if (info.level == 0)
{
path_size_map.emplace(path, 0);
return normal_and_fast_path[getRandomNormalPathIndex()];
}
// find the normal path with least size of parts of this table
std::vector<UInt64> path_parts_size;
path_parts_size.resize(fast_path_start_index, 0);
for (const auto & part : data.getDataPartsVector())
{
if (unlikely(path_size_map.find(part->full_path_prefix) == path_size_map.end()))
if (unlikely(normal_path_to_index_map.find(part->full_path_prefix) == normal_path_to_index_map.end()))
{
throw Exception("Part " + part->relative_path + " got unexpected path " + part->full_path_prefix, ErrorCodes::LOGICAL_ERROR);
}
path_size_map[part->full_path_prefix] += part->bytes_on_disk;
path_parts_size[normal_path_to_index_map.at(part->full_path_prefix)] += part->bytes_on_disk;
}
String result = all_path[0];
UInt64 parts_size = path_size_map[result];
for (const auto & element : path_size_map)
size_t result_path_index = 0;
UInt64 parts_size_on_result_path = path_parts_size[0];
for (size_t i = 1; i < fast_path_start_index; i++)
{
if (element.second < parts_size)
if (path_parts_size[i] < parts_size_on_result_path)
{
result = element.first;
parts_size = element.second;
result_path_index = i;
parts_size_on_result_path = path_parts_size[i];
}
LOG_DEBUG(log, "Path " << element.first << " size is " << element.second << " bytes.");
LOG_TRACE(log, "Path " << normal_and_fast_path[i] << " size is " << path_parts_size[i] << " bytes.");
}
LOG_TRACE(log,
"database: " << data.getDatabaseName() << " table: " << data.getTableName() << " part name: " << part_name
<< " path: " << normal_and_fast_path[result_path_index]);

LOG_DEBUG(log, "database: " << data.getDatabaseName() << " table: " << data.getTableName() << " part name: " << part_name << " path: " << result);
return result;
return normal_and_fast_path[result_path_index];
}
} // namespace DB
Loading

0 comments on commit 22ad2d3

Please sign in to comment.