From 8ce64efbb2102a56ecd12f0dda7a91961e461053 Mon Sep 17 00:00:00 2001 From: Michael Berlin Date: Fri, 26 Aug 2016 11:39:05 -0700 Subject: [PATCH] test: worker.py: Fix worker test flakiness. The test failed because a vtworker retry was only triggered for the first shard and not the second one. That's because the rows are not well balanced across both destination shards when you read them in primary key order. The rows of the first shard come first, then the rest. This problem was exposed due to my recent decoupling of the number of chunks and the number of concurrent chunks processed. Before this fix, only one out of 6 chunks was processed at a time. That means, vtworker didn't write to the second destination shard while the test triggered the reparent. Other changes: - Reduced the number of rows for the reparent test. On my laptop this reduced the clone duration from ~50 seconds to ~10 seconds. That's long enough to issue a reparent in between. - Wait for the destination replicas to catch up explicitly because it may take several seconds. (We write 1 row/query and therefore it takes so long to catch up.) --- test/worker.py | 54 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/test/worker.py b/test/worker.py index dab95d19050..1038d31a95a 100755 --- a/test/worker.py +++ b/test/worker.py @@ -298,6 +298,10 @@ def insert_values(self, vttablet, num_values, num_shards, offset=0, """ shard_width = keyspace_id_range / num_shards shard_offsets = [i * shard_width for i in xrange(num_shards)] + # TODO(mberlin): Change the "id" column values from the keyspace id to a + # counter starting at 1. The incrementing ids must + # alternate between the two shards. Without this, the + # vtworker chunking won't be well balanced across shards. for shard_num in xrange(num_shards): self._insert_values( vttablet, @@ -441,15 +445,29 @@ def verify_successful_worker_copy_with_reparent(self, mysql_down=False): # --max_tps is only specified to enable the throttler and ensure that the # code is executed. But the intent here is not to throttle the test, hence # the rate limit is set very high. - workerclient_proc = utils.run_vtworker_client_bg( - ['SplitClone', - '--source_reader_count', '1', - '--destination_writer_count', '1', - '--write_query_max_rows', '1', - '--min_healthy_rdonly_tablets', '1', - '--max_tps', '9999', - 'test_keyspace/0'], - worker_rpc_port) + # --chunk_count is 2 because rows are currently ordered by primary key such + # that all rows of the first shard come first and then the second shard. + # TODO(mberlin): Remove --offline=false once vtworker ensures that the + # destination shards are not behind the master's replication + # position. + args = ['SplitClone', + '--offline=false', + '--destination_writer_count', '1', + '--min_healthy_rdonly_tablets', '1', + '--max_tps', '9999'] + if not mysql_down: + # Make the clone as slow as necessary such that there is enough time to + # run PlannedReparent in the meantime. + # TOOD(mberlin): Once insert_values is fixed to uniformly distribute the + # rows across shards when sorted by primary key, remove + # --chunk_count 2, --min_rows_per_chunk 1 and set + # --source_reader_count back to 1. + args.extend(['--source_reader_count', '2', + '--chunk_count', '2', + '--min_rows_per_chunk', '1', + '--write_query_max_rows', '1']) + args.append('test_keyspace/0') + workerclient_proc = utils.run_vtworker_client_bg(args, worker_rpc_port) if mysql_down: # If MySQL is down, we wait until vtworker retried at least once to make @@ -515,12 +533,24 @@ def verify_successful_worker_copy_with_reparent(self, mysql_down=False): # Verify that we were forced to re-resolve and retry. worker_vars = utils.get_vars(worker_port) - # There should be two retries at least, one for each destination shard. - self.assertGreater(worker_vars['WorkerRetryCount'], 1) + self.assertGreater(worker_vars['WorkerRetryCount'], 1, + "expected vtworker to retry each of the two reparented" + " destination masters at least once, but it didn't") self.assertNotEqual(worker_vars['WorkerRetryCount'], {}, "expected vtworker to retry, but it didn't") utils.kill_sub_process(worker_proc, soft=True) + # Wait for the destination RDONLYs to catch up or the following offline + # clone will try to insert rows which already exist. + # TODO(mberlin): Remove this once SplitClone supports it natively. + utils.wait_for_replication_pos(shard_0_replica, shard_0_rdonly1) + utils.wait_for_replication_pos(shard_1_replica, shard_1_rdonly1) + # Run final offline clone to enable filtered replication. + _, _ = utils.run_vtworker(['-cell', 'test_nj', 'SplitClone', + '--online=false', + '--min_healthy_rdonly_tablets', '1', + 'test_keyspace/0'], auto_log=True) + # Make sure that everything is caught up to the same replication point self.run_split_diff('test_keyspace/-80', all_shard_tablets, shard_0_tablets) self.run_split_diff('test_keyspace/80-', all_shard_tablets, shard_1_tablets) @@ -635,7 +665,7 @@ def add_test_options(parser): help='The number of rows, per shard, that we should insert before ' 'resharding for this test.') parser.add_option( - '--num_insert_rows_before_reparent_test', type='int', default=3000, + '--num_insert_rows_before_reparent_test', type='int', default=4500, help='The number of rows, per shard, that we should insert before ' 'running TestReparentDuringWorkerCopy (supersedes --num_insert_rows in ' 'that test). There must be enough rows such that SplitClone takes '