Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_AMBIGUOUS_TABLE_NAME;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNABLE_TO_FIND_BROKER;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNCLASSIFIED_ERROR;
import static io.trino.plugin.pinot.PinotMetadata.SCHEMA_NAME;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
Expand All @@ -117,6 +118,8 @@ public class PinotClient
private static final String ROUTING_TABLE_API_TEMPLATE = "debug/routingTable/%s";
private static final String TIME_BOUNDARY_API_TEMPLATE = "debug/timeBoundary/%s";
private static final String QUERY_URL_PATH = "query/sql";
private static final int DEFAULT_HTTP_RETRY_COUNT = 10;
private static final int DEFAULT_RETRY_INTERVAL = 1000;

private final List<URI> controllerUrls;
private final HttpClient httpClient;
Expand Down Expand Up @@ -270,18 +273,21 @@ public List<String> getTables()

protected Multimap<String, String> getAllTables()
{
List<String> allTables = sendHttpGetToControllerJson(GET_ALL_TABLES_API_TEMPLATE, tablesJsonCodec).getTables();
ImmutableListMultimap.Builder<String, String> builder = ImmutableListMultimap.builder();
for (String table : allTables) {
builder.put(table.toLowerCase(ENGLISH), table);
}
return builder.build();
return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> {
List<String> allTables =
sendHttpGetToControllerJson(GET_ALL_TABLES_API_TEMPLATE, tablesJsonCodec).getTables();
ImmutableListMultimap.Builder<String, String> builder = ImmutableListMultimap.builder();
for (String table : allTables) {
builder.put(table.toLowerCase(ENGLISH), table);
}
return builder.build();
});
}

public Schema getTableSchema(String table)
throws Exception
{
return sendHttpGetToControllerJson(format(TABLE_SCHEMA_API_TEMPLATE, table), schemaJsonCodec);
return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> sendHttpGetToControllerJson(format(TABLE_SCHEMA_API_TEMPLATE, table), schemaJsonCodec));
}

public List<String> getPinotTableNames()
Expand Down Expand Up @@ -361,63 +367,73 @@ public List<InstancesInBroker> getBrokers()
@VisibleForTesting
public List<String> getAllBrokersForTable(String table)
{
ArrayList<String> brokers = sendHttpGetToControllerJson(format(TABLE_INSTANCES_API_TEMPLATE, table), brokersForTableJsonCodec)
.getBrokers().stream()
.flatMap(broker -> broker.getInstances().stream())
.distinct()
.map(brokerToParse -> {
Matcher matcher = BROKER_PATTERN.matcher(brokerToParse);
if (matcher.matches() && matcher.groupCount() == 2) {
return pinotHostMapper.getBrokerHost(matcher.group(1), matcher.group(2));
}
throw new PinotException(
PINOT_UNABLE_TO_FIND_BROKER,
Optional.empty(),
format("Cannot parse %s in the broker instance", brokerToParse));
})
.collect(Collectors.toCollection(ArrayList::new));
Collections.shuffle(brokers);
return ImmutableList.copyOf(brokers);
return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> {
ArrayList<String> brokers = sendHttpGetToControllerJson(format(TABLE_INSTANCES_API_TEMPLATE, table), brokersForTableJsonCodec)
.getBrokers().stream()
.flatMap(broker -> broker.getInstances().stream())
.distinct()
.map(brokerToParse -> {
Matcher matcher = BROKER_PATTERN.matcher(brokerToParse);
if (matcher.matches() && matcher.groupCount() == 2) {
return pinotHostMapper.getBrokerHost(matcher.group(1), matcher.group(2));
}
throw new PinotException(
PINOT_UNABLE_TO_FIND_BROKER,
Optional.empty(),
format("Cannot parse %s in the broker instance", brokerToParse));
})
.collect(Collectors.toCollection(ArrayList::new));
Collections.shuffle(brokers);
return ImmutableList.copyOf(brokers);
});
}

public String getBrokerHost(String table)
{
try {
List<String> brokers = brokersForTableCache.get(table);
if (brokers.isEmpty()) {
throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(), "No valid brokers found for " + table);
return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> {
try {
List<String> brokers = brokersForTableCache.get(table);
if (brokers.isEmpty()) {
throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(),
"No valid brokers found for " + table, true);
}
return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
}
return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
}
catch (ExecutionException e) {
Throwable throwable = e.getCause();
if (throwable instanceof PinotException) {
throw (PinotException) throwable;
catch (ExecutionException e) {
Throwable throwable = e.getCause();
if (throwable instanceof PinotException) {
throw (PinotException) throwable;
}
throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(),
"Error when getting brokers for table " + table, true, throwable);
}
throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(), "Error when getting brokers for table " + table, throwable);
}
});
}

public Map<String, Map<String, List<String>>> getRoutingTableForTable(String tableName)
{
Map<String, Map<String, List<String>>> routingTable = sendHttpGetToBrokerJson(tableName, format(ROUTING_TABLE_API_TEMPLATE, tableName), ROUTING_TABLE_CODEC);
ImmutableMap.Builder<String, Map<String, List<String>>> routingTableMap = ImmutableMap.builder();
for (Map.Entry<String, Map<String, List<String>>> entry : routingTable.entrySet()) {
String tableNameWithType = entry.getKey();
if (!entry.getValue().isEmpty() && tableName.equals(extractRawTableName(tableNameWithType))) {
ImmutableMap.Builder<String, List<String>> segmentBuilder = ImmutableMap.builder();
for (Map.Entry<String, List<String>> segmentEntry : entry.getValue().entrySet()) {
if (!segmentEntry.getValue().isEmpty()) {
segmentBuilder.put(segmentEntry.getKey(), segmentEntry.getValue());
return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> {
Map<String, Map<String, List<String>>> routingTable =
sendHttpGetToBrokerJson(tableName, format(ROUTING_TABLE_API_TEMPLATE, tableName),
ROUTING_TABLE_CODEC);
ImmutableMap.Builder<String, Map<String, List<String>>> routingTableMap = ImmutableMap.builder();
for (Map.Entry<String, Map<String, List<String>>> entry : routingTable.entrySet()) {
String tableNameWithType = entry.getKey();
if (!entry.getValue().isEmpty() && tableName.equals(extractRawTableName(tableNameWithType))) {
ImmutableMap.Builder<String, List<String>> segmentBuilder = ImmutableMap.builder();
for (Map.Entry<String, List<String>> segmentEntry : entry.getValue().entrySet()) {
if (!segmentEntry.getValue().isEmpty()) {
segmentBuilder.put(segmentEntry.getKey(), segmentEntry.getValue());
}
}
Map<String, List<String>> segmentMap = segmentBuilder.buildOrThrow();
if (!segmentMap.isEmpty()) {
routingTableMap.put(tableNameWithType, segmentMap);
}
}
Map<String, List<String>> segmentMap = segmentBuilder.buildOrThrow();
if (!segmentMap.isEmpty()) {
routingTableMap.put(tableNameWithType, segmentMap);
}
}
}
return routingTableMap.buildOrThrow();
return routingTableMap.buildOrThrow();
});
}

public static class TimeBoundary
Expand Down Expand Up @@ -458,16 +474,19 @@ public Optional<String> getOfflineTimePredicate()

public TimeBoundary getTimeBoundaryForTable(String table)
{
try {
return sendHttpGetToBrokerJson(table, format(TIME_BOUNDARY_API_TEMPLATE, table), timeBoundaryJsonCodec);
}
catch (Exception e) {
String[] errorMessageSplits = e.getMessage().split(" ");
if (errorMessageSplits.length >= 4 && errorMessageSplits[3].equalsIgnoreCase(TIME_BOUNDARY_NOT_FOUND_ERROR_CODE)) {
return timeBoundaryJsonCodec.fromJson("{}");
return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> {
try {
return sendHttpGetToBrokerJson(table, format(TIME_BOUNDARY_API_TEMPLATE, table), timeBoundaryJsonCodec);
}
throw e;
}
catch (Exception e) {
String[] errorMessageSplits = e.getMessage().split(" ");
if (errorMessageSplits.length >= 4 && errorMessageSplits[3].equalsIgnoreCase(
TIME_BOUNDARY_NOT_FOUND_ERROR_CODE)) {
return timeBoundaryJsonCodec.fromJson("{}");
}
throw e;
}
});
}

public static class QueryRequest
Expand Down Expand Up @@ -560,7 +579,8 @@ private BrokerResponseNative submitBrokerQueryJson(ConnectorSession session, Pin
throw new PinotException(
PINOT_EXCEPTION,
Optional.of(query.getQuery()),
format("Query %s encountered exception %s", query.getQuery(), processingExceptionMessage));
format("Query %s encountered exception %s", query.getQuery(), processingExceptionMessage),
true);
}
if (response.getNumServersQueried() == 0 || response.getNumServersResponded() == 0 || response.getNumServersQueried() > response.getNumServersResponded()) {
throw new PinotInsufficientServerResponseException(query, response.getNumServersResponded(), response.getNumServersQueried());
Expand Down Expand Up @@ -623,19 +643,38 @@ public static ResultsIterator fromResultTable(BrokerResponseNative brokerRespons
}

public static <T> T doWithRetries(int retries, Function<Integer, T> caller)
{
return doWithRetries(retries, caller, DEFAULT_RETRY_INTERVAL);
}

public static <T> T doWithRetries(int retries, Function<Integer, T> caller, int retryInterval)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using Failsafe?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with this lib. Feel free to modify this PR or point me to the sample usage.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
PinotException firstError = null;
checkState(retries > 0, "Invalid num of retries %s", retries);
for (int i = 0; i < retries; ++i) {
try {
return caller.apply(i);
}
catch (PinotException e) {
if (firstError == null) {
firstError = e;
catch (Exception e) {
if (e instanceof PinotException pinotException) {
if (firstError == null) {
firstError = pinotException;
}
if (!pinotException.isRetryable()) {
throw pinotException;
}
}
else {
if (firstError == null) {
firstError = new PinotException(PINOT_UNCLASSIFIED_ERROR, Optional.empty(),
"Unexpected exception", e);
}
}
try {
Thread.sleep(retryInterval);
}
if (!e.isRetryable()) {
throw e;
catch (InterruptedException ex) {
// Sleep interrupted, ignore
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testcontainers.shaded.org.bouncycastle.util.encoders.Hex;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.File;
Expand All @@ -73,6 +74,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -145,6 +147,37 @@ protected boolean isLatestVersion()
return getPinotImageName().equals(PINOT_LATEST_IMAGE_NAME);
}

@Override
@BeforeClass
public void init()
throws Exception
{
super.init();
// Ensure test tables are available in Pinot with expected number of rows.
validateTableRows(ALL_TYPES_TABLE, MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES);
validateTableRows(MIXED_CASE_COLUMN_NAMES_TABLE, 4);
validateTableRows(MIXED_CASE_DISTINCT_TABLE, 4);
validateTableRows(TOO_MANY_ROWS_TABLE, MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES + 1);
validateTableRows(TOO_MANY_BROKER_ROWS_TABLE, MAX_ROWS_PER_SPLIT_FOR_BROKER_QUERIES + 1);
validateTableRows(MIXED_CASE_TABLE_NAME, 4);
validateTableRows(JSON_TABLE, 7);
validateTableRows(JSON_TYPE_TABLE, 3);
validateTableRows(RESERVED_KEYWORD_TABLE, 2);
validateTableRows(QUOTES_IN_COLUMN_NAME_TABLE, 2);
validateTableRows(DUPLICATE_VALUES_IN_COLUMNS_TABLE, 5);
validateTableRows("region", getQueryRunner().execute("SELECT * FROM tpch.tiny.region").getRowCount());
validateTableRows("nation", getQueryRunner().execute("SELECT * FROM tpch.tiny.nation").getRowCount());
}

private void validateTableRows(String tableName, int expectedRows)
{
assertQueryEventually(
getQueryRunner().getDefaultSession(),
"SELECT COUNT(*) FROM " + tableName,
"VALUES '" + expectedRows + "'",
new io.airlift.units.Duration(10, TimeUnit.SECONDS));
}

@Override
protected QueryRunner createQueryRunner()
throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.function.Supplier;

Expand Down Expand Up @@ -85,51 +86,50 @@ public class TestingPinotCluster
private final GenericContainer<?> server;
private final GenericContainer<?> zookeeper;
private final HttpClient httpClient;
private final Closer closer = Closer.create();
private final boolean secured;

public TestingPinotCluster(Network network, boolean secured, String pinotImageName)
{
httpClient = closer.register(new JettyHttpClient());
httpClient = new JettyHttpClient();
zookeeper = new GenericContainer<>(parse("zookeeper:3.5.6"))
.withStartupAttempts(3)
.withStartupTimeout(Duration.ofMinutes(2))
.withNetwork(network)
.withNetworkAliases(ZOOKEEPER_INTERNAL_HOST)
.withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(ZOOKEEPER_PORT))
.withExposedPorts(ZOOKEEPER_PORT);
closer.register(zookeeper::stop);

String controllerConfig = secured ? "/var/pinot/controller/config/pinot-controller-secured.conf" : "/var/pinot/controller/config/pinot-controller.conf";
controller = new GenericContainer<>(parse(pinotImageName))
.withStartupAttempts(3)
.withStartupTimeout(Duration.ofMinutes(2))
.withNetwork(network)
.withClasspathResourceMapping("/pinot-controller", "/var/pinot/controller/config", BindMode.READ_ONLY)
.withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-controller-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
.withCommand("StartController", "-configFileName", controllerConfig)
.withNetworkAliases("pinot-controller", "localhost")
.withExposedPorts(CONTROLLER_PORT);
closer.register(controller::stop);

String brokerConfig = secured ? "/var/pinot/broker/config/pinot-broker-secured.conf" : "/var/pinot/broker/config/pinot-broker.conf";
broker = new GenericContainer<>(parse(pinotImageName))
.withStartupAttempts(3)
.withStartupTimeout(Duration.ofMinutes(2))
.withNetwork(network)
.withClasspathResourceMapping("/pinot-broker", "/var/pinot/broker/config", BindMode.READ_ONLY)
.withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-broker-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
.withCommand("StartBroker", "-clusterName", "pinot", "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", brokerConfig)
.withNetworkAliases("pinot-broker", "localhost")
.withExposedPorts(BROKER_PORT);
closer.register(broker::stop);

server = new GenericContainer<>(parse(pinotImageName))
.withStartupAttempts(3)
.withStartupTimeout(Duration.ofMinutes(2))
.withNetwork(network)
.withClasspathResourceMapping("/pinot-server", "/var/pinot/server/config", BindMode.READ_ONLY)
.withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-server-log4j2.xml -Dplugins.dir=/opt/pinot/plugins")
.withCommand("StartServer", "-clusterName", "pinot", "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", "/var/pinot/server/config/pinot-server.conf")
.withNetworkAliases("pinot-server", "localhost")
.withExposedPorts(SERVER_PORT, SERVER_ADMIN_PORT, GRPC_PORT);
closer.register(server::stop);

this.secured = secured;
}
Expand All @@ -146,7 +146,13 @@ public void start()
public void close()
throws IOException
{
closer.close();
try (Closer closer = Closer.create()) {
closer.register(zookeeper::stop);
closer.register(controller::stop);
closer.register(broker::stop);
closer.register(server::stop);
closer.register(httpClient);
}
}

private static String getZookeeperInternalHostPort()
Expand Down