Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support all DAG operator types in mock SQL -> DAG parser #176

Merged
merged 27 commits into from
Aug 15, 2019
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
309ab6b
Enhance dbg invoke and add dag as schemaful function
zanmato1984 Aug 7, 2019
31d83c4
Add basic sql parse to dag
zanmato1984 Aug 8, 2019
3c4c508
Merge cop
zanmato1984 Aug 8, 2019
2de7311
Merge branch 'cop' into cop-ruoxi
zanmato1984 Aug 8, 2019
63a5800
Column id starts from 1
zanmato1984 Aug 8, 2019
232e7de
Fix value to ref
zanmato1984 Aug 8, 2019
1b14a12
Add basic dag test
zanmato1984 Aug 8, 2019
25eb831
Fix dag bugs and pass 1st mock test
zanmato1984 Aug 8, 2019
80f9fc6
Make dag go normal routine and add mock dag
zanmato1984 Aug 8, 2019
a1173e1
Add todo
zanmato1984 Aug 8, 2019
c8109f6
Add comment
zanmato1984 Aug 8, 2019
7dc0397
Fix gcc compile error
zanmato1984 Aug 8, 2019
66d9e8a
Enhance dag test
zanmato1984 Aug 8, 2019
36d1117
Address comments
zanmato1984 Aug 9, 2019
8aea5aa
Merge branch 'cop' into cop-ruoxi
zanmato1984 Aug 9, 2019
f62b318
Merge branch 'cop' into cop-ruoxi
zanmato1984 Aug 12, 2019
a9fe9f9
Enhance mock sql -> dag compiler and add project test
zanmato1984 Aug 12, 2019
1372262
Mock sql dag compiler support more expression types and add filter test
zanmato1984 Aug 13, 2019
e2f9a02
Add topn and limit test
zanmato1984 Aug 13, 2019
8cea243
Add agg for sql -> dag parser and agg test
zanmato1984 Aug 13, 2019
5008a7a
Merge branch 'cop' of github.com:pingcap/tics into cop-ruoxi
zanmato1984 Aug 14, 2019
8fb4d52
Add dag specific codec
zanmato1984 Aug 15, 2019
c77310e
Merge branch 'cop' of github.com:pingcap/tics into cop-ruoxi
zanmato1984 Aug 15, 2019
0c8e3a5
type
zanmato1984 Aug 15, 2019
76b5444
Merge branch 'cop' of github.com:pingcap/tics into cop-ruoxi
zanmato1984 Aug 15, 2019
41d2b4f
Update codec accordingly
zanmato1984 Aug 15, 2019
17111f5
Remove cop-test
zanmato1984 Aug 15, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
378 changes: 345 additions & 33 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp

Large diffs are not rendered by default.

65 changes: 65 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGCodec.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include <Flash/Coprocessor/DAGCodec.h>

#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TiKVRecordFormat.h>

namespace DB
{

void encodeDAGInt64(Int64 i, std::stringstream & ss)
{
auto u = RecordKVFormat::encodeInt64(i);
ss.write(reinterpret_cast<const char *>(&u), sizeof(u));
}

void encodeDAGUInt64(UInt64 i, std::stringstream & ss)
{
auto u = RecordKVFormat::encodeUInt64(i);
ss.write(reinterpret_cast<const char *>(&u), sizeof(u));
}

void encodeDAGFloat32(Float32 f, std::stringstream & ss) { EncodeFloat64(f, ss); }

void encodeDAGFloat64(Float64 f, std::stringstream & ss) { EncodeFloat64(f, ss); }

void encodeDAGString(const String & s, std::stringstream & ss) { ss << s; }

void encodeDAGBytes(const String & bytes, std::stringstream & ss) { ss << bytes; }

void encodeDAGDecimal(const Decimal & d, std::stringstream & ss) { EncodeDecimal(d, ss); }

Int64 decodeDAGInt64(const String & s)
{
auto u = *(reinterpret_cast<const UInt64 *>(s.data()));
return RecordKVFormat::decodeInt64(u);
}

UInt64 decodeDAGUInt64(const String & s)
{
auto u = *(reinterpret_cast<const UInt64 *>(s.data()));
return RecordKVFormat::decodeUInt64(u);
}

Float32 decodeDAGFloat32(const String & s)
{
size_t cursor = 0;
return DecodeFloat64(cursor, s);
}

Float64 decodeDAGFloat64(const String & s)
{
size_t cursor = 0;
return DecodeFloat64(cursor, s);
}

String decodeDAGString(const String & s) { return s; }

String decodeDAGBytes(const String & s) { return s; }

Decimal decodeDAGDecimal(const String & s)
{
size_t cursor = 0;
return DecodeDecimal(cursor, s);
}

} // namespace DB
25 changes: 25 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGCodec.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <Common/Decimal.h>
#include <Core/Field.h>

namespace DB
{

void encodeDAGInt64(Int64, std::stringstream &);
void encodeDAGUInt64(UInt64, std::stringstream &);
void encodeDAGFloat32(Float32, std::stringstream &);
void encodeDAGFloat64(Float64, std::stringstream &);
void encodeDAGString(const String &, std::stringstream &);
void encodeDAGBytes(const String &, std::stringstream &);
void encodeDAGDecimal(const Decimal &, std::stringstream &);

Int64 decodeDAGInt64(const String &);
UInt64 decodeDAGUInt64(const String &);
Float32 decodeDAGFloat32(const String &);
Float64 decodeDAGFloat64(const String &);
String decodeDAGString(const String &);
String decodeDAGBytes(const String &);
Decimal decodeDAGDecimal(const String &);

} // namespace DB
33 changes: 17 additions & 16 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#include <Flash/Coprocessor/DAGUtils.h>

#include <Core/Types.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Interpreters/Context.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TiKVRecordFormat.h>

#include <unordered_map>

Expand Down Expand Up @@ -51,7 +50,6 @@ const String & getFunctionName(const tipb::Expr & expr)
String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col, bool for_parser)
{
std::stringstream ss;
size_t cursor = 0;
Int64 column_id = 0;
String func_name;
Field f;
Expand All @@ -60,19 +58,21 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col
case tipb::ExprType::Null:
return "NULL";
case tipb::ExprType::Int64:
return std::to_string(RecordKVFormat::decodeInt64(RecordKVFormat::read<UInt64>(expr.val().data())));
return std::to_string(decodeDAGInt64(expr.val()));
case tipb::ExprType::Uint64:
return std::to_string(DecodeInt<UInt64>(cursor, expr.val()));
return std::to_string(decodeDAGUInt64(expr.val()));
case tipb::ExprType::Float32:
return std::to_string(decodeDAGFloat32(expr.val()));
case tipb::ExprType::Float64:
return std::to_string(DecodeFloat64(cursor, expr.val()));
return std::to_string(decodeDAGFloat64(expr.val()));
case tipb::ExprType::String:
return decodeDAGString(expr.val());
case tipb::ExprType::Bytes:
return expr.val();
return decodeDAGBytes(expr.val());
case tipb::ExprType::MysqlDecimal:
return DecodeDecimal(cursor, expr.val()).toString();
return decodeDAGDecimal(expr.val()).toString();
case tipb::ExprType::ColumnRef:
column_id = RecordKVFormat::decodeInt64(RecordKVFormat::read<UInt64>(expr.val().data()));
column_id = decodeDAGInt64(expr.val());
if (column_id < 0 || column_id >= (ColumnID)input_col.size())
{
throw Exception("Column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
Expand Down Expand Up @@ -191,23 +191,24 @@ bool isColumnExpr(const tipb::Expr & expr) { return expr.tp() == tipb::ExprType:

Field decodeLiteral(const tipb::Expr & expr)
{
size_t cursor = 0;
switch (expr.tp())
{
case tipb::ExprType::Null:
return Field();
case tipb::ExprType::Int64:
return RecordKVFormat::decodeInt64(RecordKVFormat::read<UInt64>(expr.val().data()));
return decodeDAGInt64(expr.val());
case tipb::ExprType::Uint64:
return DecodeInt<UInt64>(cursor, expr.val());
return decodeDAGUInt64(expr.val());
case tipb::ExprType::Float32:
return Float64(decodeDAGFloat32(expr.val()));
case tipb::ExprType::Float64:
return DecodeFloat64(cursor, expr.val());
return decodeDAGFloat64(expr.val());
case tipb::ExprType::String:
return decodeDAGString(expr.val());
case tipb::ExprType::Bytes:
return expr.val();
return decodeDAGBytes(expr.val());
case tipb::ExprType::MysqlDecimal:
return DecodeDecimal(cursor, expr.val());
return decodeDAGDecimal(expr.val());
case tipb::ExprType::MysqlBit:
case tipb::ExprType::MysqlDuration:
case tipb::ExprType::MysqlEnum:
Expand All @@ -224,7 +225,7 @@ Field decodeLiteral(const tipb::Expr & expr)

ColumnID getColumnID(const tipb::Expr & expr)
{
auto column_id = RecordKVFormat::decodeInt64(RecordKVFormat::read<UInt64>(expr.val().data()));
auto column_id = decodeDAGInt64(expr.val());
return column_id;
}

Expand Down
10 changes: 10 additions & 0 deletions dbms/src/Storages/Transaction/TiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ enum TP
M(PartKey, (1 << 14)) \
M(Num, (1 << 15))

enum ColumnFlag
{
#ifdef M
#error "Please undefine macro M first."
#endif
#define M(cf, v) ColumnFlag##cf = v,
COLUMN_FLAGS(M)
#undef M
};

// Codec flags.
// In format: TiDB codec flag, int value.
#ifdef M
Expand Down
32 changes: 32 additions & 0 deletions tests/mutable-test/txn_dag/aggregation.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Preparation.
=> DBGInvoke __enable_schema_sync_service('true')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666)
=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 666)
=> DBGInvoke __raft_insert_row(default, test, 4, 52, 'test3', 777)

# DAG read by not specifying region id, group by.
=> DBGInvoke dag('select count(col_1) from default.test group by col_2')
┌─count(col_1)─┬─col_2─┐
│ 2 │ 666 │
│ 1 │ 777 │
└──────────────┴───────┘

# DAG read by explicitly specifying region id, where + group by.
=> DBGInvoke dag('select count(col_1) from default.test where col_2 = 666 group by col_2', 4)
┌─count(col_1)─┬─col_2─┐
│ 2 │ 666 │
└──────────────┴───────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test
37 changes: 37 additions & 0 deletions tests/mutable-test/txn_dag/filter.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Preparation.
=> DBGInvoke __enable_schema_sync_service('true')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666)
=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 777)

# DAG read by not specifying region id, where col_1 = 666.
=> DBGInvoke dag('select * from default.test where col_2 = 666')
┌─col_1─┬─col_2─┐
│ test1 │ 666 │
└───────┴───────┘

# DAG read by explicitly specifying region id, where col_2 = 'test2'.
=> DBGInvoke dag('select col_2 from default.test where col_1 = \'test2\'', 4)
┌─col_2─┐
│ 777 │
└───────┘

# Mock DAG read, where or.
=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test where col_1 = \'test2\' or col_2 = 666', 4)
┌─col_2─┬─col_1─┬─col_2─┐
│ 666 │ test1 │ 666 │
│ 777 │ test2 │ 777 │
└───────┴───────┴───────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test
31 changes: 31 additions & 0 deletions tests/mutable-test/txn_dag/limit.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Preparation.
=> DBGInvoke __enable_schema_sync_service('true')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666)
=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test1', 666)

# DAG read by not specifying region id, order by col_2 limit 1.
=> DBGInvoke dag('select * from default.test')
┌─col_1─┬─col_2─┐
│ test1 │ 666 │
│ test1 │ 666 │
└───────┴───────┘

# Mock DAG read, where + topn.
=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test where col_2 = 666 limit 1', 4)
┌─col_2─┬─col_1─┬─col_2─┐
│ 666 │ test1 │ 666 │
└───────┴───────┴───────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test
41 changes: 41 additions & 0 deletions tests/mutable-test/txn_dag/project.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Preparation.
=> DBGInvoke __enable_schema_sync_service('true')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666)

# DAG read by not specifying region id, select *.
=> DBGInvoke dag('select * from default.test') " --dag_planner="optree
┌─col_1─┬─col_2─┐
│ test1 │ 666 │
└───────┴───────┘

# DAG read by not specifying region id, select col_1.
=> DBGInvoke dag('select col_1 from default.test') " --dag_planner="optree
┌─col_1─┐
│ test1 │
└───────┘

# DAG read by explicitly specifying region id, select col_2.
=> DBGInvoke dag('select col_2 from default.test', 4) " --dag_planner="optree
┌─col_2─┐
│ 666 │
└───────┘

# Mock DAG read, select col_2, col_1, col_2.
=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test', 4) " --dag_planner="optree
┌─col_2─┬─col_1─┬─col_2─┐
│ 666 │ test1 │ 666 │
└───────┴───────┴───────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test
2 changes: 1 addition & 1 deletion tests/mutable-test/txn_dag/table_scan.test
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data
# Data.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
Expand Down
30 changes: 30 additions & 0 deletions tests/mutable-test/txn_dag/topn.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Preparation.
=> DBGInvoke __enable_schema_sync_service('true')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666)
=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 777)

# DAG read by not specifying region id, order by col_2 limit 1.
=> DBGInvoke dag('select * from default.test order by col_2 limit 1')
┌─col_1─┬─col_2─┐
│ test1 │ 666 │
└───────┴───────┘

# Mock DAG read, where + topn.
=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test where col_1 = \'test2\' or col_2 = 666 order by col_1 desc limit 1', 4)
┌─col_2─┬─col_1─┬─col_2─┐
│ 777 │ test2 │ 777 │
└───────┴───────┴───────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test