|
43 | 43 | import java.io.ByteArrayInputStream;
|
44 | 44 | import java.io.FileInputStream;
|
45 | 45 | import java.io.FilterInputStream;
|
46 |
| -import java.io.FilterOutputStream; |
47 | 46 | import java.io.IOException;
|
48 | 47 | import java.io.InputStream;
|
49 | 48 | import java.io.OutputStream;
|
@@ -199,23 +198,31 @@ public boolean get(String key, OutputStream out) throws IOException, Interrupted
|
199 | 198 | }
|
200 | 199 |
|
201 | 200 | @SuppressWarnings("FutureReturnValueIgnored")
|
202 |
| - private boolean get(String key, OutputStream out, boolean casDownload) |
| 201 | + private boolean get(String key, final OutputStream out, boolean casDownload) |
203 | 202 | throws IOException, InterruptedException {
|
204 | 203 | final AtomicBoolean dataWritten = new AtomicBoolean();
|
| 204 | + |
205 | 205 | OutputStream wrappedOut =
|
206 |
| - new FilterOutputStream(out) { |
| 206 | + new OutputStream() { |
| 207 | + // OutputStream.close() does nothing, which is what we want to ensure that the |
| 208 | + // OutputStream can't be closed somewhere in the Netty pipeline, so that we can support |
| 209 | + // retries. The OutputStream is closed in the finally block below. |
| 210 | + |
| 211 | + @Override |
| 212 | + public void write(byte[] b, int offset, int length) throws IOException { |
| 213 | + dataWritten.set(true); |
| 214 | + out.write(b, offset, length); |
| 215 | + } |
207 | 216 |
|
208 | 217 | @Override
|
209 | 218 | public void write(int b) throws IOException {
|
210 | 219 | dataWritten.set(true);
|
211 |
| - super.write(b); |
| 220 | + out.write(b); |
212 | 221 | }
|
213 | 222 |
|
214 | 223 | @Override
|
215 |
| - public void close() { |
216 |
| - // Ensure that the OutputStream can't be closed somewhere in the Netty |
217 |
| - // pipeline, so that we can support retries. The OutputStream is closed in |
218 |
| - // the finally block below. |
| 224 | + public void flush() throws IOException { |
| 225 | + out.flush(); |
219 | 226 | }
|
220 | 227 | };
|
221 | 228 | DownloadCommand download = new DownloadCommand(uri, casDownload, key, wrappedOut);
|
|
0 commit comments