Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.plugin.prometheus;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.google.common.io.CountingInputStream;
import io.airlift.slice.Slice;
Expand Down Expand Up @@ -119,7 +118,7 @@ private Object getFieldValue(int field)
int columnIndex = fieldToColumnIndex[field];
switch (columnIndex) {
case 0:
return fields.getLabels();
return getBlockFromMap(columnHandles.get(columnIndex).getColumnType(), fields.getLabels());
case 1:
return fields.getTimestamp();
case 2:
Expand Down Expand Up @@ -186,7 +185,7 @@ private List<PrometheusStandardizedRow> prometheusResultsInStandardizedForm(List
{
return results.stream().map(result ->
result.getTimeSeriesValues().getValues().stream().map(prometheusTimeSeriesValue -> new PrometheusStandardizedRow(
getBlockFromMap(columnHandles.get(0).getColumnType(), ImmutableMap.copyOf(result.getMetricHeader())),
result.getMetricHeader(),
prometheusTimeSeriesValue.getTimestamp(),
Double.parseDouble(prometheusTimeSeriesValue.getValue())))
.collect(Collectors.toList()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,27 @@
*/
package io.trino.plugin.prometheus;

import io.trino.spi.block.Block;
import com.google.common.collect.ImmutableMap;

import java.time.Instant;
import java.util.Map;

import static java.util.Objects.requireNonNull;

public class PrometheusStandardizedRow
{
private final Block labels;
private final Map<String, String> labels;
private final Instant timestamp;
private final Double value;

public PrometheusStandardizedRow(Block labels, Instant timestamp, Double value)
public PrometheusStandardizedRow(Map<String, String> labels, Instant timestamp, Double value)
{
this.labels = requireNonNull(labels, "labels is null");
this.labels = ImmutableMap.copyOf(requireNonNull(labels, "labels is null"));
this.timestamp = requireNonNull(timestamp, "timestamp is null");
this.value = requireNonNull(value, "value is null");
}

public Block getLabels()
public Map<String, String> getLabels()
{
return labels;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public void testSelectTable()
.matches("SELECT MAP(ARRAY[VARCHAR 'instance', '__name__', 'job'], ARRAY[VARCHAR 'localhost:9090', 'up', 'prometheus'])");
}

@Test
public void testAggregation()
{
assertQuerySucceeds("SELECT count(*) FROM default.up"); // Don't check value since the row number isn't deterministic
assertQuery("SELECT avg(value) FROM default.up", "VALUES ('1.0')");
}

@Test
public void testPushDown()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.List;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.prometheus.MetadataUtil.METRIC_CODEC;
import static io.trino.plugin.prometheus.MetadataUtil.varcharMapType;
import static io.trino.plugin.prometheus.PrometheusClient.TIMESTAMP_COLUMN_TYPE;
Expand Down Expand Up @@ -62,30 +63,31 @@ public void testCursorSimple()
List<PrometheusStandardizedRow> actual = new ArrayList<>();
while (cursor.advanceNextPosition()) {
actual.add(new PrometheusStandardizedRow(
(Block) cursor.getObject(0),
getMapFromBlock(varcharMapType, (Block) cursor.getObject(0)).entrySet().stream()
.collect(toImmutableMap(entry -> (String) entry.getKey(), entry -> (String) entry.getValue())),
(Instant) cursor.getObject(1),
cursor.getDouble(2)));
assertFalse(cursor.isNull(0));
assertFalse(cursor.isNull(1));
assertFalse(cursor.isNull(2));
}
List<PrometheusStandardizedRow> expected = ImmutableList.<PrometheusStandardizedRow>builder()
.add(new PrometheusStandardizedRow(getBlockFromMap(varcharMapType,
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus")), ofEpochMilli(1565962969044L), 1.0))
.add(new PrometheusStandardizedRow(getBlockFromMap(varcharMapType,
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus")), ofEpochMilli(1565962984045L), 1.0))
.add(new PrometheusStandardizedRow(getBlockFromMap(varcharMapType,
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus")), ofEpochMilli(1565962999044L), 1.0))
.add(new PrometheusStandardizedRow(getBlockFromMap(varcharMapType,
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus")), ofEpochMilli(1565963014044L), 1.0))
.add(new PrometheusStandardizedRow(
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus"), ofEpochMilli(1565962969044L), 1.0))
.add(new PrometheusStandardizedRow(
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus"), ofEpochMilli(1565962984045L), 1.0))
.add(new PrometheusStandardizedRow(
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus"), ofEpochMilli(1565962999044L), 1.0))
.add(new PrometheusStandardizedRow(
ImmutableMap.of("instance", "localhost:9090", "__name__", "up", "job", "prometheus"), ofEpochMilli(1565963014044L), 1.0))
.build();

assertThat(actual).as("actual")
.hasSize(expected.size());
for (int i = 0; i < actual.size(); i++) {
PrometheusStandardizedRow actualRow = actual.get(i);
PrometheusStandardizedRow expectedRow = expected.get(i);
assertEquals(getMapFromBlock(varcharMapType, actualRow.getLabels()), getMapFromBlock(varcharMapType, expectedRow.getLabels()));
assertEquals(getMapFromBlock(varcharMapType, getBlockFromMap(varcharMapType, actualRow.getLabels())), getMapFromBlock(varcharMapType, getBlockFromMap(varcharMapType, expectedRow.getLabels())));
assertEquals(actualRow.getTimestamp(), expectedRow.getTimestamp());
assertEquals(actualRow.getValue(), expectedRow.getValue());
}
Expand Down