Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,11 @@ public TopicClient(ConnectionStringBuilder amqpConnectionStringBuilder) throws I
}
}

public TopicClient(String namespace, String topicPath, ClientSettings clientSettings) throws InterruptedException, ServiceBusException
{
public TopicClient(String namespace, String topicPath, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
this(Util.convertNamespaceToEndPointURI(namespace), topicPath, clientSettings);
}

public TopicClient(URI namespaceEndpointURI, String topicPath, ClientSettings clientSettings) throws InterruptedException, ServiceBusException
{
public TopicClient(URI namespaceEndpointURI, String topicPath, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
this();
this.sender = ClientFactory.createMessageSenderFromEntityPath(namespaceEndpointURI, topicPath, MessagingEntityType.TOPIC, clientSettings);
this.browser = new MessageBrowser((MessageSender) sender);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public TransactionContext(ByteBuffer txnId, MessagingFactory messagingFactory) {
* Represents the service-side transactionID
* @return transaction ID
*/
public ByteBuffer getTransactionId() { return this.txnId; }
public ByteBuffer getTransactionId() {
return this.txnId;
}

@Override
public String toString() {
Expand Down Expand Up @@ -105,12 +107,11 @@ public void notifyTransactionCompletion(boolean commit) {
}
}

void registerHandler(ITransactionHandler handler)
{
void registerHandler(ITransactionHandler handler) {
this.txnHandler = handler;
}

interface ITransactionHandler {
public void onTransactionCompleted(boolean commit);
void onTransactionCompleted(boolean commit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,66 +20,50 @@ public static <T> T completeFuture(CompletableFuture<T> future) throws Interrupt
throw ie;
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if(cause instanceof RuntimeException)
{
throw (RuntimeException)cause;
}
else if (cause instanceof Error)
{
throw (Error)cause;
}
else if (cause instanceof ServiceBusException)
{
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else if (cause instanceof Error) {
throw (Error) cause;
} else if (cause instanceof ServiceBusException) {
throw (ServiceBusException) cause;
}
else
{
} else {
throw new ServiceBusException(true, cause);
}
}
}

static void assertNonNull(String argumentName, Object argument) {
if (argument == null)
if (argument == null) {
throw new IllegalArgumentException("Argument '" + argumentName + "' is null.");
}
}

static MessageBody fromSequence(List<Object> sequence)
{
static MessageBody fromSequence(List<Object> sequence) {
List<List<Object>> sequenceData = new ArrayList<>();
sequenceData.add(sequence);
return MessageBody.fromSequenceData(sequenceData);
}

static MessageBody fromBinay(byte[] binary)
{
static MessageBody fromBinay(byte[] binary) {
List<byte[]> binaryData = new ArrayList<>();
binaryData.add(binary);
return MessageBody.fromBinaryData(binaryData);
}

static byte[] getDataFromMessageBody(MessageBody messageBody)
{
static byte[] getDataFromMessageBody(MessageBody messageBody) {
List<byte[]> binaryData = messageBody.getBinaryData();
if(binaryData == null || binaryData.size() == 0)
{
if (binaryData == null || binaryData.size() == 0) {
return null;
}
else
{
} else {
return binaryData.get(0);
}
}

static List<Object> getSequenceFromMessageBody(MessageBody messageBody)
{
static List<Object> getSequenceFromMessageBody(MessageBody messageBody) {
List<List<Object>> sequenceData = messageBody.getSequenceData();
if(sequenceData == null || sequenceData.size() == 0)
{
if (sequenceData == null || sequenceData.size() == 0) {
return null;
}
else
{
} else {
return sequenceData.get(0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@

package com.microsoft.azure.servicebus.amqp;

import org.apache.qpid.proton.amqp.*;
import org.apache.qpid.proton.amqp.Symbol;

public final class AmqpConstants
{
public final class AmqpConstants {
private AmqpConstants() { }

public static final String APACHE = "apache.org";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

import org.apache.qpid.proton.amqp.Symbol;

public final class AmqpErrorCode
{
public final class AmqpErrorCode {

public static final Symbol NotFound = Symbol.getSymbol("amqp:not-found");
public static final Symbol UnauthorizedAccess = Symbol.getSymbol("amqp:unauthorized-access");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@

package com.microsoft.azure.servicebus.amqp;

import org.apache.qpid.proton.amqp.transport.*;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import javax.net.ssl.SSLContext;

import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import com.microsoft.azure.servicebus.primitives.TransportType;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
Expand All @@ -32,38 +31,29 @@

// ServiceBus <-> ProtonReactor interaction handles all
// amqp_connection/transport related events from reactor
public class ConnectionHandler extends BaseHandler
{
public class ConnectionHandler extends BaseHandler {
private static final SslDomain.VerifyMode VERIFY_MODE;
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);
protected final IAmqpConnection messagingFactory;

static
{
static {
String verifyModePropValue = System.getProperty(ClientConstants.SSL_VERIFY_MODE_PROPERTY_NAME);
if(ClientConstants.SSL_VERIFY_MODE_ANONYMOUS.equalsIgnoreCase(verifyModePropValue))
{
if (ClientConstants.SSL_VERIFY_MODE_ANONYMOUS.equalsIgnoreCase(verifyModePropValue)) {
VERIFY_MODE = SslDomain.VerifyMode.ANONYMOUS_PEER;
}
else if(ClientConstants.SSL_VERIFY_MODE_CERTONLY.equalsIgnoreCase(verifyModePropValue))
{
} else if (ClientConstants.SSL_VERIFY_MODE_CERTONLY.equalsIgnoreCase(verifyModePropValue)) {
VERIFY_MODE = SslDomain.VerifyMode.VERIFY_PEER;
}
else
{
} else {
VERIFY_MODE = SslDomain.VerifyMode.VERIFY_PEER_NAME;
}
}

protected ConnectionHandler(final IAmqpConnection messagingFactory)
{
protected ConnectionHandler(final IAmqpConnection messagingFactory) {
add(new Handshaker());
this.messagingFactory = messagingFactory;
}

public static ConnectionHandler create(TransportType transportType, IAmqpConnection messagingFactory)
{
switch(transportType) {
public static ConnectionHandler create(TransportType transportType, IAmqpConnection messagingFactory) {
switch (transportType) {
case AMQP_WEB_SOCKETS:
if (ProxyConnectionHandler.shouldUseProxy(messagingFactory.getHostName())) {
return new ProxyConnectionHandler(messagingFactory);
Expand All @@ -77,8 +67,7 @@ public static ConnectionHandler create(TransportType transportType, IAmqpConnect
}

@Override
public void onConnectionInit(Event event)
{
public void onConnectionInit(Event event) {
final Connection connection = event.getConnection();
final String hostName = new StringBuilder(messagingFactory.getHostName())
.append(":")
Expand All @@ -97,35 +86,30 @@ public void onConnectionInit(Event event)
connection.open();
}

protected IAmqpConnection getMessagingFactory()
{
protected IAmqpConnection getMessagingFactory() {
return this.messagingFactory;
}

public void addTransportLayers(final Event event, final TransportInternal transport)
{
public void addTransportLayers(final Event event, final TransportInternal transport) {
SslDomain domain = Proton.sslDomain();
domain.init(SslDomain.Mode.CLIENT);

if(VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER_NAME)
{
if (VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER_NAME) {
try {
// Default SSL context will have the root certificate from azure in truststore anyway
SSLContext defaultContext = SSLContext.getDefault();
StrictTLSContextSpi strictTlsContextSpi = new StrictTLSContextSpi(defaultContext);
SSLContext strictTlsContext = new StrictTLSContext(strictTlsContextSpi, defaultContext.getProvider(), defaultContext.getProtocol());
domain.setSslContext(strictTlsContext);
domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME);
SslPeerDetails peerDetails = Proton.sslPeerDetails(this.getOutboundSocketHostName(), this.getOutboundSocketPort());
SslPeerDetails peerDetails = Proton.sslPeerDetails(this.getOutboundSocketHostName(), this.getOutboundSocketPort());
transport.ssl(domain, peerDetails);
} catch (NoSuchAlgorithmException e) {
// Should never happen
TRACE_LOGGER.error("Default SSL algorithm not found in JRE. Please check your JRE setup.", e);
// this.messagingFactory.onConnectionError(new ErrorCondition(AmqpErrorCode.InternalError, e.getMessage()));
}
}
else if (VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER)
{
} else if (VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER) {
// Default SSL context will have the root certificate from azure in truststore anyway
try {
SSLContext defaultContext = SSLContext.getDefault();
Expand All @@ -138,33 +122,34 @@ else if (VERIFY_MODE == SslDomain.VerifyMode.VERIFY_PEER)
// this.messagingFactory.onConnectionError(new ErrorCondition(AmqpErrorCode.InternalError, e.getMessage()));
}

}
else
{
} else {
domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER);
transport.ssl(domain);
}
}

protected void notifyTransportErrors(final Event event) { /* no-op */ }
protected void notifyTransportErrors(final Event event) {
/* no-op */
}

public String getOutboundSocketHostName() { return messagingFactory.getHostName(); }
public String getOutboundSocketHostName() {
return messagingFactory.getHostName();
}

public int getOutboundSocketPort() { return this.getProtocolPort(); }
public int getOutboundSocketPort() {
return this.getProtocolPort();
}

public int getProtocolPort()
{
public int getProtocolPort() {
return ClientConstants.AMQPS_PORT;
}

public int getMaxFrameSize()
{
public int getMaxFrameSize() {
return AmqpConstants.MAX_FRAME_SIZE;
}

@Override
public void onConnectionBound(Event event)
{
public void onConnectionBound(Event event) {
TRACE_LOGGER.debug("onConnectionBound: hostname:{}", event.getConnection().getHostname());
Transport transport = event.getTransport();

Expand All @@ -174,46 +159,38 @@ public void onConnectionBound(Event event)
}

@Override
public void onTransportError(Event event)
{
public void onTransportError(Event event) {
ErrorCondition condition = event.getTransport().getCondition();
if (condition != null)
{
if (condition != null) {
TRACE_LOGGER.warn("Connection.onTransportError: hostname:{}, error:{}", event.getConnection().getHostname(), condition.getDescription());
}
else
{
} else {
TRACE_LOGGER.warn("Connection.onTransportError: hostname:{}. error:{}", event.getConnection().getHostname(), "no description returned");
}

this.messagingFactory.onConnectionError(condition);
Connection connection = event.getConnection();
if(connection != null)
{
if (connection != null) {
connection.free();
}

this.notifyTransportErrors(event);
}

@Override
public void onConnectionRemoteOpen(Event event)
{
public void onConnectionRemoteOpen(Event event) {
TRACE_LOGGER.debug("Connection.onConnectionRemoteOpen: hostname:{}, remotecontainer:{}", event.getConnection().getHostname(), event.getConnection().getRemoteContainer());
this.messagingFactory.onConnectionOpen();
}

@Override
public void onConnectionRemoteClose(Event event)
{
public void onConnectionRemoteClose(Event event) {
final Connection connection = event.getConnection();
final ErrorCondition error = connection.getRemoteCondition();

TRACE_LOGGER.debug("onConnectionRemoteClose: hostname:{},errorCondition:{}", connection.getHostname(), error != null ? error.getCondition() + "," + error.getDescription() : null);
boolean shouldFreeConnection = connection.getLocalState() == EndpointState.CLOSED;
this.messagingFactory.onConnectionError(error);
if(shouldFreeConnection)
{
if (shouldFreeConnection) {
connection.free();
}
}
Expand All @@ -227,11 +204,9 @@ public void onConnectionFinal(Event event) {
public void onConnectionLocalClose(Event event) {
Connection connection = event.getConnection();
TRACE_LOGGER.debug("onConnectionLocalClose: hostname:{}", connection.getHostname());
if(connection.getRemoteState() == EndpointState.CLOSED)
{
if (connection.getRemoteState() == EndpointState.CLOSED) {
// Service closed it first. In some such cases transport is not unbound and causing a leak.
if(connection.getTransport() != null)
{
if (connection.getTransport() != null) {
connection.getTransport().unbind();
}

Expand Down
Loading