Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions plugin/trino-pinot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<description>Trino - Pinot connector</description>

<properties>
<dep.pinot.version>1.3.0</dep.pinot.version>
<dep.pinot.version>1.4.0</dep.pinot.version>
<air.compiler.fail-warnings>true</air.compiler.fail-warnings>
<!-- additional JVM flags required by chronicle-hft which is used by pinot-segment spi -->
<air.test.jvm.additional-arguments>${air.test.jvm.additional-arguments.default}
Expand Down Expand Up @@ -126,7 +126,7 @@
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-core</artifactId>
<version>1.3.1</version>
<version>1.3.2</version>
<exclusions>
<exclusion>
<groupId>javax.annotation</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTableFactory;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.common.utils.grpc.ServerGrpcQueryClient;
import org.apache.pinot.spi.utils.CommonConstants.Query.Response.MetadataKeys;
import org.apache.pinot.spi.utils.CommonConstants.Query.Response.ResponseType;

Expand Down Expand Up @@ -150,7 +150,7 @@ public int getRowLimit()

public interface GrpcQueryClientFactory
{
GrpcQueryClient create(HostAndPort hostAndPort);
ServerGrpcQueryClient create(HostAndPort hostAndPort);
}

public static class PlainTextGrpcQueryClientFactory
Expand All @@ -169,9 +169,9 @@ public PlainTextGrpcQueryClientFactory(PinotGrpcServerQueryClientConfig grpcClie
}

@Override
public GrpcQueryClient create(HostAndPort hostAndPort)
public ServerGrpcQueryClient create(HostAndPort hostAndPort)
{
return new GrpcQueryClient(hostAndPort.getHost(), hostAndPort.getPort(), config);
return new ServerGrpcQueryClient(hostAndPort.getHost(), hostAndPort.getPort(), config);
}
}

Expand Down Expand Up @@ -214,16 +214,16 @@ public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientConfig grpcClientConf
}

@Override
public GrpcQueryClient create(HostAndPort hostAndPort)
public ServerGrpcQueryClient create(HostAndPort hostAndPort)
{
return new GrpcQueryClient(hostAndPort.getHost(), hostAndPort.getPort(), config);
return new ServerGrpcQueryClient(hostAndPort.getHost(), hostAndPort.getPort(), config);
}
}

public static class PinotGrpcServerQueryClient
{
private final PinotHostMapper pinotHostMapper;
private final Map<HostAndPort, GrpcQueryClient> clientCache = new ConcurrentHashMap<>();
private final Map<HostAndPort, ServerGrpcQueryClient> clientCache = new ConcurrentHashMap<>();
private final int grpcPort;
private final GrpcQueryClientFactory grpcQueryClientFactory;
private final Optional<String> proxyUri;
Expand All @@ -242,9 +242,9 @@ private PinotGrpcServerQueryClient(PinotHostMapper pinotHostMapper, PinotGrpcSer
public Iterator<PinotDataTableWithSize> queryPinot(String query, String serverHost, List<String> segments)
{
HostAndPort mappedHostAndPort = pinotHostMapper.getServerGrpcHostAndPort(serverHost, grpcPort);
// GrpcQueryClient does not implement Closeable. The idle timeout is 30 minutes (grpc default).
GrpcQueryClient client = clientCache.computeIfAbsent(mappedHostAndPort, hostAndPort -> {
GrpcQueryClient queryClient = proxyUri.isPresent() ? grpcQueryClientFactory.create(HostAndPort.fromString(proxyUri.get())) : grpcQueryClientFactory.create(hostAndPort);
// ServerGrpcQueryClient does not implement Closeable. The idle timeout is 30 minutes (grpc default).
ServerGrpcQueryClient client = clientCache.computeIfAbsent(mappedHostAndPort, hostAndPort -> {
ServerGrpcQueryClient queryClient = proxyUri.isPresent() ? grpcQueryClientFactory.create(HostAndPort.fromString(proxyUri.get())) : grpcQueryClientFactory.create(hostAndPort);
closer.register(queryClient);
return queryClient;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
import org.apache.pinot.segment.local.segment.index.datasource.EmptyDataSource;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
import org.apache.pinot.spi.data.FieldSpec;

import java.util.Map;
Expand Down Expand Up @@ -56,9 +55,7 @@ private static Map<String, DataSource> getDataSourceMap(PinotClient pinotClient,
try {
return pinotClient.getTableSchema(pinotTableName).getFieldSpecMap().entrySet().stream()
.collect(toImmutableMap(Map.Entry::getKey,
entry -> new EmptyDataSource(new ColumnMetadataImpl.Builder()
.setFieldSpec(entry.getValue())
.build())));
entry -> new EmptyDataSource(entry.getValue())));
}
catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
Expand All @@ -52,7 +51,6 @@
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -634,19 +632,8 @@ private static Path createSegment(String tableConfigResourceName, String pinotSc
}
segmentGeneratorConfig.setSequenceId(sequenceId);
SegmentCreationDataSource dataSource = new RecordReaderSegmentCreationDataSource(recordReader);
RecordTransformer recordTransformer = genericRow -> {
GenericRow record = null;
try {
record = CompositeTransformer.getDefaultTransformer(tableConfig, pinotSchema).transform(genericRow);
}
catch (Exception e) {
// ignored
record = null;
}
return record;
};
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(segmentGeneratorConfig, dataSource, new TransformPipeline(recordTransformer, null));
driver.init(segmentGeneratorConfig, dataSource, new TransformPipeline(tableConfig, pinotSchema));
driver.build();
File segmentOutputDirectory = driver.getOutputDirectory();
File tgzPath = new File(String.join(File.separator, outputDirectory, segmentOutputDirectory.getName() + ".tar.gz"));
Expand Down Expand Up @@ -2434,7 +2421,7 @@ public void testTransformFunctions()
" WHERE string_col in ('string_0', 'array_null')\""))
.matches("VALUES (3), (1)");

assertThat(query("SELECT \"cast(floor(arrayaverage(long_array_col)),'long')\" FROM " +
assertThat(query("SELECT \"cast(floor(arrayaverage(long_array_col)),'BIGINT')\" FROM " +
"\"SELECT cast(floor(arrayaverage(long_array_col)) as long)" +
" FROM " + ALL_TYPES_TABLE +
" WHERE double_array_col is not null and double_col != -17.33\""))
Expand Down Expand Up @@ -2487,7 +2474,7 @@ public void testPassthroughQueriesWithAliases()

// Test without aliases to verify fieldName is correctly handled
assertThat(query("SELECT \"timeconvert(created_at_seconds,'seconds','hours')\"," +
" \"cast(floor(divide(created_at_seconds,'3600')),'long')\" FROM " +
" \"cast(floor(divide(created_at_seconds,'3600')),'BIGINT')\" FROM " +
"\"SELECT timeconvert(created_at_seconds, 'SECONDS', 'HOURS')," +
" CAST(FLOOR(created_at_seconds / 3600) as long)" +
" FROM " + DATE_TIME_FIELDS_TABLE + "\""))
Expand Down Expand Up @@ -2531,15 +2518,15 @@ public void testPassthroughQueriesWithAliases()
public void testPassthroughQueriesWithPushdowns()
{
assertThat(query("SELECT DISTINCT \"timeconvert(created_at_seconds,'seconds','hours')\"," +
" \"cast(floor(divide(created_at_seconds,'3600')),'long')\" FROM " +
" \"cast(floor(divide(created_at_seconds,'3600')),'BIGINT')\" FROM " +
"\"SELECT timeconvert(created_at_seconds, 'SECONDS', 'HOURS')," +
" CAST(FLOOR(created_at_seconds / 3600) AS long)" +
" FROM " + DATE_TIME_FIELDS_TABLE + "\""))
.matches("VALUES (BIGINT '450168', BIGINT '450168')");

assertThat(query("SELECT DISTINCT \"timeconvert(created_at_seconds,'seconds','milliseconds')\"," +
" \"cast(floor(divide(created_at_seco" +
"nds,'3600')),'long')\" FROM " +
"nds,'3600')),'BIGINT')\" FROM " +
"\"SELECT timeconvert(created_at_seconds, 'SECONDS', 'MILLISECONDS')," +
" CAST(FLOOR(created_at_seconds / 3600) as long)" +
" FROM " + DATE_TIME_FIELDS_TABLE + "\""))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
public class TestingPinotCluster
implements Closeable
{
public static final String PINOT_LATEST_IMAGE_NAME = "apachepinot/pinot:1.2.0";
public static final String PINOT_LATEST_IMAGE_NAME = "apachepinot/pinot:1.4.0";
private static final String ZOOKEEPER_INTERNAL_HOST = "zookeeper";
private static final JsonCodec<List<String>> LIST_JSON_CODEC = listJsonCodec(String.class);
private static final JsonCodec<PinotSuccessResponse> PINOT_SUCCESS_RESPONSE_JSON_CODEC = jsonCodec(PinotSuccessResponse.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
"dimensionsSplitOrder": ["string_col"],
"functionColumnPairs": [
"COUNT__int_col",
"COUNT__float_col",
"COUNT__double_col",
"COUNT__long_col",
"MIN__int_col",
"MIN__float_col",
"MIN__double_col",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
"dimensionsSplitOrder": ["string_col"],
"functionColumnPairs": [
"COUNT__int_col",
"COUNT__float_col",
"COUNT__double_col",
"COUNT__long_col",
"MIN__int_col",
"MIN__float_col",
"MIN__double_col",
Expand Down