diff --git a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusClient.java b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusClient.java index 971b3846aa12..b4577427469e 100644 --- a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusClient.java +++ b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusClient.java @@ -31,6 +31,7 @@ import okhttp3.OkHttpClient.Builder; import okhttp3.Request; import okhttp3.Response; +import okhttp3.ResponseBody; import java.io.File; import java.io.IOException; @@ -159,15 +160,21 @@ private String toRemoteTableName(String tableName) private Map fetchMetrics(JsonCodec> metricsCodec, URI metadataUri) { - return metricsCodec.fromJson(fetchUri(metadataUri)); + try (ResponseBody body = fetchUri(metadataUri)) { + return metricsCodec.fromJson(body.string()); + } + catch (IOException e) { + throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "Error reading metadata", e); + } } - public byte[] fetchUri(URI uri) + public ResponseBody fetchUri(URI uri) { Request.Builder requestBuilder = new Request.Builder().url(uri.toString()); - try (Response response = httpClient.newCall(requestBuilder.build()).execute()) { + try { + Response response = httpClient.newCall(requestBuilder.build()).execute(); if (response.isSuccessful() && response.body() != null) { - return response.body().bytes(); + return response.body(); } throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "Bad response " + response.code() + " " + response.message()); } diff --git a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusRecordCursor.java b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusRecordCursor.java index 1fd58b4a6f69..ff23d2c9d8b1 100644 --- a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusRecordCursor.java +++ b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusRecordCursor.java @@ -14,7 +14,6 @@ package io.trino.plugin.prometheus; import com.google.common.collect.ImmutableList; -import com.google.common.io.ByteSource; import com.google.common.io.CountingInputStream; import io.airlift.slice.Slice; import io.airlift.slice.Slices; @@ -30,6 +29,7 @@ import io.trino.spi.type.Type; import io.trino.spi.type.TypeUtils; import io.trino.spi.type.VarcharType; +import okhttp3.ResponseBody; import java.io.IOException; import java.io.UncheckedIOException; @@ -64,12 +64,14 @@ public class PrometheusRecordCursor private final Iterator metricsItr; private final long totalBytes; + private final Runnable closeResponse; private PrometheusStandardizedRow fields; - public PrometheusRecordCursor(List columnHandles, ByteSource byteSource) + public PrometheusRecordCursor(List columnHandles, ResponseBody responseBody) { this.columnHandles = columnHandles; + this.closeResponse = responseBody::close; fieldToColumnIndex = new int[columnHandles.size()]; for (int i = 0; i < columnHandles.size(); i++) { @@ -77,7 +79,7 @@ public PrometheusRecordCursor(List columnHandles, ByteSo fieldToColumnIndex[i] = columnHandle.ordinalPosition(); } - try (CountingInputStream input = new CountingInputStream(byteSource.openStream())) { + try (CountingInputStream input = new CountingInputStream(responseBody.byteStream())) { metricsItr = prometheusResultsInStandardizedForm(new PrometheusQueryResponseParse(input).getResults()).iterator(); totalBytes = input.getCount(); } @@ -284,5 +286,8 @@ private static List getArrayFromBlock(Type elementType, Block block) } @Override - public void close() {} + public void close() + { + closeResponse.run(); + } } diff --git a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusRecordSet.java b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusRecordSet.java index 50bff0e90c2e..4a6369e2bf38 100644 --- a/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusRecordSet.java +++ b/plugin/trino-prometheus/src/main/java/io/trino/plugin/prometheus/PrometheusRecordSet.java @@ -14,10 +14,10 @@ package io.trino.plugin.prometheus; import com.google.common.collect.ImmutableList; -import com.google.common.io.ByteSource; import io.trino.spi.connector.RecordCursor; import io.trino.spi.connector.RecordSet; import io.trino.spi.type.Type; +import okhttp3.ResponseBody; import java.net.URI; import java.util.List; @@ -29,7 +29,7 @@ public class PrometheusRecordSet { private final List columnHandles; private final List columnTypes; - private final ByteSource byteSource; + private final ResponseBody responseBody; public PrometheusRecordSet(PrometheusClient prometheusClient, PrometheusSplit split, List columnHandles) { @@ -43,7 +43,7 @@ public PrometheusRecordSet(PrometheusClient prometheusClient, PrometheusSplit sp } this.columnTypes = types.build(); - this.byteSource = ByteSource.wrap(prometheusClient.fetchUri(URI.create(split.getUri()))); + this.responseBody = prometheusClient.fetchUri(URI.create(split.getUri())); } @Override @@ -55,6 +55,6 @@ public List getColumnTypes() @Override public RecordCursor cursor() { - return new PrometheusRecordCursor(columnHandles, byteSource); + return new PrometheusRecordCursor(columnHandles, responseBody); } }