Skip to content

Commit

Permalink
Merge pull request #1829 from HubSpot/proper-download-streaming
Browse files Browse the repository at this point in the history
Prepare a StreamingOutput response when serving file downloads.
  • Loading branch information
ssalinas authored Aug 16, 2018
2 parents 9cb03f5 + 2d24a6d commit fa2a287
Showing 1 changed file with 72 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import static com.hubspot.singularity.WebExceptions.checkNotFound;
import static com.hubspot.singularity.WebExceptions.notFound;

import java.io.InputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
Expand All @@ -25,6 +27,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;

import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.slf4j.Logger;
Expand Down Expand Up @@ -82,7 +85,13 @@
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.data.TaskRequestManager;
import com.hubspot.singularity.helpers.RequestHelper;
import com.ning.http.client.AsyncHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClient.BoundRequestBuilder;
import com.ning.http.client.HttpResponseBodyPart;
import com.ning.http.client.HttpResponseHeaders;
import com.ning.http.client.HttpResponseStatus;
import com.ning.http.client.PerRequestConfig;

import io.dropwizard.auth.Auth;
import io.swagger.v3.oas.annotations.Operation;
Expand Down Expand Up @@ -634,19 +643,22 @@ public Response downloadFileOverProxy(
httpPrefix, slaveHostname, httpPort);

try {
final InputStream responseStream = httpClient.prepareGet(url)
.addQueryParameter("path", fileFullPath)
.execute()
.get()
.getResponseBodyAsStream();
PerRequestConfig unlimitedTimeout = new PerRequestConfig();
unlimitedTimeout.setRequestTimeoutInMs(-1);

NingOutputToJaxRsStreamingOutputWrapper streamingOutputNingHandler = new NingOutputToJaxRsStreamingOutputWrapper(
httpClient
.prepareGet(url)
.addQueryParameter("path", fileFullPath)
.setPerRequestConfig(unlimitedTimeout)
);

// Strip file path down to just a file name if we can
java.nio.file.Path filePath = Paths.get(fileFullPath).getFileName();
String fileName = filePath != null ? filePath.toString() : fileFullPath;

final String headerValue = String.format("attachment; filename=\"%s\"", fileName);
return Response.ok(responseStream).header("Content-Disposition", headerValue).build();

return Response.ok(streamingOutputNingHandler).header("Content-Disposition", headerValue).build();
} catch (Exception e) {
if (e.getCause().getClass() == ConnectException.class) {
throw new SlaveNotFoundException(e);
Expand All @@ -656,4 +668,56 @@ public Response downloadFileOverProxy(
}

}

private static class NingOutputToJaxRsStreamingOutputWrapper implements AsyncHandler<Void>, StreamingOutput {
private OutputStream wrappedOutputStream;
private BoundRequestBuilder requestBuilder;

public NingOutputToJaxRsStreamingOutputWrapper(BoundRequestBuilder requestBuilder) {
this.requestBuilder = requestBuilder;
}

@Override
public void onThrowable(Throwable t) {
LOG.error("Unable to proxy file download", t);
}

@Override
public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
bodyPart.writeTo(wrappedOutputStream);
wrappedOutputStream.flush();
return STATE.CONTINUE;
}

@Override
public STATE onStatusReceived(HttpResponseStatus responseStatus) {
LOG.trace("Proxying download of {} from Mesos: Status={}", requestBuilder.build().getUrl(), responseStatus.getStatusCode());
return STATE.CONTINUE;
}

@Override
public STATE onHeadersReceived(HttpResponseHeaders headers) {
LOG.trace("Proxying download of {} from Mesos: Headers={}", requestBuilder.build().getUrl(), headers.getHeaders());
return STATE.CONTINUE;
}

@Override
public Void onCompleted() {
LOG.info("Proxying download of {} from Mesos: Completed.", requestBuilder.build().getUrl());
return null;
}

// StreamingOutput implementation: just make the OutputStream available to the AsyncHandler<> implementation.
@Override
public void write(OutputStream output) throws WebApplicationException, IOException {
if (wrappedOutputStream == null) {
wrappedOutputStream = output;
try {
requestBuilder.execute(this).get();
} catch (ExecutionException | InterruptedException e) {
LOG.error("Failed or interrupted while proxying a download from Mesos", e);
}
}
}
}
}

0 comments on commit fa2a287

Please sign in to comment.