Skip to content

Commit

Permalink
FLASH-591 timezone is not considered in Arrow Encode (#295)
Browse files Browse the repository at this point in the history
* Flash-594 coprocessor support filter on non-uint8 column

* address comments

* fix merge error

* timestamp column should return with session level timezone info in arrow encode mode

* fix bug

* refine code

* address comments

* address comments
  • Loading branch information
windtalker authored Oct 29, 2019
1 parent 7b5c2d9 commit ae3c5cb
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 66 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/ArrowChunkCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Block ArrowChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema & schem
pos++;
}
}
Int8 field_length = getFieldLength(field.second.tp);
Int8 field_length = getFieldLengthForArrowEncode(field.second.tp);
std::vector<UInt64> offsets;
if (field_length == VAR_SIZE)
{
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/DAGStringConverter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/LockException.h>
Expand Down Expand Up @@ -66,8 +67,8 @@ try

BlockOutputStreamPtr dag_output_stream = std::make_shared<DAGBlockOutputStream>(dag_response,
context.getSettings().dag_records_per_chunk,
dag_request.encode_type(),
dag.getResultFieldTypes(),
dag.getEncodeType(),
dag.getResultFieldTypes(dag.getDAGContext().void_result_ft),
streams.in->getHeader());
copyData(*streams.in, *dag_output_stream);

Expand Down
26 changes: 11 additions & 15 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,19 +268,15 @@ String DAGExpressionAnalyzer::appendTimeZoneCast(

// add timezone cast after table scan, this is used for session level timezone support
// the basic idea of supporting session level timezone is that:
// 1. for every timestamp column used in the dag request, after reading it from table scan, we add
// cast function to convert its timezone to the timezone specified in DAG request
// 2. for every timestamp column that will be returned to TiDB, we add cast function to convert its
// timezone to UTC
// for timestamp columns without any transformation or calculation(e.g. select ts_col from table),
// this will introduce two useless casts, in order to avoid these redundant cast, when cast the ts
// column to the columns with session-level timezone info, the original ts columns with UTC
// timezone are still kept
// for DAG request that does not contain agg, the final project will select the ts column with UTC
// timezone, which is exactly what TiDB want
// for DAG request that contains agg, any ts column after agg has session-level timezone info (since the ts
// column with UTC timezone will never be used during agg), so all the columns with ts datatype will be
// converted back to UTC timezone
// 1. for every timestamp column used in the dag request, after reading it from table scan,
// we add a cast function to convert its timezone to the timezone specified in the DAG request
// 2. based on the dag encode type, the returned column will carry either the session level timezone
// (Arrow encode) or the UTC timezone (Default encode); if the UTC timezone is needed, another cast
// function is used to convert the session level timezone back to the UTC timezone.
// In the worst case (e.g. select ts_col from table with Default encode), this would introduce two
// useless casts on every timestamp column. To avoid these redundant casts, when casting the ts
// columns to columns with session-level timezone info, the original ts columns with UTC timezone
// are still kept, and the InterpreterDAG will choose the correct column based on the encode type
bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS(
ExpressionActionsChain & chain, std::vector<bool> is_ts_column, const tipb::DAGRequest & rqst)
{
Expand Down Expand Up @@ -311,13 +307,13 @@ bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS(
}

void DAGExpressionAnalyzer::appendAggSelect(
ExpressionActionsChain & chain, const tipb::Aggregation & aggregation, const tipb::DAGRequest & rqst)
ExpressionActionsChain & chain, const tipb::Aggregation & aggregation, const tipb::DAGRequest & rqst, bool keep_session_timezone_info)
{
initChain(chain, getCurrentInputColumns());
bool need_update_aggregated_columns = false;
std::vector<NameAndTypePair> updated_aggregated_columns;
ExpressionActionsChain::Step step = chain.steps.back();
bool need_append_timezone_cast = hasMeaningfulTZInfo(rqst);
bool need_append_timezone_cast = !keep_session_timezone_info && hasMeaningfulTZInfo(rqst);
tipb::Expr tz_expr;
if (need_append_timezone_cast)
constructTZExpr(tz_expr, rqst, false);
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class DAGExpressionAnalyzer : private boost::noncopyable
void appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names);
void appendAggregation(ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregate_keys,
AggregateDescriptions & aggregate_descriptions);
void appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg, const tipb::DAGRequest & rqst);
void appendAggSelect(
ExpressionActionsChain & chain, const tipb::Aggregation & agg, const tipb::DAGRequest & rqst, bool keep_session_timezone_info);
String appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name);
void initChain(ExpressionActionsChain & chain, const std::vector<NameAndTypePair> & columns) const
{
Expand Down
12 changes: 9 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,15 @@ DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, Re
assignOrThrowException(limit_index, i, LIMIT_NAME);
break;
default:
throw Exception("Unsupported executor in DAG request: " + dag_request.executors(i).DebugString(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception(
"Unsupported executor in DAG request: " + dag_request.executors(i).DebugString(), ErrorCodes::NOT_IMPLEMENTED);
}
}
encode_type = dag_request.encode_type();
if (encode_type == tipb::EncodeType::TypeArrow && hasUnsupportedTypeForArrowEncode(getResultFieldTypes({})))
{
encode_type = tipb::EncodeType::TypeDefault;
}
}

std::tuple<std::string, ASTPtr> DAGQuerySource::parse(size_t max_query_size)
Expand Down Expand Up @@ -127,15 +133,15 @@ bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector<t
}
}

std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes() const
std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes(const tipb::FieldType & void_result_ft) const
{
std::vector<tipb::FieldType> executor_output;
for (int i = dag_request.executors_size() - 1; i >= 0; i--)
{
if (fillExecutorOutputFieldTypes(dag_request.executors(i), executor_output))
{
if (executor_output.empty())
executor_output.push_back(dag_context.void_result_ft);
executor_output.push_back(void_result_ft);
break;
}
}
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQuerySource.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,12 @@ class DAGQuerySource : public IQuerySource
};
const tipb::DAGRequest & getDAGRequest() const { return dag_request; };

std::vector<tipb::FieldType> getResultFieldTypes() const;
std::vector<tipb::FieldType> getResultFieldTypes(const tipb::FieldType & void_result_ft) const;

ASTPtr getAST() const { return ast; };

tipb::EncodeType getEncodeType() const { return encode_type; }

protected:
void assertValid(Int32 index, const String & name) const
{
Expand All @@ -110,6 +112,8 @@ class DAGQuerySource : public IQuerySource
Int32 order_index = -1;
Int32 limit_index = -1;

tipb::EncodeType encode_type;

ASTPtr ast;
};

Expand Down
45 changes: 45 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ extern const int UNSUPPORTED_METHOD;
extern const int LOGICAL_ERROR;
} // namespace ErrorCodes

const Int8 VAR_SIZE = 0;

// Tells whether `expr` is a function expression: true when its type is
// ScalarFunc, otherwise whatever isAggFunctionExpr reports for it.
bool isFunctionExpr(const tipb::Expr & expr)
{
    if (expr.tp() == tipb::ExprType::ScalarFunc)
        return true;
    return isAggFunctionExpr(expr);
}

const String & getAggFunctionName(const tipb::Expr & expr)
Expand Down Expand Up @@ -262,6 +264,49 @@ bool exprHasValidFieldType(const tipb::Expr & expr)
return expr.has_field_type() && !(expr.field_type().tp() == TiDB::TP::TypeNewDecimal && expr.field_type().decimal() == -1);
}

// Reports whether at least one of the given field types is Set, Time, Enum or Bit,
// i.e. one of the types this check treats as unsupported for arrow encode.
// Returns false for an empty list.
bool hasUnsupportedTypeForArrowEncode(const std::vector<tipb::FieldType> & types)
{
    for (const auto & field_type : types)
    {
        switch (field_type.tp())
        {
            case TiDB::TypeSet:
            case TiDB::TypeTime:
            case TiDB::TypeEnum:
            case TiDB::TypeBit:
                return true;
            default:
                break;
        }
    }
    return false;
}

// Maps a TiDB field type code to the field length used for it in arrow encode.
// String and blob types yield VAR_SIZE; any type code not listed below makes
// the function throw an Exception naming the offending code.
UInt8 getFieldLengthForArrowEncode(Int32 tp)
{
    switch (tp)
    {
        // string and blob types have no fixed length
        case TiDB::TypeVarchar:
        case TiDB::TypeVarString:
        case TiDB::TypeString:
        case TiDB::TypeBlob:
        case TiDB::TypeTinyBlob:
        case TiDB::TypeMediumBlob:
        case TiDB::TypeLongBlob:
            return VAR_SIZE;

        // date / time types
        case TiDB::TypeDate:
        case TiDB::TypeDatetime:
        case TiDB::TypeNewDate:
        case TiDB::TypeTimestamp:
            return 20;

        // decimal types
        case TiDB::TypeDecimal:
        case TiDB::TypeNewDecimal:
            return 40;

        case TiDB::TypeFloat:
            return 4;

        // every integer type, year and double share one length
        case TiDB::TypeTiny:
        case TiDB::TypeShort:
        case TiDB::TypeInt24:
        case TiDB::TypeLong:
        case TiDB::TypeLongLong:
        case TiDB::TypeYear:
        case TiDB::TypeDouble:
            return 8;

        default:
            break;
    }
    throw Exception("not supported field type in arrow encode: " + std::to_string(tp));
}

void constructStringLiteralTiExpr(tipb::Expr & expr, const String & value)
{
expr.set_tp(tipb::ExprType::String);
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ void constructInt64LiteralTiExpr(tipb::Expr & expr, Int64 value);
void constructDateTimeLiteralTiExpr(tipb::Expr & expr, UInt64 packed_value);
extern std::unordered_map<tipb::ExprType, String> agg_func_map;
extern std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map;
extern const Int8 VAR_SIZE;

tipb::FieldType columnInfoToFieldType(const TiDB::ColumnInfo & ci);
TiDB::ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type);
bool hasUnsupportedTypeForArrowEncode(const std::vector<tipb::FieldType> & types);
UInt8 getFieldLengthForArrowEncode(Int32 tp);

} // namespace DB
31 changes: 26 additions & 5 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_)
: context(context_), dag(dag_), log(&Logger::get("InterpreterDAG"))
: context(context_),
dag(dag_),
keep_session_timezone_info(dag.getEncodeType() == tipb::EncodeType::TypeArrow),
log(&Logger::get("InterpreterDAG"))
{}

template <typename HandleType>
Expand Down Expand Up @@ -308,21 +311,39 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
});
}

addTimeZoneCastAfterTS(is_ts_column, pipeline);
if (addTimeZoneCastAfterTS(is_ts_column, pipeline))
{
// for arrow encode, the final select of timestamp column should be column with session timezone
if (keep_session_timezone_info && !dag.hasAggregation())
{
for (auto i : dag.getDAGRequest().output_offsets())
{
if (is_ts_column[i])
{
final_project[i].first = analyzer->getCurrentInputColumns()[i].name;
}
}
}
}
}

// add timezone cast for timestamp type, this is used to support session level timezone
void InterpreterDAG::addTimeZoneCastAfterTS(std::vector<bool> & is_ts_column, Pipeline & pipeline)
bool InterpreterDAG::addTimeZoneCastAfterTS(std::vector<bool> & is_ts_column, Pipeline & pipeline)
{
// the return value is true only when an expression stream was appended to the pipeline below
bool hasTSColumn = false;
for (auto b : is_ts_column)
hasTSColumn |= b;
// no column is flagged in is_ts_column, so there is nothing to cast
if (!hasTSColumn)
return;
return false;

ExpressionActionsChain chain;
// the analyzer decides whether any cast action is actually needed for this DAG request
if (analyzer->appendTimeZoneCastsAfterTS(chain, is_ts_column, dag.getDAGRequest()))
{
// wrap every stream of the pipeline with the last actions of the chain
pipeline.transform([&](auto & stream) { stream = std::make_shared<ExpressionBlockInputStream>(stream, chain.getLastActions()); });
return true;
}
else
return false;
}

InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
Expand All @@ -347,7 +368,7 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
chain.clear();

// add cast if type is not match
analyzer->appendAggSelect(chain, dag.getAggregation(), dag.getDAGRequest());
analyzer->appendAggSelect(chain, dag.getAggregation(), dag.getDAGRequest(), keep_session_timezone_info);
//todo use output_offset to reconstruct the final project columns
for (auto element : analyzer->getCurrentInputColumns())
{
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class InterpreterDAG : public IInterpreter
SortDescription getSortDescription(Strings & order_column_names);
AnalysisResult analyzeExpressions();
void recordProfileStreams(Pipeline & pipeline, Int32 index);
void addTimeZoneCastAfterTS(std::vector<bool> & is_ts_column, Pipeline & pipeline);
bool addTimeZoneCastAfterTS(std::vector<bool> & is_ts_column, Pipeline & pipeline);

private:
Context & context;
Expand All @@ -101,6 +101,8 @@ class InterpreterDAG : public IInterpreter

std::unique_ptr<DAGExpressionAnalyzer> analyzer;

const bool keep_session_timezone_info;

Poco::Logger * log;
};
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/TiDBChunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ TiDBChunk::TiDBChunk(const std::vector<tipb::FieldType> & field_types)
{
for (auto & type : field_types)
{
columns.emplace_back(getFieldLength(type.tp()));
columns.emplace_back(getFieldLengthForArrowEncode(type.tp()));
}
}

Expand Down
37 changes: 1 addition & 36 deletions dbms/src/Flash/Coprocessor/TiDBColumn.h
Original file line number Diff line number Diff line change
@@ -1,49 +1,14 @@
#pragma once

#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/TiDBDecimal.h>
#include <Flash/Coprocessor/TiDBTime.h>
#include <Storages/Transaction/TiDB.h>

namespace DB
{

const Int8 VAR_SIZE = 0;
// Maps a TiDB field type code to the field length used for it in arrow encode.
// Returns VAR_SIZE for string and blob types and throws an Exception for any
// type code that is not listed in the switch.
inline UInt8 getFieldLength(Int32 tp)
{
switch (tp)
{
// all integer types, year and double share the same length
case TiDB::TypeTiny:
case TiDB::TypeShort:
case TiDB::TypeInt24:
case TiDB::TypeLong:
case TiDB::TypeLongLong:
case TiDB::TypeYear:
case TiDB::TypeDouble:
return 8;
case TiDB::TypeFloat:
return 4;
// decimal types
case TiDB::TypeDecimal:
case TiDB::TypeNewDecimal:
return 40;
// date / time types
case TiDB::TypeDate:
case TiDB::TypeDatetime:
case TiDB::TypeNewDate:
case TiDB::TypeTimestamp:
return 20;
// string and blob types have no fixed length
case TiDB::TypeVarchar:
case TiDB::TypeVarString:
case TiDB::TypeString:
case TiDB::TypeBlob:
case TiDB::TypeTinyBlob:
case TiDB::TypeMediumBlob:
case TiDB::TypeLongBlob:
return VAR_SIZE;
// anything else is rejected, with the numeric type code in the message
default:
throw Exception("not supported field type in arrow encode: " + std::to_string(tp));
}
}

class TiDBColumn
{
public:
Expand Down
17 changes: 17 additions & 0 deletions tests/mutable-test/txn_dag/time_zone.test
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@
│ 2019-06-12 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │
└────────────┴───────────────────────────┴─────────────────────┘

# test arrow encode
=> DBGInvoke dag('select * from default.test',4,'arrow',28800) " --dag_planner="optree
┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐
│ 2019-06-10 │ 2019-06-10 17:00:00.00000 │ 2019-06-10 09:00:00 │
│ 2019-06-11 │ 2019-06-11 15:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-11 │ 2019-06-11 16:00:00.00000 │ 2019-06-11 09:00:00 │
│ 2019-06-12 │ 2019-06-11 16:00:00.00000 │ 2019-06-11 09:00:00 │
└────────────┴───────────────────────────┴─────────────────────┘

=> DBGInvoke dag('select * from default.test where col_2 > col_3') " --dag_planner="optree

=> DBGInvoke dag('select * from default.test where col_2 > col_3',4,'default',28800) " --dag_planner="optree
Expand Down Expand Up @@ -70,6 +79,14 @@
│ 2019-06-10 09:00:00 │ 2019-06-10 │
└─────────────────────┴────────────┘

# ts_col in agg clause for arrow encode
=> DBGInvoke dag('select max(col_2) from default.test group by col_1',4,'arrow',28800) " --dag_planner="optree
┌──────────max(col_2)─┬──────col_1─┐
│ 2019-06-11 16:00:00 │ 2019-06-12 │
│ 2019-06-11 16:00:00 │ 2019-06-11 │
│ 2019-06-10 17:00:00 │ 2019-06-10 │
└─────────────────────┴────────────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

0 comments on commit ae3c5cb

Please sign in to comment.