Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLASH-437 Support time zone in coprocessor #259

Merged
merged 8 commits into from
Sep 27, 2019
11 changes: 11 additions & 0 deletions dbms/src/Common/MyTime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,4 +473,15 @@ void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & tim
to_time = to_my_time.toPackedUInt();
}

void convertTimeZoneByOffset(UInt64 from_time, UInt64 & to_time, Int64 offset, const DateLUTImpl & time_zone)
{
MyDateTime from_my_time(from_time);
time_t epoch = time_zone.makeDateTime(
from_my_time.year, from_my_time.month, from_my_time.day, from_my_time.hour, from_my_time.minute, from_my_time.second);
epoch += offset;
MyDateTime to_my_time(time_zone.toYear(epoch), time_zone.toMonth(epoch), time_zone.toDayOfMonth(epoch),
time_zone.toHour(epoch), time_zone.toMinute(epoch), time_zone.toSecond(epoch), from_my_time.micro_second);
to_time = to_my_time.toPackedUInt();
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Common/MyTime.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ Field parseMyDateTime(const String & str);

void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & time_zone_from, const DateLUTImpl & time_zone_to);

void convertTimeZoneByOffset(UInt64 from_time, UInt64 & to_time, Int64 offset, const DateLUTImpl & time_zone);

} // namespace DB
57 changes: 46 additions & 11 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,27 @@ using DAGColumnInfo = std::pair<String, ColumnInfo>;
using DAGSchema = std::vector<DAGColumnInfo>;
using SchemaFetcher = std::function<TableInfo(const String &, const String &)>;
std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts);
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts,
Int64 tz_offset, const String & tz_name);
tipb::SelectResponse executeDAGRequest(
Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version);
BlockInputStreamPtr outputDAGResponse(Context & context, const DAGSchema & schema, const tipb::SelectResponse & dag_response);

BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
{
if (args.size() < 1 || args.size() > 2)
throw Exception("Args not matched, should be: query[, region-id]", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 1 || args.size() > 4)
throw Exception("Args not matched, should be: query[, region-id, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS);

String query = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
RegionID region_id = InvalidRegionID;
if (args.size() == 2)
if (args.size() >= 2)
region_id = safeGet<RegionID>(typeid_cast<const ASTLiteral &>(*args[1]).value);
Int64 tz_offset = 0;
String tz_name = "";
if (args.size() >= 3)
tz_offset = get<Int64>(typeid_cast<const ASTLiteral &>(*args[2]).value);
if (args.size() >= 4)
tz_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);
Timestamp start_ts = context.getTMTContext().getPDClient()->getTS();

auto [table_id, schema, dag_request] = compileQuery(
Expand All @@ -63,7 +70,7 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
throw Exception("Not TMT", ErrorCodes::BAD_ARGUMENTS);
return mmt->getTableInfo();
},
start_ts);
start_ts, tz_offset, tz_name);

RegionPtr region;
if (region_id == InvalidRegionID)
Expand All @@ -86,23 +93,29 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)

BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args)
{
if (args.size() < 2 || args.size() > 3)
throw Exception("Args not matched, should be: query, region-id[, start-ts]", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 2 || args.size() > 5)
throw Exception("Args not matched, should be: query, region-id[, start-ts, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS);

String query = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
RegionID region_id = safeGet<RegionID>(typeid_cast<const ASTLiteral &>(*args[1]).value);
Timestamp start_ts = DEFAULT_MAX_READ_TSO;
if (args.size() == 3)
if (args.size() >= 3)
start_ts = safeGet<Timestamp>(typeid_cast<const ASTLiteral &>(*args[2]).value);
if (start_ts == 0)
start_ts = context.getTMTContext().getPDClient()->getTS();
Int64 tz_offset = 0;
String tz_name = "";
if (args.size() >= 3)
tz_offset = safeGet<Int64>(typeid_cast<const ASTLiteral &>(*args[2]).value);
if (args.size() >= 4)
tz_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);

auto [table_id, schema, dag_request] = compileQuery(
context, query,
[&](const String & database_name, const String & table_name) {
return MockTiDB::instance().getTableByName(database_name, table_name)->table_info;
},
start_ts);
start_ts, tz_offset, tz_name);
std::ignore = table_id;

RegionPtr region = context.getTMTContext().getKVStore()->getRegion(region_id);
Expand Down Expand Up @@ -170,6 +183,14 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
ft->set_tp(TiDB::TypeTiny);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else if (func_name_lowercase == "greater")
{
expr->set_sig(tipb::ScalarFuncSig::GTInt);
auto * ft = expr->mutable_field_type();
// TODO: TiDB will infer Int64.
ft->set_tp(TiDB::TypeTiny);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else
{
throw Exception("Unsupported function: " + func_name_lowercase, ErrorCodes::LOGICAL_ERROR);
Expand Down Expand Up @@ -239,10 +260,13 @@ void compileFilter(const DAGSchema & input, ASTPtr ast, tipb::Selection * filter
}

std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts)
Context & context, const String & query, SchemaFetcher schema_fetcher,
Timestamp start_ts, Int64 tz_offset, const String & tz_name)
{
DAGSchema schema;
tipb::DAGRequest dag_request;
dag_request.set_time_zone_name(tz_name);
dag_request.set_time_zone_offset(tz_offset);

dag_request.set_start_ts(start_ts);

Expand Down Expand Up @@ -291,8 +315,11 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
ci.tp = column_info.tp;
ci.flag = column_info.flag;
ci.flen = column_info.flen;
ci.decimal = column_info.flen;
ci.decimal = column_info.decimal;
ci.elems = column_info.elems;
// a hack to test timestamp type in mock test
if (column_info.tp == TiDB::TypeDatetime && ci.decimal == 5)
ci.tp = TiDB::TypeTimestamp;
ts_output.emplace_back(std::make_pair(column_info.name, std::move(ci)));
}
executor_ctx_map.emplace(ts_exec, ExecutorCtx{nullptr, std::move(ts_output), std::unordered_map<String, tipb::Expr *>{}});
Expand Down Expand Up @@ -430,6 +457,14 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull);
}
else if (func->name == "max")
{
agg_func->set_tp(tipb::Max);
if (agg_func->children_size() != 1)
throw Exception("udaf max only accept 1 argument");
auto ft = agg_func->mutable_field_type();
ft->set_tp(agg_func->children(0).field_type().tp());
}
// TODO: Other agg func.
else
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ catch (const LockException & e)
}
catch (const Exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.displayText());
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.getStackTrace().toString());
recordError(e.code(), e.message());
}
catch (const std::exception & e)
Expand Down
112 changes: 102 additions & 10 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <DataTypes/DataTypeSet.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/FieldToDataType.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -44,7 +45,7 @@ static String genFuncString(const String & func_name, const Names & argument_nam
return ss.str();
}

DAGExpressionAnalyzer::DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_)
DAGExpressionAnalyzer::DAGExpressionAnalyzer(const std::vector<NameAndTypePair> && source_columns_, const Context & context_)
: source_columns(source_columns_),
context(context_),
after_agg(false),
Expand Down Expand Up @@ -177,28 +178,114 @@ void DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const
}
}

const NamesAndTypesList & DAGExpressionAnalyzer::getCurrentInputColumns() { return after_agg ? aggregated_columns : source_columns; }
const std::vector<NameAndTypePair> & DAGExpressionAnalyzer::getCurrentInputColumns()
{
return after_agg ? aggregated_columns : source_columns;
}

void DAGExpressionAnalyzer::appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project)
{
initChain(chain, getCurrentInputColumns());
for (auto name : final_project)
for (const auto & name : final_project)
{
chain.steps.back().required_output.push_back(name.first);
}
}

void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & aggregation)
void constructTZExpr(tipb::Expr & tz_expr, const tipb::DAGRequest & rqst, bool from_utc)
{
if (rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0)
{
tz_expr.set_tp(tipb::ExprType::String);
tz_expr.set_val(rqst.time_zone_name());
auto * field_type = tz_expr.mutable_field_type();
field_type->set_tp(0xfe);
Copy link
Contributor

@zanmato1984 zanmato1984 Sep 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there symbols representing these magic numbers in TiDB.h?

field_type->set_flag(1);
}
else
{
tz_expr.set_tp(tipb::ExprType::Int64);
std::stringstream ss;
encodeDAGInt64(from_utc ? rqst.time_zone_offset() : -rqst.time_zone_offset(), ss);
tz_expr.set_val(ss.str());
auto * field_type = tz_expr.mutable_field_type();
field_type->set_tp(8);
field_type->set_flag(1);
}
}

bool hasMeaningfullTZInfo(const tipb::DAGRequest & rqst)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meaningful.

{
if (rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0)
return rqst.time_zone_name() != "UTC";
if (rqst.has_time_zone_offset())
return rqst.has_time_zone_offset() != 0;
return false;
}

String DAGExpressionAnalyzer::appendTimeZoneCast(
const String & tz_col, const String & ts_col, const String & func_name, ExpressionActionsPtr & actions)
{
Names cast_argument_names;
cast_argument_names.push_back(ts_col);
cast_argument_names.push_back(tz_col);
String cast_expr_name = applyFunction(func_name, cast_argument_names, actions);
return cast_expr_name;
}

bool DAGExpressionAnalyzer::appendTimeZoneCastAfterTS(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May call it appendTimeZoneCast***s***AfterTS?

ExpressionActionsChain & chain, std::vector<bool> is_ts_column, const tipb::DAGRequest & rqst)
{
if (!hasMeaningfullTZInfo(rqst))
return false;

bool ret = false;
initChain(chain, getCurrentInputColumns());
ExpressionActionsPtr actions = chain.getLastActions();
tipb::Expr tz_expr;
constructTZExpr(tz_expr, rqst, true);
String tz_col;
String func_name
= rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0 ? "ConvertTimeZoneFromUTC" : "ConvertTimeZoneByOffset";
for (size_t i = 0; i < is_ts_column.size(); i++)
{
if (is_ts_column[i])
{
if (tz_col.length() == 0)
tz_col = getActions(tz_expr, actions);
String casted_name = appendTimeZoneCast(tz_col, source_columns[i].name, func_name, actions);
source_columns.emplace_back(source_columns[i].name, source_columns[i].type);
source_columns[i].name = casted_name;
ret = true;
}
}
return ret;
}

void DAGExpressionAnalyzer::appendAggSelect(
ExpressionActionsChain & chain, const tipb::Aggregation & aggregation, const tipb::DAGRequest & rqst)
{
initChain(chain, getCurrentInputColumns());
bool need_update_aggregated_columns = false;
NamesAndTypesList updated_aggregated_columns;
ExpressionActionsChain::Step step = chain.steps.back();
auto agg_col_names = aggregated_columns.getNames();
bool need_append_timezone_cast = hasMeaningfullTZInfo(rqst);
tipb::Expr tz_expr;
if (need_append_timezone_cast)
constructTZExpr(tz_expr, rqst, false);
String tz_col;
String tz_cast_func_name
= rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0 ? "ConvertTimeZoneToUTC" : "ConvertTimeZoneByOffset";
for (Int32 i = 0; i < aggregation.agg_func_size(); i++)
{
String & name = agg_col_names[i];
String & name = aggregated_columns[i].name;
String updated_name = appendCastIfNeeded(aggregation.agg_func(i), step.actions, name);
if (need_append_timezone_cast && aggregation.agg_func(i).field_type().tp() == TiDB::TypeTimestamp)
{
if (tz_col.length() == 0)
tz_col = getActions(tz_expr, step.actions);
updated_name = appendTimeZoneCast(tz_col, updated_name, tz_cast_func_name, step.actions);
}
if (name != updated_name)
{
need_update_aggregated_columns = true;
Expand All @@ -208,14 +295,20 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons
}
else
{
updated_aggregated_columns.emplace_back(name, aggregated_columns.getTypes()[i]);
updated_aggregated_columns.emplace_back(name, aggregated_columns[i].type);
step.required_output.push_back(name);
}
}
for (Int32 i = 0; i < aggregation.group_by_size(); i++)
{
String & name = agg_col_names[i + aggregation.agg_func_size()];
String & name = aggregated_columns[i + aggregation.agg_func_size()].name;
String updated_name = appendCastIfNeeded(aggregation.group_by(i), step.actions, name);
if (need_append_timezone_cast && aggregation.group_by(i).field_type().tp() == TiDB::TypeTimestamp)
{
if (tz_col.length() == 0)
tz_col = getActions(tz_expr, step.actions);
updated_name = appendTimeZoneCast(tz_col, updated_name, tz_cast_func_name, step.actions);
}
if (name != updated_name)
{
need_update_aggregated_columns = true;
Expand All @@ -225,7 +318,7 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons
}
else
{
updated_aggregated_columns.emplace_back(name, aggregated_columns.getTypes()[i]);
updated_aggregated_columns.emplace_back(name, aggregated_columns[i].type);
step.required_output.push_back(name);
}
}
Expand Down Expand Up @@ -263,7 +356,6 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres
// first construct the second argument
tipb::Expr type_expr;
type_expr.set_tp(tipb::ExprType::String);
std::stringstream ss;
type_expr.set_val(expected_type->getName());
auto * type_field_type = type_expr.mutable_field_type();
type_field_type->set_tp(0xfe);
Expand Down
14 changes: 8 additions & 6 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ class DAGExpressionAnalyzer : private boost::noncopyable
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
// all columns from table scan
NamesAndTypesList source_columns;
std::vector<NameAndTypePair> source_columns;
// all columns after aggregation
NamesAndTypesList aggregated_columns;
std::vector<NameAndTypePair> aggregated_columns;
DAGPreparedSets prepared_sets;
Settings settings;
const Context & context;
Expand All @@ -36,14 +36,14 @@ class DAGExpressionAnalyzer : private boost::noncopyable
Poco::Logger * log;

public:
DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_);
DAGExpressionAnalyzer(const std::vector<NameAndTypePair> && source_columns_, const Context & context_);
void appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name);
void appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names);
void appendAggregation(ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregate_keys,
AggregateDescriptions & aggregate_descriptions);
void appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg);
void appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg, const tipb::DAGRequest & rqst);
String appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name);
void initChain(ExpressionActionsChain & chain, const NamesAndTypesList & columns) const
void initChain(ExpressionActionsChain & chain, const std::vector<NameAndTypePair> & columns) const
{
if (chain.steps.empty())
{
Expand All @@ -53,10 +53,12 @@ class DAGExpressionAnalyzer : private boost::noncopyable
}
void appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project);
String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions);
const NamesAndTypesList & getCurrentInputColumns();
const std::vector<NameAndTypePair> & getCurrentInputColumns();
void makeExplicitSet(const tipb::Expr & expr, const Block & sample_block, bool create_ordered_set, const String & left_arg_name);
String applyFunction(const String & func_name, Names & arg_names, ExpressionActionsPtr & actions);
Int32 getImplicitCastCount() { return implicit_cast_count; };
bool appendTimeZoneCastAfterTS(ExpressionActionsChain & chain, std::vector<bool> is_ts_column, const tipb::DAGRequest & rqst);
String appendTimeZoneCast(const String & tz_col, const String & ts_col, const String & func_name, ExpressionActionsPtr & actions);
};

} // namespace DB
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGStringConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringst
// no column selected, must be something wrong
throw Exception("No column is selected in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}
columns_from_ts = storage->getColumns().getAllPhysical();
const auto & column_list = storage->getColumns().getAllPhysical();
for (auto & column : column_list)
{
columns_from_ts.emplace_back(column.name, column.type);
}
for (const tipb::ColumnInfo & ci : ts.columns())
{
ColumnID cid = ci.column_id();
Expand Down
Loading