diff --git a/plugin/trino-pinot/pom.xml b/plugin/trino-pinot/pom.xml index 62a9b4f5fd22..4f148524afcb 100644 --- a/plugin/trino-pinot/pom.xml +++ b/plugin/trino-pinot/pom.xml @@ -14,7 +14,7 @@ Trino - Pinot connector - 1.3.0 + 1.4.0 true ${air.test.jvm.additional-arguments.default} @@ -126,7 +126,7 @@ org.apache.helix helix-core - 1.3.1 + 1.3.2 javax.annotation diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java index 9a3483c10e7c..941bcec81ebc 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java @@ -27,7 +27,7 @@ import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.common.proto.Server; -import org.apache.pinot.common.utils.grpc.GrpcQueryClient; +import org.apache.pinot.common.utils.grpc.ServerGrpcQueryClient; import org.apache.pinot.spi.utils.CommonConstants.Query.Response.MetadataKeys; import org.apache.pinot.spi.utils.CommonConstants.Query.Response.ResponseType; @@ -150,7 +150,7 @@ public int getRowLimit() public interface GrpcQueryClientFactory { - GrpcQueryClient create(HostAndPort hostAndPort); + ServerGrpcQueryClient create(HostAndPort hostAndPort); } public static class PlainTextGrpcQueryClientFactory @@ -169,9 +169,9 @@ public PlainTextGrpcQueryClientFactory(PinotGrpcServerQueryClientConfig grpcClie } @Override - public GrpcQueryClient create(HostAndPort hostAndPort) + public ServerGrpcQueryClient create(HostAndPort hostAndPort) { - return new GrpcQueryClient(hostAndPort.getHost(), hostAndPort.getPort(), config); + return new ServerGrpcQueryClient(hostAndPort.getHost(), hostAndPort.getPort(), config); } } @@ -214,16 +214,16 @@ public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientConfig grpcClientConf } @Override - public GrpcQueryClient create(HostAndPort hostAndPort) + public ServerGrpcQueryClient create(HostAndPort hostAndPort) { - return new GrpcQueryClient(hostAndPort.getHost(), hostAndPort.getPort(), config); + return new ServerGrpcQueryClient(hostAndPort.getHost(), hostAndPort.getPort(), config); } } public static class PinotGrpcServerQueryClient { private final PinotHostMapper pinotHostMapper; - private final Map clientCache = new ConcurrentHashMap<>(); + private final Map clientCache = new ConcurrentHashMap<>(); private final int grpcPort; private final GrpcQueryClientFactory grpcQueryClientFactory; private final Optional proxyUri; @@ -242,9 +242,9 @@ private PinotGrpcServerQueryClient(PinotHostMapper pinotHostMapper, PinotGrpcSer public Iterator queryPinot(String query, String serverHost, List segments) { HostAndPort mappedHostAndPort = pinotHostMapper.getServerGrpcHostAndPort(serverHost, grpcPort); - // GrpcQueryClient does not implement Closeable. The idle timeout is 30 minutes (grpc default). - GrpcQueryClient client = clientCache.computeIfAbsent(mappedHostAndPort, hostAndPort -> { - GrpcQueryClient queryClient = proxyUri.isPresent() ? grpcQueryClientFactory.create(HostAndPort.fromString(proxyUri.get())) : grpcQueryClientFactory.create(hostAndPort); + // ServerGrpcQueryClient does not implement Closeable. The idle timeout is 30 minutes (grpc default). + ServerGrpcQueryClient client = clientCache.computeIfAbsent(mappedHostAndPort, hostAndPort -> { + ServerGrpcQueryClient queryClient = proxyUri.isPresent() ? grpcQueryClientFactory.create(HostAndPort.fromString(proxyUri.get())) : grpcQueryClientFactory.create(hostAndPort); closer.register(queryClient); return queryClient; }); diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotTypeResolver.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotTypeResolver.java index acc0d3e828ab..93031108a33d 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotTypeResolver.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotTypeResolver.java @@ -27,7 +27,6 @@ import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory; import org.apache.pinot.segment.local.segment.index.datasource.EmptyDataSource; import org.apache.pinot.segment.spi.datasource.DataSource; -import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl; import org.apache.pinot.spi.data.FieldSpec; import java.util.Map; @@ -56,9 +55,7 @@ private static Map getDataSourceMap(PinotClient pinotClient, try { return pinotClient.getTableSchema(pinotTableName).getFieldSpecMap().entrySet().stream() .collect(toImmutableMap(Map.Entry::getKey, - entry -> new EmptyDataSource(new ColumnMetadataImpl.Builder() - .setFieldSpec(entry.getValue()) - .build()))); + entry -> new EmptyDataSource(entry.getValue()))); } catch (Exception e) { throw new RuntimeException(e); diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConnectorSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConnectorSmokeTest.java index a172592f2221..2e488ff95430 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConnectorSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConnectorSmokeTest.java @@ -40,7 +40,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.pinot.common.utils.TarCompressionUtils; -import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer; import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource; import org.apache.pinot.segment.local.segment.creator.TransformPipeline; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; @@ -52,7 +51,6 @@ import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.spi.data.readers.RecordReader; -import org.apache.pinot.spi.recordtransformer.RecordTransformer; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.junit.jupiter.api.Test; @@ -634,19 +632,8 @@ private static Path createSegment(String tableConfigResourceName, String pinotSc } segmentGeneratorConfig.setSequenceId(sequenceId); SegmentCreationDataSource dataSource = new RecordReaderSegmentCreationDataSource(recordReader); - RecordTransformer recordTransformer = genericRow -> { - GenericRow record = null; - try { - record = CompositeTransformer.getDefaultTransformer(tableConfig, pinotSchema).transform(genericRow); - } - catch (Exception e) { - // ignored - record = null; - } - return record; - }; SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); - driver.init(segmentGeneratorConfig, dataSource, new TransformPipeline(recordTransformer, null)); + driver.init(segmentGeneratorConfig, dataSource, new TransformPipeline(tableConfig, pinotSchema)); driver.build(); File segmentOutputDirectory = driver.getOutputDirectory(); File tgzPath = new File(String.join(File.separator, outputDirectory, segmentOutputDirectory.getName() + ".tar.gz")); @@ -2434,7 +2421,7 @@ public void testTransformFunctions() " WHERE string_col in ('string_0', 'array_null')\"")) .matches("VALUES (3), (1)"); - assertThat(query("SELECT \"cast(floor(arrayaverage(long_array_col)),'long')\" FROM " + + assertThat(query("SELECT \"cast(floor(arrayaverage(long_array_col)),'BIGINT')\" FROM " + "\"SELECT cast(floor(arrayaverage(long_array_col)) as long)" + " FROM " + ALL_TYPES_TABLE + " WHERE double_array_col is not null and double_col != -17.33\"")) @@ -2487,7 +2474,7 @@ public void testPassthroughQueriesWithAliases() // Test without aliases to verify fieldName is correctly handled assertThat(query("SELECT \"timeconvert(created_at_seconds,'seconds','hours')\"," + - " \"cast(floor(divide(created_at_seconds,'3600')),'long')\" FROM " + + " \"cast(floor(divide(created_at_seconds,'3600')),'BIGINT')\" FROM " + "\"SELECT timeconvert(created_at_seconds, 'SECONDS', 'HOURS')," + " CAST(FLOOR(created_at_seconds / 3600) as long)" + " FROM " + DATE_TIME_FIELDS_TABLE + "\"")) @@ -2531,7 +2518,7 @@ public void testPassthroughQueriesWithAliases() public void testPassthroughQueriesWithPushdowns() { assertThat(query("SELECT DISTINCT \"timeconvert(created_at_seconds,'seconds','hours')\"," + - " \"cast(floor(divide(created_at_seconds,'3600')),'long')\" FROM " + + " \"cast(floor(divide(created_at_seconds,'3600')),'BIGINT')\" FROM " + "\"SELECT timeconvert(created_at_seconds, 'SECONDS', 'HOURS')," + " CAST(FLOOR(created_at_seconds / 3600) AS long)" + " FROM " + DATE_TIME_FIELDS_TABLE + "\"")) @@ -2539,7 +2526,7 @@ public void testPassthroughQueriesWithPushdowns() assertThat(query("SELECT DISTINCT \"timeconvert(created_at_seconds,'seconds','milliseconds')\"," + " \"cast(floor(divide(created_at_seco" + - "nds,'3600')),'long')\" FROM " + + "nds,'3600')),'BIGINT')\" FROM " + "\"SELECT timeconvert(created_at_seconds, 'SECONDS', 'MILLISECONDS')," + " CAST(FLOOR(created_at_seconds / 3600) as long)" + " FROM " + DATE_TIME_FIELDS_TABLE + "\"")) diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java index 70259be4029b..e69dfa7a0bbf 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java @@ -65,7 +65,7 @@ public class TestingPinotCluster implements Closeable { - public static final String PINOT_LATEST_IMAGE_NAME = "apachepinot/pinot:1.2.0"; + public static final String PINOT_LATEST_IMAGE_NAME = "apachepinot/pinot:1.4.0"; private static final String ZOOKEEPER_INTERNAL_HOST = "zookeeper"; private static final JsonCodec> LIST_JSON_CODEC = listJsonCodec(String.class); private static final JsonCodec PINOT_SUCCESS_RESPONSE_JSON_CODEC = jsonCodec(PinotSuccessResponse.class); diff --git a/plugin/trino-pinot/src/test/resources/alltypes_nullable_realtimeSpec.json b/plugin/trino-pinot/src/test/resources/alltypes_nullable_realtimeSpec.json index c22eb60b197d..b09365c5a4f8 100644 --- a/plugin/trino-pinot/src/test/resources/alltypes_nullable_realtimeSpec.json +++ b/plugin/trino-pinot/src/test/resources/alltypes_nullable_realtimeSpec.json @@ -26,9 +26,6 @@ "dimensionsSplitOrder": ["string_col"], "functionColumnPairs": [ "COUNT__int_col", - "COUNT__float_col", - "COUNT__double_col", - "COUNT__long_col", "MIN__int_col", "MIN__float_col", "MIN__double_col", diff --git a/plugin/trino-pinot/src/test/resources/alltypes_realtimeSpec.json b/plugin/trino-pinot/src/test/resources/alltypes_realtimeSpec.json index fe4227b418ab..3931c494b4e3 100644 --- a/plugin/trino-pinot/src/test/resources/alltypes_realtimeSpec.json +++ b/plugin/trino-pinot/src/test/resources/alltypes_realtimeSpec.json @@ -26,9 +26,6 @@ "dimensionsSplitOrder": ["string_col"], "functionColumnPairs": [ "COUNT__int_col", - "COUNT__float_col", - "COUNT__double_col", - "COUNT__long_col", "MIN__int_col", "MIN__float_col", "MIN__double_col",