Skip to content

Commit

Permalink
Pass logger to YRoom, require ypy-websocket >=0.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Nov 17, 2022
1 parent 16bf915 commit 09c02ee
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 19 deletions.
54 changes: 36 additions & 18 deletions jupyter_server_ydoc/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class DocumentRoom(YRoom):

is_transient = False

def __init__(self, type: str, ystore: BaseYStore):
super().__init__(ready=False, ystore=ystore)
def __init__(self, type: str, ystore: BaseYStore, log):
super().__init__(ready=False, ystore=ystore, log=log)
self.type = type
self.cleaner: Optional["asyncio.Task[Any]"] = None
self.watcher: Optional["asyncio.Task[Any]"] = None
Expand All @@ -50,6 +50,9 @@ class TransientRoom(YRoom):

is_transient = True

def __init__(self, log):
super().__init__(log=log)


async def metadata_callback() -> bytes:
# the current datetime will be stored in metadata as bytes
Expand Down Expand Up @@ -81,7 +84,7 @@ async def monitor(self):
self.ypatch_nb = 0

def get_room(self, path: str) -> YRoom:
if path not in self.rooms.keys():
if path not in self.rooms:
if path.count(":") >= 2:
# it is a stored document (e.g. a notebook)
file_format, file_type, file_path = path.split(":", 2)
Expand All @@ -90,10 +93,10 @@ def get_room(self, path: str) -> YRoom:
ystore = self.ystore_class(
path=updates_file_path, metadata_callback=metadata_callback
)
self.rooms[path] = DocumentRoom(file_type, ystore)
self.rooms[path] = DocumentRoom(file_type, ystore, self.log)
else:
# it is a transient document (e.g. awareness)
self.rooms[path] = TransientRoom()
self.rooms[path] = TransientRoom(self.log)
return self.rooms[path]


Expand Down Expand Up @@ -148,6 +151,9 @@ def get_file_info(self) -> Tuple[str, str, str]:
raise RuntimeError(f"File {self.room.document.path} cannot be found anymore")
assert file_path is not None
if file_path != self.room.document.path:
self.log.debug(
"File with ID %s was moved from %s to %s", self.room.document.path, file_path
)
self.room.document.path = file_path
return file_format, file_type, file_path

Expand All @@ -169,6 +175,7 @@ async def open(self, path):
rooms_ready=False, auto_clean_rooms=False, ystore_class=ystore_class, log=self.log
)
self._message_queue = asyncio.Queue()
self.lock = asyncio.Lock()
assert self.websocket_server is not None
self.room = self.websocket_server.get_room(path)
self.set_file_info(path)
Expand Down Expand Up @@ -219,12 +226,15 @@ async def watch_file(self):

async def maybe_load_document(self):
file_format, file_type, file_path = self.get_file_info()
model = await ensure_async(
self.contents_manager.get(file_path, content=False, type=file_type, format=file_format)
)
async with self.lock:
model = await ensure_async(
self.contents_manager.get(
file_path, content=False, type=file_type, format=file_format
)
)
# do nothing if the file was saved by us
if self.last_modified < model["last_modified"]:
self.log.debug("Opening Y document from disk: %s", file_path)
self.log.debug("Reverting file that had out-of-band changes: %s", file_path)
model = await ensure_async(
self.contents_manager.get(file_path, type=file_type, format=file_format)
)
Expand All @@ -233,7 +243,10 @@ async def maybe_load_document(self):

async def send(self, message):
# needed to be compatible with WebsocketServer (websocket.send)
self.write_message(message, binary=True)
try:
self.write_message(message, binary=True)
except BaseException:
pass

async def recv(self):
message = await self._message_queue.get()
Expand All @@ -255,9 +268,10 @@ def on_message(self, message):
self.websocket_server.connected_users[user] = name
self.log.debug("Y user joined: %s", name)
for user in removed_users:
name = self.websocket_server.connected_users[user]
del self.websocket_server.connected_users[user]
self.log.debug("Y user left: %s", name)
if user in self.websocket_server.connected_users:
name = self.websocket_server.connected_users[user]
del self.websocket_server.connected_users[user]
self.log.debug("Y user left: %s", name)
# filter out message depending on changes
if skip:
self.log.debug(
Expand Down Expand Up @@ -314,11 +328,13 @@ async def maybe_save_document(self):
except Exception:
return
self.log.debug("Opening Y document from disk: %s", file_path)
model = await ensure_async(
self.contents_manager.get(file_path, type=file_type, format=file_format)
)
async with self.lock:
model = await ensure_async(
self.contents_manager.get(file_path, type=file_type, format=file_format)
)
if self.last_modified < model["last_modified"]:
# file changed on disk, let's revert
self.log.debug("Reverting file that had out-of-band changes: %s", file_path)
self.room.document.source = model["content"]
self.last_modified = model["last_modified"]
return
Expand All @@ -329,8 +345,10 @@ async def maybe_save_document(self):
model["format"] = file_format
model["content"] = self.room.document.source
self.log.debug("Saving Y document to disk: %s", file_path)
model = await ensure_async(self.contents_manager.save(model, file_path))
self.last_modified = model["last_modified"]
async with self.lock:
model = await ensure_async(self.contents_manager.save(model, file_path))
self.last_modified = model["last_modified"]
self.log.debug(f"{self.last_modified=}")
self.room.document.dirty = False

def check_origin(self, origin):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ classifiers = [
requires-python = ">=3.7"
dependencies = [
"jupyter_ydoc>=0.2.0,<0.4.0",
"ypy-websocket>=0.4.0,<0.5.0",
"ypy-websocket>=0.5.0,<0.6.0",
"jupyter_server_fileid >=0.6.0,<1"
]

Expand Down

0 comments on commit 09c02ee

Please sign in to comment.