diff --git a/plugin/trino-pinot/pom.xml b/plugin/trino-pinot/pom.xml
index d422ea4fcf69..d4df442d9b12 100755
--- a/plugin/trino-pinot/pom.xml
+++ b/plugin/trino-pinot/pom.xml
@@ -546,6 +546,18 @@
+
+ io.trino
+ trino-tpch
+ test
+
+
+
+ io.trino.tpch
+ tpch
+ test
+
+
io.airlift
testing
diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java
similarity index 93%
rename from plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java
rename to plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java
index cffea1e42a6f..ed109cc6ace2 100644
--- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java
+++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java
@@ -21,15 +21,19 @@
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.trino.Session;
import io.trino.plugin.pinot.client.PinotHostMapper;
+import io.trino.plugin.tpch.TpchPlugin;
import io.trino.sql.planner.plan.AggregationNode;
import io.trino.sql.planner.plan.ExchangeNode;
import io.trino.sql.planner.plan.FilterNode;
import io.trino.sql.planner.plan.LimitNode;
import io.trino.sql.planner.plan.MarkDistinctNode;
import io.trino.sql.planner.plan.ProjectNode;
-import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.BaseConnectorSmokeTest;
+import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
+import io.trino.testing.MaterializedRow;
import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingConnectorBehavior;
import io.trino.testing.kafka.TestingKafka;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -76,6 +80,7 @@
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
+import static io.trino.plugin.pinot.PinotQueryRunner.createPinotQueryRunner;
import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_PREVIOUS_IMAGE_NAME;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.RealType.REAL;
@@ -93,9 +98,8 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
-public abstract class AbstractPinotIntegrationSmokeTest
- // TODO extend BaseConnectorTest
- extends AbstractTestQueryFramework
+public abstract class BasePinotIntegrationConnectorSmokeTest
+ extends BaseConnectorSmokeTest
{
private static final int MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES = 11;
private static final int MAX_ROWS_PER_SPLIT_FOR_BROKER_QUERIES = 12;
@@ -142,6 +146,38 @@ protected QueryRunner createQueryRunner()
TestingPinotCluster pinot = closeAfterClass(new TestingPinotCluster(kafka.getNetwork(), isSecured(), getPinotImageName()));
pinot.start();
+ createAndPopulateAllTypesTopic(kafka, pinot);
+ createAndPopulateMixedCaseTableAndTopic(kafka, pinot);
+ createAndPopulateMixedCaseDistinctTableAndTopic(kafka, pinot);
+ createAndPopulateTooManyRowsTable(kafka, pinot);
+ createAndPopulateTooManyBrokerRowsTableAndTopic(kafka, pinot);
+ createTheDuplicateTablesAndTopics(kafka, pinot);
+ createAndPopulateDateTimeFieldsTableAndTopic(kafka, pinot);
+ createAndPopulateJsonTypeTable(kafka, pinot);
+ createAndPopulateJsonTable(kafka, pinot);
+ createAndPopulateMixedCaseHybridTablesAndTopic(kafka, pinot);
+ createAndPopulateTableHavingReservedKeywordColumnNames(kafka, pinot);
+ createAndPopulateHavingQuotesInColumnNames(kafka, pinot);
+ createAndPopulateHavingMultipleColumnsWithDuplicateValues(kafka, pinot);
+
+ DistributedQueryRunner queryRunner = createPinotQueryRunner(
+ ImmutableMap.of(),
+ pinotProperties(pinot),
+ Optional.of(binder -> newOptionalBinder(binder, PinotHostMapper.class).setBinding()
+ .toInstance(new TestingPinotHostMapper(pinot.getBrokerHostAndPort(), pinot.getServerHostAndPort(), pinot.getServerGrpcHostAndPort()))));
+
+ queryRunner.installPlugin(new TpchPlugin());
+ queryRunner.createCatalog("tpch", "tpch");
+
+ // We need the query runner to populate nation and region data from tpch schema
+ createAndPopulateNationAndRegionData(kafka, pinot, queryRunner);
+
+ return queryRunner;
+ }
+
+ private void createAndPopulateAllTypesTopic(TestingKafka kafka, TestingPinotCluster pinot)
+ throws Exception
+ {
// Create and populate the all_types topic and table
kafka.createTopic(ALL_TYPES_TABLE);
@@ -165,7 +201,11 @@ protected QueryRunner createQueryRunner()
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("alltypes_schema.json"), ALL_TYPES_TABLE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("alltypes_realtimeSpec.json"), ALL_TYPES_TABLE);
+ }
+ private void createAndPopulateMixedCaseTableAndTopic(TestingKafka kafka, TestingPinotCluster pinot)
+ throws Exception
+ {
// Create and populate mixed case table and topic
kafka.createTopic(MIXED_CASE_COLUMN_NAMES_TABLE);
Schema mixedCaseAvroSchema = SchemaBuilder.record(MIXED_CASE_COLUMN_NAMES_TABLE).fields()
@@ -200,7 +240,11 @@ protected QueryRunner createQueryRunner()
kafka.sendMessages(mixedCaseProducerRecords.stream(), schemaRegistryAwareProducer(kafka));
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("mixed_case_schema.json"), MIXED_CASE_COLUMN_NAMES_TABLE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("mixed_case_realtimeSpec.json"), MIXED_CASE_COLUMN_NAMES_TABLE);
+ }
+ private void createAndPopulateMixedCaseDistinctTableAndTopic(TestingKafka kafka, TestingPinotCluster pinot)
+ throws Exception
+ {
// Create and populate mixed case distinct table and topic
kafka.createTopic(MIXED_CASE_DISTINCT_TABLE);
Schema mixedCaseDistinctAvroSchema = SchemaBuilder.record(MIXED_CASE_DISTINCT_TABLE).fields()
@@ -234,7 +278,11 @@ protected QueryRunner createQueryRunner()
// Create mixed case table name, populated from the mixed case topic
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("mixed_case_table_name_schema.json"), MIXED_CASE_TABLE_NAME);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("mixed_case_table_name_realtimeSpec.json"), MIXED_CASE_TABLE_NAME);
+ }
+ private void createAndPopulateTooManyRowsTable(TestingKafka kafka, TestingPinotCluster pinot)
+ throws Exception
+ {
// Create and populate too many rows table and topic
kafka.createTopic(TOO_MANY_ROWS_TABLE);
Schema tooManyRowsAvroSchema = SchemaBuilder.record(TOO_MANY_ROWS_TABLE).fields()
@@ -255,7 +303,11 @@ protected QueryRunner createQueryRunner()
kafka.sendMessages(tooManyRowsRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka));
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("too_many_rows_schema.json"), TOO_MANY_ROWS_TABLE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("too_many_rows_realtimeSpec.json"), TOO_MANY_ROWS_TABLE);
+ }
+ private void createAndPopulateTooManyBrokerRowsTableAndTopic(TestingKafka kafka, TestingPinotCluster pinot)
+ throws Exception
+ {
// Create and populate too many broker rows table and topic
kafka.createTopic(TOO_MANY_BROKER_ROWS_TABLE);
Schema tooManyBrokerRowsAvroSchema = SchemaBuilder.record(TOO_MANY_BROKER_ROWS_TABLE).fields()
@@ -273,7 +325,11 @@ protected QueryRunner createQueryRunner()
kafka.sendMessages(tooManyBrokerRowsRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka));
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("too_many_broker_rows_schema.json"), TOO_MANY_BROKER_ROWS_TABLE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("too_many_broker_rows_realtimeSpec.json"), TOO_MANY_BROKER_ROWS_TABLE);
+ }
+ private void createTheDuplicateTablesAndTopics(TestingKafka kafka, TestingPinotCluster pinot)
+ throws Exception
+ {
// Create the duplicate tables and topics
kafka.createTopic(DUPLICATE_TABLE_LOWERCASE);
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("dup_table_lower_case_schema.json"), DUPLICATE_TABLE_LOWERCASE);
@@ -282,7 +338,11 @@ protected QueryRunner createQueryRunner()
kafka.createTopic(DUPLICATE_TABLE_MIXED_CASE);
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("dup_table_mixed_case_schema.json"), DUPLICATE_TABLE_MIXED_CASE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("dup_table_mixed_case_realtimeSpec.json"), DUPLICATE_TABLE_MIXED_CASE);
+ }
+ private void createAndPopulateDateTimeFieldsTableAndTopic(TestingKafka kafka, TestingPinotCluster pinot)
+ throws Exception
+ {
// Create and populate date time fields table and topic
kafka.createTopic(DATE_TIME_FIELDS_TABLE);
Schema dateTimeFieldsAvroSchema = SchemaBuilder.record(DATE_TIME_FIELDS_TABLE).fields()
@@ -310,7 +370,11 @@ protected QueryRunner createQueryRunner()
kafka.sendMessages(dateTimeFieldsProducerRecords.stream(), schemaRegistryAwareProducer(kafka));
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("date_time_fields_schema.json"), DATE_TIME_FIELDS_TABLE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("date_time_fields_realtimeSpec.json"), DATE_TIME_FIELDS_TABLE);
+ }
+ private void createAndPopulateJsonTypeTable(TestingKafka kafka, TestingPinotCluster pinot)
+ throws Exception
+ {
// Create json type table
kafka.createTopic(JSON_TYPE_TABLE);
@@ -332,7 +396,11 @@ protected QueryRunner createQueryRunner()
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("json_schema.json"), JSON_TYPE_TABLE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("json_realtimeSpec.json"), JSON_TYPE_TABLE);
pinot.addOfflineTable(getClass().getClassLoader().getResourceAsStream("json_offlineSpec.json"), JSON_TYPE_TABLE);
+ }
+ private void createAndPopulateJsonTable(TestingKafka kafka, TestingPinotCluster pinot)
+ throws Exception
+ {
// Create json table
kafka.createTopic(JSON_TABLE);
long key = 0L;
@@ -347,7 +415,11 @@ protected QueryRunner createQueryRunner()
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("schema.json"), JSON_TABLE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("realtimeSpec.json"), JSON_TABLE);
+ }
+ private void createAndPopulateMixedCaseHybridTablesAndTopic(TestingKafka kafka, TestingPinotCluster pinot)
+ throws Exception
+ {
// Create and populate mixed case table and topic
kafka.createTopic(HYBRID_TABLE_NAME);
Schema hybridAvroSchema = SchemaBuilder.record(HYBRID_TABLE_NAME).fields()
@@ -433,7 +505,11 @@ protected QueryRunner createQueryRunner()
}
kafka.sendMessages(hybridProducerRecords.stream(), schemaRegistryAwareProducer(kafka));
+ }
+ private void createAndPopulateTableHavingReservedKeywordColumnNames(TestingKafka kafka, TestingPinotCluster pinot)
+ throws Exception
+ {
// Create a table having reserved keyword column names
kafka.createTopic(RESERVED_KEYWORD_TABLE);
Schema reservedKeywordAvroSchema = SchemaBuilder.record(RESERVED_KEYWORD_TABLE).fields()
@@ -447,7 +523,11 @@ protected QueryRunner createQueryRunner()
kafka.sendMessages(reservedKeywordRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka));
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("reserved_keyword_schema.json"), RESERVED_KEYWORD_TABLE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("reserved_keyword_realtimeSpec.json"), RESERVED_KEYWORD_TABLE);
+ }
+ private void createAndPopulateHavingQuotesInColumnNames(TestingKafka kafka, TestingPinotCluster pinot)
+ throws Exception
+ {
// Create a table having quotes in column names
kafka.createTopic(QUOTES_IN_COLUMN_NAME_TABLE);
Schema quotesInColumnNameAvroSchema = SchemaBuilder.record(QUOTES_IN_COLUMN_NAME_TABLE).fields()
@@ -460,7 +540,11 @@ protected QueryRunner createQueryRunner()
kafka.sendMessages(quotesInColumnNameRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka));
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("quotes_in_column_name_schema.json"), QUOTES_IN_COLUMN_NAME_TABLE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("quotes_in_column_name_realtimeSpec.json"), QUOTES_IN_COLUMN_NAME_TABLE);
+ }
+ private void createAndPopulateHavingMultipleColumnsWithDuplicateValues(TestingKafka kafka, TestingPinotCluster pinot)
+ throws Exception
+ {
// Create a table having multiple columns with duplicate values
kafka.createTopic(DUPLICATE_VALUES_IN_COLUMNS_TABLE);
Schema duplicateValuesInColumnsAvroSchema = SchemaBuilder.record(DUPLICATE_VALUES_IN_COLUMNS_TABLE).fields()
@@ -523,12 +607,71 @@ protected QueryRunner createQueryRunner()
kafka.sendMessages(duplicateValuesInColumnsRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka));
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("duplicate_values_in_columns_schema.json"), DUPLICATE_VALUES_IN_COLUMNS_TABLE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("duplicate_values_in_columns_realtimeSpec.json"), DUPLICATE_VALUES_IN_COLUMNS_TABLE);
+ }
- return PinotQueryRunner.createPinotQueryRunner(
- ImmutableMap.of(),
- pinotProperties(pinot),
- Optional.of(binder -> newOptionalBinder(binder, PinotHostMapper.class).setBinding()
- .toInstance(new TestingPinotHostMapper(pinot.getBrokerHostAndPort(), pinot.getServerHostAndPort(), pinot.getServerGrpcHostAndPort()))));
+ private void createAndPopulateNationAndRegionData(TestingKafka kafka, TestingPinotCluster pinot, DistributedQueryRunner queryRunner)
+ throws Exception
+ {
+ // Create and populate table and topic data
+ String regionTableName = "region";
+ kafka.createTopicWithConfig(2, 1, regionTableName, false);
+ Schema regionSchema = SchemaBuilder.record(regionTableName).fields()
+ // regionkey bigint, name varchar, comment varchar
+ .name("regionkey").type().longType().noDefault()
+ .name("name").type().stringType().noDefault()
+ .name("comment").type().stringType().noDefault()
+ .name("updated_at_seconds").type().longType().noDefault()
+ .endRecord();
+ ImmutableList.Builder> regionRowsBuilder = ImmutableList.builder();
+ MaterializedResult regionRows = queryRunner.execute("SELECT * FROM tpch.tiny.region");
+ for (MaterializedRow row : regionRows.getMaterializedRows()) {
+ regionRowsBuilder.add(new ProducerRecord<>(regionTableName, "key" + row.getField(0), new GenericRecordBuilder(regionSchema)
+ .set("regionkey", row.getField(0))
+ .set("name", row.getField(1))
+ .set("comment", row.getField(2))
+ .set("updated_at_seconds", initialUpdatedAt.plusMillis(1000).toEpochMilli())
+ .build()));
+ }
+ kafka.sendMessages(regionRowsBuilder.build().stream(), schemaRegistryAwareProducer(kafka));
+ pinot.createSchema(getClass().getClassLoader().getResourceAsStream("region_schema.json"), regionTableName);
+ pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("region_realtimeSpec.json"), regionTableName);
+
+ String nationTableName = "nation";
+ kafka.createTopicWithConfig(2, 1, nationTableName, false);
+ Schema nationSchema = SchemaBuilder.record(nationTableName).fields()
+ // nationkey BIGINT, name VARCHAR, VARCHAR, regionkey BIGINT
+ .name("nationkey").type().longType().noDefault()
+ .name("name").type().stringType().noDefault()
+ .name("comment").type().stringType().noDefault()
+ .name("regionkey").type().longType().noDefault()
+ .name("updated_at_seconds").type().longType().noDefault()
+ .endRecord();
+ ImmutableList.Builder> nationRowsBuilder = ImmutableList.builder();
+ MaterializedResult nationRows = queryRunner.execute("SELECT * FROM tpch.tiny.nation");
+ for (MaterializedRow row : nationRows.getMaterializedRows()) {
+ nationRowsBuilder.add(new ProducerRecord<>(nationTableName, "key" + row.getField(0), new GenericRecordBuilder(nationSchema)
+ .set("nationkey", row.getField(0))
+ .set("name", row.getField(1))
+ .set("comment", row.getField(3))
+ .set("regionkey", row.getField(2))
+ .set("updated_at_seconds", initialUpdatedAt.plusMillis(1000).toEpochMilli())
+ .build()));
+ }
+ kafka.sendMessages(nationRowsBuilder.build().stream(), schemaRegistryAwareProducer(kafka));
+ pinot.createSchema(getClass().getClassLoader().getResourceAsStream("nation_schema.json"), nationTableName);
+ pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("nation_realtimeSpec.json"), nationTableName);
+ }
+
+ @Override
+ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
+ {
+ return switch (connectorBehavior) {
+ case SUPPORTS_CREATE_SCHEMA,
+ SUPPORTS_CREATE_TABLE,
+ SUPPORTS_INSERT,
+ SUPPORTS_RENAME_TABLE -> false;
+ default -> super.hasBehavior(connectorBehavior);
+ };
}
private Map pinotProperties(TestingPinotCluster pinot)
@@ -833,6 +976,37 @@ public static Object of(
}
}
+ @Override
+ public void testShowCreateTable()
+ {
+ assertQueryFails("SHOW CREATE TABLE region", "No PropertyMetadata for property: pinotColumnName");
+ }
+
+ @Override
+ public void testSelectInformationSchemaColumns()
+ {
+ // Override because there's updated_at_seconds column
+ assertThat(query("SELECT column_name FROM information_schema.columns WHERE table_schema = 'default' AND table_name = 'region'"))
+ .skippingTypesCheck()
+ .matches("VALUES 'regionkey', 'name', 'comment', 'updated_at_seconds'");
+ }
+
+ @Override
+ public void testTopN()
+ {
+ // TODO https://github.com/trinodb/trino/issues/14045 Fix ORDER BY ... LIMIT query
+ assertQueryFails("SELECT regionkey FROM nation ORDER BY name LIMIT 3",
+ format("Segment query returned '%2$s' rows per split, maximum allowed is '%1$s' rows. with query \"SELECT \"regionkey\", \"name\" FROM nation_REALTIME LIMIT 12\"", MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES, MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES + 1));
+ }
+
+ @Override
+ public void testJoin()
+ {
+ // TODO https://github.com/trinodb/trino/issues/14046 Fix JOIN query
+ assertQueryFails("SELECT n.name, r.name FROM nation n JOIN region r on n.regionkey = r.regionkey",
+ format("Segment query returned '%2$s' rows per split, maximum allowed is '%1$s' rows. with query \"SELECT \"regionkey\", \"name\" FROM nation_REALTIME LIMIT 12\"", MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES, MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES + 1));
+ }
+
@Test
public void testRealType()
{
@@ -1361,12 +1535,6 @@ public void testLimitPushdown()
.isNotFullyPushedDown(LimitNode.class);
}
- @Test
- public void testCreateTable()
- {
- assertQueryFails("CREATE TABLE test_create_table (col INT)", "This connector does not support creating tables");
- }
-
/**
* https://github.com/trinodb/trino/issues/8307
*/
diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/PinotQueryRunner.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/PinotQueryRunner.java
index 235e55bd9bce..565561700b60 100755
--- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/PinotQueryRunner.java
+++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/PinotQueryRunner.java
@@ -43,7 +43,6 @@ public static DistributedQueryRunner createPinotQueryRunner(Map
throws Exception
{
DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(createSession("default"))
- .setNodeCount(2)
.setExtraProperties(extraProperties)
.build();
queryRunner.installPlugin(new PinotPlugin(extension));
diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationConnectorConnectorSmokeTest.java
similarity index 83%
rename from plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTest.java
rename to plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationConnectorConnectorSmokeTest.java
index 54d1008c3475..424aa53ec07d 100644
--- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTest.java
+++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationConnectorConnectorSmokeTest.java
@@ -13,8 +13,8 @@
*/
package io.trino.plugin.pinot;
-public class TestPinotWithoutAuthenticationIntegrationSmokeTest
- extends AbstractPinotIntegrationSmokeTest
+public class TestPinotWithoutAuthenticationIntegrationConnectorConnectorSmokeTest
+ extends BasePinotIntegrationConnectorSmokeTest
{
@Override
protected boolean isSecured()
diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersion.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorConnectorSmokeTest.java
similarity index 85%
rename from plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersion.java
rename to plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorConnectorSmokeTest.java
index 05b7c5d89e38..35795c92c566 100644
--- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersion.java
+++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorConnectorSmokeTest.java
@@ -15,8 +15,8 @@
import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_LATEST_IMAGE_NAME;
-public class TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersion
- extends AbstractPinotIntegrationSmokeTest
+public class TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorConnectorSmokeTest
+ extends BasePinotIntegrationConnectorSmokeTest
{
@Override
protected boolean isSecured()
diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersionNoGrpc.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionNoGrpcConnectorSmokeTest.java
similarity index 86%
rename from plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersionNoGrpc.java
rename to plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionNoGrpcConnectorSmokeTest.java
index 979ed75d5c8b..274e6f2da1a0 100644
--- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersionNoGrpc.java
+++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionNoGrpcConnectorSmokeTest.java
@@ -15,8 +15,8 @@
import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_LATEST_IMAGE_NAME;
-public class TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersionNoGrpc
- extends AbstractPinotIntegrationSmokeTest
+public class TestPinotWithoutAuthenticationIntegrationLatestVersionNoGrpcConnectorSmokeTest
+ extends BasePinotIntegrationConnectorSmokeTest
{
@Override
protected boolean isSecured()
diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestSecuredPinotIntegrationSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestSecuredPinotIntegrationConnectorSmokeTest.java
similarity index 92%
rename from plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestSecuredPinotIntegrationSmokeTest.java
rename to plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestSecuredPinotIntegrationConnectorSmokeTest.java
index 189b6236a327..4cd780e1b046 100644
--- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestSecuredPinotIntegrationSmokeTest.java
+++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestSecuredPinotIntegrationConnectorSmokeTest.java
@@ -19,8 +19,8 @@
import static io.trino.plugin.pinot.auth.PinotAuthenticationType.PASSWORD;
-public class TestSecuredPinotIntegrationSmokeTest
- extends AbstractPinotIntegrationSmokeTest
+public class TestSecuredPinotIntegrationConnectorSmokeTest
+ extends BasePinotIntegrationConnectorSmokeTest
{
@Override
protected boolean isSecured()
diff --git a/plugin/trino-pinot/src/test/resources/nation_realtimeSpec.json b/plugin/trino-pinot/src/test/resources/nation_realtimeSpec.json
new file mode 100644
index 000000000000..9cce19a01f8c
--- /dev/null
+++ b/plugin/trino-pinot/src/test/resources/nation_realtimeSpec.json
@@ -0,0 +1,46 @@
+{
+ "tableName": "nation",
+ "tableType": "REALTIME",
+ "segmentsConfig": {
+ "timeColumnName": "updated_at_seconds",
+ "retentionTimeUnit": "DAYS",
+ "retentionTimeValue": "365",
+ "segmentPushType": "APPEND",
+ "segmentPushFrequency": "daily",
+ "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+ "schemaName": "nation",
+ "replicasPerPartition": "1"
+ },
+ "tenants": {
+ "broker": "DefaultTenant",
+ "server": "DefaultTenant"
+ },
+ "tableIndexConfig": {
+ "loadMode": "MMAP",
+ "noDictionaryColumns": [],
+ "sortedColumn": [
+ "updated_at_seconds"
+ ],
+ "aggregateMetrics": "false",
+ "nullHandlingEnabled": "true",
+ "streamConfigs": {
+ "streamType": "kafka",
+ "stream.kafka.consumer.type": "lowLevel",
+ "stream.kafka.topic.name": "nation",
+ "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
+ "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
+ "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",
+ "stream.kafka.zk.broker.url": "zookeeper:2181/",
+ "stream.kafka.broker.list": "kafka:9092",
+ "realtime.segment.flush.threshold.time": "1m",
+ "realtime.segment.flush.threshold.size": "0",
+ "realtime.segment.flush.desired.size": "1M",
+ "isolation.level": "read_committed",
+ "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
+ "stream.kafka.consumer.prop.group.id": "pinot_nation"
+ }
+ },
+ "metadata": {
+ "customConfigs": {}
+ }
+}
diff --git a/plugin/trino-pinot/src/test/resources/nation_schema.json b/plugin/trino-pinot/src/test/resources/nation_schema.json
new file mode 100644
index 000000000000..b6c2922d8f07
--- /dev/null
+++ b/plugin/trino-pinot/src/test/resources/nation_schema.json
@@ -0,0 +1,31 @@
+{
+ "schemaName": "nation",
+ "dimensionFieldSpecs": [
+ {
+ "name": "nationkey",
+ "dataType": "LONG"
+ },
+ {
+ "name": "name",
+ "dataType": "STRING"
+ },
+ {
+ "name": "comment",
+ "dataType": "STRING"
+ },
+ {
+ "name": "regionkey",
+ "dataType": "LONG"
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "updated_at_seconds",
+ "dataType": "LONG",
+ "defaultNullValue": 0,
+ "format": "1:SECONDS:EPOCH",
+ "transformFunction": "toEpochSeconds(updated_at)",
+ "granularity": "1:SECONDS"
+ }
+ ]
+}
diff --git a/plugin/trino-pinot/src/test/resources/region_realtimeSpec.json b/plugin/trino-pinot/src/test/resources/region_realtimeSpec.json
new file mode 100644
index 000000000000..2fa3cdea0320
--- /dev/null
+++ b/plugin/trino-pinot/src/test/resources/region_realtimeSpec.json
@@ -0,0 +1,46 @@
+{
+ "tableName": "region",
+ "tableType": "REALTIME",
+ "segmentsConfig": {
+ "timeColumnName": "updated_at_seconds",
+ "retentionTimeUnit": "DAYS",
+ "retentionTimeValue": "365",
+ "segmentPushType": "APPEND",
+ "segmentPushFrequency": "daily",
+ "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
+ "schemaName": "region",
+ "replicasPerPartition": "1"
+ },
+ "tenants": {
+ "broker": "DefaultTenant",
+ "server": "DefaultTenant"
+ },
+ "tableIndexConfig": {
+ "loadMode": "MMAP",
+ "noDictionaryColumns": [],
+ "sortedColumn": [
+ "updated_at_seconds"
+ ],
+ "aggregateMetrics": "false",
+ "nullHandlingEnabled": "true",
+ "streamConfigs": {
+ "streamType": "kafka",
+ "stream.kafka.consumer.type": "lowLevel",
+ "stream.kafka.topic.name": "region",
+ "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
+ "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
+ "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",
+ "stream.kafka.zk.broker.url": "zookeeper:2181/",
+ "stream.kafka.broker.list": "kafka:9092",
+ "realtime.segment.flush.threshold.time": "1m",
+ "realtime.segment.flush.threshold.size": "0",
+ "realtime.segment.flush.desired.size": "1M",
+ "isolation.level": "read_committed",
+ "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
+ "stream.kafka.consumer.prop.group.id": "pinot_region"
+ }
+ },
+ "metadata": {
+ "customConfigs": {}
+ }
+}
diff --git a/plugin/trino-pinot/src/test/resources/region_schema.json b/plugin/trino-pinot/src/test/resources/region_schema.json
new file mode 100644
index 000000000000..65f3dee8c8b9
--- /dev/null
+++ b/plugin/trino-pinot/src/test/resources/region_schema.json
@@ -0,0 +1,27 @@
+{
+ "schemaName": "region",
+ "dimensionFieldSpecs": [
+ {
+ "name": "regionkey",
+ "dataType": "LONG"
+ },
+ {
+ "name": "name",
+ "dataType": "STRING"
+ },
+ {
+ "name": "comment",
+ "dataType": "STRING"
+ }
+ ],
+ "dateTimeFieldSpecs": [
+ {
+ "name": "updated_at_seconds",
+ "dataType": "LONG",
+ "defaultNullValue": 0,
+ "format": "1:SECONDS:EPOCH",
+ "transformFunction": "toEpochSeconds(updated_at)",
+ "granularity": "1:SECONDS"
+ }
+ ]
+}