Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre-compile a set of graphs / apps in docker-build stage. #163

Merged
merged 6 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 20 additions & 22 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@
from gscoordinator.utils import distribute_lib_on_k8s
from gscoordinator.utils import distribute_lib_via_hosts
from gscoordinator.utils import dump_string
from gscoordinator.utils import generate_graph_type_sig
from gscoordinator.utils import get_app_sha256
from gscoordinator.utils import get_graph_sha256
from gscoordinator.utils import get_lib_path
from gscoordinator.utils import str2bool
from gscoordinator.utils import to_maxgraph_schema
from gscoordinator.version import __version__
Expand Down Expand Up @@ -331,27 +333,32 @@ def RunStep(self, request, context): # noqa: C901
return response

def _maybe_compile_app(self, op):
app_sig = self._generate_app_sig(op.attr)
if app_sig in self._object_manager:
app_lib_path = self._object_manager.get(app_sig).lib_path
else:
app_lib_path = self._compile_lib_and_distribute(compile_app, app_sig, op)
app_sig = get_app_sha256(op.attr)
space = self._builtin_workspace
if types_pb2.GAR in op.attr:
space = self._udf_app_workspace
app_lib_path = get_lib_path(os.path.join(space, app_sig), app_sig)
if not os.path.isfile(app_lib_path):
compiled_path = self._compile_lib_and_distribute(compile_app, app_sig, op)
if app_lib_path != compiled_path:
raise RuntimeError("Computed path not equal to compiled path.")

op.attr[types_pb2.APP_LIBRARY_PATH].CopyFrom(
attr_value_pb2.AttrValue(s=app_lib_path.encode("utf-8"))
)
return op, app_sig, app_lib_path

def _maybe_register_graph(self, op, session_id):
graph_sig = self._generate_graph_sig(op.attr)
if graph_sig in self._object_manager:
lib_meta = self._object_manager.get(graph_sig)
graph_lib_path = lib_meta.lib_path
else:
graph_lib_path = self._compile_lib_and_distribute(
graph_sig = get_graph_sha256(op.attr)
space = self._builtin_workspace
graph_lib_path = get_lib_path(os.path.join(space, graph_sig), graph_sig)
if not os.path.isfile(graph_lib_path):
compiled_path = self._compile_lib_and_distribute(
compile_graph_frame, graph_sig, op
)

if graph_lib_path != compiled_path:
raise RuntimeError("Computed path not equal to compiled path.")
if graph_sig not in self._object_manager:
# register graph
op_def = op_def_pb2.OpDef(op=types_pb2.REGISTER_GRAPH_TYPE)
op_def.attr[types_pb2.GRAPH_LIBRARY_PATH].CopyFrom(
Expand Down Expand Up @@ -610,15 +617,6 @@ def _create_grpc_stub(self):
)
return engine_service_pb2_grpc.EngineServiceStub(channel)

def _generate_app_sig(self, attr):
return hashlib.sha256(
attr[types_pb2.APP_SIGNATURE].s + attr[types_pb2.GRAPH_SIGNATURE].s
).hexdigest()

def _generate_graph_sig(self, attr: dict):
graph_signature = generate_graph_type_sig(attr)
return hashlib.sha256(graph_signature.encode("utf-8")).hexdigest()

def _get_engine_config(self):
op_def = op_def_pb2.OpDef(op=types_pb2.GET_ENGINE_CONFIG)
dag_def = op_def_pb2.DagDef()
Expand Down
122 changes: 63 additions & 59 deletions coordinator/gscoordinator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import copy
import glob
import hashlib
import json
import logging
import numbers
Expand Down Expand Up @@ -89,28 +90,53 @@ def get_lib_path(app_dir, app_name):
lib_path = os.path.join(app_dir, "lib%s.dylib" % app_name)
else:
raise RuntimeError("Unsupported platform.")
assert os.path.isfile(lib_path), "Error occurs when building the frame library."
return lib_path


def compile_app(workspace: str, app_name: str, attr, engine_config: dict):
def get_app_sha256(attr):
(
app_type,
app_header,
app_class,
vd_type,
md_type,
pregel_combine,
) = _codegen_app_info(attr, DEFAULT_GS_CONFIG_FILE)
graph_header, graph_type = _codegen_graph_info(attr)
logger.info("Codegened graph type: %s, Graph header: %s", graph_type, graph_header)
if app_type == "cpp_pie":
return hashlib.sha256(
f"{app_type}.{app_class}.{graph_type}".encode("utf-8")
).hexdigest()
else:
s = hashlib.sha256()
s.update(f"{app_type}.{app_class}.{graph_type}".encode("utf-8"))
if types_pb2.GAR in attr:
s.update(attr[types_pb2.GAR].s)
return s.hexdigest()


def get_graph_sha256(attr):
_, graph_class = _codegen_graph_info(attr)
return hashlib.sha256(graph_class.encode("utf-8")).hexdigest()


def compile_app(workspace: str, library_name, attr, engine_config: dict):
"""Compile an application.

Args:
workspace (str): working dir.
app_name (str): target app_name.
library_name (str): name of library
attr (`AttrValue`): All information needed to compile an app.
engine_config (dict): for options of experimental_on

Returns:
str: Path of the built library.
"""

app_dir = os.path.join(workspace, app_name)
app_dir = os.path.join(workspace, library_name)
os.makedirs(app_dir, exist_ok=True)

# extract gar content
_extract_gar(app_dir, attr)

# codegen app and graph info
# vd_type and md_type is None in cpp_pie
(
Expand All @@ -120,7 +146,7 @@ def compile_app(workspace: str, app_name: str, attr, engine_config: dict):
vd_type,
md_type,
pregel_combine,
) = _codegen_app_info(app_dir, DEFAULT_GS_CONFIG_FILE, attr)
) = _codegen_app_info(attr, DEFAULT_GS_CONFIG_FILE)
logger.info(
"Codegened application type: %s, app header: %s, app_class: %s, vd_type: %s, md_type: %s, pregel_combine: %s",
app_type,
Expand Down Expand Up @@ -171,7 +197,7 @@ def compile_app(workspace: str, app_name: str, attr, engine_config: dict):
content = template.read()
content = Template(content).safe_substitute(
_analytical_engine_home=ANALYTICAL_ENGINE_HOME,
_frame_name=app_name,
_frame_name=library_name,
_vd_type=vd_type,
_md_type=md_type,
_graph_type=graph_type,
Expand Down Expand Up @@ -208,45 +234,19 @@ def compile_app(workspace: str, app_name: str, attr, engine_config: dict):
make_stderr_watcher = PipeWatcher(make_process.stderr, sys.stdout)
setattr(make_process, "stderr_watcher", make_stderr_watcher)
make_process.wait()

return get_lib_path(app_dir, app_name)


def generate_graph_type_sig(attr: dict):
graph_type = attr[types_pb2.GRAPH_TYPE].graph_type

if graph_type == types_pb2.ARROW_PROPERTY:
oid_type = attr[types_pb2.OID_TYPE].s.decode("utf-8")
vid_type = attr[types_pb2.VID_TYPE].s.decode("utf-8")
graph_signature = "vineyard::ArrowFragment<{},{}>".format(oid_type, vid_type)
elif graph_type == types_pb2.ARROW_PROJECTED:
oid_type = attr[types_pb2.OID_TYPE].s.decode("utf-8")
vid_type = attr[types_pb2.VID_TYPE].s.decode("utf-8")
vdata_type = attr[types_pb2.V_DATA_TYPE].s.decode("utf-8")
edata_type = attr[types_pb2.E_DATA_TYPE].s.decode("utf-8")
graph_signature = "gs::ArrowProjectedFragment<{},{},{},{}>".format(
oid_type, vid_type, vdata_type, edata_type
)
elif graph_type == types_pb2.DYNAMIC_PROJECTED:
vdata_type = attr[types_pb2.V_DATA_TYPE].s.decode("utf-8")
edata_type = attr[types_pb2.E_DATA_TYPE].s.decode("utf-8")
graph_signature = "gs::DynamicProjectedFragment<{},{}>".format(
vdata_type, edata_type
)
else:
raise ValueError("Unsupported graph type: {}".format(graph_type))
return graph_signature
lib_path = get_lib_path(app_dir, library_name)
assert os.path.isfile(lib_path), "Error occurs when building the frame library."
return lib_path


def compile_graph_frame(
workspace: str, frame_name: str, attr: dict, engine_config: dict
):
def compile_graph_frame(workspace: str, library_name, attr: dict, engine_config: dict):
"""Compile an application.

Args:
workspace (str): Working dir.
frame_name (str): Target app_name.
library_name (str): name of library
attr (`AttrValue`): All information needed to compile a graph library.
engine_config (dict): for options of experimental_on

Raises:
ValueError: When graph_type is not supported.
Expand All @@ -255,14 +255,14 @@ def compile_graph_frame(
str: Path of the built graph library.
"""

frame_dir = os.path.join(workspace, frame_name)
os.makedirs(frame_dir, exist_ok=True)
_, graph_class = _codegen_graph_info(attr)

graph_signature = generate_graph_type_sig(attr)
logger.info("Codegened graph frame type: %s", graph_class)

logger.info("Codegened graph frame type: %s", graph_signature)
library_dir = os.path.join(workspace, library_name)
os.makedirs(library_dir, exist_ok=True)

os.chdir(frame_dir)
os.chdir(library_dir)

graph_type = attr[types_pb2.GRAPH_TYPE].graph_type

Expand All @@ -282,13 +282,13 @@ def compile_graph_frame(
raise ValueError("Illegal graph type: {}".format(graph_type))
# replace and generate cmakelist
cmakelists_file_tmp = os.path.join(TEMPLATE_DIR, "CMakeLists.template")
cmakelists_file = os.path.join(frame_dir, "CMakeLists.txt")
cmakelists_file = os.path.join(library_dir, "CMakeLists.txt")
with open(cmakelists_file_tmp, mode="r") as template:
content = template.read()
content = Template(content).safe_substitute(
_analytical_engine_home=ANALYTICAL_ENGINE_HOME,
_frame_name=frame_name,
_graph_type=graph_signature,
_frame_name=library_name,
_graph_type=graph_class,
)
with open(cmakelists_file, mode="w") as f:
f.write(content)
Expand Down Expand Up @@ -318,13 +318,13 @@ def compile_graph_frame(
make_stderr_watcher = PipeWatcher(make_process.stderr, sys.stdout)
setattr(make_process, "stderr_watcher", make_stderr_watcher)
make_process.wait()

return get_lib_path(frame_dir, frame_name)
lib_path = get_lib_path(library_dir, library_name)
assert os.path.isfile(lib_path), "Error occurs when building the frame library."
return lib_path


def _extract_gar(workspace: str, attr):
def _extract_gar(app_dir: str, attr):
"""Extract gar to workspace

Args:
workspace (str): Working directory
attr (`AttrValue`): Optionally it can contains the bytes of gar.
Expand All @@ -334,10 +334,10 @@ def _extract_gar(workspace: str, attr):
# if gar sent via bytecode in attr, overwrite.
fp = BytesIO(attr[types_pb2.GAR].s)
with zipfile.ZipFile(fp, "r") as zip_ref:
zip_ref.extractall(workspace)
zip_ref.extractall(app_dir)


def _codegen_app_info(workspace: str, meta_file: str, attr):
def _codegen_app_info(attr, meta_file: str):
"""Codegen application by instanize the template specialization.

Args:
Expand All @@ -352,11 +352,15 @@ def _codegen_app_info(workspace: str, meta_file: str, attr):
type: app_type
app class: for fulfilling the CMakelists.
"""
algo = attr[types_pb2.APP_ALGO].s.decode("utf-8")

with open(os.path.join(workspace, meta_file), "r") as f:
config_yaml = yaml.safe_load(f)
fp = BUILTIN_APP_RESOURCE_PATH # default is builtin app resources.
if types_pb2.GAR in attr:
# if gar sent via bytecode in attr, overwrite.
fp = BytesIO(attr[types_pb2.GAR].s)
with zipfile.ZipFile(fp, "r") as zip_ref:
with zip_ref.open(meta_file, "r") as f:
config_yaml = yaml.safe_load(f)

algo = attr[types_pb2.APP_ALGO].s.decode("utf-8")
for app in config_yaml["app"]:
if app["algo"] == algo:
app_type = app["type"] # cpp_pie or cython_pregel or cython_pie
Expand Down
2 changes: 2 additions & 0 deletions k8s/graphscope.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ ARG profile=release

COPY --from=builder /opt/graphscope /usr/local/
RUN cd /usr/local/dist/ && pip3 install ./*.whl
COPY --from=builder /root/gs/k8s/precompile.py /tmp/precompile.py
RUN python3 /tmp/precompile.py && rm /tmp/precompile.py

RUN mkdir -p /home/maxgraph
ENV VINEYARD_IPC_SOCKET /home/maxgraph/data/vineyard/vineyard.sock
Expand Down
Loading