[timeseries] Part-3.1: Add Support for Partial Aggregate and Complex Intermediate Type (apache#14631)

* [timeseries] Add Support for Partial Aggregation and Complex Intermediate Type

* Fix tests + add tests + cleanup

* address feedback

* add missing todo
ankitsultana authored Dec 17, 2024
1 parent 9d12d38 commit b64bd81
Showing 18 changed files with 301 additions and 57 deletions.
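Every call site in this diff moves from the old two-argument AggInfo constructor to a three-argument form. As a reading aid, here is a minimal sketch of what the updated class plausibly looks like; the field names and accessors are assumptions inferred from the call sites below, not confirmed by this commit.

import java.util.Collections;
import java.util.Map;

// Hedged sketch of AggInfo after this commit. Field names and accessors are
// assumptions inferred from call sites such as:
//   new AggInfo("SUM", false, Collections.emptyMap())
public class AggInfo {
  private final String _aggFunction;          // e.g. "SUM", "MIN", "MAX"
  private final boolean _isPartial;           // assumed: marks a partial aggregate (per the commit title)
  private final Map<String, String> _params;  // assumed: aggregation parameters

  public AggInfo(String aggFunction, boolean isPartial, Map<String, String> params) {
    _aggFunction = aggFunction;
    _isPartial = isPartial;
    _params = params == null ? Collections.emptyMap() : params;
  }

  public String getAggFunction() {
    return _aggFunction;
  }

  public boolean getIsPartial() {
    return _isPartial;
  }

  public Map<String, String> getParams() {
    return _params;
  }
}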
@@ -118,7 +118,7 @@ private static PinotBrokerTimeSeriesResponse convertBucketedSeriesBlock(TimeSeri
     for (TimeSeries timeSeries : listOfTimeSeries) {
       Object[][] values = new Object[timeValues.length][];
       for (int i = 0; i < timeValues.length; i++) {
-        Object nullableValue = timeSeries.getValues()[i];
+        Object nullableValue = timeSeries.getDoubleValues()[i];
         values[i] = new Object[]{timeValues[i], nullableValue == null ? null : nullableValue.toString()};
       }
       result.add(new PinotBrokerTimeSeriesResponse.Value(metricMap, values));
@@ -44,7 +44,7 @@ public class TimeSeriesAggregationOperatorTest {
   private static final Random RANDOM = new Random();
   private static final String DUMMY_TIME_COLUMN = "someTimeColumn";
   private static final String GROUP_BY_COLUMN = "city";
-  private static final AggInfo AGG_INFO = new AggInfo("SUM", Collections.emptyMap());
+  private static final AggInfo AGG_INFO = new AggInfo("SUM", false, Collections.emptyMap());
   private static final ExpressionContext VALUE_EXPRESSION = ExpressionContext.forIdentifier("someValueColumn");
   private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(100), 10);
   private static final int NUM_DOCS_IN_DUMMY_DATA = 1000;
@@ -223,7 +223,7 @@ public void testTimeSeriesSumQuery() {
     ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderAmount");
     TimeSeriesContext timeSeriesContext =
         new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
-            0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", null));
+            0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", false, Collections.emptyMap()));
     QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext, Collections.emptyList());
     ServerQueryRequest serverQueryRequest =
         new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get());
@@ -232,8 +232,8 @@
     TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock();
     TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build();
     assertEquals(timeSeriesBlock.getSeriesMap().size(), 1);
-    assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0]);
-    assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[1], 29885544.0);
+    assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[0]);
+    assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[1], 29885544.0);
   }

   @Test
@@ -242,7 +242,7 @@ public void testTimeSeriesMaxQuery() {
     ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount");
     TimeSeriesContext timeSeriesContext =
         new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
-            0L /* offsetSeconds */, valueExpression, new AggInfo("MAX", null));
+            0L /* offsetSeconds */, valueExpression, new AggInfo("MAX", false, Collections.emptyMap()));
     QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext);
     ServerQueryRequest serverQueryRequest =
         new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get());
@@ -260,7 +260,7 @@ public void testTimeSeriesMaxQuery() {
         assertFalse(foundNewYork, "Found multiple time-series for New York");
         foundNewYork = true;
         Optional<Double> maxValue =
-            Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).max(Comparator.naturalOrder());
+            Arrays.stream(timeSeries.getDoubleValues()).filter(Objects::nonNull).max(Comparator.naturalOrder());
         assertTrue(maxValue.isPresent());
         assertEquals(maxValue.get().longValue(), 4L);
       }
@@ -274,7 +274,7 @@ public void testTimeSeriesMinQuery() {
     ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount");
     TimeSeriesContext timeSeriesContext =
         new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets,
-            0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", null));
+            0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", false, Collections.emptyMap()));
     QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext);
     ServerQueryRequest serverQueryRequest =
         new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get());
@@ -292,7 +292,7 @@ public void testTimeSeriesMinQuery() {
         assertFalse(foundChicago, "Found multiple time-series for Chicago");
         foundChicago = true;
         Optional<Double> minValue =
-            Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).min(Comparator.naturalOrder());
+            Arrays.stream(timeSeries.getDoubleValues()).filter(Objects::nonNull).min(Comparator.naturalOrder());
         assertTrue(minValue.isPresent());
         assertEquals(minValue.get().longValue(), 0L);
       }
@@ -20,6 +20,7 @@

 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.TimeUnit;
@@ -84,7 +85,7 @@ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) {
         case "max":
           Preconditions.checkState(commandId == 1, "Aggregation should be the second command (fetch should be first)");
           Preconditions.checkState(aggInfo == null, "Aggregation already set. Only single agg allowed.");
-          aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH), null);
+          aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH), false, Collections.emptyMap());
           if (commands.get(commandId).size() > 1) {
             String[] cols = commands.get(commandId).get(1).split(",");
             groupByColumns = Stream.of(cols).map(String::trim).collect(Collectors.toList());
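The aggregation branch above also treats the command's second token as a comma-separated group-by list. A tiny self-contained sketch of that split-and-trim step, with an illustrative input string:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GroupByParseSketch {
  public static void main(String[] args) {
    // Illustrative second token of an aggregation command, e.g. "sum city, region".
    String token = "city, region";
    // Split on commas, then trim whitespace around each column name.
    String[] cols = token.split(",");
    List<String> groupByColumns = Stream.of(cols).map(String::trim).collect(Collectors.toList());
    System.out.println(groupByColumns); // prints [city, region]
  }
}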
@@ -34,7 +34,7 @@ public TimeSeriesBlock getNextBlock() {
     TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock();
     seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries -> {
       for (TimeSeries series : unionOfSeries) {
-        Double[] values = series.getValues();
+        Double[] values = series.getDoubleValues();
         Double lastValue = null;
         for (int index = 0; index < values.length; index++) {
           if (values[index] != null) {
@@ -37,7 +37,7 @@ public TimeSeriesBlock getNextBlock() {
     TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock();
     seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries -> {
       for (TimeSeries series : unionOfSeries) {
-        Double[] values = series.getValues();
+        Double[] values = series.getDoubleValues();
         for (int index = 0; index < values.length; index++) {
           values[index] = values[index] == null ? _defaultValue : values[index];
         }
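The two operator hunks above rewrite each series' values in place: the first carries the last non-null value forward (its else-branch falls outside the hunk, so that part is assumed), the second substitutes a configured default. A standalone sketch of both fill policies, with illustrative names:

public final class FillPolicySketch {
  private FillPolicySketch() {
  }

  // Assumed behavior of the first operator: carry the last non-null value forward.
  public static void fillWithLastValue(Double[] values) {
    Double lastValue = null;
    for (int index = 0; index < values.length; index++) {
      if (values[index] != null) {
        lastValue = values[index];
      } else {
        values[index] = lastValue;
      }
    }
  }

  // Behavior of the second operator: replace nulls with a fixed default.
  public static void fillWithDefault(Double[] values, double defaultValue) {
    for (int index = 0; index < values.length; index++) {
      values[index] = values[index] == null ? defaultValue : values[index];
    }
  }
}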
@@ -18,21 +18,27 @@
  */
 package org.apache.pinot.query.runtime.timeseries.serde;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.datablock.DataBlockBuilder;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.tsdb.spi.TimeBuckets;
@@ -51,7 +57,7 @@
  * the last column. As an example, consider the following, where FBV represents the first bucket value of TimeBuckets.
  * <pre>
  *   +-------------+------------+-------------+---------------------------------+
- *   | tag-0       | tag-1      | tag-n       | values                          |
+ *   | tag-0       | tag-1      | tag-n       | values (String[] or double[])   |
  *   +-------------+------------+-------------+---------------------------------+
  *   | null        | null       | null        | [FBV, bucketSize, numBuckets]   |
  *   +-------------+------------+-------------+---------------------------------+
@@ -74,6 +80,7 @@ public class TimeSeriesBlockSerde {
    * Using Double.MIN_VALUE is better than using Double.NaN since Double.NaN can help detect divide by 0.
    * TODO(timeseries): Check if we can get rid of boxed Doubles altogether.
    */
+  private static final String VALUES_COLUMN_NAME = "__ts_serde_values";
   private static final double NULL_PLACEHOLDER = Double.MIN_VALUE;

   private TimeSeriesBlockSerde() {
@@ -85,12 +92,13 @@ public static TimeSeriesBlock deserializeTimeSeriesBlock(ByteBuffer readOnlyByte
     TransferableBlock transferableBlock = TransferableBlockUtils.wrap(dataBlock);
     List<String> tagNames = generateTagNames(Objects.requireNonNull(transferableBlock.getDataSchema(),
         "Missing data schema in TransferableBlock"));
+    final DataSchema dataSchema = transferableBlock.getDataSchema();
     List<Object[]> container = transferableBlock.getContainer();
-    TimeBuckets timeBuckets = timeBucketsFromRow(container.get(0));
+    TimeBuckets timeBuckets = timeBucketsFromRow(container.get(0), dataSchema);
     Map<Long, List<TimeSeries>> seriesMap = new HashMap<>();
     for (int index = 1; index < container.size(); index++) {
       Object[] row = container.get(index);
-      TimeSeries timeSeries = timeSeriesFromRow(tagNames, row, timeBuckets);
+      TimeSeries timeSeries = timeSeriesFromRow(tagNames, row, timeBuckets, dataSchema);
       long seriesId = Long.parseLong(timeSeries.getId());
       seriesMap.computeIfAbsent(seriesId, x -> new ArrayList<>()).add(timeSeries);
     }
@@ -112,17 +120,77 @@ public static ByteString serializeTimeSeriesBlock(TimeSeriesBlock timeSeriesBloc
     return DataBlockUtils.toByteString(transferableBlock.getDataBlock());
   }

+  /**
+   * This method is only used for encoding time-bucket-values to byte arrays, when the TimeSeries value type
+   * is byte[][].
+   */
+  @VisibleForTesting
+  static byte[][] toBytesArray(double[] values) {
+    byte[][] result = new byte[values.length][8];
+    for (int index = 0; index < values.length; index++) {
+      ByteBuffer byteBuffer = ByteBuffer.wrap(result[index]);
+      byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+      byteBuffer.putDouble(values[index]);
+    }
+    return result;
+  }
+
+  /**
+   * This method is only used for decoding time-bucket-values from byte arrays, when the TimeSeries value type
+   * is byte[][].
+   */
+  @VisibleForTesting
+  static double[] fromBytesArray(byte[][] bytes) {
+    double[] result = new double[bytes.length];
+    for (int index = 0; index < bytes.length; index++) {
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes[index]);
+      byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+      result[index] = byteBuffer.getDouble();
+    }
+    return result;
+  }
+
+  /**
+   * Since {@link DataBlockBuilder} does not support {@link ColumnDataType#BYTES_ARRAY}, we have to encode the
+   * transmitted bytes as Hex to use String[].
+   */
+  @VisibleForTesting
+  static String[] encodeAsHex(byte[][] byteValues) {
+    String[] result = new String[byteValues.length];
+    for (int index = 0; index < result.length; index++) {
+      result[index] = Hex.encodeHexString(byteValues[index]);
+    }
+    return result;
+  }
+
+  /**
+   * Used for decoding Hex strings. See {@link TimeSeriesBlockSerde#encodeAsHex} for more.
+   */
+  @VisibleForTesting
+  static byte[][] decodeFromHex(String[] hexEncodedValues) {
+    byte[][] result = new byte[hexEncodedValues.length][];
+    for (int index = 0; index < hexEncodedValues.length; index++) {
+      try {
+        result[index] = Hex.decodeHex(hexEncodedValues[index]);
+      } catch (DecoderException e) {
+        throw new RuntimeException("Error decoding byte[] value from encoded hex string", e);
+      }
+    }
+    return result;
+  }
+
   private static DataSchema generateDataSchema(TimeSeriesBlock timeSeriesBlock) {
     TimeSeries sampledTimeSeries = sampleTimeSeries(timeSeriesBlock).orElse(null);
     int numTags = sampledTimeSeries == null ? 0 : sampledTimeSeries.getTagNames().size();
     ColumnDataType[] dataTypes = new ColumnDataType[numTags + 1];
+    final ColumnDataType valueDataType = inferValueDataType(sampledTimeSeries);
     String[] columnNames = new String[numTags + 1];
     for (int tagIndex = 0; tagIndex < numTags; tagIndex++) {
       columnNames[tagIndex] = sampledTimeSeries.getTagNames().get(tagIndex);
       dataTypes[tagIndex] = ColumnDataType.STRING;
     }
-    columnNames[numTags] = "__ts_values";
-    dataTypes[numTags] = ColumnDataType.DOUBLE_ARRAY;
+    columnNames[numTags] = VALUES_COLUMN_NAME;
+    dataTypes[numTags] = valueDataType;
     return new DataSchema(columnNames, dataTypes);
   }

@@ -144,6 +212,14 @@ private static Optional<TimeSeries> sampleTimeSeries(TimeSeriesBlock timeSeriesB
     return Optional.of(timeSeriesList.get(0));
   }

+  private static ColumnDataType inferValueDataType(@Nullable TimeSeries timeSeries) {
+    if (timeSeries == null || timeSeries.getValues() instanceof Double[]) {
+      return ColumnDataType.DOUBLE_ARRAY;
+    }
+    // Byte values are encoded as hex array
+    return ColumnDataType.STRING_ARRAY;
+  }
+
   private static Object[] timeBucketsToRow(TimeBuckets timeBuckets, DataSchema dataSchema) {
     int numColumns = dataSchema.getColumnNames().length;
     Object[] result = new Object[numColumns];
@@ -153,12 +229,27 @@ private static Object[] timeBucketsToRow(TimeBuckets timeBuckets, DataSchema dat
     double firstBucketValue = timeBuckets.getTimeBuckets()[0];
     double bucketSizeSeconds = timeBuckets.getBucketSize().getSeconds();
     double numBuckets = timeBuckets.getNumBuckets();
-    result[numColumns - 1] = new double[]{firstBucketValue, bucketSizeSeconds, numBuckets};
+    final ColumnDataType valuesDataType = dataSchema.getColumnDataTypes()[numColumns - 1];
+    final double[] bucketsEncodedAsDouble = new double[]{firstBucketValue, bucketSizeSeconds, numBuckets};
+    if (valuesDataType == ColumnDataType.DOUBLE_ARRAY) {
+      result[numColumns - 1] = bucketsEncodedAsDouble;
+    } else {
+      Preconditions.checkState(valuesDataType == ColumnDataType.STRING_ARRAY,
+          "Expected bytes_array column type. Found: %s", valuesDataType);
+      result[numColumns - 1] = encodeAsHex(toBytesArray(bucketsEncodedAsDouble));
+    }
     return result;
   }

-  private static TimeBuckets timeBucketsFromRow(Object[] row) {
-    double[] values = (double[]) row[row.length - 1];
+  private static TimeBuckets timeBucketsFromRow(Object[] row, DataSchema dataSchema) {
+    int numColumns = dataSchema.getColumnDataTypes().length;
+    double[] values;
+    if (dataSchema.getColumnDataTypes()[numColumns - 1] == ColumnDataType.STRING_ARRAY) {
+      byte[][] byteValues = decodeFromHex((String[]) row[row.length - 1]);
+      values = fromBytesArray(byteValues);
+    } else {
+      values = (double[]) row[row.length - 1];
+    }
     long fbv = (long) values[0];
     Duration window = Duration.ofSeconds((long) values[1]);
     int numBuckets = (int) values[2];
@@ -172,14 +263,25 @@ private static Object[] timeSeriesToRow(TimeSeries timeSeries, DataSchema dataSc
       Object tagValue = timeSeries.getTagValues()[index];
       result[index] = tagValue == null ? "null" : tagValue.toString();
     }
-    result[numColumns - 1] = unboxDoubleArray(timeSeries.getValues());
+    if (dataSchema.getColumnDataTypes()[numColumns - 1] == ColumnDataType.DOUBLE_ARRAY) {
+      result[numColumns - 1] = unboxDoubleArray(timeSeries.getDoubleValues());
+    } else {
+      result[numColumns - 1] = encodeAsHex(timeSeries.getBytesValues());
+    }
     return result;
   }

-  private static TimeSeries timeSeriesFromRow(List<String> tagNames, Object[] row, TimeBuckets timeBuckets) {
-    Double[] values = boxDoubleArray((double[]) row[row.length - 1]);
+  private static TimeSeries timeSeriesFromRow(List<String> tagNames, Object[] row, TimeBuckets timeBuckets,
+      DataSchema dataSchema) {
+    int numColumns = dataSchema.getColumnDataTypes().length;
     Object[] tagValues = new Object[row.length - 1];
     System.arraycopy(row, 0, tagValues, 0, row.length - 1);
+    Object[] values;
+    if (dataSchema.getColumnDataTypes()[numColumns - 1] == ColumnDataType.DOUBLE_ARRAY) {
+      values = boxDoubleArray((double[]) row[row.length - 1]);
+    } else {
+      values = decodeFromHex((String[]) row[row.length - 1]);
+    }
     return new TimeSeries(Long.toString(TimeSeries.hash(tagValues)), null, timeBuckets, values, tagNames, tagValues);
   }

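The four serde helpers above compose into a simple round trip: doubles to little-endian bytes, bytes to hex strings, and back. A self-contained sanity check of that path using the same Commons Codec calls as the diff; the class name and sample payload are illustrative:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;

public class HexRoundTripSketch {
  public static void main(String[] args) throws DecoderException {
    // Example payload: the TimeBuckets row [FBV, bucketSize, numBuckets].
    double[] values = {1000.0, 100.0, 10.0};

    // Mirror toBytesArray(...) + encodeAsHex(...): 8 little-endian bytes per double, hex-encoded.
    String[] hex = new String[values.length];
    for (int i = 0; i < values.length; i++) {
      byte[] bytes = new byte[8];
      ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).putDouble(values[i]);
      hex[i] = Hex.encodeHexString(bytes);
    }

    // Mirror decodeFromHex(...) + fromBytesArray(...).
    double[] decoded = new double[hex.length];
    for (int i = 0; i < hex.length; i++) {
      decoded[i] = ByteBuffer.wrap(Hex.decodeHex(hex[i])).order(ByteOrder.LITTLE_ENDIAN).getDouble();
    }
    System.out.println(Arrays.equals(values, decoded)); // prints true
  }
}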
@@ -45,7 +45,7 @@ public void testCompileQueryContext() {
     final String planId = "id";
     final String tableName = "orderTable";
     final String timeColumn = "orderTime";
-    final AggInfo aggInfo = new AggInfo("SUM", null);
+    final AggInfo aggInfo = new AggInfo("SUM", false, Collections.emptyMap());
     final String filterExpr = "cityName = 'Chicago'";
     PhysicalTimeSeriesServerPlanVisitor serverPlanVisitor = new PhysicalTimeSeriesServerPlanVisitor(
         mock(QueryExecutor.class), mock(ExecutorService.class), mock(ServerMetrics.class));
(Remaining file diffs in this commit were not loaded.)
