Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare a StreamingOutput response when serving file downloads. #1829

Merged
merged 7 commits into from
Aug 16, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import static com.hubspot.singularity.WebExceptions.checkNotFound;
import static com.hubspot.singularity.WebExceptions.notFound;

import java.io.InputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
Expand All @@ -25,6 +27,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;

import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.slf4j.Logger;
Expand Down Expand Up @@ -82,7 +85,13 @@
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.data.TaskRequestManager;
import com.hubspot.singularity.helpers.RequestHelper;
import com.ning.http.client.AsyncHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClient.BoundRequestBuilder;
import com.ning.http.client.HttpResponseBodyPart;
import com.ning.http.client.HttpResponseHeaders;
import com.ning.http.client.HttpResponseStatus;
import com.ning.http.client.PerRequestConfig;

import io.dropwizard.auth.Auth;
import io.swagger.v3.oas.annotations.Operation;
Expand Down Expand Up @@ -634,19 +643,22 @@ public Response downloadFileOverProxy(
httpPrefix, slaveHostname, httpPort);

try {
final InputStream responseStream = httpClient.prepareGet(url)
.addQueryParameter("path", fileFullPath)
.execute()
.get()
.getResponseBodyAsStream();
PerRequestConfig unlimitedTimeout = new PerRequestConfig();
unlimitedTimeout.setRequestTimeoutInMs(-1);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default 60 second timeout on our AsyncHttpClient is insufficient for large file downloads.


NingOutputToJaxRsStreamingOutputWrapper streamingOutputNingHandler = new NingOutputToJaxRsStreamingOutputWrapper(
httpClient
.prepareGet(url)
.addQueryParameter("path", fileFullPath)
.setPerRequestConfig(unlimitedTimeout)
);

// Strip file path down to just a file name if we can
java.nio.file.Path filePath = Paths.get(fileFullPath).getFileName();
String fileName = filePath != null ? filePath.toString() : fileFullPath;

final String headerValue = String.format("attachment; filename=\"%s\"", fileName);
return Response.ok(responseStream).header("Content-Disposition", headerValue).build();

return Response.ok(streamingOutputNingHandler).header("Content-Disposition", headerValue).build();
} catch (Exception e) {
if (e.getCause().getClass() == ConnectException.class) {
throw new SlaveNotFoundException(e);
Expand All @@ -656,4 +668,56 @@ public Response downloadFileOverProxy(
}

}

private static class NingOutputToJaxRsStreamingOutputWrapper implements AsyncHandler<Void>, StreamingOutput {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fully open to suggestions on a better name for this, heh.

private OutputStream wrappedOutputStream;
private BoundRequestBuilder requestBuilder;

public NingOutputToJaxRsStreamingOutputWrapper(BoundRequestBuilder requestBuilder) {
this.requestBuilder = requestBuilder;
}

@Override
public void onThrowable(Throwable t) {
LOG.error("Unable to proxy file download", t);
}

@Override
public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
bodyPart.writeTo(wrappedOutputStream);
wrappedOutputStream.flush();
return STATE.CONTINUE;
}

@Override
public STATE onStatusReceived(HttpResponseStatus responseStatus) {
LOG.trace("Proxying download of {} from Mesos: Status={}", requestBuilder.build().getUrl(), responseStatus.getStatusCode());
return STATE.CONTINUE;
}

@Override
public STATE onHeadersReceived(HttpResponseHeaders headers) {
LOG.trace("Proxying download of {} from Mesos: Headers={}", requestBuilder.build().getUrl(), headers.getHeaders());
return STATE.CONTINUE;
}

@Override
public Void onCompleted() {
LOG.info("Proxying download of {} from Mesos: Completed.", requestBuilder.build().getUrl());
return null;
}

// StreamingOutput implementation: just make the OutputStream available to the AsyncHandler<> implementation.
@Override
public void write(OutputStream output) throws WebApplicationException, IOException {
if (wrappedOutputStream == null) {
wrappedOutputStream = output;
try {
requestBuilder.execute(this).get();
Copy link
Contributor Author

@baconmania baconmania Aug 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to do it this way because the OutputStream doesn't become available to us until Jersey actually calls write() on this implementation. The alternative would be to begin the proxied request before waiting for Jersey, in which case we'd be back to buffering a potentially large response in memory.

The get() here on the Future is also necessary because Jersey expects write() to be a blocking call, and closes the provided OutputStream after this method returns.

} catch (ExecutionException | InterruptedException e) {
LOG.error("Failed or interrupted while proxying a download from Mesos", e);
}
}
}
}
}