Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions src/main/java/org/xerial/snappy/SnappyOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@ public class SnappyOutputStream extends OutputStream {
private final BufferAllocator inputBufferAllocator;
private final BufferAllocator outputBufferAllocator;

protected final byte[] inputBuffer;
protected final byte[] outputBuffer;
// The input and output buffer fields are set to null when closing this stream:
protected byte[] inputBuffer;
protected byte[] outputBuffer;
private int inputCursor = 0;
private int outputCursor = 0;
private boolean closed;

public SnappyOutputStream(OutputStream out) {
this(out, DEFAULT_BLOCK_SIZE);
Expand Down Expand Up @@ -231,6 +233,9 @@ private boolean hasSufficientOutputBufferFor(int inputSize) {
* @throws IOException
*/
public void rawWrite(Object array, int byteOffset, int byteLength) throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
int cursor = 0;
while(cursor < byteLength) {
int readLen = Math.min(byteLength - cursor, blockSize - inputCursor);
Expand Down Expand Up @@ -258,6 +263,9 @@ public void rawWrite(Object array, int byteOffset, int byteLength) throws IOExce
*/
@Override
public void write(int b) throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
if(inputCursor >= inputBuffer.length) {
compressInput();
}
Expand All @@ -269,6 +277,9 @@ public void write(int b) throws IOException {
*/
@Override
public void flush() throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
compressInput();
dumpOutput();
out.flush();
Expand Down Expand Up @@ -320,12 +331,18 @@ protected void compressInput() throws IOException {
*/
@Override
public void close() throws IOException {
if (closed) {
return;
}
try {
flush();
out.close();
} finally {
closed = true;
inputBufferAllocator.release(inputBuffer);
outputBufferAllocator.release(outputBuffer);
inputBuffer = null;
outputBuffer = null;
}
}

Expand Down
82 changes: 82 additions & 0 deletions src/test/java/org/xerial/snappy/SnappyOutputStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,13 @@
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.ref.WeakReference;

import org.junit.Test;
import org.xerial.snappy.buffer.BufferAllocatorFactory;
import org.xerial.snappy.buffer.CachedBufferAllocator;
import org.xerial.snappy.buffer.DefaultBufferAllocator;
import org.xerial.util.FileResource;
import org.xerial.util.log.Logger;

Expand Down Expand Up @@ -157,6 +162,83 @@ public void batchingOfWritesShouldNotAffectCompressedDataSize() throws Exception
}
}

@Test
public void closeShouldBeIdempotent() throws Exception {
// Regression test for issue #107, a bug where close() was non-idempotent and would release
// its buffers to the allocator multiple times, which could cause scenarios where two open
// SnappyOutputStreams could share the same buffers, leading to stream corruption issues.
final BufferAllocatorFactory bufferAllocatorFactory = CachedBufferAllocator.factory;
final int BLOCK_SIZE = 4096;
// Create a stream, use it, then close it once:
ByteArrayOutputStream ba1 = new ByteArrayOutputStream();
SnappyOutputStream os1 = new SnappyOutputStream(ba1, BLOCK_SIZE, bufferAllocatorFactory);
os1.write(42);
os1.close();
// Create a new output stream, which should end up re-using the first stream's freed buffers
ByteArrayOutputStream ba2 = new ByteArrayOutputStream();
SnappyOutputStream os2 = new SnappyOutputStream(ba2, BLOCK_SIZE, bufferAllocatorFactory);
// Close the first stream a second time, which is supposed to be safe due to idempotency:
os1.close();
// Allocate a third output stream, which is supposed to get its own fresh set of buffers:
ByteArrayOutputStream ba3 = new ByteArrayOutputStream();
SnappyOutputStream os3 = new SnappyOutputStream(ba3, BLOCK_SIZE, bufferAllocatorFactory);
// Since the second and third streams should have distinct sets of buffers, writes to these
// streams should not interfere with one another:
os2.write(2);
os3.write(3);
os2.close();
os3.close();
SnappyInputStream in2 = new SnappyInputStream(new ByteArrayInputStream(ba2.toByteArray()));
assertEquals(2, in2.read());
in2.close();
SnappyInputStream in3 = new SnappyInputStream(new ByteArrayInputStream(ba3.toByteArray()));
assertEquals(3, in3.read());
in3.close();
}

@Test
public void writingToClosedStreamShouldThrowIOException() throws IOException {
ByteArrayOutputStream b = new ByteArrayOutputStream();
SnappyOutputStream os = new SnappyOutputStream(b);
os.close();
try {
os.write(4);
fail("Expected write() to throw IOException");
} catch (IOException e) {
// Expected exception
}
try {
os.write(new int[] { 1, 2, 3, 4});
fail("Expected write() to throw IOException");
} catch (IOException e) {
// Expected exception
}
}

@Test
public void flushingClosedStreamShouldThrowIOException() throws IOException {
ByteArrayOutputStream b = new ByteArrayOutputStream();
SnappyOutputStream os = new SnappyOutputStream(b);
os.close();
try {
os.flush();
} catch (IOException e) {
// Expected exception
}
}

@Test
public void closingStreamShouldMakeBuffersEligibleForGarbageCollection() throws IOException {
ByteArrayOutputStream b = new ByteArrayOutputStream();
SnappyOutputStream os = new SnappyOutputStream(b, 4095, DefaultBufferAllocator.factory);
WeakReference<byte[]> inputBuffer = new WeakReference<byte[]>(os.inputBuffer);
WeakReference<byte[]> outputBuffer = new WeakReference<byte[]>(os.inputBuffer);
os.close();
System.gc();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal here was to test that the now weakly-referencable buffers get reclaimed by GC.

assertNull(inputBuffer.get());
assertNull(outputBuffer.get());
}

@Test
public void longArrayCompress() throws Exception {
long[] l = new long[10];
Expand Down