Skip to content

Commit

Permalink
[Pulsar IO] Allow routing key per message to RabbitMQ sink connector (a…
Browse files Browse the repository at this point in the history
…pache#5890)

### Motivation

With the current RabbitMQ sink connector, all messages are forwarded to an exchange with the same routing key (specified in the configuration). It would be great to give a bit more flexibility and allow the use of different routing keys for different messages on the same topic.

### Modifications

The creation of the queue has been removed from the sink, all messages will be forwarded to the exchange and each consumer will create it's on queue + binding.

The configuration has been modified to reflect those changes: added the exchange type and queue name is just required for the source connector.
  • Loading branch information
Alex Rufo authored and huangdx0726 committed Aug 24, 2020
1 parent b2d65ad commit 33fd7bf
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ public class RabbitMQAbstractConfig implements Serializable {
help = "The password used to authenticate to RabbitMQ")
private String password = "guest";

@FieldDoc(
required = true,
defaultValue = "",
help = "The RabbitMQ queue name from which messages should be read from or written to")
private String queueName;

@FieldDoc(
required = false,
defaultValue = "0",
Expand Down Expand Up @@ -115,7 +109,6 @@ public void validate() {
Preconditions.checkNotNull(port, "port property not set.");
Preconditions.checkNotNull(virtualHost, "virtualHost property not set.");
Preconditions.checkNotNull(connectionName, "connectionName property not set.");
Preconditions.checkNotNull(queueName, "queueName property not set.");
}

public ConnectionFactory createConnectionFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public class RabbitMQSink implements Sink<byte[]> {
private Channel rabbitMQChannel;
private RabbitMQSinkConfig rabbitMQSinkConfig;
private String exchangeName;
private String routingKey;

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
Expand All @@ -64,21 +63,17 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
);

exchangeName = rabbitMQSinkConfig.getExchangeName();
routingKey = rabbitMQSinkConfig.getRoutingKey();
String exchangeType = rabbitMQSinkConfig.getExchangeType();

rabbitMQChannel = rabbitMQConnection.createChannel();

// several clients share a queue
rabbitMQChannel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
rabbitMQChannel.queueDeclare(rabbitMQSinkConfig.getQueueName(), true, false, false, null);
rabbitMQChannel.queueBind(rabbitMQSinkConfig.getQueueName(), exchangeName, routingKey);
rabbitMQChannel.exchangeDeclare(exchangeName, exchangeType, true);
}

@Override
public void write(Record<byte[]> record) {
byte[] value = record.getValue();
try {
rabbitMQChannel.basicPublish(exchangeName, routingKey, null, value);
rabbitMQChannel.basicPublish(exchangeName, record.getProperties().get("routingKey"), null, value);
record.ack();
} catch (IOException e) {
record.fail();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public class RabbitMQSinkConfig extends RabbitMQAbstractConfig implements Serial
private String exchangeName;

@FieldDoc(
required = true,
defaultValue = "",
help = "The routing key used for publishing the messages")
private String routingKey;
required = false,
defaultValue = "topic",
help = "The exchange type to publish the messages on")
private String exchangeType = "topic";

public static RabbitMQSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
Expand All @@ -64,6 +64,5 @@ public static RabbitMQSinkConfig load(Map<String, Object> map) throws IOExceptio
public void validate() {
super.validate();
Preconditions.checkNotNull(exchangeName, "exchangeName property not set.");
Preconditions.checkNotNull(routingKey, "routingKey property not set.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ public class RabbitMQSourceConfig extends RabbitMQAbstractConfig implements Seri

private static final long serialVersionUID = 1L;

@FieldDoc(
required = true,
defaultValue = "",
help = "The RabbitMQ queue name from which messages should be read from or written to")
private String queueName;

@FieldDoc(
required = false,
defaultValue = "0",
Expand All @@ -63,6 +69,7 @@ public static RabbitMQSourceConfig load(Map<String, Object> map) throws IOExcept
@Override
public void validate() {
super.validate();
Preconditions.checkNotNull(queueName, "queueName property not set.");
Preconditions.checkArgument(prefetchCount >= 0, "prefetchCount must be non-negative.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@ public final void loadFromYamlFileTest() throws IOException {
assertEquals("/", config.getVirtualHost());
assertEquals("guest", config.getUsername());
assertEquals("guest", config.getPassword());
assertEquals("test-queue", config.getQueueName());
assertEquals("test-connection", config.getConnectionName());
assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
assertEquals("test-exchange", config.getExchangeName());
assertEquals("test-key", config.getRoutingKey());
assertEquals("test-exchange-type", config.getExchangeType());
}

@Test
Expand All @@ -63,15 +62,14 @@ public final void loadFromMapTest() throws IOException {
map.put("virtualHost", "/");
map.put("username", "guest");
map.put("password", "guest");
map.put("queueName", "test-queue");
map.put("connectionName", "test-connection");
map.put("requestedChannelMax", "0");
map.put("requestedFrameMax", "0");
map.put("connectionTimeout", "60000");
map.put("handshakeTimeout", "10000");
map.put("requestedHeartbeat", "60");
map.put("exchangeName", "test-exchange");
map.put("routingKey", "test-key");
map.put("exchangeType", "test-exchange-type");

RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
assertNotNull(config);
Expand All @@ -80,15 +78,14 @@ public final void loadFromMapTest() throws IOException {
assertEquals("/", config.getVirtualHost());
assertEquals("guest", config.getUsername());
assertEquals("guest", config.getPassword());
assertEquals("test-queue", config.getQueueName());
assertEquals("test-connection", config.getConnectionName());
assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
assertEquals("test-exchange", config.getExchangeName());
assertEquals("test-key", config.getRoutingKey());
assertEquals("test-exchange-type", config.getExchangeType());
}

@Test
Expand All @@ -99,15 +96,14 @@ public final void validValidateTest() throws IOException {
map.put("virtualHost", "/");
map.put("username", "guest");
map.put("password", "guest");
map.put("queueName", "test-queue");
map.put("connectionName", "test-connection");
map.put("requestedChannelMax", "0");
map.put("requestedFrameMax", "0");
map.put("connectionTimeout", "60000");
map.put("handshakeTimeout", "10000");
map.put("requestedHeartbeat", "60");
map.put("exchangeName", "test-exchange");
map.put("routingKey", "test-key");
map.put("exchangeType", "test-exchange-type");

RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
config.validate();
Expand All @@ -122,14 +118,13 @@ public final void missingExchangeValidateTest() throws IOException {
map.put("virtualHost", "/");
map.put("username", "guest");
map.put("password", "guest");
map.put("queueName", "test-queue");
map.put("connectionName", "test-connection");
map.put("requestedChannelMax", "0");
map.put("requestedFrameMax", "0");
map.put("connectionTimeout", "60000");
map.put("handshakeTimeout", "10000");
map.put("requestedHeartbeat", "60");
map.put("routingKey", "test-key");
map.put("exchangeType", "test-exchange-type");

RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
config.validate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,28 @@ public void TestOpenAndWriteSink() throws Exception {
configs.put("virtualHost", "default");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("queueName", "test-queue");
configs.put("connectionName", "test-connection");
configs.put("requestedChannelMax", "0");
configs.put("requestedFrameMax", "0");
configs.put("connectionTimeout", "60000");
configs.put("handshakeTimeout", "10000");
configs.put("requestedHeartbeat", "60");
configs.put("exchangeName", "test-exchange");
configs.put("routingKey", "test-key");
configs.put("exchangeType", "fanout");

RabbitMQSink sink = new RabbitMQSink();

// open should success
sink.open(configs, null);

// write should success
Record<byte[]> record = build("test-topic", "fakeKey", "fakeValue");
Record<byte[]> record = build("test-topic", "fakeKey", "fakeValue", "fakeRoutingKey");
sink.write(record);

sink.close();
}

private Record<byte[]> build(String topic, String key, String value) {
private Record<byte[]> build(String topic, String key, String value, String routingKey) {
// prepare a SinkRecord
SinkRecord<byte[]> record = new SinkRecord<>(new Record<byte[]>() {
@Override
Expand All @@ -96,6 +95,15 @@ public Optional<String> getDestinationTopic() {
return Optional.empty();
}
}

@Override
public Map<String, String> getProperties() {
return new HashMap<String, String>() {
{
put("routingKey", routingKey);
}
};
}
}, value.getBytes(StandardCharsets.UTF_8));
return record;
}
Expand Down
4 changes: 1 addition & 3 deletions pulsar-io/rabbitmq/src/test/resources/sinkConfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
"virtualHost": "/",
"username": "guest",
"password": "guest",
"queueName": "test-queue",
"connectionName": "test-connection",
"requestedChannelMax": "0",
"requestedFrameMax": "0",
"connectionTimeout": "60000",
"handshakeTimeout": "10000",
"requestedHeartbeat": "60",
"exchangeName": "test-exchange",
"routingKey": "test-key"

"exchangeType": "test-exchange-type"
}

0 comments on commit 33fd7bf

Please sign in to comment.