Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,28 @@

package org.elasticsearch.common.compress;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

/**
* Similar class to the {@link String} class except that it internally stores
Expand All @@ -34,6 +39,9 @@
*/
public final class CompressedXContent {

// Reusable inflater/buffer pairs, one instance of each per thread. Two separate thread-locals exist because
// equalsWhenUncompressed inflates both of its inputs side by side and therefore needs two independent inflaters at once;
// crc32FromCompressed only uses the first one.
private static final ThreadLocal<InflaterAndBuffer> inflater1 = ThreadLocal.withInitial(InflaterAndBuffer::new);
private static final ThreadLocal<InflaterAndBuffer> inflater2 = ThreadLocal.withInitial(InflaterAndBuffer::new);

private static int crc32(BytesReference data) {
CRC32 crc32 = new CRC32();
try {
Expand All @@ -45,6 +53,25 @@ private static int crc32(BytesReference data) {
return (int) crc32.getValue();
}

/**
 * Computes the CRC32 of the uncompressed content by streaming the compressed bytes through a thread-local inflater, one buffer
 * at a time, so the uncompressed form never has to be materialized as a whole.
 *
 * @param compressed the compressed bytes, including the leading header that {@code setInflaterInput} skips
 * @return the CRC32 of the inflated bytes, narrowed to an {@code int}
 * @throws ElasticsearchException if the bytes are not valid deflate data or end before the deflate stream is complete
 */
private static int crc32FromCompressed(byte[] compressed) {
    CRC32 crc32 = new CRC32();
    try (InflaterAndBuffer inflaterAndBuffer = inflater1.get()) {
        final Inflater inflater = inflaterAndBuffer.inflater;
        final ByteBuffer buffer = inflaterAndBuffer.buffer;
        assert assertBufferIsCleared(buffer);
        setInflaterInput(compressed, inflater);
        do {
            if (inflater.inflate(buffer) > 0) {
                crc32.update(buffer.flip());
            } else if (inflater.finished() == false) {
                // Nothing was written into an empty buffer and the stream has not ended: the inflater is out of input (or
                // otherwise stuck) and every further call would return 0 again, so fail instead of spinning forever.
                throw new DataFormatException("inflater made no progress before the end of the stream, data is truncated or corrupt");
            }
            buffer.clear();
        } while (inflater.finished() == false);
        return (int) crc32.getValue();
    } catch (DataFormatException e) {
        throw new ElasticsearchException(e);
    }
}

// The content in compressed form; assertConsistent checks that CompressorFactory recognizes these bytes as compressed.
private final byte[] bytes;
// CRC32 of the uncompressed content (assertConsistent checks it against both the uncompressed and the compressed form).
private final int crc32;

Expand All @@ -60,9 +87,8 @@ private CompressedXContent(byte[] compressed, int crc32) {
*/
public CompressedXContent(ToXContent xcontent, XContentType type, ToXContent.Params params) throws IOException {
BytesStreamOutput bStream = new BytesStreamOutput();
OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream);
CRC32 crc32 = new CRC32();
OutputStream checkedStream = new CheckedOutputStream(compressedStream, crc32);
OutputStream checkedStream = new CheckedOutputStream(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream), crc32);
try (XContentBuilder builder = XContentFactory.contentBuilder(type, checkedStream)) {
if (xcontent.isFragment()) {
builder.startObject();
Expand All @@ -86,7 +112,7 @@ public CompressedXContent(BytesReference data) throws IOException {
if (compressor != null) {
// already compressed...
this.bytes = BytesReference.toBytes(data);
this.crc32 = crc32(uncompressed());
this.crc32 = crc32FromCompressed(this.bytes);
} else {
this.bytes = BytesReference.toBytes(CompressorFactory.COMPRESSOR.compress(data));
this.crc32 = crc32(data);
Expand All @@ -97,6 +123,7 @@ public CompressedXContent(BytesReference data) throws IOException {
/**
 * Sanity checks that only run with assertions enabled: the stored bytes must be recognized as compressed, and the stored
 * checksum must match the one computed from the uncompressed content as well as the one computed by streaming the
 * compressed bytes.
 */
private void assertConsistent() {
    assert null != CompressorFactory.compressor(new BytesArray(bytes));
    assert crc32(uncompressed()) == this.crc32;
    assert crc32FromCompressed(bytes) == this.crc32;
}

public CompressedXContent(byte[] data) throws IOException {
Expand Down Expand Up @@ -147,15 +174,47 @@ public boolean equals(Object o) {

CompressedXContent that = (CompressedXContent) o;

if (Arrays.equals(compressed(), that.compressed())) {
return true;
}

if (crc32 != that.crc32) {
return false;
}

return uncompressed().equals(that.uncompressed());
if (Arrays.equals(bytes, that.bytes)) {
return true;
}
// compression is not entirely deterministic in all cases, depending on how the compressed bytes were assembled, so check
// equality of the uncompressed content
return equalsWhenUncompressed(bytes, that.bytes);
}

/**
 * Compares the uncompressed content of two compressed byte arrays without materializing either of them: both are inflated
 * side by side into two thread-local buffers and compared one buffer at a time. Package private for testing.
 *
 * @param compressed1 first compressed byte array, including the leading header that {@code setInflaterInput} skips
 * @param compressed2 second compressed byte array, in the same format
 * @return {@code true} if both inputs inflate to the same bytes
 * @throws ElasticsearchException if the data is not valid deflate data, or the first input stops yielding bytes before its
 *                                deflate stream is complete while no difference has been found
 */
static boolean equalsWhenUncompressed(byte[] compressed1, byte[] compressed2) {
    try (InflaterAndBuffer inflaterAndBuffer1 = inflater1.get();
         InflaterAndBuffer inflaterAndBuffer2 = inflater2.get()) {
        final Inflater inf1 = inflaterAndBuffer1.inflater;
        final Inflater inf2 = inflaterAndBuffer2.inflater;
        setInflaterInput(compressed1, inf1);
        setInflaterInput(compressed2, inf2);
        final ByteBuffer buf1 = inflaterAndBuffer1.buffer;
        assert assertBufferIsCleared(buf1);
        final ByteBuffer buf2 = inflaterAndBuffer2.buffer;
        assert assertBufferIsCleared(buf2);
        while (true) {
            // fill each buffer as far as possible: stop when it is full or the inflater yields no more bytes
            while (inf1.inflate(buf1) > 0 && buf1.hasRemaining()) {
                // keep inflating into buf1
            }
            while (inf2.inflate(buf2) > 0 && buf2.hasRemaining()) {
                // keep inflating into buf2
            }
            if (buf1.flip().equals(buf2.flip()) == false) {
                return false;
            }
            if (inf1.finished()) {
                // if the first inflater is done but the second one still has data we fail here, if it's the other way around we fail
                // on the next round because we will only read bytes into 2
                return inf2.finished();
            }
            if (buf1.hasRemaining() == false) {
                // The first inflater wrote nothing into an empty buffer and has not reached the end of its stream, so it can
                // never make progress again; fail instead of looping forever on truncated or corrupt input.
                throw new DataFormatException("inflater made no progress before the end of the stream, data is truncated or corrupt");
            }
            buf1.clear();
            buf2.clear();
        }
    } catch (DataFormatException e) {
        throw new ElasticsearchException(e);
    }
}

@Override
Expand All @@ -167,4 +226,32 @@ public int hashCode() {
public String toString() {
    // the string form of this object is whatever string() renders
    final String rendered = string();
    return rendered;
}

/**
 * Points the inflater at the payload of {@code compressed}, i.e. everything that follows our leading header of
 * {@link DeflateCompressor#HEADER_SIZE} bytes.
 */
private static void setInflaterInput(byte[] compressed, Inflater inflater) {
    final int payloadOffset = DeflateCompressor.HEADER_SIZE;
    final int payloadLength = compressed.length - payloadOffset;
    inflater.setInput(compressed, payloadOffset, payloadLength);
}

/**
 * Assertion helper, meant to be used as {@code assert assertBufferIsCleared(buffer)}: trips an assertion unless the buffer is
 * in its cleared state, i.e. its limit equals its capacity and its position is zero.
 *
 * @param buffer the buffer to check
 * @return always {@code true}, so that the call can itself be the condition of an {@code assert}
 */
private static boolean assertBufferIsCleared(ByteBuffer buffer) {
    final int limit = buffer.limit();
    final int capacity = buffer.capacity();
    assert limit == capacity : "buffer limit != capacity, was [" + limit + "] and [" + capacity + "]";
    final int position = buffer.position();
    assert position == 0 : "buffer position != 0, was [" + position + "]";
    return true;
}

/**
 * Pairs an {@link Inflater} with the {@link ByteBuffer} it inflates into so that both can be reused together. Closing an
 * instance does not dispose of anything; it only resets the inflater and clears the buffer for the next use.
 */
private static final class InflaterAndBuffer implements Releasable {

    // created with nowrap == true
    private final Inflater inflater = new Inflater(true);

    // output buffer of DeflateCompressor.BUFFER_SIZE bytes
    private final ByteBuffer buffer = ByteBuffer.allocate(DeflateCompressor.BUFFER_SIZE);

    @Override
    public void close() {
        buffer.clear();
        inflater.reset();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ public class DeflateCompressor implements Compressor {
// enough so that no stream starting with these bytes could be detected as
// a XContent
private static final byte[] HEADER = new byte[]{'D', 'F', 'L', '\0'};

public static final int HEADER_SIZE = HEADER.length;

// 3 is a good trade-off between speed and compression ratio
private static final int LEVEL = 3;
// We use buffering on the input and output of in/def-laters in order to
// limit the number of JNI calls
private static final int BUFFER_SIZE = 4096;
public static final int BUFFER_SIZE = 4096;

@Override
public boolean isCompressed(BytesReference bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.junit.Assert;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Random;

import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -106,4 +108,30 @@ public void testToXContentFragment() throws IOException {
CompressedXContent compressedXContent = new CompressedXContent(toXContentFragment, XContentType.JSON, ToXContent.EMPTY_PARAMS);
assertEquals("{\"field\":\"value\"}", compressedXContent.string());
}

public void testEquals() throws IOException {
final String[] randomJSON =
generateRandomStringArray(1000, randomIntBetween(1, 512), false, true);
assertNotNull(randomJSON);
final BytesReference jsonDirect = BytesReference.bytes(
XContentFactory.jsonBuilder().startObject().stringListField("arr", Arrays.asList(randomJSON)).endObject());
final CompressedXContent one = new CompressedXContent(jsonDirect);
final CompressedXContent sameAsOne = new CompressedXContent((builder, params) ->
builder.stringListField("arr", Arrays.asList(randomJSON)), XContentType.JSON, ToXContent.EMPTY_PARAMS);
assertEquals(one, sameAsOne);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert that the compressed bytes are not equal in this case?

Also we are not exercising equalsWhenUncompressed very hard in any of these tests since usually the bytes will be the same or the CRC is not going to match. Could we either construct some CRC collisions or else just test the method directly, checking in particular that it does sometimes return false?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see Alan already approved this. I think this is a blocker, the rest of my comments are less important.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test for this directly now :) I didn't want to hard code a collision (or brute force one to begin with :D) and finding one at runtime takes too long as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd quite like to have an assertFalse(Arrays.equals(one.compressed(), sameAsOne.compressed())); here too, mostly to document that the compressed representations are different.

}

/**
 * Calls {@code CompressedXContent.equalsWhenUncompressed} directly on the compressed forms of two documents built from
 * different random string arrays and expects it to report them as not equal.
 */
public void testEqualsWhenUncompressed() throws IOException {
    final String[] firstValues = generateRandomStringArray(randomIntBetween(1, 1000), randomIntBetween(1, 512), false, false);
    // keep drawing until the second array differs from the first one
    final String[] secondValues = randomValueOtherThanMany(
        candidate -> Arrays.equals(candidate, firstValues),
        () -> generateRandomStringArray(randomIntBetween(1, 1000), randomIntBetween(1, 512), false, true)
    );
    final ToXContent firstSource = (builder, params) -> builder.stringListField("arr", Arrays.asList(firstValues));
    final ToXContent secondSource = (builder, params) -> builder.stringListField("arr", Arrays.asList(secondValues));
    final CompressedXContent first = new CompressedXContent(firstSource, XContentType.JSON, ToXContent.EMPTY_PARAMS);
    final CompressedXContent second = new CompressedXContent(secondSource, XContentType.JSON, ToXContent.EMPTY_PARAMS);
    final boolean sameContent = CompressedXContent.equalsWhenUncompressed(first.compressed(), second.compressed());
    assertFalse(sameContent);
}
Copy link
Contributor

@DaveCTurner DaveCTurner Oct 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you go (cheating slightly since the inputs aren't XContent, but I won't tell...) that irked me, I found a JSON collision instead.

Suggested change
}
}
/**
 * Two different JSON documents with the same hash code must still compare as not equal.
 */
public void testEqualsCrcCollision() throws IOException {
    final byte[] firstJson = "{\"d\":\"68&A<\"}".getBytes(StandardCharsets.UTF_8);
    final byte[] secondJson = "{\"d\":\"gZG- \"}".getBytes(StandardCharsets.UTF_8);
    final CompressedXContent content1 = new CompressedXContent(firstJson);
    final CompressedXContent content2 = new CompressedXContent(secondJson);
    // the inputs are a known CRC32 collision
    assertEquals(content1.hashCode(), content2.hashCode());
    assertNotEquals(content1, content2);
}

}