Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Nov 14, 2024
1 parent 67bf0a3 commit a60d0a7
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 5 deletions.
5 changes: 4 additions & 1 deletion app/src/main/java/org/astraea/app/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import java.util.Map;
import org.astraea.app.automation.Automation;
import org.astraea.app.benchmark.BalancerBenchmarkApp;
import org.astraea.app.homework.Prepare;
import org.astraea.app.homework.SendYourData;
import org.astraea.app.performance.Performance;
import org.astraea.app.performance.Prepare;
import org.astraea.app.publisher.MetricPublisher;
import org.astraea.app.version.Version;
import org.astraea.app.web.WebService;
Expand All @@ -36,6 +37,8 @@ public class App {
Performance.class,
"prepare",
Prepare.class,
"send_your_data",
SendYourData.class,
"automation",
Automation.class,
"web",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.performance;
package org.astraea.app.homework;

import com.beust.jcommander.Parameter;
import java.util.List;
Expand Down Expand Up @@ -114,8 +114,7 @@ public static class Argument extends org.astraea.app.argument.Argument {
names = {"--topics"},
description = "List<String>: topic names which you subscribed",
validateWith = StringListField.class,
listConverter = StringListField.class,
required = false)
listConverter = StringListField.class)
List<String> topics;
}
}
152 changes: 152 additions & 0 deletions app/src/main/java/org/astraea/app/homework/SendYourData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.homework;

import com.beust.jcommander.Parameter;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.astraea.app.argument.NonNegativeIntegerField;
import org.astraea.app.argument.StringListField;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.TopicPartition;

public class SendYourData {

private static final int NUMBER_OF_PARTITIONS = 4;

public static void main(String[] args) throws IOException {
execute(Argument.parse(new Argument(), args));
}

public static void execute(final Argument param) throws IOException {
try (var admin = Admin.of(param.bootstrapServers())) {
param.topics.forEach(
t ->
admin
.creator()
.topic(t)
.numberOfReplicas((short) 1)
.numberOfPartitions(NUMBER_OF_PARTITIONS)
.run()
.toCompletableFuture()
.join());
admin.waitCluster(
Set.copyOf(param.topics),
clusterInfo -> clusterInfo.topicNames().containsAll(param.topics),
Duration.ofSeconds(10),
1);
}
var start = System.currentTimeMillis();
var max = Runtime.getRuntime().totalMemory();
var last = -1L;
try (var sender = new YourSender(param.bootstrapServers())) {
IntStream.range(0, param.count)
.forEach(index -> sender.send(param.topics, new Key(index, index, index)));
if (System.currentTimeMillis() - last > 5000) {
max = Math.max(max, Runtime.getRuntime().totalMemory());
last = System.currentTimeMillis();
}
}
var elapsed = (System.currentTimeMillis() - start);
max = Runtime.getRuntime().totalMemory();

try (var admin = Admin.of(param.bootstrapServers())) {
var offsets =
param.topics.stream()
.collect(
Collectors.toUnmodifiableMap(
t -> t,
t ->
admin
.latestOffsets(
IntStream.range(0, NUMBER_OF_PARTITIONS)
.mapToObj(i -> TopicPartition.of(t, i))
.collect(Collectors.toSet()))
.toCompletableFuture()
.join()
.values()
.stream()
.mapToLong(i -> i)
.sum()));
System.out.println("elapsed=" + elapsed);
System.out.println("memory=" + max);
offsets.forEach((t, o) -> System.out.println(t + "=" + o));
}
}

public record Key(int v0, int v1, int v2) {}

public static class YourSender implements Closeable {
private final KafkaProducer<Key, byte[]> producer;

@Override
public void close() throws IOException {
producer.close();
}

public YourSender(String bootstrapServers) {
Serializer<Key> serializer =
(topic, key) -> {
var buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + Long.BYTES);
buffer.putInt(key.v0);
buffer.putInt(key.v1);
buffer.putInt(key.v2);
buffer.flip();
var bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return bytes;
};
producer =
new KafkaProducer<>(
Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers),
serializer,
new ByteArraySerializer());
}

public void send(List<String> topic, Key key) {
topic.forEach(t -> producer.send(new ProducerRecord<>(t, key, null)));
}
}

public static class Argument extends org.astraea.app.argument.Argument {
@Parameter(
names = {"--topics"},
description = "List<String>: topic names which you subscribed",
validateWith = StringListField.class,
listConverter = StringListField.class,
required = true)
List<String> topics;

@Parameter(
names = {"--count"},
description = "Long: the record count",
validateWith = NonNegativeIntegerField.class)
int count = 100;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.partitioner;
package org.astraea.app.homework;

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
Expand Down

0 comments on commit a60d0a7

Please sign in to comment.