Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into metadata-session-re…
Browse files Browse the repository at this point in the history
…connect
  • Loading branch information
merlimat committed May 2, 2024
2 parents 3609b3d + bc44280 commit a3a709d
Show file tree
Hide file tree
Showing 46 changed files with 2,360 additions and 1,468 deletions.
65 changes: 1 addition & 64 deletions bin/pulsar-perf
Original file line number Diff line number Diff line change
Expand Up @@ -84,37 +84,6 @@ add_maven_deps_to_classpath() {
fi
PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
}
pulsar_help() {
cat <<EOF
Usage: pulsar-perf <command>
where command is one of:
produce Run a producer
consume Run a consumer
transaction Run a transaction repeatedly
read Run a topic reader
websocket-producer Run a websocket producer
managed-ledger Write directly on managed-ledgers
monitor-brokers Continuously receive broker data and/or load reports
simulation-client Run a simulation server acting as a Pulsar client
simulation-controller Run a simulation controller to give commands to servers
gen-doc Generate documentation automatically.
help This help message
or command is the full name of a class with a defined main() method.
Environment variables:
PULSAR_LOG_CONF Log4j configuration file (default $DEFAULT_LOG_CONF)
PULSAR_CLIENT_CONF Configuration file for client (default: $DEFAULT_CLIENT_CONF)
PULSAR_EXTRA_OPTS Extra options to be passed to the jvm
PULSAR_EXTRA_CLASSPATH Add extra paths to the pulsar classpath
These variable can also be set in conf/pulsar_env.sh
EOF
}

if [ -d "$PULSAR_HOME/lib" ]; then
PULSAR_CLASSPATH="$PULSAR_CLASSPATH:$PULSAR_HOME/lib/*"
Expand Down Expand Up @@ -162,36 +131,4 @@ OPTS="$OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE"
#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"

# if no args specified, show usage
if [ $# = 0 ]; then
pulsar_help;
exit 1;
fi

# get arguments
COMMAND=$1
shift

if [ "$COMMAND" == "produce" ]; then
exec $JAVA $OPTS org.apache.pulsar.testclient.PerformanceProducer --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "consume" ]; then
exec $JAVA $OPTS org.apache.pulsar.testclient.PerformanceConsumer --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "transaction" ]; then
exec $JAVA $OPTS org.apache.pulsar.testclient.PerformanceTransaction --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "read" ]; then
exec $JAVA $OPTS org.apache.pulsar.testclient.PerformanceReader --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "monitor-brokers" ]; then
exec $JAVA $OPTS org.apache.pulsar.testclient.BrokerMonitor "$@"
elif [ "$COMMAND" == "simulation-client" ]; then
exec $JAVA $OPTS org.apache.pulsar.testclient.LoadSimulationClient "$@"
elif [ "$COMMAND" == "simulation-controller" ]; then
exec $JAVA $OPTS org.apache.pulsar.testclient.LoadSimulationController "$@"
elif [ "$COMMAND" == "websocket-producer" ]; then
exec $JAVA $OPTS org.apache.pulsar.proxy.socket.client.PerformanceClient "$@"
elif [ "$COMMAND" == "managed-ledger" ]; then
exec $JAVA $OPTS org.apache.pulsar.testclient.ManagedLedgerWriter "$@"
elif [ "$COMMAND" == "gen-doc" ]; then
exec $JAVA $OPTS org.apache.pulsar.testclient.CmdGenerateDocumentation "$@"
else
pulsar_help;
fi
exec $JAVA $OPTS org.apache.pulsar.testclient.PulsarPerfTestTool $PULSAR_PERFTEST_CONF "$@"
84 changes: 1 addition & 83 deletions bin/pulsar-perf.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -72,67 +72,7 @@ set "OPTS=%OPTS% -Dpulsar.log.level=%PULSAR_LOG_LEVEL%"
set "OPTS=%OPTS% -Dpulsar.log.root.level=%PULSAR_LOG_ROOT_LEVEL%"
set "OPTS=%OPTS% -Dpulsar.log.immediateFlush=%PULSAR_LOG_IMMEDIATE_FLUSH%"

set "COMMAND=%1"

for /f "tokens=1,* delims= " %%a in ("%*") do set "_args=%%b"

if "%COMMAND%" == "produce" (
call :execCmdWithConfigFile org.apache.pulsar.testclient.PerformanceProducer
exit /B %ERROR_CODE%
)
if "%COMMAND%" == "consume" (
call :execCmdWithConfigFile org.apache.pulsar.testclient.PerformanceConsumer
exit /B %ERROR_CODE%
)
if "%COMMAND%" == "transaction" (
call :execCmdWithConfigFile org.apache.pulsar.testclient.PerformanceTransaction
exit /B %ERROR_CODE%
)
if "%COMMAND%" == "read" (
call :execCmdWithConfigFile org.apache.pulsar.testclient.PerformanceReader
exit /B %ERROR_CODE%
)
if "%COMMAND%" == "monitor-brokers" (
call :execCmd org.apache.pulsar.testclient.BrokerMonitor
exit /B %ERROR_CODE%
)
if "%COMMAND%" == "simulation-client" (
call :execCmd org.apache.pulsar.testclient.LoadSimulationClient
exit /B %ERROR_CODE%
)
if "%COMMAND%" == "simulation-controller" (
call :execCmd org.apache.pulsar.testclient.LoadSimulationController
exit /B %ERROR_CODE%
)
if "%COMMAND%" == "websocket-producer" (
call :execCmd org.apache.pulsar.proxy.socket.client.PerformanceClient
exit /B %ERROR_CODE%
)
if "%COMMAND%" == "managed-ledger" (
call :execCmd org.apache.pulsar.testclient.ManagedLedgerWriter
exit /B %ERROR_CODE%
)
if "%COMMAND%" == "gen-doc" (
call :execCmd org.apache.pulsar.testclient.CmdGenerateDocumentation
exit /B %ERROR_CODE%
)

call :usage
exit /B %ERROR_CODE%

:execCmdWithConfigFile
"%JAVACMD%" %OPTS% %1 --conf-file "%PULSAR_PERFTEST_CONF%" %_args%
if ERRORLEVEL 1 (
call :error
)
goto :eof

:execCmd
"%JAVACMD%" %OPTS% %1 %_args%
if ERRORLEVEL 1 (
call :error
)
goto :eof
"%JAVACMD%" %OPTS% org.apache.pulsar.testclient.PulsarPerfTestTool "%PULSAR_PERFTEST_CONF%" %*



Expand All @@ -142,25 +82,3 @@ goto :eof




:usage
echo Usage: pulsar-perf COMMAND
echo where command is one of:
echo produce Run a producer
echo consume Run a consumer
echo transaction Run a transaction repeatedly
echo read Run a topic reader
echo websocket-producer Run a websocket producer
echo managed-ledger Write directly on managed-ledgers
echo monitor-brokers Continuously receive broker data and/or load reports
echo simulation-client Run a simulation server acting as a Pulsar client
echo simulation-controller Run a simulation controller to give commands to servers
echo gen-doc Generate documentation automatically.
echo help This help message
echo or command is the full name of a class with a defined main() method.
echo Environment variables:
echo PULSAR_LOG_CONF Log4j configuration file (default %PULSAR_HOME%\logs)
echo PULSAR_CLIENT_CONF Configuration file for client (default: %PULSAR_HOME%\conf\client.conf)
echo PULSAR_EXTRA_OPTS Extra options to be passed to the jvm
echo PULSAR_EXTRA_CLASSPATH Add extra paths to the pulsar classpath
goto error
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public interface ManagedLedgerMXBean {
*/
long getAddEntrySucceed();

/**
* @return the total number of addEntry requests that succeeded
*/
long getAddEntrySucceedTotal();

/**
* @return the number of addEntry requests that failed
*/
Expand All @@ -100,6 +105,11 @@ public interface ManagedLedgerMXBean {
*/
long getReadEntriesSucceeded();

/**
* @return the total number of readEntries requests that succeeded
*/
long getReadEntriesSucceededTotal();

/**
* @return the number of readEntries requests that failed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ public long getAddEntrySucceed() {
return addEntryOps.getCount();
}

@Override
public long getAddEntrySucceedTotal() {
return addEntryOps.getTotalCount();
}

@Override
public long getAddEntryErrors() {
return addEntryOpsFailed.getCount();
Expand All @@ -240,6 +245,11 @@ public long getReadEntriesSucceeded() {
return readEntriesOps.getCount();
}

@Override
public long getReadEntriesSucceededTotal() {
return readEntriesOps.getTotalCount();
}

@Override
public long getReadEntriesErrors() {
return readEntriesOpsFailed.getCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,12 @@ public void simple() throws Exception {
assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 0.0);
assertEquals(mbean.getAddEntryMessagesRate(), 0.0);
assertEquals(mbean.getAddEntrySucceed(), 0);
assertEquals(mbean.getAddEntrySucceedTotal(), 0);
assertEquals(mbean.getAddEntryErrors(), 0);
assertEquals(mbean.getReadEntriesBytesRate(), 0.0);
assertEquals(mbean.getReadEntriesRate(), 0.0);
assertEquals(mbean.getReadEntriesSucceeded(), 0);
assertEquals(mbean.getReadEntriesSucceededTotal(), 0);
assertEquals(mbean.getReadEntriesErrors(), 0);
assertEquals(mbean.getMarkDeleteRate(), 0.0);

Expand All @@ -105,10 +107,12 @@ public void simple() throws Exception {
assertEquals(mbean.getAddEntryWithReplicasBytesRate(), 1600.0);
assertEquals(mbean.getAddEntryMessagesRate(), 2.0);
assertEquals(mbean.getAddEntrySucceed(), 2);
assertEquals(mbean.getAddEntrySucceedTotal(), 2);
assertEquals(mbean.getAddEntryErrors(), 0);
assertEquals(mbean.getReadEntriesBytesRate(), 0.0);
assertEquals(mbean.getReadEntriesRate(), 0.0);
assertEquals(mbean.getReadEntriesSucceeded(), 0);
assertEquals(mbean.getReadEntriesSucceededTotal(), 0);
assertEquals(mbean.getReadEntriesErrors(), 0);
assertTrue(mbean.getMarkDeleteRate() > 0.0);

Expand All @@ -134,10 +138,14 @@ public void simple() throws Exception {
assertEquals(mbean.getReadEntriesBytesRate(), 600.0);
assertEquals(mbean.getReadEntriesRate(), 1.0);
assertEquals(mbean.getReadEntriesSucceeded(), 1);
assertEquals(mbean.getReadEntriesSucceededTotal(), 1);
assertEquals(mbean.getReadEntriesErrors(), 0);
assertEquals(mbean.getNumberOfMessagesInBacklog(), 1);
assertEquals(mbean.getMarkDeleteRate(), 0.0);

assertEquals(mbean.getAddEntrySucceed(), 0);
assertEquals(mbean.getAddEntrySucceedTotal(), 2);

factory.shutdown();
}

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ flexible messaging model and an intuitive client API.</description>
<aerospike-client.version>4.5.0</aerospike-client.version>
<kafka-client.version>3.4.0</kafka-client.version>
<rabbitmq-client.version>5.18.0</rabbitmq-client.version>
<aws-sdk.version>1.12.262</aws-sdk.version>
<aws-sdk.version>1.12.638</aws-sdk.version>
<avro.version>1.11.3</avro.version>
<joda.version>2.10.10</joda.version>
<jclouds.version>2.6.0</jclouds.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
Expand Down Expand Up @@ -252,6 +253,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {

private MetricsGenerator metricsGenerator;
private final PulsarBrokerOpenTelemetry openTelemetry;
private OpenTelemetryTopicStats openTelemetryTopicStats;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
Expand Down Expand Up @@ -631,6 +633,10 @@ public CompletableFuture<Void> closeAsync() {
brokerClientSharedTimer.stop();
monotonicSnapshotClock.close();

if (openTelemetryTopicStats != null) {
openTelemetryTopicStats.close();
}

asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));


Expand Down Expand Up @@ -771,6 +777,8 @@ public void start() throws PulsarServerException {
config.getDefaultRetentionTimeInMinutes() * 60));
}

openTelemetryTopicStats = new OpenTelemetryTopicStats(this);

localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
: null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
private static final AtomicLongFieldUpdater<AbstractTopic> RATE_LIMITED_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "publishRateLimitedTimes");
protected volatile long publishRateLimitedTimes = 0L;
private static final AtomicLongFieldUpdater<AbstractTopic> TOTAL_RATE_LIMITED_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "totalPublishRateLimitedCounter");
protected volatile long totalPublishRateLimitedCounter = 0L;

private static final AtomicIntegerFieldUpdater<AbstractTopic> USER_CREATED_PRODUCER_COUNTER_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractTopic.class, "userCreatedProducerCount");
Expand Down Expand Up @@ -897,6 +900,7 @@ public void recordAddLatency(long latency, TimeUnit unit) {

@Override
public long increasePublishLimitedTimes() {
TOTAL_RATE_LIMITED_UPDATER.incrementAndGet(this);
return RATE_LIMITED_UPDATER.incrementAndGet(this);
}

Expand Down Expand Up @@ -1185,6 +1189,10 @@ public long getBytesOutCounter() {
+ sumSubscriptions(AbstractSubscription::getBytesOutCounter);
}

public long getTotalPublishRateLimitCounter() {
return TOTAL_RATE_LIMITED_UPDATER.get(this);
}

private long sumSubscriptions(ToLongFunction<AbstractSubscription> toCounter) {
return getSubscriptions().values().stream()
.map(AbstractSubscription.class::cast)
Expand Down
Loading

0 comments on commit a3a709d

Please sign in to comment.