Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
arya2 committed Feb 13, 2024
1 parent 3929a52 commit 7ee7c93
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 3 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5772,10 +5772,12 @@ dependencies = [
"futures-util",
"prost",
"tokio",
"tokio-stream",
"tonic",
"tonic-build",
"tower",
"zcash_primitives",
"zebra-chain",
"zebra-node-services",
]

Expand Down
2 changes: 2 additions & 0 deletions zebra-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ futures-util = "0.3.28"
tonic = "0.10.2"
prost = "0.12.3"
tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1.14"
tower = { version = "0.4.13", features = ["util", "buffer"] }
color-eyre = "0.6.2"

zcash_primitives = { version = "0.13.0-rc.1" }

zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.34", features = ["shielded-scan"] }
zebra-chain = { path = "../zebra-chain" , version = "1.0.0-beta.34" }

[build-dependencies]
tonic-build = "0.10.2"
20 changes: 20 additions & 0 deletions zebra-grpc/proto/scanner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ service Scanner {

// Get all data we have stored for the given keys.
rpc GetResults(GetResultsRequest) returns (GetResultsResponse);

// Register keys and listen to the results
rpc Scan (ScanRequest) returns (stream ScanResponse);
}

// A response to a GetInfo call.
Expand Down Expand Up @@ -62,3 +65,20 @@ message TransactionHash {
// A transaction id hash
repeated string hash = 1;
}

// A request for registering keys and getting their transactions
message ScanRequest {
// A set of viewing keys
repeated string keys = 2;
}

// Response to Scan calls
message ScanResponse {
uint32 height = 1;

map<string, ScanResults> results = 2;
}

message ScanResults {
repeated bytes tx_ids = 1;
}
116 changes: 113 additions & 3 deletions zebra-grpc/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
//! The gRPC server implementation

use std::{collections::BTreeMap, net::SocketAddr};
use std::{collections::BTreeMap, net::SocketAddr, pin::Pin};

use futures_util::future::TryFutureExt;
use tokio_stream::{wrappers::ReceiverStream, Stream};
use tonic::{transport::Server, Response, Status};
use tower::ServiceExt;

use zebra_chain::block::Height;
use zebra_node_services::scan_service::{
request::Request as ScanServiceRequest, response::Response as ScanServiceResponse,
request::Request as ScanServiceRequest,
response::{Response as ScanServiceResponse, ScanResult},
};

use crate::scanner::{
scanner_server::{Scanner, ScannerServer},
ClearResultsRequest, DeleteKeysRequest, Empty, GetResultsRequest, GetResultsResponse,
InfoReply, Results, TransactionHash,
InfoReply, Results, ScanRequest, ScanResponse, ScanResults, TransactionHash,
};

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand Down Expand Up @@ -42,6 +45,113 @@ where
+ 'static,
<ScanService as tower::Service<ScanServiceRequest>>::Future: Send,
{
type ScanStream = Pin<Box<dyn Stream<Item = Result<ScanResponse, Status>> + Send>>;

async fn scan(
&self,
request: tonic::Request<ScanRequest>,
) -> Result<Response<Self::ScanStream>, Status> {
let keys = request.into_inner().keys;

if keys.is_empty() {
let msg = "must provide at least 1 key in scan request";
return Err(Status::invalid_argument(msg));
}

// TODO: join these two futures so they're called in quick succession if there are items in the service's buffer
let ScanServiceResponse::Results(results) = self
.scan_service
.clone()
.ready()
.and_then(|service| service.call(ScanServiceRequest::Results(keys.clone())))
.await
.map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
else {
return Err(Status::unknown(
"scan service returned an unexpected response",
));
};

let ScanServiceResponse::SubscribeResults(mut results_receiver) = self
.scan_service
.clone()
.ready()
.and_then(|service| {
service.call(ScanServiceRequest::SubscribeResults(
keys.iter().cloned().collect(),
))
})
.await
.map_err(|err| Status::unknown(format!("scan service returned error: {err}")))?
else {
return Err(Status::unknown(
"scan service returned an unexpected response",
));
};

let (response_sender, response_receiver) = tokio::sync::mpsc::channel(10_000);

let response_stream = ReceiverStream::new(response_receiver);

tokio::spawn(async move {
// Transpose the nested BTreeMaps
let mut initial_results: BTreeMap<Height, BTreeMap<String, Vec<Vec<u8>>>> =
BTreeMap::new();
for (key, results_by_height) in results {
assert!(
keys.contains(&key),
"should not return results for keys that weren't provided"
);

for (height, results_for_key) in results_by_height {
let results_for_height = initial_results.entry(height).or_default();
results_for_height.entry(key.clone()).or_default().extend(
results_for_key
.into_iter()
.map(|result| result.bytes_in_display_order().to_vec()),
);
}
}

for (Height(height), results) in initial_results {
response_sender
.send(Ok(ScanResponse {
height,
results: results
.into_iter()
.map(|(key, results)| (key, ScanResults { tx_ids: results }))
.collect(),
}))
.await
.expect("channel should not be disconnected");
}

while let Some(ScanResult {
key,
height: Height(height),
tx_id,
}) = results_receiver.recv().await
{
response_sender
.send(Ok(ScanResponse {
height,
results: [(
key,
ScanResults {
tx_ids: vec![tx_id.bytes_in_display_order().to_vec()],
},
)]
.into_iter()
.collect(),
}))
.await
.expect("sender should not be dropped");
}
});

Ok(Response::new(Box::pin(response_stream)))
}

async fn get_info(
&self,
_request: tonic::Request<Empty>,
Expand Down

0 comments on commit 7ee7c93

Please sign in to comment.