Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JENKINS-61409] Websockets: Use AbstractByteBufferCommandTransport to transport messages #373

Merged
merged 4 commits into from
Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,4 +306,13 @@ public final void write(Command cmd, boolean last) throws IOException {
}
}

/**
* Indicates that the endpoint has encountered a problem.
* This tells the transport that it shouldn't expect future invocation of {@link #receive(ByteBuffer)},
* and it'll abort the communication.
*/
public void terminate(IOException e) {
receiver.terminate(e);
}

}
37 changes: 20 additions & 17 deletions src/main/java/hudson/remoting/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -562,49 +562,52 @@ public void afterResponse(HandshakeResponse hr) {
HeaderHandler headerHandler = new HeaderHandler();
class AgentEndpoint extends Endpoint {
@SuppressFBWarnings(value = "UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR", justification = "just trust me here")
AbstractByteArrayCommandTransport.ByteArrayReceiver receiver;
AgentEndpoint.Transport transport;

@Override
public void onOpen(Session session, EndpointConfig config) {
events.status("WebSocket connection open");
session.addMessageHandler(byte[].class, this::onMessage);
session.addMessageHandler(ByteBuffer.class, this::onMessage);
try {
transport = new Transport(session);
ch.set(new ChannelBuilder(agentName, executor).
withJarCacheOrDefault(jarCache). // unless EngineJnlpConnectionStateListener can be used for this purpose
build(new Transport(session)));
build(transport));
} catch (IOException x) {
events.error(x);
}
}
private void onMessage(byte[] message) {
LOGGER.finest(() -> "received message of length " + message.length);
receiver.handle(message);
private void onMessage(ByteBuffer message) {
try {
transport.receive(message);
} catch (IOException|InterruptedException x) {
events.error(x);
}
}
@Override
public void onClose(Session session, CloseReason closeReason) {
LOGGER.fine(() -> "onClose: " + closeReason);
receiver.terminate(new ChannelClosedException(ch.get(), null));
transport.terminate(new ChannelClosedException(ch.get(), null));
}
@Override
public void onError(Session session, Throwable x) {
// TODO or would events.error(x) be better?
LOGGER.log(Level.FINE, null, x);
receiver.terminate(new ChannelClosedException(ch.get(), x));
transport.terminate(new ChannelClosedException(ch.get(), x));
}
class Transport extends AbstractByteArrayCommandTransport {

class Transport extends AbstractByteBufferCommandTransport {
final Session session;
Transport(Session session) {
this.session = session;
}
@Override
public void setup(AbstractByteArrayCommandTransport.ByteArrayReceiver _receiver) {
events.status("Setting up channel");
receiver = _receiver;
}
@Override
public void writeBlock(Channel channel, byte[] payload) throws IOException {
LOGGER.finest(() -> "sending message of length " + payload.length);
session.getBasicRemote().sendBinary(ByteBuffer.wrap(payload));
protected void write(ByteBuffer header, ByteBuffer data) throws IOException {
LOGGER.finest(() -> "sending message of length + " + ChunkHeader.length(ChunkHeader.peek(header)));
session.getBasicRemote().sendBinary(header, false);
session.getBasicRemote().sendBinary(data, true);
}

@Override
public Capability getRemoteCapability() throws IOException {
return headerHandler.remoteCapability;
Expand Down