88
99package org .elasticsearch .common .compress ;
1010
11+ import org .elasticsearch .ElasticsearchException ;
1112import org .elasticsearch .common .bytes .BytesArray ;
1213import org .elasticsearch .common .bytes .BytesReference ;
1314import org .elasticsearch .common .io .Streams ;
1415import org .elasticsearch .common .io .stream .BytesStreamOutput ;
1516import org .elasticsearch .common .io .stream .StreamInput ;
1617import org .elasticsearch .common .io .stream .StreamOutput ;
18+ import org .elasticsearch .core .Releasable ;
1719import org .elasticsearch .xcontent .ToXContent ;
1820import org .elasticsearch .xcontent .XContentBuilder ;
1921import org .elasticsearch .xcontent .XContentFactory ;
2022import org .elasticsearch .xcontent .XContentType ;
2123
2224import java .io .IOException ;
2325import java .io .OutputStream ;
26+ import java .nio .ByteBuffer ;
2427import java .nio .charset .StandardCharsets ;
2528import java .util .Arrays ;
2629import java .util .zip .CRC32 ;
2730import java .util .zip .CheckedOutputStream ;
31+ import java .util .zip .DataFormatException ;
32+ import java .util .zip .Inflater ;
2833
2934/**
3035 * Similar class to the {@link String} class except that it internally stores
3439 */
3540public final class CompressedXContent {
3641
42+ private static final ThreadLocal <InflaterAndBuffer > inflater1 = ThreadLocal .withInitial (InflaterAndBuffer ::new );
43+ private static final ThreadLocal <InflaterAndBuffer > inflater2 = ThreadLocal .withInitial (InflaterAndBuffer ::new );
44+
3745 private static int crc32 (BytesReference data ) {
3846 CRC32 crc32 = new CRC32 ();
3947 try {
@@ -45,6 +53,25 @@ private static int crc32(BytesReference data) {
4553 return (int ) crc32 .getValue ();
4654 }
4755
56+ private static int crc32FromCompressed (byte [] compressed ) {
57+ CRC32 crc32 = new CRC32 ();
58+ try (InflaterAndBuffer inflaterAndBuffer = inflater1 .get ()) {
59+ final Inflater inflater = inflaterAndBuffer .inflater ;
60+ final ByteBuffer buffer = inflaterAndBuffer .buffer ;
61+ assert assertBufferIsCleared (buffer );
62+ setInflaterInput (compressed , inflater );
63+ do {
64+ if (inflate (inflater , buffer ) > 0 ) {
65+ crc32 .update ((ByteBuffer ) buffer .flip ());
66+ }
67+ buffer .clear ();
68+ } while (inflater .finished () == false );
69+ return (int ) crc32 .getValue ();
70+ } catch (DataFormatException e ) {
71+ throw new ElasticsearchException (e );
72+ }
73+ }
74+
4875 private final byte [] bytes ;
4976 private final int crc32 ;
5077
@@ -60,9 +87,8 @@ private CompressedXContent(byte[] compressed, int crc32) {
6087 */
6188 public CompressedXContent (ToXContent xcontent , XContentType type , ToXContent .Params params ) throws IOException {
6289 BytesStreamOutput bStream = new BytesStreamOutput ();
63- OutputStream compressedStream = CompressorFactory .COMPRESSOR .threadLocalOutputStream (bStream );
6490 CRC32 crc32 = new CRC32 ();
65- OutputStream checkedStream = new CheckedOutputStream (compressedStream , crc32 );
91+ OutputStream checkedStream = new CheckedOutputStream (CompressorFactory . COMPRESSOR . threadLocalOutputStream ( bStream ) , crc32 );
6692 try (XContentBuilder builder = XContentFactory .contentBuilder (type , checkedStream )) {
6793 if (xcontent .isFragment ()) {
6894 builder .startObject ();
@@ -86,7 +112,7 @@ public CompressedXContent(BytesReference data) throws IOException {
86112 if (compressor != null ) {
87113 // already compressed...
88114 this .bytes = BytesReference .toBytes (data );
89- this .crc32 = crc32 ( uncompressed () );
115+ this .crc32 = crc32FromCompressed ( this . bytes );
90116 } else {
91117 this .bytes = BytesReference .toBytes (CompressorFactory .COMPRESSOR .compress (data ));
92118 this .crc32 = crc32 (data );
@@ -97,6 +123,7 @@ public CompressedXContent(BytesReference data) throws IOException {
97123 private void assertConsistent () {
98124 assert CompressorFactory .compressor (new BytesArray (bytes )) != null ;
99125 assert this .crc32 == crc32 (uncompressed ());
126+ assert this .crc32 == crc32FromCompressed (bytes );
100127 }
101128
102129 public CompressedXContent (byte [] data ) throws IOException {
@@ -147,15 +174,47 @@ public boolean equals(Object o) {
147174
148175 CompressedXContent that = (CompressedXContent ) o ;
149176
150- if (Arrays .equals (compressed (), that .compressed ())) {
151- return true ;
152- }
153-
154177 if (crc32 != that .crc32 ) {
155178 return false ;
156179 }
157180
158- return uncompressed ().equals (that .uncompressed ());
181+ if (Arrays .equals (bytes , that .bytes )) {
182+ return true ;
183+ }
184+ // compression is not entirely deterministic in all cases depending on hwo the compressed bytes were assembled, check uncompressed
185+ // equality
186+ return equalsWhenUncompressed (bytes , that .bytes );
187+ }
188+
189+ // package private for testing
190+ static boolean equalsWhenUncompressed (byte [] compressed1 , byte [] compressed2 ) {
191+ try (InflaterAndBuffer inflaterAndBuffer1 = inflater1 .get ();
192+ InflaterAndBuffer inflaterAndBuffer2 = inflater2 .get ()) {
193+ final Inflater inf1 = inflaterAndBuffer1 .inflater ;
194+ final Inflater inf2 = inflaterAndBuffer2 .inflater ;
195+ setInflaterInput (compressed1 , inf1 );
196+ setInflaterInput (compressed2 , inf2 );
197+ final ByteBuffer buf1 = inflaterAndBuffer1 .buffer ;
198+ assert assertBufferIsCleared (buf1 );
199+ final ByteBuffer buf2 = inflaterAndBuffer2 .buffer ;
200+ assert assertBufferIsCleared (buf2 );
201+ while (true ) {
202+ while (inflate (inf1 , buf1 ) > 0 && buf1 .hasRemaining ()) ;
203+ while (inflate (inf2 , buf2 ) > 0 && buf2 .hasRemaining ()) ;
204+ if (buf1 .flip ().equals (buf2 .flip ()) == false ) {
205+ return false ;
206+ }
207+ if (inf1 .finished ()) {
208+ // if the first inflater is done but the second one still has data we fail here, if it's the other way around we fail
209+ // on the next round because we will only read bytes into 2
210+ return inf2 .finished ();
211+ }
212+ buf1 .clear ();
213+ buf2 .clear ();
214+ }
215+ } catch (DataFormatException e ) {
216+ throw new ElasticsearchException (e );
217+ }
159218 }
160219
161220 @ Override
@@ -167,4 +226,39 @@ public int hashCode() {
167226 public String toString () {
168227 return string ();
169228 }
229+
230+ /**
231+ * Set the given bytes as inflater input, accounting for the fact that they start with our header of size
232+ * {@link DeflateCompressor#HEADER_SIZE}.
233+ */
234+ private static void setInflaterInput (byte [] compressed , Inflater inflater ) {
235+ inflater .setInput (compressed , DeflateCompressor .HEADER_SIZE , compressed .length - DeflateCompressor .HEADER_SIZE );
236+ }
237+
238+ private static boolean assertBufferIsCleared (ByteBuffer buffer ) {
239+ assert buffer .limit () == buffer .capacity ()
240+ : "buffer limit != capacity, was [" + buffer .limit () + "] and [" + buffer .capacity () + "]" ;
241+ assert buffer .position () == 0 : "buffer position != 0, was [" + buffer .position () + "]" ;
242+ return true ;
243+ }
244+
245+ private static int inflate (Inflater inflater , ByteBuffer buffer ) throws DataFormatException {
246+ final int bufferPos = buffer .position ();
247+ final int inflated = inflater .inflate (buffer .array (), buffer .arrayOffset () + bufferPos , buffer .remaining ());
248+ buffer .position (bufferPos + inflated );
249+ return inflated ;
250+ }
251+
252+ private static final class InflaterAndBuffer implements Releasable {
253+
254+ final ByteBuffer buffer = ByteBuffer .allocate (DeflateCompressor .BUFFER_SIZE );
255+
256+ final Inflater inflater = new Inflater (true );
257+
258+ @ Override
259+ public void close () {
260+ inflater .reset ();
261+ buffer .clear ();
262+ }
263+ }
170264}
0 commit comments