Skip to content

Commit

Permalink
Support token renewal
Browse files Browse the repository at this point in the history
For RabbitMQ 4.1.

See rabbitmq/rabbitmq-server#12599
  • Loading branch information
acogoluegnes committed Nov 19, 2024
1 parent d4408bd commit dac61c0
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 5 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ jobs:
cache: 'maven'
- name: Start broker
run: ci/start-broker.sh
env:
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:amqp-token-renew'
- name: Start toxiproxy
run: ci/start-toxiproxy.sh
- name: Display Java version
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Management;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
Expand All @@ -38,6 +39,7 @@
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientSessionRemotelyClosedException;
import org.apache.qpid.protonj2.types.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -169,6 +171,24 @@ public UnbindSpecification unbind() {
return new AmqpBindingManagement.AmqpUnbindSpecification(this);
}

void setToken(String token) {
checkAvailable();
UUID requestId = messageId();
try {
Message<?> request =
Message.create(new Binary(token.getBytes(StandardCharsets.UTF_8)))
.to("/auth/tokens")
.subject("PUT");

OutstandingRequest outstandingRequest = this.request(request, requestId);
outstandingRequest.block();

checkResponse(outstandingRequest, requestId, 204);
} catch (ClientException e) {
throw new AmqpException("Error on set-token operation", e);
}
}

@Override
public void close() {
if (this.initializing) {
Expand Down
5 changes: 2 additions & 3 deletions src/test/java/com/rabbitmq/client/amqp/impl/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,11 @@ public static void addVhost(String vhost) {
rabbitmqctl("add_vhost " + vhost);
}

public static void addUser(String username, String password) throws IOException {
public static void addUser(String username, String password) {
rabbitmqctl(format("add_user %s %s", username, password));
}

public static void setPermissions(String username, String vhost, String permission)
throws IOException {
public static void setPermissions(String username, String vhost, String permission) {
setPermissions(username, vhost, asList(permission, permission, permission));
}

Expand Down
73 changes: 71 additions & 2 deletions src/test/java/com/rabbitmq/client/amqp/impl/ManagementTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@
package com.rabbitmq.client.amqp.impl;

import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
import static com.rabbitmq.client.amqp.impl.Cli.*;
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0;
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Management;
import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersionAtLeast;
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.junit.jupiter.api.*;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@AmqpTestInfrastructure
public class ManagementTest {

Environment environment;
AmqpConnection connection;
AmqpManagement management;

Expand Down Expand Up @@ -88,4 +95,66 @@ void receiveLoopShouldStopAfterBeingIdle() {
assertThat(management.queueInfo(info1.name())).hasName(info1.name());
assertThat(management.queueInfo(info2.name())).hasName(info2.name());
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@BrokerVersionAtLeast(RABBITMQ_4_1_0)
void setToken(boolean isolateResources, TestInfo info) {
String username = "foo";
String password = "bar";
String vh = "/";
String q = TestUtils.name(info);

Connection c = null;
try {
addUser(username, password);
setPermissions(username, vh, ".*");
this.connection.management().queue(q).declare();

c =
((AmqpConnectionBuilder) environment.connectionBuilder())
.isolateResources(isolateResources)
.username(username)
.password(password)
.build();
Sync consumeSync = sync();
Sync publisherClosedSync = sync();
Sync consumerClosedSync = sync();
Publisher p =
c.publisherBuilder().queue(q).listeners(closedListener(publisherClosedSync)).build();
c.consumerBuilder()
.queue(q)
.messageHandler(
(ctx, msg) -> {
ctx.accept();
consumeSync.down();
})
.listeners(closedListener(consumerClosedSync))
.build();

p.publish(p.message(), ctx -> {});
assertThat(consumeSync).completes();

setPermissions(username, vh, "foobar");
AmqpManagement m = (AmqpManagement) c.management();
m.setToken(password);
assertThat(publisherClosedSync).completes();
assertThat(consumerClosedSync).completes();
} finally {
if (c != null) {
c.close();
}
this.connection.management().queueDeletion().delete(q);
deleteUser(username);
}
}

private static Resource.StateListener closedListener(Sync sync) {
return context -> {
if (context.currentState() == Resource.State.CLOSED
&& context.failureCause() instanceof AmqpException.AmqpSecurityException) {
sync.down();
}
};
}
}

0 comments on commit dac61c0

Please sign in to comment.