diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java index 0818e030e15..215ef09e890 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotClient.java @@ -93,6 +93,7 @@ import static io.trino.plugin.pinot.PinotErrorCode.PINOT_AMBIGUOUS_TABLE_NAME; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNABLE_TO_FIND_BROKER; +import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNCLASSIFIED_ERROR; import static io.trino.plugin.pinot.PinotMetadata.SCHEMA_NAME; import static java.lang.String.format; import static java.util.Locale.ENGLISH; @@ -117,6 +118,8 @@ public class PinotClient private static final String ROUTING_TABLE_API_TEMPLATE = "debug/routingTable/%s"; private static final String TIME_BOUNDARY_API_TEMPLATE = "debug/timeBoundary/%s"; private static final String QUERY_URL_PATH = "query/sql"; + private static final int DEFAULT_HTTP_RETRY_COUNT = 10; + private static final int DEFAULT_RETRY_INTERVAL = 1000; private final List controllerUrls; private final HttpClient httpClient; @@ -270,18 +273,21 @@ public List getTables() protected Multimap getAllTables() { - List allTables = sendHttpGetToControllerJson(GET_ALL_TABLES_API_TEMPLATE, tablesJsonCodec).getTables(); - ImmutableListMultimap.Builder builder = ImmutableListMultimap.builder(); - for (String table : allTables) { - builder.put(table.toLowerCase(ENGLISH), table); - } - return builder.build(); + return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> { + List allTables = + sendHttpGetToControllerJson(GET_ALL_TABLES_API_TEMPLATE, tablesJsonCodec).getTables(); + ImmutableListMultimap.Builder builder = ImmutableListMultimap.builder(); + for (String table : allTables) { + builder.put(table.toLowerCase(ENGLISH), table); + } + return builder.build(); + }); } public Schema getTableSchema(String table) throws Exception { - return sendHttpGetToControllerJson(format(TABLE_SCHEMA_API_TEMPLATE, table), schemaJsonCodec); + return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> sendHttpGetToControllerJson(format(TABLE_SCHEMA_API_TEMPLATE, table), schemaJsonCodec)); } public List getPinotTableNames() @@ -361,63 +367,73 @@ public List getBrokers() @VisibleForTesting public List getAllBrokersForTable(String table) { - ArrayList brokers = sendHttpGetToControllerJson(format(TABLE_INSTANCES_API_TEMPLATE, table), brokersForTableJsonCodec) - .getBrokers().stream() - .flatMap(broker -> broker.getInstances().stream()) - .distinct() - .map(brokerToParse -> { - Matcher matcher = BROKER_PATTERN.matcher(brokerToParse); - if (matcher.matches() && matcher.groupCount() == 2) { - return pinotHostMapper.getBrokerHost(matcher.group(1), matcher.group(2)); - } - throw new PinotException( - PINOT_UNABLE_TO_FIND_BROKER, - Optional.empty(), - format("Cannot parse %s in the broker instance", brokerToParse)); - }) - .collect(Collectors.toCollection(ArrayList::new)); - Collections.shuffle(brokers); - return ImmutableList.copyOf(brokers); + return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> { + ArrayList brokers = sendHttpGetToControllerJson(format(TABLE_INSTANCES_API_TEMPLATE, table), brokersForTableJsonCodec) + .getBrokers().stream() + .flatMap(broker -> broker.getInstances().stream()) + .distinct() + .map(brokerToParse -> { + Matcher matcher = BROKER_PATTERN.matcher(brokerToParse); + if (matcher.matches() && matcher.groupCount() == 2) { + return pinotHostMapper.getBrokerHost(matcher.group(1), matcher.group(2)); + } + throw new PinotException( + PINOT_UNABLE_TO_FIND_BROKER, + Optional.empty(), + format("Cannot parse %s in the broker instance", brokerToParse)); + }) + .collect(Collectors.toCollection(ArrayList::new)); + Collections.shuffle(brokers); + return ImmutableList.copyOf(brokers); + }); } public String getBrokerHost(String table) { - try { - List brokers = brokersForTableCache.get(table); - if (brokers.isEmpty()) { - throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(), "No valid brokers found for " + table); + return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> { + try { + List brokers = brokersForTableCache.get(table); + if (brokers.isEmpty()) { + throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(), + "No valid brokers found for " + table, true); + } + return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())); } - return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size())); - } - catch (ExecutionException e) { - Throwable throwable = e.getCause(); - if (throwable instanceof PinotException) { - throw (PinotException) throwable; + catch (ExecutionException e) { + Throwable throwable = e.getCause(); + if (throwable instanceof PinotException) { + throw (PinotException) throwable; + } + throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(), + "Error when getting brokers for table " + table, true, throwable); } - throw new PinotException(PINOT_UNABLE_TO_FIND_BROKER, Optional.empty(), "Error when getting brokers for table " + table, throwable); - } + }); } public Map>> getRoutingTableForTable(String tableName) { - Map>> routingTable = sendHttpGetToBrokerJson(tableName, format(ROUTING_TABLE_API_TEMPLATE, tableName), ROUTING_TABLE_CODEC); - ImmutableMap.Builder>> routingTableMap = ImmutableMap.builder(); - for (Map.Entry>> entry : routingTable.entrySet()) { - String tableNameWithType = entry.getKey(); - if (!entry.getValue().isEmpty() && tableName.equals(extractRawTableName(tableNameWithType))) { - ImmutableMap.Builder> segmentBuilder = ImmutableMap.builder(); - for (Map.Entry> segmentEntry : entry.getValue().entrySet()) { - if (!segmentEntry.getValue().isEmpty()) { - segmentBuilder.put(segmentEntry.getKey(), segmentEntry.getValue()); + return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> { + Map>> routingTable = + sendHttpGetToBrokerJson(tableName, format(ROUTING_TABLE_API_TEMPLATE, tableName), + ROUTING_TABLE_CODEC); + ImmutableMap.Builder>> routingTableMap = ImmutableMap.builder(); + for (Map.Entry>> entry : routingTable.entrySet()) { + String tableNameWithType = entry.getKey(); + if (!entry.getValue().isEmpty() && tableName.equals(extractRawTableName(tableNameWithType))) { + ImmutableMap.Builder> segmentBuilder = ImmutableMap.builder(); + for (Map.Entry> segmentEntry : entry.getValue().entrySet()) { + if (!segmentEntry.getValue().isEmpty()) { + segmentBuilder.put(segmentEntry.getKey(), segmentEntry.getValue()); + } + } + Map> segmentMap = segmentBuilder.buildOrThrow(); + if (!segmentMap.isEmpty()) { + routingTableMap.put(tableNameWithType, segmentMap); } - } - Map> segmentMap = segmentBuilder.buildOrThrow(); - if (!segmentMap.isEmpty()) { - routingTableMap.put(tableNameWithType, segmentMap); } } - } - return routingTableMap.buildOrThrow(); + return routingTableMap.buildOrThrow(); + }); } public static class TimeBoundary @@ -458,16 +474,19 @@ public Optional getOfflineTimePredicate() public TimeBoundary getTimeBoundaryForTable(String table) { - try { - return sendHttpGetToBrokerJson(table, format(TIME_BOUNDARY_API_TEMPLATE, table), timeBoundaryJsonCodec); - } - catch (Exception e) { - String[] errorMessageSplits = e.getMessage().split(" "); - if (errorMessageSplits.length >= 4 && errorMessageSplits[3].equalsIgnoreCase(TIME_BOUNDARY_NOT_FOUND_ERROR_CODE)) { - return timeBoundaryJsonCodec.fromJson("{}"); + return doWithRetries(DEFAULT_HTTP_RETRY_COUNT, retryNumber -> { + try { + return sendHttpGetToBrokerJson(table, format(TIME_BOUNDARY_API_TEMPLATE, table), timeBoundaryJsonCodec); } - throw e; - } + catch (Exception e) { + String[] errorMessageSplits = e.getMessage().split(" "); + if (errorMessageSplits.length >= 4 && errorMessageSplits[3].equalsIgnoreCase( + TIME_BOUNDARY_NOT_FOUND_ERROR_CODE)) { + return timeBoundaryJsonCodec.fromJson("{}"); + } + throw e; + } + }); } public static class QueryRequest @@ -560,7 +579,8 @@ private BrokerResponseNative submitBrokerQueryJson(ConnectorSession session, Pin throw new PinotException( PINOT_EXCEPTION, Optional.of(query.getQuery()), - format("Query %s encountered exception %s", query.getQuery(), processingExceptionMessage)); + format("Query %s encountered exception %s", query.getQuery(), processingExceptionMessage), + true); } if (response.getNumServersQueried() == 0 || response.getNumServersResponded() == 0 || response.getNumServersQueried() > response.getNumServersResponded()) { throw new PinotInsufficientServerResponseException(query, response.getNumServersResponded(), response.getNumServersQueried()); @@ -623,6 +643,11 @@ public static ResultsIterator fromResultTable(BrokerResponseNative brokerRespons } public static T doWithRetries(int retries, Function caller) + { + return doWithRetries(retries, caller, DEFAULT_RETRY_INTERVAL); + } + + public static T doWithRetries(int retries, Function caller, int retryInterval) { PinotException firstError = null; checkState(retries > 0, "Invalid num of retries %s", retries); @@ -630,12 +655,26 @@ public static T doWithRetries(int retries, Function caller) try { return caller.apply(i); } - catch (PinotException e) { - if (firstError == null) { - firstError = e; + catch (Exception e) { + if (e instanceof PinotException pinotException) { + if (firstError == null) { + firstError = pinotException; + } + if (!pinotException.isRetryable()) { + throw pinotException; + } + } + else { + if (firstError == null) { + firstError = new PinotException(PINOT_UNCLASSIFIED_ERROR, Optional.empty(), + "Unexpected exception", e); + } + } + try { + Thread.sleep(retryInterval); } - if (!e.isRetryable()) { - throw e; + catch (InterruptedException ex) { + // Sleep interrupted, ignore } } } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java index 6e27e2c99d0..ec681f26e1f 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotIntegrationConnectorSmokeTest.java @@ -56,6 +56,7 @@ import org.apache.pinot.spi.data.readers.RecordReader; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testcontainers.shaded.org.bouncycastle.util.encoders.Hex; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.io.File; @@ -73,6 +74,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static com.google.common.base.Preconditions.checkState; @@ -145,6 +147,37 @@ protected boolean isLatestVersion() return getPinotImageName().equals(PINOT_LATEST_IMAGE_NAME); } + @Override + @BeforeClass + public void init() + throws Exception + { + super.init(); + // Ensure test tables are available in Pinot with expected number of rows. + validateTableRows(ALL_TYPES_TABLE, MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES); + validateTableRows(MIXED_CASE_COLUMN_NAMES_TABLE, 4); + validateTableRows(MIXED_CASE_DISTINCT_TABLE, 4); + validateTableRows(TOO_MANY_ROWS_TABLE, MAX_ROWS_PER_SPLIT_FOR_SEGMENT_QUERIES + 1); + validateTableRows(TOO_MANY_BROKER_ROWS_TABLE, MAX_ROWS_PER_SPLIT_FOR_BROKER_QUERIES + 1); + validateTableRows(MIXED_CASE_TABLE_NAME, 4); + validateTableRows(JSON_TABLE, 7); + validateTableRows(JSON_TYPE_TABLE, 3); + validateTableRows(RESERVED_KEYWORD_TABLE, 2); + validateTableRows(QUOTES_IN_COLUMN_NAME_TABLE, 2); + validateTableRows(DUPLICATE_VALUES_IN_COLUMNS_TABLE, 5); + validateTableRows("region", getQueryRunner().execute("SELECT * FROM tpch.tiny.region").getRowCount()); + validateTableRows("nation", getQueryRunner().execute("SELECT * FROM tpch.tiny.nation").getRowCount()); + } + + private void validateTableRows(String tableName, int expectedRows) + { + assertQueryEventually( + getQueryRunner().getDefaultSession(), + "SELECT COUNT(*) FROM " + tableName, + "VALUES '" + expectedRows + "'", + new io.airlift.units.Duration(10, TimeUnit.SECONDS)); + } + @Override protected QueryRunner createQueryRunner() throws Exception diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java index 8a6ffd9047f..1e51e252e8d 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java @@ -48,6 +48,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.List; import java.util.function.Supplier; @@ -85,51 +86,50 @@ public class TestingPinotCluster private final GenericContainer server; private final GenericContainer zookeeper; private final HttpClient httpClient; - private final Closer closer = Closer.create(); private final boolean secured; public TestingPinotCluster(Network network, boolean secured, String pinotImageName) { - httpClient = closer.register(new JettyHttpClient()); + httpClient = new JettyHttpClient(); zookeeper = new GenericContainer<>(parse("zookeeper:3.5.6")) .withStartupAttempts(3) + .withStartupTimeout(Duration.ofMinutes(2)) .withNetwork(network) .withNetworkAliases(ZOOKEEPER_INTERNAL_HOST) .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(ZOOKEEPER_PORT)) .withExposedPorts(ZOOKEEPER_PORT); - closer.register(zookeeper::stop); String controllerConfig = secured ? "/var/pinot/controller/config/pinot-controller-secured.conf" : "/var/pinot/controller/config/pinot-controller.conf"; controller = new GenericContainer<>(parse(pinotImageName)) .withStartupAttempts(3) + .withStartupTimeout(Duration.ofMinutes(2)) .withNetwork(network) .withClasspathResourceMapping("/pinot-controller", "/var/pinot/controller/config", BindMode.READ_ONLY) .withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-controller-log4j2.xml -Dplugins.dir=/opt/pinot/plugins") .withCommand("StartController", "-configFileName", controllerConfig) .withNetworkAliases("pinot-controller", "localhost") .withExposedPorts(CONTROLLER_PORT); - closer.register(controller::stop); String brokerConfig = secured ? "/var/pinot/broker/config/pinot-broker-secured.conf" : "/var/pinot/broker/config/pinot-broker.conf"; broker = new GenericContainer<>(parse(pinotImageName)) .withStartupAttempts(3) + .withStartupTimeout(Duration.ofMinutes(2)) .withNetwork(network) .withClasspathResourceMapping("/pinot-broker", "/var/pinot/broker/config", BindMode.READ_ONLY) .withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-broker-log4j2.xml -Dplugins.dir=/opt/pinot/plugins") .withCommand("StartBroker", "-clusterName", "pinot", "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", brokerConfig) .withNetworkAliases("pinot-broker", "localhost") .withExposedPorts(BROKER_PORT); - closer.register(broker::stop); server = new GenericContainer<>(parse(pinotImageName)) .withStartupAttempts(3) + .withStartupTimeout(Duration.ofMinutes(2)) .withNetwork(network) .withClasspathResourceMapping("/pinot-server", "/var/pinot/server/config", BindMode.READ_ONLY) .withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-server-log4j2.xml -Dplugins.dir=/opt/pinot/plugins") .withCommand("StartServer", "-clusterName", "pinot", "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", "/var/pinot/server/config/pinot-server.conf") .withNetworkAliases("pinot-server", "localhost") .withExposedPorts(SERVER_PORT, SERVER_ADMIN_PORT, GRPC_PORT); - closer.register(server::stop); this.secured = secured; } @@ -146,7 +146,13 @@ public void start() public void close() throws IOException { - closer.close(); + try (Closer closer = Closer.create()) { + closer.register(zookeeper::stop); + closer.register(controller::stop); + closer.register(broker::stop); + closer.register(server::stop); + closer.register(httpClient); + } } private static String getZookeeperInternalHostPort()