Skip to content

Commit ec97d4e

Browse files
committed
tmp
1 parent 67bf0a3 commit ec97d4e

File tree

4 files changed

+174
-5
lines changed

4 files changed

+174
-5
lines changed

app/src/main/java/org/astraea/app/App.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
import java.util.Map;
2424
import org.astraea.app.automation.Automation;
2525
import org.astraea.app.benchmark.BalancerBenchmarkApp;
26+
import org.astraea.app.homework.Prepare;
27+
import org.astraea.app.homework.SendYourData;
2628
import org.astraea.app.performance.Performance;
27-
import org.astraea.app.performance.Prepare;
2829
import org.astraea.app.publisher.MetricPublisher;
2930
import org.astraea.app.version.Version;
3031
import org.astraea.app.web.WebService;
@@ -36,6 +37,8 @@ public class App {
3637
Performance.class,
3738
"prepare",
3839
Prepare.class,
40+
"send_your_data",
41+
SendYourData.class,
3942
"automation",
4043
Automation.class,
4144
"web",

app/src/main/java/org/astraea/app/performance/Prepare.java renamed to app/src/main/java/org/astraea/app/homework/Prepare.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.astraea.app.performance;
17+
package org.astraea.app.homework;
1818

1919
import com.beust.jcommander.Parameter;
2020
import java.util.List;
@@ -114,8 +114,7 @@ public static class Argument extends org.astraea.app.argument.Argument {
114114
names = {"--topics"},
115115
description = "List<String>: topic names which you subscribed",
116116
validateWith = StringListField.class,
117-
listConverter = StringListField.class,
118-
required = false)
117+
listConverter = StringListField.class)
119118
List<String> topics;
120119
}
121120
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.astraea.app.homework;
18+
19+
import com.beust.jcommander.Parameter;
20+
import java.io.Closeable;
21+
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.time.Duration;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Set;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicLong;
30+
import java.util.stream.Collectors;
31+
import java.util.stream.IntStream;
32+
import org.apache.kafka.clients.producer.KafkaProducer;
33+
import org.apache.kafka.clients.producer.ProducerConfig;
34+
import org.apache.kafka.clients.producer.ProducerRecord;
35+
import org.apache.kafka.common.serialization.ByteArraySerializer;
36+
import org.apache.kafka.common.serialization.Serializer;
37+
import org.astraea.app.argument.DurationField;
38+
import org.astraea.app.argument.StringListField;
39+
import org.astraea.common.DataSize;
40+
import org.astraea.common.admin.Admin;
41+
import org.astraea.common.admin.TopicPartition;
42+
43+
public class SendYourData {
44+
45+
private static final int NUMBER_OF_PARTITIONS = 4;
46+
47+
public static void main(String[] args) throws IOException, InterruptedException {
48+
execute(Argument.parse(new Argument(), args));
49+
}
50+
51+
public static void execute(final Argument param) throws IOException, InterruptedException {
52+
try (var admin = Admin.of(param.bootstrapServers())) {
53+
param.topics.forEach(
54+
t ->
55+
admin
56+
.creator()
57+
.topic(t)
58+
.numberOfReplicas((short) 1)
59+
.numberOfPartitions(NUMBER_OF_PARTITIONS)
60+
.run()
61+
.toCompletableFuture()
62+
.join());
63+
admin.waitCluster(
64+
Set.copyOf(param.topics),
65+
clusterInfo -> clusterInfo.topicNames().containsAll(param.topics),
66+
Duration.ofSeconds(10),
67+
1);
68+
}
69+
var keys =
70+
List.of(
71+
new Key(IntStream.range(0, 1000).mapToObj(Long::valueOf).toList()),
72+
new Key(IntStream.range(0, 2500).mapToObj(Long::valueOf).toList()),
73+
new Key(IntStream.range(0, 3000).mapToObj(Long::valueOf).toList()));
74+
var max = Runtime.getRuntime().totalMemory();
75+
var count = new AtomicLong();
76+
try (var sender = new YourSender(param.bootstrapServers())) {
77+
var start = System.currentTimeMillis();
78+
var fs =
79+
IntStream.range(0, 5)
80+
.mapToObj(
81+
__ ->
82+
CompletableFuture.runAsync(
83+
() -> {
84+
while (System.currentTimeMillis() - start
85+
<= param.duration.toMillis()) {
86+
var index = count.getAndIncrement();
87+
sender.send(param.topics, keys.get((int) (index % keys.size())));
88+
}
89+
}))
90+
.toList();
91+
while (!fs.stream().allMatch(CompletableFuture::isDone)) {
92+
max = Math.max(max, Runtime.getRuntime().totalMemory());
93+
TimeUnit.MILLISECONDS.sleep(300);
94+
}
95+
}
96+
try (var admin = Admin.of(param.bootstrapServers())) {
97+
var offsets =
98+
param.topics.stream()
99+
.collect(
100+
Collectors.toUnmodifiableMap(
101+
t -> t,
102+
t ->
103+
admin
104+
.latestOffsets(
105+
IntStream.range(0, NUMBER_OF_PARTITIONS)
106+
.mapToObj(i -> TopicPartition.of(t, i))
107+
.collect(Collectors.toSet()))
108+
.toCompletableFuture()
109+
.join()
110+
.values()
111+
.stream()
112+
.mapToLong(i -> i)
113+
.sum()));
114+
System.out.println("memory=" + DataSize.Byte.of(max));
115+
offsets.forEach((t, o) -> System.out.println(t + "=" + o));
116+
}
117+
}
118+
119+
public record Key(List<Long> vs) {}
120+
121+
public static class YourSender implements Closeable {
122+
private final KafkaProducer<Key, byte[]> producer;
123+
124+
@Override
125+
public void close() throws IOException {
126+
producer.close();
127+
}
128+
129+
public YourSender(String bootstrapServers) {
130+
Serializer<Key> serializer =
131+
(topic, key) -> {
132+
var buffer = ByteBuffer.allocate(Long.BYTES * key.vs.size());
133+
key.vs.forEach(buffer::putLong);
134+
buffer.flip();
135+
var bytes = new byte[buffer.remaining()];
136+
buffer.get(bytes);
137+
return bytes;
138+
};
139+
producer =
140+
new KafkaProducer<>(
141+
Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers),
142+
serializer,
143+
new ByteArraySerializer());
144+
}
145+
146+
public void send(List<String> topic, Key key) {
147+
topic.forEach(t -> producer.send(new ProducerRecord<>(t, key, null)));
148+
}
149+
}
150+
151+
public static class Argument extends org.astraea.app.argument.Argument {
152+
@Parameter(
153+
names = {"--topics"},
154+
description = "List<String>: topic names which you subscribed",
155+
validateWith = StringListField.class,
156+
listConverter = StringListField.class,
157+
required = true)
158+
List<String> topics;
159+
160+
@Parameter(
161+
names = {"--duration"},
162+
description = "duration: the time to test",
163+
validateWith = DurationField.class,
164+
converter = DurationField.class)
165+
Duration duration = Duration.ofSeconds(20);
166+
}
167+
}

common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java renamed to app/src/main/java/org/astraea/app/homework/YourPartitioner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.astraea.common.partitioner;
17+
package org.astraea.app.homework;
1818

1919
import java.util.Map;
2020
import org.apache.kafka.clients.producer.Partitioner;

0 commit comments

Comments
 (0)