Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 42 additions & 12 deletions test/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ def insert_values(self, vttablet, num_values, num_shards, offset=0,
"""
shard_width = keyspace_id_range / num_shards
shard_offsets = [i * shard_width for i in xrange(num_shards)]
# TODO(mberlin): Change the "id" column values from the keyspace id to a
# counter starting at 1. The incrementing ids must
# alternate between the two shards. Without this, the
# vtworker chunking won't be well balanced across shards.
for shard_num in xrange(num_shards):
self._insert_values(
vttablet,
Expand Down Expand Up @@ -441,15 +445,29 @@ def verify_successful_worker_copy_with_reparent(self, mysql_down=False):
# --max_tps is only specified to enable the throttler and ensure that the
# code is executed. But the intent here is not to throttle the test, hence
# the rate limit is set very high.
workerclient_proc = utils.run_vtworker_client_bg(
['SplitClone',
'--source_reader_count', '1',
'--destination_writer_count', '1',
'--write_query_max_rows', '1',
'--min_healthy_rdonly_tablets', '1',
'--max_tps', '9999',
'test_keyspace/0'],
worker_rpc_port)
# --chunk_count is 2 because rows are currently ordered by primary key such
# that all rows of the first shard come first and then the second shard.
# TODO(mberlin): Remove --offline=false once vtworker ensures that the
# destination shards are not behind the master's replication
# position.
args = ['SplitClone',
'--offline=false',
'--destination_writer_count', '1',
'--min_healthy_rdonly_tablets', '1',
'--max_tps', '9999']
if not mysql_down:
# Make the clone as slow as necessary such that there is enough time to
# run PlannedReparent in the meantime.
# TOOD(mberlin): Once insert_values is fixed to uniformly distribute the
# rows across shards when sorted by primary key, remove
# --chunk_count 2, --min_rows_per_chunk 1 and set
# --source_reader_count back to 1.
args.extend(['--source_reader_count', '2',
'--chunk_count', '2',
'--min_rows_per_chunk', '1',
'--write_query_max_rows', '1'])
args.append('test_keyspace/0')
workerclient_proc = utils.run_vtworker_client_bg(args, worker_rpc_port)

if mysql_down:
# If MySQL is down, we wait until vtworker retried at least once to make
Expand Down Expand Up @@ -515,12 +533,24 @@ def verify_successful_worker_copy_with_reparent(self, mysql_down=False):

# Verify that we were forced to re-resolve and retry.
worker_vars = utils.get_vars(worker_port)
# There should be two retries at least, one for each destination shard.
self.assertGreater(worker_vars['WorkerRetryCount'], 1)
self.assertGreater(worker_vars['WorkerRetryCount'], 1,
"expected vtworker to retry each of the two reparented"
" destination masters at least once, but it didn't")
self.assertNotEqual(worker_vars['WorkerRetryCount'], {},
"expected vtworker to retry, but it didn't")
utils.kill_sub_process(worker_proc, soft=True)

# Wait for the destination RDONLYs to catch up or the following offline
# clone will try to insert rows which already exist.
# TODO(mberlin): Remove this once SplitClone supports it natively.
utils.wait_for_replication_pos(shard_0_replica, shard_0_rdonly1)
utils.wait_for_replication_pos(shard_1_replica, shard_1_rdonly1)
# Run final offline clone to enable filtered replication.
_, _ = utils.run_vtworker(['-cell', 'test_nj', 'SplitClone',
'--online=false',
'--min_healthy_rdonly_tablets', '1',
'test_keyspace/0'], auto_log=True)

# Make sure that everything is caught up to the same replication point
self.run_split_diff('test_keyspace/-80', all_shard_tablets, shard_0_tablets)
self.run_split_diff('test_keyspace/80-', all_shard_tablets, shard_1_tablets)
Expand Down Expand Up @@ -635,7 +665,7 @@ def add_test_options(parser):
help='The number of rows, per shard, that we should insert before '
'resharding for this test.')
parser.add_option(
'--num_insert_rows_before_reparent_test', type='int', default=3000,
'--num_insert_rows_before_reparent_test', type='int', default=4500,
help='The number of rows, per shard, that we should insert before '
'running TestReparentDuringWorkerCopy (supersedes --num_insert_rows in '
'that test). There must be enough rows such that SplitClone takes '
Expand Down