-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathetherpad.py
83 lines (61 loc) · 2.03 KB
/
etherpad.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import asyncio
import json

from arsenic.errors import NoSuchElement, ArsenicTimeout, JavascriptError
# JavaScript template that replaces the pad content: it looks up the pad
# editor's `ace` object and calls `importText` on it.
# The `%s` placeholder is filled with Python `%` formatting (see
# `EtherpadLite.set_text`). It sits inside a single-quoted JavaScript string
# literal, so whatever is substituted must be valid inside such a literal.
_SET = """
var editor = require('ep_etherpad-lite/static/js/pad_editor').padeditor.ace;
editor.importText('%s');
"""
# JavaScript snippet that copies the pad content into the page's DOM so it
# can be read back through an element lookup: it calls `exportText()` on the
# pad editor's `ace` object, reuses the element with id `padText` (creating a
# <div> with that id and appending it to <body> when it does not exist yet),
# and stores the exported text in that element's `textContent`.
_GET = """
var text = require('ep_etherpad-lite/static/js/pad_editor').padeditor.ace.exportText();
var div = document.getElementById('padText');
if (!div) {
var div = document.createElement('div');
div.id = 'padText';
document.body.appendChild(div);
}
div.textContent = text;
"""
class Notifier(object):
    """Two asyncio events used to synchronise a writer with its readers.

    The writer event is set by :meth:`written`.  The readers event is set
    once the internal read counter reaches the ``readers`` value passed to
    the constructor.

    Args:
        readers: Counter value at which the readers event is set.
    """

    def __init__(self, readers=5):
        # The counter starts at 1, so the readers event fires after
        # ``readers - 1`` calls to one_read().
        # NOTE(review): presumably one participant is counted up front
        # (e.g. the writer) -- confirm against the callers.
        self._current = 1
        self._until = readers
        self._readers = asyncio.Event()
        self._writer = asyncio.Event()
        if self._is_set():
            # The target is already met (readers <= 1): no call to
            # one_read() would ever set the event, so set it here to keep
            # wait_for_readers() from blocking forever.
            self._readers.set()

    def _is_set(self):
        """Return True once the read counter has reached its target."""
        # ``>=`` rather than ``==`` so a target below the starting value
        # still counts as reached.
        return self._current >= self._until

    async def wait_for_writer(self):
        """Block until :meth:`written` has been called."""
        await self._writer.wait()

    async def one_read(self):
        """Record one read; set the readers event when the target is hit.

        Calls made after the target has been reached are no-ops.
        """
        if self._is_set():
            return
        self._current += 1
        if self._is_set():
            self._readers.set()

    def written(self):
        """Signal that the writer is done, releasing wait_for_writer()."""
        self._writer.set()

    async def wait_for_readers(self):
        """Block until the read counter has reached its target."""
        await self._readers.wait()
class EtherpadLite(object):
    """Read and write the text of an Etherpad Lite pad through a browser.

    Args:
        browser: Browser session object.  This class awaits its ``get``,
            ``execute_script`` and ``wait_for_element`` methods
            (presumably an arsenic session, given the imported error
            types -- confirm against the callers).
        url: Address of the pad, loaded by :meth:`visit`.
        sleep: Seconds to wait before every attempt in :meth:`get_text`
            and :meth:`set_text`.
    """

    def __init__(self, browser, url, sleep=1.):
        self.browser = browser
        self.url = url
        self.sleep = sleep

    @staticmethod
    def _js_escape(text):
        """Return *text* escaped for a single-quoted JavaScript string.

        ``json.dumps`` takes care of backslashes, double quotes, control
        characters (newlines included) and non-ASCII characters; the
        surrounding double quotes it adds are stripped.  JSON leaves the
        single quote alone, so it is escaped separately.
        """
        # str() mirrors what the ``%s`` placeholder does to non-string input.
        escaped = json.dumps(str(text))[1:-1]
        return escaped.replace("'", "\\'")

    async def visit(self):
        """Load the pad URL in the browser and return the result of ``get``."""
        return (await self.browser.get(self.url))

    async def get_text(self):
        """Return the current text of the pad.

        Runs the ``_GET`` script and reads the ``#padText`` element it
        fills.  Attempts that fail with ``NoSuchElement``,
        ``ArsenicTimeout`` or ``JavascriptError`` are retried after
        ``self.sleep`` seconds, without an upper bound on the number of
        attempts.
        """
        while True:
            await asyncio.sleep(self.sleep)
            try:
                await self.browser.execute_script(_GET)
                el = await self.browser.wait_for_element(5, '#padText')
            except (NoSuchElement, ArsenicTimeout, JavascriptError):
                continue
            else:
                # Reading the element is outside the try block: errors
                # raised here propagate to the caller instead of retrying.
                return (await el.get_text())

    async def set_text(self, text):
        """Replace the pad content with *text*.

        The text is escaped before being inserted into the ``_SET``
        script.  Without that, a quote, backslash or newline in *text*
        yields invalid JavaScript, and the resulting ``JavascriptError``
        would make this loop retry forever.  Attempts that fail with
        ``NoSuchElement``, ``ArsenicTimeout`` or ``JavascriptError`` are
        retried after ``self.sleep`` seconds, without an upper bound.
        """
        # Build the script once; it does not change between attempts.
        script = _SET % self._js_escape(text)
        while True:
            await asyncio.sleep(self.sleep)
            try:
                await self.browser.execute_script(script)
                break
            except (NoSuchElement, ArsenicTimeout, JavascriptError):
                continue