diff --git a/botocore/serialize.py b/botocore/serialize.py index 23e904c41d..3d988b5b0f 100644 --- a/botocore/serialize.py +++ b/botocore/serialize.py @@ -331,6 +331,15 @@ def _serialize_type_map(self, serialized, value, shape, key): for sub_key, sub_value in value.items(): self._serialize(map_obj, sub_value, shape.value, sub_key) + def _serialize_type_list(self, serialized, value, shape, key): + list_obj = [] + serialized[key] = list_obj + for list_item in value: + shell = {} + self._serialize(shell, list_item, shape.member, "s") + list_obj.append(shell["s"]) + pass + def _default_serialize(self, serialized, value, shape, key): serialized[key] = value diff --git a/tests/integration/test_kinesis.py b/tests/integration/test_kinesis.py index 8b1cb72109..8fdbb85337 100644 --- a/tests/integration/test_kinesis.py +++ b/tests/integration/test_kinesis.py @@ -52,6 +52,68 @@ def test_can_put_stream_blob(self): self.assertTrue(len(records['Records']) > 0) self.assertEqual(records['Records'][0]['Data'], b'foobar') + def test_can_put_records_single_blob(self): + self.client.create_stream(StreamName=self.stream_name, + ShardCount=1) + waiter = self.client.get_waiter('stream_exists') + waiter.wait(StreamName=self.stream_name) + self.addCleanup(self.client.delete_stream, + StreamName=self.stream_name) + + self.client.put_records( + StreamName=self.stream_name, + Records=[{ + 'Data': 'foobar', + 'PartitionKey': 'foo' + }] + ) + # Give it a few seconds for the record to get into the stream. + time.sleep(10) + + stream = self.client.describe_stream(StreamName=self.stream_name) + shard = stream['StreamDescription']['Shards'][0] + shard_iterator = self.client.get_shard_iterator( + StreamName=self.stream_name, ShardId=shard['ShardId'], + ShardIteratorType='TRIM_HORIZON') + + records = self.client.get_records( + ShardIterator=shard_iterator['ShardIterator']) + self.assertTrue(len(records['Records']) > 0) + self.assertEqual(records['Records'][0]['Data'], b'foobar') + + def test_can_put_records_multiple_blob(self): + self.client.create_stream(StreamName=self.stream_name, + ShardCount=1) + waiter = self.client.get_waiter('stream_exists') + waiter.wait(StreamName=self.stream_name) + self.addCleanup(self.client.delete_stream, + StreamName=self.stream_name) + + self.client.put_records( + StreamName=self.stream_name, + Records=[{ + 'Data': 'foobar', + 'PartitionKey': 'foo' + },{ + 'Data': 'barfoo', + 'PartitionKey': 'foo' + }] + ) + # Give it a few seconds for the record to get into the stream. + time.sleep(10) + + stream = self.client.describe_stream(StreamName=self.stream_name) + shard = stream['StreamDescription']['Shards'][0] + shard_iterator = self.client.get_shard_iterator( + StreamName=self.stream_name, ShardId=shard['ShardId'], + ShardIteratorType='TRIM_HORIZON') + + records = self.client.get_records( + ShardIterator=shard_iterator['ShardIterator']) + self.assertTrue(len(records['Records']) == 2) + #verify that both made it through + record_data = [r['Data'] for r in records['Records']] + self.assertItemsEqual(['foobar', 'barfoo'], record_data) if __name__ == '__main__': unittest.main()