Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import static com.fasterxml.jackson.core.JsonParser.Feature.AUTO_CLOSE_SOURCE;
Expand Down Expand Up @@ -136,28 +137,16 @@ private void close()
}
}

public static ResultRows forJsonParser(JsonParser parser, List<Column> columns)
public static Iterator<List<Object>> forJsonParser(JsonParser parser, List<Column> columns)
throws IOException
{
return () -> {
try {
return new RowWiseIterator(parser, createTypeDecoders(columns));
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
};
return new RowWiseIterator(parser, createTypeDecoders(columns));
}

public static ResultRows forInputStream(InputStream stream, TypeDecoder[] decoders)
public static Iterator<List<Object>> forInputStream(InputStream stream, TypeDecoder[] decoders)
throws IOException
{
return () -> {
try {
return new RowWiseIterator(stream, decoders);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
};
return new RowWiseIterator(stream, decoders);
}

@SuppressModernizer // There is no JsonFactory in the client module
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import okhttp3.Request;
import okhttp3.Response;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
Expand Down Expand Up @@ -65,7 +64,7 @@ public InputStream load(SpooledSegment segment)
}

if (response.isSuccessful()) {
return delegatingInputStream(response, response.body().byteStream(), segment);
return response.body().byteStream();
}
throw new IOException(format("Could not open segment for streaming, got error '%s' with code %d", response.message(), response.code()));
}
Expand Down Expand Up @@ -95,21 +94,6 @@ public void onResponse(Call call, Response response)
});
}

private InputStream delegatingInputStream(Response response, InputStream delegate, SpooledSegment segment)
{
return new FilterInputStream(delegate)
{
@Override
public void close()
throws IOException
{
try (Response ignored = response; InputStream ignored2 = delegate) {
acknowledge(segment);
}
}
};
}

private static Headers toHeaders(Map<String, List<String>> headers)
{
Headers.Builder builder = new Headers.Builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;

public interface QueryDataDecoder
Expand All @@ -39,7 +40,7 @@ interface Factory
*
* @throws IOException if an I/O error occurs
*/
ResultRows decode(InputStream input, DataAttributes segmentAttributes)
Iterator<List<Object>> decode(InputStream input, DataAttributes segmentAttributes)
throws IOException;

String encoding();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public Iterator<List<Object>> iterator()
}
};

static ResultRows fromIterableRows(Iterable<List<Object>> values)
static ResultRows wrapList(List<List<Object>> values)
{
return values::iterator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,25 @@
*/
package io.trino.client;

import com.google.common.collect.Iterables;
import io.trino.client.spooling.DataAttributes;
import io.trino.client.spooling.EncodedQueryData;
import io.trino.client.spooling.InlineSegment;
import io.trino.client.spooling.Segment;
import io.trino.client.spooling.SegmentLoader;
import io.trino.client.spooling.SpooledSegment;
import io.trino.client.spooling.encoding.QueryDataDecoders;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Iterators.concat;
import static com.google.common.collect.Iterators.transform;
import static io.trino.client.ResultRows.NULL_ROWS;
import static io.trino.client.ResultRows.fromIterableRows;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -100,41 +96,33 @@ public ResultRows toRows(List<Column> columns, QueryData data)
if (jsonData.isNull()) {
return NULL_ROWS;
}
return () -> JsonResultRows.forJsonParser(jsonData.getJsonParser(), columns).iterator();
return () -> {
try {
return JsonResultRows.forJsonParser(jsonData.getJsonParser(), columns);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}

if (data instanceof EncodedQueryData) {
EncodedQueryData encodedData = (EncodedQueryData) data;
setEncoding(columns, encodedData.getEncoding());
return concat(transform(encodedData.getSegments(), this::segmentToRows));
return () -> concat(transform(encodedData.getSegments().iterator(), this::segmentToRows));
}

throw new UnsupportedOperationException("Unsupported data type: " + data.getClass().getName());
}

private ResultRows segmentToRows(Segment segment)
private Iterator<List<Object>> segmentToRows(Segment segment)
{
if (segment instanceof InlineSegment) {
InlineSegment inlineSegment = (InlineSegment) segment;
try {
return decoder.decode(new ByteArrayInputStream(inlineSegment.getData()), inlineSegment.getMetadata());
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
return ((InlineSegment) segment).toIterator(decoder);
}

if (segment instanceof SpooledSegment) {
SpooledSegment spooledSegment = (SpooledSegment) segment;

try {
// The returned rows are lazy which means that decoder is responsible for closing input stream
InputStream stream = loader.load(spooledSegment);
return decoder.decode(stream, spooledSegment.getMetadata());
}
catch (IOException e) {
throw new RuntimeException(e);
}
return ((SpooledSegment) segment).toIterator(loader, decoder);
}

throw new UnsupportedOperationException("Unsupported segment type: " + segment.getClass().getName());
Expand All @@ -152,10 +140,4 @@ public void close()
{
loader.close();
}

@SuppressModernizer
private static ResultRows concat(Iterable<ResultRows> resultRows)
{
return fromIterableRows(Iterables.concat(filter(resultRows, rows -> !rows.isNull())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.client.QueryDataDecoder;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import static java.lang.String.format;
Expand Down Expand Up @@ -48,4 +51,9 @@ public String toString()
{
return format("InlineSegment{offset=%d, rows=%d, size=%d}", getOffset(), getRowsCount(), getSegmentSize());
}

public Iterator<List<Object>> toIterator(QueryDataDecoder decoder)
{
return new InlineSegmentIterator(this, decoder);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.client.spooling;

import com.google.common.collect.AbstractIterator;
import io.trino.client.QueryDataDecoder;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;

import static java.util.Objects.requireNonNull;

// Accessible through the InlineSegment.toIterator
class InlineSegmentIterator
extends AbstractIterator<List<Object>>
{
private InlineSegment segment;
private final QueryDataDecoder decoder;
private Iterator<List<Object>> iterator;

public InlineSegmentIterator(InlineSegment segment, QueryDataDecoder decoder)
{
this.segment = requireNonNull(segment, "segment is null");
this.decoder = requireNonNull(decoder, "decoder is null");
}

@Override
protected List<Object> computeNext()
{
if (iterator == null) {
try {
iterator = decoder.decode(new ByteArrayInputStream(segment.getData()), segment.getMetadata());
segment = null;
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

if (iterator.hasNext()) {
return iterator.next();
}
return endOfData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import io.trino.client.QueryDataDecoder;

import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -75,4 +77,9 @@ public String toString()
{
return format("SpooledSegment{offset=%d, rows=%d, size=%d, headers=%s}", getOffset(), getRowsCount(), getSegmentSize(), headers.keySet());
}

public Iterator<List<Object>> toIterator(SegmentLoader loader, QueryDataDecoder decoder)
{
return new SpooledSegmentIterator(this, loader, decoder);
}
}
Loading