Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add schema sync on read and simplify schema syncer interface and adjust mock stuff #108

Merged
merged 4 commits into from
Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Core/Defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#define DEFAULT_MAX_COMPRESS_BLOCK_SIZE 1048576

#define DEFAULT_MAX_READ_TSO 0xFFFFFFFFFFFFFFFF
#define DEFAULT_SCHEMA_VERSION -1
#define DEFAULT_UNSPECIFIED_SCHEMA_VERSION -1

/** Which blocks by default read the data (by number of rows).
* Smaller values give better cache locality, less consumption of RAM, but more overhead to process the query.
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ DBGInvoker::DBGInvoker()

regFunc("dump_all_region", dbgFuncDumpAllRegion);

regFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService);
regFunc("mock_schema_syncer", dbgFuncMockSchemaSyncer);
regFunc("refresh_schema", dbgFuncRefreshSchema);
regFunc("refresh_schemas", dbgFuncRefreshSchemas);
}

void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement)
Expand Down
55 changes: 26 additions & 29 deletions dbms/src/Debug/MockSchemaSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,31 +235,36 @@ AlterCommands detectSchemaChanges(const TableInfo & table_info, const TableInfo

MockSchemaSyncer::MockSchemaSyncer() : log(&Logger::get("MockSchemaSyncer")) {}

bool MockSchemaSyncer::syncSchemas(Context & /*context*/)
bool MockSchemaSyncer::syncSchemas(Context & context)
{
// Don't do full schema sync, we want to test schema sync timing in a fine-grained fashion.
return false;
std::unordered_map<TableID, MockTiDB::TablePtr> new_tables;
MockTiDB::instance().traverseTables([&](const auto & table) { new_tables.emplace(table->id(), table); });

for (auto [id, table] : tables)
{
if (new_tables.find(id) == new_tables.end())
dropTable(table->table_info.db_name, table->table_info.name, context);
}

for (auto [id, table] : new_tables)
{
std::ignore = id;
syncTable(context, table);
}

tables.swap(new_tables);

return true;
}

void MockSchemaSyncer::syncSchema(Context & context, TableID table_id, bool /*lock*/)
void MockSchemaSyncer::syncTable(Context & context, MockTiDB::TablePtr table)
{
auto & tmt_context = context.getTMTContext();

/// Get table schema json from TiDB/TiKV.
String table_info_json = getSchemaJson(table_id, context);
if (table_info_json.empty())
{
/// Table dropped.
auto storage = tmt_context.getStorages().get(table_id);
if (storage == nullptr)
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Table " << table_id << "doesn't exist in TiDB and doesn't exist in TMT, do nothing.");
return;
}
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Table " << table_id << "doesn't exist in TiDB, dropping.");
dropTable(storage->getDatabaseName(), storage->getTableName(), context);
return;
}
/// Get table schema json.
auto table_id = table->id();

String table_info_json = table->table_info.serialize(false);

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Table " << table_id << " info json: " << table_info_json);

Expand Down Expand Up @@ -326,8 +331,7 @@ void MockSchemaSyncer::syncSchema(Context & context, TableID table_id, bool /*lo
}

/// Table existing, detect schema changes and apply.
auto merge_tree = std::dynamic_pointer_cast<StorageMergeTree>(storage);
const TableInfo & orig_table_info = merge_tree->getTableInfo();
const TableInfo & orig_table_info = storage->getTableInfo();
AlterCommands alter_commands = detectSchemaChanges(table_info, orig_table_info);

std::stringstream ss;
Expand All @@ -345,15 +349,8 @@ void MockSchemaSyncer::syncSchema(Context & context, TableID table_id, bool /*lo

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": " << ss.str());

{
// Change internal TableInfo in TMT first.
// TODO: Ideally this should be done within alter function, however we are limited by the narrow alter interface, thus not truly atomic.
auto table_hard_lock = storage->lockStructureForAlter(__PRETTY_FUNCTION__);
merge_tree->setTableInfo(table_info);
}

// Call storage alter to apply schema changes.
storage->alter(alter_commands, table_info.db_name, table_info.name, context);
storage->alterForTMT(alter_commands, table_info, context);

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Schema changes apply done.");

Expand Down
13 changes: 5 additions & 8 deletions dbms/src/Debug/MockSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <Debug/MockTiDB.h>

#include <unordered_map>

namespace DB
{

Expand All @@ -12,17 +14,12 @@ class MockSchemaSyncer : public SchemaSyncer

bool syncSchemas(Context & context) override;

void syncSchema(Context & context, TableID table_id, bool) override;

TableID getTableIdByName(const std::string & database_name, const std::string & table_name)
{
return MockTiDB::instance().getTableIDByName(database_name, table_name);
}

protected:
String getSchemaJson(TableID table_id, Context & /*context*/) { return MockTiDB::instance().getSchemaJson(table_id); }
void syncTable(Context & context, MockTiDB::TablePtr table);

Logger * log;

std::unordered_map<TableID, MockTiDB::TablePtr> tables;
};

} // namespace DB
34 changes: 7 additions & 27 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,6 @@ Table::Table(const String & database_name_, const String & table_name_, TableInf
: table_info(std::move(table_info_)), database_name(database_name_), table_name(table_name_)
{}

String MockTiDB::getSchemaJson(TableID table_id)
{
std::lock_guard lock(tables_mutex);

auto it = tables_by_id.find(table_id);
if (it == tables_by_id.end())
{
return "";
}

return it->second->table_info.serialize(false);
}

TableID MockTiDB::getTableIDByName(const std::string & database_name, const std::string & table_name)
{
std::lock_guard lock(tables_mutex);

String qualified_name = database_name + "." + table_name;
auto it = tables_by_name.find(qualified_name);
if (it == tables_by_name.end())
{
return InvalidTableID;
}

return it->second->table_info.id;
}

void MockTiDB::dropTable(const String & database_name, const String & table_name)
{
std::lock_guard lock(tables_mutex);
Expand Down Expand Up @@ -251,6 +224,13 @@ TablePtr MockTiDB::getTableByName(const String & database_name, const String & t
return getTableByNameInternal(database_name, table_name);
}

void MockTiDB::traverseTables(std::function<void(TablePtr)> f)
{
std::lock_guard lock(tables_mutex);

std::for_each(tables_by_id.begin(), tables_by_id.end(), [&](const auto & pair) { f(pair.second); });
}

TablePtr MockTiDB::getTableByNameInternal(const String & database_name, const String & table_name)
{
String qualified_name = database_name + "." + table_name;
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ class MockTiDB : public ext::singleton<MockTiDB>
using TablePtr = std::shared_ptr<Table>;

public:
String getSchemaJson(TableID table_id);

TableID getTableIDByName(const std::string & database_name, const std::string & table_name);

TableID newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns);

TableID newPartition(const String & database_name, const String & table_name, const String & partition_name);
Expand All @@ -80,6 +76,8 @@ class MockTiDB : public ext::singleton<MockTiDB>

TablePtr getTableByName(const String & database_name, const String & table_name);

void traverseTables(std::function<void(TablePtr)> f);

private:
TablePtr getTableByNameInternal(const String & database_name, const String & table_name);

Expand Down
92 changes: 23 additions & 69 deletions dbms/src/Debug/dbgFuncSchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/SchemaSyncService.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/TiDB.h>
Expand All @@ -15,6 +16,23 @@ namespace ErrorCodes
extern const int UNKNOWN_TABLE;
} // namespace ErrorCodes

void dbgFuncEnableSchemaSyncService(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 1)
throw Exception("Args not matched, should be: enable (true/false)", ErrorCodes::BAD_ARGUMENTS);

bool enable = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value) == "true";

if (enable)
context.initializeSchemaSyncService();
else
context.getSchemaSyncService().reset();

std::stringstream ss;
ss << "schema sync service " << (enable ? "enabled" : "disabled");
output(ss.str());
}

void dbgFuncMockSchemaSyncer(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 1)
Expand All @@ -39,79 +57,15 @@ void dbgFuncMockSchemaSyncer(Context & context, const ASTs & args, DBGInvoker::P
output(ss.str());
}

void dbgFuncRefreshSchema(Context & context, const ASTs & args, DBGInvoker::Printer output)
void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer output)
{
if (args.size() != 2)
throw Exception("Args not matched, should be: database-name, table-name", ErrorCodes::BAD_ARGUMENTS);

std::string database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
std::transform(database_name.begin(), database_name.end(), database_name.begin(), ::tolower);
std::string table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
std::transform(table_name.begin(), table_name.end(), table_name.begin(), ::tolower);

auto log = [&](TableID table_id) {
std::stringstream ss;
ss << "refreshed schema for table #" << table_id;
output(ss.str());
};

TMTContext & tmt = context.getTMTContext();
auto schema_syncer = tmt.getSchemaSyncer();
auto mock_schema_syncer = std::dynamic_pointer_cast<MockSchemaSyncer>(schema_syncer);
if (!mock_schema_syncer)
throw Exception("Debug function refresh_schema can only be used under mock schema syncer.");

TableID table_id = mock_schema_syncer->getTableIdByName(database_name, table_name);
auto storage = tmt.getStorages().getByName(database_name, table_name);

if (storage == nullptr && table_id == InvalidTableID)
// Table does not exist in CH nor TiDB, error out.
throw Exception("Table " + database_name + "." + table_name + " doesn't exist in tidb", ErrorCodes::UNKNOWN_TABLE);

if (storage == nullptr && table_id != InvalidTableID)
{
// Table does not exist in CH, but exists in TiDB.
// Might be renamed or never synced.
// Note there will be a dangling table in CH for the following scenario:
// Table t was synced to CH already, then t was renamed (name changed) and truncated (ID changed).
// Then this function was called with the new name given, the table will be synced to a new table.
// User must manually call this function with the old name to remove the dangling table in CH.
mock_schema_syncer->syncSchema(context, table_id, true);

log(table_id);

return;
}

if (table_id == InvalidTableID)
{
// Table exists in CH, but does not exist in TiDB.
// Just sync it using the storage's ID, syncer will then remove it.
mock_schema_syncer->syncSchema(context, storage->getTableInfo().id, true);

log(table_id);

return;
}
schema_syncer->syncSchemas(context);

// Table exists in both CH and TiDB.
if (table_id != storage->getTableInfo().id)
{
// Table in TiDB is not the old one, i.e. dropped/renamed then recreated.
// Sync the old one in CH first, then sync the new one.
mock_schema_syncer->syncSchema(context, storage->getTableInfo().id, true);
mock_schema_syncer->syncSchema(context, table_id, true);

log(table_id);

return;
}

// Table in TiDB is the same one as in CH.
// Just sync it.
mock_schema_syncer->syncSchema(context, table_id, true);

log(table_id);
std::stringstream ss;
ss << "schemas refreshed";
output(ss.str());
}

} // namespace DB
11 changes: 8 additions & 3 deletions dbms/src/Debug/dbgFuncSchema.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@ namespace DB

class Context;

// Enable/disable schema sync service.
// Usage:
// ./storages-client.sh "DBGInvoke enable_schema_sync_service(enable)"
void dbgFuncEnableSchemaSyncService(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Change whether to mock schema syncer.
// Usage:
// ./storages-client.sh "DBGInvoke mock_schema_syncer(enabled)"
void dbgFuncMockSchemaSyncer(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Refresh schema of the given table.
// Refresh schemas for all tables.
// Usage:
// ./storage-client.sh "DBGInvoke refresh_schema(database_name, table_name)"
void dbgFuncRefreshSchema(Context & context, const ASTs & args, DBGInvoker::Printer output);
// ./storage-client.sh "DBGInvoke refresh_schemas()"
void dbgFuncRefreshSchemas(Context & context, const ASTs & args, DBGInvoker::Printer output);

} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1450,12 +1450,12 @@ void Context::initializeSchemaSyncService()
shared->schema_sync_service = std::make_shared<SchemaSyncService>(*this);
}

SchemaSyncService & Context::getSchemaSyncService()
SchemaSyncServicePtr Context::getSchemaSyncService()
{
auto lock = getLock();
if (!shared->schema_sync_service)
throw Exception("Schema Sync Service is not initialized.", ErrorCodes::LOGICAL_ERROR);
return *shared->schema_sync_service;
return shared->schema_sync_service;
}

zkutil::ZooKeeperPtr Context::getZooKeeper() const
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ class Context
TiDBService & getTiDBService();

void initializeSchemaSyncService();
SchemaSyncService & getSchemaSyncService();
SchemaSyncServicePtr getSchemaSyncService();

Clusters & getClusters() const;
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
Expand Down
Loading