Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/src/main/sphinx/connector/pinot.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Requirements

To connect to Pinot, you need:

* Pinot 0.8.0 or higher.
* Pinot 0.9.3 or higher.
* Network access from the Trino coordinator and workers to the Pinot controller
nodes. Port 8098 is the default port.

Expand Down
26 changes: 25 additions & 1 deletion plugin/trino-pinot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.pinot.version>0.8.0</dep.pinot.version>
<dep.pinot.version>0.10.0</dep.pinot.version>
<!--
Project's default for air.test.parallel is 'methods'. By design, 'instances' makes TestNG run tests from one class in a single thread.
As a side effect, it prevents TestNG from initializing multiple test instances upfront, which happens with 'methods'.
A potential downside can be long tail single-threaded execution of a single long test class.
TODO (https://github.com/trinodb/trino/issues/11294) remove when we upgrade to surefire with https://issues.apache.org/jira/browse/SUREFIRE-1967
-->
<air.test.parallel>instances</air.test.parallel>
</properties>

<repositories>
Expand All @@ -29,6 +36,11 @@

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.17.2</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-common</artifactId>
Expand Down Expand Up @@ -287,6 +299,10 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down Expand Up @@ -343,6 +359,10 @@
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</exclusion>
<exclusion>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
Expand All @@ -351,6 +371,10 @@
<groupId>org.glassfish.hk2.external</groupId>
<artifactId>jakarta.inject</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.hk2.external</groupId>
<artifactId>aopalliance-repackaged</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public TransformResultMetadata resolveExpressionType(ExpressionContext expressio
case FUNCTION:
return TransformFunctionFactory.get(expression, datasourceMap).getResultMetadata();
case LITERAL:
FieldSpec.DataType literalDataType = LiteralTransformFunction.inferLiteralDataType(new LiteralTransformFunction(expression.getLiteral()));
FieldSpec.DataType literalDataType = new LiteralTransformFunction(expression.getLiteral()).getResultMetadata().getDataType();
return new TransformResultMetadata(literalDataType, true, false);
default:
throw new PinotException(PINOT_INVALID_PQL_GENERATED, Optional.empty(), format("Unsupported expression: '%s'", expression));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_PREVIOUS_IMAGE_NAME;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.RealType.REAL;
import static java.lang.String.format;
Expand Down Expand Up @@ -121,13 +122,18 @@ public abstract class AbstractPinotIntegrationSmokeTest

protected abstract boolean isSecured();

/**
 * Returns the Pinot docker image used to start the testing cluster.
 * Defaults to the previous supported Pinot release image; subclasses
 * override this to run the same smoke tests against a different version
 * (e.g. the latest image).
 */
protected String getPinotImageName()
{
return PINOT_PREVIOUS_IMAGE_NAME;
}

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
TestingKafka kafka = closeAfterClass(TestingKafka.createWithSchemaRegistry());
kafka.start();
TestingPinotCluster pinot = closeAfterClass(new TestingPinotCluster(kafka.getNetwork(), isSecured()));
TestingPinotCluster pinot = closeAfterClass(new TestingPinotCluster(kafka.getNetwork(), isSecured(), getPinotImageName()));
pinot.start();

// Create and populate the all_types topic and table
Expand Down Expand Up @@ -532,7 +538,8 @@ private static Path createSegment(InputStream tableConfigInputStream, InputStrea
false,
tableConfig.getValidationConfig().getSegmentPushType(),
tableConfig.getValidationConfig().getSegmentPushFrequency(),
formatSpec));
formatSpec,
null));
}
else {
checkState(tableConfig.isDimTable(), "Null time column only allowed for dimension tables");
Expand Down Expand Up @@ -1883,15 +1890,10 @@ public void testPassthroughQueriesWithAliases()
" (56, BIGINT '-3147483640')," +
" (56, BIGINT '-3147483639')");

// Query with a function on a column and an alias with the same column name fails
// For more details see https://github.com/apache/pinot/issues/7545
assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(() -> query("SELECT int_col FROM " +
"\"SELECT floor(int_col / 3) AS int_col" +
" FROM " + ALL_TYPES_TABLE +
" WHERE string_col IS NOT null AND string_col != 'array_null'\""))
.withRootCauseInstanceOf(RuntimeException.class)
.withMessage("Alias int_col cannot be referred in SELECT Clause");
assertQuerySucceeds("SELECT int_col FROM " +
"\"SELECT floor(int_col / 3) AS int_col" +
" FROM " + ALL_TYPES_TABLE +
" WHERE string_col IS NOT null AND string_col != 'array_null'\"");
}

@Test
Expand Down Expand Up @@ -1933,16 +1935,10 @@ public void testPassthroughQueriesWithPushdowns()
" GROUP BY int_col2, long_col2"))
.isFullyPushedDown();

// Query with grouping columns but no aggregates ignores aliases.
// For more details see: https://github.com/apache/pinot/issues/7546
assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(() -> query("SELECT DISTINCT int_col2, long_col2 FROM " +
"\"SELECT int_col AS int_col2, long_col AS long_col2" +
" FROM " + ALL_TYPES_TABLE +
" WHERE string_col IS NOT null AND string_col != 'array_null'\""))
.withRootCauseInstanceOf(RuntimeException.class)
.withMessage("column index for 'int_col2' was not found");

assertQuerySucceeds("SELECT DISTINCT int_col2, long_col2 FROM " +
"\"SELECT int_col AS int_col2, long_col AS long_col2" +
" FROM " + ALL_TYPES_TABLE +
" WHERE string_col IS NOT null AND string_col != 'array_null'\"");
assertThat(query("SELECT int_col2, count(*) FROM " +
"\"SELECT int_col AS int_col2, long_col AS long_col2" +
" FROM " + ALL_TYPES_TABLE +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Optional;

import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_LATEST_IMAGE_NAME;
import static io.trino.testing.TestingSession.testSessionBuilder;

public class PinotQueryRunner
Expand Down Expand Up @@ -66,7 +67,7 @@ public static void main(String[] args)
{
TestingKafka kafka = TestingKafka.createWithSchemaRegistry();
kafka.start();
TestingPinotCluster pinot = new TestingPinotCluster(kafka.getNetwork(), false);
TestingPinotCluster pinot = new TestingPinotCluster(kafka.getNetwork(), false, PINOT_LATEST_IMAGE_NAME);
pinot.start();
Map<String, String> properties = ImmutableMap.of("http-server.http.port", "8080");
Map<String, String> pinotProperties = ImmutableMap.<String, String>builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot;

import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_LATEST_IMAGE_NAME;

/**
 * Runs the full Pinot integration smoke test suite against the latest
 * supported Pinot docker image with authentication disabled.
 *
 * <p>All test methods are inherited from {@link AbstractPinotIntegrationSmokeTest};
 * this class only selects the image and the (unsecured) cluster mode.
 */
public class TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersion
extends AbstractPinotIntegrationSmokeTest
{
@Override
protected boolean isSecured()
{
// Run the cluster without authentication enabled
return false;
}

@Override
protected String getPinotImageName()
{
// Use the newest Pinot image rather than the default previous release
return PINOT_LATEST_IMAGE_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@
public class TestingPinotCluster
implements Closeable
{
private static final String BASE_IMAGE = "apachepinot/pinot:0.8.0-jdk11";
public static final String PINOT_LATEST_IMAGE_NAME = "apachepinot/pinot:0.10.0";
public static final String PINOT_PREVIOUS_IMAGE_NAME = "apachepinot/pinot:0.9.3-jdk11";

private static final String ZOOKEEPER_INTERNAL_HOST = "zookeeper";
private static final JsonCodec<List<String>> LIST_JSON_CODEC = listJsonCodec(String.class);
private static final JsonCodec<PinotSuccessResponse> PINOT_SUCCESS_RESPONSE_JSON_CODEC = jsonCodec(PinotSuccessResponse.class);
Expand All @@ -85,7 +87,7 @@ public class TestingPinotCluster
private final Closer closer = Closer.create();
private final boolean secured;

public TestingPinotCluster(Network network, boolean secured)
public TestingPinotCluster(Network network, boolean secured, String pinotImageName)
{
httpClient = closer.register(new JettyHttpClient());
zookeeper = new GenericContainer<>(parse("zookeeper:3.5.6"))
Expand All @@ -96,7 +98,7 @@ public TestingPinotCluster(Network network, boolean secured)
closer.register(zookeeper::stop);

String controllerConfig = secured ? "/var/pinot/controller/config/pinot-controller-secured.conf" : "/var/pinot/controller/config/pinot-controller.conf";
controller = new GenericContainer<>(parse(BASE_IMAGE))
controller = new GenericContainer<>(parse(pinotImageName))
.withNetwork(network)
.withClasspathResourceMapping("/pinot-controller", "/var/pinot/controller/config", BindMode.READ_ONLY)
.withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-controller-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
Expand All @@ -106,7 +108,7 @@ public TestingPinotCluster(Network network, boolean secured)
closer.register(controller::stop);

String brokerConfig = secured ? "/var/pinot/broker/config/pinot-broker-secured.conf" : "/var/pinot/broker/config/pinot-broker.conf";
broker = new GenericContainer<>(parse(BASE_IMAGE))
broker = new GenericContainer<>(parse(pinotImageName))
.withNetwork(network)
.withClasspathResourceMapping("/pinot-broker", "/var/pinot/broker/config", BindMode.READ_ONLY)
.withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-broker-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
Expand All @@ -115,7 +117,7 @@ public TestingPinotCluster(Network network, boolean secured)
.withExposedPorts(BROKER_PORT);
closer.register(broker::stop);

server = new GenericContainer<>(parse(BASE_IMAGE))
server = new GenericContainer<>(parse(pinotImageName))
.withNetwork(network)
.withClasspathResourceMapping("/pinot-server", "/var/pinot/server/config", BindMode.READ_ONLY)
.withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-server-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
Expand Down