diff --git a/cmake/find_tiflash_proxy.cmake b/cmake/find_tiflash_proxy.cmake index 28e9079fc56..7b380669074 100644 --- a/cmake/find_tiflash_proxy.cmake +++ b/cmake/find_tiflash_proxy.cmake @@ -48,6 +48,13 @@ if(NOT EXTERNAL_TIFLASH_PROXY_FOUND) endif() set(TIFLASH_PROXY_FOUND TRUE) +# SERVERLESS_PROXY=0 if using normal proxy. +# SERVERLESS_PROXY=1 if using serverless proxy. +if (EXISTS "${TiFlash_SOURCE_DIR}/contrib/tiflash-proxy/proxy_components/proxy_ffi/src/cloud_helper.rs") + add_definitions(-DSERVERLESS_PROXY=1) +else() + add_definitions(-DSERVERLESS_PROXY=0) +endif() message(STATUS "Using tiflash proxy: ${USE_INTERNAL_TIFLASH_PROXY} : ${TIFLASH_PROXY_INCLUDE_DIR}, ${TIFLASH_PROXY_LIBRARY}") diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index abeecb7fcf7..5bcfb6a139c 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -415,12 +415,15 @@ namespace DB M(tiflash_fap_task_duration_seconds, \ "", \ Histogram, \ - F(type_select_stage, {{"type", "select_stage"}}, ExpBucketsWithRange{0.1, 2, 60}), \ - F(type_write_stage, {{"type", "write_stage"}}, ExpBucketsWithRange{0.05, 2, 60}), \ - F(type_ingest_stage, {{"type", "ingest_stage"}}, ExpBucketsWithRange{0.05, 2, 30}), \ - F(type_total, {{"type", "total"}}, ExpBucketsWithRange{0.1, 2, 300}), \ - F(type_queue_stage, {{"type", "queue_stage"}}, ExpBucketsWithRange{0.1, 2, 300}), \ - F(type_phase1_total, {{"type", "phase1_total"}}, ExpBucketsWithRange{0.2, 2, 80})) \ + F(type_select_stage, {{"type", "select_stage"}}, ExpBucketsWithRange{0.2, 2, 120}), \ + F(type_write_stage, {{"type", "write_stage"}}, ExpBucketsWithRange{0.2, 2, 120}), \ + F(type_write_stage_build, {{"type", "write_stage_build"}}, ExpBucketsWithRange{0.2, 2, 120}), \ + F(type_write_stage_raft, {{"type", "write_stage_raft"}}, ExpBucketsWithRange{0.2, 2, 30}), \ + F(type_write_stage_insert, {{"type", "write_stage_insert"}}, ExpBucketsWithRange{0.2, 2, 30}), \ + F(type_ingest_stage, 
{{"type", "ingest_stage"}}, ExpBucketsWithRange{0.2, 2, 30}), \ + F(type_total, {{"type", "total"}}, ExpBucketsWithRange{0.2, 4, 300}), \ + F(type_queue_stage, {{"type", "queue_stage"}}, ExpBucketsWithRange{0.2, 4, 300}), \ + F(type_phase1_total, {{"type", "phase1_total"}}, ExpBucketsWithRange{0.2, 4, 300})) \ M(tiflash_raft_command_duration_seconds, \ "Bucketed histogram of some raft command: apply snapshot and ingest SST", \ Histogram, /* these command usually cost several seconds, increase the start bucket to 50ms */ \ diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp index 50485dd2e3b..9ce5f1843bb 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.cpp @@ -329,7 +329,9 @@ FastAddPeerRes FastAddPeerImplWrite( GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment(); return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", ""); } + auto segments = dm_storage->buildSegmentsFromCheckpointInfo(new_key_range, checkpoint_info, settings); + GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_build).Observe(watch.elapsedSecondsFromLastTime()); fap_ctx->insertCheckpointIngestInfo( tmt, @@ -339,6 +341,7 @@ FastAddPeerRes FastAddPeerImplWrite( region, std::move(segments), start_time); + GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_insert).Observe(watch.elapsedSecondsFromLastTime()); SYNC_FOR("in_FastAddPeerImplWrite::after_write_segments"); if (cancel_handle->isCanceled()) @@ -353,6 +356,7 @@ FastAddPeerRes FastAddPeerImplWrite( GET_METRIC(tiflash_fap_task_result, type_failed_cancel).Increment(); return genFastAddPeerRes(FastAddPeerStatus::Canceled, "", ""); } + // Write raft log to uni ps, we do this here because we store raft log seperately. // Currently, FAP only handle when the peer is newly created in this store. 
// TODO(fap) However, Move this to `ApplyFapSnapshot` and clean stale data, if FAP can later handle all snapshots. @@ -369,6 +373,7 @@ FastAddPeerRes FastAddPeerImplWrite( UniversalPageIdFormat::getU64ID(page_id)); wb.putRemotePage(page_id, 0, size, location, {}); }); + GET_METRIC(tiflash_fap_task_duration_seconds, type_write_stage_raft).Observe(watch.elapsedSecondsFromLastTime()); auto wn_ps = tmt.getContext().getWriteNodePageStorage(); wn_ps->write(std::move(wb)); SYNC_FOR("in_FastAddPeerImplWrite::after_write_raft_log"); diff --git a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp index af4f111e17e..82f6f26b673 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp @@ -250,10 +250,10 @@ static inline std::pair, size_t> getSplitKey( // Don't change the order of following checks, `getApproxBytes` involves some overhead, // although it is optimized to bring about the minimum overhead. 
+#if SERVERLESS_PROXY == 0 if (new_region->getClusterRaftstoreVer() != RaftstoreVer::V2) return std::make_pair(std::vector{}, 0); - if (kvstore->getOngoingPrehandleTaskCount() >= 2) - return std::make_pair(std::vector{}, 0); +#endif auto approx_bytes = sst_stream->getApproxBytes(); if (approx_bytes <= parallel_prehandle_threshold) { diff --git a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp index 3bd810bc283..6131665ae84 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp @@ -989,4 +989,11 @@ try } CATCH +TEST(ProxyMode, Normal) +try +{ + ASSERT_EQ(SERVERLESS_PROXY, 0); +} +CATCH + } // namespace DB::tests diff --git a/dbms/src/Storages/Page/V3/Universal/RaftDataReader.cpp b/dbms/src/Storages/Page/V3/Universal/RaftDataReader.cpp index c879cbebcaf..d1cc6e2907e 100644 --- a/dbms/src/Storages/Page/V3/Universal/RaftDataReader.cpp +++ b/dbms/src/Storages/Page/V3/Universal/RaftDataReader.cpp @@ -97,9 +97,8 @@ void RaftDataReader::traverseRemoteRaftLogForRegion( // 20 = 1(RAFT_PREFIX) + 1(LOCAL_PREFIX) + 1(REGION_RAFT_PREFIX) + 8(region id) + 1(RAFT_LOG_SUFFIX) + 8(raft log index) RUNTIME_CHECK(page_id.size() == 20, page_id.size()); auto maybe_location = uni_ps.getCheckpointLocation(page_id, snapshot); - RUNTIME_CHECK(maybe_location.has_value()); auto entry = uni_ps.getEntry(page_id, snapshot); - acceptor(page_id, entry.size, *maybe_location); + acceptor(page_id, entry.size, maybe_location.value_or(PS::V3::CheckpointLocation())); } }