From ddef9b863161bd3e1d96b368b059349145b6bea2 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Thu, 14 Nov 2024 13:49:31 +0800 Subject: [PATCH] tmp --- app/src/main/java/org/astraea/app/App.java | 5 +- .../{performance => homework}/Prepare.java | 5 +- .../astraea/app/homework/SendYourData.java | 167 ++++++++++++++++++ .../app/homework}/YourPartitioner.java | 2 +- 4 files changed, 174 insertions(+), 5 deletions(-) rename app/src/main/java/org/astraea/app/{performance => homework}/Prepare.java (97%) create mode 100644 app/src/main/java/org/astraea/app/homework/SendYourData.java rename {common/src/main/java/org/astraea/common/partitioner => app/src/main/java/org/astraea/app/homework}/YourPartitioner.java (97%) diff --git a/app/src/main/java/org/astraea/app/App.java b/app/src/main/java/org/astraea/app/App.java index 4180d4c246..c53c89de60 100644 --- a/app/src/main/java/org/astraea/app/App.java +++ b/app/src/main/java/org/astraea/app/App.java @@ -23,8 +23,9 @@ import java.util.Map; import org.astraea.app.automation.Automation; import org.astraea.app.benchmark.BalancerBenchmarkApp; +import org.astraea.app.homework.Prepare; +import org.astraea.app.homework.SendYourData; import org.astraea.app.performance.Performance; -import org.astraea.app.performance.Prepare; import org.astraea.app.publisher.MetricPublisher; import org.astraea.app.version.Version; import org.astraea.app.web.WebService; @@ -36,6 +37,8 @@ public class App { Performance.class, "prepare", Prepare.class, + "send_your_data", + SendYourData.class, "automation", Automation.class, "web", diff --git a/app/src/main/java/org/astraea/app/performance/Prepare.java b/app/src/main/java/org/astraea/app/homework/Prepare.java similarity index 97% rename from app/src/main/java/org/astraea/app/performance/Prepare.java rename to app/src/main/java/org/astraea/app/homework/Prepare.java index 8b0d35ffc0..040fc66e99 100644 --- a/app/src/main/java/org/astraea/app/performance/Prepare.java +++ b/app/src/main/java/org/astraea/app/homework/Prepare.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.app.performance; +package org.astraea.app.homework; import com.beust.jcommander.Parameter; import java.util.List; @@ -114,8 +114,7 @@ public static class Argument extends org.astraea.app.argument.Argument { names = {"--topics"}, description = "List: topic names which you subscribed", validateWith = StringListField.class, - listConverter = StringListField.class, - required = false) + listConverter = StringListField.class) List topics; } } diff --git a/app/src/main/java/org/astraea/app/homework/SendYourData.java b/app/src/main/java/org/astraea/app/homework/SendYourData.java new file mode 100644 index 0000000000..9a40ebf1c3 --- /dev/null +++ b/app/src/main/java/org/astraea/app/homework/SendYourData.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.homework; + +import com.beust.jcommander.Parameter; +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.astraea.app.argument.DurationField; +import org.astraea.app.argument.StringListField; +import org.astraea.common.DataSize; +import org.astraea.common.admin.Admin; +import org.astraea.common.admin.TopicPartition; + +public class SendYourData { + + private static final int NUMBER_OF_PARTITIONS = 4; + + public static void main(String[] args) throws IOException, InterruptedException { + execute(Argument.parse(new Argument(), args)); + } + + public static void execute(final Argument param) throws IOException, InterruptedException { + try (var admin = Admin.of(param.bootstrapServers())) { + param.topics.forEach( + t -> + admin + .creator() + .topic(t) + .numberOfReplicas((short) 1) + .numberOfPartitions(NUMBER_OF_PARTITIONS) + .run() + .toCompletableFuture() + .join()); + admin.waitCluster( + Set.copyOf(param.topics), + clusterInfo -> clusterInfo.topicNames().containsAll(param.topics), + Duration.ofSeconds(10), + 1); + } + var keys = + IntStream.range(0, 1000) + .mapToObj( + index -> new Key(IntStream.range(0, index + 1).mapToObj(Long::valueOf).toList())) + .toList(); + var max = Runtime.getRuntime().totalMemory(); + var count = new AtomicLong(); + try (var sender = new YourSender(param.bootstrapServers())) { + var start = System.currentTimeMillis(); + var fs = + IntStream.range(0, 2) + .mapToObj( + __ -> + CompletableFuture.runAsync( + () -> { + while (System.currentTimeMillis() - start + <= param.duration.toMillis()) { + var index = count.getAndIncrement(); + sender.send(param.topics, keys.get((int) (index % keys.size()))); + } + })) + .toList(); + while (!fs.stream().allMatch(CompletableFuture::isDone)) { + max = Math.max(max, Runtime.getRuntime().totalMemory()); + TimeUnit.MILLISECONDS.sleep(300); + } + } + try (var admin = Admin.of(param.bootstrapServers())) { + var offsets = + param.topics.stream() + .collect( + Collectors.toUnmodifiableMap( + t -> t, + t -> + admin + .latestOffsets( + IntStream.range(0, NUMBER_OF_PARTITIONS) + .mapToObj(i -> TopicPartition.of(t, i)) + .collect(Collectors.toSet())) + .toCompletableFuture() + .join() + .values() + .stream() + .mapToLong(i -> i) + .sum())); + System.out.println("memory=" + DataSize.Byte.of(max)); + offsets.forEach((t, o) -> System.out.println(t + "=" + o)); + } + } + + public record Key(List vs) {} + + public static class YourSender implements Closeable { + private final KafkaProducer producer; + + @Override + public void close() throws IOException { + producer.close(); + } + + public YourSender(String bootstrapServers) { + Serializer serializer = + (topic, key) -> { + var buffer = ByteBuffer.allocate(Long.BYTES * key.vs.size()); + key.vs.forEach(buffer::putLong); + buffer.flip(); + var bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + }; + producer = + new KafkaProducer<>( + Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers), + serializer, + new ByteArraySerializer()); + } + + public void send(List topic, Key key) { + topic.forEach(t -> producer.send(new ProducerRecord<>(t, key, null))); + } + } + + public static class Argument extends org.astraea.app.argument.Argument { + @Parameter( + names = {"--topics"}, + description = "List: topic names which you subscribed", + validateWith = StringListField.class, + listConverter = StringListField.class, + required = true) + List topics; + + @Parameter( + names = {"--duration"}, + description = "duration: the time to test", + validateWith = DurationField.class, + converter = DurationField.class) + Duration duration = Duration.ofSeconds(20); + } +} diff --git a/common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java b/app/src/main/java/org/astraea/app/homework/YourPartitioner.java similarity index 97% rename from common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java rename to app/src/main/java/org/astraea/app/homework/YourPartitioner.java index 8ae5bc82e8..c0bea8ec55 100644 --- a/common/src/main/java/org/astraea/common/partitioner/YourPartitioner.java +++ b/app/src/main/java/org/astraea/app/homework/YourPartitioner.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.common.partitioner; +package org.astraea.app.homework; import java.util.Map; import org.apache.kafka.clients.producer.Partitioner;