Skip to content

Commit

Permalink
Add schema sync on read and simplify schema syncer interface and adju…
Browse files Browse the repository at this point in the history
…st mock stuff (#108)

* Add sync schema on read

* Simplify schema syncer interface and adjust mock stuff

* Rename default schema version setting

* Compensate last commit
  • Loading branch information
zanmato1984 authored and hanfei1991 committed Jul 18, 2019
1 parent 149a114 commit f942f8a
Show file tree
Hide file tree
Showing 16 changed files with 122 additions and 174 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Core/Defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#define DEFAULT_MAX_COMPRESS_BLOCK_SIZE 1048576

#define DEFAULT_MAX_READ_TSO 0xFFFFFFFFFFFFFFFF
#define DEFAULT_SCHEMA_VERSION -1
#define DEFAULT_UNSPECIFIED_SCHEMA_VERSION -1

/** Which blocks by default read the data (by number of rows).
* Smaller values give better cache locality, less consumption of RAM, but more overhead to process the query.
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ DBGInvoker::DBGInvoker()

regFunc("dump_all_region", dbgFuncDumpAllRegion);

regFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService);
regFunc("mock_schema_syncer", dbgFuncMockSchemaSyncer);
regFunc("refresh_schema", dbgFuncRefreshSchema);
regFunc("refresh_schemas", dbgFuncRefreshSchemas);
}

void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement)
Expand Down
55 changes: 26 additions & 29 deletions dbms/src/Debug/MockSchemaSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,31 +235,36 @@ AlterCommands detectSchemaChanges(const TableInfo & table_info, const TableInfo

MockSchemaSyncer::MockSchemaSyncer() : log(&Logger::get("MockSchemaSyncer")) {}

bool MockSchemaSyncer::syncSchemas(Context & /*context*/)
bool MockSchemaSyncer::syncSchemas(Context & context)
{
// Don't do full schema sync, we want to test schema sync timing in a fine-grained fashion.
return false;
std::unordered_map<TableID, MockTiDB::TablePtr> new_tables;
MockTiDB::instance().traverseTables([&](const auto & table) { new_tables.emplace(table->id(), table); });

for (auto [id, table] : tables)
{
if (new_tables.find(id) == new_tables.end())
dropTable(table->table_info.db_name, table->table_info.name, context);
}

for (auto [id, table] : new_tables)
{
std::ignore = id;
syncTable(context, table);
}

tables.swap(new_tables);

return true;
}

void MockSchemaSyncer::syncSchema(Context & context, TableID table_id, bool /*lock*/)
void MockSchemaSyncer::syncTable(Context & context, MockTiDB::TablePtr table)
{
auto & tmt_context = context.getTMTContext();

/// Get table schema json from TiDB/TiKV.
String table_info_json = getSchemaJson(table_id, context);
if (table_info_json.empty())
{
/// Table dropped.
auto storage = tmt_context.getStorages().get(table_id);
if (storage == nullptr)
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Table " << table_id << "doesn't exist in TiDB and doesn't exist in TMT, do nothing.");
return;
}
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Table " << table_id << "doesn't exist in TiDB, dropping.");
dropTable(storage->getDatabaseName(), storage->getTableName(), context);
return;
}
/// Get table schema json.
auto table_id = table->id();

String table_info_json = table->table_info.serialize(false);

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Table " << table_id << " info json: " << table_info_json);

Expand Down Expand Up @@ -326,8 +331,7 @@ void MockSchemaSyncer::syncSchema(Context & context, TableID table_id, bool /*lo
}

/// Table existing, detect schema changes and apply.
auto merge_tree = std::dynamic_pointer_cast<StorageMergeTree>(storage);
const TableInfo & orig_table_info = merge_tree->getTableInfo();
const TableInfo & orig_table_info = storage->getTableInfo();
AlterCommands alter_commands = detectSchemaChanges(table_info, orig_table_info);

std::stringstream ss;
Expand All @@ -345,15 +349,8 @@ void MockSchemaSyncer::syncSchema(Context & context, TableID table_id, bool /*lo

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": " << ss.str());

{
// Change internal TableInfo in TMT first.
// TODO: Ideally this should be done within alter function, however we are limited by the narrow alter interface, thus not truly atomic.
auto table_hard_lock = storage->lockStructureForAlter(__PRETTY_FUNCTION__);
merge_tree->setTableInfo(table_info);
}

// Call storage alter to apply schema changes.
storage->alter(alter_commands, table_info.db_name, table_info.name, context);
storage->alterForTMT(alter_commands, table_info, context);

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Schema changes apply done.");

Expand Down
13 changes: 5 additions & 8 deletions dbms/src/Debug/MockSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <Debug/MockTiDB.h>

#include <unordered_map>

namespace DB
{

Expand All @@ -12,17 +14,12 @@ class MockSchemaSyncer : public SchemaSyncer

bool syncSchemas(Context & context) override;

void syncSchema(Context & context, TableID table_id, bool) override;

TableID getTableIdByName(const std::string & database_name, const std::string & table_name)
{
return MockTiDB::instance().getTableIDByName(database_name, table_name);
}

protected:
String getSchemaJson(TableID table_id, Context & /*context*/) { return MockTiDB::instance().getSchemaJson(table_id); }
void syncTable(Context & context, MockTiDB::TablePtr table);

Logger * log;

std::unordered_map<TableID, MockTiDB::TablePtr> tables;
};

} // namespace DB
34 changes: 7 additions & 27 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,6 @@ Table::Table(const String & database_name_, const String & table_name_, TableInf
: table_info(std::move(table_info_)), database_name(database_name_), table_name(table_name_)
{}

String MockTiDB::getSchemaJson(TableID table_id)
{
std::lock_guard lock(tables_mutex);

auto it = tables_by_id.find(table_id);
if (it == tables_by_id.end())
{
return "";
}

return it->second->table_info.serialize(false);
}

TableID MockTiDB::getTableIDByName(const std::string & database_name, const std::string & table_name)
{
std::lock_guard lock(tables_mutex);

String qualified_name = database_name + "." + table_name;
auto it = tables_by_name.find(qualified_name);
if (it == tables_by_name.end())
{
return InvalidTableID;
}

return it->second->table_info.id;
}

void MockTiDB::dropTable(const String & database_name, const String & table_name)
{
std::lock_guard lock(tables_mutex);
Expand Down Expand Up @@ -251,6 +224,13 @@ TablePtr MockTiDB::getTableByName(const String & database_name, const String & t
return getTableByNameInternal(database_name, table_name);
}

void MockTiDB::traverseTables(std::function<void(TablePtr)> f)
{
std::lock_guard lock(tables_mutex);

std::for_each(tables_by_id.begin(), tables_by_id.end(), [&](const auto & pair) { f(pair.second); });
}

TablePtr MockTiDB::getTableByNameInternal(const String & database_name, const String & table_name)
{
String qualified_name = database_name + "." + table_name;
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ class MockTiDB : public ext::singleton<MockTiDB>
using TablePtr = std::shared_ptr<Table>;

public:
String getSchemaJson(TableID table_id);

TableID getTableIDByName(const std::string & database_name, const std::string & table_name);

TableID newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns);

TableID newPartition(const String & database_name, const String & table_name, const String & partition_name);
Expand All @@ -80,6 +76,8 @@ class MockTiDB : public ext::singleton<MockTiDB>

TablePtr getTableByName(const String & database_name, const String & table_name);

void traverseTables(std::function<void(TablePtr)> f);

private:
TablePtr getTableByNameInternal(const String & database_name, const String & table_name);

Expand Down
92 changes: 23 additions & 69 deletions dbms/src/Debug/dbgFuncSchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/SchemaSyncService.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/TiDB.h>
Expand All @@ -15,6 +16,23 @@ namespace ErrorCodes
extern const int UNKNOWN_TABLE;
} // namespace ErrorCodes

void dbgFuncEnableSchemaSyncService(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 1)
throw Exception("Args not matched, should be: enable (true/false)", ErrorCodes::BAD_ARGUMENTS);

bool enable = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value) == "true";

if (enable)
context.initializeSchemaSyncService();
else
context.getSchemaSyncService().reset();

std::stringstream ss;
ss << "schema sync service " << (enable ? "enabled" : "disabled");
output(ss.str());
}

void dbgFuncMockSchemaSyncer(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 1)
Expand All @@ -39,79 +57,15 @@ void dbgFuncMockSchemaSyncer(Context & context, const ASTs & args, DBGInvoker::P
output(ss.str());
}

void dbgFuncRefreshSchema(Context & context, const ASTs & args, DBGInvoker::Printer output)
void dbgFuncRefreshSchemas(Context & context, const ASTs &, DBGInvoker::Printer output)
{
if (args.size() != 2)
throw Exception("Args not matched, should be: database-name, table-name", ErrorCodes::BAD_ARGUMENTS);

std::string database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
std::transform(database_name.begin(), database_name.end(), database_name.begin(), ::tolower);
std::string table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
std::transform(table_name.begin(), table_name.end(), table_name.begin(), ::tolower);

auto log = [&](TableID table_id) {
std::stringstream ss;
ss << "refreshed schema for table #" << table_id;
output(ss.str());
};

TMTContext & tmt = context.getTMTContext();
auto schema_syncer = tmt.getSchemaSyncer();
auto mock_schema_syncer = std::dynamic_pointer_cast<MockSchemaSyncer>(schema_syncer);
if (!mock_schema_syncer)
throw Exception("Debug function refresh_schema can only be used under mock schema syncer.");

TableID table_id = mock_schema_syncer->getTableIdByName(database_name, table_name);
auto storage = tmt.getStorages().getByName(database_name, table_name);

if (storage == nullptr && table_id == InvalidTableID)
// Table does not exist in CH nor TiDB, error out.
throw Exception("Table " + database_name + "." + table_name + " doesn't exist in tidb", ErrorCodes::UNKNOWN_TABLE);

if (storage == nullptr && table_id != InvalidTableID)
{
// Table does not exist in CH, but exists in TiDB.
// Might be renamed or never synced.
// Note there will be a dangling table in CH for the following scenario:
// Table t was synced to CH already, then t was renamed (name changed) and truncated (ID changed).
// Then this function was called with the new name given, the table will be synced to a new table.
// User must manually call this function with the old name to remove the dangling table in CH.
mock_schema_syncer->syncSchema(context, table_id, true);

log(table_id);

return;
}

if (table_id == InvalidTableID)
{
// Table exists in CH, but does not exist in TiDB.
// Just sync it using the storage's ID, syncer will then remove it.
mock_schema_syncer->syncSchema(context, storage->getTableInfo().id, true);

log(table_id);

return;
}
schema_syncer->syncSchemas(context);

// Table exists in both CH and TiDB.
if (table_id != storage->getTableInfo().id)
{
// Table in TiDB is not the old one, i.e. dropped/renamed then recreated.
// Sync the old one in CH first, then sync the new one.
mock_schema_syncer->syncSchema(context, storage->getTableInfo().id, true);
mock_schema_syncer->syncSchema(context, table_id, true);

log(table_id);

return;
}

// Table in TiDB is the same one as in CH.
// Just sync it.
mock_schema_syncer->syncSchema(context, table_id, true);

log(table_id);
std::stringstream ss;
ss << "schemas refreshed";
output(ss.str());
}

} // namespace DB
11 changes: 8 additions & 3 deletions dbms/src/Debug/dbgFuncSchema.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,19 @@ namespace DB

class Context;

// Enable/disable schema sync service.
// Usage:
// ./storages-client.sh "DBGInvoke enable_schema_sync_service(enable)"
void dbgFuncEnableSchemaSyncService(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Change whether to mock schema syncer.
// Usage:
// ./storages-client.sh "DBGInvoke mock_schema_syncer(enabled)"
void dbgFuncMockSchemaSyncer(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Refresh schema of the given table.
// Refresh schemas for all tables.
// Usage:
// ./storage-client.sh "DBGInvoke refresh_schema(database_name, table_name)"
void dbgFuncRefreshSchema(Context & context, const ASTs & args, DBGInvoker::Printer output);
// ./storage-client.sh "DBGInvoke refresh_schemas()"
void dbgFuncRefreshSchemas(Context & context, const ASTs & args, DBGInvoker::Printer output);

} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1450,12 +1450,12 @@ void Context::initializeSchemaSyncService()
shared->schema_sync_service = std::make_shared<SchemaSyncService>(*this);
}

SchemaSyncService & Context::getSchemaSyncService()
SchemaSyncServicePtr Context::getSchemaSyncService()
{
auto lock = getLock();
if (!shared->schema_sync_service)
throw Exception("Schema Sync Service is not initialized.", ErrorCodes::LOGICAL_ERROR);
return *shared->schema_sync_service;
return shared->schema_sync_service;
}

zkutil::ZooKeeperPtr Context::getZooKeeper() const
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ class Context
TiDBService & getTiDBService();

void initializeSchemaSyncService();
SchemaSyncService & getSchemaSyncService();
SchemaSyncServicePtr getSchemaSyncService();

Clusters & getClusters() const;
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
Expand Down
Loading

0 comments on commit f942f8a

Please sign in to comment.