diff --git a/client/trino-client/src/main/java/io/trino/client/JsonResultRows.java b/client/trino-client/src/main/java/io/trino/client/JsonResultRows.java index f2f5e965732e..ed6ceaf11112 100644 --- a/client/trino-client/src/main/java/io/trino/client/JsonResultRows.java +++ b/client/trino-client/src/main/java/io/trino/client/JsonResultRows.java @@ -27,6 +27,7 @@ import java.io.InputStream; import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import static com.fasterxml.jackson.core.JsonParser.Feature.AUTO_CLOSE_SOURCE; @@ -136,28 +137,16 @@ private void close() } } - public static ResultRows forJsonParser(JsonParser parser, List columns) + public static Iterator> forJsonParser(JsonParser parser, List columns) + throws IOException { - return () -> { - try { - return new RowWiseIterator(parser, createTypeDecoders(columns)); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - }; + return new RowWiseIterator(parser, createTypeDecoders(columns)); } - public static ResultRows forInputStream(InputStream stream, TypeDecoder[] decoders) + public static Iterator> forInputStream(InputStream stream, TypeDecoder[] decoders) + throws IOException { - return () -> { - try { - return new RowWiseIterator(stream, decoders); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - }; + return new RowWiseIterator(stream, decoders); } @SuppressModernizer // There is no JsonFactory in the client module diff --git a/client/trino-client/src/main/java/io/trino/client/OkHttpSegmentLoader.java b/client/trino-client/src/main/java/io/trino/client/OkHttpSegmentLoader.java index eb699d7fc562..f3f9e61b33c4 100644 --- a/client/trino-client/src/main/java/io/trino/client/OkHttpSegmentLoader.java +++ b/client/trino-client/src/main/java/io/trino/client/OkHttpSegmentLoader.java @@ -22,7 +22,6 @@ import okhttp3.Request; import okhttp3.Response; -import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.util.List; @@ -65,7 +64,7 @@ public InputStream load(SpooledSegment segment) } if (response.isSuccessful()) { - return delegatingInputStream(response, response.body().byteStream(), segment); + return response.body().byteStream(); } throw new IOException(format("Could not open segment for streaming, got error '%s' with code %d", response.message(), response.code())); } @@ -95,21 +94,6 @@ public void onResponse(Call call, Response response) }); } - private InputStream delegatingInputStream(Response response, InputStream delegate, SpooledSegment segment) - { - return new FilterInputStream(delegate) - { - @Override - public void close() - throws IOException - { - try (Response ignored = response; InputStream ignored2 = delegate) { - acknowledge(segment); - } - } - }; - } - private static Headers toHeaders(Map> headers) { Headers.Builder builder = new Headers.Builder(); diff --git a/client/trino-client/src/main/java/io/trino/client/QueryDataDecoder.java b/client/trino-client/src/main/java/io/trino/client/QueryDataDecoder.java index 631cf7eb3a5f..5dca2d7b2a61 100644 --- a/client/trino-client/src/main/java/io/trino/client/QueryDataDecoder.java +++ b/client/trino-client/src/main/java/io/trino/client/QueryDataDecoder.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.Iterator; import java.util.List; public interface QueryDataDecoder @@ -39,7 +40,7 @@ interface Factory * * @throws IOException if an I/O error occurs */ - ResultRows decode(InputStream input, DataAttributes segmentAttributes) + Iterator> decode(InputStream input, DataAttributes segmentAttributes) throws IOException; String encoding(); diff --git a/client/trino-client/src/main/java/io/trino/client/ResultRows.java b/client/trino-client/src/main/java/io/trino/client/ResultRows.java index 9a73644f72b8..dff8bba0d66c 100644 --- a/client/trino-client/src/main/java/io/trino/client/ResultRows.java +++ b/client/trino-client/src/main/java/io/trino/client/ResultRows.java @@ -40,7 +40,7 @@ public Iterator> iterator() } }; - static ResultRows fromIterableRows(Iterable> values) + static ResultRows wrapList(List> values) { return values::iterator; } diff --git a/client/trino-client/src/main/java/io/trino/client/ResultRowsDecoder.java b/client/trino-client/src/main/java/io/trino/client/ResultRowsDecoder.java index 64bf431fda9b..b75786102703 100644 --- a/client/trino-client/src/main/java/io/trino/client/ResultRowsDecoder.java +++ b/client/trino-client/src/main/java/io/trino/client/ResultRowsDecoder.java @@ -13,7 +13,6 @@ */ package io.trino.client; -import com.google.common.collect.Iterables; import io.trino.client.spooling.DataAttributes; import io.trino.client.spooling.EncodedQueryData; import io.trino.client.spooling.InlineSegment; @@ -21,21 +20,18 @@ import io.trino.client.spooling.SegmentLoader; import io.trino.client.spooling.SpooledSegment; import io.trino.client.spooling.encoding.QueryDataDecoders; -import org.gaul.modernizer_maven_annotations.SuppressModernizer; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; import java.io.UncheckedIOException; +import java.util.Iterator; import java.util.List; import java.util.Optional; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; -import static com.google.common.collect.Iterables.filter; -import static com.google.common.collect.Iterables.transform; +import static com.google.common.collect.Iterators.concat; +import static com.google.common.collect.Iterators.transform; import static io.trino.client.ResultRows.NULL_ROWS; -import static io.trino.client.ResultRows.fromIterableRows; import static java.util.Objects.requireNonNull; /** @@ -100,41 +96,33 @@ public ResultRows toRows(List columns, QueryData data) if (jsonData.isNull()) { return NULL_ROWS; } - return () -> JsonResultRows.forJsonParser(jsonData.getJsonParser(), columns).iterator(); + return () -> { + try { + return JsonResultRows.forJsonParser(jsonData.getJsonParser(), columns); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + }; } if (data instanceof EncodedQueryData) { EncodedQueryData encodedData = (EncodedQueryData) data; setEncoding(columns, encodedData.getEncoding()); - return concat(transform(encodedData.getSegments(), this::segmentToRows)); + return () -> concat(transform(encodedData.getSegments().iterator(), this::segmentToRows)); } throw new UnsupportedOperationException("Unsupported data type: " + data.getClass().getName()); } - private ResultRows segmentToRows(Segment segment) + private Iterator> segmentToRows(Segment segment) { if (segment instanceof InlineSegment) { - InlineSegment inlineSegment = (InlineSegment) segment; - try { - return decoder.decode(new ByteArrayInputStream(inlineSegment.getData()), inlineSegment.getMetadata()); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } + return ((InlineSegment) segment).toIterator(decoder); } if (segment instanceof SpooledSegment) { - SpooledSegment spooledSegment = (SpooledSegment) segment; - - try { - // The returned rows are lazy which means that decoder is responsible for closing input stream - InputStream stream = loader.load(spooledSegment); - return decoder.decode(stream, spooledSegment.getMetadata()); - } - catch (IOException e) { - throw new RuntimeException(e); - } + return ((SpooledSegment) segment).toIterator(loader, decoder); } throw new UnsupportedOperationException("Unsupported segment type: " + segment.getClass().getName()); @@ -152,10 +140,4 @@ public void close() { loader.close(); } - - @SuppressModernizer - private static ResultRows concat(Iterable resultRows) - { - return fromIterableRows(Iterables.concat(filter(resultRows, rows -> !rows.isNull()))); - } } diff --git a/client/trino-client/src/main/java/io/trino/client/spooling/InlineSegment.java b/client/trino-client/src/main/java/io/trino/client/spooling/InlineSegment.java index 6565aed21f38..5e13ffc7c164 100644 --- a/client/trino-client/src/main/java/io/trino/client/spooling/InlineSegment.java +++ b/client/trino-client/src/main/java/io/trino/client/spooling/InlineSegment.java @@ -15,7 +15,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import io.trino.client.QueryDataDecoder; +import java.util.Iterator; +import java.util.List; import java.util.Map; import static java.lang.String.format; @@ -48,4 +51,9 @@ public String toString() { return format("InlineSegment{offset=%d, rows=%d, size=%d}", getOffset(), getRowsCount(), getSegmentSize()); } + + public Iterator> toIterator(QueryDataDecoder decoder) + { + return new InlineSegmentIterator(this, decoder); + } } diff --git a/client/trino-client/src/main/java/io/trino/client/spooling/InlineSegmentIterator.java b/client/trino-client/src/main/java/io/trino/client/spooling/InlineSegmentIterator.java new file mode 100644 index 000000000000..857dbf4e5c6b --- /dev/null +++ b/client/trino-client/src/main/java/io/trino/client/spooling/InlineSegmentIterator.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.client.spooling; + +import com.google.common.collect.AbstractIterator; +import io.trino.client.QueryDataDecoder; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Iterator; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +// Accessible through the InlineSegment.toIterator +class InlineSegmentIterator + extends AbstractIterator> +{ + private InlineSegment segment; + private final QueryDataDecoder decoder; + private Iterator> iterator; + + public InlineSegmentIterator(InlineSegment segment, QueryDataDecoder decoder) + { + this.segment = requireNonNull(segment, "segment is null"); + this.decoder = requireNonNull(decoder, "decoder is null"); + } + + @Override + protected List computeNext() + { + if (iterator == null) { + try { + iterator = decoder.decode(new ByteArrayInputStream(segment.getData()), segment.getMetadata()); + segment = null; + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + if (iterator.hasNext()) { + return iterator.next(); + } + return endOfData(); + } +} diff --git a/client/trino-client/src/main/java/io/trino/client/spooling/SpooledSegment.java b/client/trino-client/src/main/java/io/trino/client/spooling/SpooledSegment.java index 8cdd79300404..01b8a0d29899 100644 --- a/client/trino-client/src/main/java/io/trino/client/spooling/SpooledSegment.java +++ b/client/trino-client/src/main/java/io/trino/client/spooling/SpooledSegment.java @@ -17,8 +17,10 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; +import io.trino.client.QueryDataDecoder; import java.net.URI; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -75,4 +77,9 @@ public String toString() { return format("SpooledSegment{offset=%d, rows=%d, size=%d, headers=%s}", getOffset(), getRowsCount(), getSegmentSize(), headers.keySet()); } + + public Iterator> toIterator(SegmentLoader loader, QueryDataDecoder decoder) + { + return new SpooledSegmentIterator(this, loader, decoder); + } } diff --git a/client/trino-client/src/main/java/io/trino/client/spooling/SpooledSegmentIterator.java b/client/trino-client/src/main/java/io/trino/client/spooling/SpooledSegmentIterator.java new file mode 100644 index 000000000000..71ce9f88016d --- /dev/null +++ b/client/trino-client/src/main/java/io/trino/client/spooling/SpooledSegmentIterator.java @@ -0,0 +1,118 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.client.spooling; + +import com.google.common.collect.AbstractIterator; +import com.google.common.io.Closer; +import io.trino.client.QueryDataDecoder; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +// Accessible through the SpooledSegment.toIterator +class SpooledSegmentIterator + extends AbstractIterator> +{ + private final SpooledSegment segment; + private final long rowsCount; + private final SegmentLoader loader; + private final QueryDataDecoder decoder; + private long currentRow; + private boolean loaded; + private boolean closed; + private Iterator> iterator; + private Closer closer = Closer.create(); + + public SpooledSegmentIterator(SpooledSegment spooledSegment, SegmentLoader loader, QueryDataDecoder decoder) + { + this.segment = requireNonNull(spooledSegment, "spooledSegment is null"); + this.rowsCount = spooledSegment.getRowsCount(); + this.loader = requireNonNull(loader, "loader is null"); + this.decoder = requireNonNull(decoder, "decoder is null"); + } + + public void load() + { + checkState(!closed, "Iterator is already closed"); + checkState(!loaded, "Iterator is already loaded"); + + checkState(iterator == null, "Iterator should be unloaded"); + try { + InputStream stream = closer.register(loader.load(segment)); // close stream when depleted + closer.register(() -> loader.acknowledge(segment)); // acknowledge segment when depleted + iterator = decoder.decode(stream, segment.getMetadata()); + loaded = true; + } + catch (IOException e) { + closed = true; + throw new UncheckedIOException(e); + } + } + + public void unload() + { + checkState(!closed, "Iterator is already closed"); + closed = true; + try { + closer.close(); + iterator = null; + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public long remaining() + { + return rowsCount - currentRow; + } + + @Override + protected List computeNext() + { + if (!loaded) { + load(); + } + + if (++currentRow > rowsCount) { + return endOfData(); + } + + if (closed) { + throw new NoSuchElementException(); + } + + try { + verify(iterator.hasNext(), "Iterator should have more rows, current: %s, count: %s", currentRow, rowsCount); + List rows = iterator.next(); + if (currentRow == this.rowsCount) { + unload(); // Unload when the last row was fetched + } + return rows; + } + catch (Exception e) { + // Cleanup if decoding has failed + unload(); + throw e; + } + } +} diff --git a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/CompressedQueryDataDecoder.java b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/CompressedQueryDataDecoder.java index ce2fe561d5ad..7cafe8cbc4d2 100644 --- a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/CompressedQueryDataDecoder.java +++ b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/CompressedQueryDataDecoder.java @@ -15,13 +15,14 @@ import com.google.common.io.ByteStreams; import io.trino.client.QueryDataDecoder; -import io.trino.client.ResultRows; import io.trino.client.spooling.DataAttribute; import io.trino.client.spooling.DataAttributes; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Iterator; +import java.util.List; import java.util.Optional; import static com.google.common.base.Verify.verify; @@ -41,7 +42,7 @@ abstract void decompress(byte[] input, byte[] output) throws IOException; @Override - public ResultRows decode(InputStream stream, DataAttributes metadata) + public Iterator> decode(InputStream stream, DataAttributes metadata) throws IOException { Optional expectedDecompressedSize = metadata.getOptional(DataAttribute.UNCOMPRESSED_SIZE, Integer.class); diff --git a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/JsonQueryDataDecoder.java b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/JsonQueryDataDecoder.java index dcd631b0f285..abd14774cbb8 100644 --- a/client/trino-client/src/main/java/io/trino/client/spooling/encoding/JsonQueryDataDecoder.java +++ b/client/trino-client/src/main/java/io/trino/client/spooling/encoding/JsonQueryDataDecoder.java @@ -17,10 +17,11 @@ import io.trino.client.JsonDecodingUtils.TypeDecoder; import io.trino.client.JsonResultRows; import io.trino.client.QueryDataDecoder; -import io.trino.client.ResultRows; import io.trino.client.spooling.DataAttributes; +import java.io.IOException; import java.io.InputStream; +import java.util.Iterator; import java.util.List; import static io.trino.client.JsonDecodingUtils.createTypeDecoders; @@ -37,7 +38,8 @@ public class JsonQueryDataDecoder } @Override - public ResultRows decode(InputStream stream, DataAttributes queryAttributes) + public Iterator> decode(InputStream stream, DataAttributes queryAttributes) + throws IOException { return JsonResultRows.forInputStream(stream, decoders); } diff --git a/client/trino-client/src/test/java/io/trino/client/TestResultRowsDecoder.java b/client/trino-client/src/test/java/io/trino/client/TestResultRowsDecoder.java index 8a2b6465952c..64c05deec3a6 100644 --- a/client/trino-client/src/test/java/io/trino/client/TestResultRowsDecoder.java +++ b/client/trino-client/src/test/java/io/trino/client/TestResultRowsDecoder.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.CountingInputStream; +import io.trino.client.spooling.DataAttribute; import io.trino.client.spooling.DataAttributes; import io.trino.client.spooling.EncodedQueryData; import io.trino.client.spooling.Segment; @@ -25,7 +26,6 @@ import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; -import java.io.FilterInputStream; import java.io.InputStream; import java.net.URI; import java.util.Arrays; @@ -117,7 +117,7 @@ public void testSpooledJsonMaterialization() AtomicInteger loaded = new AtomicInteger(); AtomicInteger acknowledged = new AtomicInteger(); try (ResultRowsDecoder decoder = new ResultRowsDecoder(new StaticLoader(loaded, acknowledged))) { - assertThat(eagerlyMaterialize(decoder.toRows(fromSegments(spooledSegment(), spooledSegment())))) + assertThat(eagerlyMaterialize(decoder.toRows(fromSegments(spooledSegment(2), spooledSegment(2))))) .hasSize(4) .containsExactly(ImmutableList.of(2137), ImmutableList.of(1337), ImmutableList.of(2137), ImmutableList.of(1337)); } @@ -132,7 +132,7 @@ public void testSpooledJsonNodeMaterialization() AtomicInteger loaded = new AtomicInteger(); AtomicInteger acknowledged = new AtomicInteger(); try (ResultRowsDecoder decoder = new ResultRowsDecoder(new StaticLoader(loaded, acknowledged))) { - assertThat(eagerlyMaterialize(decoder.toRows(fromSegments(spooledSegment(), spooledSegment())))) + assertThat(eagerlyMaterialize(decoder.toRows(fromSegments(spooledSegment(2), spooledSegment(2))))) .hasSize(4) .containsExactly(ImmutableList.of(2137), ImmutableList.of(1337), ImmutableList.of(2137), ImmutableList.of(1337)); } @@ -148,7 +148,7 @@ public void testSpooledJsonNodeScanningMaterialization() .reduce("[", (a, b) -> a + "[" + b + "],", String::concat) + "[1337]]"; CountingInputStream stream = new CountingInputStream(new ByteArrayInputStream(data.getBytes(UTF_8))); try (ResultRowsDecoder decoder = new ResultRowsDecoder(loaderFromStream(stream))) { - Iterator> iterator = decoder.toRows(fromSegments(spooledSegment())).iterator(); + Iterator> iterator = decoder.toRows(fromSegments(spooledSegment(2501))).iterator(); assertThat(stream.getCount()).isEqualTo(0); iterator.next(); assertThat(stream.getCount()).isEqualTo(8000); // Jackson reads data in 8K chunks @@ -174,7 +174,7 @@ public void testLazySpooledMaterialization() AtomicInteger loaded = new AtomicInteger(); AtomicInteger acknowledged = new AtomicInteger(); try (ResultRowsDecoder decoder = new ResultRowsDecoder(new StaticLoader(loaded, acknowledged))) { - Iterator> iterator = decoder.toRows(fromSegments(spooledSegment(), spooledSegment())) + Iterator> iterator = decoder.toRows(fromSegments(spooledSegment(2), spooledSegment(2))) .iterator(); assertThat(loaded.get()).isEqualTo(0); @@ -216,14 +216,7 @@ public StaticLoader(AtomicInteger loaded, AtomicInteger acknowledged) public InputStream load(SpooledSegment segment) { loaded.incrementAndGet(); - - return new FilterInputStream(new ByteArrayInputStream("[[2137], [1337]]".getBytes(UTF_8))) { - @Override - public void close() - { - acknowledge(segment); - } - }; + return new ByteArrayInputStream("[[2137], [1337]]".getBytes(UTF_8)); } @Override @@ -292,8 +285,12 @@ private static QueryResults fromSegments(Segment... segments) .build()); } - private static Segment spooledSegment() + private static Segment spooledSegment(long rows) { - return spooled(URI.create("http://localhost"), URI.create("http://localhost"), DataAttributes.empty(), ImmutableMap.of()); + DataAttributes attributes = DataAttributes.builder() + .set(DataAttribute.ROWS_COUNT, rows) + .build(); + + return spooled(URI.create("http://localhost"), URI.create("http://localhost"), attributes, ImmutableMap.of()); } } diff --git a/client/trino-client/src/test/java/io/trino/client/spooling/encoding/TestCompressedQueryDataDecoder.java b/client/trino-client/src/test/java/io/trino/client/spooling/encoding/TestCompressedQueryDataDecoder.java index 395d9123a639..a426d07a45f5 100644 --- a/client/trino-client/src/test/java/io/trino/client/spooling/encoding/TestCompressedQueryDataDecoder.java +++ b/client/trino-client/src/test/java/io/trino/client/spooling/encoding/TestCompressedQueryDataDecoder.java @@ -16,7 +16,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.io.ByteStreams; import io.trino.client.QueryDataDecoder; -import io.trino.client.ResultRows; import io.trino.client.spooling.DataAttributes; import org.junit.jupiter.api.Test; @@ -24,6 +23,7 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -53,12 +53,12 @@ public void close() QueryDataDecoder decoder = new TestQueryDataDecoder(new QueryDataDecoder() { @Override - public ResultRows decode(InputStream input, DataAttributes segmentAttributes) + public Iterator> decode(InputStream input, DataAttributes segmentAttributes) throws IOException { assertThat(new String(ByteStreams.toByteArray(input), UTF_8)) .isEqualTo("decompressed"); - return SAMPLE_VALUES::iterator; + return SAMPLE_VALUES.iterator(); } @Override @@ -74,6 +74,7 @@ public String encoding() .set(UNCOMPRESSED_SIZE, "decompressed".length()) .set(SEGMENT_SIZE, "compressed".length()) .build())) + .toIterable() .containsAll(SAMPLE_VALUES); assertThat(closed.get()).isTrue(); } @@ -95,13 +96,13 @@ public void close() QueryDataDecoder decoder = new TestQueryDataDecoder(new QueryDataDecoder() { @Override - public ResultRows decode(InputStream input, DataAttributes segmentAttributes) + public Iterator> decode(InputStream input, DataAttributes segmentAttributes) throws IOException { assertThat(new String(ByteStreams.toByteArray(input), UTF_8)) .isEqualTo("not compressed"); input.close(); // Closes input stream according to the contract - return SAMPLE_VALUES::iterator; + return SAMPLE_VALUES.iterator(); } @Override @@ -115,6 +116,7 @@ public String encoding() assertThat(decoder.decode(stream, DataAttributes.builder() .set(SEGMENT_SIZE, "not compressed".length()) .build())) + .toIterable() .containsAll(SAMPLE_VALUES); assertThat(closed.get()).isTrue(); } diff --git a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestAsyncResultIterator.java b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestAsyncResultIterator.java index 963c782f934a..bfb861b96884 100644 --- a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestAsyncResultIterator.java +++ b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestAsyncResultIterator.java @@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import static io.trino.client.ResultRows.fromIterableRows; +import static io.trino.client.ResultRows.wrapList; import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; @@ -64,7 +64,7 @@ public void testIteratorCancelWhenQueueNotFull() catch (InterruptedException e) { interruptedButSwallowedLatch.countDown(); } - return fromIterableRows(ImmutableList.of(ImmutableList.of(new Object()))); + return wrapList(ImmutableList.of(ImmutableList.of(new Object()))); }), ignored -> {}, new WarningsManager(), Optional.of(new ArrayBlockingQueue<>(100))); @@ -93,7 +93,7 @@ public void testIteratorCancelWhenQueueIsFull() AsyncResultIterator iterator = new AsyncResultIterator( new MockStatementClient(() -> { thread.compareAndSet(null, Thread.currentThread()); - return fromIterableRows(ImmutableList.of(ImmutableList.of(new Object()))); + return wrapList(ImmutableList.of(ImmutableList.of(new Object()))); }), ignored -> {}, new WarningsManager(), Optional.of(queue));