Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions channels/generic/http.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
import logging
import traceback

from channels.consumer import AsyncConsumer

from ..db import aclose_old_connections
from ..exceptions import StopConsumer

logger = logging.getLogger("channels.consumer")
if not logger.hasHandlers():
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the logging call is fine I guess, but I don't think we should be adding handlers for users. That's up to them.


class AsyncHttpConsumer(AsyncConsumer):
"""
Expand All @@ -17,10 +30,6 @@ async def send_headers(self, *, status=200, headers=None):
"""
Sets the HTTP response status and headers. Headers may be provided as
a list of tuples or as a dictionary.

Note that the ASGI spec requires that the protocol server only starts
sending the response to the client after ``self.send_body`` has been
called the first time.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you undo all of these unrelated changes to comments please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am so sorry about these

"""
if headers is None:
headers = []
Expand All @@ -34,10 +43,6 @@ async def send_headers(self, *, status=200, headers=None):
async def send_body(self, body, *, more_body=False):
"""
Sends a response body to the client. The method expects a bytestring.

Set ``more_body=True`` if you want to send more body content later.
The default behavior closes the response, and further messages on
the channel will be ignored.
"""
assert isinstance(body, bytes), "Body is not bytes"
await self.send(
Expand All @@ -46,18 +51,14 @@ async def send_body(self, body, *, more_body=False):

async def send_response(self, status, body, **kwargs):
"""
Sends a response to the client. This is a thin wrapper over
``self.send_headers`` and ``self.send_body``, and everything said
above applies here as well. This method may only be called once.
Sends a response to the client.
"""
await self.send_headers(status=status, **kwargs)
await self.send_body(body)

async def handle(self, body):
"""
Receives the request body as a bytestring. Response may be composed
using the ``self.send*`` methods; the return value of this method is
thrown away.
Receives the request body as a bytestring.
"""
raise NotImplementedError(
"Subclasses of AsyncHttpConsumer must provide a handle() method."
Expand All @@ -77,9 +78,14 @@ async def http_request(self, message):
"""
if "body" in message:
self.body.append(message["body"])

if not message.get("more_body"):
try:
await self.handle(b"".join(self.body))
except Exception:
logger.error(f"Error in handle(): {traceback.format_exc()}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about using logger.exception() ?

await self.send_response(500, b"Internal Server Error")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is the essence of the proposal.

raise
finally:
await self.disconnect()
raise StopConsumer()
Expand All @@ -88,6 +94,10 @@ async def http_disconnect(self, message):
"""
Let the user do their cleanup and close the consumer.
"""
await self.disconnect()
await aclose_old_connections()
raise StopConsumer()
try:
await self.disconnect()
await aclose_old_connections()
except Exception as e:
logger.error(f"Error during disconnect: {str(e)}")
finally:
raise StopConsumer()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an extra.

54 changes: 41 additions & 13 deletions tests/test_generic_http.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
import time
from unittest.mock import patch

import pytest

Expand Down Expand Up @@ -57,8 +58,7 @@ async def handle(self, body):
@pytest.mark.asyncio
async def test_per_scope_consumers():
"""
Tests that a distinct consumer is used per scope, with AsyncHttpConsumer as
the example consumer class.
Tests that a distinct consumer is used per scope.
"""

class TestConsumer(AsyncHttpConsumer):
Expand All @@ -68,7 +68,6 @@ def __init__(self):

async def handle(self, body):
body = f"{self.__class__.__name__} {id(self)} {self.time}"

await self.send_response(
200,
body.encode("utf-8"),
Expand All @@ -77,27 +76,21 @@ async def handle(self, body):

app = TestConsumer.as_asgi()

# Open a connection
communicator = HttpCommunicator(app, method="GET", path="/test/")
response = await communicator.get_response()
assert response["status"] == 200

# And another one.
communicator = HttpCommunicator(app, method="GET", path="/test2/")
second_response = await communicator.get_response()
assert second_response["status"] == 200

assert response["body"] != second_response["body"]


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_async_http_consumer_future():
"""
Regression test for channels accepting only coroutines. The ASGI specification
states that the `receive` and `send` arguments to an ASGI application should be
"awaitable callable" objects. That includes non-coroutine functions that return
Futures.
Regression test for channels accepting only coroutines.
"""

class TestConsumer(AsyncHttpConsumer):
Expand All @@ -110,7 +103,6 @@ async def handle(self, body):

app = TestConsumer()

# Ensure the passed functions are specifically coroutines.
async def coroutine_app(scope, receive, send):
async def receive_coroutine():
return await asyncio.ensure_future(receive())
Expand All @@ -126,7 +118,6 @@ async def send_coroutine(*args, **kwargs):
assert response["status"] == 200
assert response["headers"] == [(b"Content-Type", b"text/plain")]

# Ensure the passed functions are "Awaitable Callables" and NOT coroutines.
async def awaitable_callable_app(scope, receive, send):
def receive_awaitable_callable():
return asyncio.ensure_future(receive())
Expand All @@ -136,9 +127,46 @@ def send_awaitable_callable(*args, **kwargs):

await app(scope, receive_awaitable_callable, send_awaitable_callable)

# Open a connection
communicator = HttpCommunicator(awaitable_callable_app, method="GET", path="/")
response = await communicator.get_response()
assert response["body"] == b"42"
assert response["status"] == 200
assert response["headers"] == [(b"Content-Type", b"text/plain")]


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_error_logging():
"""Regression test for error logging."""

class TestConsumer(AsyncHttpConsumer):
async def handle(self, body):
raise AssertionError("Error correctly raised")

communicator = HttpCommunicator(TestConsumer(), "GET", "/")
with patch("channels.generic.http.logger.error") as mock_logger_error:
try:
await communicator.get_response(timeout=0.05)
except AssertionError:
pass
args, _ = mock_logger_error.call_args
assert "Error in handle()" in args[0]
assert "AssertionError: Error correctly raised" in args[0]


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_error_handling_and_send_response():
"""Regression test to check error handling."""

class TestConsumer(AsyncHttpConsumer):
async def handle(self, body):
raise AssertionError("Error correctly raised")

communicator = HttpCommunicator(TestConsumer(), "GET", "/")
with patch.object(AsyncHttpConsumer, "send_response") as mock_send_response:
try:
await communicator.get_response(timeout=0.05)
except AssertionError:
pass
mock_send_response.assert_called_once_with(500, b"Internal Server Error")
Loading