diff --git a/pom.xml b/pom.xml
index 74a87f6cbe563..341963bf6b2c0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -67,6 +67,7 @@
1.9.17
2.7.0
2.3.1
+ 0.9.3
0.19.0
2.3.1
0.9.0
@@ -1676,6 +1677,219 @@
+
+ org.apache.pinot
+ presto-pinot-driver
+ ${dep.pinot.version}
+ shaded
+
+
+ org.apache.pinot
+ pinot-spi
+
+
+ org.apache.pinot
+ pinot-common
+
+
+ org.apache.pinot
+ pinot-core
+
+
+ org.apache.pinot
+ pinot-segment-spi
+
+
+ org.apache.pinot
+ pinot-segment-local
+
+
+ org.apache.pinot
+ pinot-yammer
+
+
+ io.grpc
+ grpc-api
+
+
+ io.grpc
+ grpc-context
+
+
+ org.codehaus.mojo
+ animal-sniffer-annotations
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+ commons-codec
+ commons-codec
+
+
+ commons-codec
+ commons-lang3
+
+
+ org.apache.commons
+ commons-lang3
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+ org.apache.thrift
+ libthrift
+
+
+ com.github.luben
+ zstd-jni
+
+
+ org.glassfish.jersey.core
+ jersey-common
+
+
+ com.google.errorprone
+ error_prone_annotations
+
+
+ org.checkerframework
+ checker-compat-qual
+
+
+ org.reflections
+ reflections
+
+
+ org.slf4j
+ slf4j-api
+
+
+ jline
+ jline
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ org.antlr
+ antlr4-annotations
+
+
+ org.apache.kafka
+ kafka-clients
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+
+
+ org.apache.kafka
+ kafka_2.10
+
+
+ commons-beanutils
+ commons-beanutils-core
+
+
+ log4j
+ log4j
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ javax.validation
+ validation-api
+
+
+ javax.servlet
+ javax.servlet-api
+
+
+ org.glassfish.hk2.external
+ jakarta.inject
+
+
+ jakarta.ws.rs
+ jakarta.ws.rs-api
+
+
+ jakarta.annotation
+ jakarta.annotation-api
+
+
+ com.google.guava
+ guava
+
+
+ joda-time
+ joda-time
+
+
+ org.apache.httpcomponents
+ httpcore
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+ commons-codec
+ commons-codec
+
+
+ org.slf4j
+ jcl-over-slf4j
+
+
+ javax.servlet
+ servlet-api
+
+
+ com.clearspring.analytics
+ stream
+
+
+ com.tdunning
+ t-digest
+
+
+ org.apache.datasketches
+ datasketches-java
+
+
+ org.roaringbitmap
+ RoaringBitmap
+
+
+ javax.inject
+ javax.inject
+
+
+ org.reflections
+ reflections
+
+
+ javax.validation
+ validation-api
+
+
+
+
org.xerial.snappy
snappy-java
@@ -1890,58 +2104,6 @@
-
- com.facebook.presto.pinot
- pinot-driver
- 0.1.2
-
-
- org.slf4j
- slf4j-api
-
-
- org.slf4j
- log4j-over-slf4j
-
-
- org.apache.logging.log4j
- log4j-slf4j-api
-
-
- org.apache.logging.log4j
- log4j-slf4j-impl
-
-
- org.apache.logging.log4j
- log4j-api
-
-
- org.apache.logging.log4j
- log4j-core
-
-
- org.apache.logging.log4j
- log4j-1.2-api
-
-
- org.apache.httpcomponents
- httpclient
-
-
- org.apache.httpcomponents
- httpcore
-
-
- org.apache.lucene
- lucene-analyzers-common
-
-
- org.roaringbitmap
- RoaringBitmap
-
-
-
-
io.delta
delta-standalone_2.12
diff --git a/presto-pinot-toolkit/pom.xml b/presto-pinot-toolkit/pom.xml
index 4c37b7dd3d8d3..27d6a9fc76936 100644
--- a/presto-pinot-toolkit/pom.xml
+++ b/presto-pinot-toolkit/pom.xml
@@ -20,16 +20,64 @@
- com.facebook.presto.pinot
- pinot-driver
+ org.apache.pinot
+ presto-pinot-driver
+ shaded
+
+
+
+ io.perfmark
+ perfmark-api
+ 0.19.0
+
+
+
+ com.clearspring.analytics
+ stream
+ test
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+
+ com.tdunning
+ t-digest
+ 3.2
+ test
- org.codehaus.mojo
- animal-sniffer-annotations
+ org.slf4j
+ slf4j-api
+
+
+
+
+ org.apache.datasketches
+ datasketches-java
+ 1.2.0-incubating
+ test
+
- com.google.protobuf
- protobuf-java
+ org.slf4j
+ slf4j-api
+
+
+
+
+
+ org.roaringbitmap
+ RoaringBitmap
+ 0.9.23
+ test
+
+
+ org.slf4j
+ slf4j-api
@@ -49,6 +97,11 @@
jmxutils
+
+ javax.inject
+ javax.inject
+
+
com.facebook.airlift
bootstrap
@@ -86,13 +139,20 @@
- javax.validation
- validation-api
+ org.reflections
+ reflections
+ 0.9.9
+
+
+ com.google.code.findbugs
+ annotations
+
+
- javax.inject
- javax.inject
+ javax.validation
+ validation-api
@@ -208,6 +268,16 @@
test
+
+ org.apache.thrift
+ libthrift
+
+
+
+ org.antlr
+ antlr4-runtime
+
+
com.facebook.airlift
http-client
@@ -218,4 +288,39 @@
concurrent
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ 2.10
+
+
+ default
+ process-test-classes
+
+ analyze-only
+ analyze-duplicate
+ analyze-dep-mgt
+
+
+
+
+ org.apache.commons:commons-lang3::
+
+ org.antlr:antlr4-runtime::
+
+ org.reflections:reflections::
+ org.apache.thrift:libthrift::
+ io.perfmark:perfmark-api::
+
+
+
+
+
+
+
+
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotBrokerPageSourceBase.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotBrokerPageSourceBase.java
index 708d6226ac64d..ae267204cfcaa 100644
--- a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotBrokerPageSourceBase.java
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotBrokerPageSourceBase.java
@@ -25,6 +25,7 @@
import com.facebook.presto.common.type.DoubleType;
import com.facebook.presto.common.type.FixedWidthType;
import com.facebook.presto.common.type.IntegerType;
+import com.facebook.presto.common.type.JsonType;
import com.facebook.presto.common.type.SmallintType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.TinyintType;
@@ -41,6 +42,7 @@
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
+import org.apache.pinot.spi.utils.TimestampUtils;
import java.net.URI;
import java.util.ArrayList;
@@ -145,7 +147,7 @@ protected void setValue(Type type, BlockBuilder blockBuilder, String value)
blockBuilder.appendNull();
return;
}
- if (!(type instanceof FixedWidthType) && !(type instanceof VarcharType)) {
+ if (!(type instanceof FixedWidthType) && !(type instanceof VarcharType) && !(type instanceof JsonType)) {
throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "type '" + type + "' not supported");
}
if (type instanceof FixedWidthType) {
@@ -169,7 +171,7 @@ else if (type instanceof DecimalType || type instanceof DoubleType) {
type.writeDouble(blockBuilder, parseDouble(value));
}
else if (type instanceof TimestampType) {
- type.writeLong(blockBuilder, parseLong(value));
+ type.writeLong(blockBuilder, parseTimestamp(value));
}
else if (type instanceof DateType) {
type.writeLong(blockBuilder, parseLong(value));
@@ -185,6 +187,16 @@ else if (type instanceof DateType) {
}
}
+ private long parseTimestamp(String value)
+ {
+ try {
+ return parseLong(value);
+ }
+ catch (Exception e) {
+ return TimestampUtils.toMillisSinceEpoch(value);
+ }
+ }
+
@Override
public long getCompletedBytes()
{
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotColumnUtils.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotColumnUtils.java
index b378ed2288fff..0784948a46920 100644
--- a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotColumnUtils.java
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotColumnUtils.java
@@ -19,6 +19,7 @@
import com.facebook.presto.common.type.DateType;
import com.facebook.presto.common.type.DoubleType;
import com.facebook.presto.common.type.IntegerType;
+import com.facebook.presto.common.type.JsonType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarbinaryType;
@@ -128,6 +129,10 @@ private static Type getPrestoTypeFromPinotType(DataType dataType)
return VarcharType.VARCHAR;
case BYTES:
return VarbinaryType.VARBINARY;
+ case TIMESTAMP:
+ return TimestampType.TIMESTAMP;
+ case JSON:
+ return JsonType.JSON;
default:
break;
}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotPageSourceProvider.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotPageSourceProvider.java
index ad10c3808fe7b..3819be01a1af7 100644
--- a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotPageSourceProvider.java
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotPageSourceProvider.java
@@ -13,7 +13,6 @@
*/
package com.facebook.presto.pinot;
-import com.facebook.presto.pinot.grpc.PinotStreamingQueryClient;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorPageSource;
@@ -24,6 +23,9 @@
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
+import org.apache.pinot.connector.presto.PinotScatterGatherQueryClient;
+import org.apache.pinot.connector.presto.grpc.PinotStreamingQueryClient;
import javax.inject.Inject;
@@ -58,7 +60,7 @@ public PinotPageSourceProvider(
pinotConfig.getMinConnectionsPerServer(),
pinotConfig.getMaxBacklogPerServer(),
pinotConfig.getMaxConnectionsPerServer()));
- this.pinotStreamingQueryClient = new PinotStreamingQueryClient(new PinotStreamingQueryClient.Config(
+ this.pinotStreamingQueryClient = new PinotStreamingQueryClient(new GrpcQueryClient.Config(
pinotConfig.getStreamingServerGrpcMaxInboundMessageBytes(),
true));
this.clusterInfoFetcher = requireNonNull(clusterInfoFetcher, "cluster info fetcher is null");
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSegmentPageSource.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSegmentPageSource.java
index 8aea5bdb85337..e3891c99ab4a9 100644
--- a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSegmentPageSource.java
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSegmentPageSource.java
@@ -18,7 +18,6 @@
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.Type;
-import com.facebook.presto.pinot.PinotScatterGatherQueryClient.ErrorCode;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
@@ -27,6 +26,10 @@
import io.airlift.slice.Slices;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.connector.presto.PinotScatterGatherQueryClient;
+import org.apache.pinot.connector.presto.PinotScatterGatherQueryClient.ErrorCode;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.data.FieldSpec;
import java.util.ArrayList;
import java.util.Arrays;
@@ -39,6 +42,7 @@
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
+import static com.facebook.presto.common.type.JsonType.JSON;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_DATA_FETCH_EXCEPTION;
import static com.facebook.presto.pinot.PinotErrorCode.PINOT_EXCEPTION;
@@ -284,7 +288,12 @@ else if (javaType.equals(boolean.class)) {
writeBooleanBlock(blockBuilder, columnType, columnIndex);
}
else if (javaType.equals(long.class)) {
- writeLongBlock(blockBuilder, columnType, columnIndex);
+ if (pinotColumnType.toDataType().equals(FieldSpec.DataType.TIMESTAMP)) {
+ writeTimestampBlock(blockBuilder, columnType, columnIndex);
+ }
+ else {
+ writeLongBlock(blockBuilder, columnType, columnIndex);
+ }
}
else if (javaType.equals(double.class)) {
writeDoubleBlock(blockBuilder, columnType, columnIndex);
@@ -310,6 +319,15 @@ private void writeArrayBlock(BlockBuilder blockBuilder, Type columnType, int col
Type columnPrestoType = ((ArrayType) columnType).getElementType();
BlockBuilder childBuilder = blockBuilder.beginBlockEntry();
switch (columnPinotType) {
+ case BOOLEAN_ARRAY:
+ int[] booleanArray = currentDataTable.getDataTable().getIntArray(rowIndex, columnIndex);
+ for (int i = 0; i < booleanArray.length; i++) {
+ // Both the numeric types implement a writeLong method which write if the bounds for
+ // the type allows else throw exception.
+ columnPrestoType.writeBoolean(childBuilder, Boolean.valueOf(booleanArray[i] > 0));
+ completedBytes += 1;
+ }
+ break;
case INT_ARRAY:
int[] intArray = currentDataTable.getDataTable().getIntArray(rowIndex, columnIndex);
for (int i = 0; i < intArray.length; i++) {
@@ -320,6 +338,7 @@ private void writeArrayBlock(BlockBuilder blockBuilder, Type columnType, int col
}
break;
case LONG_ARRAY:
+ case TIMESTAMP_ARRAY:
long[] longArray = currentDataTable.getDataTable().getLongArray(rowIndex, columnIndex);
for (int i = 0; i < longArray.length; i++) {
columnPrestoType.writeLong(childBuilder, longArray[i]);
@@ -357,6 +376,7 @@ private void writeArrayBlock(BlockBuilder blockBuilder, Type columnType, int col
}
break;
case STRING_ARRAY:
+ case BYTES_ARRAY:
String[] stringArray = currentDataTable.getDataTable().getStringArray(rowIndex, columnIndex);
for (int i = 0; i < stringArray.length; i++) {
Slice slice = Slices.utf8Slice(stringArray[i]);
@@ -393,6 +413,14 @@ private void writeLongBlock(BlockBuilder blockBuilder, Type columnType, int colu
}
}
+ private void writeTimestampBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
+ {
+ for (int i = 0; i < currentDataTable.getDataTable().getNumberOfRows(); i++) {
+ columnType.writeLong(blockBuilder, getLong(i, columnIndex));
+ completedBytes += Long.BYTES;
+ }
+ }
+
private void writeDoubleBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
{
for (int i = 0; i < currentDataTable.getDataTable().getNumberOfRows(); i++) {
@@ -412,7 +440,7 @@ private void writeSliceBlock(BlockBuilder blockBuilder, Type columnType, int col
private boolean getBoolean(int rowIndex, int columnIndex)
{
- return Boolean.getBoolean(currentDataTable.getDataTable().getString(rowIndex, columnIndex));
+ return currentDataTable.getDataTable().getInt(rowIndex, columnIndex) > 0;
}
private long getLong(int rowIndex, int columnIndex)
@@ -445,7 +473,9 @@ private double getDouble(int rowIndex, int columnIndex)
private Slice getSlice(int rowIndex, int columnIndex)
{
- checkColumnType(columnIndex, VARCHAR);
+ checkColumnType(columnIndex, new Type[] {
+ VARCHAR, JSON
+ });
DataSchema.ColumnDataType columnType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIndex);
switch (columnType) {
case INT_ARRAY:
@@ -498,11 +528,17 @@ private int getEstimatedColumnSizeInBytes(DataSchema.ColumnDataType dataType)
return pinotConfig.getEstimatedSizeInBytesForNonNumericColumn();
}
- private void checkColumnType(int columnIndex, Type expected)
+ private void checkColumnType(int columnIndex, Type[] expectedTypes)
{
checkArgument(columnIndex < split.getExpectedColumnHandles().size(), "Invalid field index");
Type actual = split.getExpectedColumnHandles().get(columnIndex).getDataType();
- checkArgument(actual.equals(expected), "Expected column %s to be type %s but is %s", columnIndex, expected, actual);
+ boolean matches = false;
+ for (Type expectedType : expectedTypes) {
+ if (actual.equals(expectedType)) {
+ matches = true;
+ }
+ }
+ checkArgument(matches, "Expected column %s to be type %s but is %s", columnIndex, expectedTypes, actual);
}
protected static Type getTypeForBlock(PinotColumnHandle pinotColumnHandle)
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSegmentStreamingPageSource.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSegmentStreamingPageSource.java
index 58074fb709d07..61c981d75ff81 100644
--- a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSegmentStreamingPageSource.java
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSegmentStreamingPageSource.java
@@ -14,12 +14,13 @@
package com.facebook.presto.pinot;
import com.facebook.presto.common.Page;
-import com.facebook.presto.pinot.grpc.Constants;
-import com.facebook.presto.pinot.grpc.PinotStreamingQueryClient;
-import com.facebook.presto.pinot.grpc.ServerResponse;
import com.facebook.presto.pinot.query.PinotProxyGrpcRequestBuilder;
import com.facebook.presto.spi.ConnectorSession;
+import org.apache.pinot.common.proto.Server.ServerResponse;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.connector.presto.grpc.PinotStreamingQueryClient;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.spi.utils.CommonConstants;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -82,13 +83,14 @@ public Page getNextPage()
long startTimeNanos = System.nanoTime();
ServerResponse serverResponse = serverResponseIterator.next();
readTimeNanos += System.nanoTime() - startTimeNanos;
- switch (serverResponse.getResponseType()) {
- case Constants.Response.ResponseType.DATA:
+ final String responseType = serverResponse.getMetadataOrThrow("responseType");
+ switch (responseType) {
+ case CommonConstants.Query.Response.ResponseType.DATA:
estimatedMemoryUsageInBytes = serverResponse.getSerializedSize();
// Store each dataTable which will later be constructed into Pages.
try {
- byteBuffer = serverResponse.getPayloadReadOnlyByteBuffer();
- DataTable dataTable = serverResponse.getDataTable(byteBuffer);
+ byteBuffer = serverResponse.getPayload().asReadOnlyByteBuffer();
+ DataTable dataTable = DataTableFactory.getDataTable(byteBuffer);
checkExceptions(dataTable, split, PinotSessionProperties.isMarkDataFetchExceptionsAsRetriable(session));
currentDataTable = new PinotSegmentPageSource.PinotDataTableWithSize(dataTable, serverResponse.getSerializedSize());
}
@@ -100,7 +102,7 @@ public Page getNextPage()
e);
}
break;
- case Constants.Response.ResponseType.METADATA:
+ case CommonConstants.Query.Response.ResponseType.METADATA:
// The last part of the response is Metadata
currentDataTable = null;
serverResponseIterator = null;
@@ -110,7 +112,7 @@ public Page getNextPage()
throw new PinotException(
PINOT_UNEXPECTED_RESPONSE,
split.getSegmentPinotQuery(),
- String.format("Encountered Pinot exceptions, unknown response type - %s", serverResponse.getResponseType()));
+ String.format("Encountered Pinot exceptions, unknown response type - %s", responseType));
}
}
Page page = fillNextPage();
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotProxyGrpcRequestBuilder.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotProxyGrpcRequestBuilder.java
index 1d4a5b4389942..c3ad2742c8c7f 100644
--- a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotProxyGrpcRequestBuilder.java
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotProxyGrpcRequestBuilder.java
@@ -13,10 +13,10 @@
*/
package com.facebook.presto.pinot.query;
-import com.facebook.presto.pinot.PinotScatterGatherQueryClient;
-import com.facebook.presto.pinot.grpc.Constants;
-import com.facebook.presto.pinot.grpc.GrpcRequestBuilder;
import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
+import org.apache.pinot.connector.presto.PinotScatterGatherQueryClient;
+import org.apache.pinot.spi.utils.CommonConstants;
import java.util.HashMap;
import java.util.List;
@@ -77,7 +77,7 @@ public PinotProxyGrpcRequestBuilder setEnableStreaming(boolean enableStreaming)
public PinotProxyGrpcRequestBuilder setSql(String sql)
{
- payloadType = Constants.Request.PayloadType.SQL;
+ payloadType = CommonConstants.Query.Request.PayloadType.SQL;
this.sql = sql;
return this;
}
@@ -99,15 +99,15 @@ public Server.ServerRequest build()
if (payloadType == null || segments.isEmpty()) {
throw new PinotScatterGatherQueryClient.PinotException(PinotScatterGatherQueryClient.ErrorCode.PINOT_UNCLASSIFIED_ERROR, "Query and segmentsToQuery must be set");
}
- if (!payloadType.equals(Constants.Request.PayloadType.SQL)) {
+ if (!payloadType.equals(CommonConstants.Query.Request.PayloadType.SQL)) {
throw new RuntimeException("Only [SQL] Payload type is allowed: " + payloadType);
}
Map metadata = new HashMap<>();
- metadata.put(Constants.Request.MetadataKeys.REQUEST_ID, Integer.toString(requestId));
- metadata.put(Constants.Request.MetadataKeys.BROKER_ID, brokerId);
- metadata.put(Constants.Request.MetadataKeys.ENABLE_TRACE, Boolean.toString(enableTrace));
- metadata.put(Constants.Request.MetadataKeys.ENABLE_STREAMING, Boolean.toString(enableStreaming));
- metadata.put(Constants.Request.MetadataKeys.PAYLOAD_TYPE, payloadType);
+ metadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Integer.toString(requestId));
+ metadata.put(CommonConstants.Query.Request.MetadataKeys.BROKER_ID, brokerId);
+ metadata.put(CommonConstants.Query.Request.MetadataKeys.ENABLE_TRACE, Boolean.toString(enableTrace));
+ metadata.put(CommonConstants.Query.Request.MetadataKeys.ENABLE_STREAMING, Boolean.toString(enableStreaming));
+ metadata.put(CommonConstants.Query.Request.MetadataKeys.PAYLOAD_TYPE, payloadType);
if (this.hostName != null) {
metadata.put(KEY_OF_PROXY_GRPC_FORWARD_HOST, this.hostName);
}
diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotColumnMetadata.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotColumnMetadata.java
index 972e74648ead2..e1b0067b803ba 100644
--- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotColumnMetadata.java
+++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotColumnMetadata.java
@@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import static com.facebook.presto.common.type.BigintType.BIGINT;
+import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
@@ -76,14 +77,14 @@ public void testParsePinotSchemaToPinotColumns()
.put("singleValueFloatDimension", DOUBLE)
.put("singleValueDoubleDimension", DOUBLE)
.put("singleValueBytesDimension", VARBINARY)
- .put("singleValueBooleanDimension", VARCHAR)
+ .put("singleValueBooleanDimension", BOOLEAN)
.put("singleValueStringDimension", VARCHAR)
.put("multiValueIntDimension", new ArrayType(INTEGER))
.put("multiValueLongDimension", new ArrayType(BIGINT))
.put("multiValueFloatDimension", new ArrayType(DOUBLE))
.put("multiValueDoubleDimension", new ArrayType(DOUBLE))
.put("multiValueBytesDimension", new ArrayType(VARBINARY))
- .put("multiValueBooleanDimension", new ArrayType(VARCHAR))
+ .put("multiValueBooleanDimension", new ArrayType(BOOLEAN))
.put("multiValueStringDimension", new ArrayType(VARCHAR))
.put("intMetric", INTEGER)
.put("longMetric", BIGINT)
diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotQueryBase.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotQueryBase.java
index f734f2b485af3..db6875c136378 100644
--- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotQueryBase.java
+++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotQueryBase.java
@@ -102,7 +102,10 @@ public class TestPinotQueryBase
protected static final Metadata metadata = MetadataManager.createTestMetadataManager();
- protected final PinotConfig pinotConfig = new PinotConfig();
+ protected final PinotConfig pinotConfig = new PinotConfig()
+ .setMinConnectionsPerServer(1)
+ .setMaxConnectionsPerServer(2)
+ .setThreadPoolSize(2);
protected static final Map testInput =
ImmutableMap.builder()
diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSegmentPageSource.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSegmentPageSource.java
index c3f0688fb1052..4bf63cd7fbf8d 100644
--- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSegmentPageSource.java
+++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSegmentPageSource.java
@@ -26,20 +26,17 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.connector.presto.PinotScatterGatherQueryClient;
+import org.apache.pinot.connector.presto.grpc.Utils;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.utils.ByteArray;
import org.testng.annotations.Test;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.nio.ByteBuffer;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -57,6 +54,7 @@ public class TestPinotSegmentPageSource
{
private static final Random RANDOM = new Random(1234);
protected static final int NUM_ROWS = 100;
+ private PinotScatterGatherQueryClient mockPinotQueryClient;
private static final Set UNSUPPORTED_TYPES = ImmutableSet.of(
DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.BYTES);
@@ -87,6 +85,8 @@ protected List createPinotColumnHandlesWithAllTypes()
protected FieldSpec getFieldSpec(String columnName, DataSchema.ColumnDataType columnDataType)
{
switch (columnDataType) {
+ case BOOLEAN:
+ return new DimensionFieldSpec(columnName, FieldSpec.DataType.BOOLEAN, true);
case INT:
return new DimensionFieldSpec(columnName, FieldSpec.DataType.INT, true);
case LONG:
@@ -99,6 +99,12 @@ protected FieldSpec getFieldSpec(String columnName, DataSchema.ColumnDataType co
return new DimensionFieldSpec(columnName, FieldSpec.DataType.STRING, true);
case BYTES:
return new DimensionFieldSpec(columnName, FieldSpec.DataType.BYTES, true);
+ case TIMESTAMP:
+ return new DimensionFieldSpec(columnName, FieldSpec.DataType.TIMESTAMP, true);
+ case JSON:
+ return new DimensionFieldSpec(columnName, FieldSpec.DataType.JSON, true);
+ case BOOLEAN_ARRAY:
+ return new DimensionFieldSpec(columnName, FieldSpec.DataType.BOOLEAN, false);
case INT_ARRAY:
return new DimensionFieldSpec(columnName, FieldSpec.DataType.INT, false);
case LONG_ARRAY:
@@ -109,274 +115,133 @@ protected FieldSpec getFieldSpec(String columnName, DataSchema.ColumnDataType co
return new DimensionFieldSpec(columnName, FieldSpec.DataType.DOUBLE, false);
case STRING_ARRAY:
return new DimensionFieldSpec(columnName, FieldSpec.DataType.STRING, false);
+ case BYTES_ARRAY:
+ return new DimensionFieldSpec(columnName, FieldSpec.DataType.BYTES, false);
+ case TIMESTAMP_ARRAY:
+ return new DimensionFieldSpec(columnName, FieldSpec.DataType.TIMESTAMP, false);
default:
throw new IllegalStateException("Unexpected column type " + columnDataType);
}
}
- protected static final class SimpleDataTable
- implements DataTable
- {
- private final DataSchema dataSchema;
- private final int numRows;
- private final transient Object[][] data;
-
- public SimpleDataTable(int numRows, DataSchema dataSchema)
- {
- this.numRows = numRows;
- this.dataSchema = dataSchema;
- this.data = new Object[numRows][];
- for (int i = 0; i < numRows; ++i) {
- this.data[i] = new Object[dataSchema.size()];
- }
- }
-
- public SimpleDataTable(int numRows, DataSchema dataSchema, Object[][] data)
- {
- this.numRows = numRows;
- this.dataSchema = dataSchema;
- this.data = data;
- }
-
- public static DataTable fromBytes(ByteBuffer byteBuffer)
- {
- try {
- int numRows = byteBuffer.getInt();
- int numDataSchemaBytes = byteBuffer.getInt();
- int numDataBytes = byteBuffer.getInt();
-
- byte[] dataSchemaBytes = new byte[numDataSchemaBytes];
- byteBuffer.get(dataSchemaBytes);
- DataSchema dataSchema = DataSchema.fromBytes(dataSchemaBytes);
-
- byte[] dataBytes = new byte[numDataBytes];
- byteBuffer.get(dataBytes);
- ByteArrayInputStream bis = new ByteArrayInputStream(dataBytes);
- ObjectInput in = new ObjectInputStream(bis);
- Object[][] data = (Object[][]) in.readObject();
- return new SimpleDataTable(numRows, dataSchema, data);
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void addException(ProcessingException e)
- {
- throw new UnsupportedOperationException("Unsupported", e);
- }
-
- @Override
- public byte[] toBytes()
- {
- try {
- byte[] dataSchemaBytes = dataSchema.toBytes();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(bos);
- out.writeObject(data);
- out.flush();
- byte[] dataBytes = bos.toByteArray();
- int totalBytes = 12 + dataSchemaBytes.length + dataBytes.length;
- ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[totalBytes]);
- byteBuffer.putInt(numRows);
- byteBuffer.putInt(dataSchemaBytes.length);
- byteBuffer.putInt(dataBytes.length);
- byteBuffer.put(dataSchemaBytes);
- byteBuffer.put(dataBytes);
- return byteBuffer.array();
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Map getMetadata()
- {
- return ImmutableMap.of();
- }
-
- @Override
- public DataSchema getDataSchema()
- {
- return dataSchema;
- }
-
- @Override
- public int getNumberOfRows()
- {
- return numRows;
- }
-
- protected Object get(int rowIndex, int columnIndex)
- {
- return this.data[rowIndex][columnIndex];
- }
-
- protected void set(int rowIndex, int columnIndex, Object o)
- {
- this.data[rowIndex][columnIndex] = o;
- }
-
- @Override
- public int getInt(int rowIndex, int colIndex)
- {
- return getObject(rowIndex, colIndex);
- }
-
- @Override
- public long getLong(int rowIndex, int colIndex)
- {
- return getObject(rowIndex, colIndex);
- }
-
- @Override
- public float getFloat(int rowIndex, int colIndex)
- {
- return getObject(rowIndex, colIndex);
- }
-
- @Override
- public double getDouble(int rowIndex, int colIndex)
- {
- return getObject(rowIndex, colIndex);
- }
-
- @Override
- public String getString(int rowIndex, int colIndex)
- {
- return getObject(rowIndex, colIndex);
- }
-
- @Override
- public ByteArray getBytes(int rowIndex, int colIndex)
- {
- return getBytes(rowIndex, colIndex);
- }
-
- @Override
- public T getObject(int rowIndex, int columnIndex)
- {
- return (T) get(rowIndex, columnIndex);
- }
-
- @Override
- public int[] getIntArray(int rowIndex, int colIndex)
- {
- return getObject(rowIndex, colIndex);
- }
-
- @Override
- public long[] getLongArray(int rowIndex, int colIndex)
- {
- return getObject(rowIndex, colIndex);
- }
-
- @Override
- public float[] getFloatArray(int rowIndex, int colIndex)
- {
- return getObject(rowIndex, colIndex);
- }
-
- @Override
- public double[] getDoubleArray(int rowIndex, int colIndex)
- {
- return getObject(rowIndex, colIndex);
- }
-
- @Override
- public String[] getStringArray(int rowIndex, int colIndex)
- {
- return getObject(rowIndex, colIndex);
- }
- }
-
protected static DataTable createDataTableWithAllTypes()
{
- int numColumns = ALL_TYPES.size();
- String[] columnNames = new String[numColumns];
- for (int i = 0; i < numColumns; i++) {
- columnNames[i] = ALL_TYPES.get(i).name();
- }
- DataSchema.ColumnDataType[] columnDataTypes = ALL_TYPES_ARRAY;
- DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
- SimpleDataTable dataTable = new SimpleDataTable(NUM_ROWS, dataSchema);
- for (int rowId = 0; rowId < NUM_ROWS; rowId++) {
- for (int colId = 0; colId < numColumns; colId++) {
- switch (columnDataTypes[colId]) {
- case INT:
- dataTable.set(rowId, colId, RANDOM.nextInt());
- break;
- case LONG:
- dataTable.set(rowId, colId, RANDOM.nextLong());
- break;
- case FLOAT:
- dataTable.set(rowId, colId, RANDOM.nextFloat());
- break;
- case DOUBLE:
- dataTable.set(rowId, colId, RANDOM.nextDouble());
- break;
- case STRING:
- dataTable.set(rowId, colId, generateRandomStringWithLength(RANDOM.nextInt(20)));
- break;
- case OBJECT:
- dataTable.set(rowId, colId, (Object) RANDOM.nextDouble());
- break;
- case INT_ARRAY:
- int length = RANDOM.nextInt(20);
- int[] intArray = new int[length];
- for (int i = 0; i < length; i++) {
- intArray[i] = RANDOM.nextInt();
- }
- dataTable.set(rowId, colId, intArray);
- break;
- case LONG_ARRAY:
- length = RANDOM.nextInt(20);
- long[] longArray = new long[length];
- for (int i = 0; i < length; i++) {
- longArray[i] = RANDOM.nextLong();
- }
- dataTable.set(rowId, colId, longArray);
- break;
- case FLOAT_ARRAY:
- length = RANDOM.nextInt(20);
- float[] floatArray = new float[length];
- for (int i = 0; i < length; i++) {
- floatArray[i] = RANDOM.nextFloat();
- }
- dataTable.set(rowId, colId, floatArray);
- break;
- case DOUBLE_ARRAY:
- length = RANDOM.nextInt(20);
- double[] doubleArray = new double[length];
- for (int i = 0; i < length; i++) {
- doubleArray[i] = RANDOM.nextDouble();
- }
- dataTable.set(rowId, colId, doubleArray);
- break;
- case STRING_ARRAY:
- length = RANDOM.nextInt(20);
- String[] stringArray = new String[length];
- for (int i = 0; i < length; i++) {
- stringArray[i] = generateRandomStringWithLength(RANDOM.nextInt(20));
- }
- dataTable.set(rowId, colId, stringArray);
- break;
+ try {
+ int numColumns = ALL_TYPES.size();
+ String[] columnNames = new String[numColumns];
+ for (int i = 0; i < numColumns; i++) {
+ columnNames[i] = ALL_TYPES.get(i).name();
+ }
+ DataSchema.ColumnDataType[] columnDataTypes = ALL_TYPES_ARRAY;
+ DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+ DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+ for (int rowId = 0; rowId < NUM_ROWS; rowId++) {
+ dataTableBuilder.startRow();
+ for (int colId = 0; colId < numColumns; colId++) {
+ switch (columnDataTypes[colId]) {
+ case BOOLEAN:
+ dataTableBuilder.setColumn(colId, RANDOM.nextBoolean());
+ break;
+ case INT:
+ dataTableBuilder.setColumn(colId, RANDOM.nextInt());
+ break;
+ case LONG:
+ case TIMESTAMP:
+ dataTableBuilder.setColumn(colId, RANDOM.nextLong());
+ break;
+ case FLOAT:
+ dataTableBuilder.setColumn(colId, RANDOM.nextFloat());
+ break;
+ case DOUBLE:
+ dataTableBuilder.setColumn(colId, RANDOM.nextDouble());
+ break;
+ case STRING:
+ dataTableBuilder.setColumn(colId, generateRandomStringWithLength(RANDOM.nextInt(20)));
+ break;
+ case OBJECT:
+ dataTableBuilder.setColumn(colId, (Object) RANDOM.nextDouble());
+ break;
+ case BOOLEAN_ARRAY:
+ int length = RANDOM.nextInt(20);
+ int[] booleanArray = new int[length];
+ for (int i = 0; i < length; i++) {
+ booleanArray[i] = RANDOM.nextInt(2);
+ }
+ dataTableBuilder.setColumn(colId, booleanArray);
+ break;
+ case INT_ARRAY:
+ length = RANDOM.nextInt(20);
+ int[] intArray = new int[length];
+ for (int i = 0; i < length; i++) {
+ intArray[i] = RANDOM.nextInt();
+ }
+ dataTableBuilder.setColumn(colId, intArray);
+ break;
+ case LONG_ARRAY:
+ case TIMESTAMP_ARRAY:
+ length = RANDOM.nextInt(20);
+ long[] longArray = new long[length];
+ for (int i = 0; i < length; i++) {
+ longArray[i] = RANDOM.nextLong();
+ }
+ dataTableBuilder.setColumn(colId, longArray);
+ break;
+ case FLOAT_ARRAY:
+ length = RANDOM.nextInt(20);
+ float[] floatArray = new float[length];
+ for (int i = 0; i < length; i++) {
+ floatArray[i] = RANDOM.nextFloat();
+ }
+ dataTableBuilder.setColumn(colId, floatArray);
+ break;
+ case DOUBLE_ARRAY:
+ length = RANDOM.nextInt(20);
+ double[] doubleArray = new double[length];
+ for (int i = 0; i < length; i++) {
+ doubleArray[i] = RANDOM.nextDouble();
+ }
+ dataTableBuilder.setColumn(colId, doubleArray);
+ break;
+ case STRING_ARRAY:
+ case BYTES_ARRAY:
+ length = RANDOM.nextInt(20);
+ String[] stringArray = new String[length];
+ for (int i = 0; i < length; i++) {
+ stringArray[i] = generateRandomStringWithLength(RANDOM.nextInt(20));
+ }
+ dataTableBuilder.setColumn(colId, stringArray);
+ break;
+ case JSON:
+ dataTableBuilder.setColumn(colId,
+ "{ " + generateRandomStringWithLength(RANDOM.nextInt(5)) + " : "
+ + generateRandomStringWithLength(RANDOM.nextInt(10)) + " }");
+ break;
+ default:
+ throw new RuntimeException("Unsupported type - " + columnDataTypes[colId]);
+ }
}
+ dataTableBuilder.finishRow();
}
+ return dataTableBuilder.build();
+ }
+ catch (IOException e) {
+ Assert.fail("Failed to create Pinot DataTable with all types", e);
+ throw new RuntimeException(e);
}
- return dataTable;
}
private static final class MockPinotScatterGatherQueryClient
extends PinotScatterGatherQueryClient
{
- private final ImmutableList dataTables;
+ private ImmutableList dataTables;
- MockPinotScatterGatherQueryClient(Config pinotConfig, List dataTables)
+ MockPinotScatterGatherQueryClient(Config pinotConfig)
{
super(pinotConfig);
+ }
+
+ public void setDataTables(List dataTables)
+ {
this.dataTables = ImmutableList.copyOf(dataTables);
}
@@ -385,7 +250,7 @@ public Map queryPinotServerForDataTable(String pql, S
{
ImmutableMap.Builder response = ImmutableMap.builder();
for (int i = 0; i < dataTables.size(); ++i) {
- response.put(new ServerInstance(String.format("Server_localhost_%d", i + 9000)), dataTables.get(i));
+ response.put(new ServerInstance(Utils.toInstanceConfig(String.format("Server_localhost_%d", i + 9000))), dataTables.get(i));
}
return response.build();
}
@@ -397,12 +262,15 @@ PinotSegmentPageSource getPinotSegmentPageSource(
PinotSplit mockPinotSplit,
List pinotColumnHandles)
{
- PinotScatterGatherQueryClient mockPinotQueryClient = new MockPinotScatterGatherQueryClient(new PinotScatterGatherQueryClient.Config(
- pinotConfig.getIdleTimeout().toMillis(),
- pinotConfig.getThreadPoolSize(),
- pinotConfig.getMinConnectionsPerServer(),
- pinotConfig.getMaxBacklogPerServer(),
- pinotConfig.getMaxConnectionsPerServer()), dataTables);
+ if (mockPinotQueryClient == null) {
+ mockPinotQueryClient = new MockPinotScatterGatherQueryClient(new PinotScatterGatherQueryClient.Config(
+ pinotConfig.getIdleTimeout().toMillis(),
+ pinotConfig.getThreadPoolSize(),
+ pinotConfig.getMinConnectionsPerServer(),
+ pinotConfig.getMaxBacklogPerServer(),
+ pinotConfig.getMaxConnectionsPerServer()));
+ }
+ ((MockPinotScatterGatherQueryClient) mockPinotQueryClient).setDataTables(dataTables);
PinotSegmentPageSource pinotSegmentPageSource = new PinotSegmentPageSource(session, pinotConfig, mockPinotQueryClient, mockPinotSplit, pinotColumnHandles);
return pinotSegmentPageSource;
}
@@ -473,17 +341,19 @@ public void testAllDataTypes()
@Test
public void testMultivaluedType()
+ throws IOException
{
String[] columnNames = {"col1", "col2"};
DataSchema.ColumnDataType[] columnDataTypes = {DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.STRING_ARRAY};
DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
- SimpleDataTable dataTable = new SimpleDataTable(1, dataSchema);
-
- int numRows = 1;
String[] stringArray = {"stringVal1", "stringVal2"};
int[] intArray = {10, 34, 67};
- dataTable.set(0, 0, intArray);
- dataTable.set(0, 1, stringArray);
+ DataTableBuilder dataTableBuilder = new DataTableBuilder(dataSchema);
+ dataTableBuilder.startRow();
+ dataTableBuilder.setColumn(0, intArray);
+ dataTableBuilder.setColumn(1, stringArray);
+ dataTableBuilder.finishRow();
+ DataTable dataTable = dataTableBuilder.build();
PinotSessionProperties pinotSessionProperties = new PinotSessionProperties(pinotConfig);
ConnectorSession session = new TestingConnectorSession(pinotSessionProperties.getSessionProperties());
diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSegmentStreamingPageSource.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSegmentStreamingPageSource.java
index c1ecbb1af1a6d..8e7decf17ea43 100644
--- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSegmentStreamingPageSource.java
+++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSegmentStreamingPageSource.java
@@ -13,10 +13,6 @@
*/
package com.facebook.presto.pinot;
-import com.facebook.presto.pinot.grpc.Constants;
-import com.facebook.presto.pinot.grpc.GrpcRequestBuilder;
-import com.facebook.presto.pinot.grpc.PinotStreamingQueryClient;
-import com.facebook.presto.pinot.grpc.ServerResponse;
import com.facebook.presto.pinot.query.PinotProxyGrpcRequestBuilder;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.testing.assertions.Assert;
@@ -24,10 +20,14 @@
import com.google.common.collect.ImmutableMap;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
+import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
+import org.apache.pinot.connector.presto.grpc.PinotStreamingQueryClient;
+import org.apache.pinot.connector.presto.grpc.Utils;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.testng.annotations.Test;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@@ -37,76 +37,45 @@
public class TestPinotSegmentStreamingPageSource
extends TestPinotSegmentPageSource
{
- private static final class MockServerResponse
- extends ServerResponse
- {
- private DataTable dataTable;
-
- public MockServerResponse(Server.ServerResponse serverResponse)
- {
- super(serverResponse);
- }
-
- public MockServerResponse(DataTable dataTable)
- {
- super(null);
- this.dataTable = dataTable;
- }
-
- public String getResponseType()
- {
- return Constants.Response.ResponseType.DATA;
- }
-
- public int getSerializedSize()
- {
- return 0;
- }
-
- public ByteBuffer getPayloadReadOnlyByteBuffer()
- {
- try {
- return ByteBuffer.wrap(dataTable.toBytes());
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public DataTable getDataTable(ByteBuffer byteBuffer) throws IOException
- {
- return SimpleDataTable.fromBytes(byteBuffer);
- }
- }
-
private static final class MockPinotStreamingQueryClient
extends PinotStreamingQueryClient
{
private final ImmutableList dataTables;
- MockPinotStreamingQueryClient(PinotStreamingQueryClient.Config pinotConfig, List dataTables)
+ MockPinotStreamingQueryClient(GrpcQueryClient.Config pinotConfig, List dataTables)
{
super(pinotConfig);
this.dataTables = ImmutableList.copyOf(dataTables);
}
@Override
- public Iterator submit(String host, int port, GrpcRequestBuilder requestBuilder)
+ public Iterator submit(String host, int port, GrpcRequestBuilder requestBuilder)
{
- return new Iterator()
+ return new Iterator()
{
int index;
@Override
public boolean hasNext()
{
- return index < dataTables.size();
+ return index <= dataTables.size();
}
@Override
- public ServerResponse next()
+ public Server.ServerResponse next()
{
- return new MockServerResponse(dataTables.get(index++));
+ if (index < dataTables.size()) {
+ final DataTable dataTable = dataTables.get(index++);
+ try {
+ return Server.ServerResponse.newBuilder().setPayload(Utils.toByteString(dataTable.toBytes())).putMetadata("responseType", "data").build();
+ }
+ catch (IOException e) {
+ throw new RuntimeException();
+ }
+ }
+ else {
+ return Server.ServerResponse.newBuilder().putMetadata("responseType", "metadata").build();
+ }
}
};
}
@@ -119,7 +88,7 @@ PinotSegmentPageSource getPinotSegmentPageSource(
PinotSplit mockPinotSplit,
List handlesSurviving)
{
- MockPinotStreamingQueryClient mockPinotQueryClient = new MockPinotStreamingQueryClient(new PinotStreamingQueryClient.Config(pinotConfig.getStreamingServerGrpcMaxInboundMessageBytes(), true), dataTables);
+ MockPinotStreamingQueryClient mockPinotQueryClient = new MockPinotStreamingQueryClient(new GrpcQueryClient.Config(pinotConfig.getStreamingServerGrpcMaxInboundMessageBytes(), true), dataTables);
return new PinotSegmentStreamingPageSource(session, pinotConfig, mockPinotQueryClient, mockPinotSplit, handlesSurviving);
}
@@ -150,11 +119,11 @@ public void testPinotProxyGrpcRequest()
Assert.assertEquals(grpcRequest.getMetadataOrThrow("k2"), "v2");
Assert.assertEquals(grpcRequest.getMetadataOrThrow("FORWARD_HOST"), "localhost");
Assert.assertEquals(grpcRequest.getMetadataOrThrow("FORWARD_PORT"), "8124");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.REQUEST_ID), "121");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.BROKER_ID), "presto-coordinator-grpc");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.ENABLE_TRACE), "false");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.ENABLE_STREAMING), "true");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.PAYLOAD_TYPE), "sql");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID), "121");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.BROKER_ID), "presto-coordinator-grpc");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.ENABLE_TRACE), "false");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.ENABLE_STREAMING), "true");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.PAYLOAD_TYPE), "sql");
grpcRequest = new PinotProxyGrpcRequestBuilder()
.setSegments(ImmutableList.of("segment1"))
@@ -170,11 +139,11 @@ public void testPinotProxyGrpcRequest()
Assert.assertEquals(grpcRequest.getMetadataCount(), 7);
Assert.assertEquals(grpcRequest.getMetadataOrThrow("k1"), "v1");
Assert.assertEquals(grpcRequest.getMetadataOrThrow("k2"), "v2");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.REQUEST_ID), "121");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.BROKER_ID), "presto-coordinator-grpc");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.ENABLE_TRACE), "false");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.ENABLE_STREAMING), "true");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.PAYLOAD_TYPE), "sql");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID), "121");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.BROKER_ID), "presto-coordinator-grpc");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.ENABLE_TRACE), "false");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.ENABLE_STREAMING), "true");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.PAYLOAD_TYPE), "sql");
}
@Test
@@ -191,10 +160,10 @@ public void testPinotGrpcRequest()
Assert.assertEquals(grpcRequest.getSegmentsCount(), 1);
Assert.assertEquals(grpcRequest.getSegments(0), "segment1");
Assert.assertEquals(grpcRequest.getMetadataCount(), 5);
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.REQUEST_ID), "121");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.BROKER_ID), "presto-coordinator-grpc");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.ENABLE_TRACE), "false");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.ENABLE_STREAMING), "true");
- Assert.assertEquals(grpcRequest.getMetadataOrThrow(Constants.Request.MetadataKeys.PAYLOAD_TYPE), "sql");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID), "121");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.BROKER_ID), "presto-coordinator-grpc");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.ENABLE_TRACE), "false");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.ENABLE_STREAMING), "true");
+ Assert.assertEquals(grpcRequest.getMetadataOrThrow(CommonConstants.Query.Request.MetadataKeys.PAYLOAD_TYPE), "sql");
}
}