-
Notifications
You must be signed in to change notification settings - Fork 1k
/
client.rs
85 lines (67 loc) · 2.73 KB
/
client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/// Protobuf/gRPC items for the `grpc.examples.echo` package.
///
/// The rest of this file uses `echo_client::EchoClient` and `EchoRequest` from here.
pub mod pb {
// Includes the items tonic generated for the named proto package.
tonic::include_proto!("grpc.examples.echo");
}
use std::time::Duration;
use tokio_stream::{Stream, StreamExt};
use tonic::transport::Channel;
use pb::{echo_client::EchoClient, EchoRequest};
/// Produces a stream of echo requests numbered from 1 up to (but excluding) `usize::MAX`.
///
/// Each request carries the message `msg NN`, where `NN` is the sequence number
/// zero-padded to at least two digits.
fn echo_requests_iter() -> impl Stream<Item = EchoRequest> {
    // Build the requests lazily as a plain iterator, then expose it as a stream.
    let numbered_requests = (1..usize::MAX).map(|seq| EchoRequest {
        message: format!("msg {:02}", seq),
    });
    tokio_stream::iter(numbered_requests)
}
/// Calls the server-streaming echo RPC with the message `foo` and prints up to `num` replies.
///
/// # Panics
///
/// Panics if the RPC call fails or if any received item is an error.
async fn streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
    let request = EchoRequest {
        message: "foo".into(),
    };
    let response = client.server_streaming_echo(request).await.unwrap();

    // The server keeps streaming indefinitely, so read only the first `num` items.
    let mut replies = response.into_inner().take(num);
    while let Some(reply) = replies.next().await {
        let reply = reply.unwrap();
        println!("\treceived: {}", reply.message);
    }
    // `replies` goes out of scope here; dropping it tells the server we disconnected.
}
/// Sends `num` generated requests over the bidirectional echo RPC and prints every reply.
///
/// The outbound stream ends after `num` requests; the function returns once the
/// response stream is exhausted.
///
/// # Panics
///
/// Panics if the RPC call fails or if any received item is an error.
async fn bidirectional_streaming_echo(client: &mut EchoClient<Channel>, num: usize) {
    // Finite outbound stream: only the first `num` generated requests are sent.
    let outbound = echo_requests_iter().take(num);

    let mut inbound = client
        .bidirectional_streaming_echo(outbound)
        .await
        .unwrap()
        .into_inner();

    while let Some(reply) = inbound.next().await {
        println!("\treceived message: `{}`", reply.unwrap().message);
    }
}
/// Runs the bidirectional echo RPC with the generated request stream throttled by `dur`,
/// printing every reply as it arrives.
///
/// The outbound stream is not truncated, so this keeps running until the response
/// stream ends.
///
/// # Panics
///
/// Panics if the RPC call fails or if any received item is an error.
async fn bidirectional_streaming_echo_throttle(client: &mut EchoClient<Channel>, dur: Duration) {
    // Rate-limit the outbound requests using the caller-supplied duration.
    let paced_requests = echo_requests_iter().throttle(dur);

    let response = client.bidirectional_streaming_echo(paced_requests).await;
    let mut replies = response.unwrap().into_inner();

    while let Some(reply) = replies.next().await {
        let message = reply.unwrap().message;
        println!("\treceived message: `{}`", message);
    }
}
/// Connects to the echo server at `http://[::1]:50051` and runs three streaming demos in order:
/// server streaming, finite bidirectional streaming, and throttled bidirectional streaming.
///
/// # Errors
///
/// Returns the connection error if the server cannot be reached.
///
/// # Panics
///
/// The demo helpers panic if an RPC call or a received stream item fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Propagate a failed connection through the `Result` this function already returns
    // instead of panicking.
    let mut client = EchoClient::connect("http://[::1]:50051").await?;

    println!("Streaming echo:");
    streaming_echo(&mut client, 5).await;
    // Pause so the server's own `println!` output does not get mixed up with the next demo.
    tokio::time::sleep(Duration::from_secs(1)).await;

    // Echo stream that sends 17 requests and then gracefully ends the connection.
    println!("\r\nBidirectional stream echo:");
    bidirectional_streaming_echo(&mut client, 17).await;

    // Echo stream that sends up to `usize::MAX` requests, one request every 2s.
    // Exiting the client with CTRL+C demonstrates how the server side can distinguish
    // a broken pipe from a graceful client disconnection (the example above).
    println!("\r\nBidirectional stream echo (kill client with CTRL+C):");
    bidirectional_streaming_echo_throttle(&mut client, Duration::from_secs(2)).await;

    Ok(())
}