diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TopicClient.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TopicClient.java index 612160d92a2b..b1be84895129 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TopicClient.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TopicClient.java @@ -39,13 +39,11 @@ public TopicClient(ConnectionStringBuilder amqpConnectionStringBuilder) throws I } } - public TopicClient(String namespace, String topicPath, ClientSettings clientSettings) throws InterruptedException, ServiceBusException - { + public TopicClient(String namespace, String topicPath, ClientSettings clientSettings) throws InterruptedException, ServiceBusException { this(Util.convertNamespaceToEndPointURI(namespace), topicPath, clientSettings); } - public TopicClient(URI namespaceEndpointURI, String topicPath, ClientSettings clientSettings) throws InterruptedException, ServiceBusException - { + public TopicClient(URI namespaceEndpointURI, String topicPath, ClientSettings clientSettings) throws InterruptedException, ServiceBusException { this(); this.sender = ClientFactory.createMessageSenderFromEntityPath(namespaceEndpointURI, topicPath, MessagingEntityType.TOPIC, clientSettings); this.browser = new MessageBrowser((MessageSender) sender); diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TransactionContext.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TransactionContext.java index 14fceeb8cd27..e5e6e531b050 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TransactionContext.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/TransactionContext.java @@ -34,7 +34,9 @@ public TransactionContext(ByteBuffer txnId, MessagingFactory messagingFactory) { * Represents the service-side transactionID * @return transaction ID */ - public ByteBuffer getTransactionId() { return this.txnId; } + public ByteBuffer getTransactionId() { + return this.txnId; + } @Override public String toString() { @@ -105,12 +107,11 @@ public void notifyTransactionCompletion(boolean commit) { } } - void registerHandler(ITransactionHandler handler) - { + void registerHandler(ITransactionHandler handler) { this.txnHandler = handler; } interface ITransactionHandler { - public void onTransactionCompleted(boolean commit); + void onTransactionCompleted(boolean commit); } } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Utils.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Utils.java index da16df6f4059..3f50ae6b3ae9 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Utils.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Utils.java @@ -20,66 +20,50 @@ public static T completeFuture(CompletableFuture future) throws Interrupt throw ie; } catch (ExecutionException ee) { Throwable cause = ee.getCause(); - if(cause instanceof RuntimeException) - { - throw (RuntimeException)cause; - } - else if (cause instanceof Error) - { - throw (Error)cause; - } - else if (cause instanceof ServiceBusException) - { + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof Error) { + throw (Error) cause; + } else if (cause instanceof ServiceBusException) { throw (ServiceBusException) cause; - } - else - { + } else { throw new ServiceBusException(true, cause); } } } static void assertNonNull(String argumentName, Object argument) { - if (argument == null) + if (argument == null) { throw new IllegalArgumentException("Argument '" + argumentName + "' is null."); + } } - static MessageBody fromSequence(List sequence) - { + static MessageBody fromSequence(List sequence) { List> sequenceData = new ArrayList<>(); sequenceData.add(sequence); return MessageBody.fromSequenceData(sequenceData); } - static MessageBody fromBinay(byte[] binary) - { + static MessageBody fromBinay(byte[] binary) { List binaryData = new ArrayList<>(); binaryData.add(binary); return MessageBody.fromBinaryData(binaryData); } - static byte[] getDataFromMessageBody(MessageBody messageBody) - { + static byte[] getDataFromMessageBody(MessageBody messageBody) { List binaryData = messageBody.getBinaryData(); - if(binaryData == null || binaryData.size() == 0) - { + if (binaryData == null || binaryData.size() == 0) { return null; - } - else - { + } else { return binaryData.get(0); } } - static List getSequenceFromMessageBody(MessageBody messageBody) - { + static List getSequenceFromMessageBody(MessageBody messageBody) { List> sequenceData = messageBody.getSequenceData(); - if(sequenceData == null || sequenceData.size() == 0) - { + if (sequenceData == null || sequenceData.size() == 0) { return null; - } - else - { + } else { return sequenceData.get(0); } } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java index 024d0a831395..7efb15e0ad9a 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java @@ -3,10 +3,9 @@ package com.microsoft.azure.servicebus.amqp; -import org.apache.qpid.proton.amqp.*; +import org.apache.qpid.proton.amqp.Symbol; -public final class AmqpConstants -{ +public final class AmqpConstants { private AmqpConstants() { } public static final String APACHE = "apache.org"; diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java index b0917c8a0514..ad6c1fa4bcc1 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java @@ -5,8 +5,7 @@ import org.apache.qpid.proton.amqp.Symbol; -public final class AmqpErrorCode -{ +public final class AmqpErrorCode { public static final Symbol NotFound = Symbol.getSymbol("amqp:not-found"); public static final Symbol UnauthorizedAccess = Symbol.getSymbol("amqp:unauthorized-access"); diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/BaseLinkHandler.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/BaseLinkHandler.java index 7efcc72583c4..a612cf5dc466 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/BaseLinkHandler.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/BaseLinkHandler.java @@ -3,8 +3,11 @@ package com.microsoft.azure.servicebus.amqp; -import org.apache.qpid.proton.amqp.transport.*; -import org.apache.qpid.proton.engine.*; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Link; import org.slf4j.LoggerFactory; import org.slf4j.Logger; diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java index 63501307537c..b1ea762d0d95 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java @@ -9,7 +9,6 @@ import javax.net.ssl.SSLContext; -import com.microsoft.azure.servicebus.primitives.MessagingFactory; import com.microsoft.azure.servicebus.primitives.TransportType; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; @@ -32,38 +31,29 @@ // ServiceBus <-> ProtonReactor interaction handles all // amqp_connection/transport related events from reactor -public class ConnectionHandler extends BaseHandler -{ +public class ConnectionHandler extends BaseHandler { private static final SslDomain.VerifyMode VERIFY_MODE; private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class); protected final IAmqpConnection messagingFactory; - static - { + static { String verifyModePropValue = System.getProperty(ClientConstants.SSL_VERIFY_MODE_PROPERTY_NAME); - if(ClientConstants.SSL_VERIFY_MODE_ANONYMOUS.equalsIgnoreCase(verifyModePropValue)) - { + if (ClientConstants.SSL_VERIFY_MODE_ANONYMOUS.equalsIgnoreCase(verifyModePropValue)) { VERIFY_MODE = SslDomain.VerifyMode.ANONYMOUS_PEER; - } - else if(ClientConstants.SSL_VERIFY_MODE_CERTONLY.equalsIgnoreCase(verifyModePropValue)) - { + } else if (ClientConstants.SSL_VERIFY_MODE_CERTONLY.equalsIgnoreCase(verifyModePropValue)) { VERIFY_MODE = SslDomain.VerifyMode.VERIFY_PEER; - } - else - { + } else { VERIFY_MODE = SslDomain.VerifyMode.VERIFY_PEER_NAME; } } - protected ConnectionHandler(final IAmqpConnection messagingFactory) - { + protected ConnectionHandler(final IAmqpConnection messagingFactory) { add(new Handshaker()); this.messagingFactory = messagingFactory; } - public static ConnectionHandler create(TransportType transportType, IAmqpConnection messagingFactory) - { - switch(transportType) { + public static ConnectionHandler create(TransportType transportType, IAmqpConnection messagingFactory) { + switch (transportType) { case AMQP_WEB_SOCKETS: if (ProxyConnectionHandler.shouldUseProxy(messagingFactory.getHostName())) { return new ProxyConnectionHandler(messagingFactory); @@ -77,8 +67,7 @@ public static ConnectionHandler create(TransportType transportType, IAmqpConnect } @Override - public void onConnectionInit(Event event) - { + public void onConnectionInit(Event event) { final Connection connection = event.getConnection(); final String hostName = new StringBuilder(messagingFactory.getHostName()) .append(":") @@ -97,18 +86,15 @@ public void onConnectionInit(Event event) connection.open(); } - protected IAmqpConnection getMessagingFactory() - { + protected IAmqpConnection getMessagingFactory() { return this.messagingFactory; } - public void addTransportLayers(final Event event, final TransportInternal transport) - { + public void addTransportLayers(final Event event, final TransportInternal transport) { SslDomain domain = Proton.sslDomain(); domain.init(SslDomain.Mode.CLIENT); - if(VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER_NAME) - { + if (VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER_NAME) { try { // Default SSL context will have the root certificate from azure in truststore anyway SSLContext defaultContext = SSLContext.getDefault(); @@ -116,16 +102,14 @@ public void addTransportLayers(final Event event, final TransportInternal transp SSLContext strictTlsContext = new StrictTLSContext(strictTlsContextSpi, defaultContext.getProvider(), defaultContext.getProtocol()); domain.setSslContext(strictTlsContext); domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME); - SslPeerDetails peerDetails = Proton.sslPeerDetails(this.getOutboundSocketHostName(), this.getOutboundSocketPort()); + SslPeerDetails peerDetails = Proton.sslPeerDetails(this.getOutboundSocketHostName(), this.getOutboundSocketPort()); transport.ssl(domain, peerDetails); } catch (NoSuchAlgorithmException e) { // Should never happen TRACE_LOGGER.error("Default SSL algorithm not found in JRE. Please check your JRE setup.", e); // this.messagingFactory.onConnectionError(new ErrorCondition(AmqpErrorCode.InternalError, e.getMessage())); } - } - else if (VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER) - { + } else if (VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER) { // Default SSL context will have the root certificate from azure in truststore anyway try { SSLContext defaultContext = SSLContext.getDefault(); @@ -138,33 +122,34 @@ else if (VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER) // this.messagingFactory.onConnectionError(new ErrorCondition(AmqpErrorCode.InternalError, e.getMessage())); } - } - else - { + } else { domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER); transport.ssl(domain); } } - protected void notifyTransportErrors(final Event event) { /* no-op */ } + protected void notifyTransportErrors(final Event event) { + /* no-op */ + } - public String getOutboundSocketHostName() { return messagingFactory.getHostName(); } + public String getOutboundSocketHostName() { + return messagingFactory.getHostName(); + } - public int getOutboundSocketPort() { return this.getProtocolPort(); } + public int getOutboundSocketPort() { + return this.getProtocolPort(); + } - public int getProtocolPort() - { + public int getProtocolPort() { return ClientConstants.AMQPS_PORT; } - public int getMaxFrameSize() - { + public int getMaxFrameSize() { return AmqpConstants.MAX_FRAME_SIZE; } @Override - public void onConnectionBound(Event event) - { + public void onConnectionBound(Event event) { TRACE_LOGGER.debug("onConnectionBound: hostname:{}", event.getConnection().getHostname()); Transport transport = event.getTransport(); @@ -174,22 +159,17 @@ public void onConnectionBound(Event event) } @Override - public void onTransportError(Event event) - { + public void onTransportError(Event event) { ErrorCondition condition = event.getTransport().getCondition(); - if (condition != null) - { + if (condition != null) { TRACE_LOGGER.warn("Connection.onTransportError: hostname:{}, error:{}", event.getConnection().getHostname(), condition.getDescription()); - } - else - { + } else { TRACE_LOGGER.warn("Connection.onTransportError: hostname:{}. error:{}", event.getConnection().getHostname(), "no description returned"); } this.messagingFactory.onConnectionError(condition); Connection connection = event.getConnection(); - if(connection != null) - { + if (connection != null) { connection.free(); } @@ -197,23 +177,20 @@ public void onTransportError(Event event) } @Override - public void onConnectionRemoteOpen(Event event) - { + public void onConnectionRemoteOpen(Event event) { TRACE_LOGGER.debug("Connection.onConnectionRemoteOpen: hostname:{}, remotecontainer:{}", event.getConnection().getHostname(), event.getConnection().getRemoteContainer()); this.messagingFactory.onConnectionOpen(); } @Override - public void onConnectionRemoteClose(Event event) - { + public void onConnectionRemoteClose(Event event) { final Connection connection = event.getConnection(); final ErrorCondition error = connection.getRemoteCondition(); TRACE_LOGGER.debug("onConnectionRemoteClose: hostname:{},errorCondition:{}", connection.getHostname(), error != null ? error.getCondition() + "," + error.getDescription() : null); boolean shouldFreeConnection = connection.getLocalState() == EndpointState.CLOSED; this.messagingFactory.onConnectionError(error); - if(shouldFreeConnection) - { + if (shouldFreeConnection) { connection.free(); } } @@ -227,11 +204,9 @@ public void onConnectionFinal(Event event) { public void onConnectionLocalClose(Event event) { Connection connection = event.getConnection(); TRACE_LOGGER.debug("onConnectionLocalClose: hostname:{}", connection.getHostname()); - if(connection.getRemoteState() == EndpointState.CLOSED) - { + if (connection.getRemoteState() == EndpointState.CLOSED) { // Service closed it first. In some such cases transport is not unbound and causing a leak. - if(connection.getTransport() != null) - { + if (connection.getTransport() != null) { connection.getTransport().unbind(); } diff --git a/servicebus/data-plane/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java b/servicebus/data-plane/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java index b35d45a76fa9..d47b89d3fb3a 100644 --- a/servicebus/data-plane/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java +++ b/servicebus/data-plane/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java @@ -9,7 +9,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -34,7 +33,7 @@ public static void testBasicSend(IMessageSender sender) throws InterruptedExcept public static void testBasicSendBatch(IMessageSender sender) throws InterruptedException, ServiceBusException { List messages = new ArrayList(); - for (int i=0; i<10; i++) { + for (int i = 0; i < 10; i++) { messages.add(new Message("AMQP message")); } sender.sendBatch(messages); @@ -71,8 +70,8 @@ public static void testBasicReceiveAndDeleteWithLargeBinaryData(IMessageSender s private static void testBasicReceiveAndDeleteWithBinaryData(IMessageSender sender, String sessionId, IMessageReceiver receiver, int messageSize) throws InterruptedException, ServiceBusException, ExecutionException { String messageId = UUID.randomUUID().toString(); byte[] binaryData = new byte[messageSize]; - for (int i=0; i< binaryData.length; i++) { - binaryData[i] = (byte)i; + for (int i = 0; i < binaryData.length; i++) { + binaryData[i] = (byte) i; } Message message = new Message(Utils.fromBinay(binaryData)); message.setMessageId(messageId); @@ -117,7 +116,7 @@ public static void testBasicReceiveBatchAndDelete(IMessageSender sender, String int numMessages = 10; if (isEntityPartitioned) { - for (int i=0; i messages = new ArrayList(); - for (int i=0; i messages = new ArrayList(); - for (int i=0; i sentProperties = new HashMap<>(); sentProperties.put("NullProperty", null); sentProperties.put("BooleanProperty", true); - sentProperties.put("ByteProperty", (byte)1); - sentProperties.put("ShortProperty", (short)2); + sentProperties.put("ByteProperty", (byte) 1); + sentProperties.put("ShortProperty", (short) 2); sentProperties.put("IntProperty", 3); - sentProperties.put("LongProperty", 4l); + sentProperties.put("LongProperty", 4L); sentProperties.put("FloatProperty", 5.5f); sentProperties.put("DoubleProperty", 6.6f); sentProperties.put("CharProperty", 'z'); @@ -583,7 +582,7 @@ public static void testSendReceiveMessageWithVariousPropertyTypes(IMessageSender Map receivedProperties = receivedMessage.getProperties(); for (Map.Entry sentEntry : sentProperties.entrySet()) { if (sentEntry.getValue() != null && sentEntry.getValue().getClass().isArray()) { - Assert.assertArrayEquals("Sent property didn't match with received property", (Object[])sentEntry.getValue(), (Object[])receivedProperties.get(sentEntry.getKey())); + Assert.assertArrayEquals("Sent property didn't match with received property", (Object[]) sentEntry.getValue(), (Object[]) receivedProperties.get(sentEntry.getKey())); } else { Assert.assertEquals("Sent property didn't match with received property", sentEntry.getValue(), receivedProperties.get(sentEntry.getKey())); } @@ -593,7 +592,7 @@ public static void testSendReceiveMessageWithVariousPropertyTypes(IMessageSender public static void testGetMessageSessions(IMessageSender sender, Object sessionsClient) throws InterruptedException, ServiceBusException { int numSessions = 110; // More than default page size String[] sessionIds = new String[numSessions]; - for (int i=0; i messages = receiver.receiveBatch(batchSize, DRAIN_MESSAGES_WAIT_TIME); - while (messages !=null && messages.size() > 0) { - if(receiver.getReceiveMode() == ReceiveMode.PEEKLOCK) { + while (messages != null && messages.size() > 0) { + if (receiver.getReceiveMode() == ReceiveMode.PEEKLOCK) { for (IMessage message: messages) { receiver.complete(message.getLockToken()); } @@ -688,16 +687,17 @@ public static void drainAllSessions(String receivePath, boolean isQueue) throws CompletableFuture[] drainFutures = new CompletableFuture[numParallelSessionDrains]; int drainFutureIndex = 0; for (IMessageSession browsableSession : browsableSessions) { - CompletableFuture drainFuture = ClientFactory.acceptSessionFromEntityPathAsync - (TestUtils.getNamespaceEndpointURI(), receivePath, browsableSession.getSessionId(), TestUtils.getClientSettings(), ReceiveMode.RECEIVEANDDELETE).thenAcceptAsync((session) -> { - try { - TestCommons.drainAllMessagesFromReceiver(session, false); - session.setState(null); - session.close(); - } catch (InterruptedException | ServiceBusException e) { - e.printStackTrace(); - } - }); + CompletableFuture drainFuture = ClientFactory.acceptSessionFromEntityPathAsync( + TestUtils.getNamespaceEndpointURI(), receivePath, browsableSession.getSessionId(), + TestUtils.getClientSettings(), ReceiveMode.RECEIVEANDDELETE).thenAcceptAsync((session) -> { + try { + TestCommons.drainAllMessagesFromReceiver(session, false); + session.setState(null); + session.close(); + } catch (InterruptedException | ServiceBusException e) { + e.printStackTrace(); + } + }); drainFutures[drainFutureIndex++] = drainFuture; if (drainFutureIndex == numParallelSessionDrains) { diff --git a/servicebus/data-plane/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java b/servicebus/data-plane/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java index 1540b243edff..27f12e9c4559 100644 --- a/servicebus/data-plane/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java +++ b/servicebus/data-plane/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java @@ -4,7 +4,11 @@ package com.microsoft.azure.servicebus; import java.io.IOException; -import java.net.*; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.ProxySelector; +import java.net.SocketAddress; +import java.net.URI; import java.util.LinkedList; import java.util.List; import java.util.UUID; @@ -39,16 +43,16 @@ public class TestUtils extends TestBase { // Read proxy settings only if transport type is WebSockets runWithProxy = Boolean.valueOf(System.getenv(RUN_WITH_PROXY_ENV_VAR)); proxyHostName = System.getenv(PROXY_HOSTNAME_ENV_VAR); - proxyPort = System.getenv(PROXY_PORT_ENV_VAR) == null ? - 0 : Integer.valueOf(System.getenv(PROXY_PORT_ENV_VAR)); + proxyPort = System.getenv(PROXY_PORT_ENV_VAR) == null ? 0 : Integer.valueOf(System.getenv(PROXY_PORT_ENV_VAR)); } - public static URI getNamespaceEndpointURI() - { + public static URI getNamespaceEndpointURI() { return namespaceConnectionStringBuilder.getEndpoint(); } - public static String getNamespaceConnectionString() { return namespaceConnectionString; } + public static String getNamespaceConnectionString() { + return namespaceConnectionString; + } public static ClientSettings getClientSettings() { if (runWithProxy) {