Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,47 +97,47 @@ protected final void channelRead0(final ChannelHandlerContext ctx, final ByteBuf
ctx.writeAndFlush(nextMsg.get());
} else if (handshaker.getStatus() != Handshaker.HandshakeStatus.SUCCESS) {
LOG.debug("waiting for more bytes");
} else {

final Bytes nodeId = handshaker.partyPubKey().getEncodedBytes();
if (!localNode.isReady()) {
// If we're handling a connection before the node is fully up, just disconnect
LOG.debug("Rejecting connection because local node is not ready {}", nodeId);
disconnect(ctx, DisconnectMessage.DisconnectReason.UNKNOWN);
return;
}
return;
}

LOG.trace("Sending framed hello");

// Exchange keys done
final Framer framer = this.framerProvider.buildFramer(handshaker.secrets());

final ByteToMessageDecoder deFramer =
new DeFramer(
framer,
subProtocols,
localNode,
expectedPeer,
connectionEventDispatcher,
connectionFuture,
metricsSystem,
inboundInitiated);

ctx.channel()
.pipeline()
.replace(this, "DeFramer", deFramer)
.addBefore("DeFramer", "validate", new ValidateFirstOutboundMessage(framer));

ctx.writeAndFlush(new OutboundMessage(null, HelloMessage.create(localNode.getPeerInfo())))
.addListener(
ff -> {
if (ff.isSuccess()) {
LOG.trace("Successfully wrote hello message");
}
});
msg.retain();
ctx.fireChannelRead(msg);
final Bytes nodeId = handshaker.partyPubKey().getEncodedBytes();
if (!localNode.isReady()) {
// If we're handling a connection before the node is fully up, just disconnect
LOG.debug("Rejecting connection because local node is not ready {}", nodeId);
disconnect(ctx, DisconnectMessage.DisconnectReason.UNKNOWN);
return;
}

LOG.trace("Sending framed hello");

// Exchange keys done
final Framer framer = this.framerProvider.buildFramer(handshaker.secrets());

final ByteToMessageDecoder deFramer =
new DeFramer(
framer,
subProtocols,
localNode,
expectedPeer,
connectionEventDispatcher,
connectionFuture,
metricsSystem,
inboundInitiated);

ctx.channel()
.pipeline()
.replace(this, "DeFramer", deFramer)
.addBefore("DeFramer", "validate", new ValidateFirstOutboundMessage(framer));

ctx.writeAndFlush(new OutboundMessage(null, HelloMessage.create(localNode.getPeerInfo())))
.addListener(
ff -> {
if (ff.isSuccess()) {
LOG.trace("Successfully wrote hello message");
}
});
msg.retain();
ctx.fireChannelRead(msg);
}

private void disconnect(
Expand Down