Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add(scan): Implement scan gRPC method #8268

Merged
merged 8 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5819,6 +5819,7 @@ dependencies = [
"prost",
"serde",
"tokio",
"tokio-stream",
"tonic 0.11.0",
"tonic-build 0.11.0",
"tower",
Expand Down
2 changes: 2 additions & 0 deletions zebra-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ tonic = "0.11.0"
prost = "0.12.3"
serde = { version = "1.0.196", features = ["serde_derive"] }
tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1.14"
tower = { version = "0.4.13", features = ["util", "buffer"] }
color-eyre = "0.6.2"

zcash_primitives = { version = "0.13.0-rc.1" }

zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.34", features = ["shielded-scan"] }
zebra-chain = { path = "../zebra-chain" , version = "1.0.0-beta.34" }

[build-dependencies]
tonic-build = "0.11.0"
Expand Down
29 changes: 25 additions & 4 deletions zebra-grpc/proto/scanner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ service Scanner {

// Submits scanning keys to the scanner.
rpc RegisterKeys(RegisterKeysRequest) returns (RegisterKeysResponse);

// Register keys and listen to the results
rpc Scan (ScanRequest) returns (stream ScanResponse);
}

// A response to a GetInfo call.
Expand Down Expand Up @@ -69,13 +72,19 @@ message RegisterKeysResponse {
// A result for a single key.
message Results {
// A height, transaction id map
map<uint32, TransactionHash> transactions = 1;
map<uint32, Transactions> by_height = 1;
}

// A vector of transaction hashes
message TransactionHash {
// A transaction id hash
repeated string hash = 1;
message Transactions {
// Transactions
repeated Transaction transactions = 1;
}

// Transaction data
message Transaction {
// The transaction hash/id
string hash = 1;
}

// A scanning key with an optional birth height
Expand All @@ -85,3 +94,15 @@ message KeyWithHeight {
// Birth height of the key
optional uint32 height = 2;
}

// A request for registering keys and getting their transactions
message ScanRequest {
// A set of viewing keys
repeated KeyWithHeight keys = 2;
}

// Response to Scan calls
message ScanResponse {
// Results for each key.
map<string, Results> results = 1;
}
199 changes: 172 additions & 27 deletions zebra-grpc/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
//! The gRPC server implementation

use std::{collections::BTreeMap, net::SocketAddr};
use std::{collections::BTreeMap, net::SocketAddr, pin::Pin};

use futures_util::future::TryFutureExt;
use tokio_stream::{wrappers::ReceiverStream, Stream};
use tonic::{transport::Server, Request, Response, Status};
use tower::ServiceExt;

use zebra_chain::{block::Height, transaction};
use zebra_node_services::scan_service::{
request::Request as ScanServiceRequest, response::Response as ScanServiceResponse,
request::Request as ScanServiceRequest,
response::{Response as ScanServiceResponse, ScanResult},
};

use crate::scanner::{
scanner_server::{Scanner, ScannerServer},
ClearResultsRequest, DeleteKeysRequest, Empty, GetResultsRequest, GetResultsResponse,
InfoReply, RegisterKeysRequest, RegisterKeysResponse, Results, TransactionHash,
InfoReply, KeyWithHeight, RegisterKeysRequest, RegisterKeysResponse, Results, ScanRequest,
ScanResponse, Transaction, Transactions,
};

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// The maximum number of messages that can be queued to be streamed to a client
/// from the `scan` method.
const SCAN_RESPONDER_BUFFER_SIZE: usize = 10_000;

#[derive(Debug)]
/// The server implementation
pub struct ScannerRPC<ScanService>
Expand All @@ -42,7 +50,104 @@ where
+ 'static,
<ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
{
async fn get_info(&self, _request: Request<Empty>) -> Result<Response<InfoReply>, Status> {
type ScanStream = Pin<Box<dyn Stream<Item = Result<ScanResponse, Status>> + Send>>;

async fn scan(
&self,
request: tonic::Request<ScanRequest>,
) -> Result<Response<Self::ScanStream>, Status> {
let keys = request.into_inner().keys;

if keys.is_empty() {
let msg = "must provide at least 1 key in scan request";
return Err(Status::invalid_argument(msg));
}

arya2 marked this conversation as resolved.
Show resolved Hide resolved
let keys: Vec<_> = keys
.into_iter()
.map(|KeyWithHeight { key, height }| (key, height))
.collect();

let ScanServiceResponse::RegisteredKeys(_) = self
.scan_service
.clone()
.ready()
.and_then(|service| service.call(ScanServiceRequest::RegisterKeys(keys.clone())))
.await
.map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
else {
return Err(Status::unknown(
"scan service returned an unexpected response",
));
};

let keys: Vec<_> = keys.into_iter().map(|(key, _start_at)| key).collect();

let ScanServiceResponse::Results(results) = self
.scan_service
.clone()
.ready()
.and_then(|service| service.call(ScanServiceRequest::Results(keys.clone())))
.await
.map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
else {
return Err(Status::unknown(
"scan service returned an unexpected response",
));
};

let ScanServiceResponse::SubscribeResults(mut results_receiver) = self
.scan_service
.clone()
.ready()
.and_then(|service| {
service.call(ScanServiceRequest::SubscribeResults(
keys.iter().cloned().collect(),
))
})
.await
.map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
else {
return Err(Status::unknown(
"scan service returned an unexpected response",
));
};

let (response_sender, response_receiver) =
tokio::sync::mpsc::channel(SCAN_RESPONDER_BUFFER_SIZE);
let response_stream = ReceiverStream::new(response_receiver);

tokio::spawn(async move {
let initial_results = process_results(keys, results);

let send_result = response_sender
.send(Ok(ScanResponse {
results: initial_results,
}))
.await;

if send_result.is_err() {
// return early if the client has disconnected
return;
}

while let Some(scan_result) = results_receiver.recv().await {
let send_result = response_sender.send(Ok(scan_result.into())).await;

// Finish task if the client has disconnected
if send_result.is_err() {
break;
}
}
});

Ok(Response::new(Box::pin(response_stream)))
}

async fn get_info(
&self,
_request: tonic::Request<Empty>,
) -> Result<Response<InfoReply>, Status> {
let ScanServiceResponse::Info {
min_sapling_birthday_height,
} = self
Expand Down Expand Up @@ -170,34 +275,74 @@ where
));
};

// If there are no results for a key, we still want to return it with empty results.
let empty_map = BTreeMap::new();

let results = keys
.into_iter()
.map(|key| {
let values = response.get(&key).unwrap_or(&empty_map);

// Skip heights with no transactions, they are scanner markers and should not be returned.
let transactions = Results {
transactions: values
.iter()
.filter(|(_, transactions)| !transactions.is_empty())
.map(|(height, transactions)| {
let txs = transactions.iter().map(ToString::to_string).collect();
(height.0, TransactionHash { hash: txs })
})
.collect(),
};

(key, transactions)
})
.collect::<BTreeMap<_, _>>();
let results = process_results(keys, response);

Ok(Response::new(GetResultsResponse { results }))
}
}

fn process_results(
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
keys: Vec<String>,
results: BTreeMap<String, BTreeMap<Height, Vec<transaction::Hash>>>,
) -> BTreeMap<String, Results> {
// If there are no results for a key, we still want to return it with empty results.
let empty_map = BTreeMap::new();

keys.into_iter()
.map(|key| {
let values = results.get(&key).unwrap_or(&empty_map);

// Skip heights with no transactions, they are scanner markers and should not be returned.
let transactions = Results {
by_height: values
.iter()
.filter(|(_, transactions)| !transactions.is_empty())
.map(|(height, transactions)| {
let transactions = transactions
.iter()
.map(ToString::to_string)
.map(|hash| Transaction { hash })
.collect();
(height.0, Transactions { transactions })
})
.collect(),
};

(key, transactions)
})
.collect::<BTreeMap<_, _>>()
}

impl From<ScanResult> for ScanResponse {
fn from(
ScanResult {
key,
height: Height(height),
tx_id,
}: ScanResult,
) -> Self {
ScanResponse {
results: [(
key,
Results {
by_height: [(
height,
Transactions {
transactions: [tx_id.to_string()]
.map(|hash| Transaction { hash })
.to_vec(),
},
)]
.into_iter()
.collect(),
},
)]
.into_iter()
.collect(),
}
}
}

/// Initializes the zebra-scan gRPC server
pub async fn init<ScanService>(
listen_addr: SocketAddr,
Expand Down
Loading
Loading