Skip to content

Commit 09d663f

Browse files
mfcrippscarl-mastrangelo
authored andcommitted
integration test: add a flowcontrolling proxy and integration tests
1 parent 0d89bb4 commit 09d663f

File tree

2 files changed

+517
-0
lines changed

2 files changed

+517
-0
lines changed
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
* Copyright 2016, Google Inc. All rights reserved.
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
*
15+
* * Neither the name of Google Inc. nor the names of its
16+
* contributors may be used to endorse or promote products derived from
17+
* this software without specific prior written permission.
18+
*
19+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
*/
31+
32+
package io.grpc.testing.integration;
33+
34+
import static org.junit.Assert.assertEquals;
35+
36+
import org.junit.AfterClass;
37+
import org.junit.Test;
38+
import org.junit.runner.RunWith;
39+
import org.junit.runners.JUnit4;
40+
41+
import java.io.DataInputStream;
42+
import java.io.DataOutputStream;
43+
import java.io.IOException;
44+
import java.net.ServerSocket;
45+
import java.net.Socket;
46+
import java.net.UnknownHostException;
47+
import java.util.concurrent.ExecutionException;
48+
import java.util.concurrent.Future;
49+
import java.util.concurrent.LinkedBlockingQueue;
50+
import java.util.concurrent.ThreadPoolExecutor;
51+
import java.util.concurrent.TimeUnit;
52+
53+
@RunWith(JUnit4.class)
54+
public class ProxyTest {
55+
56+
private int serverPort = 5001;
57+
private int proxyPort = 5050;
58+
private String loopBack = "127.0.0.1";
59+
private static ThreadPoolExecutor executor =
60+
new ThreadPoolExecutor(1, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
61+
62+
@AfterClass
63+
public static void stopExecutor() {
64+
executor.shutdown();
65+
}
66+
67+
@Test
68+
public void smallLatency()
69+
throws UnknownHostException, IOException, InterruptedException, ExecutionException {
70+
Server server = new Server();
71+
Thread serverThread = new Thread(server);
72+
serverThread.start();
73+
74+
int latency = (int) TimeUnit.MILLISECONDS.toNanos(10);
75+
TrafficControlProxy p = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS);
76+
startProxy(p).get();
77+
Socket client = new Socket(loopBack, proxyPort);
78+
client.setReuseAddress(true);
79+
DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
80+
DataInputStream clientIn = new DataInputStream(client.getInputStream());
81+
byte[] message = new byte[1];
82+
83+
// test
84+
long start = System.nanoTime();
85+
clientOut.write(message, 0, 1);
86+
clientIn.read(message);
87+
long stop = System.nanoTime();
88+
89+
p.shutDown();
90+
server.shutDown();
91+
client.close();
92+
93+
long rtt = (stop - start);
94+
assertEquals(latency, rtt, latency);
95+
}
96+
97+
@Test
98+
public void bigLatency()
99+
throws UnknownHostException, IOException, InterruptedException, ExecutionException {
100+
Server server = new Server();
101+
Thread serverThread = new Thread(server);
102+
serverThread.start();
103+
104+
int latency = (int) TimeUnit.MILLISECONDS.toNanos(250);
105+
TrafficControlProxy p = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS);
106+
startProxy(p).get();
107+
Socket client = new Socket(loopBack, proxyPort);
108+
DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
109+
DataInputStream clientIn = new DataInputStream(client.getInputStream());
110+
byte[] message = new byte[1];
111+
112+
// test
113+
long start = System.nanoTime();
114+
clientOut.write(message, 0, 1);
115+
clientIn.read(message);
116+
long stop = System.nanoTime();
117+
118+
p.shutDown();
119+
server.shutDown();
120+
client.close();
121+
122+
long rtt = (stop - start);
123+
assertEquals(latency, rtt, latency);
124+
}
125+
126+
@Test
127+
public void smallBandwidth()
128+
throws UnknownHostException, IOException, InterruptedException, ExecutionException {
129+
Server server = new Server();
130+
server.setMode("stream");
131+
(new Thread(server)).start();
132+
assertEquals(server.mode(), "stream");
133+
134+
int bandwidth = 64 * 1024;
135+
TrafficControlProxy p = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS);
136+
startProxy(p).get();
137+
Socket client = new Socket(loopBack, proxyPort);
138+
DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
139+
DataInputStream clientIn = new DataInputStream(client.getInputStream());
140+
141+
clientOut.write(new byte[1]);
142+
clientIn.readFully(new byte[100 * 1024]);
143+
long start = System.nanoTime();
144+
clientIn.readFully(new byte[5 * bandwidth]);
145+
long stop = System.nanoTime();
146+
147+
p.shutDown();
148+
server.shutDown();
149+
client.close();
150+
151+
long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1)));
152+
assertEquals(bandwidth, bandUsed, .5 * bandwidth);
153+
}
154+
155+
@Test
156+
public void largeBandwidth()
157+
throws UnknownHostException, IOException, InterruptedException, ExecutionException {
158+
Server server = new Server();
159+
server.setMode("stream");
160+
(new Thread(server)).start();
161+
assertEquals(server.mode(), "stream");
162+
int bandwidth = 10 * 1024 * 1024;
163+
TrafficControlProxy p = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS);
164+
startProxy(p).get();
165+
Socket client = new Socket(loopBack, proxyPort);
166+
DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
167+
DataInputStream clientIn = new DataInputStream(client.getInputStream());
168+
169+
clientOut.write(new byte[1]);
170+
clientIn.readFully(new byte[100 * 1024]);
171+
long start = System.nanoTime();
172+
clientIn.readFully(new byte[5 * bandwidth]);
173+
long stop = System.nanoTime();
174+
175+
p.shutDown();
176+
server.shutDown();
177+
client.close();
178+
179+
long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1)));
180+
assertEquals(bandwidth, bandUsed, .5 * bandwidth);
181+
}
182+
183+
private Future<?> startProxy(final TrafficControlProxy p) {
184+
return executor.submit(new Runnable() {
185+
@Override
186+
public void run() {
187+
try {
188+
p.start();
189+
} catch (Exception e) {
190+
throw new RuntimeException(e);
191+
}
192+
}
193+
});
194+
}
195+
196+
// server with echo and streaming modes
197+
private class Server implements Runnable {
198+
private ServerSocket server;
199+
private Socket rcv;
200+
private boolean shutDown;
201+
private String mode = "echo";
202+
203+
public void setMode(String mode) {
204+
this.mode = mode;
205+
}
206+
207+
public String mode() {
208+
return mode;
209+
}
210+
211+
public void shutDown() {
212+
try {
213+
rcv.close();
214+
server.close();
215+
shutDown = true;
216+
} catch (IOException e) {
217+
shutDown = true;
218+
}
219+
}
220+
221+
@Override
222+
public void run() {
223+
try {
224+
server = new ServerSocket(serverPort);
225+
rcv = server.accept();
226+
DataInputStream serverIn = new DataInputStream(rcv.getInputStream());
227+
DataOutputStream serverOut = new DataOutputStream(rcv.getOutputStream());
228+
byte[] response = new byte[1024];
229+
if (mode.equals("echo")) {
230+
while (!shutDown) {
231+
int readable = serverIn.read(response);
232+
serverOut.write(response, 0, readable);
233+
}
234+
} else if (mode.equals("stream")) {
235+
serverIn.read(response);
236+
byte[] message = new byte[16 * 1024];
237+
while (!shutDown) {
238+
serverOut.write(message, 0, message.length);
239+
}
240+
serverIn.close();
241+
serverOut.close();
242+
rcv.close();
243+
} else {
244+
System.out.println("Unknown mode: use 'echo' or 'stream'");
245+
}
246+
} catch (IOException e) {
247+
e.printStackTrace();
248+
}
249+
}
250+
}
251+
}

0 commit comments

Comments
 (0)