diff --git a/mixpanel/__init__.py b/mixpanel/__init__.py index 361f20c..437f68b 100644 --- a/mixpanel/__init__.py +++ b/mixpanel/__init__.py @@ -21,6 +21,8 @@ import time import uuid +import requests +from requests.auth import HTTPBasicAuth import six from six.moves import range import urllib3 @@ -172,7 +174,6 @@ def alias(self, alias_id, original, meta=None): Calling this method *always* results in a synchronous HTTP request to Mixpanel servers, regardless of any custom consumer. """ - sync_consumer = Consumer() event = { 'event': '$create_alias', 'properties': { @@ -183,6 +184,8 @@ def alias(self, alias_id, original, meta=None): } if meta: event.update(meta) + + sync_consumer = Consumer() sync_consumer.send('events', json_dumps(event, cls=self._serializer)) def merge(self, api_key, distinct_id1, distinct_id2, meta=None, api_secret=None): @@ -540,7 +543,7 @@ class Consumer(object): def __init__(self, events_url=None, people_url=None, import_url=None, request_timeout=None, groups_url=None, api_host="api.mixpanel.com", - retry_limit=4, retry_backoff_factor=0.25, verify_cert=False): + retry_limit=4, retry_backoff_factor=0.25, verify_cert=True): # TODO: With next major version, make the above args kwarg-only, and reorder them. self._endpoints = { 'events': events_url or 'https://{}/track'.format(api_host), @@ -549,11 +552,8 @@ def __init__(self, events_url=None, people_url=None, import_url=None, 'imports': import_url or 'https://{}/import'.format(api_host), } - retry_args = { - "total": retry_limit, - "backoff_factor": retry_backoff_factor, - "status_forcelist": set(range(500, 600)), - } + self._verify_cert = verify_cert + self._request_timeout = request_timeout # Work around renamed argument in urllib3. if hasattr(urllib3.util.Retry.DEFAULT, "allowed_methods"): @@ -561,19 +561,19 @@ def __init__(self, events_url=None, people_url=None, import_url=None, else: methods_arg = "method_whitelist" - retry_args[methods_arg] = {"POST"} - retry_config = urllib3.Retry(**retry_args) - - if not verify_cert: - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - - cert_reqs = 'CERT_REQUIRED' if verify_cert else 'CERT_NONE' - self._http = urllib3.PoolManager( - retries=retry_config, - timeout=urllib3.Timeout(request_timeout), - cert_reqs=str(cert_reqs), + retry_args = { + "total": retry_limit, + "backoff_factor": retry_backoff_factor, + "status_forcelist": set(range(500, 600)), + methods_arg: {"POST"}, + } + adapter = requests.adapters.HTTPAdapter( + max_retries=urllib3.Retry(**retry_args), ) + self._session = requests.Session() + self._session.mount('http', adapter) + def send(self, endpoint, json_message, api_key=None, api_secret=None): """Immediately record an event or a profile update. @@ -594,40 +594,38 @@ def send(self, endpoint, json_message, api_key=None, api_secret=None): self._write_request(self._endpoints[endpoint], json_message, api_key, api_secret) def _write_request(self, request_url, json_message, api_key=None, api_secret=None): - data = { - 'data': json_message, - 'verbose': 1, - 'ip': 0, - } - if isinstance(api_key, tuple): # For compatibility with subclassers, allow the auth details to be # packed into the existing api_key param. api_key, api_secret = api_key + params = { + 'data': json_message, + 'verbose': 1, + 'ip': 0, + } if api_key: - data.update({'api_key': api_key}) - - headers = None + params['api_key'] = api_key + basic_auth = None if api_secret is not None: - headers = urllib3.util.make_headers(basic_auth="{}:".format(api_secret)) + basic_auth = HTTPBasicAuth(api_secret, '') try: - response = self._http.request( - 'POST', + response = self._session.post( request_url, - fields=data, - headers=headers, - encode_multipart=False, # URL-encode payload in POST body. + data=params, + auth=basic_auth, + timeout=self._request_timeout, + verify=self._verify_cert, ) except Exception as e: six.raise_from(MixpanelException(e), e) try: - response_dict = json.loads(response.data.decode('utf-8')) + response_dict = response.json() except ValueError: - raise MixpanelException('Cannot interpret Mixpanel server response: {0}'.format(response.data)) + raise MixpanelException('Cannot interpret Mixpanel server response: {0}'.format(response.text)) if response_dict['status'] != 1: raise MixpanelException('Mixpanel error: {0}'.format(response_dict['error'])) @@ -669,7 +667,7 @@ class BufferedConsumer(object): """ def __init__(self, max_size=50, events_url=None, people_url=None, import_url=None, request_timeout=None, groups_url=None, api_host="api.mixpanel.com", - retry_limit=4, retry_backoff_factor=0.25, verify_cert=False): + retry_limit=4, retry_backoff_factor=0.25, verify_cert=True): self._consumer = Consumer(events_url, people_url, import_url, request_timeout, groups_url, api_host, retry_limit, retry_backoff_factor, verify_cert) self._buffers = { diff --git a/requirements-testing.txt b/requirements-testing.txt index 8f61b35..38ed87a 100644 --- a/requirements-testing.txt +++ b/requirements-testing.txt @@ -1,3 +1,3 @@ -mock==1.3.0 -pytest==4.6.11 -typing; python_version >='3.4' and python_version <'3.5' # To work around CI fail. +pytest~=4.6 +responses~=0.13.3 +typing; python_version>='3.4' and python_version<'3.5' # To work around CI fail. diff --git a/setup.py b/setup.py index c89af8a..dd49912 100644 --- a/setup.py +++ b/setup.py @@ -25,8 +25,9 @@ def find_version(*paths): author_email='dev@mixpanel.com', license='Apache', install_requires=[ - 'six >= 1.9.0', - 'urllib3 >= 1.21.1', + 'six>=1.9.0', + 'requests>=2.4.2', + 'urllib3', ], classifiers=[ diff --git a/test_mixpanel.py b/test_mixpanel.py index 4e4c8bf..42ed47a 100644 --- a/test_mixpanel.py +++ b/test_mixpanel.py @@ -1,16 +1,14 @@ from __future__ import absolute_import, unicode_literals -import base64 -import contextlib import datetime import decimal import json import time -from mock import Mock, patch import pytest +import responses import six -from six.moves import range -import urllib3 +from six.moves import range, urllib + import mixpanel @@ -288,18 +286,23 @@ def test_people_set_created_date_datetime(self): def test_alias(self): # More complicated since alias() forces a synchronous call. - mock_response = Mock() - mock_response.data = six.b('{"status": 1, "error": null}') - with patch('mixpanel.urllib3.PoolManager.request', return_value=mock_response) as req: + + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + 'https://api.mixpanel.com/track', + json={"status": 1, "error": None}, + status=200, + ) + self.mp.alias('ALIAS', 'ORIGINAL ID') - assert self.consumer.log == [] - assert req.call_count == 1 - ((method, url), kwargs) = req.call_args - assert method == 'POST' - assert url == 'https://api.mixpanel.com/track' - expected_data = {"event":"$create_alias","properties":{"alias":"ALIAS","token":"12345","distinct_id":"ORIGINAL ID"}} - assert json.loads(kwargs["fields"]["data"]) == expected_data + assert self.consumer.log == [] + call = rsps.calls[0] + assert call.request.method == "POST" + assert call.request.url == "https://api.mixpanel.com/track" + posted_data = dict(urllib.parse.parse_qsl(six.ensure_str(call.request.body))) + assert json.loads(posted_data["data"]) == {"event":"$create_alias","properties":{"alias":"ALIAS","token":"12345","distinct_id":"ORIGINAL ID"}} def test_merge(self): self.mp.merge('my_good_api_key', 'd1', 'd2') @@ -449,36 +452,113 @@ class TestConsumer: def setup_class(cls): cls.consumer = mixpanel.Consumer(request_timeout=30) - @contextlib.contextmanager - def _assertSends(self, expect_url, expect_data, consumer=None): - if consumer is None: - consumer = self.consumer - - mock_response = Mock() - mock_response.data = six.b('{"status": 1, "error": null}') - with patch('mixpanel.urllib3.PoolManager.request', return_value=mock_response) as req: - yield - - assert req.call_count == 1 - (call_args, kwargs) = req.call_args - (method, url) = call_args - assert method == 'POST' - assert url == expect_url - assert kwargs["fields"] == expect_data - def test_send_events(self): - with self._assertSends('https://api.mixpanel.com/track', {"ip": 0, "verbose": 1, "data": '{"foo":"bar"}'}): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + 'https://api.mixpanel.com/track', + json={"status": 1, "error": None}, + status=200, + match=[responses.urlencoded_params_matcher({"ip": "0", "verbose": "1", "data": '{"foo":"bar"}'})], + ) self.consumer.send('events', '{"foo":"bar"}') def test_send_people(self): - with self._assertSends('https://api.mixpanel.com/engage', {"ip": 0, "verbose": 1, "data": '{"foo":"bar"}'}): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + 'https://api.mixpanel.com/engage', + json={"status": 1, "error": None}, + status=200, + match=[responses.urlencoded_params_matcher({"ip": "0", "verbose": "1", "data": '{"foo":"bar"}'})], + ) self.consumer.send('people', '{"foo":"bar"}') + def test_server_success(self): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + 'https://api.mixpanel.com/track', + json={"status": 1, "error": None}, + status=200, + match=[responses.urlencoded_params_matcher({"ip": "0", "verbose": "1", "data": '{"foo":"bar"}'})], + ) + self.consumer.send('events', '{"foo":"bar"}') + + def test_server_invalid_data(self): + with responses.RequestsMock() as rsps: + error_msg = "bad data" + rsps.add( + responses.POST, + 'https://api.mixpanel.com/track', + json={"status": 0, "error": error_msg}, + status=200, + match=[responses.urlencoded_params_matcher({"ip": "0", "verbose": "1", "data": '{INVALID "foo":"bar"}'})], + ) + + with pytest.raises(mixpanel.MixpanelException) as exc: + self.consumer.send('events', '{INVALID "foo":"bar"}') + assert error_msg in str(exc) + + def test_server_unauthorized(self): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + 'https://api.mixpanel.com/track', + json={"status": 0, "error": "unauthed"}, + status=401, + match=[responses.urlencoded_params_matcher({"ip": "0", "verbose": "1", "data": '{"foo":"bar"}'})], + ) + with pytest.raises(mixpanel.MixpanelException) as exc: + self.consumer.send('events', '{"foo":"bar"}') + assert "unauthed" in str(exc) + + def test_server_forbidden(self): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + 'https://api.mixpanel.com/track', + json={"status": 0, "error": "forbade"}, + status=403, + match=[responses.urlencoded_params_matcher({"ip": "0", "verbose": "1", "data": '{"foo":"bar"}'})], + ) + with pytest.raises(mixpanel.MixpanelException) as exc: + self.consumer.send('events', '{"foo":"bar"}') + assert "forbade" in str(exc) + + def test_server_5xx(self): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + 'https://api.mixpanel.com/track', + body="Internal server error", + status=500, + match=[responses.urlencoded_params_matcher({"ip": "0", "verbose": "1", "data": '{"foo":"bar"}'})], + ) + with pytest.raises(mixpanel.MixpanelException) as exc: + self.consumer.send('events', '{"foo":"bar"}') + def test_consumer_override_api_host(self): - consumer = mixpanel.Consumer(api_host="api-eu.mixpanel.com") - with self._assertSends('https://api-eu.mixpanel.com/track', {"ip": 0, "verbose": 1, "data": '{"foo":"bar"}'}, consumer=consumer): + consumer = mixpanel.Consumer(api_host="api-zoltan.mixpanel.com") + + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + 'https://api-zoltan.mixpanel.com/track', + json={"status": 1, "error": None}, + status=200, + match=[responses.urlencoded_params_matcher({"ip": "0", "verbose": "1", "data": '{"foo":"bar"}'})], + ) consumer.send('events', '{"foo":"bar"}') - with self._assertSends('https://api-eu.mixpanel.com/engage', {"ip": 0, "verbose": 1, "data": '{"foo":"bar"}'}, consumer=consumer): + + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + 'https://api-zoltan.mixpanel.com/engage', + json={"status": 1, "error": None}, + status=200, + match=[responses.urlencoded_params_matcher({"ip": "0", "verbose": "1", "data": '{"foo":"bar"}'})], + ) consumer.send('people', '{"foo":"bar"}') def test_unknown_endpoint(self): @@ -521,12 +601,18 @@ def test_unknown_endpoint_raises_on_send(self): self.consumer.send('unknown', '1') def test_useful_reraise_in_flush_endpoint(self): - error_mock = Mock() - error_mock.data = six.b('{"status": 0, "error": "arbitrary error"}') - broken_json = '{broken JSON' - consumer = mixpanel.BufferedConsumer(2) - with patch('mixpanel.urllib3.PoolManager.request', return_value=error_mock): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + 'https://api.mixpanel.com/track', + json={"status": 0, "error": "arbitrary error"}, + status=200, + ) + + broken_json = '{broken JSON' + consumer = mixpanel.BufferedConsumer(2) consumer.send('events', broken_json) + with pytest.raises(mixpanel.MixpanelException) as excinfo: consumer.flush() assert excinfo.value.message == '[%s]' % broken_json @@ -554,27 +640,41 @@ def setup_class(cls): cls.mp = mixpanel.Mixpanel(cls.TOKEN) cls.mp._now = lambda: 1000 - @contextlib.contextmanager - def _assertRequested(self, expect_url, expect_data): - res = Mock() - res.data = six.b('{"status": 1, "error": null}') - with patch('mixpanel.urllib3.PoolManager.request', return_value=res) as req: - yield - - assert req.call_count == 1 - ((method, url,), data) = req.call_args - data = data["fields"]["data"] - assert method == 'POST' - assert url == expect_url - payload = json.loads(data) - assert payload == expect_data - def test_track_functional(self): - expect_data = {'event': 'button_press', 'properties': {'size': 'big', 'color': 'blue', 'mp_lib': 'python', 'token': '12345', 'distinct_id': 'player1', '$lib_version': mixpanel.__version__, 'time': 1000, '$insert_id': 'xyz1200'}} - with self._assertRequested('https://api.mixpanel.com/track', expect_data): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + 'https://api.mixpanel.com/track', + json={"status": 1, "error": None}, + status=200, + ) + self.mp.track('player1', 'button_press', {'size': 'big', 'color': 'blue', '$insert_id': 'xyz1200'}) + body = six.ensure_str(rsps.calls[0].request.body) + wrapper = dict(urllib.parse.parse_qsl(body)) + data = json.loads(wrapper["data"]) + del wrapper["data"] + + assert {"ip": "0", "verbose": "1"} == wrapper + expected_data = {'event': 'button_press', 'properties': {'size': 'big', 'color': 'blue', 'mp_lib': 'python', 'token': '12345', 'distinct_id': 'player1', '$lib_version': mixpanel.__version__, 'time': 1000, '$insert_id': 'xyz1200'}} + assert expected_data == data + def test_people_set_functional(self): - expect_data = {'$distinct_id': 'amq', '$set': {'birth month': 'october', 'favorite color': 'purple'}, '$time': 1000, '$token': '12345'} - with self._assertRequested('https://api.mixpanel.com/engage', expect_data): + with responses.RequestsMock() as rsps: + rsps.add( + responses.POST, + 'https://api.mixpanel.com/engage', + json={"status": 1, "error": None}, + status=200, + ) + self.mp.people_set('amq', {'birth month': 'october', 'favorite color': 'purple'}) + body = six.ensure_str(rsps.calls[0].request.body) + wrapper = dict(urllib.parse.parse_qsl(body)) + data = json.loads(wrapper["data"]) + del wrapper["data"] + + assert {"ip": "0", "verbose": "1"} == wrapper + expected_data = {'$distinct_id': 'amq', '$set': {'birth month': 'october', 'favorite color': 'purple'}, '$time': 1000, '$token': '12345'} + assert expected_data == data