Merged
13 changes: 1 addition & 12 deletions .github/workflows/python-app.yml
@@ -16,17 +16,6 @@ jobs:
- "3.11"
- "3.12"
- "3.13"
# redis service
services:
redis:
image: redis:latest
ports:
- 6379:6379
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
env:
TERM: 'dumb'
steps:
@@ -47,4 +36,4 @@ jobs:
uv run pyright streaq/ tests/ example.py
- name: Test with pytest
run: |
uv run pytest -n auto --cov=streaq --cov-report=term-missing tests/ --cov-fail-under=95
docker compose run --rm tests uv run --locked --all-extras --dev pytest -n auto --dist=loadgroup --cov=streaq tests/
1 change: 1 addition & 0 deletions .gitignore
@@ -7,6 +7,7 @@ tmp.py

# C extensions
*.so
data/

# Distribution / packaging
.Python
5 changes: 4 additions & 1 deletion Makefile
@@ -10,7 +10,10 @@ lint:
uv run pyright streaq/ tests/ example.py

test:
uv run pytest -n auto --cov=streaq --cov-report=term-missing --cov-fail-under=95
UV_PYTHON=3.10 docker compose run --rm tests uv run --locked --all-extras --dev pytest -n auto --dist=loadgroup --cov=streaq tests/

docs:
uv run -m sphinx -T -b html -d docs/_build/doctrees -D language=en docs/ docs/_build/

cleanup:
docker compose down --remove-orphans
19 changes: 10 additions & 9 deletions README.md
@@ -6,12 +6,12 @@

# streaQ

Fast, async, type-safe distributed task queue via Redis streams
Fast, async, fully-typed distributed task queue via Redis streams

## Features

- Up to [5x faster](https://github.com/tastyware/streaq/tree/master/benchmarks) than `arq`
- Strongly typed
- Fully typed
- 95%+ unit test coverage
- Comprehensive documentation
- Support for delayed/scheduled tasks
@@ -23,7 +23,7 @@ Fast, async, type-safe distributed task queue via Redis streams
- Support for synchronous tasks (run in separate threads)
- Redis Sentinel support for production
- Built-in web UI for monitoring tasks
- Built with structured concurrency on `anyio`
- Built with structured concurrency on `anyio`, supports both `asyncio` and `trio`

## Installation

@@ -56,14 +56,15 @@ async def cronjob() -> None:
print("Nobody respects the spammish repetition!")
```

Finally, let's queue up some tasks:
Finally, let's initialize the worker and queue up some tasks:

```python
await sleeper.enqueue(3)
# enqueue returns a task object that can be used to get results/info
task = await sleeper.enqueue(1).start(delay=3)
print(await task.info())
print(await task.result(timeout=5))
async with worker:
await sleeper.enqueue(3)
# enqueue returns a task object that can be used to get results/info
task = await sleeper.enqueue(1).start(delay=3)
print(await task.info())
print(await task.result(timeout=5))
```

Putting this all together gives us [example.py](https://github.com/tastyware/streaq/blob/master/example.py). Let's spin up a worker:
182 changes: 182 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,182 @@
services:
redis-master:
image: redis:latest
container_name: redis-master
hostname: redis-master
ports:
- "6379:6379"
volumes:
- ./data/master:/data
command:
[
"redis-server",
"--appendonly",
"yes",
"--repl-diskless-load",
"on-empty-db",
"--protected-mode",
"no"
]
healthcheck:
test: ["CMD-SHELL", "redis-cli -p 6379 ping | grep -q PONG"]
interval: 2s
timeout: 3s
retries: 15
start_period: 5s

slave-1:
image: redis:latest
container_name: slave-1
hostname: slave-1
depends_on:
- redis-master
ports:
- "6380:6379"
volumes:
- ./data/slave1:/data
command:
[
"redis-server",
"--appendonly",
"yes",
"--replicaof",
"redis-master",
"6379",
"--repl-diskless-load",
"on-empty-db",
"--protected-mode",
"no"
]
healthcheck:
test: ["CMD-SHELL", "redis-cli -p 6379 --raw INFO replication | grep -q '^role:slave' && redis-cli -p 6379 --raw INFO replication | grep -q 'master_link_status:up'"]
interval: 3s
timeout: 4s
retries: 15
start_period: 15s

slave-2:
image: redis:latest
container_name: slave-2
hostname: slave-2
depends_on:
- redis-master
ports:
- "6381:6379"
volumes:
- ./data/slave2:/data
command:
[
"redis-server",
"--appendonly",
"yes",
"--replicaof",
"redis-master",
"6379",
"--repl-diskless-load",
"on-empty-db",
"--protected-mode",
"no"
]
healthcheck:
test: ["CMD-SHELL", "redis-cli -p 6379 --raw INFO replication | grep -q '^role:slave' && redis-cli -p 6379 --raw INFO replication | grep -q 'master_link_status:up'"]
interval: 3s
timeout: 4s
retries: 15
start_period: 15s

sentinel-1:
image: redis:latest
container_name: sentinel-1
hostname: sentinel-1
depends_on:
- redis-master
ports:
- "26379:26379"
command: >
sh -c 'echo "bind 0.0.0.0" > /etc/sentinel.conf &&
echo "sentinel monitor mymaster redis-master 6379 2" >> /etc/sentinel.conf &&
echo "sentinel resolve-hostnames yes" >> /etc/sentinel.conf &&
echo "sentinel down-after-milliseconds mymaster 10000" >> /etc/sentinel.conf &&
echo "sentinel failover-timeout mymaster 10000" >> /etc/sentinel.conf &&
echo "sentinel parallel-syncs mymaster 1" >> /etc/sentinel.conf &&
redis-sentinel /etc/sentinel.conf'
healthcheck:
test: ["CMD-SHELL", "redis-cli -p 26379 SENTINEL ckquorum mymaster | grep -q ^OK"]
interval: 3s
timeout: 4s
retries: 60
start_period: 10s

sentinel-2:
image: redis:latest
container_name: sentinel-2
hostname: sentinel-2
depends_on:
- redis-master
ports:
- "26380:26379"
command: >
sh -c 'echo "bind 0.0.0.0" > /etc/sentinel.conf &&
echo "sentinel monitor mymaster redis-master 6379 2" >> /etc/sentinel.conf &&
echo "sentinel resolve-hostnames yes" >> /etc/sentinel.conf &&
echo "sentinel down-after-milliseconds mymaster 10000" >> /etc/sentinel.conf &&
echo "sentinel failover-timeout mymaster 10000" >> /etc/sentinel.conf &&
echo "sentinel parallel-syncs mymaster 1" >> /etc/sentinel.conf &&
redis-sentinel /etc/sentinel.conf'
healthcheck:
test: ["CMD-SHELL", "redis-cli -p 26379 SENTINEL ckquorum mymaster | grep -q ^OK"]
interval: 3s
timeout: 4s
retries: 60
start_period: 10s

sentinel-3:
image: redis:latest
container_name: sentinel-3
hostname: sentinel-3
depends_on:
- redis-master
ports:
- "26381:26379"
command: >
sh -c 'echo "bind 0.0.0.0" > /etc/sentinel.conf &&
echo "sentinel monitor mymaster redis-master 6379 2" >> /etc/sentinel.conf &&
echo "sentinel resolve-hostnames yes" >> /etc/sentinel.conf &&
echo "sentinel down-after-milliseconds mymaster 10000" >> /etc/sentinel.conf &&
echo "sentinel failover-timeout mymaster 10000" >> /etc/sentinel.conf &&
echo "sentinel parallel-syncs mymaster 1" >> /etc/sentinel.conf &&
redis-sentinel /etc/sentinel.conf'
healthcheck:
test: ["CMD-SHELL", "redis-cli -p 26379 SENTINEL ckquorum mymaster | grep -q ^OK"]
interval: 3s
timeout: 4s
retries: 60
start_period: 10s

tests:
image: ghcr.io/astral-sh/uv:debian
environment:
- UV_PYTHON=${PYTHON_VERSION}
- UV_LINK_MODE=copy
- UV_PYTHON_CACHE_DIR=/root/.cache/uv/python
working_dir: /app
volumes:
- .:/app
- /app/.venv
- uv-cache:/root/.cache/uv
depends_on:
redis-master:
condition: service_healthy
sentinel-1:
condition: service_healthy
sentinel-2:
condition: service_healthy
sentinel-3:
condition: service_healthy
slave-1:
condition: service_healthy
slave-2:
condition: service_healthy

volumes:
uv-cache: {} # named volume for uv/pip caches
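
The replica healthchecks in the compose file above pass only when ``redis-cli INFO replication`` reports ``role:slave`` and ``master_link_status:up``. As a minimal sketch of what those grep-based checks are verifying (the sample INFO text below is hypothetical, not captured from a real container):

```python
def replica_healthy(info_text: str) -> bool:
    """Mirror the compose healthcheck: a replica is healthy only if it
    reports role:slave and an established link to the master."""
    fields = {}
    for line in info_text.splitlines():
        line = line.strip()
        # INFO output is "key:value" lines; "#" lines are section headers
        if ":" in line and not line.startswith("#"):
            key, _, value = line.partition(":")
            fields[key] = value
    return fields.get("role") == "slave" and fields.get("master_link_status") == "up"


# Hypothetical excerpt of `redis-cli INFO replication` on a healthy replica
sample = """# Replication
role:slave
master_host:redis-master
master_port:6379
master_link_status:up
"""
print(replica_healthy(sample))  # True
print(replica_healthy("role:master\n"))  # False
```

A master node fails this check by design, which is why the ``redis-master`` service uses the simpler ``PING``/``PONG`` healthcheck instead.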
50 changes: 50 additions & 0 deletions docs/contributing.rst
@@ -0,0 +1,50 @@
Contributing
============

Development
-----------

Contributions to streaQ are always welcome! Most common development tasks are available via the included ``Makefile``:

- ``make install``: set up the linting environment
- ``make lint``: run ruff to check formatting and pyright to check types
- ``make test``: use the included ``docker-compose.yml`` file to spin up Redis and Sentinel containers, then run the test suite. Dependencies are cached, so runs after the first are faster. You'll need Docker and Docker Compose installed.
- ``make docs``: build the documentation pages with Sphinx
- ``make cleanup``: tear down running Docker containers

If you want to run individual tests instead of the entire suite, you can do so like this:

.. code-block:: bash

UV_PYTHON=3.10 docker compose run --rm tests uv run --locked --all-extras --dev pytest -sk 'test_name'

Benchmarks
----------

If you want to run the benchmarks yourself, first install the dependencies:

.. code-block:: bash

uv add streaq[benchmark]

You can enqueue jobs like so:

.. code-block:: bash

python benchmarks/bench_streaq.py --time 1

Here, ``time`` is the number of seconds to sleep per task.

You can run a worker with one of these commands, adjusting the number of workers as desired:

.. code-block:: bash

arq --workers ? --burst bench_arq.WorkerSettings
saq --quiet bench_saq.settings --workers ?
streaq --burst --workers ? bench_streaq.worker
taskiq worker --workers ? --max-async-tasks 32 bench_taskiq:broker --max-prefetch 32

Donating
--------

If you're interested in supporting the ongoing development of this project, donations are welcome! You can do so through GitHub: https://github.com/sponsors/tastyware
17 changes: 9 additions & 8 deletions docs/getting-started.rst
@@ -7,7 +7,7 @@ To start, you'll need to create a ``Worker`` object. At worker creation, you can

from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator
from typing import AsyncGenerator
from httpx import AsyncClient
from streaq import Worker

@@ -20,7 +20,7 @@ To start, you'll need to create a ``Worker`` object. At worker creation, you can
http_client: AsyncClient

@asynccontextmanager
async def lifespan() -> AsyncIterator[WorkerContext]:
async def lifespan() -> AsyncGenerator[WorkerContext]:
"""
Here, we initialize the worker's dependencies.
You can also do any startup/shutdown work here
@@ -40,15 +40,16 @@ You can then register async tasks with the worker like this:
res = await worker.context.http_client.get(url)
return len(res.text)

Finally, let's queue up some tasks:
Finally, let's queue up some tasks via the worker's async context manager:

.. code-block:: python

await fetch.enqueue("https://tastyware.dev/")
# enqueue returns a task object that can be used to get results/info
task = await fetch.enqueue("https://github.com/tastyware/streaq").start(delay=3)
print(await task.info())
print(await task.result(timeout=5))
async with worker:
await fetch.enqueue("https://tastyware.dev/")
# enqueue returns a task object that can be used to get results/info
task = await fetch.enqueue("https://github.com/tastyware/streaq").start(delay=3)
print(await task.info())
print(await task.result(timeout=5))

Put this all together in a script and spin up a worker:

7 changes: 5 additions & 2 deletions docs/index.rst
@@ -28,6 +28,8 @@ Fast, async, type-safe job queuing with Redis streams
+----------------------------+--------+-----+-----+--------+
| Task middleware | ✅ | ✅ | ✅ | ✅ |
+----------------------------+--------+-----+-----+--------+
| Web UI available | ✅ | ✅ | ✅ | ✅ |
+----------------------------+--------+-----+-----+--------+
| Actively maintained | ✅ | ❌ | ✅ | ✅ |
+----------------------------+--------+-----+-----+--------+
| Custom serializers | ✅ | ✅ | ❌ | ✅ |
@@ -48,10 +50,10 @@ Fast, async, type-safe job queuing with Redis streams
+----------------------------+--------+-----+-----+--------+
| Redis Sentinel support | ✅ | ❌ | ❌ | ✅ |
+----------------------------+--------+-----+-----+--------+
| Web UI available | ✅ | ✅ | ✅ | ✅ |
+----------------------------+--------+-----+-----+--------+
| Structured concurrency | ❌ | ❌ | ❌ | ✅ |
+----------------------------+--------+-----+-----+--------+
| Trio support | ❌ | ❌ | ❌ | ✅ |
+----------------------------+--------+-----+-----+--------+

.. toctree::
:maxdepth: 2
@@ -65,6 +67,7 @@ Fast, async, type-safe job queuing with Redis streams
middleware
cli
integrations
contributing

.. toctree::
:maxdepth: 2