Skip to content

Commit

Permalink
Observe changes to internal YDoc to broadcast and store
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Nov 17, 2022
1 parent 2d8de60 commit 8303e17
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 121 deletions.
1 change: 0 additions & 1 deletion ypy_websocket/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from .websocket_provider import WebsocketProvider # noqa
from .websocket_server import WebsocketServer, YRoom # noqa
from .ydoc import YDoc # noqa
from .yutils import YMessageType # noqa

__version__ = "0.4.0"
5 changes: 3 additions & 2 deletions ypy_websocket/websocket_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import y_py as Y

from .yutils import process_message, put_updates, sync
from .yutils import YMessageType, process_sync_message, put_updates, sync


class WebsocketProvider:
Expand All @@ -24,7 +24,8 @@ async def _run(self):
await sync(self._ydoc, self._websocket)
send_task = asyncio.create_task(self._send())
async for message in self._websocket:
await process_message(message, self._ydoc, self._websocket, self.log)
if message[0] == YMessageType.SYNC:
await process_sync_message(message[1:], self._ydoc, self._websocket, self.log)
send_task.cancel()

async def _send(self):
Expand Down
51 changes: 34 additions & 17 deletions ypy_websocket/websocket_server.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
import asyncio
import logging
from functools import partial
from typing import Callable, Dict, List, Optional

import y_py as Y

from .awareness import Awareness
from .ydoc import YDoc
from .ystore import BaseYStore
from .yutils import sync, update
from .yutils import YMessageType, create_update_message, process_sync_message, put_updates, sync


class YRoom:

clients: List
ydoc: YDoc
ydoc: Y.YDoc
ystore: Optional[BaseYStore]
_on_message: Optional[Callable]
_update_queue: asyncio.Queue
_ready: bool

def __init__(self, ready: bool = True, ystore: Optional[BaseYStore] = None, log=None):
self._update_queue = asyncio.Queue()
self.ydoc = YDoc()
self.ydoc.init(self._update_queue) # FIXME: overriding Y.YDoc.__init__ doesn't seem to work
self.ydoc = Y.YDoc()
self.awareness = Awareness(self.ydoc)
self._update_queue = asyncio.Queue()
self._ready = False
self.ready = ready
self.ystore = ystore
Expand All @@ -38,7 +39,7 @@ def ready(self) -> bool:
def ready(self, value: bool) -> None:
self._ready = value
if value:
self.ydoc.ready = True
self.ydoc.observe_after_transaction(partial(put_updates, self._update_queue, self.ydoc))

@property
def on_message(self) -> Optional[Callable]:
Expand All @@ -51,12 +52,17 @@ def on_message(self, value: Optional[Callable]):
async def _broadcast_updates(self):
while True:
update = await self._update_queue.get()
# broadcast internal ydoc's update made from the backend to all clients
# broadcast internal ydoc's update to all clients, that includes changes from the
# clients and changes from the backend (out-of-band changes)
for client in self.clients:
self.log.debug(
"Sending Y update from backend to client with endpoint: %s", client.path
"Sending Y update to client with endpoint: %s", client.path
)
asyncio.create_task(client.send(update))
message = create_update_message(update)
asyncio.create_task(client.send(message))
if self.ystore:
self.log.debug("Writing Y update to YStore")
asyncio.create_task(self.ystore.write(update))

def _clean(self):
self._broadcast_task.cancel()
Expand Down Expand Up @@ -110,16 +116,27 @@ async def serve(self, websocket):
skip = await room.on_message(message)
if skip:
continue
# update our internal state and the YStore (if any)
asyncio.create_task(update(message, room, websocket, self.log))
# forward messages from this client to every other client in the background
for client in [c for c in room.clients if c != websocket]:
message_type = message[0]
if message_type == YMessageType.SYNC:
# update our internal state in the background
# changes to the internal state are then forwarded to all clients
# and stored in the YStore (if any)
asyncio.create_task(process_sync_message(message[1:], room.ydoc, websocket, self.log))
elif message_type == YMessageType.AWARENESS:
# forward awareness messages from this client to all clients,
# including itself, because it's used to keep the connection alive
self.log.debug(
"Sending Y update from client with endpoint %s to client with endpoint: %s",
"Received %s message from endpoint: %s",
YMessageType.AWARENESS.raw_str(),
websocket.path,
client.path,
)
asyncio.create_task(client.send(message))
for client in room.clients:
self.log.debug(
"Sending Y awareness from client with endpoint %s to client with endpoint: %s",
websocket.path,
client.path,
)
asyncio.create_task(client.send(message))
# remove this client
room.clients = [c for c in room.clients if c != websocket]
if self.auto_clean_rooms and not room.clients:
Expand Down
66 changes: 0 additions & 66 deletions ypy_websocket/ydoc.py

This file was deleted.

52 changes: 17 additions & 35 deletions ypy_websocket/yutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,54 +101,36 @@ def read_var_string(self):


def put_updates(update_queue: asyncio.Queue, ydoc: Y.YDoc, event: Y.AfterTransactionEvent) -> None:
message = create_update_message(event.get_update())
update_queue.put_nowait(message)
update_queue.put_nowait(event.get_update())


async def process_message(message: bytes, ydoc: Y.YDoc, websocket, log) -> Optional[bytes]:
async def process_sync_message(message: bytes, ydoc: Y.YDoc, websocket, log) -> None:
message_type = message[0]
msg = message[1:]
log.debug(
"Received %s message from endpoint: %s",
YMessageType(message_type).raw_str(),
YSyncMessageType(message_type).raw_str(),
websocket.path,
)
if message_type == YMessageType.SYNC:
message_type = message[1]
msg = message[2:]
if message_type == YSyncMessageType.SYNC_STEP1:
state = read_message(msg)
update = Y.encode_state_as_update(ydoc, state)
reply = create_sync_step2_message(update)
log.debug(
"Received %s message from endpoint: %s",
YSyncMessageType(message_type).raw_str(),
"Sending %s message to endpoint: %s",
YSyncMessageType.SYNC_STEP2.raw_str(),
websocket.path,
)
if message_type == YSyncMessageType.SYNC_STEP1:
state = read_message(msg)
update = Y.encode_state_as_update(ydoc, state)
reply = create_sync_step2_message(update)
log.debug(
"Sending %s message to endpoint: %s",
YSyncMessageType.SYNC_STEP2.raw_str(),
websocket.path,
)
await websocket.send(reply)
elif message_type in (
YSyncMessageType.SYNC_STEP2,
YSyncMessageType.SYNC_UPDATE,
):
update = read_message(msg)
Y.apply_update(ydoc, update)
return update

return None
await websocket.send(reply)
elif message_type in (
YSyncMessageType.SYNC_STEP2,
YSyncMessageType.SYNC_UPDATE,
):
update = read_message(msg)
Y.apply_update(ydoc, update)


async def sync(ydoc: Y.YDoc, websocket):
state = Y.encode_state_vector(ydoc)
msg = create_sync_step1_message(state)
await websocket.send(msg)


async def update(message, room, websocket, log):
yupdate = await process_message(message, room.ydoc, websocket, log)
if room.ystore and yupdate:
log.debug("Writing Y update to YStore from endpoint: %s", websocket.path)
await room.ystore.write(yupdate)

0 comments on commit 8303e17

Please sign in to comment.