Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 34 additions & 36 deletions mixpanel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import time
import uuid

import requests
from requests.auth import HTTPBasicAuth
import six
from six.moves import range
import urllib3
Expand Down Expand Up @@ -172,7 +174,6 @@ def alias(self, alias_id, original, meta=None):
Calling this method *always* results in a synchronous HTTP request
to Mixpanel servers, regardless of any custom consumer.
"""
sync_consumer = Consumer()
event = {
'event': '$create_alias',
'properties': {
Expand All @@ -183,6 +184,8 @@ def alias(self, alias_id, original, meta=None):
}
if meta:
event.update(meta)

sync_consumer = Consumer()
sync_consumer.send('events', json_dumps(event, cls=self._serializer))

def merge(self, api_key, distinct_id1, distinct_id2, meta=None, api_secret=None):
Expand Down Expand Up @@ -540,7 +543,7 @@ class Consumer(object):

def __init__(self, events_url=None, people_url=None, import_url=None,
request_timeout=None, groups_url=None, api_host="api.mixpanel.com",
retry_limit=4, retry_backoff_factor=0.25, verify_cert=False):
retry_limit=4, retry_backoff_factor=0.25, verify_cert=True):
# TODO: With next major version, make the above args kwarg-only, and reorder them.
self._endpoints = {
'events': events_url or 'https://{}/track'.format(api_host),
Expand All @@ -549,31 +552,28 @@ def __init__(self, events_url=None, people_url=None, import_url=None,
'imports': import_url or 'https://{}/import'.format(api_host),
}

retry_args = {
"total": retry_limit,
"backoff_factor": retry_backoff_factor,
"status_forcelist": set(range(500, 600)),
}
self._verify_cert = verify_cert
self._request_timeout = request_timeout

# Work around renamed argument in urllib3.
if hasattr(urllib3.util.Retry.DEFAULT, "allowed_methods"):
methods_arg = "allowed_methods"
else:
methods_arg = "method_whitelist"

retry_args[methods_arg] = {"POST"}
retry_config = urllib3.Retry(**retry_args)

if not verify_cert:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

cert_reqs = 'CERT_REQUIRED' if verify_cert else 'CERT_NONE'
self._http = urllib3.PoolManager(
retries=retry_config,
timeout=urllib3.Timeout(request_timeout),
cert_reqs=str(cert_reqs),
retry_args = {
"total": retry_limit,
"backoff_factor": retry_backoff_factor,
"status_forcelist": set(range(500, 600)),
methods_arg: {"POST"},
}
adapter = requests.adapters.HTTPAdapter(
max_retries=urllib3.Retry(**retry_args),
)

self._session = requests.Session()
self._session.mount('http', adapter)

def send(self, endpoint, json_message, api_key=None, api_secret=None):
"""Immediately record an event or a profile update.

Expand All @@ -594,40 +594,38 @@ def send(self, endpoint, json_message, api_key=None, api_secret=None):
self._write_request(self._endpoints[endpoint], json_message, api_key, api_secret)

def _write_request(self, request_url, json_message, api_key=None, api_secret=None):
data = {
'data': json_message,
'verbose': 1,
'ip': 0,
}

if isinstance(api_key, tuple):
# For compatibility with subclassers, allow the auth details to be
# packed into the existing api_key param.
api_key, api_secret = api_key

params = {
'data': json_message,
'verbose': 1,
'ip': 0,
}
if api_key:
data.update({'api_key': api_key})

headers = None
params['api_key'] = api_key

basic_auth = None
if api_secret is not None:
headers = urllib3.util.make_headers(basic_auth="{}:".format(api_secret))
basic_auth = HTTPBasicAuth(api_secret, '')

try:
response = self._http.request(
'POST',
response = self._session.post(
request_url,
fields=data,
headers=headers,
encode_multipart=False, # URL-encode payload in POST body.
data=params,
auth=basic_auth,
timeout=self._request_timeout,
verify=self._verify_cert,
)
except Exception as e:
six.raise_from(MixpanelException(e), e)

try:
response_dict = json.loads(response.data.decode('utf-8'))
response_dict = response.json()
except ValueError:
raise MixpanelException('Cannot interpret Mixpanel server response: {0}'.format(response.data))
raise MixpanelException('Cannot interpret Mixpanel server response: {0}'.format(response.text))

if response_dict['status'] != 1:
raise MixpanelException('Mixpanel error: {0}'.format(response_dict['error']))
Expand Down Expand Up @@ -669,7 +667,7 @@ class BufferedConsumer(object):
"""
def __init__(self, max_size=50, events_url=None, people_url=None, import_url=None,
request_timeout=None, groups_url=None, api_host="api.mixpanel.com",
retry_limit=4, retry_backoff_factor=0.25, verify_cert=False):
retry_limit=4, retry_backoff_factor=0.25, verify_cert=True):
self._consumer = Consumer(events_url, people_url, import_url, request_timeout,
groups_url, api_host, retry_limit, retry_backoff_factor, verify_cert)
self._buffers = {
Expand Down
6 changes: 3 additions & 3 deletions requirements-testing.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mock==1.3.0
pytest==4.6.11
typing; python_version >='3.4' and python_version <'3.5' # To work around CI fail.
pytest~=4.6
responses~=0.13.3
typing; python_version>='3.4' and python_version<'3.5' # To work around CI fail.
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ def find_version(*paths):
author_email='[email protected]',
license='Apache',
install_requires=[
'six >= 1.9.0',
'urllib3 >= 1.21.1',
'six>=1.9.0',
'requests>=2.4.2',
'urllib3',
],

classifiers=[
Expand Down
Loading