diff --git a/server/src/main/java/org/elasticsearch/common/compress/CompressedXContent.java b/server/src/main/java/org/elasticsearch/common/compress/CompressedXContent.java index e3c673a150d3f..6606e148dd92d 100644 --- a/server/src/main/java/org/elasticsearch/common/compress/CompressedXContent.java +++ b/server/src/main/java/org/elasticsearch/common/compress/CompressedXContent.java @@ -8,12 +8,14 @@ package org.elasticsearch.common.compress; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Releasable; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; @@ -21,10 +23,13 @@ import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.zip.CRC32; import java.util.zip.CheckedOutputStream; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; /** * Similar class to the {@link String} class except that it internally stores @@ -34,6 +39,9 @@ */ public final class CompressedXContent { + private static final ThreadLocal<InflaterAndBuffer> inflater1 = ThreadLocal.withInitial(InflaterAndBuffer::new); + private static final ThreadLocal<InflaterAndBuffer> inflater2 = ThreadLocal.withInitial(InflaterAndBuffer::new); + private static int crc32(BytesReference data) { CRC32 crc32 = new CRC32(); try { @@ -45,6 +53,25 @@ private static int crc32(BytesReference data) { return (int) crc32.getValue(); } + private static int crc32FromCompressed(byte[] compressed) { + CRC32 crc32 = new CRC32(); + try (InflaterAndBuffer inflaterAndBuffer = inflater1.get()) { 
final Inflater inflater = inflaterAndBuffer.inflater; + final ByteBuffer buffer = inflaterAndBuffer.buffer; + assert assertBufferIsCleared(buffer); + setInflaterInput(compressed, inflater); + do { + if (inflater.inflate(buffer) > 0) { + crc32.update(buffer.flip()); + } + buffer.clear(); + } while (inflater.finished() == false); + return (int) crc32.getValue(); + } catch (DataFormatException e) { + throw new ElasticsearchException(e); + } + } + private final byte[] bytes; private final int crc32; @@ -60,9 +87,8 @@ private CompressedXContent(byte[] compressed, int crc32) { */ public CompressedXContent(ToXContent xcontent, XContentType type, ToXContent.Params params) throws IOException { BytesStreamOutput bStream = new BytesStreamOutput(); - OutputStream compressedStream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream); CRC32 crc32 = new CRC32(); - OutputStream checkedStream = new CheckedOutputStream(compressedStream, crc32); + OutputStream checkedStream = new CheckedOutputStream(CompressorFactory.COMPRESSOR.threadLocalOutputStream(bStream), crc32); try (XContentBuilder builder = XContentFactory.contentBuilder(type, checkedStream)) { if (xcontent.isFragment()) { builder.startObject(); @@ -86,7 +112,7 @@ public CompressedXContent(BytesReference data) throws IOException { if (compressor != null) { // already compressed... 
this.bytes = BytesReference.toBytes(data); - this.crc32 = crc32(uncompressed()); + this.crc32 = crc32FromCompressed(this.bytes); } else { this.bytes = BytesReference.toBytes(CompressorFactory.COMPRESSOR.compress(data)); this.crc32 = crc32(data); @@ -97,6 +123,7 @@ public CompressedXContent(BytesReference data) throws IOException { private void assertConsistent() { assert CompressorFactory.compressor(new BytesArray(bytes)) != null; assert this.crc32 == crc32(uncompressed()); + assert this.crc32 == crc32FromCompressed(bytes); } public CompressedXContent(byte[] data) throws IOException { @@ -147,15 +174,47 @@ public boolean equals(Object o) { CompressedXContent that = (CompressedXContent) o; - if (Arrays.equals(compressed(), that.compressed())) { - return true; - } - if (crc32 != that.crc32) { return false; } - return uncompressed().equals(that.uncompressed()); + if (Arrays.equals(bytes, that.bytes)) { + return true; + } + // compression is not entirely deterministic in all cases depending on how the compressed bytes were assembled, check uncompressed + // equality + return equalsWhenUncompressed(bytes, that.bytes); + } + + // package private for testing + static boolean equalsWhenUncompressed(byte[] compressed1, byte[] compressed2) { + try (InflaterAndBuffer inflaterAndBuffer1 = inflater1.get(); + InflaterAndBuffer inflaterAndBuffer2 = inflater2.get()) { + final Inflater inf1 = inflaterAndBuffer1.inflater; + final Inflater inf2 = inflaterAndBuffer2.inflater; + setInflaterInput(compressed1, inf1); + setInflaterInput(compressed2, inf2); + final ByteBuffer buf1 = inflaterAndBuffer1.buffer; + assert assertBufferIsCleared(buf1); + final ByteBuffer buf2 = inflaterAndBuffer2.buffer; + assert assertBufferIsCleared(buf2); + while (true) { + while (inf1.inflate(buf1) > 0 && buf1.hasRemaining()) ; + while (inf2.inflate(buf2) > 0 && buf2.hasRemaining()) ; + if (buf1.flip().equals(buf2.flip()) == false) { + return false; + } + if (inf1.finished()) { + // if the first inflater is 
done but the second one still has data we fail here, if it's the other way around we fail + // on the next round because we will only read bytes into 2 + return inf2.finished(); + } + buf1.clear(); + buf2.clear(); + } + } catch (DataFormatException e) { + throw new ElasticsearchException(e); + } } @Override @@ -167,4 +226,32 @@ public int hashCode() { public String toString() { return string(); } + + /** + * Set the given bytes as inflater input, accounting for the fact that they start with our header of size + * {@link DeflateCompressor#HEADER_SIZE}. + */ + private static void setInflaterInput(byte[] compressed, Inflater inflater) { + inflater.setInput(compressed, DeflateCompressor.HEADER_SIZE, compressed.length - DeflateCompressor.HEADER_SIZE); + } + + private static boolean assertBufferIsCleared(ByteBuffer buffer) { + assert buffer.limit() == buffer.capacity() + : "buffer limit != capacity, was [" + buffer.limit() + "] and [" + buffer.capacity() + "]"; + assert buffer.position() == 0 : "buffer position != 0, was [" + buffer.position() + "]"; + return true; + } + + private static final class InflaterAndBuffer implements Releasable { + + final ByteBuffer buffer = ByteBuffer.allocate(DeflateCompressor.BUFFER_SIZE); + + final Inflater inflater = new Inflater(true); + + @Override + public void close() { + inflater.reset(); + buffer.clear(); + } + } } diff --git a/server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java b/server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java index da94385118155..4553b953bddda 100644 --- a/server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java +++ b/server/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java @@ -35,11 +35,14 @@ public class DeflateCompressor implements Compressor { // enough so that no stream starting with these bytes could be detected as // a XContent private static final byte[] HEADER = new byte[]{'D', 'F', 'L', '\0'}; + + public 
static final int HEADER_SIZE = HEADER.length; + // 3 is a good trade-off between speed and compression ratio private static final int LEVEL = 3; // We use buffering on the input and output of in/def-laters in order to // limit the number of JNI calls - private static final int BUFFER_SIZE = 4096; + public static final int BUFFER_SIZE = 4096; @Override public boolean isCompressed(BytesReference bytes) { diff --git a/server/src/test/java/org/elasticsearch/common/compress/DeflateCompressedXContentTests.java b/server/src/test/java/org/elasticsearch/common/compress/DeflateCompressedXContentTests.java index b37dcd7e71970..958dc563f68dc 100644 --- a/server/src/test/java/org/elasticsearch/common/compress/DeflateCompressedXContentTests.java +++ b/server/src/test/java/org/elasticsearch/common/compress/DeflateCompressedXContentTests.java @@ -11,15 +11,18 @@ import org.apache.lucene.util.TestUtil; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; -import org.elasticsearch.test.ESTestCase; import org.junit.Assert; import java.io.IOException; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Random; import static org.hamcrest.Matchers.equalTo; @@ -106,4 +109,38 @@ public void testToXContentFragment() throws IOException { CompressedXContent compressedXContent = new CompressedXContent(toXContentFragment, XContentType.JSON, ToXContent.EMPTY_PARAMS); assertEquals("{\"field\":\"value\"}", compressedXContent.string()); } + + public void testEquals() throws IOException { + final String[] randomJSON = + generateRandomStringArray(1000, randomIntBetween(1, 512), false, 
true); + assertNotNull(randomJSON); + final BytesReference jsonDirect = BytesReference.bytes( + XContentFactory.jsonBuilder().startObject().stringListField("arr", Arrays.asList(randomJSON)).endObject()); + final CompressedXContent one = new CompressedXContent(jsonDirect); + final CompressedXContent sameAsOne = new CompressedXContent((builder, params) -> + builder.stringListField("arr", Arrays.asList(randomJSON)), XContentType.JSON, ToXContent.EMPTY_PARAMS); + assertFalse(Arrays.equals(one.compressed(), sameAsOne.compressed())); + assertEquals(one, sameAsOne); + } + + public void testEqualsWhenUncompressed() throws IOException { + final String[] randomJSON1 = + generateRandomStringArray(randomIntBetween(1, 1000), randomIntBetween(1, 512), false, false); + final String[] randomJSON2 = randomValueOtherThanMany( + arr -> Arrays.equals(arr, randomJSON1), + () -> generateRandomStringArray(randomIntBetween(1, 1000), randomIntBetween(1, 512), false, true) + ); + final CompressedXContent one = new CompressedXContent((builder, params) -> + builder.stringListField("arr", Arrays.asList(randomJSON1)), XContentType.JSON, ToXContent.EMPTY_PARAMS); + final CompressedXContent two = new CompressedXContent((builder, params) -> + builder.stringListField("arr", Arrays.asList(randomJSON2)), XContentType.JSON, ToXContent.EMPTY_PARAMS); + assertFalse(CompressedXContent.equalsWhenUncompressed(one.compressed(), two.compressed())); + } + + public void testEqualsCrcCollision() throws IOException { + final CompressedXContent content1 = new CompressedXContent("{\"d\":\"68&A<\"}".getBytes(StandardCharsets.UTF_8)); + final CompressedXContent content2 = new CompressedXContent("{\"d\":\"gZG- \"}".getBytes(StandardCharsets.UTF_8)); + assertEquals(content1.hashCode(), content2.hashCode()); // the inputs are a known CRC32 collision + assertNotEquals(content1, content2); + } }