Support retries of streaming sections #13717
@@ -15,6 +15,7 @@
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.metadata.AllNodes;
import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.spi.security.Identity;
import com.facebook.presto.spi.security.SelectedRole;
|
|
@@ -25,6 +26,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.units.Duration;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
|
|
@@ -34,39 +36,43 @@
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.TimeoutException;

import static com.facebook.presto.SystemSessionProperties.COLOCATED_JOIN;
import static com.facebook.presto.SystemSessionProperties.CONCURRENT_LIFESPANS_PER_NODE;
import static com.facebook.presto.SystemSessionProperties.DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION;
import static com.facebook.presto.SystemSessionProperties.EXCHANGE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION_FOR_AGGREGATION;
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS;
import static com.facebook.presto.SystemSessionProperties.HASH_PARTITION_COUNT;
import static com.facebook.presto.SystemSessionProperties.MAX_STAGE_RETRIES;
import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG;
import static com.facebook.presto.SystemSessionProperties.RECOVERABLE_GROUPED_EXECUTION;
import static com.facebook.presto.SystemSessionProperties.REDISTRIBUTE_WRITES;
import static com.facebook.presto.SystemSessionProperties.SCALE_WRITERS;
import static com.facebook.presto.SystemSessionProperties.TASK_PARTITIONED_WRITER_COUNT;
import static com.facebook.presto.SystemSessionProperties.TASK_WRITER_COUNT;
import static com.facebook.presto.execution.TaskState.RUNNING;
import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG;
import static com.facebook.presto.hive.HiveQueryRunner.TPCH_BUCKETED_SCHEMA;
import static com.facebook.presto.hive.HiveQueryRunner.createQueryRunner;
import static com.facebook.presto.hive.HiveSessionProperties.VIRTUAL_BUCKET_COUNT;
import static com.facebook.presto.spi.security.SelectedRole.Type.ALL;
import static com.facebook.presto.spi.security.SelectedRole.Type.ROLE;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static io.airlift.tpch.TpchTable.ORDERS;
import static java.lang.String.format;
import static java.lang.Thread.sleep;
import static java.util.Collections.shuffle;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
|
|
@Test(singleThreaded = true)
public class TestHiveRecoverableGroupedExecution
public class TestHiveRecoverableExecution
{
    private static final Logger log = Logger.get(TestHiveRecoverableGroupedExecution.class);
    private static final Logger log = Logger.get(TestHiveRecoverableExecution.class);

    private static final int TEST_TIMEOUT = 120_000;
    private static final int INVOCATION_COUNT = 1;
|
|
@@ -78,26 +84,42 @@ public class TestHiveRecoverableGroupedExecution
    public void setUp()
            throws Exception
    {
        queryRunner = createQueryRunner(
        queryRunner = createQueryRunner();
        executor = listeningDecorator(newCachedThreadPool());
    }

    private DistributedQueryRunner createQueryRunner()
            throws Exception
    {
        ImmutableMap.Builder<String, String> extraPropertiesBuilder = ImmutableMap.<String, String>builder()
                // task get results timeout has to be significantly higher than the task status update timeout
                .put("exchange.max-error-duration", "5m")
                // set the timeout of a single HTTP request to 1s, to make sure that the task result requests are actually failing
                // The default is 10s, which might not be sufficient to make sure that the request fails before the recoverable execution kicks in
                .put("exchange.http-client.request-timeout", "1s");

        ImmutableMap.Builder<String, String> extraCoordinatorPropertiesBuilder = ImmutableMap.<String, String>builder();

        extraCoordinatorPropertiesBuilder
                // decrease the heartbeat interval so we detect failed nodes faster
                .put("failure-detector.heartbeat-interval", "1s")
                .put("failure-detector.http-client.request-timeout", "500ms")
                .put("failure-detector.exponential-decay-seconds", "1")
                .put("failure-detector.threshold", "0.1")
                // allow 2 out of 4 tasks to fail
                .put("max-failed-task-percentage", "0.6")
                // set the timeout of the task update requests to something low to improve overall test latency
                .put("scheduler.http-client.request-timeout", "5s")
                // this effectively disables the retries
                .put("query.remote-task.max-error-duration", "1s");

        return HiveQueryRunner.createQueryRunner(
                ImmutableList.of(ORDERS),
                // extra properties
                ImmutableMap.of(
                        // task get results timeout has to be significantly higher than the task status update timeout
                        "exchange.max-error-duration", "5m",
                        // set the timeout of a single HTTP request to 1s, to make sure that the task result requests are actually failing
                        // The default is 10s, which might not be sufficient to make sure that the request fails before the recoverable execution kicks in
                        "exchange.http-client.request-timeout", "1s"),
                extraPropertiesBuilder.build(),
                // extra coordinator properties
                ImmutableMap.of(
                        // set the timeout of the task update requests to something low to improve overall test latency
                        "scheduler.http-client.request-timeout", "1s",
                        // this effectively disables the retries
                        "query.remote-task.max-error-duration", "1s",
                        // allow 2 out of 4 tasks to fail
                        "max-failed-task-percentage", "0.6"),
                extraCoordinatorPropertiesBuilder.build(),
                Optional.empty());
        executor = listeningDecorator(newCachedThreadPool());
    }
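For orientation, here is a minimal standalone sketch (not part of the diff; the class name is illustrative) of the configuration the new createQueryRunner helper assembles, with the timing relationships from the comments above spelled out. Only Guava is required:

import com.google.common.collect.ImmutableMap;

public final class RecoverableExecutionTestProperties
{
    // Exchange side: the overall error duration (5m) stays far above the per-request
    // timeout (1s), so individual task-result requests fail quickly while recoverable
    // execution has time to kick in.
    public static final ImmutableMap<String, String> EXTRA_PROPERTIES = ImmutableMap.<String, String>builder()
            .put("exchange.max-error-duration", "5m")
            .put("exchange.http-client.request-timeout", "1s")
            .build();

    // Coordinator side: detect failed nodes quickly, tolerate up to 60% failed tasks,
    // and keep the remote-task error duration short, which effectively disables
    // task-level retries so the recovery path is exercised instead.
    public static final ImmutableMap<String, String> EXTRA_COORDINATOR_PROPERTIES = ImmutableMap.<String, String>builder()
            .put("failure-detector.heartbeat-interval", "1s")
            .put("failure-detector.http-client.request-timeout", "500ms")
            .put("failure-detector.exponential-decay-seconds", "1")
            .put("failure-detector.threshold", "0.1")
            .put("max-failed-task-percentage", "0.6")
            .put("scheduler.http-client.request-timeout", "5s")
            .put("query.remote-task.max-error-duration", "1s")
            .build();

    private RecoverableExecutionTestProperties() {}
}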
|
|
    @AfterClass(alwaysRun = true)
|
|
@@ -125,6 +147,7 @@ public void testCreateBucketedTable(int writerConcurrency)
            throws Exception
    {
        testRecoverableGroupedExecution(
                queryRunner,
                writerConcurrency,
                ImmutableList.of(
                        "CREATE TABLE create_bucketed_table_1\n" +
|
|
@@ -164,6 +187,7 @@ public void testInsertBucketedTable(int writerConcurrency)
            throws Exception
    {
        testRecoverableGroupedExecution(
                queryRunner,
                writerConcurrency,
                ImmutableList.of(
                        "CREATE TABLE insert_bucketed_table_1\n" +
|
|
@@ -207,6 +231,7 @@ public void testCreateUnbucketedTableWithGroupedExecution(int writerConcurrency)
            throws Exception
    {
        testRecoverableGroupedExecution(
                queryRunner,
                writerConcurrency,
                ImmutableList.of(
                        "CREATE TABLE create_unbucketed_table_with_grouped_execution_1\n" +
|
|
@@ -246,6 +271,7 @@ public void testInsertUnbucketedTableWithGroupedExecution(int writerConcurrency)
            throws Exception
    {
        testRecoverableGroupedExecution(
                queryRunner,
                writerConcurrency,
                ImmutableList.of(
                        "CREATE TABLE insert_unbucketed_table_with_grouped_execution_1\n" +
|
|
@@ -289,6 +315,7 @@ public void testScanFilterProjectionOnlyQueryOnUnbucketedTable(int writerConcurrency)
            throws Exception
    {
        testRecoverableGroupedExecution(
                queryRunner,
                writerConcurrency,
                ImmutableList.of(
                        "CREATE TABLE scan_filter_projection_only_query_on_unbucketed_table AS\n" +
|
|
@@ -311,6 +338,7 @@ public void testUnionAll(int writerConcurrency)
            throws Exception
    {
        testRecoverableGroupedExecution(
                queryRunner,
                writerConcurrency,
                ImmutableList.of(
                        "CREATE TABLE test_union_all AS\n" +
|
|
@@ -332,7 +360,30 @@ public void testUnionAll(int writerConcurrency)
                        "DROP TABLE IF EXISTS test_union_all_failure"));
    }
|
|
    @Test(invocationCount = INVOCATION_COUNT)
    public void testCountOnUnbucketedTable()
            throws Exception
    {
        testRecoverableGroupedExecution(
                queryRunner,
                4,
                ImmutableList.of(
                        "CREATE TABLE test_table AS\n" +
                                "SELECT orderkey, comment\n" +
                                "FROM orders\n"),
                "CREATE TABLE test_success AS\n" +
                        "SELECT count(*) as a, comment FROM test_table group by comment",
                "create table test_failure AS\n" +
                        "SELECT count(*) as a, comment FROM test_table group by comment",
                14995, // there are 14995 distinct comments in the orders table
                ImmutableList.of(
                        "DROP TABLE IF EXISTS test_table",
                        "DROP TABLE IF EXISTS test_success",
                        "DROP TABLE IF EXISTS test_failure"));
    }
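The expected update count of 14995 relies on the number of distinct comment values in the TPC-H orders table, as the inline comment notes. Purely as an illustration (not part of the change, and assuming the queryRunner, recoverableSession, and log fields set up elsewhere in this test), the figure could be double-checked like this:

import com.facebook.presto.testing.MaterializedResult;

// sanity-check the 14995 expectation against the orders table
MaterializedResult distinctComments = queryRunner.execute(
        recoverableSession,
        "SELECT count(DISTINCT comment) FROM orders");
log.info("distinct comments in orders: %s", distinctComments.getOnlyValue());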
|
|
    private void testRecoverableGroupedExecution(
            DistributedQueryRunner queryRunner,
            int writerConcurrency,
            List<String> preQueries,
            @Language("SQL") String queryWithoutFailure,
|
|
@@ -341,7 +392,12 @@ private void testRecoverableGroupedExecution(
            List<String> postQueries)
            throws Exception
    {
        waitUntilAllNodesAreHealthy(queryRunner, new Duration(10, SECONDS));

        Session recoverableSession = createRecoverableSession(writerConcurrency);
        for (@Language("SQL") String postQuery : postQueries) {
            queryRunner.execute(recoverableSession, postQuery);
        }
        try {
            for (@Language("SQL") String preQuery : preQueries) {
                queryRunner.execute(recoverableSession, preQuery);
|
|
@@ -371,24 +427,10 @@ private void testRecoverableGroupedExecution(

            // kill worker2 only after the task has been scheduled
            TestingPrestoServer worker2 = workers.get(1);
            Stopwatch stopwatch = Stopwatch.createStarted();
            while (true) {
                // wait for a while
                sleep(500);

                // if the task is already running - move ahead
                if (hasTaskRunning(worker2)) {
                    break;
                }

                // don't fail the test if task execution already finished
                if (stopwatch.elapsed(SECONDS) > 5) {
                    break;
                }
            }
            sleep(1000);
            worker2.stopResponding();

            assertEquals(result.get(60, SECONDS).getUpdateCount(), OptionalLong.of(expectedUpdateCount));
            assertEquals(result.get(1000, SECONDS).getUpdateCount(), OptionalLong.of(expectedUpdateCount));
|
Member: question: It's 15 minutes. Why do we need such large timeouts? I remember running these tests, and they were finishing within ~20 seconds. Has it changed?

Contributor (Author): Hasn't changed. The 15 minutes there is unintentional. I can fix it in a follow-up.
            log.info("Query with recovery took %sms", recoveryStopwatch.elapsed(MILLISECONDS));
        }
        finally {
|
|
@@ -401,27 +443,34 @@ private void testRecoverableGroupedExecution(
        }
    }

    private static void waitUntilAllNodesAreHealthy(DistributedQueryRunner queryRunner, Duration timeout)
            throws TimeoutException, InterruptedException
    {
        TestingPrestoServer coordinator = queryRunner.getCoordinator();
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (System.currentTimeMillis() < deadline) {
            AllNodes allNodes = coordinator.refreshNodes();
            if (allNodes.getActiveNodes().size() == queryRunner.getNodeCount()) {
                return;
            }
            sleep(1000);
        }
        throw new TimeoutException(format("one of the nodes is still missing after: %s", timeout));
    }
|
|
    private static void cancelAllQueries(DistributedQueryRunner queryRunner)
    {
        queryRunner.getQueries().forEach(query -> queryRunner.getCoordinator().getQueryManager().cancelQuery(query.getQueryId()));
        queryRunner.getQueries().forEach(query -> assertTrue(query.getState().isDone()));
    }

    private static void cancelAllTasks(DistributedQueryRunner queryRunner)
    {
        queryRunner.getServers().forEach(TestHiveRecoverableGroupedExecution::cancelAllTasks);
        queryRunner.getServers().forEach(TestHiveRecoverableExecution::cancelAllTasks);
    }

    private static void cancelAllTasks(TestingPrestoServer server)
    {
        server.getTaskManager().getAllTaskInfo().forEach(task -> server.getTaskManager().cancelTask(task.getTaskStatus().getTaskId()));
        server.getTaskManager().getAllTaskInfo().forEach(task -> assertTrue(task.getTaskStatus().getState().isDone()));
    }
|
|
    private static boolean hasTaskRunning(TestingPrestoServer server)
    {
        return server.getTaskManager().getAllTaskInfo().stream()
                .anyMatch(taskInfo -> taskInfo.getTaskStatus().getState() == RUNNING);
    }

    private static Session createRecoverableSession(int writerConcurrency)
|
|
@@ -446,6 +495,10 @@ private static Session createRecoverableSession(int writerConcurrency)
                .setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true")
                .setSystemProperty(TASK_WRITER_COUNT, Integer.toString(writerConcurrency))
                .setSystemProperty(TASK_PARTITIONED_WRITER_COUNT, Integer.toString(writerConcurrency))
                .setSystemProperty(PARTITIONING_PROVIDER_CATALOG, "hive")
                .setSystemProperty(EXCHANGE_MATERIALIZATION_STRATEGY, ALL.name())
                .setSystemProperty(HASH_PARTITION_COUNT, "11")
                .setSystemProperty(MAX_STAGE_RETRIES, "4")
                .setCatalogSessionProperty(HIVE_CATALOG, VIRTUAL_BUCKET_COUNT, "16")
                .setCatalog(HIVE_CATALOG)
                .setSchema(TPCH_BUCKETED_SCHEMA)
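For context, the four session properties added above are what enable exchange materialization and stage retries for these tests. A minimal sketch of setting just those properties on a plain test session; the variable name and the standalone form are illustrative, while the property names and values mirror the diff:

import com.facebook.presto.Session;

import static com.facebook.presto.SystemSessionProperties.EXCHANGE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.HASH_PARTITION_COUNT;
import static com.facebook.presto.SystemSessionProperties.MAX_STAGE_RETRIES;
import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;

// materialize exchanges through the hive partitioning provider and allow up to 4 stage retries
Session retryingSession = testSessionBuilder()
        .setSystemProperty(PARTITIONING_PROVIDER_CATALOG, "hive")
        .setSystemProperty(EXCHANGE_MATERIALIZATION_STRATEGY, "ALL")
        .setSystemProperty(HASH_PARTITION_COUNT, "11")
        .setSystemProperty(MAX_STAGE_RETRIES, "4")
        .build();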
|
|
Review comment: Super tiny nit, we can remove these formatting-only changes.