Flash-481 Arrow encode #279

Merged
merged 150 commits into from
Oct 25, 2019
Changes from 1 commit
aa92f4e
basic framework for coprocessor support in tiflash
windtalker Jul 30, 2019
4f37218
basic support for InterpreterDagRequestV2
windtalker Jul 30, 2019
85bfd5c
code refine
windtalker Jul 30, 2019
e1700c3
tipb submodule use tipb master branch
windtalker Jul 31, 2019
0f82665
rewrite build flow in InterpreterDagRequest
windtalker Jul 31, 2019
a7655bc
rename Dag to DAG
windtalker Jul 31, 2019
f516f00
Update tipb submodule
zanmato1984 Aug 1, 2019
3b520c9
basic support for selection/limit/topn executor in InterpreterDAGRequest
windtalker Aug 2, 2019
9591d26
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 2, 2019
ead9609
basic support for selection/limit/topn executor in InterpreterDAGRequ…
windtalker Aug 2, 2019
bed0bd4
merge pingcap/cop branch
windtalker Aug 2, 2019
526cad9
Code reorg
zanmato1984 Aug 4, 2019
be4d80c
Format
zanmato1984 Aug 4, 2019
64a45a9
merge pingcap/cop
windtalker Aug 5, 2019
a76fdb3
merge pingcap/cop
windtalker Aug 5, 2019
0cfe045
Refine code
zanmato1984 Aug 5, 2019
e9b216c
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 5, 2019
3617a87
basic support for dag agg executor
windtalker Aug 5, 2019
cb55df4
Code refine
zanmato1984 Aug 5, 2019
ed41c93
Merge master into cop
zanmato1984 Aug 5, 2019
08b7142
Refine code
zanmato1984 Aug 5, 2019
bc25942
Another way of getting codec flag
zanmato1984 Aug 5, 2019
059f267
fix cop test regression (#157)
windtalker Aug 6, 2019
e59e8f3
fix npe during dag execute (#160)
windtalker Aug 6, 2019
a618cb5
Add tipb cpp gen in build script
zanmato1984 Aug 6, 2019
4f797fe
Merge branch 'master' into cop
zanmato1984 Aug 6, 2019
bb51749
Fix build error and adjust some formats
zanmato1984 Aug 6, 2019
da1cb0e
Fix build error
zanmato1984 Aug 6, 2019
816ef4b
Fix build error
zanmato1984 Aug 6, 2019
f18fcdd
Update flash configs
zanmato1984 Aug 6, 2019
2ade1cb
Format
zanmato1984 Aug 6, 2019
3870d93
Merge branch 'master' into cop
zanmato1984 Aug 7, 2019
7cb9e71
throw exception when meet error during cop request handling (#162)
windtalker Aug 7, 2019
5fe66ee
Merge branch 'master' into cop
zanmato1984 Aug 8, 2019
0174b7e
add DAGContext so InterpreterDAG can exchange information with DAGDri…
windtalker Aug 8, 2019
9a1dd23
columnref index is based on executor output schema (#167)
windtalker Aug 8, 2019
26e20d5
Move flash/cop/dag to individual library
zanmato1984 Aug 8, 2019
bf67d9d
Merge cop lib
zanmato1984 Aug 8, 2019
62ced38
DAG planner fix and mock dag request (#169)
zanmato1984 Aug 9, 2019
b346a24
Merge branch 'master' into cop
zanmato1984 Aug 9, 2019
57cd382
Fix DAG get and lock storage
zanmato1984 Aug 9, 2019
4a76e91
handle error in cop request (#171)
windtalker Aug 12, 2019
2d093a8
code refine && several minor bug fix (#174)
windtalker Aug 12, 2019
c8cd3d7
Fix region id in mock dag
zanmato1984 Aug 12, 2019
0492af6
support udf in (#175)
windtalker Aug 14, 2019
4a6bad8
Merge branch 'master' into cop
zanmato1984 Aug 14, 2019
8713ff2
1. fix decode literal expr error, 2. add all scalar function sig in s…
windtalker Aug 14, 2019
7759af1
Merge branch 'master' into cop
zanmato1984 Aug 15, 2019
b25d1cc
some bug fix (#179)
windtalker Aug 15, 2019
3d38b7b
Support all DAG operator types in mock SQL -> DAG parser (#176)
zanmato1984 Aug 15, 2019
cbcfdb0
filter column must be uint8 in tiflash (#180)
windtalker Aug 16, 2019
d87e2d5
1. fix encode null error, 2. fix empty field type generated by TiFlas…
windtalker Aug 16, 2019
17f7fcb
Merge branch 'master' into cop
zanmato1984 Aug 16, 2019
5853b91
check validation of dag exprs field type (#183)
windtalker Aug 19, 2019
0a6767a
Merge branch 'master' into cop
zanmato1984 Aug 19, 2019
d53ca34
Merge branch 'master' into cop
zanmato1984 Aug 20, 2019
5de0ec6
add more coprocessor mock tests (#185)
windtalker Aug 20, 2019
6196171
add some log about implicit cast (#188)
windtalker Aug 21, 2019
960cc56
Merge branch 'master' into cop
zanmato1984 Aug 24, 2019
08bacd7
Pass DAG tests after merging master (#199)
zanmato1984 Aug 24, 2019
e8b4198
Fix date/datetime/bit encode error (#200)
zanmato1984 Aug 26, 2019
61cdc8f
improve dag execution time collection (#202)
windtalker Aug 26, 2019
53dcd1f
Merge branch 'master' into cop
zanmato1984 Aug 27, 2019
10e3883
column id in table scan operator may be -1 (#205)
windtalker Aug 27, 2019
39d1994
quick fix for decimal encode (#210)
windtalker Aug 30, 2019
8a0fb66
support udf like with 3 arguments (#212)
windtalker Sep 2, 2019
ff9a1de
Flash-473 optimize date and datetime comparison (#221)
windtalker Sep 5, 2019
17aacde
Merge master
zanmato1984 Sep 5, 2019
6b14b38
FLASH-479 select from empty table throw error in tiflash (#223)
windtalker Sep 6, 2019
548e519
Update flash service port
zanmato1984 Sep 6, 2019
a1b8444
fix bug in DAGBlockOutputStream
windtalker Sep 10, 2019
fce3676
fix bug in DAGBlockOutputStream (#230)
windtalker Sep 10, 2019
a9f9b48
FLASH-475: Support BATCH COMMANDS in flash service (#232)
zanmato1984 Sep 12, 2019
bdc7d57
init change for array encode
windtalker Sep 12, 2019
516d340
merge pingcap/tics/cop
windtalker Sep 12, 2019
1ccfbd4
Merge branch 'master' into cop
zhexuany Sep 12, 2019
df07939
FLASH-483: Combine raft service and flash service (#235)
zanmato1984 Sep 16, 2019
99f26c0
Merge master
zanmato1984 Sep 16, 2019
0bb7991
Fix build error
zanmato1984 Sep 16, 2019
f41f853
Fix test regression
zanmato1984 Sep 16, 2019
259ec77
Fix null value bug in datum
zanmato1984 Sep 17, 2019
ef65514
Merge branch 'master' into cop
zanmato1984 Sep 17, 2019
708d52f
FLASH-490: Fix table scan with -1 column ID and no agg (#240)
zanmato1984 Sep 23, 2019
3656a95
Merge branch 'master' into cop
zanmato1984 Sep 23, 2019
a4c1074
throw error if the cop request is not based on full region scan (#247)
windtalker Sep 24, 2019
b57656c
Merge branch 'master' into cop
zanmato1984 Sep 25, 2019
3a43942
FLASH-437 Support time zone in coprocessor (#259)
windtalker Sep 27, 2019
01caa55
Merge branch 'master' into cop
zanmato1984 Sep 27, 2019
8d2576e
Address comment
zanmato1984 Sep 29, 2019
8ec5380
Merge branch 'cop' of https://github.com/pingcap/tics into array_encode
windtalker Sep 29, 2019
2e3b1c1
use the new date implementation
windtalker Sep 29, 2019
d33a278
FLASH-489 support key condition for coprocessor query (#261)
windtalker Sep 30, 2019
087faee
Merge branch 'master' into cop
zanmato1984 Sep 30, 2019
4aa2b58
only return execute summaries if requested (#264)
windtalker Sep 30, 2019
aed5e84
Merge branch 'cop' of https://github.com/pingcap/tics into array_encode
windtalker Oct 8, 2019
8663811
refine code
windtalker Oct 8, 2019
80f6f35
Refine service init (#265)
zanmato1984 Oct 8, 2019
0b737dc
fix bug
windtalker Oct 9, 2019
d3af009
fix bug
windtalker Oct 9, 2019
004f7c5
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 9, 2019
f255362
FLASH-554 cop check range should be based on region range (#270)
windtalker Oct 10, 2019
170f652
add ut for arrow encode
windtalker Oct 11, 2019
c53e456
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 11, 2019
7fc53ad
minor improve (#273)
windtalker Oct 11, 2019
22ad2d3
Merge branch 'master' into cop
zanmato1984 Oct 11, 2019
b01ccb3
update tipb
windtalker Oct 11, 2019
a1304ae
Fix mutex on timezone retrieval (#276)
ilovesoup2000 Oct 11, 2019
687dcbe
Fix race condition of batch command handling (#277)
zanmato1984 Oct 12, 2019
4dd5e1e
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 12, 2019
80c20b2
update tipb version
windtalker Oct 12, 2019
7c5bea6
set default record_per_chunk to 1024
windtalker Oct 13, 2019
939b8cf
address comment
windtalker Oct 14, 2019
d25dadc
address comments
windtalker Oct 14, 2019
512fa8e
refine code
windtalker Oct 14, 2019
ff9bf8f
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 14, 2019
a6f6dda
refine code
windtalker Oct 14, 2019
a943e8d
add mock_dag test
windtalker Oct 14, 2019
41272da
code refine
windtalker Oct 14, 2019
00dac75
code refine
windtalker Oct 14, 2019
4080fba
address comments
windtalker Oct 14, 2019
1188e69
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 14, 2019
d2890e3
Fix NULL order for dag (#281)
zanmato1984 Oct 14, 2019
bc075c5
refine get actions in DAGExpressionAnalyzer, fix bug in dbgFuncCoproc…
windtalker Oct 15, 2019
4dbff78
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 15, 2019
fbcbdc0
remove duplicate agg funcs (#283)
windtalker Oct 15, 2019
8f2bfaf
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 16, 2019
3716b98
refine code
windtalker Oct 16, 2019
fa42c69
remove useless code
windtalker Oct 16, 2019
7bbe8c0
address comments
windtalker Oct 16, 2019
31973bf
remove useless include
windtalker Oct 16, 2019
d968c09
address comments
windtalker Oct 16, 2019
edf32d4
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 16, 2019
f1256bd
refine code
windtalker Oct 17, 2019
73befbd
address comments
windtalker Oct 17, 2019
3188c07
format code
windtalker Oct 17, 2019
87955d1
fix typo
windtalker Oct 17, 2019
4f58878
Update dbms/src/Flash/BatchCommandsHandler.cpp
zanmato1984 Oct 17, 2019
92c16c2
revert unnecessary changes
windtalker Oct 17, 2019
0f6f0a6
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 17, 2019
d550644
refine code
windtalker Oct 17, 2019
bac7951
fix build error
windtalker Oct 17, 2019
4a251b0
refine code
windtalker Oct 17, 2019
e8b92b4
Merge branch 'master' into cop
zanmato1984 Oct 17, 2019
48dd7bd
Merge master
zanmato1984 Oct 18, 2019
a8cba5f
Merge remote-tracking branch 'origin/cop' into arrow_encode_2
windtalker Oct 18, 2019
e3232af
Merge branch 'master' of https://github.com/pingcap/tics into arrow_e…
windtalker Oct 21, 2019
4d5e5d4
address comments
windtalker Oct 21, 2019
c7d8d4e
refine code
windtalker Oct 22, 2019
0b1ed77
address comments
windtalker Oct 25, 2019
683e7e0
Merge branch 'master' into arrow_encode
zanmato1984 Oct 25, 2019
FLASH-437 Support time zone in coprocessor (#259)
* do not allow timestamp literal in DAG request

* refine code

* fix cop date type encode error

* support tz info in DAG request

* address comments
windtalker authored and zanmato1984 committed Sep 27, 2019

commit 3a439425d374258bdc4c40f561ca2a3a385550e3
11 changes: 11 additions & 0 deletions dbms/src/Common/MyTime.cpp
@@ -473,4 +473,15 @@ void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & tim
to_time = to_my_time.toPackedUInt();
}

void convertTimeZoneByOffset(UInt64 from_time, UInt64 & to_time, Int64 offset, const DateLUTImpl & time_zone)
{
MyDateTime from_my_time(from_time);
time_t epoch = time_zone.makeDateTime(
from_my_time.year, from_my_time.month, from_my_time.day, from_my_time.hour, from_my_time.minute, from_my_time.second);
epoch += offset;
MyDateTime to_my_time(time_zone.toYear(epoch), time_zone.toMonth(epoch), time_zone.toDayOfMonth(epoch),
time_zone.toHour(epoch), time_zone.toMinute(epoch), time_zone.toSecond(epoch), from_my_time.micro_second);
to_time = to_my_time.toPackedUInt();
}

} // namespace DB
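
A rough, self-contained sketch of the idea behind `convertTimeZoneByOffset`: turn the civil date-time into seconds since the epoch, add the offset, turn the result back into civil fields, and carry the microsecond part over unchanged. This is not the TiFlash implementation. It does not use `DateLUTImpl` or `MyDateTime`; `CivilDateTime`, `daysFromCivil`, `civilFromDays` and `shiftByOffset` are hypothetical names, and the calendar arithmetic is the widely used days-from-civil algorithm, assumed here as a substitute for the lookup table.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the fields of MyDateTime (not the real type).
struct CivilDateTime
{
    int64_t year;
    unsigned month, day, hour, minute, second, micro_second;
};

// Days since 1970-01-01 for a proleptic Gregorian date.
inline int64_t daysFromCivil(int64_t y, unsigned m, unsigned d)
{
    y -= m <= 2;
    const int64_t era = (y >= 0 ? y : y - 399) / 400;
    const unsigned yoe = static_cast<unsigned>(y - era * 400);
    const unsigned doy = (153 * (m + (m > 2 ? -3 : 9)) + 2) / 5 + d - 1;
    const unsigned doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
    return era * 146097 + static_cast<int64_t>(doe) - 719468;
}

// Inverse of daysFromCivil.
inline void civilFromDays(int64_t z, int64_t & y, unsigned & m, unsigned & d)
{
    z += 719468;
    const int64_t era = (z >= 0 ? z : z - 146096) / 146097;
    const unsigned doe = static_cast<unsigned>(z - era * 146097);
    const unsigned yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
    const unsigned doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
    const unsigned mp = (5 * doy + 2) / 153;
    d = doy - (153 * mp + 2) / 5 + 1;
    m = mp + (mp < 10 ? 3 : -9);
    y = static_cast<int64_t>(yoe) + era * 400 + (m <= 2);
}

// Shift a civil date-time by a fixed number of seconds.
inline CivilDateTime shiftByOffset(const CivilDateTime & from, int64_t offset_seconds)
{
    assert(from.month >= 1 && from.month <= 12);
    int64_t epoch = daysFromCivil(from.year, from.month, from.day) * 86400
        + static_cast<int64_t>(from.hour) * 3600
        + static_cast<int64_t>(from.minute) * 60
        + static_cast<int64_t>(from.second);
    epoch += offset_seconds;

    // Floor division so that instants before 1970 are handled as well.
    int64_t days = epoch / 86400;
    int64_t rem = epoch % 86400;
    if (rem < 0)
    {
        rem += 86400;
        --days;
    }

    CivilDateTime to{};
    civilFromDays(days, to.year, to.month, to.day);
    to.hour = static_cast<unsigned>(rem / 3600);
    to.minute = static_cast<unsigned>((rem % 3600) / 60);
    to.second = static_cast<unsigned>(rem % 60);
    to.micro_second = from.micro_second; // sub-second part is not affected by the shift
    return to;
}
```

With this sketch, shifting 2019-09-27 23:30:00 by +3600 seconds crosses midnight, and shifting 2019-01-01 00:00:00 by -28800 seconds crosses the year boundary.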
2 changes: 2 additions & 0 deletions dbms/src/Common/MyTime.h
@@ -65,4 +65,6 @@ Field parseMyDateTime(const String & str);

void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & time_zone_from, const DateLUTImpl & time_zone_to);

void convertTimeZoneByOffset(UInt64 from_time, UInt64 & to_time, Int64 offset, const DateLUTImpl & time_zone);

} // namespace DB
57 changes: 46 additions & 11 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
@@ -38,20 +38,27 @@ using DAGColumnInfo = std::pair<String, ColumnInfo>;
using DAGSchema = std::vector<DAGColumnInfo>;
using SchemaFetcher = std::function<TableInfo(const String &, const String &)>;
std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts);
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts,
Int64 tz_offset, const String & tz_name);
tipb::SelectResponse executeDAGRequest(
Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version);
BlockInputStreamPtr outputDAGResponse(Context & context, const DAGSchema & schema, const tipb::SelectResponse & dag_response);

BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
{
if (args.size() < 1 || args.size() > 2)
throw Exception("Args not matched, should be: query[, region-id]", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 1 || args.size() > 4)
throw Exception("Args not matched, should be: query[, region-id, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS);

String query = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
RegionID region_id = InvalidRegionID;
if (args.size() == 2)
if (args.size() >= 2)
region_id = safeGet<RegionID>(typeid_cast<const ASTLiteral &>(*args[1]).value);
Int64 tz_offset = 0;
String tz_name = "";
if (args.size() >= 3)
tz_offset = get<Int64>(typeid_cast<const ASTLiteral &>(*args[2]).value);
if (args.size() >= 4)
tz_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);
Timestamp start_ts = context.getTMTContext().getPDClient()->getTS();

auto [table_id, schema, dag_request] = compileQuery(
@@ -63,7 +70,7 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
throw Exception("Not TMT", ErrorCodes::BAD_ARGUMENTS);
return mmt->getTableInfo();
},
start_ts);
start_ts, tz_offset, tz_name);

RegionPtr region;
if (region_id == InvalidRegionID)
@@ -86,23 +93,29 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)

BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args)
{
if (args.size() < 2 || args.size() > 3)
throw Exception("Args not matched, should be: query, region-id[, start-ts]", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 2 || args.size() > 5)
throw Exception("Args not matched, should be: query, region-id[, start-ts, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS);

String query = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
RegionID region_id = safeGet<RegionID>(typeid_cast<const ASTLiteral &>(*args[1]).value);
Timestamp start_ts = DEFAULT_MAX_READ_TSO;
if (args.size() == 3)
if (args.size() >= 3)
start_ts = safeGet<Timestamp>(typeid_cast<const ASTLiteral &>(*args[2]).value);
if (start_ts == 0)
start_ts = context.getTMTContext().getPDClient()->getTS();
Int64 tz_offset = 0;
String tz_name = "";
if (args.size() >= 4)
tz_offset = safeGet<Int64>(typeid_cast<const ASTLiteral &>(*args[3]).value);
if (args.size() >= 5)
tz_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[4]).value);

auto [table_id, schema, dag_request] = compileQuery(
context, query,
[&](const String & database_name, const String & table_name) {
return MockTiDB::instance().getTableByName(database_name, table_name)->table_info;
},
start_ts);
start_ts, tz_offset, tz_name);
std::ignore = table_id;

RegionPtr region = context.getTMTContext().getKVStore()->getRegion(region_id);
@@ -170,6 +183,14 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
ft->set_tp(TiDB::TypeTiny);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else if (func_name_lowercase == "greater")
{
expr->set_sig(tipb::ScalarFuncSig::GTInt);
auto * ft = expr->mutable_field_type();
// TODO: TiDB will infer Int64.
ft->set_tp(TiDB::TypeTiny);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
else
{
throw Exception("Unsupported function: " + func_name_lowercase, ErrorCodes::LOGICAL_ERROR);
@@ -239,10 +260,13 @@ void compileFilter(const DAGSchema & input, ASTPtr ast, tipb::Selection * filter
}

std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts)
Context & context, const String & query, SchemaFetcher schema_fetcher,
Timestamp start_ts, Int64 tz_offset, const String & tz_name)
{
DAGSchema schema;
tipb::DAGRequest dag_request;
dag_request.set_time_zone_name(tz_name);
dag_request.set_time_zone_offset(tz_offset);

dag_request.set_start_ts(start_ts);

@@ -291,8 +315,11 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
ci.tp = column_info.tp;
ci.flag = column_info.flag;
ci.flen = column_info.flen;
ci.decimal = column_info.flen;
ci.decimal = column_info.decimal;
ci.elems = column_info.elems;
// a hack to test timestamp type in mock test
if (column_info.tp == TiDB::TypeDatetime && ci.decimal == 5)
ci.tp = TiDB::TypeTimestamp;
ts_output.emplace_back(std::make_pair(column_info.name, std::move(ci)));
}
executor_ctx_map.emplace(ts_exec, ExecutorCtx{nullptr, std::move(ts_output), std::unordered_map<String, tipb::Expr *>{}});
@@ -430,6 +457,14 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned | TiDB::ColumnFlagNotNull);
}
else if (func->name == "max")
{
agg_func->set_tp(tipb::Max);
if (agg_func->children_size() != 1)
throw Exception("udaf max only accepts 1 argument");
auto ft = agg_func->mutable_field_type();
ft->set_tp(agg_func->children(0).field_type().tp());
}
// TODO: Other agg func.
else
{
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
@@ -102,7 +102,7 @@ catch (const LockException & e)
}
catch (const Exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.displayText());
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.getStackTrace().toString());
recordError(e.code(), e.message());
}
catch (const std::exception & e)
131 changes: 119 additions & 12 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
@@ -6,6 +6,7 @@
#include <DataTypes/DataTypeSet.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/FieldToDataType.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
@@ -44,7 +45,7 @@ static String genFuncString(const String & func_name, const Names & argument_nam
return ss.str();
}

DAGExpressionAnalyzer::DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_)
DAGExpressionAnalyzer::DAGExpressionAnalyzer(const std::vector<NameAndTypePair> && source_columns_, const Context & context_)
: source_columns(source_columns_),
context(context_),
after_agg(false),
@@ -177,28 +178,129 @@ void DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const
}
}

const NamesAndTypesList & DAGExpressionAnalyzer::getCurrentInputColumns() { return after_agg ? aggregated_columns : source_columns; }
const std::vector<NameAndTypePair> & DAGExpressionAnalyzer::getCurrentInputColumns()
{
return after_agg ? aggregated_columns : source_columns;
}

void DAGExpressionAnalyzer::appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project)
{
initChain(chain, getCurrentInputColumns());
for (auto name : final_project)
for (const auto & name : final_project)
{
chain.steps.back().required_output.push_back(name.first);
}
}

void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & aggregation)
void constructTZExpr(tipb::Expr & tz_expr, const tipb::DAGRequest & rqst, bool from_utc)
{
if (rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0)
{
tz_expr.set_tp(tipb::ExprType::String);
tz_expr.set_val(rqst.time_zone_name());
auto * field_type = tz_expr.mutable_field_type();
field_type->set_tp(TiDB::TypeString);
field_type->set_flag(TiDB::ColumnFlagNotNull);
}
else
{
tz_expr.set_tp(tipb::ExprType::Int64);
std::stringstream ss;
encodeDAGInt64(from_utc ? rqst.time_zone_offset() : -rqst.time_zone_offset(), ss);
tz_expr.set_val(ss.str());
auto * field_type = tz_expr.mutable_field_type();
field_type->set_tp(TiDB::TypeLongLong);
field_type->set_flag(TiDB::ColumnFlagNotNull);
}
}

bool hasMeaningfulTZInfo(const tipb::DAGRequest &rqst)
{
if (rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0)
return rqst.time_zone_name() != "UTC";
if (rqst.has_time_zone_offset())
return rqst.time_zone_offset() != 0;
return false;
}

String DAGExpressionAnalyzer::appendTimeZoneCast(
const String & tz_col, const String & ts_col, const String & func_name, ExpressionActionsPtr & actions)
{
Names cast_argument_names;
cast_argument_names.push_back(ts_col);
cast_argument_names.push_back(tz_col);
String cast_expr_name = applyFunction(func_name, cast_argument_names, actions);
return cast_expr_name;
}

// Add time zone casts after the table scan; this implements session-level time zone support.
// The basic idea is:
// 1. for every timestamp column used in the DAG request, after reading it from the table scan, add
// a cast function that converts it to the time zone specified in the DAG request
// 2. for every timestamp column that will be returned to TiDB, add a cast function that converts
// it back to UTC
// For timestamp columns that undergo no transformation or calculation (e.g. select ts_col from table),
// this would introduce two useless casts. To avoid them, when a ts column is cast to the
// session-level time zone, the original ts column in UTC is still kept.
// For a DAG request without agg, the final project selects the ts column in UTC, which is
// exactly what TiDB wants.
// For a DAG request with agg, every ts column after agg carries the session-level time zone (the
// UTC ts column is never used during agg), so all columns of ts type are converted back to UTC.
bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS(
ExpressionActionsChain &chain, std::vector<bool> is_ts_column, const tipb::DAGRequest &rqst)
{
if (!hasMeaningfulTZInfo(rqst))
return false;

bool ret = false;
initChain(chain, getCurrentInputColumns());
ExpressionActionsPtr actions = chain.getLastActions();
tipb::Expr tz_expr;
constructTZExpr(tz_expr, rqst, true);
String tz_col;
String func_name
= rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0 ? "ConvertTimeZoneFromUTC" : "ConvertTimeZoneByOffset";
for (size_t i = 0; i < is_ts_column.size(); i++)
{
if (is_ts_column[i])
{
if (tz_col.length() == 0)
tz_col = getActions(tz_expr, actions);
String casted_name = appendTimeZoneCast(tz_col, source_columns[i].name, func_name, actions);
source_columns.emplace_back(source_columns[i].name, source_columns[i].type);
source_columns[i].name = casted_name;
ret = true;
}
}
return ret;
}

void DAGExpressionAnalyzer::appendAggSelect(
ExpressionActionsChain & chain, const tipb::Aggregation & aggregation, const tipb::DAGRequest & rqst)
{
initChain(chain, getCurrentInputColumns());
bool need_update_aggregated_columns = false;
NamesAndTypesList updated_aggregated_columns;
ExpressionActionsChain::Step step = chain.steps.back();
auto agg_col_names = aggregated_columns.getNames();
bool need_append_timezone_cast = hasMeaningfulTZInfo(rqst);
tipb::Expr tz_expr;
if (need_append_timezone_cast)
constructTZExpr(tz_expr, rqst, false);
String tz_col;
String tz_cast_func_name
= rqst.has_time_zone_name() && rqst.time_zone_name().length() > 0 ? "ConvertTimeZoneToUTC" : "ConvertTimeZoneByOffset";
for (Int32 i = 0; i < aggregation.agg_func_size(); i++)
{
String & name = agg_col_names[i];
String & name = aggregated_columns[i].name;
String updated_name = appendCastIfNeeded(aggregation.agg_func(i), step.actions, name);
if (need_append_timezone_cast && aggregation.agg_func(i).field_type().tp() == TiDB::TypeTimestamp)
{
if (tz_col.length() == 0)
tz_col = getActions(tz_expr, step.actions);
updated_name = appendTimeZoneCast(tz_col, updated_name, tz_cast_func_name, step.actions);
}
if (name != updated_name)
{
need_update_aggregated_columns = true;
@@ -208,14 +310,20 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons
}
else
{
updated_aggregated_columns.emplace_back(name, aggregated_columns.getTypes()[i]);
updated_aggregated_columns.emplace_back(name, aggregated_columns[i].type);
step.required_output.push_back(name);
}
}
for (Int32 i = 0; i < aggregation.group_by_size(); i++)
{
String & name = agg_col_names[i + aggregation.agg_func_size()];
String & name = aggregated_columns[i + aggregation.agg_func_size()].name;
String updated_name = appendCastIfNeeded(aggregation.group_by(i), step.actions, name);
if (need_append_timezone_cast && aggregation.group_by(i).field_type().tp() == TiDB::TypeTimestamp)
{
if (tz_col.length() == 0)
tz_col = getActions(tz_expr, step.actions);
updated_name = appendTimeZoneCast(tz_col, updated_name, tz_cast_func_name, step.actions);
}
if (name != updated_name)
{
need_update_aggregated_columns = true;
@@ -225,7 +333,7 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons
}
else
{
updated_aggregated_columns.emplace_back(name, aggregated_columns.getTypes()[i]);
updated_aggregated_columns.emplace_back(name, aggregated_columns[i].type);
step.required_output.push_back(name);
}
}
@@ -263,11 +371,10 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres
// first construct the second argument
tipb::Expr type_expr;
type_expr.set_tp(tipb::ExprType::String);
std::stringstream ss;
type_expr.set_val(expected_type->getName());
auto * type_field_type = type_expr.mutable_field_type();
type_field_type->set_tp(0xfe);
type_field_type->set_flag(1);
type_field_type->set_tp(TiDB::TypeString);
type_field_type->set_flag(TiDB::ColumnFlagNotNull);
getActions(type_expr, actions);

Names cast_argument_names;
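
The time zone handling in this file comes down to two decisions: whether the request carries time zone information that differs from UTC, and which cast function and constant argument to use in each direction. The following sketch restates that logic in isolation, under stated assumptions. `TZInfo`, `Direction`, `TZCast`, `hasMeaningfulTZ` and `pickCast` are hypothetical names that stand in for the `tipb::DAGRequest` accessors and for `constructTZExpr`; only the three cast function names are taken from the diff. The sketch compares the offset value itself against zero, and negates the offset when converting back to UTC.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-in for the two time zone fields of a DAG request.
struct TZInfo
{
    std::string name;           // empty means "no name given"
    bool has_offset = false;    // whether an offset was supplied at all
    int64_t offset_seconds = 0; // offset from UTC in seconds
};

enum class Direction
{
    FromUTC, // after table scan: UTC -> session time zone
    ToUTC    // before returning to the client: session time zone -> UTC
};

// Hypothetical result type: the cast function to call and its constant argument.
struct TZCast
{
    std::string func_name;
    std::string argument;
};

inline bool hasName(const TZInfo & tz) { return !tz.name.empty(); }

// A time zone is only worth casting for if it is not UTC / not a zero offset.
inline bool hasMeaningfulTZ(const TZInfo & tz)
{
    if (hasName(tz))
        return tz.name != "UTC";
    if (tz.has_offset)
        return tz.offset_seconds != 0;
    return false;
}

// A name takes precedence over an offset; the offset is negated on the way back to UTC.
inline TZCast pickCast(const TZInfo & tz, Direction dir)
{
    assert(hasMeaningfulTZ(tz));
    if (hasName(tz))
        return {dir == Direction::FromUTC ? "ConvertTimeZoneFromUTC" : "ConvertTimeZoneToUTC", tz.name};
    const int64_t signed_offset = dir == Direction::FromUTC ? tz.offset_seconds : -tz.offset_seconds;
    return {"ConvertTimeZoneByOffset", std::to_string(signed_offset)};
}
```

A request with name `"UTC"` or with an explicit offset of zero needs no casts at all, which is why both call sites check this first.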
22 changes: 15 additions & 7 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
@@ -25,9 +25,9 @@ class DAGExpressionAnalyzer : private boost::noncopyable
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
// all columns from table scan
NamesAndTypesList source_columns;
std::vector<NameAndTypePair> source_columns;
// all columns after aggregation
NamesAndTypesList aggregated_columns;
std::vector<NameAndTypePair> aggregated_columns;
DAGPreparedSets prepared_sets;
Settings settings;
const Context & context;
@@ -36,27 +36,35 @@ class DAGExpressionAnalyzer : private boost::noncopyable
Poco::Logger * log;

public:
DAGExpressionAnalyzer(const NamesAndTypesList & source_columns_, const Context & context_);
DAGExpressionAnalyzer(const std::vector<NameAndTypePair> && source_columns_, const Context & context_);
void appendWhere(ExpressionActionsChain & chain, const tipb::Selection & sel, String & filter_column_name);
void appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names);
void appendAggregation(ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregate_keys,
AggregateDescriptions & aggregate_descriptions);
void appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg);
void appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg, const tipb::DAGRequest & rqst);
String appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name);
void initChain(ExpressionActionsChain & chain, const NamesAndTypesList & columns) const
void initChain(ExpressionActionsChain & chain, const std::vector<NameAndTypePair> & columns) const
{
if (chain.steps.empty())
{
chain.settings = settings;
chain.steps.emplace_back(std::make_shared<ExpressionActions>(columns, settings));
NamesAndTypesList column_list;
for (const auto & col : columns)
{
column_list.emplace_back(col.name, col.type);
}
chain.steps.emplace_back(std::make_shared<ExpressionActions>(column_list, settings));
}
}
void appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project);
String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions);
const NamesAndTypesList & getCurrentInputColumns();
const std::vector<NameAndTypePair> & getCurrentInputColumns();
void makeExplicitSet(const tipb::Expr & expr, const Block & sample_block, bool create_ordered_set, const String & left_arg_name);
String applyFunction(const String & func_name, Names & arg_names, ExpressionActionsPtr & actions);
Int32 getImplicitCastCount() { return implicit_cast_count; };
bool appendTimeZoneCastsAfterTS(ExpressionActionsChain &chain, std::vector<bool> is_ts_column,
const tipb::DAGRequest &rqst);
String appendTimeZoneCast(const String & tz_col, const String & ts_col, const String & func_name, ExpressionActionsPtr & actions);
};

} // namespace DB
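
The switch from `NamesAndTypesList` to `std::vector<NameAndTypePair>` lets `appendTimeZoneCastsAfterTS` address columns by index: the slot of each timestamp column is pointed at the casted column, and the original UTC column is appended at the end so it stays available. A minimal sketch of that bookkeeping follows, assuming a hypothetical `NamedColumn` struct in place of `NameAndTypePair` and a hypothetical `swapInCastedColumns` helper that only builds the casted name as a string instead of registering an expression action.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for NameAndTypePair; the type is kept as a plain string.
struct NamedColumn
{
    std::string name;
    std::string type;
};

// For every flagged column: slot i now names the casted column, and the original
// column is appended so that later steps can still select it.
// Returns true if at least one column was replaced.
inline bool swapInCastedColumns(
    std::vector<NamedColumn> & columns, const std::vector<bool> & is_ts_column, const std::string & cast_func)
{
    assert(is_ts_column.size() <= columns.size());
    bool changed = false;
    const std::size_t n = is_ts_column.size(); // fixed up front; appended columns are not revisited
    for (std::size_t i = 0; i < n; ++i)
    {
        if (!is_ts_column[i])
            continue;
        // Copy first: push_back may reallocate and invalidate references into the vector.
        const NamedColumn original = columns[i];
        columns[i].name = cast_func + "(" + original.name + ")";
        columns.push_back(original);
        changed = true;
    }
    return changed;
}
```

Column indices used by the rest of the request keep pointing at the same positions, which now hold the session-time-zone version of each timestamp column.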
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGStringConverter.cpp
@@ -47,7 +47,11 @@ void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringst
// no column selected, must be something wrong
throw Exception("No column is selected in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}
columns_from_ts = storage->getColumns().getAllPhysical();
const auto & column_list = storage->getColumns().getAllPhysical();
for (auto & column : column_list)
{
columns_from_ts.emplace_back(column.name, column.type);
}
for (const tipb::ColumnInfo & ci : ts.columns())
{
ColumnID cid = ci.column_id();
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGStringConverter.h
@@ -22,7 +22,7 @@ class DAGStringConverter

String buildSqlString();

const NamesAndTypesList & getCurrentColumns()
const std::vector<NameAndTypePair> & getCurrentColumns()
{
if (afterAgg)
{
@@ -50,8 +50,8 @@ class DAGStringConverter
Context & context;
const tipb::DAGRequest & dag_request;
// used by columnRef, which starts with 1, and refs column index in the original ts/agg output
NamesAndTypesList columns_from_ts;
NamesAndTypesList columns_from_agg;
std::vector<NameAndTypePair> columns_from_ts;
std::vector<NameAndTypePair> columns_from_agg;
// used by output_offset, which starts with 0, and refs the index in the selected output of ts/agg operator
Names output_from_ts;
Names output_from_agg;
14 changes: 5 additions & 9 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
@@ -49,7 +49,7 @@ const String & getFunctionName(const tipb::Expr & expr)
}
}

String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col, bool for_parser)
String exprToString(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col, bool for_parser)
{
std::stringstream ss;
Int64 column_id = 0;
@@ -87,9 +87,7 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col
}
case tipb::ExprType::MysqlTime:
{
if (!expr.has_field_type()
|| (expr.field_type().tp() != TiDB::TypeDate && expr.field_type().tp() != TiDB::TypeDatetime
&& expr.field_type().tp() != TiDB::TypeTimestamp))
if (!expr.has_field_type() || (expr.field_type().tp() != TiDB::TypeDate && expr.field_type().tp() != TiDB::TypeDatetime))
throw Exception("Invalid MySQL Time literal " + expr.DebugString(), ErrorCodes::COP_BAD_DAG_REQUEST);
auto t = decodeDAGUInt64(expr.val());
// TODO: Use timezone in DAG request.
@@ -101,7 +99,7 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col
{
throw Exception("Column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
return input_col.getNames()[column_id];
return input_col[column_id].name;
case tipb::ExprType::Count:
case tipb::ExprType::Sum:
case tipb::ExprType::Avg:
@@ -151,7 +149,7 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col

const String & getTypeName(const tipb::Expr & expr) { return tipb::ExprType_Name(expr.tp()); }

String getName(const tipb::Expr & expr, const NamesAndTypesList & current_input_columns)
String getName(const tipb::Expr & expr, const std::vector<NameAndTypePair> & current_input_columns)
{
return exprToString(expr, current_input_columns, false);
}
@@ -235,9 +233,7 @@ Field decodeLiteral(const tipb::Expr & expr)
return decodeDAGDecimal(expr.val());
case tipb::ExprType::MysqlTime:
{
if (!expr.has_field_type()
|| (expr.field_type().tp() != TiDB::TypeDate && expr.field_type().tp() != TiDB::TypeDatetime
&& expr.field_type().tp() != TiDB::TypeTimestamp))
if (!expr.has_field_type() || (expr.field_type().tp() != TiDB::TypeDate && expr.field_type().tp() != TiDB::TypeDatetime))
throw Exception("Invalid MySQL Time literal " + expr.DebugString(), ErrorCodes::COP_BAD_DAG_REQUEST);
auto t = decodeDAGUInt64(expr.val());
// TODO: Use timezone in DAG request.
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGUtils.h
@@ -23,9 +23,9 @@ const String & getFunctionName(const tipb::Expr & expr);
const String & getAggFunctionName(const tipb::Expr & expr);
bool isColumnExpr(const tipb::Expr & expr);
ColumnID getColumnID(const tipb::Expr & expr);
String getName(const tipb::Expr & expr, const NamesAndTypesList & current_input_columns);
String getName(const tipb::Expr & expr, const std::vector<NameAndTypePair> & current_input_columns);
const String & getTypeName(const tipb::Expr & expr);
String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col, bool for_parser = true);
String exprToString(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col, bool for_parser = true);
bool isInOrGlobalInOperator(const String & name);
bool exprHasValidFieldType(const tipb::Expr & expr);
extern std::unordered_map<tipb::ExprType, String> agg_func_map;
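The signature changes above replace `NamesAndTypesList` with `std::vector<NameAndTypePair>`, so a column reference can be resolved with a direct index instead of first materializing a list of names. A minimal sketch of that lookup follows, assuming a simplified stand-in for `NameAndTypePair`. The struct `NameAndType` and the helper `columnNameAt` are hypothetical names used only for illustration and are not part of this PR.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for NameAndTypePair; the real type stores a DataTypePtr, not a string.
struct NameAndType
{
    std::string name;
    std::string type;
};

// Return the name of the column at a 0-based index, rejecting out-of-range ids
// in the same spirit as the "Column id out of bound" check in exprToString.
std::string columnNameAt(const std::vector<NameAndType> & columns, long long column_id)
{
    if (column_id < 0 || static_cast<std::size_t>(column_id) >= columns.size())
        throw std::out_of_range("Column id out of bound");
    return columns[static_cast<std::size_t>(column_id)].name;
}
```

With a vector the lookup is a constant-time index and no temporary container of names is needed.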
25 changes: 23 additions & 2 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
@@ -123,6 +123,8 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
}

Names required_columns;
std::vector<NameAndTypePair> source_columns;
std::vector<bool> is_ts_column;
for (const tipb::ColumnInfo & ci : ts.columns())
{
ColumnID cid = ci.column_id();
@@ -140,6 +142,7 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
required_columns.push_back(name);
auto pair = storage->getColumns().getPhysical(name);
source_columns.emplace_back(std::move(pair));
is_ts_column.push_back(ci.tp() == TiDB::TypeTimestamp);
}
if (required_columns.empty())
{
@@ -150,6 +153,7 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
required_columns.push_back(pk_handle_col->get().name);
auto pair = storage->getColumns().getPhysical(pk_handle_col->get().name);
source_columns.push_back(pair);
is_ts_column.push_back(false);
// For PK handle, use original column info of itself.
dag.getDAGContext().void_result_ft = columnInfoToFieldType(pk_handle_col->get());
}
@@ -158,13 +162,14 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
required_columns.push_back(MutableSupport::tidb_pk_column_name);
auto pair = storage->getColumns().getPhysical(MutableSupport::tidb_pk_column_name);
source_columns.push_back(pair);
is_ts_column.push_back(false);
// For implicit handle, reverse get a column info.
auto column_info = reverseGetColumnInfo(pair, -1, Field());
dag.getDAGContext().void_result_ft = columnInfoToFieldType(column_info);
}
}

analyzer = std::make_unique<DAGExpressionAnalyzer>(source_columns, context);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);

if (!dag.hasAggregation())
{
@@ -260,6 +265,22 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
}
});
}

addTimeZoneCastAfterTS(is_ts_column, pipeline);
}

// Add a timezone cast for timestamp columns; this is used to support session-level timezones.
void InterpreterDAG::addTimeZoneCastAfterTS(std::vector<bool> & is_ts_column, Pipeline & pipeline)
{
bool hasTSColumn = false;
for (auto b : is_ts_column)
hasTSColumn |= b;
if (!hasTSColumn)
return;

ExpressionActionsChain chain;
if (analyzer->appendTimeZoneCastsAfterTS(chain, is_ts_column, dag.getDAGRequest()))
pipeline.transform([&](auto & stream) { stream = std::make_shared<ExpressionBlockInputStream>(stream, chain.getLastActions()); });
}

InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
@@ -284,7 +305,7 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
chain.clear();

// add a cast if the types do not match
analyzer->appendAggSelect(chain, dag.getAggregation());
analyzer->appendAggSelect(chain, dag.getAggregation(), dag.getDAGRequest());
//todo use output_offset to reconstruct the final project columns
for (auto element : analyzer->getCurrentInputColumns())
{
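`addTimeZoneCastAfterTS` above returns early when none of the scanned columns is a timestamp, using a loop that ORs the flags together. The same check can be written with `std::any_of`, as in the sketch below. The helper name `hasTimestampColumn` is hypothetical and is not part of this PR.

```cpp
#include <algorithm>
#include <vector>

// Returns true when at least one scanned column is flagged as a timestamp column.
// An empty flag vector yields false, so the caller can skip building the cast.
bool hasTimestampColumn(const std::vector<bool> & is_ts_column)
{
    return std::any_of(is_ts_column.begin(), is_ts_column.end(), [](bool b) { return b; });
}
```

Taking the vector by const reference also documents that the check does not modify the flags.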
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/InterpreterDAG.h
@@ -83,14 +83,14 @@ class InterpreterDAG : public IInterpreter
SortDescription getSortDescription(Strings & order_column_names);
AnalysisResult analyzeExpressions();
void recordProfileStreams(Pipeline & pipeline, Int32 index);
void addTimeZoneCastAfterTS(std::vector<bool> & is_ts_column, Pipeline & pipeline);

private:
Context & context;

const DAGQuerySource & dag;

NamesWithAliases final_project;
NamesAndTypesList source_columns;

/// How many streams we ask for storage to produce, and in how many threads we will do further processing.
size_t max_streams = 1;
6 changes: 3 additions & 3 deletions dbms/src/Flash/CoprocessorHandler.cpp
@@ -60,7 +60,7 @@ try
}
catch (const LockException & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": LockException: " << e.displayText());
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": LockException: " << e.getStackTrace().toString());
cop_response->Clear();
kvrpcpb::LockInfo * lock_info = cop_response->mutable_locked();
lock_info->set_key(e.lock_infos[0]->key);
@@ -72,7 +72,7 @@ catch (const LockException & e)
}
catch (const RegionException & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": RegionException: " << e.displayText());
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": RegionException: " << e.getStackTrace().toString());
cop_response->Clear();
errorpb::Error * region_err;
switch (e.status)
@@ -95,7 +95,7 @@ catch (const RegionException & e)
}
catch (const Exception & e)
{
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.displayText());
LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.getStackTrace().toString());
cop_response->Clear();
cop_response->set_other_error(e.message());

1 change: 1 addition & 0 deletions dbms/src/Functions/FunctionsDateTime.cpp
@@ -59,6 +59,7 @@ void registerFunctionsDateTime(FunctionFactory & factory)
{
factory.registerFunction<FunctionMyTimeZoneConverter<true>>();
factory.registerFunction<FunctionMyTimeZoneConverter<false>>();
factory.registerFunction<FunctionMyTimeZoneConvertByOffset>();
factory.registerFunction<FunctionToYear>();
factory.registerFunction<FunctionToQuarter>();
factory.registerFunction<FunctionToMonth>();
65 changes: 65 additions & 0 deletions dbms/src/Functions/FunctionsDateTime.h
@@ -1276,6 +1276,71 @@ class FunctionYesterday : public IFunction
}
};

class FunctionMyTimeZoneConvertByOffset : public IFunction
{
using FromFieldType = typename DataTypeMyDateTime::FieldType;
using ToFieldType = typename DataTypeMyDateTime::FieldType;
public:
static FunctionPtr create(const Context &) { return std::make_shared<FunctionMyTimeZoneConvertByOffset>(); };
static constexpr auto name = "ConvertTimeZoneByOffset";

String getName() const override
{
return name;
}

size_t getNumberOfArguments() const override {return 2; }

DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 2)
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ toString(arguments.size()) + ", should be 2",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

if (!checkDataType<DataTypeMyDateTime>(arguments[0].type.get()))
throw Exception{
"Illegal type " + arguments[0].type->getName() + " of first argument of function " + getName() +
". Should be MyDateTime", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
if (!arguments[1].type->isInteger())
throw Exception{
"Illegal type " + arguments[1].type->getName() + " of second argument of function " + getName() +
". Should be Integer type", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};

return arguments[0].type;
}

void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override {
if (const ColumnVector<FromFieldType> *col_from
= checkAndGetColumn<ColumnVector<FromFieldType>>(block.getByPosition(arguments[0]).column.get())) {
auto col_to = ColumnVector<ToFieldType>::create();
const typename ColumnVector<FromFieldType>::Container &vec_from = col_from->getData();
typename ColumnVector<ToFieldType>::Container &vec_to = col_to->getData();
size_t size = vec_from.size();
vec_to.resize(size);

const auto offset_col = block.getByPosition(arguments.back()).column.get();
if (!offset_col->isColumnConst())
throw Exception{
"Second argument of function " + getName() + " must be an integral constant",
ErrorCodes::ILLEGAL_COLUMN};

const auto offset = offset_col->getInt(0);
for (size_t i = 0; i < size; ++i) {
UInt64 result_time = vec_from[i] + offset;
// TODO: may be affected by daylight saving time; needs double-checking
convertTimeZoneByOffset(vec_from[i], result_time, offset, DateLUT::instance("UTC"));
vec_to[i] = result_time;
}

block.getByPosition(result).column = std::move(col_to);
} else
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
+ " of first argument of function " + name,
ErrorCodes::ILLEGAL_COLUMN);
}

};
template <bool convert_from_utc>
class FunctionMyTimeZoneConverter : public IFunction
{
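`ConvertTimeZoneByOffset` above shifts each `MyDateTime` value by a constant number of seconds. The sketch below illustrates the arithmetic on a broken-down datetime: convert to seconds since 1970-01-01, add the offset, and convert back. It is an assumption-laden illustration, not the PR's `convertTimeZoneByOffset`. The struct `CivilDateTime` and the helpers `daysFromCivil`, `civilFromDays`, and `shiftByOffset` are hypothetical names, and the real `MyDateTime` packs its fields into a `UInt64`. The day-count conversions follow Howard Hinnant's well-known civil-date algorithms.

```cpp
#include <cstdint>

// Hypothetical broken-down datetime used only for this illustration.
struct CivilDateTime
{
    int year;
    unsigned month;
    unsigned day;
    unsigned hour;
    unsigned minute;
    unsigned second;
};

// Days since 1970-01-01 for a proleptic Gregorian date.
int64_t daysFromCivil(int64_t y, unsigned m, unsigned d)
{
    y -= m <= 2; // treat January and February as months of the previous year
    const int64_t era = (y >= 0 ? y : y - 399) / 400;
    const unsigned yoe = static_cast<unsigned>(y - era * 400);
    const unsigned doy = (153 * (m > 2 ? m - 3 : m + 9) + 2) / 5 + d - 1;
    const unsigned doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
    return era * 146097 + static_cast<int64_t>(doe) - 719468;
}

// Inverse of daysFromCivil.
void civilFromDays(int64_t z, int & y, unsigned & m, unsigned & d)
{
    z += 719468;
    const int64_t era = (z >= 0 ? z : z - 146096) / 146097;
    const unsigned doe = static_cast<unsigned>(z - era * 146097);
    const unsigned yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
    const unsigned doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
    const unsigned mp = (5 * doy + 2) / 153;
    d = doy - (153 * mp + 2) / 5 + 1;
    m = mp < 10 ? mp + 3 : mp - 9;
    y = static_cast<int>(static_cast<int64_t>(yoe) + era * 400 + (m <= 2 ? 1 : 0));
}

// Shift a datetime by a fixed offset in seconds (28800 corresponds to UTC+8).
CivilDateTime shiftByOffset(const CivilDateTime & t, int64_t offset_seconds)
{
    const int64_t secs = daysFromCivil(t.year, t.month, t.day) * 86400
        + t.hour * 3600 + t.minute * 60 + t.second + offset_seconds;
    int64_t days = secs / 86400;
    int64_t rem = secs % 86400;
    if (rem < 0) // keep the time-of-day part non-negative for dates before 1970
    {
        rem += 86400;
        --days;
    }
    CivilDateTime r{};
    civilFromDays(days, r.year, r.month, r.day);
    r.hour = static_cast<unsigned>(rem / 3600);
    r.minute = static_cast<unsigned>(rem % 3600 / 60);
    r.second = static_cast<unsigned>(rem % 60);
    return r;
}
```

A fixed offset cannot express daylight saving transitions, which is the concern raised by the TODO comment in `executeImpl`.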
11 changes: 0 additions & 11 deletions dbms/src/Storages/Transaction/Datum.cpp
@@ -54,17 +54,6 @@ struct DatumOp<tp, typename std::enable_if<tp == TypeTiny || tp == TypeShort ||
}
};

/// Specialized for Date/DateTime/Timestamp, using unflatten/flatten to transform Int to UInt back and forth.
template <TP tp>
struct DatumOp<tp, typename std::enable_if<tp == TypeDate || tp == TypeNewDate || tp == TypeDatetime || tp == TypeTimestamp>::type>
{
static void unflatten(const Field & orig, std::optional<Field> & copy) { copy = static_cast<UInt64>(orig.get<Int64>()); }

static void flatten(const Field & orig, std::optional<Field> & copy) { copy = static_cast<Int64>(orig.get<UInt64>()); }

static bool overflow(const Field &, const ColumnInfo &) { return false; }
};

/// Specialized for Enum, using unflatten/flatten to transform UInt to Int back and forth.
template <TP tp>
struct DatumOp<tp, typename std::enable_if<tp == TypeEnum>::type>
6 changes: 3 additions & 3 deletions dbms/src/Storages/Transaction/TiDB.h
@@ -43,12 +43,12 @@ using DB::Timestamp;
M(Float, 4, Float, Float32, false) \
M(Double, 5, Float, Float64, false) \
M(Null, 6, Nil, Nothing, false) \
M(Timestamp, 7, Int, MyDateTime, false) \
M(Timestamp, 7, UInt, MyDateTime, false) \
M(LongLong, 8, Int, Int64, false) \
M(Int24, 9, VarInt, Int32, true) \
M(Date, 10, Int, MyDate, false) \
M(Date, 10, UInt, MyDate, false) \
M(Time, 11, Duration, Int64, false) \
M(Datetime, 12, Int, MyDateTime, false) \
M(Datetime, 12, UInt, MyDateTime, false) \
M(Year, 13, Int, Int16, false) \
M(NewDate, 14, Int, MyDate, false) \
M(Varchar, 15, CompactBytes, String, false) \
64 changes: 64 additions & 0 deletions tests/mutable-test/txn_dag/time_zone.test
@@ -0,0 +1,64 @@
# Preparation.
=> DBGInvoke __enable_schema_sync_service('true')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 MyDate, col_2 MyDatetime(5), col_3 MyDatetime')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __raft_insert_row(default, test, 4, 50, '2019-06-10', '2019-06-10 09:00:00', '2019-06-10 09:00:00')
=> DBGInvoke __raft_insert_row(default, test, 4, 51, '2019-06-11', '2019-06-11 07:00:00', '2019-06-11 09:00:00')
=> DBGInvoke __raft_insert_row(default, test, 4, 52, '2019-06-11', '2019-06-11 08:00:00', '2019-06-11 09:00:00')
=> DBGInvoke __raft_insert_row(default, test, 4, 53, '2019-06-12', '2019-06-11 08:00:00', '2019-06-11 09:00:00')

=> DBGInvoke dag('select * from default.test') " --dag_planner="optree
┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐
│ 2019-06-10 │ 2019-06-10 09:00:00.00000 │ 2019-06-10 09:00:00 │
│ 2019-06-11 │ 2019-06-11 07:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-11 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-12 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
└────────────┴───────────────────────────┴─────────────────────┘

# use tz_offset; the result is the same since cop converts the timestamp value to a UTC timestamp when returning to TiDB
=> DBGInvoke dag('select * from default.test',4,28800) " --dag_planner="optree
┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐
│ 2019-06-10 │ 2019-06-10 09:00:00.00000 │ 2019-06-10 09:00:00 │
│ 2019-06-11 │ 2019-06-11 07:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-11 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-12 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
└────────────┴───────────────────────────┴─────────────────────┘

=> DBGInvoke dag('select * from default.test where col_2 > col_3') " --dag_planner="optree

=> DBGInvoke dag('select * from default.test where col_2 > col_3',4,28800) " --dag_planner="optree
┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐
│ 2019-06-10 │ 2019-06-10 09:00:00.00000 │ 2019-06-10 09:00:00 │
│ 2019-06-11 │ 2019-06-11 07:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-11 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-12 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
└────────────┴───────────────────────────┴─────────────────────┘

# tz_name overrides tz_offset
=> DBGInvoke dag('select * from default.test where col_2 > col_3',4,28800,'UTC') " --dag_planner="optree

# ts_col in group by clause
=> DBGInvoke dag('select count(1) from default.test where col_2 > \'2019-06-11 15:00:00\' group by col_2',4,28800) " --dag_planner="optree
┌─count(1)─┬─────────────────────col_2─┐
│ 2 │ 2019-06-11 08:00:00.00000 │
└──────────┴───────────────────────────┘

# ts_col in agg clause
=> DBGInvoke dag('select max(col_2) from default.test group by col_1',4,28800) " --dag_planner="optree
┌──────────max(col_2)─┬──────col_1─┐
│ 2019-06-11 08:00:00 │ 2019-06-12 │
│ 2019-06-11 08:00:00 │ 2019-06-11 │
│ 2019-06-10 09:00:00 │ 2019-06-10 │
└─────────────────────┴────────────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test