Flash-481 Arrow encode #279

Merged
150 commits merged on Oct 25, 2019
Changes from 1 commit
Commits
150 commits
aa92f4e
basic framework for coprocessor support in tiflash
windtalker Jul 30, 2019
4f37218
basic support for InterpreterDagRequestV2
windtalker Jul 30, 2019
85bfd5c
code refine
windtalker Jul 30, 2019
e1700c3
tipb submodule use tipb master branch
windtalker Jul 31, 2019
0f82665
rewrite build flow in InterpreterDagRequest
windtalker Jul 31, 2019
a7655bc
rename Dag to DAG
windtalker Jul 31, 2019
f516f00
Update tipb submodule
zanmato1984 Aug 1, 2019
3b520c9
basic support for selection/limit/topn executor in InterpreterDAGRequest
windtalker Aug 2, 2019
9591d26
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 2, 2019
ead9609
basic support for selection/limit/topn executor in InterpreterDAGRequ…
windtalker Aug 2, 2019
bed0bd4
merge pingcap/cop branch
windtalker Aug 2, 2019
526cad9
Code reorg
zanmato1984 Aug 4, 2019
be4d80c
Format
zanmato1984 Aug 4, 2019
64a45a9
merge pingcap/cop
windtalker Aug 5, 2019
a76fdb3
merge pingcap/cop
windtalker Aug 5, 2019
0cfe045
Refine code
zanmato1984 Aug 5, 2019
e9b216c
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 5, 2019
3617a87
basic support for dag agg executor
windtalker Aug 5, 2019
cb55df4
Code refine
zanmato1984 Aug 5, 2019
ed41c93
Merge master into cop
zanmato1984 Aug 5, 2019
08b7142
Refine code
zanmato1984 Aug 5, 2019
bc25942
Another way of getting codec flag
zanmato1984 Aug 5, 2019
059f267
fix cop test regression (#157)
windtalker Aug 6, 2019
e59e8f3
fix npe during dag execute (#160)
windtalker Aug 6, 2019
a618cb5
Add tipb cpp gen in build script
zanmato1984 Aug 6, 2019
4f797fe
Merge branch 'master' into cop
zanmato1984 Aug 6, 2019
bb51749
Fix build error and adjust some formats
zanmato1984 Aug 6, 2019
da1cb0e
Fix build error
zanmato1984 Aug 6, 2019
816ef4b
Fix build error
zanmato1984 Aug 6, 2019
f18fcdd
Update flash configs
zanmato1984 Aug 6, 2019
2ade1cb
Format
zanmato1984 Aug 6, 2019
3870d93
Merge branch 'master' into cop
zanmato1984 Aug 7, 2019
7cb9e71
throw exception when meet error duing cop request handling (#162)
windtalker Aug 7, 2019
5fe66ee
Merge branch 'master' into cop
zanmato1984 Aug 8, 2019
0174b7e
add DAGContext so InterpreterDAG can exchange information with DAGDri…
windtalker Aug 8, 2019
9a1dd23
columnref index is based on executor output schema (#167)
windtalker Aug 8, 2019
26e20d5
Move flash/cop/dag to individual library
zanmato1984 Aug 8, 2019
bf67d9d
Merge cop lib
zanmato1984 Aug 8, 2019
62ced38
DAG planner fix and mock dag request (#169)
zanmato1984 Aug 9, 2019
b346a24
Merge branch 'master' into cop
zanmato1984 Aug 9, 2019
57cd382
Fix DAG get and lock storage
zanmato1984 Aug 9, 2019
4a76e91
handle error in cop request (#171)
windtalker Aug 12, 2019
2d093a8
code refine && several minor bug fix (#174)
windtalker Aug 12, 2019
c8cd3d7
Fix region id in mock dag
zanmato1984 Aug 12, 2019
0492af6
support udf in (#175)
windtalker Aug 14, 2019
4a6bad8
Merge branch 'master' into cop
zanmato1984 Aug 14, 2019
8713ff2
1. fix decode literal expr error, 2. add all scalar function sig in s…
windtalker Aug 14, 2019
7759af1
Merge branch 'master' into cop
zanmato1984 Aug 15, 2019
b25d1cc
some bug fix (#179)
windtalker Aug 15, 2019
3d38b7b
Support all DAG operator types in mock SQL -> DAG parser (#176)
zanmato1984 Aug 15, 2019
cbcfdb0
filter column must be uint8 in tiflash (#180)
windtalker Aug 16, 2019
d87e2d5
1. fix encode null error, 2. fix empty field type generated by TiFlas…
windtalker Aug 16, 2019
17f7fcb
Merge branch 'master' into cop
zanmato1984 Aug 16, 2019
5853b91
check validation of dag exprs field type (#183)
windtalker Aug 19, 2019
0a6767a
Merge branch 'master' into cop
zanmato1984 Aug 19, 2019
d53ca34
Merge branch 'master' into cop
zanmato1984 Aug 20, 2019
5de0ec6
add more coprocessor mock tests (#185)
windtalker Aug 20, 2019
6196171
add some log about implicit cast (#188)
windtalker Aug 21, 2019
960cc56
Merge branch 'master' into cop
zanmato1984 Aug 24, 2019
08bacd7
Pass DAG tests after merging master (#199)
zanmato1984 Aug 24, 2019
e8b4198
Fix date/datetime/bit encode error (#200)
zanmato1984 Aug 26, 2019
61cdc8f
improve dag execution time collection (#202)
windtalker Aug 26, 2019
53dcd1f
Merge branch 'master' into cop
zanmato1984 Aug 27, 2019
10e3883
column id in table scan operator may be -1 (#205)
windtalker Aug 27, 2019
39d1994
quick fix for decimal encode (#210)
windtalker Aug 30, 2019
8a0fb66
support udf like with 3 arguments (#212)
windtalker Sep 2, 2019
ff9a1de
Flash-473 optimize date and datetime comparison (#221)
windtalker Sep 5, 2019
17aacde
Merge master
zanmato1984 Sep 5, 2019
6b14b38
FLASH-479 select from empty table throw error in tiflash (#223)
windtalker Sep 6, 2019
548e519
Update flash service port
zanmato1984 Sep 6, 2019
a1b8444
fix bug in DAGBlockOutputStream
windtalker Sep 10, 2019
fce3676
fix bug in DAGBlockOutputStream (#230)
windtalker Sep 10, 2019
a9f9b48
FLASH-475: Support BATCH COMMANDS in flash service (#232)
zanmato1984 Sep 12, 2019
bdc7d57
init change for array encode
windtalker Sep 12, 2019
516d340
merge pingcap/tics/cop
windtalker Sep 12, 2019
1ccfbd4
Merge branch 'master' into cop
zhexuany Sep 12, 2019
df07939
FLASH-483: Combine raft service and flash service (#235)
zanmato1984 Sep 16, 2019
99f26c0
Merge master
zanmato1984 Sep 16, 2019
0bb7991
Fix build error
zanmato1984 Sep 16, 2019
f41f853
Fix test regression
zanmato1984 Sep 16, 2019
259ec77
Fix null value bug in datum
zanmato1984 Sep 17, 2019
ef65514
Merge branch 'master' into cop
zanmato1984 Sep 17, 2019
708d52f
FLASH-490: Fix table scan with -1 column ID and no agg (#240)
zanmato1984 Sep 23, 2019
3656a95
Merge branch 'master' into cop
zanmato1984 Sep 23, 2019
a4c1074
throw error if the cop request is not based on full region scan (#247)
windtalker Sep 24, 2019
b57656c
Merge branch 'master' into cop
zanmato1984 Sep 25, 2019
3a43942
FLASH-437 Support time zone in coprocessor (#259)
windtalker Sep 27, 2019
01caa55
Merge branch 'master' into cop
zanmato1984 Sep 27, 2019
8d2576e
Address comment
zanmato1984 Sep 29, 2019
8ec5380
Merge branch 'cop' of https://github.com/pingcap/tics into array_encode
windtalker Sep 29, 2019
2e3b1c1
use the new date implementation
windtalker Sep 29, 2019
d33a278
FLASH-489 support key condition for coprocessor query (#261)
windtalker Sep 30, 2019
087faee
Merge branch 'master' into cop
zanmato1984 Sep 30, 2019
4aa2b58
only return execute summaies if requested (#264)
windtalker Sep 30, 2019
aed5e84
Merge branch 'cop' of https://github.com/pingcap/tics into array_encode
windtalker Oct 8, 2019
8663811
refine code
windtalker Oct 8, 2019
80f6f35
Refine service init (#265)
zanmato1984 Oct 8, 2019
0b737dc
fix bug
windtalker Oct 9, 2019
d3af009
fix bug
windtalker Oct 9, 2019
004f7c5
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 9, 2019
f255362
FLASH-554 cop check range should be based on region range (#270)
windtalker Oct 10, 2019
170f652
add ut for arrow encode
windtalker Oct 11, 2019
c53e456
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 11, 2019
7fc53ad
minor improve (#273)
windtalker Oct 11, 2019
22ad2d3
Merge branch 'master' into cop
zanmato1984 Oct 11, 2019
b01ccb3
update tipb
windtalker Oct 11, 2019
a1304ae
Fix mutex on timezone retrieval (#276)
ilovesoup2000 Oct 11, 2019
687dcbe
Fix race condition of batch command handling (#277)
zanmato1984 Oct 12, 2019
4dd5e1e
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 12, 2019
80c20b2
update tipb version
windtalker Oct 12, 2019
7c5bea6
set default record_per_chunk to 1024
windtalker Oct 13, 2019
939b8cf
address comment
windtalker Oct 14, 2019
d25dadc
address comments
windtalker Oct 14, 2019
512fa8e
refine code
windtalker Oct 14, 2019
ff9bf8f
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 14, 2019
a6f6dda
refine code
windtalker Oct 14, 2019
a943e8d
add mock_dag test
windtalker Oct 14, 2019
41272da
code refine
windtalker Oct 14, 2019
00dac75
code refine
windtalker Oct 14, 2019
4080fba
address comments
windtalker Oct 14, 2019
1188e69
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 14, 2019
d2890e3
Fix NULL order for dag (#281)
zanmato1984 Oct 14, 2019
bc075c5
refine get actions in DAGExpressionAnalyzer, fix bug in dbgFuncCoproc…
windtalker Oct 15, 2019
4dbff78
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 15, 2019
fbcbdc0
remove duplicate agg funcs (#283)
windtalker Oct 15, 2019
8f2bfaf
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 16, 2019
3716b98
refine code
windtalker Oct 16, 2019
fa42c69
remove useless code
windtalker Oct 16, 2019
7bbe8c0
address comments
windtalker Oct 16, 2019
31973bf
remove uselss include
windtalker Oct 16, 2019
d968c09
address comments
windtalker Oct 16, 2019
edf32d4
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 16, 2019
f1256bd
refine code
windtalker Oct 17, 2019
73befbd
address comments
windtalker Oct 17, 2019
3188c07
format code
windtalker Oct 17, 2019
87955d1
fix typo
windtalker Oct 17, 2019
4f58878
Update dbms/src/Flash/BatchCommandsHandler.cpp
zanmato1984 Oct 17, 2019
92c16c2
revert unnecessary changes
windtalker Oct 17, 2019
0f6f0a6
Merge branch 'cop' of https://github.com/pingcap/tics into arrow_encode
windtalker Oct 17, 2019
d550644
refine code
windtalker Oct 17, 2019
bac7951
fix build error
windtalker Oct 17, 2019
4a251b0
refine code
windtalker Oct 17, 2019
e8b92b4
Merge branch 'master' into cop
zanmato1984 Oct 17, 2019
48dd7bd
Merge master
zanmato1984 Oct 18, 2019
a8cba5f
Merge remote-tracking branch 'origin/cop' into arrow_encode_2
windtalker Oct 18, 2019
e3232af
Merge branch 'master' of https://github.com/pingcap/tics into arrow_e…
windtalker Oct 21, 2019
4d5e5d4
address comments
windtalker Oct 21, 2019
c7d8d4e
refine code
windtalker Oct 22, 2019
0b1ed77
address comments
windtalker Oct 25, 2019
683e7e0
Merge branch 'master' into arrow_encode
zanmato1984 Oct 25, 2019
basic framework for coprocessor support in tiflash
windtalker committed Jul 30, 2019
commit aa92f4edb3af8e94c57baf6e934c0b0543183a4e
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -37,3 +37,7 @@
[submodule "contrib/kvproto"]
path = contrib/kvproto
url = https://github.com/pingcap/kvproto.git
[submodule "contrib/tipb"]
path = contrib/tipb
url = https://github.com/pingcap/tipb.git
branch = tipb_cpp
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -266,6 +266,7 @@ include (cmake/find_llvm.cmake)
include (cmake/find_grpc.cmake)
include (cmake/find_kvproto.cmake)
include (cmake/find_curl.cmake)
include (cmake/find_tipb.cmake)


include (cmake/find_contrib_lib.cmake)
10 changes: 10 additions & 0 deletions cmake/find_tipb.cmake
@@ -0,0 +1,10 @@

if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/tipb/cpp/tipb/select.pb.h")
if (EXISTS "${ClickHouse_SOURCE_DIR}/contrib/tipb/proto/select.proto")
message (FATAL_ERROR "tipb cpp files in contrib/tipb are missing. Go to contrib/tipb and run ./generate_cpp.sh")
else()
message (FATAL_ERROR "tipb submodule in contrib/tipb is missing. Run 'git submodule update --init --recursive', then go to contrib/tipb and run ./generate_cpp.sh")
endif()
endif ()

message(STATUS "Using tipb: ${ClickHouse_SOURCE_DIR}/contrib/tipb/cpp")
1 change: 1 addition & 0 deletions contrib/CMakeLists.txt
@@ -1,5 +1,6 @@
add_subdirectory (kvproto/cpp)
add_subdirectory (client-c)
add_subdirectory (tipb/cpp)

if (NOT MSVC)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-old-style-cast")
1 change: 1 addition & 0 deletions contrib/tipb
Submodule tipb added at 961b01
2 changes: 2 additions & 0 deletions dbms/CMakeLists.txt
@@ -60,6 +60,7 @@ add_headers_and_sources(dbms src/Storages/Page)
add_headers_and_sources(dbms src/Raft)
add_headers_and_sources(dbms src/TiDB)
add_headers_and_sources(dbms src/Client)
add_headers_and_sources(dbms src/Coprocessor)
add_headers_only(dbms src/Server)

list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
@@ -151,6 +152,7 @@ target_link_libraries (dbms
clickhouse_common_io
kvproto
kv_client
tipb
${Protobuf_LIBRARIES}
gRPC::grpc++_unsecure
${CURL_LIBRARIES}
61 changes: 61 additions & 0 deletions dbms/src/Coprocessor/CoprocessorHandler.cpp
@@ -0,0 +1,61 @@
#include <Coprocessor/CoprocessorHandler.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/IStorage.h>
#include <Storages/StorageMergeTree.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/TidbCopBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/InterpreterDagRequestV1.h>
#include <Interpreters/InterpreterDagRequestV2.h>

namespace DB
{

CoprocessorHandler::CoprocessorHandler(const coprocessor::Request * cop_request_, coprocessor::Response * cop_response_, CoprocessorContext & context_)
: cop_request(cop_request_), cop_response(cop_response_), context(context_)
{
if(!dag_request.ParseFromString(cop_request->data())) {
throw Exception("Could not extract dag request from coprocessor request");
}
}

CoprocessorHandler::~CoprocessorHandler()
{
}

BlockIO CoprocessorHandler::buildCHPlan() {
String builder_version = context.ch_context.getSettings().coprocessor_plan_builder_version;
if(builder_version == "v1") {
InterpreterDagRequestV1 builder(context, dag_request);
return builder.execute();
} else if (builder_version == "v2"){
InterpreterDagRequestV2 builder(context, dag_request);
return builder.execute();
} else {
throw Exception("coprocessor plan builder version should be set to v1 or v2");
}
}

bool CoprocessorHandler::execute() {
context.ch_context.setSetting("read_tso", UInt64(dag_request.start_ts()));
//todo set region related info
BlockIO streams = buildCHPlan();
if(!streams.in || streams.out) {
// only query is allowed, so streams.in must not be null and streams.out must be null
return false;
}
tipb::SelectResponse select_response;
BlockOutputStreamPtr outputStreamPtr = std::make_shared<TidbCopBlockOutputStream>(
&select_response, context.ch_context.getSettings().records_per_chunk, dag_request.encode_type(), streams.in->getHeader()
);
copyData(*streams.in, *outputStreamPtr);
cop_response->set_data(select_response.SerializeAsString());
return true;
}

}
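The handler above picks a plan builder based on the `coprocessor_plan_builder_version` setting. A minimal, self-contained sketch of that dispatch pattern (the `IDagInterpreter`, `V1`, and `V2` types here are illustrative stand-ins, not the actual TiFlash classes):

```cpp
#include <memory>
#include <stdexcept>
#include <string>

// Stand-in interfaces mimicking the v1 (SQL-string) and v2 (direct-plan)
// interpreter split in buildCHPlan() above.
struct IDagInterpreter
{
    virtual std::string execute() = 0;
    virtual ~IDagInterpreter() = default;
};
struct V1 : IDagInterpreter { std::string execute() override { return "sql-string plan"; } };
struct V2 : IDagInterpreter { std::string execute() override { return "direct plan"; } };

// Select an interpreter by version string, rejecting unknown values
// just as buildCHPlan() throws for anything other than "v1"/"v2".
std::unique_ptr<IDagInterpreter> make_interpreter(const std::string & version)
{
    if (version == "v1")
        return std::make_unique<V1>();
    if (version == "v2")
        return std::make_unique<V2>();
    throw std::invalid_argument("coprocessor plan builder version should be set to v1 or v2");
}
```

Keeping both builders behind one factory lets the SQL-string path stay available as a fallback while the direct plan builder matures.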

43 changes: 43 additions & 0 deletions dbms/src/Coprocessor/CoprocessorHandler.h
@@ -0,0 +1,43 @@
#pragma once

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <tipb/select.pb.h>
#include <kvproto/coprocessor.pb.h>
#pragma GCC diagnostic pop

#include <DataStreams/BlockIO.h>
#include <grpcpp/server_context.h>

namespace DB {

struct CoprocessorContext {
Context & ch_context;
const kvrpcpb::Context & kv_context;
grpc::ServerContext & grpc_server_context;
CoprocessorContext(Context & ch_context_, const kvrpcpb::Context & kv_context_,
grpc::ServerContext & grpc_server_context_)
: ch_context(ch_context_), kv_context(kv_context_), grpc_server_context(grpc_server_context_) {
}
};

/** Handle a coprocessor request. Used by the TiFlash coprocessor service.
*/
class CoprocessorHandler {
public:
CoprocessorHandler(const coprocessor::Request *cop_request, coprocessor::Response *response, CoprocessorContext &context);

~CoprocessorHandler();

bool execute();

private:
String buildSqlString();
BlockIO buildCHPlan();
const coprocessor::Request *cop_request;
coprocessor::Response *cop_response;
CoprocessorContext &context;
tipb::DAGRequest dag_request;

};
}
2 changes: 2 additions & 0 deletions dbms/src/Core/Defines.h
@@ -28,6 +28,8 @@

#define DEFAULT_MAX_READ_TSO 0xFFFFFFFFFFFFFFFF

#define DEFAULT_RECORDS_PER_CHUNK 64L

/** Which blocks by default read the data (by number of rows).
* Smaller values give better cache locality, less consumption of RAM, but more overhead to process the query.
*/
74 changes: 74 additions & 0 deletions dbms/src/DataStreams/TidbCopBlockOutputStream.cpp
@@ -0,0 +1,74 @@

#include <DataStreams/TidbCopBlockOutputStream.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TypeMapping.h>
#include <DataTypes/DataTypeNullable.h>


namespace DB
{

namespace ErrorCodes {
extern const int UNSUPPORTED_PARAMETER;
}

struct TypeMapping;

TidbCopBlockOutputStream::TidbCopBlockOutputStream(
tipb::SelectResponse *response_, Int64 records_per_chunk_, tipb::EncodeType encodeType_, Block header_)
: response(response_), records_per_chunk(records_per_chunk_), encodeType(encodeType_), header(header_)
{
if(encodeType == tipb::EncodeType::TypeArrow) {
throw Exception("Encode type TypeArrow is not supported yet in TidbCopBlockOutputStream.", ErrorCodes::UNSUPPORTED_PARAMETER);
}
current_chunk = nullptr;
current_records_num = 0;
total_rows = 0;
}


void TidbCopBlockOutputStream::writePrefix()
{
// nothing to do yet
}

void TidbCopBlockOutputStream::writeSuffix()
{
// flush the last, possibly partially filled chunk
if(current_chunk != nullptr && records_per_chunk > 0) {
current_chunk->set_rows_data(current_ss.str());
}
}


void TidbCopBlockOutputStream::write(const Block & block)
{
// encode data to chunk
size_t rows = block.rows();
for(size_t i = 0; i < rows; i++) {
if(current_chunk == nullptr || current_records_num >= records_per_chunk) {
if(current_chunk) {
// set the current ss to current chunk
current_chunk->set_rows_data(current_ss.str());
}
current_chunk = response->add_chunks();
current_ss.str("");
current_records_num = 0;
}
for(size_t j = 0; j < block.columns(); j++) {
auto field = (*block.getByPosition(j).column.get())[i];
const DataTypePtr & dataTypePtr = block.getByPosition(j).type;
if(dataTypePtr->isNullable()) {
const DataTypePtr real = dynamic_cast<const DataTypeNullable *>(dataTypePtr.get())->getNestedType();
EncodeDatum(field, getCodecFlagByDataType(real), current_ss);
} else {
EncodeDatum(field, getCodecFlagByDataType(block.getByPosition(j).type), current_ss);
}
}
// current row encoded
current_records_num++;
total_rows++;
}
}

}
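`write()` above packs encoded rows into response chunks of at most `records_per_chunk` rows, flushing the buffer whenever the limit is hit. The chunk arithmetic can be sketched in isolation (`chunk_sizes` is a hypothetical helper for illustration, not part of the PR):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Split total_rows rows into chunks of at most records_per_chunk rows,
// mirroring how write() starts a new tipb::Chunk once current_records_num
// reaches the limit.
std::vector<int64_t> chunk_sizes(int64_t total_rows, int64_t records_per_chunk)
{
    std::vector<int64_t> sizes;
    while (total_rows > 0)
    {
        int64_t n = std::min(total_rows, records_per_chunk);
        sizes.push_back(n);
        total_rows -= n;
    }
    return sizes;
}
```

With the default of 64 records per chunk (see `DEFAULT_RECORDS_PER_CHUNK` above; a later commit in this PR raises the default to 1024), 100 rows produce one full chunk of 64 rows plus a trailing chunk of 36.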
43 changes: 43 additions & 0 deletions dbms/src/DataStreams/TidbCopBlockOutputStream.h
@@ -0,0 +1,43 @@
#pragma once

#include <DataStreams/IBlockOutputStream.h>
#include <Core/Types.h>
#include <DataTypes/IDataType.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <tipb/select.pb.h>

#pragma GCC diagnostic pop

namespace DB
{



/** Serializes the stream of blocks in tidb coprocessor format.
* Designed for communication with tidb via coprocessor.
*/
class TidbCopBlockOutputStream : public IBlockOutputStream
{
public:
TidbCopBlockOutputStream(
tipb::SelectResponse *response, Int64 records_per_chunk, tipb::EncodeType encodeType, Block header);

Block getHeader() const override { return header; }
void write(const Block & block) override;
void writePrefix() override;
void writeSuffix() override;

private:
tipb::SelectResponse *response;
Int64 records_per_chunk;
tipb::EncodeType encodeType;
Block header;
tipb::Chunk *current_chunk;
Int64 current_records_num;
std::stringstream current_ss;
Int64 total_rows;

};

}
1 change: 1 addition & 0 deletions dbms/src/Interpreters/ClientInfo.h
@@ -24,6 +24,7 @@ class ClientInfo
{
TCP = 1,
HTTP = 2,
GRPC = 3,
};

enum class HTTPMethod : UInt8
403 changes: 403 additions & 0 deletions dbms/src/Interpreters/CoprocessorBuilderUtils.cpp

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions dbms/src/Interpreters/CoprocessorBuilderUtils.h
@@ -0,0 +1,10 @@
#pragma once

#include <unordered_map>

namespace DB {

extern std::unordered_map<tipb::ExprType, String> aggFunMap;
extern std::unordered_map<tipb::ScalarFuncSig, String> scalarFunMap;

}
217 changes: 217 additions & 0 deletions dbms/src/Interpreters/InterpreterDagRequestV1.cpp
@@ -0,0 +1,217 @@
#include <DataStreams/BlockIO.h>
#include <Interpreters/InterpreterDagRequestV1.h>
#include <Core/QueryProcessingStage.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/CoprocessorBuilderUtils.h>
#include <Storages/Transaction/Types.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/Codec.h>

namespace DB {

bool InterpreterDagRequestV1::buildTSString(const tipb::TableScan & ts, std::stringstream & ss) {
TableID id;
if(ts.has_table_id()) {
id = ts.table_id();
} else {
// do not have table id
return false;
}
auto & tmt_ctx = context.ch_context.getTMTContext();
auto storage = tmt_ctx.getStorages().get(id);
if(storage == nullptr) {
tmt_ctx.getSchemaSyncer()->syncSchema(id, context.ch_context, false);
storage = tmt_ctx.getStorages().get(id);
}
if(storage == nullptr) {
return false;
}
const auto * merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get());
if (!merge_tree) {
return false;
}

for(const tipb::ColumnInfo &ci : ts.columns()) {
ColumnID cid = ci.column_id();
String name = merge_tree->getTableInfo().columns[cid-1].name;
column_name_from_ts.emplace(std::make_pair(cid, name));
}
if(column_name_from_ts.empty()) {
// no column selected, must be something wrong
return false;
}
ss << "FROM " << merge_tree->getTableInfo().db_name << "." << merge_tree->getTableInfo().name << " ";
return true;
}

String InterpreterDagRequestV1::exprToString(const tipb::Expr & expr, bool &succ) {
std::stringstream ss;
succ = true;
size_t cursor = 1;
Int64 columnId = 0;
String func_name;
Field f;
switch (expr.tp()) {
case tipb::ExprType::Null:
return "NULL";
case tipb::ExprType::Int64:
return std::to_string(DecodeInt<Int64>(cursor, expr.val()));
case tipb::ExprType::Uint64:
return std::to_string(DecodeInt<UInt64>(cursor, expr.val()));
case tipb::ExprType::Float32:
case tipb::ExprType::Float64:
return std::to_string(DecodeFloat64(cursor, expr.val()));
case tipb::ExprType::String:
return expr.val();
case tipb::ExprType::Bytes:
return DecodeBytes(cursor, expr.val());
case tipb::ExprType::ColumnRef:
columnId = DecodeInt<Int64>(cursor, expr.val());
if(getCurrentColumnNames().count(columnId) == 0) {
succ = false;
return "";
}
return getCurrentColumnNames().find(columnId)->second;
case tipb::ExprType::Count:
case tipb::ExprType::Sum:
case tipb::ExprType::Avg:
case tipb::ExprType::Min:
case tipb::ExprType::Max:
case tipb::ExprType::First:
if(!aggFunMap.count(expr.tp())) {
succ = false;
return "";
}
func_name = aggFunMap.find(expr.tp())->second;
break;
case tipb::ExprType::ScalarFunc:
if(!scalarFunMap.count(expr.sig())) {
succ = false;
return "";
}
func_name = scalarFunMap.find(expr.sig())->second;
break;
default:
succ = false;
return "";
}
// build function expr
if(func_name == "in") {
// for in, we could not represent the function expr using func_name(param1, param2, ...)
succ = false;
return "";
} else {
ss << func_name << "(";
bool first = true;
bool sub_succ = true;
for(const tipb::Expr &child : expr.children()) {
String s = exprToString(child, sub_succ);
if(!sub_succ) {
succ = false;
return "";
}
if(first) {
first = false;
} else {
ss << ", ";
}
ss << s;
}
ss << ") ";
return ss.str();
}
}

bool InterpreterDagRequestV1::buildSelString(const tipb::Selection & sel, std::stringstream & ss) {
bool first = true;
for(const tipb::Expr & expr : sel.conditions()) {
bool succ = true;
auto s = exprToString(expr, succ);
if(!succ) {
return false;
}
if(first) {
ss << "WHERE ";
first = false;
} else {
ss << "AND ";
}
ss << s << " ";
}
return true;
}

bool InterpreterDagRequestV1::buildLimitString(const tipb::Limit & limit, std::stringstream & ss) {
ss << "LIMIT " << limit.limit() << " ";
return true;
}

//todo return the error message
bool InterpreterDagRequestV1::buildString(const tipb::Executor & executor, std::stringstream & ss) {
switch (executor.tp()) {
case tipb::ExecType::TypeTableScan:
return buildTSString(executor.tbl_scan(), ss);
case tipb::ExecType::TypeIndexScan:
// index scan not supported
return false;
case tipb::ExecType::TypeSelection:
return buildSelString(executor.selection(), ss);
case tipb::ExecType::TypeAggregation:
// stream agg is not supported, treated as normal agg
case tipb::ExecType::TypeStreamAgg:
//todo support agg
return false;
case tipb::ExecType::TypeTopN:
// todo support top n
return false;
case tipb::ExecType::TypeLimit:
return buildLimitString(executor.limit(), ss);
}
// protobuf enums may carry unknown values; fail explicitly instead of falling off the end
return false;
}

bool isProject(const tipb::Executor &) {
// currently, project is not pushed so always return false
return false;
}
InterpreterDagRequestV1::InterpreterDagRequestV1(CoprocessorContext & context_, tipb::DAGRequest & dag_request_)
: context(context_), dag_request(dag_request_) {
afterAgg = false;
}

BlockIO InterpreterDagRequestV1::execute() {
String query = buildSqlString();
return executeQuery(query, context.ch_context, false, QueryProcessingStage::Complete);
}

String InterpreterDagRequestV1::buildSqlString() {
std::stringstream query_buf;
std::stringstream project;
for(const tipb::Executor & executor : dag_request.executors()) {
if(!buildString(executor, query_buf)) {
return "";
}
}
if(!isProject(dag_request.executors(dag_request.executors_size()-1))) {
//append final project
project << "SELECT ";
bool first = true;
for(UInt32 index : dag_request.output_offsets()) {
if(first) {
first = false;
} else {
project << ", ";
}
project << getCurrentColumnNames()[index+1];
}
project << " ";
}
return project.str() + query_buf.str();
}

InterpreterDagRequestV1::~InterpreterDagRequestV1() {

}
}
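`buildSqlString()` above walks the executor list, appending each executor's fragment ("FROM …", "WHERE …", "LIMIT …") to a buffer, and then prepends the final "SELECT …" projection built from the request's output offsets. The assembly can be sketched roughly like this (`build_sql` and its flattened inputs are simplified stand-ins for the executor walk):

```cpp
#include <sstream>
#include <string>
#include <vector>

// Assemble a query the way buildSqlString() does: executor fragments are
// concatenated in order, then the SELECT projection is prepended.
std::string build_sql(const std::vector<std::string> & outputs,
                      const std::vector<std::string> & fragments)
{
    std::stringstream query;
    for (const auto & f : fragments)
        query << f << " ";

    std::stringstream project;
    project << "SELECT ";
    for (size_t i = 0; i < outputs.size(); ++i)
        project << (i ? ", " : "") << outputs[i];
    project << " ";

    return project.str() + query.str();
}
```

This string-rebuilding approach is what makes the v1 interpreter simple but lossy; the v2 interpreter sidesteps it by building the ClickHouse plan directly from the DAG executors.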
45 changes: 45 additions & 0 deletions dbms/src/Interpreters/InterpreterDagRequestV1.h
@@ -0,0 +1,45 @@
#pragma once

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <tipb/select.pb.h>
#include <tipb/executor.pb.h>
#pragma GCC diagnostic pop

#include <DataStreams/BlockIO.h>
#include <Coprocessor/CoprocessorHandler.h>

namespace DB {

/** build ch plan from dag request: dag executors -> query_string -> ch plan
*/
class InterpreterDagRequestV1 {
public:
InterpreterDagRequestV1(CoprocessorContext & context_, tipb::DAGRequest & dag_request_);

~InterpreterDagRequestV1();

BlockIO execute();

private:
String buildSqlString();
bool buildTSString(const tipb::TableScan & ts, std::stringstream & ss);
String exprToString(const tipb::Expr & expr, bool &succ);
bool buildSelString(const tipb::Selection & sel, std::stringstream & ss);
bool buildLimitString(const tipb::Limit & limit, std::stringstream & ss);
bool buildString(const tipb::Executor & executor, std::stringstream & ss);
CoprocessorContext & context;
tipb::DAGRequest & dag_request;
std::unordered_map<Int64, std::string> column_name_from_ts;
std::unordered_map<Int64, std::string> column_name_from_agg;
bool afterAgg;
std::unordered_map<Int64, std::string> & getCurrentColumnNames() {
if(afterAgg) {
return column_name_from_agg;
}
return column_name_from_ts;
}

};

}
160 changes: 160 additions & 0 deletions dbms/src/Interpreters/InterpreterDagRequestV2.cpp
@@ -0,0 +1,160 @@
#include <DataStreams/BlockIO.h>
#include <Interpreters/InterpreterDagRequestV2.h>
#include <Storages/Transaction/Types.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/Region.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/RegionQueryInfo.h>
#include <Parsers/ASTSelectQuery.h>
#include "CoprocessorBuilderUtils.h"

namespace DB {

namespace ErrorCodes
{
extern const int TOO_MANY_COLUMNS;
}

InterpreterDagRequestV2::InterpreterDagRequestV2(CoprocessorContext & context_, tipb::DAGRequest & dag_request_)
: context(context_), dag_request(dag_request_) {
(void)dag_request;
}

bool InterpreterDagRequestV2::buildTSPlan(const tipb::TableScan & ts, Pipeline & pipeline) {
if(!ts.has_table_id()) {
// do not have table id
return false;
}
TableID id = ts.table_id();
auto & tmt_ctx = context.ch_context.getTMTContext();
auto storage = tmt_ctx.getStorages().get(id);
if(storage == nullptr) {
tmt_ctx.getSchemaSyncer()->syncSchema(id, context.ch_context, false);
storage = tmt_ctx.getStorages().get(id);
}
if(storage == nullptr) {
return false;
}
auto table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
const auto * merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get());
if(!merge_tree) {
return false;
}

Names required_columns;
for(const tipb::ColumnInfo & ci : ts.columns()) {
ColumnID cid = ci.column_id();
if(cid < 1 || cid > (Int64)merge_tree->getTableInfo().columns.size()) {
// cid out of bound
return false;
}
String name = merge_tree->getTableInfo().columns[cid - 1].name;
//todo handle output_offset
required_columns.push_back(name);
}
if(required_columns.empty()) {
// no column selected, must be something wrong
return false;
}
// todo handle alias column
const Settings & settings = context.ch_context.getSettingsRef();

if(settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read) {
throw Exception("Limit for number of columns to read exceeded. "
"Requested: " + toString(required_columns.size())
+ ", maximum: " + settings.max_columns_to_read.toString(),
ErrorCodes::TOO_MANY_COLUMNS);
}

size_t max_block_size = settings.max_block_size;
size_t max_streams = settings.max_threads;
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
if(max_streams > 1) {
max_streams *= settings.max_streams_to_max_threads_ratio;
}

//todo support index in
SelectQueryInfo query_info;
query_info.query = std::make_unique<ASTSelectQuery>();
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>();
query_info.mvcc_query_info->resolve_locks = true;
query_info.mvcc_query_info->read_tso = settings.read_tso;
RegionQueryInfo info;
info.region_id = context.kv_context.region_id();
info.conf_version = context.kv_context.region_epoch().conf_ver();
info.version = context.kv_context.region_epoch().version();
auto current_region = context.ch_context.getTMTContext().getRegionTable().getRegionById(id, info.region_id);
if(!current_region) {
return false;
}
info.range_in_table = current_region->getHandleRangeByTable(id);
query_info.mvcc_query_info->regions_query_info.push_back(info);
query_info.mvcc_query_info->concurrent = 0.0;
pipeline.streams = storage->read(required_columns, query_info, context.ch_context, from_stage, max_block_size, max_streams);
/// Set the limits and quota for reading data, the speed and time of the query.
{
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL;
limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode);
limits.max_execution_time = settings.max_execution_time;
limits.timeout_overflow_mode = settings.timeout_overflow_mode;

/** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers,
* because the initiating server has a summary of the execution of the request on all servers.
*
* But limits on data size to read and maximum execution time are reasonable to check both on initiator and
* additionally on each remote server, because these limits are checked per block of data processed,
* and remote servers may process way more blocks of data than are received by initiator.
*/
limits.min_execution_speed = settings.min_execution_speed;
limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed;

QuotaForIntervals & quota = context.ch_context.getQuota();

pipeline.transform([&](auto & stream)
{
if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(stream.get()))
{
p_stream->setLimits(limits);
p_stream->setQuota(quota);
}
});
}
return true;
}

// todo return the error message
bool InterpreterDagRequestV2::buildPlan(const tipb::Executor & executor, Pipeline & pipeline) {
switch (executor.tp()) {
case tipb::ExecType::TypeTableScan:
return buildTSPlan(executor.tbl_scan(), pipeline);
case tipb::ExecType::TypeIndexScan:
// index scan is not supported
return false;
case tipb::ExecType::TypeSelection:
case tipb::ExecType::TypeAggregation:
case tipb::ExecType::TypeStreamAgg:
case tipb::ExecType::TypeTopN:
case tipb::ExecType::TypeLimit:
// not implemented yet
return false;
default:
// guarantee a return value for executor types added in future tipb versions
return false;
}
}

BlockIO InterpreterDagRequestV2::execute() {
Pipeline pipeline;
for(const tipb::Executor & executor : dag_request.executors()) {
if(!buildPlan(executor, pipeline)) {
return BlockIO();
}
}
// todo wrap pipeline.streams into the returned BlockIO once the remaining executors are supported
return BlockIO();
}
InterpreterDagRequestV2::~InterpreterDagRequestV2() = default;
}
51 changes: 51 additions & 0 deletions dbms/src/Interpreters/InterpreterDagRequestV2.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#pragma once

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <tipb/select.pb.h>
#include <kvproto/coprocessor.pb.h>
#pragma GCC diagnostic pop

#include <DataStreams/BlockIO.h>
#include <Coprocessor/CoprocessorHandler.h>
#include "CoprocessorBuilderUtils.h"

namespace DB {

/** build ch plan from dag request: dag executors -> ch plan
*/
class InterpreterDagRequestV2 {
public:
InterpreterDagRequestV2(CoprocessorContext & context_, tipb::DAGRequest & dag_request);

~InterpreterDagRequestV2();

BlockIO execute();

private:
CoprocessorContext & context;
tipb::DAGRequest & dag_request;
struct Pipeline
{
BlockInputStreams streams;

BlockInputStreamPtr & firstStream() { return streams.at(0); }

template <typename Transform>
void transform(Transform && transform)
{
for (auto & stream : streams)
transform(stream);
}

bool hasMoreThanOneStream() const
{
return streams.size() > 1;
}
};

bool buildPlan(const tipb::Executor & executor, Pipeline & pipeline);
bool buildTSPlan(const tipb::TableScan & ts, Pipeline & pipeline);

};
}
2 changes: 2 additions & 0 deletions dbms/src/Interpreters/Settings.h
@@ -29,6 +29,8 @@ struct Settings
M(SettingString, regions, "", "the region need to be read.") \
M(SettingBool, resolve_locks, false, "tmt read tso.") \
M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \
M(SettingInt64, records_per_chunk, DEFAULT_RECORDS_PER_CHUNK, "default chunk size for coprocessor.") \
M(SettingString, coprocessor_plan_builder_version, "v1", "how to build ch plan in coprocessor handler, v1 means build the plan based on string, v2 means build the plan based on cop executor") \
M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \
M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading") \
7 changes: 6 additions & 1 deletion dbms/src/Server/CMakeLists.txt
@@ -24,7 +24,9 @@ add_library (clickhouse-server-lib
RootRequestHandler.cpp
Server.cpp
StatusFile.cpp
TCPHandler.cpp)
TCPHandler.cpp
FlashService.cpp
cop_test.cpp)

target_link_libraries (clickhouse-server-lib clickhouse_common_io daemon clickhouse_storages_system clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions)
target_include_directories (clickhouse-server-lib PUBLIC ${ClickHouse_SOURCE_DIR}/libs/libdaemon/include)
@@ -105,6 +107,9 @@ else ()
target_include_directories (theflash BEFORE PRIVATE ${COMMON_INCLUDE_DIR})
target_include_directories (theflash PRIVATE ${CMAKE_CURRENT_BINARY_DIR})

add_executable (copClient cop_test.cpp)
target_link_libraries (copClient clickhouse-server-lib)

if (USE_EMBEDDED_COMPILER)
target_link_libraries (theflash clickhouse-compiler-lib)
endif ()
118 changes: 118 additions & 0 deletions dbms/src/Server/FlashService.cpp
@@ -0,0 +1,118 @@
#include <Server/FlashService.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/security/server_credentials.h>
#include <Core/Types.h>
#include <tipb/select.pb.h>
#include <Coprocessor/CoprocessorHandler.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>

namespace DB
{
const Int64 REQ_TYPE_DAG = 103;
//const Int64 REQ_TYPE_ANALYZE = 104;
//const Int64 REQ_TYPE_CHECKSUM = 105;

FlashService::FlashService(const std::string & address_, IServer & server_)
: server(server_),
address(address_),
log(&Logger::get("FlashService"))
{
grpc::ServerBuilder builder;
builder.AddListeningPort(address, grpc::InsecureServerCredentials());
builder.RegisterService(this);

// todo set a reasonable message size limit instead of unlimited?
builder.SetMaxReceiveMessageSize(-1);
builder.SetMaxSendMessageSize(-1);

grpc_server = builder.BuildAndStart();

LOG_INFO(log, "Flash service listening on [" << address << "]");
}

FlashService::~FlashService()
{
// wait 5 seconds for pending rpcs to gracefully stop
gpr_timespec deadline{5, 0, GPR_TIMESPAN};
LOG_DEBUG(log, "Begin shutting down grpc server");
grpc_server->Shutdown(deadline);
grpc_server->Wait();
}

String getClientMetaVar(grpc::ServerContext * grpc_context, const String & name, const String & default_val) {
const auto & metadata = grpc_context->client_metadata();
auto it = metadata.find(name);
if(it == metadata.end()) {
return default_val;
}
// grpc::string_ref is not null-terminated, so the length must be passed explicitly
return String(it->second.data(), it->second.length());
}

::grpc::Status setClientInfo(grpc::ServerContext * grpc_context, Context & server_context) {
String query_id = getClientMetaVar(grpc_context, "query_id", "");
server_context.setCurrentQueryId(query_id);
ClientInfo & client_info = server_context.getClientInfo();
client_info.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
client_info.interface = ClientInfo::Interface::GRPC;
std::string peer = grpc_context->peer();
auto pos = peer.find(':');
if(pos == std::string::npos) {
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "invalid peer address: " + peer);
}
std::string client_ip = peer.substr(pos + 1);
Poco::Net::SocketAddress client_address(client_ip);
client_info.current_address = client_address;
client_info.current_user = getClientMetaVar(grpc_context, "user", "");
std::string records_per_chunk_str = getClientMetaVar(grpc_context, "records_per_chunk", "");
if(!records_per_chunk_str.empty()) {
server_context.setSetting("records_per_chunk", records_per_chunk_str);
}
std::string builder_version = getClientMetaVar(grpc_context, "builder_version", "v1");
server_context.setSetting("coprocessor_plan_builder_version", builder_version);
return ::grpc::Status::OK;
}

grpc::Status FlashService::Coprocessor(grpc::ServerContext * grpc_context, const coprocessor::Request * request,
coprocessor::Response * response)
{
LOG_DEBUG(log, "Received coprocessor request");
LOG_DEBUG(log, request->DebugString());
Context context = server.context();
context.setGlobalContext(server.context());
::grpc::Status client_info_status = setClientInfo(grpc_context, context);
if(!client_info_status.ok()) {
return client_info_status;
}
if(request->tp() != REQ_TYPE_DAG) {
LOG_ERROR(log, "Coprocessor request types other than DAG are not implemented yet");
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "Only DAG request is supported");
}
try {
CoprocessorContext cop_context(context, request->context(), *grpc_context);
CoprocessorHandler cop_handler(request, response, cop_context);
if (cop_handler.execute()) {
LOG_DEBUG(log, "Flash service Coprocessor finished");
return ::grpc::Status(::grpc::StatusCode::OK, "");
} else {
LOG_ERROR(log, "Flash service Coprocessor meet internal error");
return ::grpc::Status(::grpc::StatusCode::INTERNAL, "");
}
} catch (const LockException & e) {
//todo set lock error info
LOG_ERROR(log, "Meet lock exception");
// clear the data to avoid sending partial data
response->set_data("");
} catch (const RegionException & e) {
// todo set region error info
LOG_ERROR(log, "Meet region exception");
response->set_data("");
} catch (const Exception & e) {
// todo return exception message
LOG_ERROR(log, "Meet exception, errmsg: " + e.message());
response->set_data("");
} catch (...) {
LOG_ERROR(log, "Meet unknown exception");
response->set_data("");
}
return ::grpc::Status(::grpc::StatusCode::INTERNAL, "");
}

} // namespace DB
39 changes: 39 additions & 0 deletions dbms/src/Server/FlashService.h
@@ -0,0 +1,39 @@
#pragma once

#include <grpc++/grpc++.h>
#include <common/logger_useful.h>
#include <boost/noncopyable.hpp>
#include "IServer.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <kvproto/tikvpb.grpc.pb.h>
#pragma GCC diagnostic pop

namespace DB
{

using GRPCServerPtr = std::unique_ptr<grpc::Server>;
class FlashService;
using FlashServicePtr = std::shared_ptr<FlashService>;

class FlashService final : public tikvpb::Tikv::Service, public std::enable_shared_from_this<FlashService>, private boost::noncopyable
{
public:
FlashService(const std::string & address_, IServer & server_);

~FlashService() final;

grpc::Status Coprocessor(grpc::ServerContext * context, const coprocessor::Request * request, coprocessor::Response * response) override;
private:

IServer &server;

std::string address;

GRPCServerPtr grpc_server;

Logger * log;

};

} // namespace DB
15 changes: 15 additions & 0 deletions dbms/src/Server/Server.cpp
@@ -37,6 +37,7 @@
#include "MetricsTransmitter.h"
#include "StatusFile.h"
#include "TCPHandlerFactory.h"
#include "FlashService.h"

#if Poco_NetSSL_FOUND
#include <Poco/Net/Context.h>
@@ -429,6 +430,20 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Shutted down raft service.");
});

FlashServicePtr flash_service = nullptr;
if(config().has("flash")) {
String flash_service_addr = config().getString("flash.service_addr");
flash_service = std::make_shared<FlashService>(flash_service_addr, *this);
}

SCOPE_EXIT({
if (flash_service != nullptr) {
LOG_INFO(log, "Shutting down flash service.");
flash_service.reset();
LOG_INFO(log, "Shut down flash service.");
}
});

{
Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0);

125 changes: 125 additions & 0 deletions dbms/src/Server/cop_test.cpp
@@ -0,0 +1,125 @@
#include <grpc++/grpc++.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <tipb/expression.pb.h>
#include <tipb/executor.pb.h>
#include <tipb/select.pb.h>
#include <kvproto/tikvpb.grpc.pb.h>
#pragma GCC diagnostic pop
#include <sstream>
#include <iostream>
#include <Storages/Transaction/Codec.h>


using ChannelPtr = std::shared_ptr<grpc::Channel>;
using StubPtr = std::unique_ptr<tikvpb::Tikv::Stub>;
static const int DAGREQUEST = 103;
class FlashClient {
private:
StubPtr stub;
public:
FlashClient(ChannelPtr cp) : stub(tikvpb::Tikv::NewStub(cp)) {}
grpc::Status coprocessor(coprocessor::Request * request) {
grpc::ClientContext client_context;
client_context.AddMetadata("user_name", "");
client_context.AddMetadata("builder_version", "v1");
coprocessor::Response response;
grpc::Status status = stub->Coprocessor(&client_context, *request, &response);
// the test DAG request outputs 3 columns per row, matching the 3 output_offsets set in rpcTest
size_t column_num = 3;
if(status.ok()) {
// if status is ok, try to decode the result
tipb::SelectResponse selectResponse;
if(selectResponse.ParseFromString(response.data())) {
for(const tipb::Chunk & chunk : selectResponse.chunks()) {
size_t cursor = 0;
std::vector<DB::Field> row_result;
const std::string &data = chunk.rows_data();
while (cursor < data.size()) {
row_result.push_back(DB::DecodeDatum(cursor, data));
if(row_result.size() == column_num) {
//print the result
std::cout << row_result[0].get<DB::Int64>()
<< " "<< row_result[1].get<DB::String>()
<< " "<< row_result[2].get<DB::Int64>() << std::endl;
row_result.clear();
}
}

}
}
}
return status;
}
};

using ClientPtr = std::shared_ptr<FlashClient>;
grpc::Status rpcTest() {
ChannelPtr cp = grpc::CreateChannel("localhost:9093", grpc::InsecureChannelCredentials());
ClientPtr clientPtr = std::make_shared<FlashClient>(cp);
// construct a dag request
tipb::DAGRequest dagRequest;
dagRequest.set_start_ts(18446744073709551615uL);
tipb::Executor *executor = dagRequest.add_executors();
executor->set_tp(tipb::ExecType::TypeTableScan);
tipb::TableScan *ts = executor->mutable_tbl_scan();
ts->set_table_id(41);
tipb::ColumnInfo * ci = ts->add_columns();
ci->set_column_id(1);
ci = ts->add_columns();
ci->set_column_id(2);
dagRequest.add_output_offsets(1);
dagRequest.add_output_offsets(0);
dagRequest.add_output_offsets(1);
executor = dagRequest.add_executors();
executor->set_tp(tipb::ExecType::TypeSelection);
tipb::Selection *selection = executor->mutable_selection();
tipb::Expr *expr = selection->add_conditions();
expr->set_tp(tipb::ExprType::ScalarFunc);
expr->set_sig(tipb::ScalarFuncSig::LTInt);
tipb::Expr *col = expr->add_children();
tipb::Expr *value = expr->add_children();
col->set_tp(tipb::ExprType::ColumnRef);
std::stringstream ss;
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(2, ss);
col->set_val(ss.str());
value->set_tp(tipb::ExprType::Int64);
ss.str("");
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(289, ss);
value->set_val(ss.str());


// construct a coprocessor request
coprocessor::Request request;
//todo add context info
kvrpcpb::Context *ctx = request.mutable_context();
ctx->set_region_id(2);
auto region_epoch = ctx->mutable_region_epoch();
region_epoch->set_version(20);
region_epoch->set_conf_ver(2);
request.set_tp(DAGREQUEST);
request.set_data(dagRequest.SerializeAsString());
//request.add_ranges();
return clientPtr->coprocessor(&request);
}

void codecTest() {
Int64 i = 123;
std::stringstream ss;
DB::EncodeNumber<Int64, TiDB::CodecFlag::CodecFlagInt>(i, ss);
std::string val = ss.str();
size_t cursor = 0;
DB::Field f = DB::DecodeDatum(cursor, val);
Int64 r = f.get<Int64>();
(void)r; // encode/decode round-trip check only, value unused
}

int main() {
grpc::Status ret = rpcTest();
if(!ret.ok()) {
std::cout << "rpc failed: " << ret.error_code() << " " << ret.error_message() << std::endl;
}
return 0;
}
16 changes: 16 additions & 0 deletions dbms/src/Storages/Transaction/RegionTable.cpp
@@ -512,6 +512,22 @@ void RegionTable::traverseInternalRegionsByTable(const TableID table_id, std::fu
callback(region_info.second);
}

RegionPtr RegionTable::getRegionById(const TableID table_id, const RegionID region_id) {
auto & kvstore = context.getTMTContext().getKVStore();
{
std::lock_guard<std::mutex> lock(mutex);
auto & table = getOrCreateTable(table_id);

for (const auto & region_info : table.regions)
{
if(region_info.second.region_id == region_id) {
return kvstore->getRegion(region_info.second.region_id);
}
}
}
return nullptr;
}

std::vector<std::pair<RegionID, RegionPtr>> RegionTable::getRegionsByTable(const TableID table_id)
{
auto & kvstore = context.getTMTContext().getKVStore();
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/RegionTable.h
@@ -177,6 +177,7 @@ class RegionTable : private boost::noncopyable
void traverseInternalRegions(std::function<void(TableID, InternalRegion &)> && callback);
void traverseInternalRegionsByTable(const TableID table_id, std::function<void(const InternalRegion &)> && callback);
std::vector<std::pair<RegionID, RegionPtr>> getRegionsByTable(const TableID table_id);
RegionPtr getRegionById(const TableID table_id, const RegionID region_id);

static std::tuple<std::optional<Block>, RegionReadStatus> getBlockInputStreamByRegion(TableID table_id,
RegionPtr region,
17 changes: 16 additions & 1 deletion dbms/src/Storages/Transaction/TypeMapping.cpp
@@ -37,18 +37,23 @@ class TypeMapping : public ext::singleton<TypeMapping>
public:
using Creator = std::function<DataTypePtr(const ColumnInfo & column_info)>;
using TypeMap = std::unordered_map<TiDB::TP, Creator>;
using CodecFlagMap = std::unordered_map<String, TiDB::CodecFlag>;

DataTypePtr getSigned(const ColumnInfo & column_info);

DataTypePtr getUnsigned(const ColumnInfo & column_info);

TiDB::CodecFlag getCodecFlag(const DataTypePtr & dataTypePtr);

private:
TypeMapping();

TypeMap signed_type_map;

TypeMap unsigned_type_map;

CodecFlagMap codec_flag_map;

friend class ext::singleton<TypeMapping>;
};

@@ -61,7 +66,9 @@ TypeMapping::TypeMapping()

#define M(tt, v, cf, cfu, ct, ctu) \
signed_type_map[TiDB::Type##tt] = getDataTypeByColumnInfoBase<DataType##ct>; \
unsigned_type_map[TiDB::Type##tt] = getDataTypeByColumnInfoBase<DataType##ctu>;
unsigned_type_map[TiDB::Type##tt] = getDataTypeByColumnInfoBase<DataType##ctu>; \
codec_flag_map[#ctu] = TiDB::CodecFlag##cfu; \
codec_flag_map[#ct] = TiDB::CodecFlag##cf;
COLUMN_TYPES(M)
#undef M
}
@@ -78,6 +85,14 @@ DataTypePtr TypeMapping::getUnsigned(const ColumnInfo & column_info)
return unsigned_type_map[column_info.tp](column_info);
}

TiDB::CodecFlag TypeMapping::getCodecFlag(const DB::DataTypePtr & dataTypePtr) {
// fixme: String's CodecFlag will be CodecFlagCompactBytes, which is wrong for Json type
return codec_flag_map[dataTypePtr->getFamilyName()];
}

TiDB::CodecFlag getCodecFlagByDataType(const DataTypePtr & dataTypePtr) {
return TypeMapping::instance().getCodecFlag(dataTypePtr);
}

DataTypePtr getDataTypeByColumnInfo(const ColumnInfo & column_info)
{
2 changes: 2 additions & 0 deletions dbms/src/Storages/Transaction/TypeMapping.h
@@ -11,4 +11,6 @@ using ColumnInfo = TiDB::ColumnInfo;

DataTypePtr getDataTypeByColumnInfo(const ColumnInfo & column_info);

TiDB::CodecFlag getCodecFlagByDataType(const DataTypePtr & dataTypePtr);

}