Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,11 @@
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.reactor.impl.IOHandler;

public class CustomIOHandler extends IOHandler
{
public class CustomIOHandler extends IOHandler {
@Override
public void onConnectionLocalOpen(Event event)
{
public void onConnectionLocalOpen(Event event) {
Connection connection = event.getConnection();
if (connection.getRemoteState() != EndpointState.UNINITIALIZED)
{
if (connection.getRemoteState() != EndpointState.UNINITIALIZED) {
return;
}

Expand All @@ -28,10 +25,8 @@ public void onConnectionLocalOpen(Event event)
}

@Override
public void onTransportClosed(Event event)
{
if(event.getTransport() != null)
{
public void onTransportClosed(Event event) {
if (event.getTransport() != null) {
event.getTransport().unbind();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Event;

public abstract class DispatchHandler extends BaseHandler
{
@Override public void onTimerTask(Event e)
{
public abstract class DispatchHandler extends BaseHandler {
@Override public void onTimerTask(Event e) {
this.onEvent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@

package com.microsoft.azure.servicebus.amqp;

import com.microsoft.azure.servicebus.ClientSettings;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Link;

public interface IAmqpConnection
{
public interface IAmqpConnection {
String getHostName();

void onConnectionOpen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

import org.apache.qpid.proton.amqp.transport.ErrorCondition;

public interface IAmqpLink
{
public interface IAmqpLink {
/**
* @param completionException completionException=null if open is successful
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import org.apache.qpid.proton.engine.Delivery;

public interface IAmqpReceiver extends IAmqpLink
{
public interface IAmqpReceiver extends IAmqpLink {
void onReceiveComplete(Delivery delivery);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

import org.apache.qpid.proton.engine.Delivery;

public interface IAmqpSender extends IAmqpLink
{
public interface IAmqpSender extends IAmqpLink {
void onFlow(final int creditIssued);

void onSendComplete(final Delivery delivery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ public class LoggingHandler extends BaseHandler {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(LoggingHandler.class);

@Override
public void onUnhandled(Event event)
{
if(TRACE_LOGGER.isTraceEnabled() && event.getType() != Type.REACTOR_QUIESCED ) // Too may REACTOR_QUIESCED events will be raised
{
public void onUnhandled(Event event) {
if (TRACE_LOGGER.isTraceEnabled() && event.getType() != Type.REACTOR_QUIESCED) { // Too may REACTOR_QUIESCED events will be raised
TRACE_LOGGER.trace("Event raised by protonj: {}", event.toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.ReactorOptions;

public final class ProtonUtil
{
private ProtonUtil()
{
public final class ProtonUtil {
private ProtonUtil() {
}

public static Reactor reactor(ReactorHandler reactorHandler, final int maxFrameSize) throws IOException
{
public static Reactor reactor(ReactorHandler reactorHandler, final int maxFrameSize) throws IOException {
final ReactorOptions reactorOptions = new ReactorOptions();
reactorOptions.setMaxFrameSize(maxFrameSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import com.microsoft.azure.servicebus.primitives.ClientConstants;
import com.microsoft.azure.servicebus.primitives.StringUtil;
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import org.apache.qpid.proton.amqp.transport.ConnectionError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
Expand All @@ -20,7 +19,13 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.*;

import java.net.Authenticator;
import java.net.InetSocketAddress;
import java.net.PasswordAuthentication;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.URI;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
Expand All @@ -43,7 +48,9 @@ public static boolean shouldUseProxy(final String hostName) {
return isProxyAddressLegal(proxies);
}

public ProxyConnectionHandler(IAmqpConnection messagingFactory) { super(messagingFactory); }
public ProxyConnectionHandler(IAmqpConnection messagingFactory) {
super(messagingFactory);
}

@Override
public void addTransportLayers(final Event event, final TransportInternal transport) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private static final class DelayHandler extends BaseHandler {
final BaseHandler timerCallback;
final Reactor reactor;

public DelayHandler(final Reactor reactor, final int delay, final DispatchHandler timerCallback) {
DelayHandler(final Reactor reactor, final int delay, final DispatchHandler timerCallback) {
this.delay = delay;
this.timerCallback = timerCallback;
this.reactor = reactor;
Expand All @@ -91,8 +91,7 @@ public void run(Selectable selectable) {
try {
ioSignal.source().read(ByteBuffer.allocate(1024));
} catch (ClosedChannelException ignorePipeClosedDuringReactorShutdown) {

} catch(IOException ioException) {
} catch (IOException ioException) {
throw new RuntimeException(ioException);
}

Expand All @@ -108,24 +107,21 @@ private final class CloseHandler implements Callback {
public void run(Selectable selectable) {
try {
selectable.getChannel().close();
} catch (IOException ignore) {
}
} catch (IOException ignore) { }

try {
if (ioSignal.sink().isOpen()) {
ioSignal.sink().close();
}
} catch (IOException ignore) {
}
} catch (IOException ignore) { }

workScheduler.run(null);

try {
if (ioSignal.source().isOpen()) {
ioSignal.source().close();
}
} catch (IOException ignore) {
}
} catch (IOException ignore) { }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,19 @@

import com.microsoft.azure.servicebus.primitives.ClientConstants;

public class ReactorHandler extends BaseHandler
{
public class ReactorHandler extends BaseHandler {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ReactorHandler.class);

@Override
public void onReactorInit(Event e)
{
public void onReactorInit(Event e) {
TRACE_LOGGER.debug("reactor.onReactorInit");

final Reactor reactor = e.getReactor();
reactor.setTimeout(ClientConstants.REACTOR_IO_POLL_TIMEOUT);
}

@Override
public void onReactorFinal(Event e)
{
public void onReactorFinal(Event e) {
TRACE_LOGGER.debug("reactor.onReactorFinal");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,14 @@

// ServiceBus <-> ProtonReactor interaction
// handles all recvLink - reactor events
public final class ReceiveLinkHandler extends BaseLinkHandler
{
public final class ReceiveLinkHandler extends BaseLinkHandler {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ReceiveLinkHandler.class);

private final IAmqpReceiver amqpReceiver;
private final Object firstResponse;
private boolean isFirstResponse;

public ReceiveLinkHandler(final IAmqpReceiver receiver)
{
public ReceiveLinkHandler(final IAmqpReceiver receiver) {
super(receiver);

this.amqpReceiver = receiver;
Expand All @@ -30,47 +28,36 @@ public ReceiveLinkHandler(final IAmqpReceiver receiver)
}

@Override
public void onLinkLocalOpen(Event evt)
{
public void onLinkLocalOpen(Event evt) {
Link link = evt.getLink();
if (link instanceof Receiver)
{
if (link instanceof Receiver) {
Receiver receiver = (Receiver) link;
TRACE_LOGGER.debug("onLinkLocalOpen: linkName:{}, localSource:{}", receiver.getName(), receiver.getSource());
}
}

@Override
public void onLinkRemoteOpen(Event event)
{
public void onLinkRemoteOpen(Event event) {
Link link = event.getLink();
if (link != null && link instanceof Receiver)
{
if (link != null && link instanceof Receiver) {
Receiver receiver = (Receiver) link;
if (link.getRemoteSource() != null)
{
if (link.getRemoteSource() != null) {
TRACE_LOGGER.debug("onLinkRemoteOpen: linkName:{}, remoteSource:{}", receiver.getName(), receiver.getRemoteSource());

synchronized (this.firstResponse)
{
synchronized (this.firstResponse) {
this.isFirstResponse = false;
this.amqpReceiver.onOpenComplete(null);
}
}
else
{
} else {
TRACE_LOGGER.debug("onLinkRemoteOpen: linkName:{}, remoteTarget:{}, remoteTarget:{}, action:{}", receiver.getName(), null, null, "waitingForError");
}
}
}

@Override
public void onDelivery(Event event)
{
synchronized (this.firstResponse)
{
if (this.isFirstResponse)
{
public void onDelivery(Event event) {
synchronized (this.firstResponse) {
if (this.isFirstResponse) {
this.isFirstResponse = false;
this.amqpReceiver.onOpenComplete(null);
}
Expand All @@ -87,8 +74,7 @@ public void onDelivery(Event event)
// If a message spans across deliveries (for ex: 200k message will be 4 frames (deliveries) 64k 64k 64k 8k),
// all until "last-1" deliveries will be partial
// reactor will raise onDelivery event for all of these - we only need the last one
if (!delivery.isPartial())
{
if (!delivery.isPartial()) {
this.amqpReceiver.onReceiveComplete(delivery);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,51 +10,43 @@
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

public class SessionHandler extends BaseHandler
{
public class SessionHandler extends BaseHandler {
protected static final Logger TRACE_LOGGER = LoggerFactory.getLogger(SessionHandler.class);

private final String name;

public SessionHandler(final String name)
{
public SessionHandler(final String name) {
this.name = name;
}

@Override
public void onSessionRemoteOpen(Event e)
{
public void onSessionRemoteOpen(Event e) {
TRACE_LOGGER.debug("onSessionRemoteOpen - entityName: {}, sessionIncCapacity: {}, sessionOutgoingWindow: {}", this.name, e.getSession().getIncomingCapacity(), e.getSession().getOutgoingWindow());

Session session = e.getSession();
if (session != null && session.getLocalState() == EndpointState.UNINITIALIZED)
{
if (session != null && session.getLocalState() == EndpointState.UNINITIALIZED) {
session.open();
}
}


@Override
public void onSessionLocalClose(Event e)
{
public void onSessionLocalClose(Event e) {
TRACE_LOGGER.debug("onSessionLocalClose - entityName: {}, condition: {}", this.name, e.getSession().getCondition() == null ? "none" : e.getSession().getCondition().toString());
}

@Override
public void onSessionRemoteClose(Event e)
{
public void onSessionRemoteClose(Event e) {
TRACE_LOGGER.debug("onSessionRemoteClose - entityName: {}, condition: {}", this.name, e.getSession().getCondition() == null ? "none" : e.getSession().getCondition().toString());

Session session = e.getSession();
if (session != null && session.getLocalState() != EndpointState.CLOSED)
{
if (session != null && session.getLocalState() != EndpointState.CLOSED) {
session.close();
}
}

@Override
public void onSessionFinal(Event e)
{
public void onSessionFinal(Event e) {
TRACE_LOGGER.debug("onSessionFinal - entityName: {}", this.name);
}
}
Loading