Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Allow to run pure Python request middlewares inside a Tower service #1734

Merged
merged 40 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
d1d74fe
Initial runtime support for middlewares
crisidev Sep 14, 2022
da9e052
Cleanup
crisidev Sep 14, 2022
aeeaa89
Codegenerate middleware support
crisidev Sep 16, 2022
e20563e
Support responses, errors and header updating in middleware chain
crisidev Sep 16, 2022
583c967
Refactor and make errors consistently working
crisidev Sep 16, 2022
047401c
Fix the ability of changing the request between middlewares
crisidev Sep 16, 2022
88bbd11
Merge branch 'main' into crisidev/oxipy-middleware
crisidev Sep 16, 2022
aa2dc0a
Remove unused errors
crisidev Sep 16, 2022
5e784f6
Refactor
crisidev Sep 16, 2022
03c660b
Remove trait
crisidev Sep 16, 2022
28e3c09
Add testing of middleware handlers
crisidev Sep 17, 2022
f3e21de
Add end to end test of the service
crisidev Sep 17, 2022
84cc06b
Add end to end test of the layer
crisidev Sep 17, 2022
78a6b6f
Merge branch 'main' into crisidev/oxipy-middleware
crisidev Sep 17, 2022
38d971c
Remove useless dependency
crisidev Sep 17, 2022
cf7521d
Remove another useless dependency
crisidev Sep 17, 2022
80cdbfc
Idiomatic logging refactoring
crisidev Sep 17, 2022
e79797f
Enable back logging tests
crisidev Sep 17, 2022
b08d331
Make clippy happy
crisidev Sep 19, 2022
959b02c
Merge branch 'main' into crisidev/oxipy-middleware
crisidev Sep 19, 2022
8415503
Another nudge for clippy happyness
crisidev Sep 19, 2022
37036b8
Clippy again
crisidev Sep 19, 2022
1d51bb8
Span needs to be only available for not tests
crisidev Sep 20, 2022
2a00ec2
Fix integration tests
crisidev Sep 20, 2022
5e6bfe2
Merge branch 'main' into crisidev/oxipy-middleware
crisidev Sep 20, 2022
c3c2d65
Merge branch 'main' into crisidev/oxipy-middleware
crisidev Sep 20, 2022
e75d2db
Add documentation and examples
crisidev Sep 20, 2022
1cddc95
Fix test
crisidev Sep 20, 2022
324e30f
Fix kotlin linting
crisidev Sep 20, 2022
34c55f1
Reword middleware to explicitly tell we only support requests so far
crisidev Sep 20, 2022
fb58bdf
Update changelog
crisidev Sep 20, 2022
e06d7d7
Merge branch 'main' into crisidev/oxipy-middleware
crisidev Sep 20, 2022
8f98141
Apply suggestions from code review
crisidev Sep 21, 2022
964a470
Merge branch 'main' into crisidev/oxipy-middleware
crisidev Sep 21, 2022
4ee2743
Refactor logging for a more idiomatic experience
crisidev Sep 21, 2022
b9a8e3b
Merge branch 'main' into crisidev/oxipy-middleware
crisidev Sep 22, 2022
433c162
Remove useless dependency
crisidev Sep 22, 2022
747b6c0
Merge branch 'main' into crisidev/oxipy-middleware
crisidev Sep 22, 2022
da065a4
Fix documentation
crisidev Sep 22, 2022
6c1f9ab
Merge branch 'main' into crisidev/oxipy-middleware
crisidev Sep 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ object PythonServerCargoDependency {
val PyO3Asyncio: CargoDependency = CargoDependency("pyo3-asyncio", CratesIo("0.16"), features = setOf("attributes", "tokio-runtime"))
val Tokio: CargoDependency = CargoDependency("tokio", CratesIo("1.20.1"), features = setOf("full"))
val Tracing: CargoDependency = CargoDependency("tracing", CratesIo("0.1"))
val TracingAppender: CargoDependency = CargoDependency("tracing-appender", CratesIo("0.2"))
val Tower: CargoDependency = CargoDependency("tower", CratesIo("0.4"))
val TowerHttp: CargoDependency = CargoDependency("tower-http", CratesIo("0.3"), features = setOf("trace"))
val Hyper: CargoDependency = CargoDependency("hyper", CratesIo("0.14.12"), features = setOf("server", "http1", "http2", "tcp", "stream"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import software.amazon.smithy.rust.codegen.server.smithy.ServerCargoDependency
* Example:
* from pool import DatabasePool
* from my_library import App, OperationInput, OperationOutput

* @dataclass
* class Context:
* db = DatabasePool()
Expand Down Expand Up @@ -69,6 +68,7 @@ class PythonApplicationGenerator(
private val libName = "lib${coreCodegenContext.settings.moduleName.toSnakeCase()}"
private val runtimeConfig = coreCodegenContext.runtimeConfig
private val model = coreCodegenContext.model
private val protocol = coreCodegenContext.protocol
private val codegenScope =
arrayOf(
"SmithyPython" to PythonServerCargoDependency.SmithyHttpServerPython(runtimeConfig).asType(),
Expand All @@ -77,6 +77,7 @@ class PythonApplicationGenerator(
"pyo3_asyncio" to PythonServerCargoDependency.PyO3Asyncio.asType(),
"tokio" to PythonServerCargoDependency.Tokio.asType(),
"tracing" to PythonServerCargoDependency.Tracing.asType(),
"tracing_appender" to PythonServerCargoDependency.TracingAppender.asType(),
"tower" to PythonServerCargoDependency.Tower.asType(),
"tower_http" to PythonServerCargoDependency.TowerHttp.asType(),
"num_cpus" to PythonServerCargoDependency.NumCpus.asType(),
Expand All @@ -101,8 +102,11 @@ class PythonApplicationGenerator(
##[derive(Debug, Default)]
pub struct App {
handlers: #{HashMap}<String, #{SmithyPython}::PyHandler>,
middlewares: #{SmithyPython}::PyMiddlewares,
context: Option<#{pyo3}::PyObject>,
workers: #{parking_lot}::Mutex<Vec<#{pyo3}::PyObject>>,
_tracing_guard: Option<#{tracing_appender}::non_blocking::WorkerGuard>,
logfile: Option<std::path::PathBuf>
}
""",
*codegenScope,
Expand All @@ -116,8 +120,11 @@ class PythonApplicationGenerator(
fn clone(&self) -> Self {
Self {
handlers: self.handlers.clone(),
middlewares: self.middlewares.clone(),
context: self.context.clone(),
workers: #{parking_lot}::Mutex::new(vec![]),
_tracing_guard: None,
logfile: self.logfile.clone()
}
}
}
Expand Down Expand Up @@ -151,7 +158,7 @@ class PythonApplicationGenerator(
val name = operationName.toSnakeCase()
rustTemplate(
"""
let ${name}_locals = pyo3_asyncio::TaskLocals::new(event_loop);
let ${name}_locals = #{pyo3_asyncio}::TaskLocals::new(event_loop);
let handler = self.handlers.get("$name").expect("Python handler for operation `$name` not found").clone();
let router = router.$name(move |input, state| {
#{pyo3_asyncio}::tokio::scope(${name}_locals, crate::operation_handler::$name(input, state, handler))
Expand All @@ -162,11 +169,20 @@ class PythonApplicationGenerator(
}
rustTemplate(
"""
let middleware_locals = pyo3_asyncio::TaskLocals::new(event_loop);
use #{SmithyPython}::PyApp;
let service = #{tower}::ServiceBuilder::new().layer(
#{SmithyPython}::PyMiddlewareLayer::new(
self.middlewares.clone(),
self.protocol(),
middleware_locals
)?,
);
let router: #{SmithyServer}::Router = router
.build()
.expect("Unable to build operation registry")
.into();
Ok(router)
Ok(router.layer(service))
""",
*codegenScope,
)
Expand All @@ -175,20 +191,25 @@ class PythonApplicationGenerator(
}

private fun renderPyAppTrait(writer: RustWriter) {
val protocol = protocol.toString().replace("#", "##")
writer.rustTemplate(
"""
impl #{SmithyPython}::PyApp for App {
fn workers(&self) -> &#{parking_lot}::Mutex<Vec<#{pyo3}::PyObject>> {
&self.workers
}

fn context(&self) -> &Option<#{pyo3}::PyObject> {
&self.context
}

fn handlers(&mut self) -> &mut #{HashMap}<String, #{SmithyPython}::PyHandler> {
&mut self.handlers
}
fn middlewares(&mut self) -> &mut #{SmithyPython}::PyMiddlewares {
&mut self.middlewares
}
fn protocol(&self) -> &'static str {
"$protocol"
}
}
""",
*codegenScope,
Expand All @@ -207,16 +228,26 @@ class PythonApplicationGenerator(
"""
/// Create a new [App].
##[new]
pub fn new(py: #{pyo3}::Python, log_level: Option<#{SmithyPython}::LogLevel>) -> #{pyo3}::PyResult<Self> {
let log_level = log_level.unwrap_or(#{SmithyPython}::LogLevel::Info);
#{SmithyPython}::logging::setup(py, log_level)?;
Ok(Self::default())
pub fn new(logfile: Option<&#{pyo3}::PyAny>) -> #{pyo3}::PyResult<Self> {
let logfile = if let Some(logfile) = logfile {
let logfile = logfile.extract::<&str>()?;
Some(std::path::Path::new(logfile).to_path_buf())
} else {
None
};
Ok(Self { logfile, ..Default::default() })
}
/// Register a context object that will be shared between handlers.
##[pyo3(text_signature = "(${'$'}self, context)")]
pub fn context(&mut self, context: #{pyo3}::PyObject) {
self.context = Some(context);
}
/// Register a request middleware function that will be run inside a Tower layer, without cloning the body.
##[pyo3(text_signature = "(${'$'}self, func)")]
pub fn request_middleware(&mut self, py: #{pyo3}::Python, func: #{pyo3}::PyObject) -> #{pyo3}::PyResult<()> {
use #{SmithyPython}::PyApp;
self.register_middleware(py, func, #{SmithyPython}::PyMiddlewareType::Request)
}
/// Main entrypoint: start the server on multiple workers.
##[pyo3(text_signature = "(${'$'}self, address, port, backlog, workers)")]
pub fn run(
Expand All @@ -235,10 +266,11 @@ class PythonApplicationGenerator(
pub fn start_worker(
&mut self,
py: pyo3::Python,
socket: &pyo3::PyCell<aws_smithy_http_server_python::PySocket>,
socket: &pyo3::PyCell<#{SmithyPython}::PySocket>,
worker_number: isize,
) -> pyo3::PyResult<()> {
use #{SmithyPython}::PyApp;
self._tracing_guard = #{SmithyPython}::logging::setup_tracing(py, self.logfile.as_ref())?;
let event_loop = self.configure_python_event_loop(py)?;
let router = self.build_router(event_loop)?;
self.start_hyper_worker(py, socket, event_loop, router, worker_number)
Expand Down Expand Up @@ -280,21 +312,17 @@ class PythonApplicationGenerator(
""".trimIndent(),
)
writer.rust(
if (operations.any { it.errors.isNotEmpty() }) {
"""
/// from $libName import ${Inputs.namespace}
/// from $libName import ${Outputs.namespace}
/// from $libName import ${Errors.namespace}
""".trimIndent()
} else {
"""
/// from $libName import ${Inputs.namespace}
/// from $libName import ${Outputs.namespace}
""".trimIndent()
},
"""
/// from $libName import ${Inputs.namespace}
/// from $libName import ${Outputs.namespace}
""".trimIndent(),
)
if (operations.any { it.errors.isNotEmpty() }) {
writer.rust("""/// from $libName import ${Errors.namespace}""".trimIndent())
}
writer.rust(
"""
/// from $libName import middleware
/// from $libName import App
///
/// @dataclass
Expand All @@ -304,6 +332,11 @@ class PythonApplicationGenerator(
/// app = App()
/// app.context(Context())
///
/// @app.request_middleware
/// def request_middleware(request: middleware::Request):
/// if request.get_header("x-amzn-id") != "secret":
/// raise middleware.MiddlewareException("Unsupported `x-amz-id` header", 401)
///
""".trimIndent(),
)
writer.operationImplementationStubs(operations)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class PythonServerModuleGenerator(
renderPyCodegeneratedTypes()
renderPyWrapperTypes()
renderPySocketType()
renderPyLogging()
renderPyMiddlewareTypes()
renderPyApplicationType()
}
}
Expand Down Expand Up @@ -125,6 +127,43 @@ class PythonServerModuleGenerator(
)
}

// Render Python shared socket type.
private fun RustWriter.renderPyLogging() {
rustTemplate(
"""
let logging = #{pyo3}::types::PyModule::new(py, "logging")?;
logging.add_function(#{pyo3}::wrap_pyfunction!(#{SmithyPython}::py_tracing_event, m)?)?;
logging.add_class::<#{SmithyPython}::PyTracingHandler>()?;
#{pyo3}::py_run!(
py,
logging,
"import sys; sys.modules['$libName.logging'] = logging"
);
m.add_submodule(logging)?;
""",
*codegenScope,
)
}

private fun RustWriter.renderPyMiddlewareTypes() {
rustTemplate(
"""
let middleware = #{pyo3}::types::PyModule::new(py, "middleware")?;
middleware.add_class::<#{SmithyPython}::PyRequest>()?;
middleware.add_class::<#{SmithyPython}::PyResponse>()?;
middleware.add_class::<#{SmithyPython}::PyMiddlewareException>()?;
middleware.add_class::<#{SmithyPython}::PyHttpVersion>()?;
pyo3::py_run!(
py,
middleware,
"import sys; sys.modules['libpokemon_service_server_sdk.middleware'] = middleware"
crisidev marked this conversation as resolved.
Show resolved Hide resolved
);
m.add_submodule(middleware)?;
""",
*codegenScope,
)
}

// Render Python application type.
private fun RustWriter.renderPyApplicationType() {
rustTemplate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,20 @@ class PythonServerOperationHandlerGenerator(
handler: #{SmithyPython}::PyHandler,
) -> std::result::Result<$output, $error> {
// Async block used to run the handler and catch any Python error.
let span = #{tracing}::span!(
#{tracing}::Level::TRACE, "python",
pid = #{tracing}::field::Empty,
module = #{tracing}::field::Empty,
filename = #{tracing}::field::Empty,
lineno = #{tracing}::field::Empty
);
let guard = span.enter();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is handy if you prefer this style https://docs.rs/tracing/latest/tracing/struct.Span.html#method.entered

let result = if handler.is_coroutine {
#{PyCoroutine:W}
crisidev marked this conversation as resolved.
Show resolved Hide resolved
} else {
#{PyFunction:W}
};
drop(guard);
#{PyError:W}
}
""",
Expand All @@ -90,16 +99,14 @@ class PythonServerOperationHandlerGenerator(
rustTemplate(
"""
#{tracing}::debug!("Executing Python handler function `$name()`");
#{tokio}::task::block_in_place(move || {
#{pyo3}::Python::with_gil(|py| {
let pyhandler: &#{pyo3}::types::PyFunction = handler.extract(py)?;
let output = if handler.args == 1 {
pyhandler.call1((input,))?
} else {
pyhandler.call1((input, state.0))?
};
output.extract::<$output>()
})
#{pyo3}::Python::with_gil(|py| {
let pyhandler: &#{pyo3}::types::PyFunction = handler.extract(py)?;
let output = if handler.args == 1 {
pyhandler.call1((input,))?
} else {
pyhandler.call1((input, state.0))?
};
output.extract::<$output>()
Comment on lines +93 to +100
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for removing tokio::task::block_in_place is that we are still bound to the mutex on the GIL that is taken inside the Python::with_gil closure.

No matter what we do, this synchronous mutex will not allow tokio to continue the execution.

Removing block_in_place yields a significant performance improvement since it removes the overhead of scheduling new tasks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other frameworks using the same approach as Smithy-rs Python are also doing the same: sparckles/Robyn@4ef01e6

})
""",
*codegenScope,
Expand Down
10 changes: 8 additions & 2 deletions rust-runtime/aws-smithy-http-server-python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,33 @@ Python server runtime for Smithy Rust Server Framework.
publish = true

[dependencies]
aws-smithy-http = { path = "../aws-smithy-http" }
aws-smithy-http-server = { path = "../aws-smithy-http-server" }
aws-smithy-json = { path = "../aws-smithy-json" }
aws-smithy-types = { path = "../aws-smithy-types" }
aws-smithy-http = { path = "../aws-smithy-http" }
aws-smithy-xml = { path = "../aws-smithy-xml" }
bytes = "1.2"
futures = "0.3"
http = "0.2"
hyper = { version = "0.14.20", features = ["server", "http1", "http2", "tcp", "stream"] }
num_cpus = "1.13.1"
parking_lot = "0.12.1"
pin-project-lite = "0.2"
pyo3 = "0.16.5"
pyo3-asyncio = { version = "0.16.0", features = ["tokio-runtime"] }
signal-hook = { version = "0.3.14", features = ["extended-siginfo"] }
socket2 = { version = "0.4.4", features = ["all"] }
thiserror = "1.0.32"
tokio = { version = "1.20.1", features = ["full"] }
tokio-stream = "0.1"
tower = "0.4.13"
tower = { version = "0.4.13", features = ["util"] }
tracing = "0.1.36"
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
tracing-appender = { version = "0.2.2"}

[dev-dependencies]
pretty_assertions = "1"
futures-util = "0.3"

[package.metadata.docs.rs]
all-features = true
Expand Down
Loading