Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions tape/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ checkstyle {
toolVersion = '7.7'
}

test {
maxHeapSize = '4G' // maximum heap size
}

apply from: rootProject.file('gradle/gradle-mvn-push.gradle')
12 changes: 10 additions & 2 deletions tape/src/main/java/com/squareup/tape2/QueueFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -454,13 +454,21 @@ private void expandIfNecessary(long dataLength) throws IOException {
// Calculate the position of the tail end of the data in the ring buffer
long endOfLastElement = wrapPosition(last.position + Element.HEADER_LENGTH + last.length);
long count = 0;
long pos = headerLength;
// If the buffer is split, we need to make it contiguous
if (endOfLastElement <= first.position) {
FileChannel channel = raf.getChannel();
channel.position(fileLength); // destination position
count = endOfLastElement - headerLength;
if (channel.transferTo(headerLength, count, channel) != count) {
throw new AssertionError("Copied insufficient number of bytes!");
// Transfer data in batches because of the write limitation of 2GB in FileChannelImpl.
long remainingToTransfer = count;
while (remainingToTransfer > 0) {
long sizeToTransfer = min(remainingToTransfer, Integer.MAX_VALUE);
if (channel.transferTo(pos, sizeToTransfer, channel) != sizeToTransfer) {
throw new AssertionError("Copied insufficient number of bytes!");
}
remainingToTransfer -= sizeToTransfer;
pos += sizeToTransfer;
}
}

Expand Down
45 changes: 45 additions & 0 deletions tape/src/test/java/com/squareup/tape2/QueueFileTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,51 @@ private QueueFile newQueueFile(boolean zero) throws IOException {
queue.close();
}

/**
* Exercise a bug where an expanding queue file needs to transfer large data (more than 2GB).
*/
@Test public void testFileExpansionMoveLargeElements() throws IOException {
QueueFile queue = newQueueFile();

// Create test data - (1GB - 4 byte - 4 byte) block
// We can mock the large elements transfer as follows:
// 1. add four blocks to saturate the queue file;
// 2. remove first three blocks;
// 3. add three blocks to make the last position smaller than first position;
// 4. add one new block which trigger the expansion of queue file
// and large data transfer (first three blocks approximately 3GB).
int blockSize = 1024 * 1024 * 1024 - headerLength - Element.HEADER_LENGTH;
byte[] values = new byte[blockSize];
for (int i = 0; i < blockSize; i++) {
values[i] = (byte) (i + 1);
}

// Saturate the queue file
queue.add(values);
queue.add(values);
queue.add(values);
queue.add(values);

// Remove first three blocks and add three blocks
queue.remove();
queue.remove();
queue.remove();
queue.add(values);
queue.add(values);
queue.add(values);

// Cause the queue file to expand and a large data transfer
queue.add(values);

// Make sure values are not corrupted
for (int i = 0; i < 5; i++) {
assertThat(queue.peek()).isEqualTo(values);
queue.remove();
}

queue.close();
}

/**
* Exercise a bug where opening a queue whose first or last element's header
* was non contiguous throws an {@link java.io.EOFException}.
Expand Down