diff --git a/go/vt/vtgate/gateway/discoverygateway.go b/go/vt/vtgate/gateway/discoverygateway.go index e46d0abf82d..1bb2664b582 100644 --- a/go/vt/vtgate/gateway/discoverygateway.go +++ b/go/vt/vtgate/gateway/discoverygateway.go @@ -34,7 +34,6 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/buffer" - "vitess.io/vitess/go/vt/vtgate/masterbuffer" "vitess.io/vitess/go/vt/vttablet/queryservice" querypb "vitess.io/vitess/go/vt/proto/query" @@ -275,11 +274,6 @@ func (dg *discoveryGateway) withRetry(ctx context.Context, target *querypb.Targe continue } - // Potentially buffer this request. - if bufferErr := masterbuffer.FakeBuffer(target, inTransaction, i); bufferErr != nil { - return bufferErr - } - startTime := time.Now() var canRetry bool err, canRetry = inner(ctx, ts.Target, conn) diff --git a/go/vt/vtgate/masterbuffer/masterbuffer.go b/go/vt/vtgate/masterbuffer/masterbuffer.go deleted file mode 100644 index 0078ecc00cf..00000000000 --- a/go/vt/vtgate/masterbuffer/masterbuffer.go +++ /dev/null @@ -1,95 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -/* -Package masterbuffer contains experimental logic to buffer master requests in VTGate. -Only statements outside of transactinos will be buffered (including the initial Begin -to start a transaction). - -The reason why it might be useful to buffer master requests is during failovers: -the master vttablet can become unavailable for a few seconds. Upstream clients -(e.g., web workers) might not retry on failures, and instead may prefer for VTGate to wait for -a few seconds for the failover to complete. Thiis will block upstream callers for that time, -but will not return transient errors during the buffering time. -*/ -package masterbuffer - -import ( - "flag" - "sync" - "time" - - "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/vterrors" - - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" -) - -var ( - enableFakeMasterBuffer = flag.Bool("enable_fake_master_buffer", false, "Enable fake master buffering.") - bufferKeyspace = flag.String("buffer_keyspace", "", "The name of the keyspace to buffer master requests on.") - bufferShard = flag.String("buffer_shard", "", "The name of the shard to buffer master requests on.") - maxBufferSize = flag.Int("max_buffer_size", 10, "The maximum number of master requests to buffer at a time.") - fakeBufferDelay = flag.Duration("fake_buffer_delay", 1*time.Second, "The amount of time that we should delay all master requests for, to fake a buffer.") - - bufferedRequestsAttempted = stats.NewInt("BufferedRequestsAttempted") - bufferedRequestsSuccessful = stats.NewInt("BufferedRequestsSuccessful") - // Use this lock when adding to the number of currently buffered requests. - bufferMu sync.Mutex - bufferedRequests = stats.NewInt("BufferedRequests") -) - -// timeSleep can be mocked out in unit tests -var timeSleep = time.Sleep - -// errBufferFull is the error returned a buffer request is rejected because the buffer is full. -var errBufferFull = vterrors.New(vtrpcpb.Code_UNAVAILABLE, "master request buffer full, rejecting request") - -// FakeBuffer will pretend to buffer master requests in VTGate. -// Requests *will NOT actually be buffered*, they will just be delayed. -// This can be useful to understand what the impact of master request buffering will be -// on upstream callers. Once the impact is measured, it can be used to tweak parameter values -// for the best behavior. -// FakeBuffer should be called before a potential VtTablet Begin, otherwise it will increase transaction times. -func FakeBuffer(target *querypb.Target, inTransaction bool, attemptNumber int) error { - if !*enableFakeMasterBuffer { - return nil - } - // Don't buffer non-master traffic, requests that are inside transactions, or retries. - if target.TabletType != topodatapb.TabletType_MASTER || inTransaction || attemptNumber != 0 { - return nil - } - if target.Keyspace != *bufferKeyspace || target.Shard != *bufferShard { - return nil - } - bufferedRequestsAttempted.Add(1) - - bufferMu.Lock() - if int(bufferedRequests.Get()) >= *maxBufferSize { - bufferMu.Unlock() - return errBufferFull - } - bufferedRequests.Add(1) - bufferMu.Unlock() - - defer bufferedRequestsSuccessful.Add(1) - timeSleep(*fakeBufferDelay) - // Don't need to lock for this, as there's no race when decrementing the count - bufferedRequests.Add(-1) - return nil -} diff --git a/go/vt/vtgate/masterbuffer/masterbuffer_test.go b/go/vt/vtgate/masterbuffer/masterbuffer_test.go deleted file mode 100644 index 5f5b8dd834a..00000000000 --- a/go/vt/vtgate/masterbuffer/masterbuffer_test.go +++ /dev/null @@ -1,272 +0,0 @@ -/* -Copyright 2017 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package masterbuffer - -import ( - "sync" - "testing" - "time" - - querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" -) - -// fakeSleepController is used to control a fake sleepFunc -type fakeSleepController struct { - called bool - block bool - // block until the done channel if closed, if configured to do so. - done chan struct{} - // will close this channel when blocked - blocked chan struct{} -} - -type sleepFunc func(d time.Duration) - -// createFakeSleep creates a function that can be called to fake sleeping. -// The created fake is managed by the passed in fakeSleepController. -func createFakeSleep(c *fakeSleepController) sleepFunc { - return func(d time.Duration) { - c.called = true - if !c.block { - return - } - close(c.blocked) - select { - case <-c.done: - return - } - } -} - -func TestFakeBuffer(t *testing.T) { - unbufferedKeyspace := "ukeyspace" - unbufferedShard := "80-" - bufferedKeyspace := "bkeyspace" - bufferedShard := "-80" - - *bufferKeyspace = bufferedKeyspace - *bufferShard = bufferedShard - - for _, test := range []struct { - desc string - enableFakeBuffer bool - keyspace string - shard string - tabletType topodatapb.TabletType - inTransaction bool - attemptNumber int - bufferedRequests int - // was this request buffered? - wantCalled bool - // expected value of BufferedRequestsAttempts - wantAttempted int - wantErr error - }{ - { - desc: "enableFakeBuffer=False", - enableFakeBuffer: false, - }, - { - desc: "tabletType=REPLICA", - enableFakeBuffer: true, - tabletType: topodatapb.TabletType_REPLICA, - }, - { - desc: "inTransaction=True", - enableFakeBuffer: true, - inTransaction: true, - }, - { - desc: "attemptNumber != 0", - enableFakeBuffer: true, - attemptNumber: 1, - }, - { - desc: "unbuffered keyspace", - enableFakeBuffer: true, - keyspace: unbufferedKeyspace, - shard: bufferedShard, - }, - { - desc: "unbuffered shard", - enableFakeBuffer: true, - keyspace: bufferedKeyspace, - shard: unbufferedShard, - }, - { - desc: "buffer full", - enableFakeBuffer: true, - keyspace: bufferedKeyspace, - shard: bufferedShard, - bufferedRequests: *maxBufferSize, - // When the buffer is full, bufferedRequestsAttempted should still be incremented - wantAttempted: 1, - wantErr: errBufferFull, - }, - { - desc: "buffered successful", - enableFakeBuffer: true, - keyspace: bufferedKeyspace, - shard: bufferedShard, - wantCalled: true, - wantAttempted: 1, - }, - } { - controller := &fakeSleepController{} - timeSleep = createFakeSleep(controller) - // reset counters - bufferedRequestsAttempted.Set(0) - bufferedRequestsSuccessful.Set(0) - bufferedRequests.Set(int64(test.bufferedRequests)) - - *enableFakeMasterBuffer = test.enableFakeBuffer - - var tabletType topodatapb.TabletType - // Default to MASTER tablet type. - if test.tabletType == topodatapb.TabletType_UNKNOWN { - tabletType = topodatapb.TabletType_MASTER - } - - target := &querypb.Target{ - Keyspace: test.keyspace, - Shard: test.shard, - TabletType: tabletType, - } - gotErr := FakeBuffer(target, test.inTransaction, test.attemptNumber) - - if gotErr != test.wantErr { - t.Errorf("With %v, FakeBuffer() => %v; want: %v", test.desc, gotErr, test.wantErr) - } - - if controller.called != test.wantCalled { - t.Errorf("With %v, FakeBuffer() => timeSleep.called: %v; want: %v", - test.desc, controller.called, test.wantCalled) - } - - if bufferedRequestsAttempted.Get() != int64(test.wantAttempted) { - t.Errorf("With %v, FakeBuffer() => bufferedRequestsAttempted got: %v; want: %v", - test.desc, bufferedRequestsAttempted.Get(), test.wantAttempted) - } - - if (!test.wantCalled && (bufferedRequestsSuccessful.Get() == 1)) || - (test.wantCalled && (bufferedRequestsSuccessful.Get() != 1)) { - t.Errorf("With %v, FakeBuffer() => bufferedRequestsSuccessful got: %v; want: 1", - test.desc, bufferedRequestsSuccessful.Get()) - } - } -} - -// min for ints -func min(x, y int) int { - if x < y { - return x - } - return y -} - -func TestParallelFakeBuffer(t *testing.T) { - bufferedKeyspace := "bkeyspace" - bufferedShard := "-80" - - *bufferKeyspace = bufferedKeyspace - *bufferShard = bufferedShard - *enableFakeMasterBuffer = true - - // reset counters - bufferedRequestsAttempted.Set(0) - bufferedRequestsSuccessful.Set(0) - - var controllers []*fakeSleepController - var wg sync.WaitGroup - - for i := 1; i <= *maxBufferSize+2; i++ { - controller := &fakeSleepController{ - block: true, - done: make(chan struct{}), - blocked: make(chan struct{}), - } - timeSleep = createFakeSleep(controller) - // Only the first maxBufferSize calls to FakeBuffer should actually call fakeSleep - wantFakeSleepCalled := (i <= *maxBufferSize) - - wg.Add(1) - finished := make(chan struct{}) - var gotErr error - go func() { - defer wg.Done() - target := &querypb.Target{ - Keyspace: *bufferKeyspace, - Shard: *bufferShard, - TabletType: topodatapb.TabletType_MASTER, - } - gotErr = FakeBuffer(target, false, 0) - close(finished) - }() - - // wait until either the gorouotine is blocked (because it's buffering) or until - // it's finished (because it shouldn't be buffered). - select { - case <-controller.blocked: - case <-finished: - } - - if controller.called { - controllers = append(controllers, controller) - } else { - // if we didn't call fakeSleep, the buffer is full and should return an error saying so. - if gotErr != errBufferFull { - t.Errorf("On iteration %v, FakeBuffer() => %v; want: %v", i, gotErr, errBufferFull) - } - } - - if controller.called != wantFakeSleepCalled { - t.Errorf("On iteration %v, FakeBuffer() => timeSleep.called: %v; want: %v", - i, controller.called, wantFakeSleepCalled) - } - - if int(bufferedRequestsAttempted.Get()) != i { - t.Errorf("On iteration %v, FakeBuffer() => bufferedRequestsAttempted got: %v; want: %v", - i, bufferedRequestsAttempted.Get(), i) - } - - if int(bufferedRequests.Get()) != min(i, *maxBufferSize) { - t.Errorf("On iteration %v, FakeBuffer() => bufferedRequests got: %v; want: %v", - i, bufferedRequests.Get(), min(i, *maxBufferSize)) - } - - if int(bufferedRequestsSuccessful.Get()) != 0 { - t.Errorf("On iteration %v, FakeBuffer() => bufferedRequestsSuccessful got: %v; want: 0", - i, bufferedRequestsSuccessful.Get()) - } - } - - // signal to all the buffered calls that they can stop buffering, and wait for them. - for _, c := range controllers { - close(c.done) - } - wg.Wait() - - if int(bufferedRequestsSuccessful.Get()) != *maxBufferSize { - t.Errorf("After all FakeBuffer() calls are done, bufferedRequestsSuccessful got: %v; want: %v", - bufferedRequestsSuccessful.Get(), *maxBufferSize) - } - if int(bufferedRequests.Get()) != 0 { - t.Errorf("After all FakeBuffer() calls are done, bufferedRequests got: %v; want: %v", - bufferedRequests.Get(), 0) - } -} diff --git a/test/config.json b/test/config.json index 3deff1a2b84..2222f062857 100644 --- a/test/config.json +++ b/test/config.json @@ -181,15 +181,6 @@ "RetryMax": 0, "Tags": [] }, - "master_buffering": { - "File": "master_buffering_test.py", - "Args": [], - "Command": [], - "Manual": false, - "Shard": 4, - "RetryMax": 0, - "Tags": [] - }, "mysql_server": { "File": "mysql_server_test.py", "Args": [], diff --git a/test/master_buffering_test.py b/test/master_buffering_test.py deleted file mode 100755 index f1e04373af8..00000000000 --- a/test/master_buffering_test.py +++ /dev/null @@ -1,296 +0,0 @@ -#!/usr/bin/env python -# coding: utf-8 - -# Copyright 2017 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -"""Tests that VTGate buffers master traffic when expected.""" - -import logging -import struct -import unittest - -from vtdb import keyrange -from vtdb import vtgate_client - -import environment -import tablet -import utils - - -shard_0_master = tablet.Tablet() -shard_0_replica1 = tablet.Tablet() - -KEYSPACE_NAME = 'test_keyspace' -SHARD_NAMES = ['0'] -SHARD_KID_MAP = { - '0': [ - 527875958493693904, 626750931627689502, - 345387386794260318, 332484755310826578, - 1842642426274125671, 1326307661227634652, - 1761124146422844620, 1661669973250483744, - 3361397649937244239, 2444880764308344533, - 9767889778372766922, 9742070682920810358, - 10296850775085416642, 9537430901666854108, - 10440455099304929791, 11454183276974683945, - 11185910247776122031, 10460396697869122981, - 13379616110062597001, 12826553979133932576], -} - -CREATE_VT_INSERT_TEST = '''create table vt_insert_test ( - id bigint auto_increment, - msg varchar(64), - keyspace_id bigint(20) unsigned NOT NULL, - primary key (id) -) Engine=InnoDB''' - -create_tables = [ - CREATE_VT_INSERT_TEST, -] -pack_kid = struct.Struct('!Q').pack - - -def setUpModule(): - logging.debug('in setUpModule') - try: - environment.topo_server().setup() - - # start mysql instance external to the test - setup_procs = [shard_0_master.init_mysql(), - shard_0_replica1.init_mysql(), - ] - utils.wait_procs(setup_procs) - setup_tablets() - setup_vtgate() - # After VTGate comes up, populate it with some initial data - initial_writes(0, keyrange.KeyRange('')) - except Exception, e: - logging.exception('error during set up: %s', e) - tearDownModule() - raise - - -def tearDownModule(): - logging.debug('in tearDownModule') - utils.required_teardown() - if utils.options.skip_teardown: - return - logging.debug('Tearing down the servers and setup') - tablet.kill_tablets([shard_0_master, - shard_0_replica1]) - teardown_procs = [shard_0_master.teardown_mysql(), - shard_0_replica1.teardown_mysql(), - ] - utils.wait_procs(teardown_procs, raise_on_error=False) - - environment.topo_server().teardown() - - utils.kill_sub_processes() - utils.remove_tmp_files() - - shard_0_master.remove_tree() - shard_0_replica1.remove_tree() - - -def setup_tablets(): - # Start up a master mysql and vttablet - logging.debug('Setting up tablets') - utils.run_vtctl(['CreateKeyspace', KEYSPACE_NAME]) - utils.run_vtctl(['SetKeyspaceShardingInfo', '-force', KEYSPACE_NAME, - 'keyspace_id', 'uint64']) - shard_0_master.init_tablet( - 'replica', - keyspace=KEYSPACE_NAME, - shard='0', - tablet_index=0) - shard_0_replica1.init_tablet( - 'replica', - keyspace=KEYSPACE_NAME, - shard='0', - tablet_index=1) - - for t in [shard_0_master, shard_0_replica1]: - t.create_db('vt_test_keyspace') - for create_table in create_tables: - t.mquery(shard_0_master.dbname, create_table) - t.start_vttablet(wait_for_state=None) - - for t in [shard_0_master, shard_0_replica1]: - t.wait_for_vttablet_state('NOT_SERVING') - - utils.run_vtctl(['InitShardMaster', '-force', KEYSPACE_NAME+'/0', - shard_0_master.tablet_alias], auto_log=True) - - for t in [shard_0_replica1]: - utils.wait_for_tablet_type(t.tablet_alias, 'replica') - - for t in [shard_0_master, shard_0_replica1]: - t.wait_for_vttablet_state('SERVING') - - utils.check_srv_keyspace( - 'test_nj', KEYSPACE_NAME, - 'Partitions(master): -\n' - 'Partitions(rdonly): -\n' - 'Partitions(replica): -\n') - - -def setup_vtgate(port=None, extra_args=None): - utils.VtGate(port=port).start( - extra_args=extra_args, - tablets=[shard_0_master, shard_0_replica1]) - utils.vtgate.wait_for_endpoints( - '%s.%s.master' % (KEYSPACE_NAME, SHARD_NAMES[0]), - 1) - utils.vtgate.wait_for_endpoints( - '%s.%s.replica' % (KEYSPACE_NAME, SHARD_NAMES[0]), - 1) - - -def initial_writes(shard_index, writes_keyrange): - vtgate_conn = get_connection() - _delete_all('vt_insert_test') - count = 10 - kid_list = SHARD_KID_MAP[SHARD_NAMES[shard_index]] - for x in xrange(count): - keyspace_id = kid_list[count%len(kid_list)] - cursor = vtgate_conn.cursor( - tablet_type='master', keyspace=KEYSPACE_NAME, - keyspace_ids=[pack_kid(keyspace_id)], - writable=True) - cursor.begin() - cursor.execute( - 'insert into vt_insert_test (msg, keyspace_id) ' - 'values (:msg, :keyspace_id)', - {'msg': 'test %s' % x, 'keyspace_id': keyspace_id}) - cursor.commit() - cursor = vtgate_conn.cursor( - tablet_type='master', keyspace=KEYSPACE_NAME, - keyranges=[writes_keyrange]) - rowcount = cursor.execute('select * from vt_insert_test', {}) - assert rowcount == count, 'master fetch works' - - -def get_connection(timeout=10.0): - protocol, endpoint = utils.vtgate.rpc_endpoint(python=True) - try: - return vtgate_client.connect(protocol, endpoint, timeout) - except Exception: - logging.exception('Connection to vtgate (timeout=%s) failed.', timeout) - raise - - -def _delete_all(table_name): - vtgate_conn = get_connection() - # This write is to set up the test with fresh insert - # and hence performing it directly on the connection. - vtgate_conn.begin() - vtgate_conn._execute( - 'delete from %s' % table_name, {}, - tablet_type='master', keyspace_name=KEYSPACE_NAME, - keyranges=[keyrange.KeyRange('')]) - vtgate_conn.commit() - - -def restart_vtgate(extra_args=None): - if extra_args is None: - extra_args = [] - port = utils.vtgate.port - utils.vtgate.kill() - setup_vtgate(port=port, extra_args=extra_args) - - -class BaseTestCase(unittest.TestCase): - - def setUp(self): - super(BaseTestCase, self).setUp() - logging.info('Start: %s.', '.'.join(self.id().split('.')[-2:])) - - -# TODO(liguo): once we have the final master buffering code in place, these -# tests should verify that we only buffer when the master is unavailable. -class TestMasterBuffering(BaseTestCase): - - shard_index = 0 - keyrange = keyrange.KeyRange('') - - def setUp(self): - super(TestMasterBuffering, self).setUp() - restart_vtgate(extra_args=[ - '-enable_fake_master_buffer', - '-buffer_keyspace', KEYSPACE_NAME, - '-buffer_shard', SHARD_NAMES[self.shard_index], - '-fake_buffer_delay', '1ms', - ]) - - def get_sucessful_buffered_requests(self): - return utils.vtgate.get_vars()['BufferedRequestsSuccessful'] - - def test_tx_is_buffered(self): - """Tests that for a transaction, we buffer exactly one request.""" - vtgate_conn = get_connection() - kid_list = SHARD_KID_MAP[SHARD_NAMES[self.shard_index]] - keyspace_id = kid_list[0] - - initial_buffered = self.get_sucessful_buffered_requests() - - cursor = vtgate_conn.cursor( - tablet_type='master', keyspace=KEYSPACE_NAME, - keyspace_ids=[pack_kid(keyspace_id)], - writable=True) - cursor.begin() - cursor.execute( - 'insert into vt_insert_test (msg, keyspace_id) ' - 'values (:msg, :keyspace_id)', - {'msg': 'test %s' % 1000, 'keyspace_id': keyspace_id}) - cursor.execute('select * from vt_insert_test', {}) - cursor.rollback() - - num_buffered = self.get_sucessful_buffered_requests() - initial_buffered - # No matter how many requests there were in the transaction, we should only - # buffer one request (the Begin to the vttablet). - self.assertEqual(num_buffered, 1) - - def test_master_read_is_buffered(self): - """Tests that we buffer master reads.""" - vtgate_conn = get_connection() - kid_list = SHARD_KID_MAP[SHARD_NAMES[self.shard_index]] - keyspace_id = kid_list[0] - - initial_buffered = self.get_sucessful_buffered_requests() - - cursor = vtgate_conn.cursor( - tablet_type='master', keyspace=KEYSPACE_NAME, - keyspace_ids=[pack_kid(keyspace_id)]) - cursor.execute('select * from vt_insert_test', {}) - - num_buffered = self.get_sucessful_buffered_requests() - initial_buffered - self.assertEqual(num_buffered, 1) - - def test_replica_read_is_not_buffered(self): - """Tests that we do not buffer replica reads.""" - vtgate_conn = get_connection() - - initial_buffered = self.get_sucessful_buffered_requests() - vtgate_conn._execute( - 'select * from vt_insert_test', {}, - tablet_type='replica', keyspace_name=KEYSPACE_NAME, - keyranges=[self.keyrange] - ) - num_buffered = self.get_sucessful_buffered_requests() - initial_buffered - self.assertEqual(num_buffered, 0) - - -if __name__ == '__main__': - utils.main()