From e09b92d28a24875d80b7f86bc1c246fa9354e00f Mon Sep 17 00:00:00 2001 From: waidhoferj Date: Mon, 28 Feb 2022 11:55:41 -0800 Subject: [PATCH 1/5] basic example layout --- examples/websocket-provider/README.md | 19 +++++++++ examples/websocket-provider/main.py | 42 ++++++++++++++++++++ examples/websocket-provider/requirements.txt | 2 + 3 files changed, 63 insertions(+) create mode 100644 examples/websocket-provider/README.md create mode 100644 examples/websocket-provider/main.py create mode 100644 examples/websocket-provider/requirements.txt diff --git a/examples/websocket-provider/README.md b/examples/websocket-provider/README.md new file mode 100644 index 0000000..84cba38 --- /dev/null +++ b/examples/websocket-provider/README.md @@ -0,0 +1,19 @@ +# Web Socket Example + +Shows how you can use the Python WebSocket library to Ypy exchange updates between two users. + +## Getting Started + +1. Install Python dependencies: + +``` +pip install -r requirements.txt +``` + +2. Start server + +``` +python main.py +``` + +3. TODO diff --git a/examples/websocket-provider/main.py b/examples/websocket-provider/main.py new file mode 100644 index 0000000..808dd68 --- /dev/null +++ b/examples/websocket-provider/main.py @@ -0,0 +1,42 @@ +import asyncio +import websockets +import y_py as Y + + +async def hello(websocket): + name = await websocket.recv() + print(f"<<< {name}") + + greeting = f"Hello {name}!" + + await websocket.send(greeting) + print(f">>> {greeting}") + + +async def main(): + async with websockets.serve(hello, "localhost", 8765): + await asyncio.Future() # run forever + + +if __name__ == "__main__": + asyncio.run(main()) + + +def tut(): + d1 = Y.YDoc() + # Create a new YText object in the YDoc + text = d1.get_text("test") + # Start a transaction in order to update the text + with d1.begin_transaction() as txn: + # Add text contents + text.push(txn, "hello world!") + + # Create another document + d2 = Y.YDoc() + # Share state with the original document + state_vector = Y.encode_state_vector(d2) + diff = Y.encode_state_as_update(d1, state_vector) + Y.apply_update(d2, diff) + + with d2.begin_transaction() as txn: + value = d2.get_text("test").to_string(txn) diff --git a/examples/websocket-provider/requirements.txt b/examples/websocket-provider/requirements.txt new file mode 100644 index 0000000..0143ea0 --- /dev/null +++ b/examples/websocket-provider/requirements.txt @@ -0,0 +1,2 @@ +websockets +y-py \ No newline at end of file From f189a8c263c027b1ad5ca036590b88c958cef991 Mon Sep 17 00:00:00 2001 From: waidhoferj Date: Wed, 23 Mar 2022 09:22:32 -0700 Subject: [PATCH 2/5] progress --- examples/websocket-provider/client.py | 27 +++++++++++++ examples/websocket-provider/config.json | 4 ++ examples/websocket-provider/main.py | 42 -------------------- examples/websocket-provider/requirements.txt | 3 +- examples/websocket-provider/server.py | 30 ++++++++++++++ examples/websocket-provider/utils.py | 16 ++++++++ 6 files changed, 78 insertions(+), 44 deletions(-) create mode 100644 examples/websocket-provider/client.py create mode 100644 examples/websocket-provider/config.json delete mode 100644 examples/websocket-provider/main.py create mode 100644 examples/websocket-provider/server.py create mode 100644 examples/websocket-provider/utils.py diff --git a/examples/websocket-provider/client.py b/examples/websocket-provider/client.py new file mode 100644 index 0000000..7545ee5 --- /dev/null +++ b/examples/websocket-provider/client.py @@ -0,0 +1,27 @@ +import types +from utils import read_config +import y_py as Y + +doc = Y.YDoc() + + +def send_update(writer): + state_vector = Y.encode_state_vector(doc) + writer.write(state_vector) + receive(update) + Y.apply_update(doc, update) + + +def receive_update(reader): + sv = reader.read() + update = Y.encode_state_as_update(doc, sv) + send(update) + + +def setup_stream(): + pass + + +def main(): + doc.observe("update", send_update) + setup_stream() diff --git a/examples/websocket-provider/config.json b/examples/websocket-provider/config.json new file mode 100644 index 0000000..bda420c --- /dev/null +++ b/examples/websocket-provider/config.json @@ -0,0 +1,4 @@ +{ + "host": "127.0.0.1", + "port": 8888 +} \ No newline at end of file diff --git a/examples/websocket-provider/main.py b/examples/websocket-provider/main.py deleted file mode 100644 index 808dd68..0000000 --- a/examples/websocket-provider/main.py +++ /dev/null @@ -1,42 +0,0 @@ -import asyncio -import websockets -import y_py as Y - - -async def hello(websocket): - name = await websocket.recv() - print(f"<<< {name}") - - greeting = f"Hello {name}!" - - await websocket.send(greeting) - print(f">>> {greeting}") - - -async def main(): - async with websockets.serve(hello, "localhost", 8765): - await asyncio.Future() # run forever - - -if __name__ == "__main__": - asyncio.run(main()) - - -def tut(): - d1 = Y.YDoc() - # Create a new YText object in the YDoc - text = d1.get_text("test") - # Start a transaction in order to update the text - with d1.begin_transaction() as txn: - # Add text contents - text.push(txn, "hello world!") - - # Create another document - d2 = Y.YDoc() - # Share state with the original document - state_vector = Y.encode_state_vector(d2) - diff = Y.encode_state_as_update(d1, state_vector) - Y.apply_update(d2, diff) - - with d2.begin_transaction() as txn: - value = d2.get_text("test").to_string(txn) diff --git a/examples/websocket-provider/requirements.txt b/examples/websocket-provider/requirements.txt index 0143ea0..ae55baf 100644 --- a/examples/websocket-provider/requirements.txt +++ b/examples/websocket-provider/requirements.txt @@ -1,2 +1 @@ -websockets -y-py \ No newline at end of file +y_py \ No newline at end of file diff --git a/examples/websocket-provider/server.py b/examples/websocket-provider/server.py new file mode 100644 index 0000000..0f5fe1d --- /dev/null +++ b/examples/websocket-provider/server.py @@ -0,0 +1,30 @@ +import asyncio +from utils import read_config + + +async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + data = await reader.read(1024) + message = data.decode() + addr = writer.get_extra_info("peername") + + print(f"Received {message!r} from {addr!r}") + + print(f"Send: {message!r}") + writer.write(data) + await writer.drain() + + print("Close the connection") + writer.close() + + +async def main(): + server = await asyncio.start_server(handle_echo, "127.0.0.1", 8888) + + addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets) + print(f"Serving on {addrs}") + + async with server: + await server.serve_forever() + + +asyncio.run(main()) diff --git a/examples/websocket-provider/utils.py b/examples/websocket-provider/utils.py new file mode 100644 index 0000000..f5b588f --- /dev/null +++ b/examples/websocket-provider/utils.py @@ -0,0 +1,16 @@ +from collections import namedtuple +import json +from typing import TypedDict + + +class Config(TypedDict): + host: str + port: int + + +def read_config() -> Config: + """ + Reads config JSON file + """ + with open("config.json", "r") as f: + return json.load(f) From 2f5db8307ccafab426f14f1e0f0f1e66d11e98fe Mon Sep 17 00:00:00 2001 From: waidhoferj Date: Tue, 6 Sep 2022 07:59:08 -0700 Subject: [PATCH 3/5] MVP draw app --- examples/websocket-provider/client.py | 78 +++++++++++++++----- examples/websocket-provider/draw.py | 39 ++++++++++ examples/websocket-provider/requirements.txt | 6 +- examples/websocket-provider/server.py | 41 +++++----- 4 files changed, 124 insertions(+), 40 deletions(-) create mode 100644 examples/websocket-provider/draw.py diff --git a/examples/websocket-provider/client.py b/examples/websocket-provider/client.py index 7545ee5..0498954 100644 --- a/examples/websocket-provider/client.py +++ b/examples/websocket-provider/client.py @@ -1,27 +1,71 @@ -import types -from utils import read_config +import asyncio +from time import sleep +import websockets import y_py as Y +import queue +import concurrent.futures +import threading -doc = Y.YDoc() +# Code based on the [`websockets` patter documentation](https://websockets.readthedocs.io/en/stable/howto/patterns.html) +class YDocWSClient: -def send_update(writer): - state_vector = Y.encode_state_vector(doc) - writer.write(state_vector) - receive(update) - Y.apply_update(doc, update) + def __init__(self, uri = "ws://localhost:8765"): + self.send_q = queue.Queue() + self.recv_q = queue.Queue() + self.uri = uri + def between_callback(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self.start_ws_client()) + loop.close() + _thread = threading.Thread(target=between_callback) + _thread.start() -def receive_update(reader): - sv = reader.read() - update = Y.encode_state_as_update(doc, sv) - send(update) + -def setup_stream(): - pass + def send_updates(self, txn_event: Y.AfterTransactionEvent): + update = txn_event.get_update() + if update != b'\x00\x00': + self.send_q.put_nowait(update) + def apply_updates(self, doc: Y.YDoc): + while not self.recv_q.empty(): + update = self.recv_q.get_nowait() + Y.apply_update(doc, update) -def main(): - doc.observe("update", send_update) - setup_stream() + def _send(self, thing): + self.send_q.put_nowait(thing) + + async def client_handler(self, websocket): + consumer_task = asyncio.create_task(self.consumer_handler(websocket)) + producer_task = asyncio.create_task(self.producer_handler(websocket)) + done, pending = await asyncio.wait( + [consumer_task, producer_task], + return_when=asyncio.FIRST_COMPLETED, + ) + for task in pending: + task.cancel() + + async def consumer_handler(self, websocket): + async for message in websocket: + self.recv_q.put_nowait(message) + + async def producer_handler(self, websocket): + loop = asyncio.get_running_loop() + while True: + update = await loop.run_in_executor(None,self.send_q.get) + await websocket.send(update) + + async def start_ws_client(self): + async with websockets.connect(self.uri) as websocket: + await self.client_handler(websocket) + + +if __name__ == "__main__": + client = YDocWSClient() + while True: + sleep(1) + client._send("hello!") diff --git a/examples/websocket-provider/draw.py b/examples/websocket-provider/draw.py new file mode 100644 index 0000000..8d9f210 --- /dev/null +++ b/examples/websocket-provider/draw.py @@ -0,0 +1,39 @@ +from p5 import * +from y_py import YDoc, YArray, AfterTransactionEvent +from client import YDocWSClient + +doc: YDoc +strokes: YArray +client: YDocWSClient + + +def setup(): + global strokes + global doc + global client + size(720, 480) + doc = YDoc(0) + strokes = doc.get_array("strokes") + client = YDocWSClient() + doc.observe_after_transaction(client.send_updates) + + + + + +def draw(): + global strokes + global doc + global client + client.apply_updates(doc) + rect_mode(CENTER) + background(255) + if mouse_is_pressed: + with doc.begin_transaction() as txn: + strokes.append(txn, [mouse_x, mouse_y]) + fill(0) + no_stroke() + for x,y in strokes: + ellipse((x, y), 33, 33) + +run(frame_rate=60) diff --git a/examples/websocket-provider/requirements.txt b/examples/websocket-provider/requirements.txt index ae55baf..73465c3 100644 --- a/examples/websocket-provider/requirements.txt +++ b/examples/websocket-provider/requirements.txt @@ -1 +1,5 @@ -y_py \ No newline at end of file +y_py +websockets +numpy +vispy +p5 \ No newline at end of file diff --git a/examples/websocket-provider/server.py b/examples/websocket-provider/server.py index 0f5fe1d..5c95882 100644 --- a/examples/websocket-provider/server.py +++ b/examples/websocket-provider/server.py @@ -1,30 +1,27 @@ import asyncio -from utils import read_config +from turtle import update +import websockets +connected = set() -async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - data = await reader.read(1024) - message = data.decode() - addr = writer.get_extra_info("peername") +async def server_handler(websocket): + # Register. + connected.add(websocket) + try: + async for message in websocket: + peers = {peer for peer in connected if peer is not websocket} + websockets.broadcast(peers, message) - print(f"Received {message!r} from {addr!r}") - - print(f"Send: {message!r}") - writer.write(data) - await writer.drain() - - print("Close the connection") - writer.close() + except websockets.exceptions.ConnectionClosedError: + pass + finally: + # Unregister. + connected.remove(websocket) async def main(): - server = await asyncio.start_server(handle_echo, "127.0.0.1", 8888) - - addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets) - print(f"Serving on {addrs}") - - async with server: - await server.serve_forever() - + async with websockets.serve(server_handler, "localhost", 8765): + await asyncio.Future() # run forever -asyncio.run(main()) +if __name__ == "__main__": + asyncio.run(main()) From 5ea9903f9639d2e25d6a40c0990a1b270092529e Mon Sep 17 00:00:00 2001 From: John Waidhofer <33008436+Waidhoferj@users.noreply.github.com> Date: Mon, 12 Sep 2022 20:33:20 -0700 Subject: [PATCH 4/5] Update examples/websocket-provider/README.md Co-authored-by: Kevin Jahns --- examples/websocket-provider/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/websocket-provider/README.md b/examples/websocket-provider/README.md index 84cba38..59e82f7 100644 --- a/examples/websocket-provider/README.md +++ b/examples/websocket-provider/README.md @@ -1,7 +1,7 @@ # Web Socket Example Shows how you can use the Python WebSocket library to Ypy exchange updates between two users. - +![drawing-demo](https://user-images.githubusercontent.com/5553757/189190756-21c2bc61-1816-488e-b2cd-bc910fece6d9.gif) ## Getting Started 1. Install Python dependencies: From 5313c590d924a2f1d14f9f93f221cce77df0aab7 Mon Sep 17 00:00:00 2001 From: waidhoferj Date: Mon, 12 Sep 2022 22:39:41 -0700 Subject: [PATCH 5/5] added demo code --- .../{websocket-provider => drawing}/README.md | 12 +++---- .../{websocket-provider => drawing}/client.py | 24 ++++---------- examples/drawing/demo.py | 33 +++++++++++++++++++ .../{websocket-provider => drawing}/draw.py | 10 ++++-- .../requirements.txt | 1 + .../{websocket-provider => drawing}/server.py | 2 +- examples/websocket-provider/config.json | 4 --- examples/websocket-provider/utils.py | 16 --------- 8 files changed, 56 insertions(+), 46 deletions(-) rename examples/{websocket-provider => drawing}/README.md (56%) rename examples/{websocket-provider => drawing}/client.py (82%) create mode 100644 examples/drawing/demo.py rename examples/{websocket-provider => drawing}/draw.py (79%) rename examples/{websocket-provider => drawing}/requirements.txt (50%) rename examples/{websocket-provider => drawing}/server.py (89%) delete mode 100644 examples/websocket-provider/config.json delete mode 100644 examples/websocket-provider/utils.py diff --git a/examples/websocket-provider/README.md b/examples/drawing/README.md similarity index 56% rename from examples/websocket-provider/README.md rename to examples/drawing/README.md index 59e82f7..7730031 100644 --- a/examples/websocket-provider/README.md +++ b/examples/drawing/README.md @@ -1,7 +1,9 @@ -# Web Socket Example +# Collaborative Drawing -Shows how you can use the Python WebSocket library to Ypy exchange updates between two users. ![drawing-demo](https://user-images.githubusercontent.com/5553757/189190756-21c2bc61-1816-488e-b2cd-bc910fece6d9.gif) + +A basic collaborative drawing application using Ypy and WebSockets. Left click on the canvas to leave a mark. + ## Getting Started 1. Install Python dependencies: @@ -10,10 +12,8 @@ Shows how you can use the Python WebSocket library to Ypy exchange updates betwe pip install -r requirements.txt ``` -2. Start server +2. Run the demo ``` -python main.py +python demo.py ``` - -3. TODO diff --git a/examples/websocket-provider/client.py b/examples/drawing/client.py similarity index 82% rename from examples/websocket-provider/client.py rename to examples/drawing/client.py index 0498954..6e47208 100644 --- a/examples/websocket-provider/client.py +++ b/examples/drawing/client.py @@ -10,24 +10,23 @@ class YDocWSClient: - def __init__(self, uri = "ws://localhost:8765"): + def __init__(self, uri = "ws://localhost:7654"): self.send_q = queue.Queue() self.recv_q = queue.Queue() self.uri = uri - def between_callback(): + def async_loop(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(self.start_ws_client()) loop.close() - _thread = threading.Thread(target=between_callback) - _thread.start() - - - + ws_thread = threading.Thread(target=async_loop, daemon=True) + ws_thread.start() def send_updates(self, txn_event: Y.AfterTransactionEvent): update = txn_event.get_update() + # Sometimes transactions don't write, which means updates are empty. + # We only care about updates with meaningful mutations. if update != b'\x00\x00': self.send_q.put_nowait(update) @@ -35,9 +34,6 @@ def apply_updates(self, doc: Y.YDoc): while not self.recv_q.empty(): update = self.recv_q.get_nowait() Y.apply_update(doc, update) - - def _send(self, thing): - self.send_q.put_nowait(thing) async def client_handler(self, websocket): consumer_task = asyncio.create_task(self.consumer_handler(websocket)) @@ -62,10 +58,4 @@ async def producer_handler(self, websocket): async def start_ws_client(self): async with websockets.connect(self.uri) as websocket: await self.client_handler(websocket) - - -if __name__ == "__main__": - client = YDocWSClient() - while True: - sleep(1) - client._send("hello!") + \ No newline at end of file diff --git a/examples/drawing/demo.py b/examples/drawing/demo.py new file mode 100644 index 0000000..0517c57 --- /dev/null +++ b/examples/drawing/demo.py @@ -0,0 +1,33 @@ +import subprocess +from typing import List + + +def demo(): + """ + Spawns a server and two drawing clients. + """ + processes: List[subprocess.Popen] = [] + # Server + processes.append(subprocess.Popen(["python", "server.py"])) + + # Clients + for _ in range(2): + processes.append(subprocess.Popen(["python", "draw.py"])) + + + wait_until_done() + + for p in processes: + p.kill() + + + +def wait_until_done(): + print("waiting") + while input("Enter 'q' to quit: ").lower() != 'q': + continue + + + +if __name__ == "__main__": + demo() \ No newline at end of file diff --git a/examples/websocket-provider/draw.py b/examples/drawing/draw.py similarity index 79% rename from examples/websocket-provider/draw.py rename to examples/drawing/draw.py index 8d9f210..650989f 100644 --- a/examples/websocket-provider/draw.py +++ b/examples/drawing/draw.py @@ -1,3 +1,4 @@ +from turtle import position from p5 import * from y_py import YDoc, YArray, AfterTransactionEvent from client import YDocWSClient @@ -8,9 +9,13 @@ def setup(): + """ + Initialization logic that runs before the `draw()` loop. + """ global strokes global doc global client + title("Ypy Drawing Demo") size(720, 480) doc = YDoc(0) strokes = doc.get_array("strokes") @@ -19,9 +24,10 @@ def setup(): - - def draw(): + """ + Handles user input and updates the canvas. + """ global strokes global doc global client diff --git a/examples/websocket-provider/requirements.txt b/examples/drawing/requirements.txt similarity index 50% rename from examples/websocket-provider/requirements.txt rename to examples/drawing/requirements.txt index 73465c3..d658e50 100644 --- a/examples/websocket-provider/requirements.txt +++ b/examples/drawing/requirements.txt @@ -1,5 +1,6 @@ y_py websockets +glfw; sys_platform != 'win32' numpy vispy p5 \ No newline at end of file diff --git a/examples/websocket-provider/server.py b/examples/drawing/server.py similarity index 89% rename from examples/websocket-provider/server.py rename to examples/drawing/server.py index 5c95882..2a0a610 100644 --- a/examples/websocket-provider/server.py +++ b/examples/drawing/server.py @@ -20,7 +20,7 @@ async def server_handler(websocket): async def main(): - async with websockets.serve(server_handler, "localhost", 8765): + async with websockets.serve(server_handler, "localhost", 7654): await asyncio.Future() # run forever if __name__ == "__main__": diff --git a/examples/websocket-provider/config.json b/examples/websocket-provider/config.json deleted file mode 100644 index bda420c..0000000 --- a/examples/websocket-provider/config.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "host": "127.0.0.1", - "port": 8888 -} \ No newline at end of file diff --git a/examples/websocket-provider/utils.py b/examples/websocket-provider/utils.py deleted file mode 100644 index f5b588f..0000000 --- a/examples/websocket-provider/utils.py +++ /dev/null @@ -1,16 +0,0 @@ -from collections import namedtuple -import json -from typing import TypedDict - - -class Config(TypedDict): - host: str - port: int - - -def read_config() -> Config: - """ - Reads config JSON file - """ - with open("config.json", "r") as f: - return json.load(f)