Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Nov 14, 2024
1 parent 67bf0a3 commit ddef9b8
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 5 deletions.
5 changes: 4 additions & 1 deletion app/src/main/java/org/astraea/app/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import java.util.Map;
import org.astraea.app.automation.Automation;
import org.astraea.app.benchmark.BalancerBenchmarkApp;
import org.astraea.app.homework.Prepare;
import org.astraea.app.homework.SendYourData;
import org.astraea.app.performance.Performance;
import org.astraea.app.performance.Prepare;
import org.astraea.app.publisher.MetricPublisher;
import org.astraea.app.version.Version;
import org.astraea.app.web.WebService;
Expand All @@ -36,6 +37,8 @@ public class App {
Performance.class,
"prepare",
Prepare.class,
"send_your_data",
SendYourData.class,
"automation",
Automation.class,
"web",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.performance;
package org.astraea.app.homework;

import com.beust.jcommander.Parameter;
import java.util.List;
Expand Down Expand Up @@ -114,8 +114,7 @@ public static class Argument extends org.astraea.app.argument.Argument {
names = {"--topics"},
description = "List<String>: topic names which you subscribed",
validateWith = StringListField.class,
listConverter = StringListField.class,
required = false)
listConverter = StringListField.class)
List<String> topics;
}
}
167 changes: 167 additions & 0 deletions app/src/main/java/org/astraea/app/homework/SendYourData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.homework;

import com.beust.jcommander.Parameter;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.astraea.app.argument.DurationField;
import org.astraea.app.argument.StringListField;
import org.astraea.common.DataSize;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.TopicPartition;

public class SendYourData {

private static final int NUMBER_OF_PARTITIONS = 4;

public static void main(String[] args) throws IOException, InterruptedException {
execute(Argument.parse(new Argument(), args));
}

public static void execute(final Argument param) throws IOException, InterruptedException {
try (var admin = Admin.of(param.bootstrapServers())) {
param.topics.forEach(
t ->
admin
.creator()
.topic(t)
.numberOfReplicas((short) 1)
.numberOfPartitions(NUMBER_OF_PARTITIONS)
.run()
.toCompletableFuture()
.join());
admin.waitCluster(
Set.copyOf(param.topics),
clusterInfo -> clusterInfo.topicNames().containsAll(param.topics),
Duration.ofSeconds(10),
1);
}
var keys =
IntStream.range(0, 1000)
.mapToObj(
index -> new Key(IntStream.range(0, index + 1).mapToObj(Long::valueOf).toList()))
.toList();
var max = Runtime.getRuntime().totalMemory();
var count = new AtomicLong();
try (var sender = new YourSender(param.bootstrapServers())) {
var start = System.currentTimeMillis();
var fs =
IntStream.range(0, 2)
.mapToObj(
__ ->
CompletableFuture.runAsync(
() -> {
while (System.currentTimeMillis() - start
<= param.duration.toMillis()) {
var index = count.getAndIncrement();
sender.send(param.topics, keys.get((int) (index % keys.size())));
}
}))
.toList();
while (!fs.stream().allMatch(CompletableFuture::isDone)) {
max = Math.max(max, Runtime.getRuntime().totalMemory());
TimeUnit.MILLISECONDS.sleep(300);
}
}
try (var admin = Admin.of(param.bootstrapServers())) {
var offsets =
param.topics.stream()
.collect(
Collectors.toUnmodifiableMap(
t -> t,
t ->
admin
.latestOffsets(
IntStream.range(0, NUMBER_OF_PARTITIONS)
.mapToObj(i -> TopicPartition.of(t, i))
.collect(Collectors.toSet()))
.toCompletableFuture()
.join()
.values()
.stream()
.mapToLong(i -> i)
.sum()));
System.out.println("memory=" + DataSize.Byte.of(max));
offsets.forEach((t, o) -> System.out.println(t + "=" + o));
}
}

public record Key(List<Long> vs) {}

public static class YourSender implements Closeable {
private final KafkaProducer<Key, byte[]> producer;

@Override
public void close() throws IOException {
producer.close();
}

public YourSender(String bootstrapServers) {
Serializer<Key> serializer =
(topic, key) -> {
var buffer = ByteBuffer.allocate(Long.BYTES * key.vs.size());
key.vs.forEach(buffer::putLong);
buffer.flip();
var bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return bytes;
};
producer =
new KafkaProducer<>(
Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers),
serializer,
new ByteArraySerializer());
}

public void send(List<String> topic, Key key) {
topic.forEach(t -> producer.send(new ProducerRecord<>(t, key, null)));
}
}

public static class Argument extends org.astraea.app.argument.Argument {
@Parameter(
names = {"--topics"},
description = "List<String>: topic names which you subscribed",
validateWith = StringListField.class,
listConverter = StringListField.class,
required = true)
List<String> topics;

@Parameter(
names = {"--duration"},
description = "duration: the time to test",
validateWith = DurationField.class,
converter = DurationField.class)
Duration duration = Duration.ofSeconds(20);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.partitioner;
package org.astraea.app.homework;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
Expand Down

0 comments on commit ddef9b8

Please sign in to comment.