From b64bd81de15859407f18a802573d667070de8768 Mon Sep 17 00:00:00 2001 From: Ankit Sultana Date: Tue, 17 Dec 2024 09:45:29 -0600 Subject: [PATCH] [timeseries] Part-3.1: Add Support for Partial Aggregate and Complex Intermediate Type (#14631) * [timeseries] Add Support for Partial Aggregation and Complex Intermediate Type * Fix tests + add tests + cleanup * address feedback * add missing todo --- .../PinotBrokerTimeSeriesResponse.java | 2 +- .../TimeSeriesAggregationOperatorTest.java | 2 +- .../query/executor/QueryExecutorTest.java | 14 +- .../pinot/tsdb/m3ql/M3TimeSeriesPlanner.java | 3 +- .../m3ql/operator/KeepLastValueOperator.java | 2 +- .../m3ql/operator/TransformNullOperator.java | 2 +- .../serde/TimeSeriesBlockSerde.java | 124 ++++++++++++++++-- ...ysicalTimeSeriesServerPlanVisitorTest.java | 2 +- ...TimeSeriesExchangeReceiveOperatorTest.java | 12 +- .../serde/TimeSeriesBlockSerdeTest.java | 43 +++++- .../planner/TimeSeriesPlanFragmenter.java | 13 +- .../org/apache/pinot/tsdb/spi/AggInfo.java | 30 ++++- .../tsdb/spi/plan/LeafTimeSeriesPlanNode.java | 5 + .../spi/series/BaseTimeSeriesBuilder.java | 18 +-- .../pinot/tsdb/spi/series/TimeSeries.java | 19 ++- .../spi/plan/LeafTimeSeriesPlanNodeTest.java | 9 +- .../plan/serde/TimeSeriesPlanSerdeTest.java | 4 +- .../pinot/tsdb/spi/series/TimeSeriesTest.java | 54 ++++++++ 18 files changed, 301 insertions(+), 57 deletions(-) create mode 100644 pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/series/TimeSeriesTest.java diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java index 96320b8326a..4a1f347d16a 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/PinotBrokerTimeSeriesResponse.java @@ -118,7 +118,7 @@ private static 
PinotBrokerTimeSeriesResponse convertBucketedSeriesBlock(TimeSeri for (TimeSeries timeSeries : listOfTimeSeries) { Object[][] values = new Object[timeValues.length][]; for (int i = 0; i < timeValues.length; i++) { - Object nullableValue = timeSeries.getValues()[i]; + Object nullableValue = timeSeries.getDoubleValues()[i]; values[i] = new Object[]{timeValues[i], nullableValue == null ? null : nullableValue.toString()}; } result.add(new PinotBrokerTimeSeriesResponse.Value(metricMap, values)); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java index eea81a4ba16..b6e97c849ff 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/timeseries/TimeSeriesAggregationOperatorTest.java @@ -44,7 +44,7 @@ public class TimeSeriesAggregationOperatorTest { private static final Random RANDOM = new Random(); private static final String DUMMY_TIME_COLUMN = "someTimeColumn"; private static final String GROUP_BY_COLUMN = "city"; - private static final AggInfo AGG_INFO = new AggInfo("SUM", Collections.emptyMap()); + private static final AggInfo AGG_INFO = new AggInfo("SUM", false, Collections.emptyMap()); private static final ExpressionContext VALUE_EXPRESSION = ExpressionContext.forIdentifier("someValueColumn"); private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(100), 10); private static final int NUM_DOCS_IN_DUMMY_DATA = 1000; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index 4a171128c81..0b59468e0d7 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -223,7 +223,7 @@ public void testTimeSeriesSumQuery() { ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderAmount"); TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, - 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", null)); + 0L /* offsetSeconds */, valueExpression, new AggInfo("SUM", false, Collections.emptyMap())); QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext, Collections.emptyList()); ServerQueryRequest serverQueryRequest = new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); @@ -232,8 +232,8 @@ public void testTimeSeriesSumQuery() { TimeSeriesResultsBlock resultsBlock = (TimeSeriesResultsBlock) instanceResponse.getResultsBlock(); TimeSeriesBlock timeSeriesBlock = resultsBlock.getTimeSeriesBuilderBlock().build(); assertEquals(timeSeriesBlock.getSeriesMap().size(), 1); - assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[0]); - assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getValues()[1], 29885544.0); + assertNull(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[0]); + assertEquals(timeSeriesBlock.getSeriesMap().values().iterator().next().get(0).getDoubleValues()[1], 29885544.0); } @Test @@ -242,7 +242,7 @@ public void testTimeSeriesMaxQuery() { ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount"); TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, - 0L /* offsetSeconds */, valueExpression, new AggInfo("MAX", null)); + 0L /* offsetSeconds */, valueExpression, new AggInfo("MAX", false, Collections.emptyMap())); QueryContext queryContext = 
getQueryContextForTimeSeries(timeSeriesContext); ServerQueryRequest serverQueryRequest = new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); @@ -260,7 +260,7 @@ public void testTimeSeriesMaxQuery() { assertFalse(foundNewYork, "Found multiple time-series for New York"); foundNewYork = true; Optional maxValue = - Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).max(Comparator.naturalOrder()); + Arrays.stream(timeSeries.getDoubleValues()).filter(Objects::nonNull).max(Comparator.naturalOrder()); assertTrue(maxValue.isPresent()); assertEquals(maxValue.get().longValue(), 4L); } @@ -274,7 +274,7 @@ public void testTimeSeriesMinQuery() { ExpressionContext valueExpression = ExpressionContext.forIdentifier("orderItemCount"); TimeSeriesContext timeSeriesContext = new TimeSeriesContext(TIME_SERIES_LANGUAGE_NAME, TIME_SERIES_TIME_COL_NAME, TimeUnit.SECONDS, timeBuckets, - 0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", null)); + 0L /* offsetSeconds */, valueExpression, new AggInfo("MIN", false, Collections.emptyMap())); QueryContext queryContext = getQueryContextForTimeSeries(timeSeriesContext); ServerQueryRequest serverQueryRequest = new ServerQueryRequest(queryContext, _segmentNames, new HashMap<>(), ServerMetrics.get()); @@ -292,7 +292,7 @@ public void testTimeSeriesMinQuery() { assertFalse(foundChicago, "Found multiple time-series for Chicago"); foundChicago = true; Optional minValue = - Arrays.stream(timeSeries.getValues()).filter(Objects::nonNull).min(Comparator.naturalOrder()); + Arrays.stream(timeSeries.getDoubleValues()).filter(Objects::nonNull).min(Comparator.naturalOrder()); assertTrue(minValue.isPresent()); assertEquals(minValue.get().longValue(), 0L); } diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java 
b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java index 53844048a79..42515083c0d 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.concurrent.TimeUnit; @@ -84,7 +85,7 @@ public BaseTimeSeriesPlanNode planQuery(RangeTimeSeriesRequest request) { case "max": Preconditions.checkState(commandId == 1, "Aggregation should be the second command (fetch should be first)"); Preconditions.checkState(aggInfo == null, "Aggregation already set. Only single agg allowed."); - aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH), null); + aggInfo = new AggInfo(command.toUpperCase(Locale.ENGLISH), false, Collections.emptyMap()); if (commands.get(commandId).size() > 1) { String[] cols = commands.get(commandId).get(1).split(","); groupByColumns = Stream.of(cols).map(String::trim).collect(Collectors.toList()); diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java index 0330dff13b1..cef90b69af0 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/KeepLastValueOperator.java @@ -34,7 +34,7 @@ public TimeSeriesBlock getNextBlock() { TimeSeriesBlock seriesBlock = 
_childOperators.get(0).nextBlock(); seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries -> { for (TimeSeries series : unionOfSeries) { - Double[] values = series.getValues(); + Double[] values = series.getDoubleValues(); Double lastValue = null; for (int index = 0; index < values.length; index++) { if (values[index] != null) { diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java index ca971c932cb..661e4de4980 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/operator/TransformNullOperator.java @@ -37,7 +37,7 @@ public TimeSeriesBlock getNextBlock() { TimeSeriesBlock seriesBlock = _childOperators.get(0).nextBlock(); seriesBlock.getSeriesMap().values().parallelStream().forEach(unionOfSeries -> { for (TimeSeries series : unionOfSeries) { - Double[] values = series.getValues(); + Double[] values = series.getDoubleValues(); for (int index = 0; index < values.length; index++) { values[index] = values[index] == null ? 
_defaultValue : values[index]; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java index cdbf668123b..5978e295072 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerde.java @@ -18,10 +18,12 @@ */ package org.apache.pinot.query.runtime.timeseries.serde; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -29,10 +31,14 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Hex; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.datablock.DataBlockUtils; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.datablock.DataBlockBuilder; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.tsdb.spi.TimeBuckets; @@ -51,7 +57,7 @@ * the last column. As an example, consider the following, where FBV represents the first bucket value of TimeBuckets. *
  *     +-------------+------------+-------------+---------------------------------+
- *     | tag-0       | tag-1      | tag-n       | values                          |
+ *     | tag-0       | tag-1      | tag-n       | values (String[] or double[])   |
  *     +-------------+------------+-------------+---------------------------------+
  *     | null        | null       | null        | [FBV, bucketSize, numBuckets]   |
  *     +-------------+------------+-------------+---------------------------------+
@@ -74,6 +80,7 @@ public class TimeSeriesBlockSerde {
    * Using Double.MIN_VALUE is better than using Double.NaN since Double.NaN can help detect divide by 0.
    * TODO(timeseries): Check if we can get rid of boxed Doubles altogether.
    */
+  private static final String VALUES_COLUMN_NAME = "__ts_serde_values";
   private static final double NULL_PLACEHOLDER = Double.MIN_VALUE;
 
   private TimeSeriesBlockSerde() {
@@ -85,12 +92,13 @@ public static TimeSeriesBlock deserializeTimeSeriesBlock(ByteBuffer readOnlyByte
     TransferableBlock transferableBlock = TransferableBlockUtils.wrap(dataBlock);
     List tagNames = generateTagNames(Objects.requireNonNull(transferableBlock.getDataSchema(),
         "Missing data schema in TransferableBlock"));
+    final DataSchema dataSchema = transferableBlock.getDataSchema();
     List container = transferableBlock.getContainer();
-    TimeBuckets timeBuckets = timeBucketsFromRow(container.get(0));
+    TimeBuckets timeBuckets = timeBucketsFromRow(container.get(0), dataSchema);
     Map> seriesMap = new HashMap<>();
     for (int index = 1; index < container.size(); index++) {
       Object[] row = container.get(index);
-      TimeSeries timeSeries = timeSeriesFromRow(tagNames, row, timeBuckets);
+      TimeSeries timeSeries = timeSeriesFromRow(tagNames, row, timeBuckets, dataSchema);
       long seriesId = Long.parseLong(timeSeries.getId());
       seriesMap.computeIfAbsent(seriesId, x -> new ArrayList<>()).add(timeSeries);
     }
@@ -112,17 +120,77 @@ public static ByteString serializeTimeSeriesBlock(TimeSeriesBlock timeSeriesBloc
     return DataBlockUtils.toByteString(transferableBlock.getDataBlock());
   }
 
+  /**
+   * This method is only used for encoding time-bucket-values to byte arrays, when the TimeSeries value type
+   * is byte[][].
+   */
+  @VisibleForTesting
+  static byte[][] toBytesArray(double[] values) {
+    byte[][] result = new byte[values.length][8];
+    for (int index = 0; index < values.length; index++) {
+      ByteBuffer byteBuffer = ByteBuffer.wrap(result[index]);
+      byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+      byteBuffer.putDouble(values[index]);
+    }
+    return result;
+  }
+
+  /**
+   * This method is only used for decoding time-bucket-values from byte arrays, when the TimeSeries value type
+   * is byte[][].
+   */
+  @VisibleForTesting
+  static double[] fromBytesArray(byte[][] bytes) {
+    double[] result = new double[bytes.length];
+    for (int index = 0; index < bytes.length; index++) {
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes[index]);
+      byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
+      result[index] = byteBuffer.getDouble();
+    }
+    return result;
+  }
+
+  /**
+   * Since {@link DataBlockBuilder} does not support {@link ColumnDataType#BYTES_ARRAY}, we have to encode the
+   * transmitted bytes as Hex to use String[].
+   */
+  @VisibleForTesting
+  static String[] encodeAsHex(byte[][] byteValues) {
+    String[] result = new String[byteValues.length];
+    for (int index = 0; index < result.length; index++) {
+      result[index] = Hex.encodeHexString(byteValues[index]);
+    }
+    return result;
+  }
+
+  /**
+   * Used for decoding Hex strings. See {@link TimeSeriesBlockSerde#encodeAsHex} for more.
+   */
+  @VisibleForTesting
+  static byte[][] decodeFromHex(String[] hexEncodedValues) {
+    byte[][] result = new byte[hexEncodedValues.length][];
+    for (int index = 0; index < hexEncodedValues.length; index++) {
+      try {
+        result[index] = Hex.decodeHex(hexEncodedValues[index]);
+      } catch (DecoderException e) {
+        throw new RuntimeException("Error decoding byte[] value from encoded hex string", e);
+      }
+    }
+    return result;
+  }
+
   private static DataSchema generateDataSchema(TimeSeriesBlock timeSeriesBlock) {
     TimeSeries sampledTimeSeries = sampleTimeSeries(timeSeriesBlock).orElse(null);
     int numTags = sampledTimeSeries == null ? 0 : sampledTimeSeries.getTagNames().size();
     ColumnDataType[] dataTypes = new ColumnDataType[numTags + 1];
+    final ColumnDataType valueDataType = inferValueDataType(sampledTimeSeries);
     String[] columnNames = new String[numTags + 1];
     for (int tagIndex = 0; tagIndex < numTags; tagIndex++) {
       columnNames[tagIndex] = sampledTimeSeries.getTagNames().get(tagIndex);
       dataTypes[tagIndex] = ColumnDataType.STRING;
     }
-    columnNames[numTags] = "__ts_values";
-    dataTypes[numTags] = ColumnDataType.DOUBLE_ARRAY;
+    columnNames[numTags] = VALUES_COLUMN_NAME;
+    dataTypes[numTags] = valueDataType;
     return new DataSchema(columnNames, dataTypes);
   }
 
@@ -144,6 +212,14 @@ private static Optional sampleTimeSeries(TimeSeriesBlock timeSeriesB
     return Optional.of(timeSeriesList.get(0));
   }
 
+  private static ColumnDataType inferValueDataType(@Nullable TimeSeries timeSeries) {
+    if (timeSeries == null || timeSeries.getValues() instanceof Double[]) {
+      return ColumnDataType.DOUBLE_ARRAY;
+    }
+    // Byte values are encoded as hex array
+    return ColumnDataType.STRING_ARRAY;
+  }
+
   private static Object[] timeBucketsToRow(TimeBuckets timeBuckets, DataSchema dataSchema) {
     int numColumns = dataSchema.getColumnNames().length;
     Object[] result = new Object[numColumns];
@@ -153,12 +229,27 @@ private static Object[] timeBucketsToRow(TimeBuckets timeBuckets, DataSchema dat
     double firstBucketValue = timeBuckets.getTimeBuckets()[0];
     double bucketSizeSeconds = timeBuckets.getBucketSize().getSeconds();
     double numBuckets = timeBuckets.getNumBuckets();
-    result[numColumns - 1] = new double[]{firstBucketValue, bucketSizeSeconds, numBuckets};
+    final ColumnDataType valuesDataType = dataSchema.getColumnDataTypes()[numColumns - 1];
+    final double[] bucketsEncodedAsDouble = new double[]{firstBucketValue, bucketSizeSeconds, numBuckets};
+    if (valuesDataType == ColumnDataType.DOUBLE_ARRAY) {
+      result[numColumns - 1] = bucketsEncodedAsDouble;
+    } else {
+      Preconditions.checkState(valuesDataType == ColumnDataType.STRING_ARRAY,
+          "Expected bytes_array column type. Found: %s", valuesDataType);
+      result[numColumns - 1] = encodeAsHex(toBytesArray(bucketsEncodedAsDouble));
+    }
     return result;
   }
 
-  private static TimeBuckets timeBucketsFromRow(Object[] row) {
-    double[] values = (double[]) row[row.length - 1];
+  private static TimeBuckets timeBucketsFromRow(Object[] row, DataSchema dataSchema) {
+    int numColumns = dataSchema.getColumnDataTypes().length;
+    double[] values;
+    if (dataSchema.getColumnDataTypes()[numColumns - 1] == ColumnDataType.STRING_ARRAY) {
+      byte[][] byteValues = decodeFromHex((String[]) row[row.length - 1]);
+      values = fromBytesArray(byteValues);
+    } else {
+      values = (double[]) row[row.length - 1];
+    }
     long fbv = (long) values[0];
     Duration window = Duration.ofSeconds((long) values[1]);
     int numBuckets = (int) values[2];
@@ -172,14 +263,25 @@ private static Object[] timeSeriesToRow(TimeSeries timeSeries, DataSchema dataSc
       Object tagValue = timeSeries.getTagValues()[index];
       result[index] = tagValue == null ? "null" : tagValue.toString();
     }
-    result[numColumns - 1] = unboxDoubleArray(timeSeries.getValues());
+    if (dataSchema.getColumnDataTypes()[numColumns - 1] == ColumnDataType.DOUBLE_ARRAY) {
+      result[numColumns - 1] = unboxDoubleArray(timeSeries.getDoubleValues());
+    } else {
+      result[numColumns - 1] = encodeAsHex(timeSeries.getBytesValues());
+    }
     return result;
   }
 
-  private static TimeSeries timeSeriesFromRow(List tagNames, Object[] row, TimeBuckets timeBuckets) {
-    Double[] values = boxDoubleArray((double[]) row[row.length - 1]);
+  private static TimeSeries timeSeriesFromRow(List tagNames, Object[] row, TimeBuckets timeBuckets,
+      DataSchema dataSchema) {
+    int numColumns = dataSchema.getColumnDataTypes().length;
     Object[] tagValues = new Object[row.length - 1];
     System.arraycopy(row, 0, tagValues, 0, row.length - 1);
+    Object[] values;
+    if (dataSchema.getColumnDataTypes()[numColumns - 1] == ColumnDataType.DOUBLE_ARRAY) {
+      values = boxDoubleArray((double[]) row[row.length - 1]);
+    } else {
+      values = decodeFromHex((String[]) row[row.length - 1]);
+    }
     return new TimeSeries(Long.toString(TimeSeries.hash(tagValues)), null, timeBuckets, values, tagNames, tagValues);
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
index e85d17cf6cc..43b3496dfb8 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesServerPlanVisitorTest.java
@@ -45,7 +45,7 @@ public void testCompileQueryContext() {
     final String planId = "id";
     final String tableName = "orderTable";
     final String timeColumn = "orderTime";
-    final AggInfo aggInfo = new AggInfo("SUM", null);
+    final AggInfo aggInfo = new AggInfo("SUM", false, Collections.emptyMap());
     final String filterExpr = "cityName = 'Chicago'";
     PhysicalTimeSeriesServerPlanVisitor serverPlanVisitor = new PhysicalTimeSeriesServerPlanVisitor(
         mock(QueryExecutor.class), mock(ExecutorService.class), mock(ServerMetrics.class));
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
index c9fd9293335..5a9079de2cd 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesExchangeReceiveOperatorTest.java
@@ -39,7 +39,7 @@
 
 public class TimeSeriesExchangeReceiveOperatorTest {
   private static final int NUM_SERVERS_QUERIED = 3;
-  private static final AggInfo SUM_AGG_INFO = new AggInfo("SUM", null);
+  private static final AggInfo SUM_AGG_INFO = new AggInfo("SUM", false, Collections.emptyMap());
   private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(1000, Duration.ofSeconds(200), 4);
   private static final List TAG_NAMES = ImmutableList.of("city", "zip");
   private static final Object[] CHICAGO_SERIES_VALUES = new Object[]{"Chicago", "60605"};
@@ -65,10 +65,10 @@ public void testGetNextBlockWithAggregation() {
     assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 1, "Expected 1 series for Chicago");
     assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected 1 series for SF");
     // Ensure Chicago had series addition performed
-    Double[] chicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues();
+    Double[] chicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getDoubleValues();
     assertEquals(chicagoSeriesValues, new Double[]{20.0, 20.0, 20.0, 20.0});
     // Ensure SF had input series unmodified
-    Double[] sanFranciscoSeriesValues = block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues();
+    Double[] sanFranciscoSeriesValues = block.getSeriesMap().get(SF_SERIES_HASH).get(0).getDoubleValues();
     assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0, 10.0});
   }
 
@@ -89,12 +89,12 @@ public void testGetNextBlockNoAggregation() {
     assertEquals(block.getSeriesMap().get(CHICAGO_SERIES_HASH).size(), 2, "Expected 2 series for Chicago");
     assertEquals(block.getSeriesMap().get(SF_SERIES_HASH).size(), 1, "Expected 1 series for SF");
     // Ensure Chicago has unmodified series values
-    Double[] firstChicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getValues();
-    Double[] secondChicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(1).getValues();
+    Double[] firstChicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(0).getDoubleValues();
+    Double[] secondChicagoSeriesValues = block.getSeriesMap().get(CHICAGO_SERIES_HASH).get(1).getDoubleValues();
     assertEquals(firstChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0, 10.0});
     assertEquals(secondChicagoSeriesValues, new Double[]{10.0, 10.0, 10.0, 10.0});
     // Ensure SF has input unmodified series values
-    Double[] sanFranciscoSeriesValues = block.getSeriesMap().get(SF_SERIES_HASH).get(0).getValues();
+    Double[] sanFranciscoSeriesValues = block.getSeriesMap().get(SF_SERIES_HASH).get(0).getDoubleValues();
     assertEquals(sanFranciscoSeriesValues, new Double[]{10.0, 10.0, 10.0, 10.0});
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
index f08d39ca0a9..d488d8fbd01 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/timeseries/serde/TimeSeriesBlockSerdeTest.java
@@ -47,7 +47,7 @@ public void testSerde()
     // 4. Compare ByteString-1 and ByteString-2.
     // 5. Compare values of Block-1 and Block-2.
     List blocks = List.of(buildBlockWithNoTags(), buildBlockWithSingleTag(),
-        buildBlockWithMultipleTags());
+        buildBlockWithMultipleTags(), buildBlockWithByteValues());
     for (TimeSeriesBlock block1 : blocks) {
       // Serialize, deserialize and serialize again
       ByteString byteString1 = TimeSeriesBlockSerde.serializeTimeSeriesBlock(block1);
@@ -61,6 +61,31 @@ public void testSerde()
     }
   }
 
+  @Test
+  public void testFromToBytesArray() {
+    // Encode and decode a double[] array to confirm the values turn out to be the same.
+    double[][] inputs = new double[][]{
+        {131.0, 1.31, 0.0},
+        {1.0, 1231.0, 1.0}
+    };
+    for (double[] input : inputs) {
+      byte[][] encodedBytes = TimeSeriesBlockSerde.toBytesArray(input);
+      double[] decodedValues = TimeSeriesBlockSerde.fromBytesArray(encodedBytes);
+      assertEquals(decodedValues, input);
+    }
+  }
+
+  @Test
+  public void testFromToHex() {
+    byte[][] input = new byte[][]{
+        {0x1a}, {0x00}, {0x77}, {Byte.MIN_VALUE},
+        {Byte.MAX_VALUE}, {0x13}, {0x19}, {0x77}
+    };
+    String[] encodedValues = TimeSeriesBlockSerde.encodeAsHex(input);
+    byte[][] decodedValues = TimeSeriesBlockSerde.decodeFromHex(encodedValues);
+    assertEquals(decodedValues, input);
+  }
+
   /**
    * Compares time series blocks in a way which makes it easy to debug test failures when/if they happen in CI.
    */
@@ -132,4 +157,20 @@ private static TimeSeriesBlock buildBlockWithMultipleTags() {
         new Double[]{Double.NaN, -1.0, -1231231.0, 3.14}, tagNames, seriesTwoValues)));
     return new TimeSeriesBlock(timeBuckets, seriesMap);
   }
+
+  private static TimeSeriesBlock buildBlockWithByteValues() {
+    TimeBuckets timeBuckets = TIME_BUCKETS;
+    // Series are: [cityId=Chicago, zip=60605] and [cityId=San Francisco, zip=94107]
+    List tagNames = ImmutableList.of("cityId", "zip");
+    Object[] seriesOneValues = new Object[]{"Chicago", "60605"};
+    Object[] seriesTwoValues = new Object[]{"San Francisco", "94107"};
+    long seriesOneHash = TimeSeries.hash(seriesOneValues);
+    long seriesTwoHash = TimeSeries.hash(seriesTwoValues);
+    Map> seriesMap = new HashMap<>();
+    seriesMap.put(seriesOneHash, ImmutableList.of(new TimeSeries(Long.toString(seriesOneHash), null, timeBuckets,
+        new byte[][]{{0x13}, {0x1b}, {0x12}, {0x00}}, tagNames, seriesOneValues)));
+    seriesMap.put(seriesTwoHash, ImmutableList.of(new TimeSeries(Long.toString(seriesTwoHash), null, timeBuckets,
+        new byte[][]{{0x00}, {0x00}, {Byte.MIN_VALUE}, {0x7f}}, tagNames, seriesTwoValues)));
+    return new TimeSeriesBlock(timeBuckets, seriesMap);
+  }
 }
diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
index 46a3f68c31d..32287f4d834 100644
--- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
+++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesPlanFragmenter.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pinot.tsdb.planner;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.pinot.tsdb.spi.AggInfo;
 import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode;
 import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
 
@@ -102,8 +104,15 @@ public static List getFragments(BaseTimeSeriesPlanNode r
   private static BaseTimeSeriesPlanNode fragmentRecursively(BaseTimeSeriesPlanNode planNode, Context context) {
     if (planNode instanceof LeafTimeSeriesPlanNode) {
       LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) planNode;
-      context._fragments.add(leafNode.withInputs(Collections.emptyList()));
-      return new TimeSeriesExchangeNode(planNode.getId(), Collections.emptyList(), leafNode.getAggInfo());
+      AggInfo currentAggInfo = leafNode.getAggInfo();
+      if (currentAggInfo == null) {
+        context._fragments.add(leafNode.withInputs(Collections.emptyList()));
+      } else {
+        Preconditions.checkState(!currentAggInfo.getIsPartial(),
+            "Leaf node in the logical plan should not have partial agg");
+        context._fragments.add(leafNode.withAggInfo(currentAggInfo.withPartialAggregation()));
+      }
+      return new TimeSeriesExchangeNode(planNode.getId(), Collections.emptyList(), currentAggInfo);
     }
     List newInputs = new ArrayList<>();
     for (BaseTimeSeriesPlanNode input : planNode.getInputs()) {
diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
index 0dc3e0502de..33b66bff1f7 100644
--- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
+++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/AggInfo.java
@@ -23,7 +23,6 @@
 import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.Map;
-import javax.annotation.Nullable;
 
 
 /**
@@ -41,24 +40,47 @@
  * Example usage:
  * Map params = new HashMap<>();
  * params.put("window", "5m");
- * AggInfo aggInfo = new AggInfo("rate", params);
+ * AggInfo aggInfo = new AggInfo("rate", false, params);
  */
 public class AggInfo {
   private final String _aggFunction;
+  /**
+   * Denotes whether an aggregate is partial or full. When returning the logical plan, language developers must not
+   * set this to true. This is used during Physical planning, and Pinot may set this to true if the corresponding
+   * aggregate node is not guaranteed to have the full data. In such cases, the physical plan will always add a
+   * complementary full aggregate.
+   * <p>
+   * TODO(timeseries): Ideally we should remove this from the logical plan completely.
+   * </p>
+ */ + private final boolean _isPartial; private final Map _params; @JsonCreator - public AggInfo(@JsonProperty("aggFunction") String aggFunction, - @JsonProperty("params") @Nullable Map params) { + public AggInfo(@JsonProperty("aggFunction") String aggFunction, @JsonProperty("isPartial") boolean isPartial, + @JsonProperty("params") Map params) { Preconditions.checkNotNull(aggFunction, "Received null aggFunction in AggInfo"); _aggFunction = aggFunction; + _isPartial = isPartial; _params = params != null ? params : Collections.emptyMap(); } + public AggInfo withPartialAggregation() { + return new AggInfo(_aggFunction, true, _params); + } + + public AggInfo withFullAggregation() { + return new AggInfo(_aggFunction, false, _params); + } + public String getAggFunction() { return _aggFunction; } + public boolean getIsPartial() { + return _isPartial; + } + public Map getParams() { return Collections.unmodifiableMap(_params); } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java index 1986f4713d2..3deb4c68e68 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java @@ -64,6 +64,11 @@ public LeafTimeSeriesPlanNode( _groupByExpressions = groupByExpressions; } + public LeafTimeSeriesPlanNode withAggInfo(AggInfo newAggInfo) { + return new LeafTimeSeriesPlanNode(_id, _inputs, _tableName, _timeColumn, _timeUnit, _offsetSeconds, + _filterExpression, _valueExpression, newAggInfo, _groupByExpressions); + } + @Override public BaseTimeSeriesPlanNode withInputs(List newInputs) { return new LeafTimeSeriesPlanNode(_id, newInputs, _tableName, _timeColumn, _timeUnit, _offsetSeconds, diff --git 
a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java index 20ac1714a8f..9cca55ebcbb 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/BaseTimeSeriesBuilder.java @@ -19,7 +19,6 @@ package org.apache.pinot.tsdb.spi.series; import java.util.List; -import java.util.Objects; import javax.annotation.Nullable; import org.apache.pinot.tsdb.spi.TimeBuckets; @@ -61,19 +60,14 @@ public void addValueAtIndex(int timeBucketIndex, String value) { public abstract void addValue(long timeValue, Double value); - public void mergeSeries(TimeSeries series) { - int numDataPoints = series.getValues().length; - Long[] timeValues = Objects.requireNonNull(series.getTimeValues(), - "Cannot merge series: found null timeValues"); - for (int i = 0; i < numDataPoints; i++) { - addValue(timeValues[i], series.getValues()[i]); - } - } - + /** + * Assumes Double[] values and attempts to merge the given series with this builder. Implementations are + * recommended to override this to either optimize, or add bytes[][] values from the input Series. 
+ */ public void mergeAlignedSeries(TimeSeries series) { - int numDataPoints = series.getValues().length; + int numDataPoints = series.getDoubleValues().length; for (int i = 0; i < numDataPoints; i++) { - addValueAtIndex(i, series.getValues()[i]); + addValueAtIndex(i, series.getDoubleValues()[i]); } } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java index 55e2a9a7302..4a2e452116e 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/series/TimeSeries.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.tsdb.spi.series; +import com.google.common.base.Preconditions; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -67,12 +68,16 @@ public class TimeSeries { private final String _id; private final Long[] _timeValues; private final TimeBuckets _timeBuckets; - private final Double[] _values; + private final Object[] _values; private final List _tagNames; private final Object[] _tagValues; - public TimeSeries(String id, @Nullable Long[] timeValues, @Nullable TimeBuckets timeBuckets, Double[] values, + // TODO(timeseries): Time series may also benefit from storing extremal/outlier value traces, similar to Monarch. + // TODO(timeseries): It may make sense to allow types other than Double and byte[] arrays. 
+ public TimeSeries(String id, @Nullable Long[] timeValues, @Nullable TimeBuckets timeBuckets, Object[] values, List tagNames, Object[] tagValues) { + Preconditions.checkArgument(values instanceof Double[] || values instanceof byte[][], + "Time Series can only take Double[] or byte[][] values"); _id = id; _timeValues = timeValues; _timeBuckets = timeBuckets; @@ -95,10 +100,18 @@ public TimeBuckets getTimeBuckets() { return _timeBuckets; } - public Double[] getValues() { + public Object[] getValues() { return _values; } + public Double[] getDoubleValues() { + return (Double[]) _values; + } + + public byte[][] getBytesValues() { + return (byte[][]) _values; + } + public List getTagNames() { return _tagNames; } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java index 011cb6fbc63..d326ed49b58 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNodeTest.java @@ -44,7 +44,7 @@ public void testGetEffectiveFilter() { { LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 0L, "", "value_col", - new AggInfo("SUM", null), Collections.singletonList("cityName")); + new AggInfo("SUM", false, null), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), "orderTime > " + expectedStartTimeInFilter + " AND orderTime <= " + expectedEndTimeInFilter); } @@ -52,7 +52,7 @@ public void testGetEffectiveFilter() { { LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, "", "value_col", - new AggInfo("SUM", null), 
Collections.singletonList("cityName")); + new AggInfo("SUM", false, null), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), "orderTime > " + (expectedStartTimeInFilter - 123) + " AND orderTime <= " + (expectedEndTimeInFilter - 123)); } @@ -60,7 +60,7 @@ public void testGetEffectiveFilter() { { LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TIME_UNIT, 123L, nonEmptyFilter, - "value_col", new AggInfo("SUM", null), Collections.singletonList("cityName")); + "value_col", new AggInfo("SUM", false, Collections.emptyMap()), Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", nonEmptyFilter, (expectedStartTimeInFilter - 123), (expectedEndTimeInFilter - 123))); @@ -69,7 +69,8 @@ public void testGetEffectiveFilter() { { LeafTimeSeriesPlanNode planNode = new LeafTimeSeriesPlanNode(ID, Collections.emptyList(), TABLE, TIME_COLUMN, TimeUnit.MILLISECONDS, 123L, - nonEmptyFilter, "value_col", new AggInfo("SUM", null), Collections.singletonList("cityName")); + nonEmptyFilter, "value_col", new AggInfo("SUM", false, Collections.emptyMap()), + Collections.singletonList("cityName")); assertEquals(planNode.getEffectiveFilter(timeBuckets), String.format("(%s) AND (orderTime > %s AND orderTime <= %s)", nonEmptyFilter, (expectedStartTimeInFilter * 1000 - 123 * 1000), (expectedEndTimeInFilter * 1000 - 123 * 1000))); diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java index 4bd5c37a5ae..71bf2323fdb 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java +++ 
b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerdeTest.java @@ -28,6 +28,7 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -40,7 +41,7 @@ public void testSerdeForScanFilterProjectNode() { LeafTimeSeriesPlanNode leafTimeSeriesPlanNode = new LeafTimeSeriesPlanNode("sfp#0", new ArrayList<>(), "myTable", "myTimeColumn", TimeUnit.MILLISECONDS, 0L, - "myFilterExpression", "myValueExpression", new AggInfo("SUM", aggParams), new ArrayList<>()); + "myFilterExpression", "myValueExpression", new AggInfo("SUM", false, aggParams), new ArrayList<>()); BaseTimeSeriesPlanNode planNode = TimeSeriesPlanSerde.deserialize(TimeSeriesPlanSerde.serialize(leafTimeSeriesPlanNode)); assertTrue(planNode instanceof LeafTimeSeriesPlanNode); @@ -52,6 +53,7 @@ public void testSerdeForScanFilterProjectNode() { assertEquals(deserializedNode.getFilterExpression(), "myFilterExpression"); assertEquals(deserializedNode.getValueExpression(), "myValueExpression"); assertNotNull(deserializedNode.getAggInfo()); + assertFalse(deserializedNode.getAggInfo().getIsPartial()); assertNotNull(deserializedNode.getAggInfo().getParams()); assertEquals(deserializedNode.getAggInfo().getParams().get("window"), "5m"); assertEquals(deserializedNode.getGroupByExpressions().size(), 0); diff --git a/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/series/TimeSeriesTest.java b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/series/TimeSeriesTest.java new file mode 100644 index 00000000000..db651785e8d --- /dev/null +++ b/pinot-timeseries/pinot-timeseries-spi/src/test/java/org/apache/pinot/tsdb/spi/series/TimeSeriesTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.tsdb.spi.series; + +import java.time.Duration; +import java.util.Collections; +import org.apache.pinot.tsdb.spi.TimeBuckets; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class TimeSeriesTest { + private static final TimeBuckets TIME_BUCKETS = TimeBuckets.ofSeconds(100, Duration.ofSeconds(10), 10); + + @Test + public void testTimeSeriesAcceptsDoubleValues() { + Double[] values = new Double[10]; + TimeSeries timeSeries = new TimeSeries("anything", null, TIME_BUCKETS, values, Collections.emptyList(), + new Object[0]); + assertEquals(timeSeries.getDoubleValues(), values); + } + + @Test + public void testTimeSeriesAcceptsBytesValues() { + byte[][] byteValues = new byte[10][1231]; + TimeSeries timeSeries = new TimeSeries("anything", null, TIME_BUCKETS, byteValues, Collections.emptyList(), + new Object[0]); + assertEquals(timeSeries.getBytesValues(), byteValues); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testTimeSeriesDeniesWhenValuesNotDoubleOrBytes() { + Object[] someValues = new Long[10]; + TimeSeries timeSeries = new TimeSeries("anything", null, TIME_BUCKETS, someValues, Collections.emptyList(), + new Object[0]); + } +}