diff --git a/go/cmd/vtcombo/tablet_map.go b/go/cmd/vtcombo/tablet_map.go index e611373358d..3bc848ed293 100644 --- a/go/cmd/vtcombo/tablet_map.go +++ b/go/cmd/vtcombo/tablet_map.go @@ -267,7 +267,9 @@ func (itc *internalTabletConn) StreamExecute(ctx context.Context, query string, BindVariables: bindVars, TransactionId: transactionID, }, func(reply *sqltypes.Result) error { - result <- reply + // We need to deep-copy the reply before returning, + // because the underlying buffers are reused. + result <- sqltypes.Proto3ToResult(sqltypes.ResultToProto3(reply)) return nil }) diff --git a/test/vttest_sample_test.py b/test/vttest_sample_test.py index f13008eee6d..4b60956219d 100755 --- a/test/vttest_sample_test.py +++ b/test/vttest_sample_test.py @@ -4,7 +4,9 @@ # Use of this source code is governed by a BSD-style license that can # be found in the LICENSE file. -"""This sample test demonstrates how to setup and teardown a test +"""Sample test for vttest framework. + +This sample test demonstrates how to setup and teardown a test database with the associated Vitess processes. It is meant to be used as a template for unit tests, by developers writing applications on top of Vitess. The recommended workflow is to have the schema for the @@ -29,6 +31,7 @@ import unittest from vtdb import vtgate_client +from vtdb import vtgate_cursor from vtdb import dbexceptions import utils @@ -57,7 +60,7 @@ def test_standalone(self): 'test_keyspace/80-:test_keyspace_1', '--schema_dir', os.path.join(environment.vttop, 'test', 'vttest_schema'), - ] + ] sp = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE) config = json.loads(sp.stdout.readline()) @@ -93,8 +96,8 @@ def test_standalone(self): 'test_keyspace', 'master', keyspace_ids=[pack_kid(keyspace_id)], writable=True) cursor.begin() - insert = 'insert into test_table (id, msg, keyspace_id) values (%(id)s, '+\ - '%(msg)s, %(keyspace_id)s)' + insert = ('insert into test_table (id, msg, keyspace_id) values (%(id)s, ' + '%(msg)s, %(keyspace_id)s)') bind_variables = { 'id': row_id, 'msg': 'test %s' % row_id, @@ -117,6 +120,26 @@ def test_standalone(self): cursor.execute(insert, bind_variables) cursor.rollback() + # Insert a bunch of rows with long msg values. + bind_variables['msg'] = 'x' * 64 + id_start = 1000 + rowcount = 500 + cursor.begin() + for i in xrange(id_start, id_start+rowcount): + bind_variables['id'] = i + cursor.execute(insert, bind_variables) + cursor.commit() + + # Try to fetch a large number of rows + # (more than one streaming result packet). + stream_cursor = conn.cursor( + 'test_keyspace', 'master', keyspace_ids=[pack_kid(keyspace_id)], + cursorclass=vtgate_cursor.StreamVTGateCursor) + stream_cursor.execute('select * from test_table where id >= %(id_start)s', + {'id_start': id_start}) + self.assertEqual(rowcount, len(list(stream_cursor.fetchall()))) + stream_cursor.close() + # Clean up. cursor.close() conn.close()