Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/pulsar-ci-flaky.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ on:
- '21'
- '24'
- '25'
default: '21'
default: '25'
trace_test_resource_cleanup:
description: 'Collect thread & heap information before exiting a test JVM. When set to "on", thread dump and heap histogram will be collected. When set to "full", a heap dump will also be collected.'
required: true
Expand Down Expand Up @@ -112,8 +112,8 @@ jobs:
echo "jdk_major_version=17" >> $GITHUB_OUTPUT
exit 0
fi
# use JDK 21 for build unless overridden with workflow_dispatch input
echo "jdk_major_version=${{ github.event_name == 'workflow_dispatch' && github.event.inputs.jdk_major_version || '21'}}" >> $GITHUB_OUTPUT
# use JDK 25 for build unless overridden with workflow_dispatch input
echo "jdk_major_version=${{ github.event_name == 'workflow_dispatch' && github.event.inputs.jdk_major_version || '25'}}" >> $GITHUB_OUTPUT

- name: checkout
if: ${{ github.event_name == 'pull_request' }}
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ on:
- '21'
- '24'
- '25'
default: '21'
default: '25'
trace_test_resource_cleanup:
description: 'Collect thread & heap information before exiting a test JVM. When set to "on", thread dump and heap histogram will be collected. When set to "full", a heap dump will also be collected.'
required: true
Expand Down Expand Up @@ -122,8 +122,8 @@ jobs:
echo "jdk_major_version=17" >> $GITHUB_OUTPUT
exit 0
fi
# use JDK 21 for build unless overridden with workflow_dispatch input
echo "jdk_major_version=${{ github.event_name == 'workflow_dispatch' && github.event.inputs.jdk_major_version || '21'}}" >> $GITHUB_OUTPUT
# use JDK 25 for build unless overridden with workflow_dispatch input
echo "jdk_major_version=${{ github.event_name == 'workflow_dispatch' && github.event.inputs.jdk_major_version || '25'}}" >> $GITHUB_OUTPUT

- name: checkout
if: ${{ github.event_name == 'pull_request' }}
Expand Down
2 changes: 1 addition & 1 deletion buildtools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<ant.version>1.10.12</ant.version>
<snakeyaml.version>2.0</snakeyaml.version>
<mockito.version>5.19.0</mockito.version>
<byte-buddy.version>1.17.7</byte-buddy.version>
<byte-buddy.version>1.18.2</byte-buddy.version>
<wagon-ssh-external.version>3.5.3</wagon-ssh-external.version>
<!-- required for running tests on JDK11+ -->
<test.additional.args>
Expand Down
2 changes: 1 addition & 1 deletion distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ The Apache Software License, Version 2.0
- net.jodah-typetools-0.5.0.jar
- dev.failsafe-failsafe-3.3.2.jar
* Byte Buddy
- net.bytebuddy-byte-buddy-1.17.7.jar
- net.bytebuddy-byte-buddy-1.18.2.jar
* zt-zip
- org.zeroturnaround-zt-zip-1.17.jar
* Apache Avro
Expand Down
23 changes: 17 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ flexible messaging model and an intuitive client API.</description>
<docker.verbose>true</docker.verbose>
<typetools.version>0.5.0</typetools.version>
<!-- match the byte-buddy version (or newer) used by the mockito-core pom -->
<byte-buddy.version>1.17.7</byte-buddy.version>
<byte-buddy.version>1.18.2</byte-buddy.version>
<zt-zip.version>1.17</zt-zip.version>
<protobuf3.version>3.25.5</protobuf3.version>
<protoc3.version>${protobuf3.version}</protoc3.version>
Expand All @@ -232,7 +232,7 @@ flexible messaging model and an intuitive client API.</description>
<cassandra.version>3.11.2</cassandra.version>
<aerospike-client.version>4.5.0</aerospike-client.version>
<kafka-client.version>3.9.1</kafka-client.version>
<rabbitmq-client.version>5.18.0</rabbitmq-client.version>
<rabbitmq-client.version>5.28.0</rabbitmq-client.version>
<aws-sdk.version>1.12.788</aws-sdk.version>
<aws-sdk2.version>2.32.28</aws-sdk2.version>
<avro.version>1.12.0</avro.version>
Expand All @@ -252,10 +252,10 @@ flexible messaging model and an intuitive client API.</description>
<debezium.mysql.version>9.4.0</debezium.mysql.version>
<jsonwebtoken.version>0.13.0</jsonwebtoken.version>
<opencensus.version>0.28.0</opencensus.version>
<hadoop3.version>3.4.2</hadoop3.version>
<hadoop3.version>3.5.0-SNAPSHOT</hadoop3.version>
<dnsjava3.version>3.6.2</dnsjava3.version>
<hdfs-offload-version3>${hadoop3.version}</hdfs-offload-version3>
<hbase.version>2.6.3-hadoop3</hbase.version>
<hbase.version>2.6.4-hadoop3</hbase.version>
<guava.version>33.4.8-jre</guava.version>
<jcip.version>1.0</jcip.version>
<prometheus-jmx.version>0.16.1</prometheus-jmx.version>
Expand Down Expand Up @@ -2986,7 +2986,7 @@ flexible messaging model and an intuitive client API.</description>
<profile>
<id>jdk21</id>
<activation>
<jdk>21</jdk>
<jdk>[21,)</jdk>
</activation>
<properties>
<!-- nifi-nar-maven-plugin >= 2.0.0 require Java 21+ -->
Expand Down Expand Up @@ -3026,7 +3026,7 @@ flexible messaging model and an intuitive client API.</description>
<profile>
<id>jdk24</id>
<activation>
<jdk>24</jdk>
<jdk>[24,)</jdk>
</activation>
<properties>
<!-- JDK 24+ specific arguments -->
Expand Down Expand Up @@ -3137,5 +3137,16 @@ flexible messaging model and an intuitive client API.</description>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>apache-snapshots</id>
<name>Apache Snapshots Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
</project>
19 changes: 2 additions & 17 deletions pulsar-io/rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,9 @@
</dependency>

<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker</artifactId>
<version>9.2.0</version>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-bdbstore</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-derby-store</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,34 @@
*/
package org.apache.pulsar.io.rabbitmq;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.server.SystemLauncher;
import org.apache.qpid.server.model.SystemConfig;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.utility.DockerImageName;

public class RabbitMQBrokerManager {
private RabbitMQContainer rabbitMQContainer;

private final SystemLauncher systemLauncher = new SystemLauncher();

public void startBroker(String port) throws Exception {
Map<String, Object> brokerOptions = getBrokerOptions(port);
systemLauncher.startup(brokerOptions);
public void startBroker() throws Exception {
rabbitMQContainer = new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.7.25-management-alpine"));
rabbitMQContainer.withVhost("default");
rabbitMQContainer.start();
}

public void stopBroker() {
systemLauncher.shutdown();
if (rabbitMQContainer != null) {
rabbitMQContainer.stop();
rabbitMQContainer = null;
}
}

public int getPort() {
return rabbitMQContainer.getAmqpPort();
}

Map<String, Object> getBrokerOptions(String port) throws Exception {
Path tmpFolder = Files.createTempDirectory("qpidWork");
Map<String, Object> config = new HashMap<>();
config.put("qpid.work_dir", tmpFolder.toAbsolutePath().toString());
config.put("qpid.amqp_port", port);
public String getUser() {
return rabbitMQContainer.getAdminUsername();
}

Map<String, Object> context = new HashMap<>();
context.put(SystemConfig.INITIAL_CONFIGURATION_LOCATION, "classpath:qpid.json");
context.put(SystemConfig.TYPE, "Memory");
context.put(SystemConfig.CONTEXT, config);
return context;
public String getPassword() {
return rabbitMQContainer.getAdminPassword();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class RabbitMQSinkTest {
@BeforeMethod
public void setUp() throws Exception {
rabbitMQBrokerManager = new RabbitMQBrokerManager();
rabbitMQBrokerManager.startBroker("5673");
rabbitMQBrokerManager.startBroker();
}

@AfterMethod(alwaysRun = true)
Expand All @@ -52,10 +52,10 @@ public void tearDown() {
public void testOpenAndWriteSink() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "localhost");
configs.put("port", "5673");
configs.put("port", String.valueOf(rabbitMQBrokerManager.getPort()));
configs.put("virtualHost", "default");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("username", rabbitMQBrokerManager.getUser());
configs.put("password", rabbitMQBrokerManager.getPassword());
configs.put("connectionName", "test-connection");
configs.put("requestedChannelMax", "0");
configs.put("requestedFrameMax", "0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class RabbitMQSourceTest {
@BeforeMethod
public void setUp() throws Exception {
rabbitMQBrokerManager = new RabbitMQBrokerManager();
rabbitMQBrokerManager.startBroker("5672");
rabbitMQBrokerManager.startBroker();
}

@AfterMethod(alwaysRun = true)
Expand All @@ -49,10 +49,10 @@ public void tearDown() {
public void testOpenAndWriteSink() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "localhost");
configs.put("port", "5672");
configs.put("port", String.valueOf(rabbitMQBrokerManager.getPort()));
configs.put("virtualHost", "default");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("username", rabbitMQBrokerManager.getUser());
configs.put("password", rabbitMQBrokerManager.getPassword());
configs.put("queueName", "test-queue");
configs.put("connectionName", "test-connection");
configs.put("requestedChannelMax", "0");
Expand Down
68 changes: 0 additions & 68 deletions pulsar-io/rabbitmq/src/test/resources/qpid.json

This file was deleted.

Loading