From 12cdb3ceb32e5bd5293686b2c2e985213c1c437f Mon Sep 17 00:00:00 2001 From: Nagaraj Tantri Date: Tue, 6 Sep 2022 18:26:50 +0100 Subject: [PATCH 1/2] Extract method to prepare Pinot testing tables --- .../AbstractPinotIntegrationSmokeTest.java | 78 +++++++++++++++++-- 1 file changed, 72 insertions(+), 6 deletions(-) diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java index cffea1e42a6f..af3cf513b510 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java @@ -142,6 +142,30 @@ protected QueryRunner createQueryRunner() TestingPinotCluster pinot = closeAfterClass(new TestingPinotCluster(kafka.getNetwork(), isSecured(), getPinotImageName())); pinot.start(); + createAndPopulateAllTypesTopic(kafka, pinot); + createAndPopulateMixedCaseTableAndTopic(kafka, pinot); + createAndPopulateMixedCaseDistinctTableAndTopic(kafka, pinot); + createAndPopulateTooManyRowsTable(kafka, pinot); + createAndPopulateTooManyBrokerRowsTableAndTopic(kafka, pinot); + createTheDuplicateTablesAndTopics(kafka, pinot); + createAndPopulateDateTimeFieldsTableAndTopic(kafka, pinot); + createAndPopulateJsonTypeTable(kafka, pinot); + createAndPopulateJsonTable(kafka, pinot); + createAndPopulateMixedCaseHybridTablesAndTopic(kafka, pinot); + createAndPopulateTableHavingReservedKeywordColumnNames(kafka, pinot); + createAndPopulateHavingQuotesInColumnNames(kafka, pinot); + createAndPopulateHavingMultipleColumnsWithDuplicateValues(kafka, pinot); + + return PinotQueryRunner.createPinotQueryRunner( + ImmutableMap.of(), + pinotProperties(pinot), + Optional.of(binder -> newOptionalBinder(binder, PinotHostMapper.class).setBinding() + .toInstance(new TestingPinotHostMapper(pinot.getBrokerHostAndPort(), 
pinot.getServerHostAndPort(), pinot.getServerGrpcHostAndPort())))); + } + + private void createAndPopulateAllTypesTopic(TestingKafka kafka, TestingPinotCluster pinot) + throws Exception + { // Create and populate the all_types topic and table kafka.createTopic(ALL_TYPES_TABLE); @@ -165,7 +189,11 @@ protected QueryRunner createQueryRunner() pinot.createSchema(getClass().getClassLoader().getResourceAsStream("alltypes_schema.json"), ALL_TYPES_TABLE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("alltypes_realtimeSpec.json"), ALL_TYPES_TABLE); + } + private void createAndPopulateMixedCaseTableAndTopic(TestingKafka kafka, TestingPinotCluster pinot) + throws Exception + { // Create and populate mixed case table and topic kafka.createTopic(MIXED_CASE_COLUMN_NAMES_TABLE); Schema mixedCaseAvroSchema = SchemaBuilder.record(MIXED_CASE_COLUMN_NAMES_TABLE).fields() @@ -200,7 +228,11 @@ protected QueryRunner createQueryRunner() kafka.sendMessages(mixedCaseProducerRecords.stream(), schemaRegistryAwareProducer(kafka)); pinot.createSchema(getClass().getClassLoader().getResourceAsStream("mixed_case_schema.json"), MIXED_CASE_COLUMN_NAMES_TABLE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("mixed_case_realtimeSpec.json"), MIXED_CASE_COLUMN_NAMES_TABLE); + } + private void createAndPopulateMixedCaseDistinctTableAndTopic(TestingKafka kafka, TestingPinotCluster pinot) + throws Exception + { // Create and populate mixed case distinct table and topic kafka.createTopic(MIXED_CASE_DISTINCT_TABLE); Schema mixedCaseDistinctAvroSchema = SchemaBuilder.record(MIXED_CASE_DISTINCT_TABLE).fields() @@ -234,7 +266,11 @@ protected QueryRunner createQueryRunner() // Create mixed case table name, populated from the mixed case topic pinot.createSchema(getClass().getClassLoader().getResourceAsStream("mixed_case_table_name_schema.json"), MIXED_CASE_TABLE_NAME); 
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("mixed_case_table_name_realtimeSpec.json"), MIXED_CASE_TABLE_NAME); + } + private void createAndPopulateTooManyRowsTable(TestingKafka kafka, TestingPinotCluster pinot) + throws Exception + { // Create and populate too many rows table and topic kafka.createTopic(TOO_MANY_ROWS_TABLE); Schema tooManyRowsAvroSchema = SchemaBuilder.record(TOO_MANY_ROWS_TABLE).fields() @@ -255,7 +291,11 @@ protected QueryRunner createQueryRunner() kafka.sendMessages(tooManyRowsRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka)); pinot.createSchema(getClass().getClassLoader().getResourceAsStream("too_many_rows_schema.json"), TOO_MANY_ROWS_TABLE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("too_many_rows_realtimeSpec.json"), TOO_MANY_ROWS_TABLE); + } + private void createAndPopulateTooManyBrokerRowsTableAndTopic(TestingKafka kafka, TestingPinotCluster pinot) + throws Exception + { // Create and populate too many broker rows table and topic kafka.createTopic(TOO_MANY_BROKER_ROWS_TABLE); Schema tooManyBrokerRowsAvroSchema = SchemaBuilder.record(TOO_MANY_BROKER_ROWS_TABLE).fields() @@ -273,7 +313,11 @@ protected QueryRunner createQueryRunner() kafka.sendMessages(tooManyBrokerRowsRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka)); pinot.createSchema(getClass().getClassLoader().getResourceAsStream("too_many_broker_rows_schema.json"), TOO_MANY_BROKER_ROWS_TABLE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("too_many_broker_rows_realtimeSpec.json"), TOO_MANY_BROKER_ROWS_TABLE); + } + private void createTheDuplicateTablesAndTopics(TestingKafka kafka, TestingPinotCluster pinot) + throws Exception + { // Create the duplicate tables and topics kafka.createTopic(DUPLICATE_TABLE_LOWERCASE); pinot.createSchema(getClass().getClassLoader().getResourceAsStream("dup_table_lower_case_schema.json"), DUPLICATE_TABLE_LOWERCASE); @@ -282,7 +326,11 @@ 
protected QueryRunner createQueryRunner() kafka.createTopic(DUPLICATE_TABLE_MIXED_CASE); pinot.createSchema(getClass().getClassLoader().getResourceAsStream("dup_table_mixed_case_schema.json"), DUPLICATE_TABLE_MIXED_CASE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("dup_table_mixed_case_realtimeSpec.json"), DUPLICATE_TABLE_MIXED_CASE); + } + private void createAndPopulateDateTimeFieldsTableAndTopic(TestingKafka kafka, TestingPinotCluster pinot) + throws Exception + { // Create and populate date time fields table and topic kafka.createTopic(DATE_TIME_FIELDS_TABLE); Schema dateTimeFieldsAvroSchema = SchemaBuilder.record(DATE_TIME_FIELDS_TABLE).fields() @@ -310,7 +358,11 @@ protected QueryRunner createQueryRunner() kafka.sendMessages(dateTimeFieldsProducerRecords.stream(), schemaRegistryAwareProducer(kafka)); pinot.createSchema(getClass().getClassLoader().getResourceAsStream("date_time_fields_schema.json"), DATE_TIME_FIELDS_TABLE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("date_time_fields_realtimeSpec.json"), DATE_TIME_FIELDS_TABLE); + } + private void createAndPopulateJsonTypeTable(TestingKafka kafka, TestingPinotCluster pinot) + throws Exception + { // Create json type table kafka.createTopic(JSON_TYPE_TABLE); @@ -332,7 +384,11 @@ protected QueryRunner createQueryRunner() pinot.createSchema(getClass().getClassLoader().getResourceAsStream("json_schema.json"), JSON_TYPE_TABLE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("json_realtimeSpec.json"), JSON_TYPE_TABLE); pinot.addOfflineTable(getClass().getClassLoader().getResourceAsStream("json_offlineSpec.json"), JSON_TYPE_TABLE); + } + private void createAndPopulateJsonTable(TestingKafka kafka, TestingPinotCluster pinot) + throws Exception + { // Create json table kafka.createTopic(JSON_TABLE); long key = 0L; @@ -347,7 +403,11 @@ protected QueryRunner createQueryRunner() 
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("schema.json"), JSON_TABLE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("realtimeSpec.json"), JSON_TABLE); + } + private void createAndPopulateMixedCaseHybridTablesAndTopic(TestingKafka kafka, TestingPinotCluster pinot) + throws Exception + { // Create and populate mixed case table and topic kafka.createTopic(HYBRID_TABLE_NAME); Schema hybridAvroSchema = SchemaBuilder.record(HYBRID_TABLE_NAME).fields() @@ -433,7 +493,11 @@ protected QueryRunner createQueryRunner() } kafka.sendMessages(hybridProducerRecords.stream(), schemaRegistryAwareProducer(kafka)); + } + private void createAndPopulateTableHavingReservedKeywordColumnNames(TestingKafka kafka, TestingPinotCluster pinot) + throws Exception + { // Create a table having reserved keyword column names kafka.createTopic(RESERVED_KEYWORD_TABLE); Schema reservedKeywordAvroSchema = SchemaBuilder.record(RESERVED_KEYWORD_TABLE).fields() @@ -447,7 +511,11 @@ protected QueryRunner createQueryRunner() kafka.sendMessages(reservedKeywordRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka)); pinot.createSchema(getClass().getClassLoader().getResourceAsStream("reserved_keyword_schema.json"), RESERVED_KEYWORD_TABLE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("reserved_keyword_realtimeSpec.json"), RESERVED_KEYWORD_TABLE); + } + private void createAndPopulateHavingQuotesInColumnNames(TestingKafka kafka, TestingPinotCluster pinot) + throws Exception + { // Create a table having quotes in column names kafka.createTopic(QUOTES_IN_COLUMN_NAME_TABLE); Schema quotesInColumnNameAvroSchema = SchemaBuilder.record(QUOTES_IN_COLUMN_NAME_TABLE).fields() @@ -460,7 +528,11 @@ protected QueryRunner createQueryRunner() kafka.sendMessages(quotesInColumnNameRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka)); 
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("quotes_in_column_name_schema.json"), QUOTES_IN_COLUMN_NAME_TABLE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("quotes_in_column_name_realtimeSpec.json"), QUOTES_IN_COLUMN_NAME_TABLE); + } + private void createAndPopulateHavingMultipleColumnsWithDuplicateValues(TestingKafka kafka, TestingPinotCluster pinot) + throws Exception + { // Create a table having multiple columns with duplicate values kafka.createTopic(DUPLICATE_VALUES_IN_COLUMNS_TABLE); Schema duplicateValuesInColumnsAvroSchema = SchemaBuilder.record(DUPLICATE_VALUES_IN_COLUMNS_TABLE).fields() @@ -523,12 +595,6 @@ protected QueryRunner createQueryRunner() kafka.sendMessages(duplicateValuesInColumnsRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka)); pinot.createSchema(getClass().getClassLoader().getResourceAsStream("duplicate_values_in_columns_schema.json"), DUPLICATE_VALUES_IN_COLUMNS_TABLE); pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("duplicate_values_in_columns_realtimeSpec.json"), DUPLICATE_VALUES_IN_COLUMNS_TABLE); - - return PinotQueryRunner.createPinotQueryRunner( - ImmutableMap.of(), - pinotProperties(pinot), - Optional.of(binder -> newOptionalBinder(binder, PinotHostMapper.class).setBinding() - .toInstance(new TestingPinotHostMapper(pinot.getBrokerHostAndPort(), pinot.getServerHostAndPort(), pinot.getServerGrpcHostAndPort())))); } private Map pinotProperties(TestingPinotCluster pinot) From 6bd1802cf69224d887b08902cdc51d056a2fc74b Mon Sep 17 00:00:00 2001 From: Nagaraj Tantri Date: Tue, 6 Sep 2022 18:30:50 +0100 Subject: [PATCH 2/2] Extend BaseConnectorSmokeTest in Pinot tests Changed AbstractPinotIntegrationSmokeTest to extend BaseConnectorSmokeTest Renamed AbstractPinotIntegrationSmokeTest to BasePinotIntegrationConnectorSmokeTest --- plugin/trino-pinot/pom.xml | 12 ++ ...sePinotIntegrationConnectorSmokeTest.java} | 124 ++++++++++++++++--
.../trino/plugin/pinot/PinotQueryRunner.java | 1 - ...tegrationConnectorConnectorSmokeTest.java} | 4 +- ...stVersionConnectorConnectorSmokeTest.java} | 4 +- ...atestVersionNoGrpcConnectorSmokeTest.java} | 4 +- ...edPinotIntegrationConnectorSmokeTest.java} | 4 +- .../test/resources/nation_realtimeSpec.json | 46 +++++++ .../src/test/resources/nation_schema.json | 31 +++++ .../test/resources/region_realtimeSpec.json | 46 +++++++ .../src/test/resources/region_schema.json | 27 ++++ 11 files changed, 283 insertions(+), 20 deletions(-) rename plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/{AbstractPinotIntegrationSmokeTest.java => BasePinotIntegrationConnectorSmokeTest.java} (95%) rename plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/{TestPinotWithoutAuthenticationIntegrationSmokeTest.java => TestPinotWithoutAuthenticationIntegrationConnectorConnectorSmokeTest.java} (83%) rename plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/{TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersion.java => TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorConnectorSmokeTest.java} (85%) rename plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/{TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersionNoGrpc.java => TestPinotWithoutAuthenticationIntegrationLatestVersionNoGrpcConnectorSmokeTest.java} (86%) rename plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/{TestSecuredPinotIntegrationSmokeTest.java => TestSecuredPinotIntegrationConnectorSmokeTest.java} (92%) create mode 100644 plugin/trino-pinot/src/test/resources/nation_realtimeSpec.json create mode 100644 plugin/trino-pinot/src/test/resources/nation_schema.json create mode 100644 plugin/trino-pinot/src/test/resources/region_realtimeSpec.json create mode 100644 plugin/trino-pinot/src/test/resources/region_schema.json diff --git a/plugin/trino-pinot/pom.xml b/plugin/trino-pinot/pom.xml index d422ea4fcf69..d4df442d9b12 100755 --- a/plugin/trino-pinot/pom.xml +++ 
b/plugin/trino-pinot/pom.xml @@ -546,6 +546,18 @@ + + io.trino + trino-tpch + test + + + + io.trino.tpch + tpch + test + + io.airlift testing diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java similarity index 95% rename from plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java rename to plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java index af3cf513b510..ed109cc6ace2 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java @@ -21,15 +21,19 @@ import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.trino.Session; import io.trino.plugin.pinot.client.PinotHostMapper; +import io.trino.plugin.tpch.TpchPlugin; import io.trino.sql.planner.plan.AggregationNode; import io.trino.sql.planner.plan.ExchangeNode; import io.trino.sql.planner.plan.FilterNode; import io.trino.sql.planner.plan.LimitNode; import io.trino.sql.planner.plan.MarkDistinctNode; import io.trino.sql.planner.plan.ProjectNode; -import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.BaseConnectorSmokeTest; +import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedRow; import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.kafka.TestingKafka; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; @@ -76,6 +80,7 @@ import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static 
io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; +import static io.trino.plugin.pinot.PinotQueryRunner.createPinotQueryRunner; import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_PREVIOUS_IMAGE_NAME; import static io.trino.spi.type.DoubleType.DOUBLE; import static io.trino.spi.type.RealType.REAL; @@ -93,9 +98,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; -public abstract class AbstractPinotIntegrationSmokeTest - // TODO extend BaseConnectorTest - extends AbstractTestQueryFramework +public abstract class BasePinotIntegrationConnectorSmokeTest + extends BaseConnectorSmokeTest { private static final int MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES = 11; private static final int MAX_ROWS_PER_SPLIT_FOR_BROKER_QUERIES = 12; @@ -156,11 +160,19 @@ protected QueryRunner createQueryRunner() createAndPopulateHavingQuotesInColumnNames(kafka, pinot); createAndPopulateHavingMultipleColumnsWithDuplicateValues(kafka, pinot); - return PinotQueryRunner.createPinotQueryRunner( + DistributedQueryRunner queryRunner = createPinotQueryRunner( ImmutableMap.of(), pinotProperties(pinot), Optional.of(binder -> newOptionalBinder(binder, PinotHostMapper.class).setBinding() .toInstance(new TestingPinotHostMapper(pinot.getBrokerHostAndPort(), pinot.getServerHostAndPort(), pinot.getServerGrpcHostAndPort())))); + + queryRunner.installPlugin(new TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch"); + + // We need the query runner to populate nation and region data from tpch schema + createAndPopulateNationAndRegionData(kafka, pinot, queryRunner); + + return queryRunner; } private void createAndPopulateAllTypesTopic(TestingKafka kafka, TestingPinotCluster pinot) @@ -597,6 +609,71 @@ private void createAndPopulateHavingMultipleColumnsWithDuplicateValues(TestingKa 
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("duplicate_values_in_columns_realtimeSpec.json"), DUPLICATE_VALUES_IN_COLUMNS_TABLE); } + private void createAndPopulateNationAndRegionData(TestingKafka kafka, TestingPinotCluster pinot, DistributedQueryRunner queryRunner) + throws Exception + { + // Create and populate table and topic data + String regionTableName = "region"; + kafka.createTopicWithConfig(2, 1, regionTableName, false); + Schema regionSchema = SchemaBuilder.record(regionTableName).fields() + // regionkey bigint, name varchar, comment varchar + .name("regionkey").type().longType().noDefault() + .name("name").type().stringType().noDefault() + .name("comment").type().stringType().noDefault() + .name("updated_at_seconds").type().longType().noDefault() + .endRecord(); + ImmutableList.Builder> regionRowsBuilder = ImmutableList.builder(); + MaterializedResult regionRows = queryRunner.execute("SELECT * FROM tpch.tiny.region"); + for (MaterializedRow row : regionRows.getMaterializedRows()) { + regionRowsBuilder.add(new ProducerRecord<>(regionTableName, "key" + row.getField(0), new GenericRecordBuilder(regionSchema) + .set("regionkey", row.getField(0)) + .set("name", row.getField(1)) + .set("comment", row.getField(2)) + .set("updated_at_seconds", initialUpdatedAt.plusMillis(1000).toEpochMilli()) + .build())); + } + kafka.sendMessages(regionRowsBuilder.build().stream(), schemaRegistryAwareProducer(kafka)); + pinot.createSchema(getClass().getClassLoader().getResourceAsStream("region_schema.json"), regionTableName); + pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("region_realtimeSpec.json"), regionTableName); + + String nationTableName = "nation"; + kafka.createTopicWithConfig(2, 1, nationTableName, false); + Schema nationSchema = SchemaBuilder.record(nationTableName).fields() + // nationkey BIGINT, name VARCHAR, comment VARCHAR, regionkey BIGINT + .name("nationkey").type().longType().noDefault() + 
.name("name").type().stringType().noDefault() + .name("comment").type().stringType().noDefault() + .name("regionkey").type().longType().noDefault() + .name("updated_at_seconds").type().longType().noDefault() + .endRecord(); + ImmutableList.Builder> nationRowsBuilder = ImmutableList.builder(); + MaterializedResult nationRows = queryRunner.execute("SELECT * FROM tpch.tiny.nation"); + for (MaterializedRow row : nationRows.getMaterializedRows()) { + nationRowsBuilder.add(new ProducerRecord<>(nationTableName, "key" + row.getField(0), new GenericRecordBuilder(nationSchema) + .set("nationkey", row.getField(0)) + .set("name", row.getField(1)) + .set("comment", row.getField(3)) + .set("regionkey", row.getField(2)) + .set("updated_at_seconds", initialUpdatedAt.plusMillis(1000).toEpochMilli()) + .build())); + } + kafka.sendMessages(nationRowsBuilder.build().stream(), schemaRegistryAwareProducer(kafka)); + pinot.createSchema(getClass().getClassLoader().getResourceAsStream("nation_schema.json"), nationTableName); + pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("nation_realtimeSpec.json"), nationTableName); + } + + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + return switch (connectorBehavior) { + case SUPPORTS_CREATE_SCHEMA, + SUPPORTS_CREATE_TABLE, + SUPPORTS_INSERT, + SUPPORTS_RENAME_TABLE -> false; + default -> super.hasBehavior(connectorBehavior); + }; + } + private Map pinotProperties(TestingPinotCluster pinot) { return ImmutableMap.builder() @@ -899,6 +976,37 @@ public static Object of( } } + @Override + public void testShowCreateTable() + { + assertQueryFails("SHOW CREATE TABLE region", "No PropertyMetadata for property: pinotColumnName"); + } + + @Override + public void testSelectInformationSchemaColumns() + { + // Override because there's updated_at_seconds column + assertThat(query("SELECT column_name FROM information_schema.columns WHERE table_schema = 'default' AND table_name = 'region'")) + 
.skippingTypesCheck() + .matches("VALUES 'regionkey', 'name', 'comment', 'updated_at_seconds'"); + } + + @Override + public void testTopN() + { + // TODO https://github.com/trinodb/trino/issues/14045 Fix ORDER BY ... LIMIT query + assertQueryFails("SELECT regionkey FROM nation ORDER BY name LIMIT 3", + format("Segment query returned '%2$s' rows per split, maximum allowed is '%1$s' rows. with query \"SELECT \"regionkey\", \"name\" FROM nation_REALTIME LIMIT 12\"", MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES, MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES + 1)); + } + + @Override + public void testJoin() + { + // TODO https://github.com/trinodb/trino/issues/14046 Fix JOIN query + assertQueryFails("SELECT n.name, r.name FROM nation n JOIN region r on n.regionkey = r.regionkey", + format("Segment query returned '%2$s' rows per split, maximum allowed is '%1$s' rows. with query \"SELECT \"regionkey\", \"name\" FROM nation_REALTIME LIMIT 12\"", MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES, MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES + 1)); + } + @Test public void testRealType() { @@ -1427,12 +1535,6 @@ public void testLimitPushdown() .isNotFullyPushedDown(LimitNode.class); } - @Test - public void testCreateTable() - { - assertQueryFails("CREATE TABLE test_create_table (col INT)", "This connector does not support creating tables"); - } - /** * https://github.com/trinodb/trino/issues/8307 */ diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/PinotQueryRunner.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/PinotQueryRunner.java index 235e55bd9bce..565561700b60 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/PinotQueryRunner.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/PinotQueryRunner.java @@ -43,7 +43,6 @@ public static DistributedQueryRunner createPinotQueryRunner(Map throws Exception { DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(createSession("default")) - .setNodeCount(2) 
.setExtraProperties(extraProperties) .build(); queryRunner.installPlugin(new PinotPlugin(extension)); diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationConnectorConnectorSmokeTest.java similarity index 83% rename from plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTest.java rename to plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationConnectorConnectorSmokeTest.java index 54d1008c3475..424aa53ec07d 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationConnectorConnectorSmokeTest.java @@ -13,8 +13,8 @@ */ package io.trino.plugin.pinot; -public class TestPinotWithoutAuthenticationIntegrationSmokeTest - extends AbstractPinotIntegrationSmokeTest +public class TestPinotWithoutAuthenticationIntegrationConnectorConnectorSmokeTest + extends BasePinotIntegrationConnectorSmokeTest { @Override protected boolean isSecured() diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersion.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorConnectorSmokeTest.java similarity index 85% rename from plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersion.java rename to plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorConnectorSmokeTest.java index 05b7c5d89e38..35795c92c566 100644 --- 
a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersion.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorConnectorSmokeTest.java @@ -15,8 +15,8 @@ import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_LATEST_IMAGE_NAME; -public class TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersion - extends AbstractPinotIntegrationSmokeTest +public class TestPinotWithoutAuthenticationIntegrationLatestVersionConnectorConnectorSmokeTest + extends BasePinotIntegrationConnectorSmokeTest { @Override protected boolean isSecured() diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersionNoGrpc.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionNoGrpcConnectorSmokeTest.java similarity index 86% rename from plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersionNoGrpc.java rename to plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionNoGrpcConnectorSmokeTest.java index 979ed75d5c8b..274e6f2da1a0 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersionNoGrpc.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationLatestVersionNoGrpcConnectorSmokeTest.java @@ -15,8 +15,8 @@ import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_LATEST_IMAGE_NAME; -public class TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersionNoGrpc - extends AbstractPinotIntegrationSmokeTest +public class TestPinotWithoutAuthenticationIntegrationLatestVersionNoGrpcConnectorSmokeTest + extends BasePinotIntegrationConnectorSmokeTest { @Override protected boolean 
isSecured() diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestSecuredPinotIntegrationSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestSecuredPinotIntegrationConnectorSmokeTest.java similarity index 92% rename from plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestSecuredPinotIntegrationSmokeTest.java rename to plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestSecuredPinotIntegrationConnectorSmokeTest.java index 189b6236a327..4cd780e1b046 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestSecuredPinotIntegrationSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestSecuredPinotIntegrationConnectorSmokeTest.java @@ -19,8 +19,8 @@ import static io.trino.plugin.pinot.auth.PinotAuthenticationType.PASSWORD; -public class TestSecuredPinotIntegrationSmokeTest - extends AbstractPinotIntegrationSmokeTest +public class TestSecuredPinotIntegrationConnectorSmokeTest + extends BasePinotIntegrationConnectorSmokeTest { @Override protected boolean isSecured() diff --git a/plugin/trino-pinot/src/test/resources/nation_realtimeSpec.json b/plugin/trino-pinot/src/test/resources/nation_realtimeSpec.json new file mode 100644 index 000000000000..9cce19a01f8c --- /dev/null +++ b/plugin/trino-pinot/src/test/resources/nation_realtimeSpec.json @@ -0,0 +1,46 @@ +{ + "tableName": "nation", + "tableType": "REALTIME", + "segmentsConfig": { + "timeColumnName": "updated_at_seconds", + "retentionTimeUnit": "DAYS", + "retentionTimeValue": "365", + "segmentPushType": "APPEND", + "segmentPushFrequency": "daily", + "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", + "schemaName": "nation", + "replicasPerPartition": "1" + }, + "tenants": { + "broker": "DefaultTenant", + "server": "DefaultTenant" + }, + "tableIndexConfig": { + "loadMode": "MMAP", + "noDictionaryColumns": [], + "sortedColumn": [ + "updated_at_seconds" + ], + "aggregateMetrics": "false", + "nullHandlingEnabled": 
"true", + "streamConfigs": { + "streamType": "kafka", + "stream.kafka.consumer.type": "lowLevel", + "stream.kafka.topic.name": "nation", + "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", + "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", + "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.zk.broker.url": "zookeeper:2181/", + "stream.kafka.broker.list": "kafka:9092", + "realtime.segment.flush.threshold.time": "1m", + "realtime.segment.flush.threshold.size": "0", + "realtime.segment.flush.desired.size": "1M", + "isolation.level": "read_committed", + "stream.kafka.consumer.prop.auto.offset.reset": "smallest", + "stream.kafka.consumer.prop.group.id": "pinot_nation" + } + }, + "metadata": { + "customConfigs": {} + } +} diff --git a/plugin/trino-pinot/src/test/resources/nation_schema.json b/plugin/trino-pinot/src/test/resources/nation_schema.json new file mode 100644 index 000000000000..b6c2922d8f07 --- /dev/null +++ b/plugin/trino-pinot/src/test/resources/nation_schema.json @@ -0,0 +1,31 @@ +{ + "schemaName": "nation", + "dimensionFieldSpecs": [ + { + "name": "nationkey", + "dataType": "LONG" + }, + { + "name": "name", + "dataType": "STRING" + }, + { + "name": "comment", + "dataType": "STRING" + }, + { + "name": "regionkey", + "dataType": "LONG" + } + ], + "dateTimeFieldSpecs": [ + { + "name": "updated_at_seconds", + "dataType": "LONG", + "defaultNullValue": 0, + "format": "1:SECONDS:EPOCH", + "transformFunction": "toEpochSeconds(updated_at)", + "granularity": "1:SECONDS" + } + ] +} diff --git a/plugin/trino-pinot/src/test/resources/region_realtimeSpec.json b/plugin/trino-pinot/src/test/resources/region_realtimeSpec.json new file mode 100644 index 000000000000..2fa3cdea0320 --- /dev/null +++ b/plugin/trino-pinot/src/test/resources/region_realtimeSpec.json @@ -0,0 +1,46 @@ +{ + 
"tableName": "region", + "tableType": "REALTIME", + "segmentsConfig": { + "timeColumnName": "updated_at_seconds", + "retentionTimeUnit": "DAYS", + "retentionTimeValue": "365", + "segmentPushType": "APPEND", + "segmentPushFrequency": "daily", + "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", + "schemaName": "region", + "replicasPerPartition": "1" + }, + "tenants": { + "broker": "DefaultTenant", + "server": "DefaultTenant" + }, + "tableIndexConfig": { + "loadMode": "MMAP", + "noDictionaryColumns": [], + "sortedColumn": [ + "updated_at_seconds" + ], + "aggregateMetrics": "false", + "nullHandlingEnabled": "true", + "streamConfigs": { + "streamType": "kafka", + "stream.kafka.consumer.type": "lowLevel", + "stream.kafka.topic.name": "region", + "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder", + "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", + "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081", + "stream.kafka.zk.broker.url": "zookeeper:2181/", + "stream.kafka.broker.list": "kafka:9092", + "realtime.segment.flush.threshold.time": "1m", + "realtime.segment.flush.threshold.size": "0", + "realtime.segment.flush.desired.size": "1M", + "isolation.level": "read_committed", + "stream.kafka.consumer.prop.auto.offset.reset": "smallest", + "stream.kafka.consumer.prop.group.id": "pinot_region" + } + }, + "metadata": { + "customConfigs": {} + } +} diff --git a/plugin/trino-pinot/src/test/resources/region_schema.json b/plugin/trino-pinot/src/test/resources/region_schema.json new file mode 100644 index 000000000000..65f3dee8c8b9 --- /dev/null +++ b/plugin/trino-pinot/src/test/resources/region_schema.json @@ -0,0 +1,27 @@ +{ + "schemaName": "region", + "dimensionFieldSpecs": [ + { + "name": "regionkey", + "dataType": "LONG" + }, + { + "name": "name", + "dataType": "STRING" + }, + { + "name": 
"comment", + "dataType": "STRING" + } + ], + "dateTimeFieldSpecs": [ + { + "name": "updated_at_seconds", + "dataType": "LONG", + "defaultNullValue": 0, + "format": "1:SECONDS:EPOCH", + "transformFunction": "toEpochSeconds(updated_at)", + "granularity": "1:SECONDS" + } + ] +}