Skip to content

Commit b8c3551

Browse files
committed
tmp
1 parent 67bf0a3 commit b8c3551

File tree

4 files changed

+132
-5
lines changed

4 files changed

+132
-5
lines changed

app/src/main/java/org/astraea/app/App.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
import java.util.Map;
2424
import org.astraea.app.automation.Automation;
2525
import org.astraea.app.benchmark.BalancerBenchmarkApp;
26+
import org.astraea.app.homework.Prepare;
27+
import org.astraea.app.homework.SendYourData;
2628
import org.astraea.app.performance.Performance;
27-
import org.astraea.app.performance.Prepare;
2829
import org.astraea.app.publisher.MetricPublisher;
2930
import org.astraea.app.version.Version;
3031
import org.astraea.app.web.WebService;
@@ -36,6 +37,8 @@ public class App {
3637
Performance.class,
3738
"prepare",
3839
Prepare.class,
40+
"send_your_data",
41+
SendYourData.class,
3942
"automation",
4043
Automation.class,
4144
"web",

app/src/main/java/org/astraea/app/performance/Prepare.java renamed to app/src/main/java/org/astraea/app/homework/Prepare.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.astraea.app.performance;
17+
package org.astraea.app.homework;
1818

1919
import com.beust.jcommander.Parameter;
2020
import java.util.List;
@@ -114,8 +114,7 @@ public static class Argument extends org.astraea.app.argument.Argument {
114114
names = {"--topics"},
115115
description = "List<String>: topic names which you subscribed",
116116
validateWith = StringListField.class,
117-
listConverter = StringListField.class,
118-
required = false)
117+
listConverter = StringListField.class)
119118
List<String> topics;
120119
}
121120
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.astraea.app.homework;
18+
19+
import com.beust.jcommander.Parameter;
20+
import java.io.Closeable;
21+
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.time.Duration;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Set;
27+
import java.util.stream.IntStream;
28+
import org.apache.kafka.clients.producer.KafkaProducer;
29+
import org.apache.kafka.clients.producer.ProducerConfig;
30+
import org.apache.kafka.clients.producer.ProducerRecord;
31+
import org.apache.kafka.common.serialization.ByteArraySerializer;
32+
import org.apache.kafka.common.serialization.Serializer;
33+
import org.astraea.app.argument.NonNegativeIntegerField;
34+
import org.astraea.app.argument.StringListField;
35+
import org.astraea.common.admin.Admin;
36+
37+
public class SendYourData {
38+
39+
public static void main(String[] args) throws IOException {
40+
execute(Argument.parse(new Argument(), args));
41+
}
42+
43+
public static void execute(final Argument param) throws IOException {
44+
try (var admin = Admin.of(param.bootstrapServers())) {
45+
param.topics.forEach(
46+
t ->
47+
admin
48+
.creator()
49+
.topic(t)
50+
.numberOfReplicas((short) 1)
51+
.numberOfPartitions(10)
52+
.run()
53+
.toCompletableFuture()
54+
.join());
55+
admin.waitCluster(
56+
Set.copyOf(param.topics),
57+
clusterInfo -> clusterInfo.topicNames().containsAll(param.topics),
58+
Duration.ofSeconds(10),
59+
1);
60+
}
61+
var start = System.currentTimeMillis();
62+
var max = Runtime.getRuntime().totalMemory();
63+
var last = -1L;
64+
try (var sender = new YourSender(param.bootstrapServers())) {
65+
IntStream.range(0, param.count)
66+
.forEach(index -> sender.send(param.topics, new Key(index, index, index)));
67+
if (System.currentTimeMillis() - last > 5000) {
68+
max = Math.max(max, Runtime.getRuntime().totalMemory());
69+
last = System.currentTimeMillis();
70+
}
71+
}
72+
System.out.println("elapsed=" + (System.currentTimeMillis() - start));
73+
System.out.println("memory=" + Math.max(max, Runtime.getRuntime().totalMemory()));
74+
}
75+
76+
public record Key(int v0, int v1, int v2) {}
77+
78+
public static class YourSender implements Closeable {
79+
private final KafkaProducer<Key, byte[]> producer;
80+
81+
@Override
82+
public void close() throws IOException {
83+
producer.close();
84+
}
85+
86+
public YourSender(String bootstrapServers) {
87+
Serializer<Key> serializer =
88+
(topic, key) -> {
89+
var buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + Long.BYTES);
90+
buffer.putInt(key.v0);
91+
buffer.putInt(key.v1);
92+
buffer.putInt(key.v2);
93+
buffer.flip();
94+
var bytes = new byte[buffer.remaining()];
95+
buffer.get(bytes);
96+
return bytes;
97+
};
98+
producer =
99+
new KafkaProducer<>(
100+
Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers),
101+
serializer,
102+
new ByteArraySerializer());
103+
}
104+
105+
public void send(List<String> topic, Key key) {
106+
topic.forEach(t -> producer.send(new ProducerRecord<>(t, key, null)));
107+
}
108+
}
109+
110+
public static class Argument extends org.astraea.app.argument.Argument {
111+
@Parameter(
112+
names = {"--topics"},
113+
description = "List<String>: topic names which you subscribed",
114+
validateWith = StringListField.class,
115+
listConverter = StringListField.class,
116+
required = true)
117+
List<String> topics;
118+
119+
@Parameter(
120+
names = {"--count"},
121+
description = "Long: the record count",
122+
validateWith = NonNegativeIntegerField.class)
123+
int count = 100;
124+
}
125+
}

common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java renamed to app/src/main/java/org/astraea/app/homework/YourPartitioner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.astraea.common.partitioner;
17+
package org.astraea.app.homework;
1818

1919
import java.util.Map;
2020
import org.apache.kafka.clients.producer.Partitioner;

0 commit comments

Comments
 (0)