Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: verify signature from event webhook #901

Merged
merged 3 commits into from
Jun 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.PHONY: venv install test-install test test-integ test-docker clean nopyc

venv:
venv: clean
@python --version || (echo "Python is not installed, please install Python 2 or Python 3"; exit 1);
virtualenv --python=python venv

Expand Down
14 changes: 14 additions & 0 deletions examples/helpers/eventwebhook/eventwebhook_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from sendgrid.helpers.eventwebhook import EventWebhook, EventWebhookHeader

def is_valid_signature(request):
public_key = 'base64-encoded public key'

event_webhook = EventWebhook()
ec_public_key = event_webhook.convert_public_key_to_ecdsa(public_key)

return event_webhook.verify_signature(
request.text,
request.headers[EventWebhookHeader.SIGNATURE],
request.headers[EventWebhookHeader.TIMESTAMP],
ec_public_key
)
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ PyYAML>=4.2b1
python-http-client>=3.2.1
six==1.11.0
pytest==3.8.2
starkbank-ecdsa>=1.0.0
1 change: 1 addition & 0 deletions sendgrid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .helpers.endpoints import * # noqa
from .helpers.mail import * # noqa
from .helpers.stats import * # noqa
from .helpers.eventwebhook import * # noqa
from .sendgrid import SendGridAPIClient # noqa
from .twilio_email import TwilioEmailAPIClient # noqa
from .version import __version__
50 changes: 50 additions & 0 deletions sendgrid/helpers/eventwebhook/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from ellipticcurve.ecdsa import Ecdsa
from ellipticcurve.publicKey import PublicKey
from ellipticcurve.signature import Signature

from .eventwebhook_header import EventWebhookHeader

class EventWebhook:
"""
This class allows you to use the Event Webhook feature. Read the docs for
more details: https://sendgrid.com/docs/for-developers/tracking-events/event
"""

def __init__(self, public_key=None):
"""
Construct the Event Webhook verifier object
:param public_key: verification key under Mail Settings
:type public_key: string
"""
self.public_key = self.convert_public_key_to_ecdsa(public_key) if public_key else public_key

def convert_public_key_to_ecdsa(self, public_key):
"""
Convert the public key string to a ECPublicKey.

:param public_key: verification key under Mail Settings
:type public_key string
:return: public key using the ECDSA algorithm
:rtype PublicKey
"""
return PublicKey.fromPem(public_key)

def verify_signature(self, payload, signature, timestamp, public_key=None):
"""
Verify signed event webhook requests.

:param payload: event payload in the request body
:type payload: string
:param signature: value obtained from the 'X-Twilio-Email-Event-Webhook-Signature' header
:type signature: string
:param timestamp: value obtained from the 'X-Twilio-Email-Event-Webhook-Timestamp' header
:type timestamp: string
:param public_key: elliptic curve public key
:type public_key: PublicKey
:return: true or false if signature is valid
"""
timestamped_payload = timestamp + payload
decoded_signature = Signature.fromBase64(signature)

key = public_key or self.public_key
return Ecdsa.verify(timestamped_payload, decoded_signature, key)
10 changes: 10 additions & 0 deletions sendgrid/helpers/eventwebhook/eventwebhook_header.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class EventWebhookHeader:
"""
This class lists headers that get posted to the webhook. Read the docs for
more details: https://sendgrid.com/docs/for-developers/tracking-events/event
"""
SIGNATURE = 'X-Twilio-Email-Event-Webhook-Signature'
TIMESTAMP = 'X-Twilio-Email-Event-Webhook-Timestamp'

def __init__(self):
eshanholtz marked this conversation as resolved.
Show resolved Hide resolved
pass
2 changes: 1 addition & 1 deletion sendgrid/helpers/inbound/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self, **opts):
'path', os.path.abspath(os.path.dirname(__file__))
)
with open('{0}/config.yml'.format(self.path)) as stream:
config = yaml.load(stream)
config = yaml.load(stream, Loader=yaml.FullLoader)
eshanholtz marked this conversation as resolved.
Show resolved Hide resolved
self._debug_mode = config['debug_mode']
self._endpoint = config['endpoint']
self._host = config['host']
Expand Down
42 changes: 42 additions & 0 deletions test/test_eventwebhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import json
import unittest

from sendgrid import EventWebhook


class UnitTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.PUBLIC_KEY = 'MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEEDr2LjtURuePQzplybdC+u4CwrqDqBaWjcMMsTbhdbcwHBcepxo7yAQGhHPTnlvFYPAZFceEu/1FwCM/QmGUhA=='
cls.SIGNATURE = 'MEUCIQCtIHJeH93Y+qpYeWrySphQgpNGNr/U+UyUlBkU6n7RAwIgJTz2C+8a8xonZGi6BpSzoQsbVRamr2nlxFDWYNH2j/0='
cls.TIMESTAMP = '1588788367'
cls.PAYLOAD = json.dumps({
'event': 'test_event',
'category': 'example_payload',
'message_id': 'message_id',
}, sort_keys=True, separators=(',', ':'))

def test_verify_valid_signature(self):
ew = EventWebhook()
key = ew.convert_public_key_to_ecdsa(self.PUBLIC_KEY)
self.assertTrue(ew.verify_signature(self.PAYLOAD, self.SIGNATURE, self.TIMESTAMP, key))

def test_verify_bad_key(self):
ew = EventWebhook('MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEqTxd43gyp8IOEto2LdIfjRQrIbsd4SXZkLW6jDutdhXSJCWHw8REntlo7aNDthvj+y7GjUuFDb/R1NGe1OPzpA==')
self.assertFalse(ew.verify_signature(self.PAYLOAD, self.SIGNATURE, self.TIMESTAMP))

def test_verify_bad_payload(self):
ew = EventWebhook(self.PUBLIC_KEY)
self.assertFalse(ew.verify_signature('payload', self.SIGNATURE, self.TIMESTAMP))

def test_verify_bad_signature(self):
ew = EventWebhook(self.PUBLIC_KEY)
self.assertFalse(ew.verify_signature(
self.PAYLOAD,
'MEUCIQCtIHJeH93Y+qpYeWrySphQgpNGNr/U+UyUlBkU6n7RAwIgJTz2C+8a8xonZGi6BpSzoQsbVRamr2nlxFDWYNH3j/0=',
self.TIMESTAMP
))

def test_verify_bad_timestamp(self):
ew = EventWebhook(self.PUBLIC_KEY)
self.assertFalse(ew.verify_signature(self.PAYLOAD, self.SIGNATURE, 'timestamp'))