Sync with fork #23

Open — wants to merge 31 commits into base: master

Commits (31):
e66e5ee
Ignore *.egg.info (#71)
Kludex Aug 19, 2022
40af624
Remove await from Kafka `unsubscribe` (#72)
Kludex Aug 19, 2022
8c0382d
Replace references to httpx by broadcaster (#73)
Kludex Aug 20, 2022
25b364d
Pin dependencies (#74)
Kludex Aug 20, 2022
e9fdd02
Comply with PEP 561 (#75)
Kludex Aug 20, 2022
956571d
Remove deprecated loop parameter from AIOKafka constructors (#76)
Kludex Aug 20, 2022
ed15621
Add GitHub funding (#77)
Kludex Aug 20, 2022
f11d94d
Support Python 3.10 (#78)
Kludex Aug 20, 2022
e1f2977
Add `password` support for Redis backend (#79)
Kludex Aug 20, 2022
312fde9
Support `rediss://` scheme for Redis backend (#52)
spravesh1818 Aug 20, 2022
4d993ad
Add `websockets` requirement to example (#43)
Buuntu Aug 20, 2022
ccd476e
Add `publish` GitHub workflow (#80)
Kludex Aug 20, 2022
9aa890c
Drop Python 3.7 support (#95)
anabasalo Jun 18, 2023
2499fbd
Use `pyproject.toml` with hatch instead of `setup.py` (#96)
sumit-158 Jun 19, 2023
94bf887
Replace `run_until_first_complete` with task group (#101)
Jun 27, 2023
377b404
Move `setup.cfg` to `pyproject.toml` (#98)
sumit-158 Jul 5, 2023
de6ef40
Switch to `redis-py` library (#111)
alex-oleshkevich Mar 28, 2024
eeba6a7
Fix subscriber not properly unsubscribing when an exception is raised…
jolorke Apr 3, 2024
eeb09de
Allow user backends (#110)
alex-oleshkevich Apr 4, 2024
6dc07d6
General project maintenance. (#114)
alex-oleshkevich Apr 5, 2024
672d10d
Update README.md (#116)
tomchristie Apr 22, 2024
c4b4d59
Add redis-streams backend (#115)
alex-oleshkevich Apr 22, 2024
4ff8fa6
Add mqtt link (#126)
alex-oleshkevich Jun 6, 2024
6daa0d2
[redis] defer listener initialization (#128)
alex-oleshkevich Jun 11, 2024
399442d
Improvements to Kafka backend (#125)
alex-oleshkevich Jun 11, 2024
d6806ac
Bump version (#131)
alex-oleshkevich Jun 27, 2024
22e8b2a
sync publish script with starlette (#132)
alex-oleshkevich Jun 27, 2024
528cf42
Fix redisBackend `_pubsub_listener` just listen once (#134)
Fly-Playgroud Aug 1, 2024
23c9b40
bump version
alex-oleshkevich Aug 1, 2024
69cf29a
#136 improve typing (#139)
alex-oleshkevich Aug 23, 2024
a422d8a
export backends (#145)
alex-oleshkevich Nov 2, 2024
1 change: 1 addition & 0 deletions .github/FUNDING.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
+github: encode
29 changes: 29 additions & 0 deletions .github/workflows/publish.yml
@@ -0,0 +1,29 @@
+name: Publish
+
+on:
+  push:
+    tags:
+      - "*"
+
+jobs:
+  publish:
+    name: "Publish release"
+    runs-on: "ubuntu-latest"
+
+    steps:
+      - uses: "actions/checkout@v3"
+      - uses: "actions/setup-python@v4"
+        with:
+          python-version: "3.10"
+
+      - name: "Install dependencies"
+        run: "scripts/install"
+
+      - name: "Build package & docs"
+        run: "scripts/build"
+
+      - name: "Publish to PyPI & deploy docs"
+        run: "scripts/publish"
+        env:
+          TWINE_USERNAME: __token__
+          TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }}
4 changes: 2 additions & 2 deletions .github/workflows/test-suite.yml
@@ -14,7 +14,7 @@ jobs:

     strategy:
       matrix:
-        python-version: ["3.7", "3.8", "3.9"]
+        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]

     services:
       zookeeper:
@@ -66,4 +66,4 @@ jobs:
       - name: "Run tests"
        run: "scripts/test"
       - name: "Enforce coverage"
-        run: "scripts/coverage"
+        run: "scripts/coverage"
4 changes: 3 additions & 1 deletion .gitignore
@@ -3,5 +3,7 @@ test.db
 .coverage
 .pytest_cache/
 .mypy_cache/
-starlette.egg-info/
+*.egg-info/
 venv/
+build/
+dist/
46 changes: 37 additions & 9 deletions README.md
@@ -3,7 +3,7 @@
 Broadcaster helps you develop realtime streaming functionality by providing
 a simple broadcast API onto a number of different backend services.

-It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), plus a simple in-memory backend, that you can use for local development or during testing.
+It currently supports [Redis PUB/SUB](https://redis.io/topics/pubsub), [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/), [Apache Kafka](https://kafka.apache.org/), and [Postgres LISTEN/NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html), plus a simple in-memory backend, that you can use for local development or during testing.

 <img src="https://raw.githubusercontent.com/encode/broadcaster/master/docs/demo.gif" alt='WebSockets Demo'>

@@ -14,9 +14,9 @@ Here's a complete example of the backend code for a simple websocket chat app:
 ```python
 # Requires: `starlette`, `uvicorn`, `jinja2`
 # Run with `uvicorn example:app`
+import anyio
 from broadcaster import Broadcast
 from starlette.applications import Starlette
-from starlette.concurrency import run_until_first_complete
 from starlette.routing import Route, WebSocketRoute
 from starlette.templating import Jinja2Templates

@@ -33,10 +33,15 @@ async def homepage(request):

 async def chatroom_ws(websocket):
     await websocket.accept()
-    await run_until_first_complete(
-        (chatroom_ws_receiver, {"websocket": websocket}),
-        (chatroom_ws_sender, {"websocket": websocket}),
-    )
+
+    async with anyio.create_task_group() as task_group:
+        # run until first is complete
+        async def run_chatroom_ws_receiver() -> None:
+            await chatroom_ws_receiver(websocket=websocket)
+            task_group.cancel_scope.cancel()
+
+        task_group.start_soon(run_chatroom_ws_receiver)
+        await chatroom_ws_sender(websocket)


 async def chatroom_ws_receiver(websocket):
@@ -65,7 +70,7 @@ The HTML template for the front end [is available here](https://github.com/encod

 ## Requirements

-Python 3.7+
+Python 3.8+

## Installation

@@ -78,19 +83,42 @@ Python 3.7+

 * `Broadcast('memory://')`
 * `Broadcast("redis://localhost:6379")`
+* `Broadcast("redis-stream://localhost:6379")`
 * `Broadcast("postgres://localhost:5432/broadcaster")`
 * `Broadcast("kafka://localhost:9092")`


+### Using custom backends
+
+You can create your own backend and use it with `broadcaster`.
+To do so, define a class that extends `BroadcastBackend`
+and pass an instance to `Broadcast` via the `backend` argument.
+
+```python
+from broadcaster import Broadcast, BroadcastBackend
+
+class MyBackend(BroadcastBackend):
+    ...  # implement the `BroadcastBackend` interface
+
+broadcast = Broadcast(backend=MyBackend())
+```

 ## Where next?

 At the moment `broadcaster` is in Alpha, and should be considered a working design document.

 The API should be considered subject to change. If you *do* want to use Broadcaster in its current
-state, make sure to strictly pin your requirements to `broadcaster==0.2.0`.
+state, make sure to strictly pin your requirements to `broadcaster==0.3.0`.

 To be more capable we'd really want to add some additional backends, provide API support for reading recent event history from persistent stores, and provide a serialization/deserialization API...

 * Serialization / deserialization to support broadcasting structured data.
-* Backends for Redis Streams, Apache Kafka, and RabbitMQ.
+* A backend for RabbitMQ.
 * Add support for `subscribe('chatroom', history=100)` for backends which provide persistence (Redis Streams, Apache Kafka). This will allow applications to subscribe to channel updates, while also being given an initial window onto the most recent events. We *might* also want to support some basic paging operations, to allow applications to scan back in the event history.
 * Support for pattern subscribes in backends that support it.
+## Third Party Packages
+
+### MQTT backend
+[Gist](https://gist.github.com/alex-oleshkevich/68411a0e7ad24d53afd28c3fa5da468c)
+
+Integrates MQTT with Broadcaster.
5 changes: 3 additions & 2 deletions broadcaster/__init__.py
@@ -1,4 +1,5 @@
 from ._base import Broadcast, Event
+from .backends.base import BroadcastBackend

-__version__ = "0.2.0"
-__all__ = ["Broadcast", "Event"]
+__version__ = "0.3.1"
+__all__ = ["Broadcast", "Event", "BroadcastBackend"]
39 changes: 0 additions & 39 deletions broadcaster/_backends/kafka.py

This file was deleted.

36 changes: 0 additions & 36 deletions broadcaster/_backends/redis.py

This file was deleted.

61 changes: 34 additions & 27 deletions broadcaster/_base.py
@@ -1,20 +1,21 @@
+from __future__ import annotations
+
 import asyncio
 from contextlib import asynccontextmanager
-from typing import Any, AsyncGenerator, AsyncIterator, Dict, Optional
+from typing import TYPE_CHECKING, Any, AsyncGenerator, AsyncIterator, cast
 from urllib.parse import urlparse

+if TYPE_CHECKING:  # pragma: no cover
+    from broadcaster.backends.base import BroadcastBackend


 class Event:
     def __init__(self, channel: str, message: str) -> None:
         self.channel = channel
         self.message = message

     def __eq__(self, other: object) -> bool:
-        return (
-            isinstance(other, Event)
-            and self.channel == other.channel
-            and self.message == other.message
-        )
+        return isinstance(other, Event) and self.channel == other.channel and self.message == other.message

     def __repr__(self) -> str:
         return f"Event(channel={self.channel!r}, message={self.message!r})"
@@ -25,33 +26,40 @@ class Unsubscribed(Exception):


 class Broadcast:
-    def __init__(self, url: str):
-        from broadcaster._backends.base import BroadcastBackend
+    def __init__(self, url: str | None = None, *, backend: BroadcastBackend | None = None) -> None:
+        assert url or backend, "Either `url` or `backend` must be provided."
+        self._backend = backend or self._create_backend(cast(str, url))
+        self._subscribers: dict[str, set[asyncio.Queue[Event | None]]] = {}

+    def _create_backend(self, url: str) -> BroadcastBackend:
         parsed_url = urlparse(url)
-        self._backend: BroadcastBackend
-        self._subscribers: Dict[str, Any] = {}
-        if parsed_url.scheme == "redis":
-            from broadcaster._backends.redis import RedisBackend
+        if parsed_url.scheme in ("redis", "rediss"):
+            from broadcaster.backends.redis import RedisBackend

+            return RedisBackend(url)

+        elif parsed_url.scheme == "redis-stream":
+            from broadcaster.backends.redis import RedisStreamBackend

-            self._backend = RedisBackend(url)
+            return RedisStreamBackend(url)

         elif parsed_url.scheme in ("postgres", "postgresql"):
-            from broadcaster._backends.postgres import PostgresBackend
+            from broadcaster.backends.postgres import PostgresBackend

-            self._backend = PostgresBackend(url)
+            return PostgresBackend(url)

         if parsed_url.scheme == "kafka":
-            from broadcaster._backends.kafka import KafkaBackend
+            from broadcaster.backends.kafka import KafkaBackend

-            self._backend = KafkaBackend(url)
+            return KafkaBackend(url)

         elif parsed_url.scheme == "memory":
-            from broadcaster._backends.memory import MemoryBackend
+            from broadcaster.backends.memory import MemoryBackend

-            self._backend = MemoryBackend(url)
+            return MemoryBackend(url)
+        raise ValueError(f"Unsupported backend: {parsed_url.scheme}")

-    async def __aenter__(self) -> "Broadcast":
+    async def __aenter__(self) -> Broadcast:
         await self.connect()
         return self
@@ -79,31 +87,30 @@ async def publish(self, channel: str, message: Any) -> None:
         await self._backend.publish(channel, message)

     @asynccontextmanager
-    async def subscribe(self, channel: str) -> AsyncIterator["Subscriber"]:
-        queue: asyncio.Queue = asyncio.Queue()
+    async def subscribe(self, channel: str) -> AsyncIterator[Subscriber]:
+        queue: asyncio.Queue[Event | None] = asyncio.Queue()

         try:
             if not self._subscribers.get(channel):
                 await self._backend.subscribe(channel)
-                self._subscribers[channel] = set([queue])
+                self._subscribers[channel] = {queue}
             else:
                 self._subscribers[channel].add(queue)

             yield Subscriber(queue)
-
+        finally:
             self._subscribers[channel].remove(queue)
             if not self._subscribers.get(channel):
                 del self._subscribers[channel]
                 await self._backend.unsubscribe(channel)
-        finally:
             await queue.put(None)


 class Subscriber:
-    def __init__(self, queue: asyncio.Queue) -> None:
+    def __init__(self, queue: asyncio.Queue[Event | None]) -> None:
         self._queue = queue

-    async def __aiter__(self) -> Optional[AsyncGenerator]:
+    async def __aiter__(self) -> AsyncGenerator[Event | None, None]:
         try:
             while True:
                 yield await self.get()
File renamed without changes.
@@ -13,10 +13,10 @@ async def connect(self) -> None:
     async def disconnect(self) -> None:
         raise NotImplementedError()

-    async def subscribe(self, group: str) -> None:
+    async def subscribe(self, channel: str) -> None:
         raise NotImplementedError()

-    async def unsubscribe(self, group: str) -> None:
+    async def unsubscribe(self, channel: str) -> None:
         raise NotImplementedError()

     async def publish(self, channel: str, message: Any) -> None: