Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix throwing errors from Executor.join_async() #208

Merged
3 commits merged into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions python/srf/_pysrf/src/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,15 @@ void Executor::stop()

void Executor::join()
{
// Release the GIL before blocking
py::gil_scoped_release nogil;
{
// Release the GIL before blocking
py::gil_scoped_release nogil;

// Wait without the GIL
m_join_future.wait();
}

// Call get() with the GIL to rethrow any exceptions
m_join_future.get();
}

Expand All @@ -207,6 +213,9 @@ std::shared_ptr<Awaitable> Executor::join_async()
// Grab the GIL to return a py::object
py::gil_scoped_acquire gil;

// Once we have the GIL, call get() to propagate any exceptions
this->m_join_future.get();

return py::none();
});

Expand Down
4 changes: 4 additions & 0 deletions python/srf/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ srf_add_pybind11_module(test_edges_cpp
SOURCE_FILES test_edges.cpp
)

srf_add_pybind11_module(utils
SOURCE_FILES utils.cpp
)

list(POP_BACK CMAKE_MESSAGE_CONTEXT)
53 changes: 53 additions & 0 deletions python/srf/tests/utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "pysrf/utils.hpp"

#include <pybind11/cast.h>
#include <pybind11/pybind11.h>

#include <stdexcept>

namespace srf::pytests {

namespace py = pybind11;

PYBIND11_MODULE(utils, module)
{
module.doc() = R"pbdoc()pbdoc";

pysrf::import(module, "srf");

module.def(
"throw_cpp_error",
[](std::string msg = "") {
if (msg.empty())
{
msg = "Exception from C++ code";
}

throw std::runtime_error(msg);
},
py::arg("msg") = "");

#ifdef VERSION_INFO
m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
#else
module.attr("__version__") = "dev";
#endif
}
} // namespace srf::pytests
187 changes: 187 additions & 0 deletions python/tests/test_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# SPDX-FileCopyrightText: Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import typing

import pytest

import srf
from srf.tests.utils import throw_cpp_error


def pairwise(t):
it = iter(t)
return zip(it, it)


node_fn_type = typing.Callable[[srf.Builder], srf.SegmentObject]


@pytest.fixture
def source_pyexception():

def build(builder: srf.Builder):

def gen_data_and_raise():
yield 1
yield 2
yield 3

raise RuntimeError("Raised python error")

return builder.make_source("source", gen_data_and_raise)

return build


@pytest.fixture
def source_cppexception():

def build(builder: srf.Builder):

def gen_data_and_raise():
yield 1
yield 2
yield 3

throw_cpp_error()

return builder.make_source("source", gen_data_and_raise)

return build


@pytest.fixture
def sink():

def build(builder: srf.Builder):

def sink_on_next(data):
print("Got value: {}".format(data))

return builder.make_sink("sink", sink_on_next, None, None)

return build


@pytest.fixture
def build_pipeline():

def inner(*node_fns: node_fn_type):

def init_segment(builder: srf.Builder):

created_nodes = []

# Loop over node creation functions
for n in node_fns:
created_nodes.append(n(builder))

# For each pair, call make_edge
for source, sink in pairwise(created_nodes):
builder.make_edge(source, sink)

pipe = srf.Pipeline()

pipe.make_segment("TestSegment11", init_segment)

return pipe

return inner


build_pipeline_type = typing.Callable[[typing.Tuple[node_fn_type, ...]], srf.Pipeline]


@pytest.fixture
def build_executor():

def inner(pipe: srf.Pipeline):
options = srf.Options()

executor = srf.Executor(options)
executor.register_pipeline(pipe)

executor.start()

return executor

return inner


build_executor_type = typing.Callable[[srf.Pipeline], srf.Executor]


def test_pyexception_in_source(source_pyexception: node_fn_type,
sink: node_fn_type,
build_pipeline: build_pipeline_type,
build_executor: build_executor_type):

pipe = build_pipeline(source_pyexception, sink)

executor = build_executor(pipe)

with pytest.raises(RuntimeError):
executor.join()


def test_cppexception_in_source(source_cppexception: node_fn_type,
sink: node_fn_type,
build_pipeline: build_pipeline_type,
build_executor: build_executor_type):

pipe = build_pipeline(source_cppexception, sink)

executor = build_executor(pipe)

with pytest.raises(RuntimeError):
executor.join()


def test_pyexception_in_source_async(source_pyexception: node_fn_type,
sink: node_fn_type,
build_pipeline: build_pipeline_type,
build_executor: build_executor_type):

pipe = build_pipeline(source_pyexception, sink)

async def run_pipeline():
executor = build_executor(pipe)

with pytest.raises(RuntimeError):
await executor.join_async()

asyncio.run(run_pipeline())


def test_cppexception_in_source_async(source_cppexception: node_fn_type,
sink: node_fn_type,
build_pipeline: build_pipeline_type,
build_executor: build_executor_type):

pipe = build_pipeline(source_cppexception, sink)

async def run_pipeline():
executor = build_executor(pipe)

with pytest.raises(RuntimeError):
await executor.join_async()

asyncio.run(run_pipeline())


if (__name__ in ("__main__", )):
test_pyexception_in_source()