Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLASH-489 support key condition for coprocessor query #261

Merged
merged 7 commits into from
Sep 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ DatabaseID MockTiDB::newDataBase(const String & database_name)
return schema_id;
}

TableID MockTiDB::newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, Timestamp tso)
TableID MockTiDB::newTable(const String & database_name, const String & table_name,
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name)
{
std::lock_guard lock(tables_mutex);

Expand All @@ -153,14 +154,21 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na
table_info.db_name = database_name;
table_info.id = table_id_allocator++;
table_info.name = table_name;
table_info.pk_is_handle = false;

int i = 1;
for (auto & column : columns.getAllPhysical())
{
table_info.columns.emplace_back(reverseGetColumnInfo(column, i++, Field()));
if (handle_pk_name == column.name)
{
if (!column.type->isInteger() && !column.type->isUnsignedInteger())
throw Exception("MockTiDB pk column must be integer or unsigned integer type", ErrorCodes::LOGICAL_ERROR);
table_info.columns.back().setPriKeyFlag();
table_info.pk_is_handle = true;
}
}

table_info.pk_is_handle = false;
table_info.comment = "Mocked.";
table_info.update_timestamp = tso;

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class MockTiDB : public ext::singleton<MockTiDB>
using TablePtr = std::shared_ptr<Table>;

public:
TableID newTable(const String & database_name, const String & table_name, const ColumnsDescription & columns, Timestamp tso);
TableID newTable(const String & database_name, const String & table_name,
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name);

DatabaseID newDataBase(const String & database_name);

Expand Down
19 changes: 11 additions & 8 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,32 +163,35 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
{
expr->set_sig(tipb::ScalarFuncSig::EQInt);
auto * ft = expr->mutable_field_type();
// TODO: TiDB will infer Int64.
ft->set_tp(TiDB::TypeTiny);
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else if (func_name_lowercase == "and")
{
expr->set_sig(tipb::ScalarFuncSig::LogicalAnd);
auto * ft = expr->mutable_field_type();
// TODO: TiDB will infer Int64.
ft->set_tp(TiDB::TypeTiny);
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else if (func_name_lowercase == "or")
{
expr->set_sig(tipb::ScalarFuncSig::LogicalOr);
auto * ft = expr->mutable_field_type();
// TODO: TiDB will infer Int64.
ft->set_tp(TiDB::TypeTiny);
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else if (func_name_lowercase == "greater")
{
expr->set_sig(tipb::ScalarFuncSig::GTInt);
auto * ft = expr->mutable_field_type();
// TODO: TiDB will infer Int64.
ft->set_tp(TiDB::TypeTiny);
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else if (func_name_lowercase == "greaterorequals")
{
expr->set_sig(tipb::ScalarFuncSig::GEInt);
auto *ft = expr->mutable_field_type();
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ extern const int LOGICAL_ERROR;

void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 3)
throw Exception("Args not matched, should be: database-name, table-name, schema-string", ErrorCodes::BAD_ARGUMENTS);
if (args.size() != 3 && args.size() != 4)
throw Exception("Args not matched, should be: database-name, table-name, schema-string [, handle_pk_name]", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;

auto schema_str = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[2]).value);
String handle_pk_name = "";
if (args.size() == 4)
handle_pk_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);

ASTPtr columns_ast;
ParserColumnDeclarationList schema_parser;
Tokens tokens(schema_str.data(), schema_str.data() + schema_str.length());
Expand All @@ -43,7 +47,7 @@ void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, D
= InterpreterCreateQuery::getColumnsDescription(typeid_cast<const ASTExpressionList &>(*columns_ast), context);
auto tso = context.getTMTContext().getPDClient()->getTS();

TableID table_id = MockTiDB::instance().newTable(database_name, table_name, columns, tso);
TableID table_id = MockTiDB::instance().newTable(database_name, table_name, columns, tso, handle_pk_name);

std::stringstream ss;
ss << "mock table #" << table_id;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ Field convertField(const ColumnInfo & column_info, const Field & field)

void encodeRow(const TiDB::TableInfo & table_info, const std::vector<Field> & fields, std::stringstream & ss)
{
if (table_info.columns.size() != fields.size())
if (table_info.columns.size() != fields.size() + table_info.pk_is_handle)
throw Exception("Encoding row has different sizes between columns and values", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < fields.size(); i++)
{
Expand All @@ -261,7 +261,7 @@ void insert(const TiDB::TableInfo & table_info, RegionID region_id, HandleID han
fields.emplace_back(field);
idx++;
}
if (fields.size() != table_info.columns.size())
if (fields.size() + table_info.pk_is_handle != table_info.columns.size())
throw Exception("Number of insert values and columns do not match.", ErrorCodes::LOGICAL_ERROR);

TMTContext & tmt = context.getTMTContext();
Expand Down
37 changes: 34 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include <Interpreters/Context.h>
#include <Interpreters/Set.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/ASTIdentifier.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TypeMapping.h>

Expand Down Expand Up @@ -214,7 +216,7 @@ void constructTZExpr(tipb::Expr & tz_expr, const tipb::DAGRequest & rqst, bool f
}
}

bool hasMeaningfulTZInfo(const tipb::DAGRequest &rqst)
bool hasMeaningfulTZInfo(const tipb::DAGRequest & rqst)
{
if (rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0)
return rqst.time_zone_name() != "UTC";
Expand Down Expand Up @@ -249,7 +251,7 @@ String DAGExpressionAnalyzer::appendTimeZoneCast(
// column with UTC timezone will never be used in during agg), all the column with ts datatype will
// convert back to UTC timezone
bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS(
ExpressionActionsChain &chain, std::vector<bool> is_ts_column, const tipb::DAGRequest &rqst)
ExpressionActionsChain & chain, std::vector<bool> is_ts_column, const tipb::DAGRequest & rqst)
{
if (!hasMeaningfulTZInfo(rqst))
return false;
Expand Down Expand Up @@ -391,6 +393,35 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres
return expr_name;
}

// Recursively walks `expr` (children first, then the node itself) and, for every
// `col_name in (value_list)` scalar function that has no entry in `prepared_sets` yet,
// asks `storage` whether it may benefit from an index on that column; if so, an explicit
// ordered set is created for the expression via makeExplicitSet.
//
// expr:    root of the expression tree to inspect.
// storage: storage queried through mayBenefitFromIndexForIn().
void DAGExpressionAnalyzer::makeExplicitSetForIndex(const tipb::Expr & expr, const TMTStoragePtr & storage)
{
    for (const auto & child : expr.children())
        makeExplicitSetForIndex(child, storage);

    if (expr.tp() != tipb::ExprType::ScalarFunc)
        return;

    const String & func_name = getFunctionName(expr);
    // only support col_name in (value_list)
    if (!isInOrGlobalInOperator(func_name))
        return;
    // A malformed `in` without any argument must not reach children(0): indexing an empty
    // repeated field is out of range.
    if (expr.children_size() == 0 || expr.children(0).tp() != tipb::ExprType::ColumnRef)
        return;
    // A set has already been prepared for this very expression node.
    if (prepared_sets.count(&expr))
        return;

    // Build throw-away actions over the current input columns, only to resolve the name
    // (and sample block) of the left-hand column.
    NamesAndTypesList column_list;
    for (const auto & col : getCurrentInputColumns())
        column_list.emplace_back(col.name, col.type);

    ExpressionActionsPtr temp_actions = std::make_shared<ExpressionActions>(column_list, settings);
    String name = getActions(expr.children(0), temp_actions);
    ASTPtr name_ast = std::make_shared<ASTIdentifier>(name);
    if (storage->mayBenefitFromIndexForIn(name_ast))
        makeExplicitSet(expr, temp_actions->getSampleBlock(), true, name);
}

void DAGExpressionAnalyzer::makeExplicitSet(
const tipb::Expr & expr, const Block & sample_block, bool create_ordered_set, const String & left_arg_name)
{
Expand All @@ -400,7 +431,7 @@ void DAGExpressionAnalyzer::makeExplicitSet(
}
DataTypes set_element_types;
// todo support tuple in, e.g. (a,b) in ((1,2), (3,4)); currently TiDB converts tuple in into a series of or/and/eq exprs,
// which means tuple in is never pushed to coprocessor, but it is quite inefficient
// which means tuple in is never pushed to coprocessor, but it is quite inefficient
set_element_types.push_back(sample_block.getByName(left_arg_name).type);

// todo if this is a single value in, then convert it to equal expr
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <Flash/Coprocessor/DAGUtils.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/Transaction/TMTStorages.h>

namespace DB
{
Expand Down Expand Up @@ -60,11 +61,12 @@ class DAGExpressionAnalyzer : private boost::noncopyable
String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions);
const std::vector<NameAndTypePair> & getCurrentInputColumns();
void makeExplicitSet(const tipb::Expr & expr, const Block & sample_block, bool create_ordered_set, const String & left_arg_name);
void makeExplicitSetForIndex(const tipb::Expr & expr, const TMTStoragePtr & storage);
String applyFunction(const String & func_name, Names & arg_names, ExpressionActionsPtr & actions);
Int32 getImplicitCastCount() { return implicit_cast_count; };
bool appendTimeZoneCastsAfterTS(ExpressionActionsChain &chain, std::vector<bool> is_ts_column,
const tipb::DAGRequest &rqst);
bool appendTimeZoneCastsAfterTS(ExpressionActionsChain & chain, std::vector<bool> is_ts_column, const tipb::DAGRequest & rqst);
String appendTimeZoneCast(const String & tz_col, const String & ts_col, const String & func_name, ExpressionActionsPtr & actions);
DAGPreparedSets getPreparedSets() { return prepared_sets; }
};

} // namespace DB
23 changes: 23 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryInfo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once

#include <unordered_map>

#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQuerySource.h>

namespace DB
{

// Groups the DAG query source, the prepared sets and the source columns of a DAG query
// into one object.
struct DAGQueryInfo
{
    // dag_:            stored by reference (not copied); the referenced DAGQuerySource must
    //                  stay alive for as long as this object is used.
    // dag_sets_:       taken by value and moved into `dag_sets`.
    // source_columns_: only read; every (name, type) pair is copied into `source_columns`.
    DAGQueryInfo(const DAGQuerySource & dag_, DAGPreparedSets dag_sets_, const std::vector<NameAndTypePair> & source_columns_)
        : dag(dag_), dag_sets(std::move(dag_sets_))
    {
        for (const auto & c : source_columns_)
            source_columns.emplace_back(c.name, c.type);
    }

    // Reference to the query source passed to the constructor.
    const DAGQuerySource & dag;
    // Prepared sets handed over at construction time.
    DAGPreparedSets dag_sets;
    // Name/type pairs copied from the constructor's `source_columns_`.
    NamesAndTypesList source_columns;
};
} // namespace DB
10 changes: 10 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQueryInfo.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Interpreters/Aggregator.h>
#include <Parsers/ASTSelectQuery.h>
Expand Down Expand Up @@ -208,9 +209,18 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
if (!checkKeyRanges(dag.getKeyRanges(), table_id, storage->pkIsUInt64()))
throw Exception("Cop request only support full range scan for given region", ErrorCodes::COP_BAD_DAG_REQUEST);

if (dag.hasSelection())
{
for (auto & condition : dag.getSelection().conditions())
{
analyzer->makeExplicitSetForIndex(condition, storage);
}
}
//todo support index in
SelectQueryInfo query_info;
// set query to avoid unexpected NPE
query_info.query = dag.getAST();
query_info.dag_query = std::make_unique<DAGQueryInfo>(dag, analyzer->getPreparedSets(), source_columns);
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>();
query_info.mvcc_query_info->resolve_locks = true;
query_info.mvcc_query_info->read_tso = settings.read_tso;
Expand Down
Loading