Skip to content

Commit c88385f

Browse files
committed
add slow_query_parallelism to limit slow query
1 parent a4aa3b6 commit c88385f

File tree

2 files changed

+31
-9
lines changed

2 files changed

+31
-9
lines changed

rpc/src/module/indexer.rs

+24-8
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use crate::error::RPCError;
24
use async_trait::async_trait;
35
use ckb_indexer::IndexerHandle;
@@ -7,6 +9,7 @@ use ckb_jsonrpc_types::{
79
};
810
use jsonrpc_core::Result;
911
use jsonrpc_utils::rpc;
12+
use tokio::sync::Semaphore;
1013

1114
/// RPC Module Indexer.
1215
#[rpc(openrpc)]
@@ -394,7 +397,7 @@ pub trait IndexerRpc {
394397
/// }
395398
/// ```
396399
#[rpc(name = "get_cells")]
397-
fn get_cells(
400+
async fn get_cells(
398401
&self,
399402
search_key: IndexerSearchKey,
400403
order: IndexerOrder,
@@ -810,7 +813,7 @@ pub trait IndexerRpc {
810813
/// }
811814
/// ```
812815
#[rpc(name = "get_transactions")]
813-
fn get_transactions(
816+
async fn get_transactions(
814817
&self,
815818
search_key: IndexerSearchKey,
816819
order: IndexerOrder,
@@ -877,7 +880,7 @@ pub trait IndexerRpc {
877880
/// }
878881
/// ```
879882
#[rpc(name = "get_cells_capacity")]
880-
fn get_cells_capacity(
883+
async fn get_cells_capacity(
881884
&self,
882885
search_key: IndexerSearchKey,
883886
) -> Result<Option<IndexerCellsCapacity>>;
@@ -886,11 +889,15 @@ pub trait IndexerRpc {
886889
#[derive(Clone)]
887890
pub(crate) struct IndexerRpcImpl {
888891
pub(crate) handle: IndexerHandle,
892+
pub(crate) slow_query_parallelism: Arc<Semaphore>,
889893
}
890894

891895
impl IndexerRpcImpl {
892-
pub fn new(handle: IndexerHandle) -> Self {
893-
IndexerRpcImpl { handle }
896+
pub fn new(handle: IndexerHandle, slow_query_limit: usize) -> Self {
897+
IndexerRpcImpl {
898+
handle,
899+
slow_query_parallelism: Arc::new(Semaphore::new(slow_query_limit)),
900+
}
894901
}
895902
}
896903

@@ -902,34 +909,43 @@ impl IndexerRpc for IndexerRpcImpl {
902909
.map_err(|e| RPCError::custom(RPCError::Indexer, e))
903910
}
904911

905-
fn get_cells(
912+
async fn get_cells(
906913
&self,
907914
search_key: IndexerSearchKey,
908915
order: IndexerOrder,
909916
limit: Uint32,
910917
after: Option<JsonBytes>,
911918
) -> Result<IndexerPagination<IndexerCell>> {
919+
let _permit = Arc::clone(&self.slow_query_parallelism)
920+
.acquire_owned()
921+
.await;
912922
self.handle
913923
.get_cells(search_key, order, limit, after)
914924
.map_err(|e| RPCError::custom(RPCError::Indexer, e))
915925
}
916926

917-
fn get_transactions(
927+
async fn get_transactions(
918928
&self,
919929
search_key: IndexerSearchKey,
920930
order: IndexerOrder,
921931
limit: Uint32,
922932
after: Option<JsonBytes>,
923933
) -> Result<IndexerPagination<IndexerTx>> {
934+
let _permit = Arc::clone(&self.slow_query_parallelism)
935+
.acquire_owned()
936+
.await;
924937
self.handle
925938
.get_transactions(search_key, order, limit, after)
926939
.map_err(|e| RPCError::custom(RPCError::Indexer, e))
927940
}
928941

929-
fn get_cells_capacity(
942+
async fn get_cells_capacity(
930943
&self,
931944
search_key: IndexerSearchKey,
932945
) -> Result<Option<IndexerCellsCapacity>> {
946+
let _permit = Arc::clone(&self.slow_query_parallelism)
947+
.acquire_owned()
948+
.await;
933949
self.handle
934950
.get_cells_capacity(search_key)
935951
.map_err(|e| RPCError::custom(RPCError::Indexer, e))

rpc/src/service_builder.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use ckb_util::Mutex;
2323
use jsonrpc_core::{MetaIoHandler, RemoteProcedure};
2424
use jsonrpc_utils::pub_sub::Session;
2525
use std::sync::Arc;
26+
use std::thread::available_parallelism;
2627

2728
const DEPRECATED_RPC_PREFIX: &str = "deprecated.";
2829

@@ -208,7 +209,12 @@ impl<'a> ServiceBuilder<'a> {
208209
}
209210

210211
let indexer_handle = indexer.handle();
211-
let methods = IndexerRpcImpl::new(indexer_handle);
212+
let threads_num: usize = self
213+
.config
214+
.threads
215+
.unwrap_or(available_parallelism().unwrap().into());
216+
let slow_query_limit = usize::max(threads_num * 2 / 3, 1);
217+
let methods = IndexerRpcImpl::new(indexer_handle, slow_query_limit);
212218
self = set_rpc_module_methods!(
213219
self,
214220
"Indexer",

0 commit comments

Comments
 (0)