Skip to content

Commit

Permalink
feat: support print_to_web
Browse files Browse the repository at this point in the history
  • Loading branch information
Yazawazi committed Nov 28, 2023
1 parent cec8cdb commit efb7f7e
Showing 1 changed file with 105 additions and 30 deletions.
135 changes: 105 additions & 30 deletions backend/funix/decorator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Funix decorator. The central logic of Funix.
"""
import dataclasses
import sys
import time
from collections import deque
from copy import deepcopy
Expand All @@ -16,6 +17,7 @@
signature,
ismethod,
)
from io import StringIO
from json import dumps, loads
from secrets import token_hex
from traceback import format_exc
Expand Down Expand Up @@ -251,6 +253,24 @@
"""


class StdoutToWebsocket:
def __init__(self, ws):
self.ws = ws
self.value = StringIO()

def write(self, data):
self.value.write(data)
self.ws.send(dumps([self.value.getvalue()]))

def writelines(self, data):
self.value.writelines(data)
self.ws.send(dumps([self.value.getvalue()]))

def flush(self):
self.value.flush()
self.ws.send(dumps([self.value.getvalue()]))


def funix_class_params(*args, **kwargs):
def decorator(func):
class_method_ids_to_params[id(func)] = {
Expand Down Expand Up @@ -685,6 +705,7 @@ def funix(
menu: Optional[str] = None,
default: bool = False,
rate_limit: Limiter | list | dict = [],
print_to_web: bool = False,
):
"""
Decorator for functions to convert them to web apps
Expand Down Expand Up @@ -720,6 +741,7 @@ def funix(
You don't need to set it unless you are funixing a directory and package.
default(bool): whether this function is the default function
rate_limit(Limiter | list[Limiter]): rate limiters, an object or a list
print_to_web(bool): handle all stdout to web
Returns:
function: the decorated function
Expand Down Expand Up @@ -870,18 +892,29 @@ def decorator(function: callable) -> callable:
else:
__decorated_functions_names_list.append(function_title)

is_generator_function = isgeneratorfunction(function)
need_websocket = isgeneratorfunction(function)

decorated_function_ids.append(id(function))

function_signature = signature(function)
function_params = function_signature.parameters

if print_to_web:
print(
f"WARNING: the {function_name} function turn on the `print_to_web` option, "
f"the return annotation will be forced to be `str`, and the websocket mode is forced to be on."
)
need_websocket = True
setattr(function_signature, "_return_annotation", str)

__decorated_functions_list.append(
{
"name": function_title,
"path": endpoint,
"module": replace_module,
"secret": secret_key,
"id": function_id,
"websocket": is_generator_function,
"websocket": need_websocket,
}
)

Expand All @@ -890,8 +923,6 @@ def decorator(function: callable) -> callable:
else:
source_code = ""

function_signature = signature(function)
function_params = function_signature.parameters
decorated_params = {}
json_schema_props = {}

Expand Down Expand Up @@ -1687,7 +1718,7 @@ def wrapper(ws=None):
try:
if not session.get("__funix_id"):
session["__funix_id"] = uuid4().hex
if is_generator_function:
if need_websocket:
function_kwargs = loads(ws.receive())
else:
function_kwargs = request.get_json()
Expand Down Expand Up @@ -1777,6 +1808,29 @@ def wrapped_function(**wrapped_function_kwargs):
"error_body": format_exc(),
}

def output_to_web_function(**wrapped_function_kwargs):
try:
fake_stdout = StdoutToWebsocket(ws)
org_stdout, sys.stdout = sys.stdout, fake_stdout
if isgeneratorfunction(function):
for single_result in function(
**wrapped_function_kwargs
):
print(single_result)
else:
print(function(**wrapped_function_kwargs))
sys.stdout = org_stdout
except:
ws.send(
dumps(
{
"error_type": "function",
"error_body": format_exc(),
}
)
)
ws.close()

cell_names = []
upload_base64_files = {}

Expand Down Expand Up @@ -1815,7 +1869,7 @@ def wrapped_function(**wrapped_function_kwargs):
"error_type": "wrapper",
"error_body": "No arguments passed to function.",
}
if is_generator_function:
if need_websocket:
ws.send(dumps(empty_function_kwargs_error))
ws.close()
else:
Expand All @@ -1830,7 +1884,7 @@ def wrapped_function(**wrapped_function_kwargs):
"error_type": "wrapper",
"error_body": "Provided secret is incorrect.",
}
if is_generator_function:
if need_websocket:
ws.send(dumps(incorrect_secret_error))
ws.close()
else:
Expand All @@ -1842,7 +1896,7 @@ def wrapped_function(**wrapped_function_kwargs):
"error_type": "wrapper",
"error_body": "No secret provided.",
}
if is_generator_function:
if need_websocket:
ws.send(dumps(no_secret_error))
ws.close()
else:
Expand All @@ -1858,18 +1912,29 @@ def wrapped_function(**wrapped_function_kwargs):
arg[cell_name] = function_kwargs[cell_name][i]
for static_key in static_keys:
arg[static_key] = function_kwargs[static_key]
if is_generator_function:
result = []
for temp_function_result in function(**arg):
function_result = pre_anal_result(
temp_function_result
if need_websocket:
if print_to_web:
ws.send(
dumps(
{
"error_type": "wrapper",
"error_body": "Funix cannot handle cell, print_to_web and stream mode "
"in the same time",
}
)
)
if isinstance(function_result, list):
result.extend(function_result)
else:
result.append(function_result)
ws.send(dumps({"result": result}))
else:
result = []
for temp_function_result in function(**arg):
function_result = pre_anal_result(
temp_function_result
)
if isinstance(function_result, list):
result.extend(function_result)
else:
result.append(function_result)
ws.send(dumps({"result": result}))
result = []
ws.close()
else:
temp_result = wrapped_function(**arg)
Expand Down Expand Up @@ -1897,24 +1962,34 @@ def wrapped_function(**wrapped_function_kwargs):
new_args[upload_base64_file_key][
pos
] = rsp.read()
if is_generator_function:
for temp_function_result in function(**new_args):
function_result = pre_anal_result(temp_function_result)
ws.send(dumps(function_result))
ws.close()
if need_websocket:
if print_to_web:
output_to_web_function(**new_args)
else:
for temp_function_result in function(**new_args):
function_result = pre_anal_result(
temp_function_result
)
ws.send(dumps(function_result))
ws.close()
else:
return wrapped_function(**new_args)
else:
if is_generator_function:
for temp_function_result in function(**function_kwargs):
function_result = pre_anal_result(temp_function_result)
ws.send(dumps(function_result))
ws.close()
if need_websocket:
if print_to_web:
output_to_web_function(**function_kwargs)
else:
for temp_function_result in function(**function_kwargs):
function_result = pre_anal_result(
temp_function_result
)
ws.send(dumps(function_result))
ws.close()
else:
return wrapped_function(**function_kwargs)
except:
error = {"error_type": "wrapper", "error_body": format_exc()}
if is_generator_function:
if need_websocket:
ws.send(dumps(error))
ws.close()
else:
Expand All @@ -1925,7 +2000,7 @@ def wrapped_function(**wrapped_function_kwargs):
if safe_module_now:
wrapper.__setattr__("__name__", safe_module_now + "_" + function_name)

if is_generator_function:
if need_websocket:
sock.route(f"/call/{function_id}")(wrapper)
else:
app.post(f"/call/{endpoint}")(wrapper)
Expand Down

0 comments on commit efb7f7e

Please sign in to comment.