From b4627820a2e9ca2e5fe054dcbdaee177e7afdf38 Mon Sep 17 00:00:00 2001 From: "Taylor C. Richberger" Date: Fri, 20 Nov 2020 14:01:15 -0700 Subject: [PATCH 1/2] fix asynchronous_mail_send The previous solution was still a blocking solution. It had an async function that didn't actually await a future, meaning it would still block the entire event loop. This uses the event loop executor to background it onto an executor thread, which will still at least let IO properly parallelize while the GIL is released. --- use_cases/asynchronous_mail_send.md | 76 +++++++++++++++++++---------- 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/use_cases/asynchronous_mail_send.md b/use_cases/asynchronous_mail_send.md index de38dc751..38cfdb494 100644 --- a/use_cases/asynchronous_mail_send.md +++ b/use_cases/asynchronous_mail_send.md @@ -1,15 +1,35 @@ # Asynchronous Mail Send -## Using `asyncio` (3.5+) +## Using `asyncio` (3.6+) The built-in `asyncio` library can be used to send email in a non-blocking manner. `asyncio` helps us execute mail sending in a separate context, allowing us to continue the execution of business logic without waiting for all our emails to send first. ```python from sendgrid import SendGridAPIClient from sendgrid.helpers.mail import Content, Mail, From, To, Mail +from functools import partial import os import asyncio +try: + # Python 3.7+ only. This is more efficient than get_event_loop where + # available. + from asyncio import get_running_loop + from asyncio import run as async_run +except ImportError: + # Python 3.6+ compatibility + from asyncio import get_event_loop as get_running_loop + + def async_run(future): + loop = asyncio.new_event_loop() + try: + try: + return loop.run_until_complete(future) + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + finally: + loop.close() + sendgrid_client = SendGridAPIClient( api_key=os.environ.get('SENDGRID_API_KEY')) @@ -36,48 +56,50 @@ em10 = Mail(from_email, to_email, "Message #10", content) ems = [em1, em2, em3, em4, em5, em6, em7, em8, em9, em10] -async def send_email(n, email): +async def send_email(n: int, email: Mail) -> None: ''' send_mail wraps Twilio SendGrid's API client, and makes a POST request to - the api/v3/mail/send endpoint with `email`. - Args: - email: single mail object. + the api/v3/mail/send endpoint with ``email``. ''' try: - response = sendgrid_client.send(request_body=email) + loop = get_running_loop() + + # This runs the sending in a thread managed by the executor. This is + # what allows blocking IO calls to operate asynchronously, because they + # are actually run in a different thread, allowing parallel + # asynchronous operation to proceed as desired, at least while active + # IO has released Python's GIL + response = await loop.run_in_executor( + None, + partial( + sendgrid_client.send, + request_body=email)) + if response.status_code < 300: print("Email #{} processed".format(n), response.body, response.status_code) except urllib.error.HTTPError as e: e.read() -@asyncio.coroutine -def send_many(emails, cb): +async def send_many(emails, cb): ''' send_many creates a number of non-blocking tasks (to send email) - that will run on the existing event loop. Due to non-blocking nature, - you can include a callback that will run after all tasks have been queued. + that will run on the existing event loop's executor (ThreadPoolExecutor by + default). Args: - emails: contains any # of `sendgrid.helpers.mail.Mail`. - cb: a function that will execute immediately. + emails: contains any # of `sendgrid.helpers.mail.Mail`. ''' print("START - sending emails ...") - for n, em in enumerate(emails): - asyncio.async(send_email(n, em)) - print("END - returning control...") - cb() - - -def sample_cb(): - print("Executing callback now...") - for i in range(0, 100): - print(i) - return + # gather is necessary to actually batch futures to run in parallel. + # create_task and ensure_future are also options to do the same, but you + # would have to await all those tasks individually anyway otherwise you + # could end up with your program killing unfinished futures. This is the + # simplest way to wait for all futures to finish. + await asyncio.gather(*[send_email(n, em) for n, em in enumerate(emails)]) + print("END - returning control...") if __name__ == "__main__": - loop = asyncio.get_event_loop() - task = asyncio.async(send_many(ems, sample_cb)) - loop.run_until_complete(task) -``` \ No newline at end of file + async_run(send_many(ems, sample_cb)) +``` From a51f4a30cd513943dc8fa06ccc3527ee04892b24 Mon Sep 17 00:00:00 2001 From: "Taylor C. Richberger" Date: Fri, 20 Nov 2020 14:15:34 -0700 Subject: [PATCH 2/2] remove callback --- use_cases/asynchronous_mail_send.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/use_cases/asynchronous_mail_send.md b/use_cases/asynchronous_mail_send.md index 38cfdb494..06e31f98a 100644 --- a/use_cases/asynchronous_mail_send.md +++ b/use_cases/asynchronous_mail_send.md @@ -81,7 +81,7 @@ async def send_email(n: int, email: Mail) -> None: e.read() -async def send_many(emails, cb): +async def send_many(emails): ''' send_many creates a number of non-blocking tasks (to send email) that will run on the existing event loop's executor (ThreadPoolExecutor by @@ -101,5 +101,5 @@ async def send_many(emails, cb): print("END - returning control...") if __name__ == "__main__": - async_run(send_many(ems, sample_cb)) + async_run(send_many(ems)) ```