Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import okhttp3.OkHttpClient.Builder;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -159,15 +160,21 @@ private String toRemoteTableName(String tableName)

private Map<String, Object> fetchMetrics(JsonCodec<Map<String, Object>> metricsCodec, URI metadataUri)
{
return metricsCodec.fromJson(fetchUri(metadataUri));
try (ResponseBody body = fetchUri(metadataUri)) {
return metricsCodec.fromJson(body.string());
}
catch (IOException e) {
throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "Error reading metadata", e);
}
}

public byte[] fetchUri(URI uri)
public ResponseBody fetchUri(URI uri)
{
Request.Builder requestBuilder = new Request.Builder().url(uri.toString());
try (Response response = httpClient.newCall(requestBuilder.build()).execute()) {
try {
Response response = httpClient.newCall(requestBuilder.build()).execute();
if (response.isSuccessful() && response.body() != null) {
return response.body().bytes();
return response.body();
}
throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "Bad response " + response.code() + " " + response.message());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.plugin.prometheus;

import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import com.google.common.io.CountingInputStream;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
Expand All @@ -30,6 +29,7 @@
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeUtils;
import io.trino.spi.type.VarcharType;
import okhttp3.ResponseBody;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -64,20 +64,22 @@ public class PrometheusRecordCursor

private final Iterator<PrometheusStandardizedRow> metricsItr;
private final long totalBytes;
private final Runnable closeResponse;

private PrometheusStandardizedRow fields;

public PrometheusRecordCursor(List<PrometheusColumnHandle> columnHandles, ByteSource byteSource)
public PrometheusRecordCursor(List<PrometheusColumnHandle> columnHandles, ResponseBody responseBody)
{
this.columnHandles = columnHandles;
this.closeResponse = responseBody::close;

fieldToColumnIndex = new int[columnHandles.size()];
for (int i = 0; i < columnHandles.size(); i++) {
PrometheusColumnHandle columnHandle = columnHandles.get(i);
fieldToColumnIndex[i] = columnHandle.ordinalPosition();
}

try (CountingInputStream input = new CountingInputStream(byteSource.openStream())) {
try (CountingInputStream input = new CountingInputStream(responseBody.byteStream())) {
metricsItr = prometheusResultsInStandardizedForm(new PrometheusQueryResponseParse(input).getResults()).iterator();
totalBytes = input.getCount();
}
Expand Down Expand Up @@ -284,5 +286,8 @@ private static List<Object> getArrayFromBlock(Type elementType, Block block)
}

@Override
public void close() {}
public void close()
{
closeResponse.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
package io.trino.plugin.prometheus;

import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RecordSet;
import io.trino.spi.type.Type;
import okhttp3.ResponseBody;

import java.net.URI;
import java.util.List;
Expand All @@ -29,7 +29,7 @@ public class PrometheusRecordSet
{
private final List<PrometheusColumnHandle> columnHandles;
private final List<Type> columnTypes;
private final ByteSource byteSource;
private final ResponseBody responseBody;

public PrometheusRecordSet(PrometheusClient prometheusClient, PrometheusSplit split, List<PrometheusColumnHandle> columnHandles)
{
Expand All @@ -43,7 +43,7 @@ public PrometheusRecordSet(PrometheusClient prometheusClient, PrometheusSplit sp
}
this.columnTypes = types.build();

this.byteSource = ByteSource.wrap(prometheusClient.fetchUri(URI.create(split.getUri())));
this.responseBody = prometheusClient.fetchUri(URI.create(split.getUri()));
}

@Override
Expand All @@ -55,6 +55,6 @@ public List<Type> getColumnTypes()
@Override
public RecordCursor cursor()
{
return new PrometheusRecordCursor(columnHandles, byteSource);
return new PrometheusRecordCursor(columnHandles, responseBody);
}
}