diff --git a/dbms/src/Flash/Coprocessor/ArrowChunkCodec.cpp b/dbms/src/Flash/Coprocessor/ArrowChunkCodec.cpp index bc4695906c2..cb74f72e32d 100644 --- a/dbms/src/Flash/Coprocessor/ArrowChunkCodec.cpp +++ b/dbms/src/Flash/Coprocessor/ArrowChunkCodec.cpp @@ -57,7 +57,7 @@ Block ArrowChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema & schem pos++; } } - Int8 field_length = getFieldLength(field.second.tp); + Int8 field_length = getFieldLengthForArrowEncode(field.second.tp); std::vector offsets; if (field_length == VAR_SIZE) { diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index 164a35b23ea..9429a743844 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -66,8 +67,8 @@ try BlockOutputStreamPtr dag_output_stream = std::make_shared(dag_response, context.getSettings().dag_records_per_chunk, - dag_request.encode_type(), - dag.getResultFieldTypes(), + dag.getEncodeType(), + dag.getResultFieldTypes(dag.getDAGContext().void_result_ft), streams.in->getHeader()); copyData(*streams.in, *dag_output_stream); diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp index 73ad417c4fe..11b446656cb 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp @@ -268,19 +268,15 @@ String DAGExpressionAnalyzer::appendTimeZoneCast( // add timezone cast after table scan, this is used for session level timezone support // the basic idea of supporting session level timezone is that: -// 1. for every timestamp column used in the dag request, after reading it from table scan, we add -// cast function to convert its timezone to the timezone specified in DAG request -// 2. for every timestamp column that will be returned to TiDB, we add cast function to convert its -// timezone to UTC -// for timestamp columns without any transformation or calculation(e.g. select ts_col from table), -// this will introduce two useless casts, in order to avoid these redundant cast, when cast the ts -// column to the columns with session-level timezone info, the original ts columns with UTC -// timezone are still kept -// for DAG request that does not contain agg, the final project will select the ts column with UTC -// timezone, which is exactly what TiDB want -// for DAG request that contains agg, any ts column after agg has session-level timezone info(since the ts -// column with UTC timezone will never be used in during agg), all the column with ts datatype will -// convert back to UTC timezone +// 1. for every timestamp column used in the dag request, after reading it from table scan, +// we add cast function to convert its timezone to the timezone specified in DAG request +// 2. based on the dag encode type, the return column will be with session level timezone(Arrow encode) +// or UTC timezone(Default encode), if UTC timezone is needed, another cast function is used to +// convert the session level timezone to UTC timezone. +// In the worst case(e.g select ts_col from table with Default encode), this will introduce two +// useless casts to all the timestamp columns, in order to avoid redundant cast, when cast the ts +// column to the columns with session-level timezone info, the original ts columns with UTC timezone +// are still kept, and the InterpreterDAG will choose the correct column based on encode type bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS( ExpressionActionsChain & chain, std::vector is_ts_column, const tipb::DAGRequest & rqst) { @@ -311,13 +307,13 @@ bool DAGExpressionAnalyzer::appendTimeZoneCastsAfterTS( } void DAGExpressionAnalyzer::appendAggSelect( - ExpressionActionsChain & chain, const tipb::Aggregation & aggregation, const tipb::DAGRequest & rqst) + ExpressionActionsChain & chain, const tipb::Aggregation & aggregation, const tipb::DAGRequest & rqst, bool keep_session_timezone_info) { initChain(chain, getCurrentInputColumns()); bool need_update_aggregated_columns = false; std::vector updated_aggregated_columns; ExpressionActionsChain::Step step = chain.steps.back(); - bool need_append_timezone_cast = hasMeaningfulTZInfo(rqst); + bool need_append_timezone_cast = !keep_session_timezone_info && hasMeaningfulTZInfo(rqst); tipb::Expr tz_expr; if (need_append_timezone_cast) constructTZExpr(tz_expr, rqst, false); diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h index 5c4553f52f2..e940ba164c6 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h @@ -42,7 +42,8 @@ class DAGExpressionAnalyzer : private boost::noncopyable void appendOrderBy(ExpressionActionsChain & chain, const tipb::TopN & topN, Strings & order_column_names); void appendAggregation(ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregate_keys, AggregateDescriptions & aggregate_descriptions); - void appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg, const tipb::DAGRequest & rqst); + void appendAggSelect( + ExpressionActionsChain & chain, const tipb::Aggregation & agg, const tipb::DAGRequest & rqst, bool keep_session_timezone_info); String appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name); void initChain(ExpressionActionsChain & chain, const std::vector & columns) const { diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp index c9d7b22871f..d6f2b257d43 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp @@ -61,9 +61,15 @@ DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, Re assignOrThrowException(limit_index, i, LIMIT_NAME); break; default: - throw Exception("Unsupported executor in DAG request: " + dag_request.executors(i).DebugString(), ErrorCodes::NOT_IMPLEMENTED); + throw Exception( + "Unsupported executor in DAG request: " + dag_request.executors(i).DebugString(), ErrorCodes::NOT_IMPLEMENTED); } } + encode_type = dag_request.encode_type(); + if (encode_type == tipb::EncodeType::TypeArrow && hasUnsupportedTypeForArrowEncode(getResultFieldTypes({}))) + { + encode_type = tipb::EncodeType::TypeDefault; + } } std::tuple DAGQuerySource::parse(size_t max_query_size) @@ -127,7 +133,7 @@ bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector DAGQuerySource::getResultFieldTypes() const +std::vector DAGQuerySource::getResultFieldTypes(const tipb::FieldType & void_result_ft) const { std::vector executor_output; for (int i = dag_request.executors_size() - 1; i >= 0; i--) @@ -135,7 +141,7 @@ std::vector DAGQuerySource::getResultFieldTypes() const if (fillExecutorOutputFieldTypes(dag_request.executors(i), executor_output)) { if (executor_output.empty()) - executor_output.push_back(dag_context.void_result_ft); + executor_output.push_back(void_result_ft); break; } } diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.h b/dbms/src/Flash/Coprocessor/DAGQuerySource.h index b7f4791ad56..0909c54adef 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.h +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.h @@ -80,10 +80,12 @@ class DAGQuerySource : public IQuerySource }; const tipb::DAGRequest & getDAGRequest() const { return dag_request; }; - std::vector getResultFieldTypes() const; + std::vector getResultFieldTypes(const tipb::FieldType & void_result_ft) const; ASTPtr getAST() const { return ast; }; + tipb::EncodeType getEncodeType() const { return encode_type; } + protected: void assertValid(Int32 index, const String & name) const { @@ -110,6 +112,8 @@ class DAGQuerySource : public IQuerySource Int32 order_index = -1; Int32 limit_index = -1; + tipb::EncodeType encode_type; + ASTPtr ast; }; diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp index 8fb52b4b5d4..e20694b8a68 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp +++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp @@ -18,6 +18,8 @@ extern const int UNSUPPORTED_METHOD; extern const int LOGICAL_ERROR; } // namespace ErrorCodes +const Int8 VAR_SIZE = 0; + bool isFunctionExpr(const tipb::Expr & expr) { return expr.tp() == tipb::ExprType::ScalarFunc || isAggFunctionExpr(expr); } const String & getAggFunctionName(const tipb::Expr & expr) @@ -262,6 +264,49 @@ bool exprHasValidFieldType(const tipb::Expr & expr) return expr.has_field_type() && !(expr.field_type().tp() == TiDB::TP::TypeNewDecimal && expr.field_type().decimal() == -1); } +bool hasUnsupportedTypeForArrowEncode(const std::vector & types) +{ + for (const auto & type : types) + if (type.tp() == TiDB::TypeSet || type.tp() == TiDB::TypeTime || type.tp() == TiDB::TypeEnum || type.tp() == TiDB::TypeBit) + return true; + return false; +} + +UInt8 getFieldLengthForArrowEncode(Int32 tp) +{ + switch (tp) + { + case TiDB::TypeTiny: + case TiDB::TypeShort: + case TiDB::TypeInt24: + case TiDB::TypeLong: + case TiDB::TypeLongLong: + case TiDB::TypeYear: + case TiDB::TypeDouble: + return 8; + case TiDB::TypeFloat: + return 4; + case TiDB::TypeDecimal: + case TiDB::TypeNewDecimal: + return 40; + case TiDB::TypeDate: + case TiDB::TypeDatetime: + case TiDB::TypeNewDate: + case TiDB::TypeTimestamp: + return 20; + case TiDB::TypeVarchar: + case TiDB::TypeVarString: + case TiDB::TypeString: + case TiDB::TypeBlob: + case TiDB::TypeTinyBlob: + case TiDB::TypeMediumBlob: + case TiDB::TypeLongBlob: + return VAR_SIZE; + default: + throw Exception("not supported field type in arrow encode: " + std::to_string(tp)); + } +} + void constructStringLiteralTiExpr(tipb::Expr & expr, const String & value) { expr.set_tp(tipb::ExprType::String); diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.h b/dbms/src/Flash/Coprocessor/DAGUtils.h index b346e14234b..06a55defb4d 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.h +++ b/dbms/src/Flash/Coprocessor/DAGUtils.h @@ -32,8 +32,11 @@ void constructInt64LiteralTiExpr(tipb::Expr & expr, Int64 value); void constructDateTimeLiteralTiExpr(tipb::Expr & expr, UInt64 packed_value); extern std::unordered_map agg_func_map; extern std::unordered_map scalar_func_map; +extern const Int8 VAR_SIZE; tipb::FieldType columnInfoToFieldType(const TiDB::ColumnInfo & ci); TiDB::ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type); +bool hasUnsupportedTypeForArrowEncode(const std::vector & types); +UInt8 getFieldLengthForArrowEncode(Int32 tp); } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index f0321b3bd7b..9e5122ec71a 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -43,7 +43,10 @@ extern const int COP_BAD_DAG_REQUEST; } // namespace ErrorCodes InterpreterDAG::InterpreterDAG(Context & context_, const DAGQuerySource & dag_) - : context(context_), dag(dag_), log(&Logger::get("InterpreterDAG")) + : context(context_), + dag(dag_), + keep_session_timezone_info(dag.getEncodeType() == tipb::EncodeType::TypeArrow), + log(&Logger::get("InterpreterDAG")) {} template @@ -308,21 +311,39 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) }); } - addTimeZoneCastAfterTS(is_ts_column, pipeline); + if (addTimeZoneCastAfterTS(is_ts_column, pipeline)) + { + // for arrow encode, the final select of timestamp column should be column with session timezone + if (keep_session_timezone_info && !dag.hasAggregation()) + { + for (auto i : dag.getDAGRequest().output_offsets()) + { + if (is_ts_column[i]) + { + final_project[i].first = analyzer->getCurrentInputColumns()[i].name; + } + } + } + } } // add timezone cast for timestamp type, this is used to support session level timezone -void InterpreterDAG::addTimeZoneCastAfterTS(std::vector & is_ts_column, Pipeline & pipeline) +bool InterpreterDAG::addTimeZoneCastAfterTS(std::vector & is_ts_column, Pipeline & pipeline) { bool hasTSColumn = false; for (auto b : is_ts_column) hasTSColumn |= b; if (!hasTSColumn) - return; + return false; ExpressionActionsChain chain; if (analyzer->appendTimeZoneCastsAfterTS(chain, is_ts_column, dag.getDAGRequest())) + { pipeline.transform([&](auto & stream) { stream = std::make_shared(stream, chain.getLastActions()); }); + return true; + } + else + return false; } InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions() @@ -347,7 +368,7 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions() chain.clear(); // add cast if type is not match - analyzer->appendAggSelect(chain, dag.getAggregation(), dag.getDAGRequest()); + analyzer->appendAggSelect(chain, dag.getAggregation(), dag.getDAGRequest(), keep_session_timezone_info); //todo use output_offset to reconstruct the final project columns for (auto element : analyzer->getCurrentInputColumns()) { diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.h b/dbms/src/Flash/Coprocessor/InterpreterDAG.h index 8a5b7dfe76a..2491d243847 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.h +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.h @@ -83,7 +83,7 @@ class InterpreterDAG : public IInterpreter SortDescription getSortDescription(Strings & order_column_names); AnalysisResult analyzeExpressions(); void recordProfileStreams(Pipeline & pipeline, Int32 index); - void addTimeZoneCastAfterTS(std::vector & is_ts_column, Pipeline & pipeline); + bool addTimeZoneCastAfterTS(std::vector & is_ts_column, Pipeline & pipeline); private: Context & context; @@ -101,6 +101,8 @@ class InterpreterDAG : public IInterpreter std::unique_ptr analyzer; + const bool keep_session_timezone_info; + Poco::Logger * log; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/TiDBChunk.cpp b/dbms/src/Flash/Coprocessor/TiDBChunk.cpp index 75794a3334d..2da870b1b8a 100644 --- a/dbms/src/Flash/Coprocessor/TiDBChunk.cpp +++ b/dbms/src/Flash/Coprocessor/TiDBChunk.cpp @@ -21,7 +21,7 @@ TiDBChunk::TiDBChunk(const std::vector & field_types) { for (auto & type : field_types) { - columns.emplace_back(getFieldLength(type.tp())); + columns.emplace_back(getFieldLengthForArrowEncode(type.tp())); } } diff --git a/dbms/src/Flash/Coprocessor/TiDBColumn.h b/dbms/src/Flash/Coprocessor/TiDBColumn.h index d8baef39f7a..c991dad258c 100644 --- a/dbms/src/Flash/Coprocessor/TiDBColumn.h +++ b/dbms/src/Flash/Coprocessor/TiDBColumn.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -8,42 +9,6 @@ namespace DB { -const Int8 VAR_SIZE = 0; -inline UInt8 getFieldLength(Int32 tp) -{ - switch (tp) - { - case TiDB::TypeTiny: - case TiDB::TypeShort: - case TiDB::TypeInt24: - case TiDB::TypeLong: - case TiDB::TypeLongLong: - case TiDB::TypeYear: - case TiDB::TypeDouble: - return 8; - case TiDB::TypeFloat: - return 4; - case TiDB::TypeDecimal: - case TiDB::TypeNewDecimal: - return 40; - case TiDB::TypeDate: - case TiDB::TypeDatetime: - case TiDB::TypeNewDate: - case TiDB::TypeTimestamp: - return 20; - case TiDB::TypeVarchar: - case TiDB::TypeVarString: - case TiDB::TypeString: - case TiDB::TypeBlob: - case TiDB::TypeTinyBlob: - case TiDB::TypeMediumBlob: - case TiDB::TypeLongBlob: - return VAR_SIZE; - default: - throw Exception("not supported field type in arrow encode: " + std::to_string(tp)); - } -} - class TiDBColumn { public: diff --git a/tests/mutable-test/txn_dag/time_zone.test b/tests/mutable-test/txn_dag/time_zone.test index 73a863737b1..4e3a0ad8152 100644 --- a/tests/mutable-test/txn_dag/time_zone.test +++ b/tests/mutable-test/txn_dag/time_zone.test @@ -32,6 +32,15 @@ │ 2019-06-12 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │ └────────────┴───────────────────────────┴─────────────────────┘ +# test arrow encode +=> DBGInvoke dag('select * from default.test',4,'arrow',28800) " --dag_planner="optree +┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐ +│ 2019-06-10 │ 2019-06-10 17:00:00.00000 │ 2019-06-10 09:00:00 │ +│ 2019-06-11 │ 2019-06-11 15:00:00.00000 │ 2019-06-11 09:00:00 │ +│ 2019-06-11 │ 2019-06-11 16:00:00.00000 │ 2019-06-11 09:00:00 │ +│ 2019-06-12 │ 2019-06-11 16:00:00.00000 │ 2019-06-11 09:00:00 │ +└────────────┴───────────────────────────┴─────────────────────┘ + => DBGInvoke dag('select * from default.test where col_2 > col_3') " --dag_planner="optree => DBGInvoke dag('select * from default.test where col_2 > col_3',4,'default',28800) " --dag_planner="optree @@ -70,6 +79,14 @@ │ 2019-06-10 09:00:00 │ 2019-06-10 │ └─────────────────────┴────────────┘ +# ts_col in agg clause for arrow encode +=> DBGInvoke dag('select max(col_2) from default.test group by col_1',4,'arrow',28800) " --dag_planner="optree +┌──────────max(col_2)─┬──────col_1─┐ +│ 2019-06-11 16:00:00 │ 2019-06-12 │ +│ 2019-06-11 16:00:00 │ 2019-06-11 │ +│ 2019-06-10 17:00:00 │ 2019-06-10 │ +└─────────────────────┴────────────┘ + # Clean up. => DBGInvoke __drop_tidb_table(default, test) => drop table if exists default.test