Skip to content

Commit

Permalink
Cherry-pick: Support Serverless Proxy (pingcap#201)
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <[email protected]>
  • Loading branch information
CalvinNeo authored May 24, 2024
1 parent fbf91ad commit 99cbdb6
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 10 deletions.
7 changes: 7 additions & 0 deletions cmake/find_tiflash_proxy.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ if(NOT EXTERNAL_TIFLASH_PROXY_FOUND)
endif()

set(TIFLASH_PROXY_FOUND TRUE)
# Detect which flavor of the tiflash-proxy submodule is checked out and export
# it to the C++ build as a compile-time macro:
#   SERVERLESS_PROXY=0 — normal proxy.
#   SERVERLESS_PROXY=1 — serverless proxy, identified by the presence of
#                        cloud_helper.rs in the proxy_ffi crate (that file
#                        only exists in the serverless proxy source tree).
if (EXISTS "${TiFlash_SOURCE_DIR}/contrib/tiflash-proxy/proxy_components/proxy_ffi/src/cloud_helper.rs")
add_definitions(-DSERVERLESS_PROXY=1)
else()
add_definitions(-DSERVERLESS_PROXY=0)
endif()

message(STATUS "Using tiflash proxy: ${USE_INTERNAL_TIFLASH_PROXY} : ${TIFLASH_PROXY_INCLUDE_DIR}, ${TIFLASH_PROXY_LIBRARY}")

Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,12 +415,15 @@ namespace DB
M(tiflash_fap_task_duration_seconds, \
"", \
Histogram, \
F(type_select_stage, {{"type", "select_stage"}}, ExpBucketsWithRange{0.1, 2, 60}), \
F(type_write_stage, {{"type", "write_stage"}}, ExpBucketsWithRange{0.05, 2, 60}), \
F(type_ingest_stage, {{"type", "ingest_stage"}}, ExpBucketsWithRange{0.05, 2, 30}), \
F(type_total, {{"type", "total"}}, ExpBucketsWithRange{0.1, 2, 300}), \
F(type_queue_stage, {{"type", "queue_stage"}}, ExpBucketsWithRange{0.1, 2, 300}), \
F(type_phase1_total, {{"type", "phase1_total"}}, ExpBucketsWithRange{0.2, 2, 80})) \
F(type_select_stage, {{"type", "select_stage"}}, ExpBucketsWithRange{0.2, 2, 120}), \
F(type_write_stage, {{"type", "write_stage"}}, ExpBucketsWithRange{0.2, 2, 120}), \
F(type_write_stage_build, {{"type", "write_stage_build"}}, ExpBucketsWithRange{0.2, 2, 120}), \
F(type_write_stage_raft, {{"type", "write_stage_raft"}}, ExpBucketsWithRange{0.2, 2, 30}), \
F(type_write_stage_insert, {{"type", "write_stage_insert"}}, ExpBucketsWithRange{0.2, 2, 30}), \
F(type_ingest_stage, {{"type", "ingest_stage"}}, ExpBucketsWithRange{0.2, 2, 30}), \
F(type_total, {{"type", "total"}}, ExpBucketsWithRange{0.2, 4, 300}), \
F(type_queue_stage, {{"type", "queue_stage"}}, ExpBucketsWithRange{0.2, 4, 300}), \
F(type_phase1_total, {{"type", "phase1_total"}}, ExpBucketsWithRange{0.2, 4, 300})) \
M(tiflash_raft_command_duration_seconds, \
"Bucketed histogram of some raft command: apply snapshot and ingest SST", \
Histogram, /* these command usually cost several seconds, increase the start bucket to 50ms */ \
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,9 @@ FastAddPeerRes FastAddPeerImplWrite(
GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", "");
}

auto segments = dm_storage->buildSegmentsFromCheckpointInfo(new_key_range, checkpoint_info, settings);
GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_build).Observe(watch.elapsedSecondsFromLastTime());

fap_ctx->insertCheckpointIngestInfo(
tmt,
Expand All @@ -339,6 +341,7 @@ FastAddPeerRes FastAddPeerImplWrite(
region,
std::move(segments),
start_time);
GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_insert).Observe(watch.elapsedSecondsFromLastTime());

SYNC_FOR("in_FastAddPeerImplWrite::after_write_segments");
if (cancel_handle->isCanceled())
Expand All @@ -353,6 +356,7 @@ FastAddPeerRes FastAddPeerImplWrite(
GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment();
return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", "");
}

// Write raft log to uni ps, we do this here because we store raft log separately.
// Currently, FAP only handle when the peer is newly created in this store.
// TODO(fap) However, Move this to `ApplyFapSnapshot` and clean stale data, if FAP can later handle all snapshots.
Expand All @@ -369,6 +373,7 @@ FastAddPeerRes FastAddPeerImplWrite(
UniversalPageIdFormat::getU64ID(page_id));
wb.putRemotePage(page_id, 0, size, location, {});
});
GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_raft).Observe(watch.elapsedSecondsFromLastTime());
auto wn_ps = tmt.getContext().getWriteNodePageStorage();
wn_ps->write(std::move(wb));
SYNC_FOR("in_FastAddPeerImplWrite::after_write_raft_log");
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,10 @@ static inline std::pair<std::vector<std::string>, size_t> getSplitKey(

// Don't change the order of the following checks: `getApproxBytes` involves some overhead,
// although it is optimized to keep that overhead to a minimum.
#if SERVERLESS_PROXY == 0
if (new_region->getClusterRaftstoreVer() != RaftstoreVer::V2)
return std::make_pair(std::vector<std::string>{}, 0);
if (kvstore->getOngoingPrehandleTaskCount() >= 2)
return std::make_pair(std::vector<std::string>{}, 0);
#endif
auto approx_bytes = sst_stream->getApproxBytes();
if (approx_bytes <= parallel_prehandle_threshold)
{
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -989,4 +989,11 @@ try
}
CATCH

// Verifies that this build was configured against the serverless flavor of
// tiflash-proxy: cmake/find_tiflash_proxy.cmake defines SERVERLESS_PROXY=1
// when cloud_helper.rs is present in the proxy source tree, and 0 otherwise.
// NOTE(review): the test is named "Normal" yet asserts the serverless value
// (1) — presumably intentional on this branch; confirm the intended flavor.
TEST(ProxyMode, Normal)
try
{
ASSERT_EQ(SERVERLESS_PROXY, 1);
}
CATCH

} // namespace DB::tests
3 changes: 1 addition & 2 deletions dbms/src/Storages/Page/V3/Universal/RaftDataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ void RaftDataReader::traverseRemoteRaftLogForRegion(
// 20 = 1(RAFT_PREFIX) + 1(LOCAL_PREFIX) + 1(REGION_RAFT_PREFIX) + 8(region id) + 1(RAFT_LOG_SUFFIX) + 8(raft log index)
RUNTIME_CHECK(page_id.size() == 20, page_id.size());
auto maybe_location = uni_ps.getCheckpointLocation(page_id, snapshot);
RUNTIME_CHECK(maybe_location.has_value());
auto entry = uni_ps.getEntry(page_id, snapshot);
acceptor(page_id, entry.size, *maybe_location);
acceptor(page_id, entry.size, maybe_location.value_or(PS::V3::CheckpointLocation()));
}
}

Expand Down

0 comments on commit 99cbdb6

Please sign in to comment.