From 863415d6505d10c16785b5ffd308c02bec509ebf Mon Sep 17 00:00:00 2001 From: Vijaya Gopal Yarramneni Date: Wed, 20 May 2020 22:35:50 -0700 Subject: [PATCH] Setting MessageTTL as the difference between ExpiryTime and CreationTime, along with TTL header. This is to overcome the limitation that TTL header doesn't support values larger than 50 days. --- .../azure/servicebus/MessageConverter.java | 18 ++++++++++++++++-- .../servicebus/primitives/ClientConstants.java | 1 + .../azure/servicebus/SendReceiveTests.java | 6 ++++++ .../azure/servicebus/TestCommons.java | 18 ++++++++++++++++++ 4 files changed, 41 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java index 06e4d1a67823..9fc3e59398af 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java @@ -4,6 +4,7 @@ package com.microsoft.azure.servicebus; import java.time.Duration; +import java.time.Instant; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -46,7 +47,15 @@ public static org.apache.qpid.proton.message.Message convertBrokeredMessageToAmq } if (brokeredMessage.getTimeToLive() != null) { - amqpMessage.setTtl(brokeredMessage.getTimeToLive().toMillis()); + long ttlMillis = brokeredMessage.getTimeToLive().toMillis(); + if (ttlMillis > ClientConstants.UNSIGNED_INT_MAX_VALUE) { + ttlMillis = ClientConstants.UNSIGNED_INT_MAX_VALUE; + } + amqpMessage.setTtl(ttlMillis); + Instant creationTime = Instant.now(); + Instant absoluteExpiryTime = creationTime.plus(brokeredMessage.getTimeToLive()); + amqpMessage.setCreationTime(creationTime.toEpochMilli()); + amqpMessage.setExpiryTime(absoluteExpiryTime.toEpochMilli()); } amqpMessage.setMessageId(brokeredMessage.getMessageId()); @@ -120,10 +129,15 @@ public static Message convertAmqpMessageToBrokeredMessage(org.apache.qpid.proton } // Header - brokeredMessage.setTimeToLive(Duration.ofMillis(amqpMessage.getTtl())); brokeredMessage.setDeliveryCount(amqpMessage.getDeliveryCount()); + brokeredMessage.setTimeToLive(Duration.ofMillis(amqpMessage.getTtl())); + // Properties + // Override TimeToLive from CrationTime and ExpiryTime, as they support duration of any length, which ttl doesn't + if (amqpMessage.getCreationTime() != 0l && amqpMessage.getExpiryTime() != 0l) { + brokeredMessage.setTimeToLive(Duration.ofMillis(amqpMessage.getExpiryTime() - amqpMessage.getCreationTime())); + } Object messageId = amqpMessage.getMessageId(); if (messageId != null) { brokeredMessage.setMessageId(messageId.toString()); diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java index e7ec419d11c7..eb74dbbf5d0e 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java @@ -39,6 +39,7 @@ private ClientConstants() { } public static final String VIAPARTITIONKEYNAME = "x-opt-via-partition-key"; public static final String DEADLETTERSOURCENAME = "x-opt-deadletter-source"; public static final UUID ZEROLOCKTOKEN = new UUID(0L, 0L); + public static final long UNSIGNED_INT_MAX_VALUE = 4294967295l; public static final int AMQPS_PORT = 5671; public static final int HTTPS_PORT = 443; diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java index 35f7105193a7..be65c338fe2e 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java @@ -146,6 +146,12 @@ public void testBasicReceiveAndComplete() throws InterruptedException, ServiceBu this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.PEEKLOCK); TestCommons.testBasicReceiveAndComplete(this.sender, this.sessionId, this.receiver); } + + @Test + public void testLargeTimeToLiveOnMessage() throws InterruptedException, ServiceBusException, ExecutionException { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.PEEKLOCK); + TestCommons.testLargeTimeToLiveOnMessage(this.sender, this.sessionId, this.receiver); + } @Test public void testBasicReceiveAndCompleteMessageWithProperties() throws InterruptedException, ServiceBusException, ExecutionException { diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java index 2908d3c121f3..d0a4fb446e47 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java @@ -394,6 +394,24 @@ public static void testSendSceduledMessageAndCancel(IMessageSender sender, Strin Assert.assertTrue("Scheduled messages not received", allReceivedMessages.removeIf(msg -> msg.getMessageId().equals(msgId1))); Assert.assertFalse("Cancelled scheduled messages also received", allReceivedMessages.removeIf(msg -> msg.getMessageId().equals(msgId2))); } + + public static void testLargeTimeToLiveOnMessage(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException { + Message message = new Message("AMQP message"); + if (sessionId != null) { + message.setSessionId(sessionId); + } + // Must be larger than 50 days to exceed the max duration supported ttl header + Duration timeToLive = Duration.ofDays(100); + message.setTimeToLive(timeToLive); + sender.send(message); + + IMessage receivedMessage = receiver.receive(); + Assert.assertNotNull("Message not received", receivedMessage); + Assert.assertEquals("TimeToLive value didn't match", timeToLive, receivedMessage.getTimeToLive()); + receiver.complete(receivedMessage.getLockToken()); + receivedMessage = receiver.receive(SHORT_WAIT_TIME); + Assert.assertNull("Message was not properly completed", receivedMessage); + } public static void testPeekMessage(IMessageSender sender, String sessionId, IMessageBrowser browser) throws InterruptedException, ServiceBusException { Message message = new Message("AMQP Scheduled message");