From a3a5831861ff0d65a3e083cb4747cbf101907f7b Mon Sep 17 00:00:00 2001
From: Sam Curren
Date: Thu, 11 Dec 2014 18:30:22 -0800
Subject: [PATCH 1/2] added failing unit tests for kinesis put_records method.

---
 tests/integration/test_kinesis.py | 62 +++++++++++++++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/tests/integration/test_kinesis.py b/tests/integration/test_kinesis.py
index 8b1cb72109..8fdbb85337 100644
--- a/tests/integration/test_kinesis.py
+++ b/tests/integration/test_kinesis.py
@@ -52,6 +52,68 @@ def test_can_put_stream_blob(self):
         self.assertTrue(len(records['Records']) > 0)
         self.assertEqual(records['Records'][0]['Data'], b'foobar')
 
+    def test_can_put_records_single_blob(self):
+        self.client.create_stream(StreamName=self.stream_name,
+                                  ShardCount=1)
+        waiter = self.client.get_waiter('stream_exists')
+        waiter.wait(StreamName=self.stream_name)
+        self.addCleanup(self.client.delete_stream,
+                        StreamName=self.stream_name)
+
+        self.client.put_records(
+            StreamName=self.stream_name,
+            Records=[{
+                'Data': 'foobar',
+                'PartitionKey': 'foo'
+            }]
+        )
+        # Give it a few seconds for the record to get into the stream.
+        time.sleep(10)
+
+        stream = self.client.describe_stream(StreamName=self.stream_name)
+        shard = stream['StreamDescription']['Shards'][0]
+        shard_iterator = self.client.get_shard_iterator(
+            StreamName=self.stream_name, ShardId=shard['ShardId'],
+            ShardIteratorType='TRIM_HORIZON')
+
+        records = self.client.get_records(
+            ShardIterator=shard_iterator['ShardIterator'])
+        self.assertTrue(len(records['Records']) > 0)
+        self.assertEqual(records['Records'][0]['Data'], b'foobar')
+
+    def test_can_put_records_multiple_blob(self):
+        self.client.create_stream(StreamName=self.stream_name,
+                                  ShardCount=1)
+        waiter = self.client.get_waiter('stream_exists')
+        waiter.wait(StreamName=self.stream_name)
+        self.addCleanup(self.client.delete_stream,
+                        StreamName=self.stream_name)
+
+        self.client.put_records(
+            StreamName=self.stream_name,
+            Records=[{
+                'Data': 'foobar',
+                'PartitionKey': 'foo'
+            }, {
+                'Data': 'barfoo',
+                'PartitionKey': 'foo'
+            }]
+        )
+        # Give it a few seconds for the records to get into the stream.
+        time.sleep(10)
+
+        stream = self.client.describe_stream(StreamName=self.stream_name)
+        shard = stream['StreamDescription']['Shards'][0]
+        shard_iterator = self.client.get_shard_iterator(
+            StreamName=self.stream_name, ShardId=shard['ShardId'],
+            ShardIteratorType='TRIM_HORIZON')
+
+        records = self.client.get_records(
+            ShardIterator=shard_iterator['ShardIterator'])
+        self.assertTrue(len(records['Records']) == 2)
+        # Verify that both records made it through.
+        record_data = [r['Data'] for r in records['Records']]
+        self.assertItemsEqual(['foobar', 'barfoo'], record_data)
 
 if __name__ == '__main__':
     unittest.main()
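For context on what the tests above exercise and what the follow-up patch fixes: Kinesis PutRecords takes a Records parameter that is a list of structures (each with a Data blob and a PartitionKey), and without a list-aware handler the serializer falls back to _default_serialize, which copies the list through without serializing its member shapes. One way to poke at that code path offline, without calling AWS, is to run botocore's serializer directly against the Kinesis model. This is only a sketch: it assumes botocore's create_serializer() and serialize_to_request() helpers behave as in the version this patch targets, and the stream name is made up.

    import botocore.session
    from botocore import serialize

    session = botocore.session.get_session()
    service_model = session.get_service_model('kinesis')
    operation_model = service_model.operation_model('PutRecords')

    # Same parameter shape the integration tests send.
    params = {
        'StreamName': 'my-test-stream',  # hypothetical name, not from the patch
        'Records': [
            {'Data': 'foobar', 'PartitionKey': 'foo'},
            {'Data': 'barfoo', 'PartitionKey': 'foo'},
        ],
    }

    serializer = serialize.create_serializer(service_model.metadata['protocol'])
    request = serializer.serialize_to_request(params, operation_model)
    # With the list serializer from the next patch in place, the request body
    # should contain both records, with each Data blob base64 encoded.
    print(request['body'])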
From 3ebe25f645f513218fcac1a398102435154bdf2d Mon Sep 17 00:00:00 2001
From: Sam Curren
Date: Fri, 16 Jan 2015 09:05:05 -0800
Subject: [PATCH 2/2] added serialize method for the list type, properly
 serializing member types. Fixes Kinesis PutRecords serialization error.

---
 botocore/serialize.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/botocore/serialize.py b/botocore/serialize.py
index 23e904c41d..3d988b5b0f 100644
--- a/botocore/serialize.py
+++ b/botocore/serialize.py
@@ -331,6 +331,15 @@ def _serialize_type_map(self, serialized, value, shape, key):
         for sub_key, sub_value in value.items():
             self._serialize(map_obj, sub_value, shape.value, sub_key)
 
+    def _serialize_type_list(self, serialized, value, shape, key):
+        list_obj = []
+        serialized[key] = list_obj
+        for list_item in value:
+            # Serialize each member into a scratch dict, then collect its value.
+            shell = {}
+            self._serialize(shell, list_item, shape.member, "s")
+            list_obj.append(shell["s"])
+
     def _default_serialize(self, serialized, value, shape, key):
         serialized[key] = value
 
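To see the pattern in isolation: the new _serialize_type_list reuses the existing per-shape _serialize dispatch by serializing each list member into a scratch dict under a throwaway key and appending the resulting value to the output list. Below is a minimal, self-contained sketch of that approach in plain Python; serialize_member is a hypothetical stand-in for botocore's shape-driven dispatch, doing roughly what a JSON protocol would do with a Kinesis record, and is not part of the patch.

    import base64

    def serialize_member(serialized, value, key):
        # Hypothetical stand-in for self._serialize(...) on a structure shape:
        # base64-encode the Data blob and copy the PartitionKey through.
        data = value['Data']
        if isinstance(data, str):
            data = data.encode('utf-8')
        serialized[key] = {
            'Data': base64.b64encode(data).decode('ascii'),
            'PartitionKey': value['PartitionKey'],
        }

    def serialize_list(serialized, value, key):
        # Same shape as the patched _serialize_type_list: build the output
        # list, serialize each member into a temporary dict under a throwaway
        # key, then append the serialized value.
        list_obj = []
        serialized[key] = list_obj
        for list_item in value:
            shell = {}
            serialize_member(shell, list_item, 's')
            list_obj.append(shell['s'])

    body = {}
    serialize_list(body, [{'Data': 'foobar', 'PartitionKey': 'foo'},
                          {'Data': 'barfoo', 'PartitionKey': 'foo'}], 'Records')
    print(body['Records'])
    # [{'Data': 'Zm9vYmFy', 'PartitionKey': 'foo'},
    #  {'Data': 'YmFyZm9v', 'PartitionKey': 'foo'}]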