Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions .agents/skills/python-kwargs-setattr-security/SKILL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
---
name: python-kwargs-setattr-security
description: When reviewing or fixing Python code that uses setattr() with user-controlled kwargs to configure C++ extension objects (SessionOptions, RunOptions, etc.) in ONNX Runtime. Use this to apply the allowlist pattern that prevents arbitrary file writes and other attacks via reflected property access.
---

## Problem Pattern

Using `hasattr(obj, k) / setattr(obj, k, v)` with user-controlled kwargs is insecure. The `hasattr` check is NOT a security guard — it returns True for ALL exposed properties including dangerous ones.

```python
# INSECURE — do not use
for k, v in kwargs.items():
if hasattr(options, k):
setattr(options, k, v)
```

## Fix: Explicit Allowlist

Define a module-level frozenset of safe attribute names. Raise RuntimeError for known-but-blocked attrs; silently ignore unknown keys.

```python
# Define at module level, before the class
_ALLOWED_SESSION_OPTIONS = frozenset({
"enable_cpu_mem_arena",
"enable_mem_pattern",
# ... only explicitly reviewed safe attrs
})

# In the method
for k, v in kwargs.items():
if k in _ALLOWED_SESSION_OPTIONS:
setattr(options, k, v)
elif hasattr(options, k): # reuse the existing instance, don't create new
raise RuntimeError(
f"SessionOptions attribute '{k}' is not permitted via the backend API. "
f"Allowed attributes: {', '.join(sorted(_ALLOWED_SESSION_OPTIONS))}"
)
# else: silently ignore (may be kwargs for a different config object)
```

## Key Rules

1. **Use the existing object** in `hasattr(options, k)` — never `hasattr(ClassName(), k)` (creates throwaway C++ objects per iteration)
2. **RuntimeError** is the ORT convention for API misuse errors (not ValueError)
3. **Silent ignore for one path is OK when kwargs are forwarded to both paths**: `run_model()` passes the same kwargs dict to both `prepare()` (validates SessionOptions) and `rep.run()` (validates RunOptions). A RunOptions kwarg unknown to SessionOptions is silently ignored by `prepare()` — this is correct because `rep.run()` will validate it. Only raise RuntimeError when the attr exists on the target object but is blocked.
4. **Frozenset constant naming**: `_ALLOWED_<CLASSNAME>` — ALL_CAPS, Google Style
5. **No type annotations** on module-level constants (ORT Python convention)

## Dangerous SessionOptions Properties (never allowlist)

- `optimized_model_filepath` — triggers Model::Save(), overwrites arbitrary files
- `profile_file_prefix` + `enable_profiling` — writes profiling JSON to arbitrary path
- `register_custom_ops_library` — loads arbitrary shared libraries (method, not property)

## Files in ONNX Runtime

- `onnxruntime/python/backend/backend.py` — `_ALLOWED_SESSION_OPTIONS`
- `onnxruntime/python/backend/backend_rep.py` — `_ALLOWED_RUN_OPTIONS`
- Tests: `onnxruntime/test/python/onnxruntime_test_python_backend.py` — `TestBackendKwargsAllowlist`
57 changes: 48 additions & 9 deletions onnxruntime/python/backend/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,29 @@
from onnxruntime import InferenceSession, SessionOptions, get_available_providers, get_device
from onnxruntime.backend.backend_rep import OnnxRuntimeBackendRep

# Allowlist of SessionOptions attributes that are safe to set via the backend API.
# Dangerous attributes intentionally excluded:
# optimized_model_filepath — triggers Model::Save(), overwrites arbitrary files
# profile_file_prefix — writes profiling JSON to arbitrary path
# enable_profiling — causes uncontrolled file writes to cwd
_ALLOWED_SESSION_OPTIONS = frozenset(
{
"enable_cpu_mem_arena",
"enable_mem_pattern",
"enable_mem_reuse",
"execution_mode",
"execution_order",
"graph_optimization_level",
"inter_op_num_threads",
"intra_op_num_threads",
"log_severity_level",
"log_verbosity_level",
"logid",
"use_deterministic_compute",
"use_per_session_threads",
}
)


class OnnxRuntimeBackend(Backend):
"""
Expand Down Expand Up @@ -93,16 +116,18 @@ def supports_device(cls, device):
@classmethod
def prepare(cls, model, device=None, **kwargs):
"""
Load the model and creates a :class:`onnxruntime.InferenceSession`
Load the model and creates an :class:`onnxruntime.backend.backend_rep.OnnxRuntimeBackendRep`
ready to be used as a backend.

:param model: ModelProto (returned by `onnx.load`),
string for a filename or bytes for a serialized model
:param model: the model to prepare — accepts a file path (str), serialized
model (bytes), :class:`onnx.ModelProto`, :class:`onnxruntime.InferenceSession`,
or :class:`onnxruntime.backend.backend_rep.OnnxRuntimeBackendRep` (returned as-is)
:param device: requested device for the computation,
None means the default one which depends on
the compilation settings
:param kwargs: see :class:`onnxruntime.SessionOptions`
:return: :class:`onnxruntime.InferenceSession`
:param kwargs: only a safe subset of :class:`onnxruntime.SessionOptions` attributes are
accepted; see ``_ALLOWED_SESSION_OPTIONS`` for the list
:return: :class:`onnxruntime.backend.backend_rep.OnnxRuntimeBackendRep`
"""
if isinstance(model, OnnxRuntimeBackendRep):
return model
Expand All @@ -111,8 +136,14 @@ def prepare(cls, model, device=None, **kwargs):
elif isinstance(model, (str, bytes)):
options = SessionOptions()
for k, v in kwargs.items():
if hasattr(options, k):
if k in _ALLOWED_SESSION_OPTIONS:
setattr(options, k, v)
elif hasattr(options, k):
raise RuntimeError(
f"SessionOptions attribute '{k}' is not permitted via the backend API. "
f"Allowed attributes: {', '.join(sorted(_ALLOWED_SESSION_OPTIONS))}"
)
Comment thread
titaiwangms marked this conversation as resolved.
# else: silently ignore unknown keys

excluded_providers = os.getenv("ORT_ONNX_BACKEND_EXCLUDE_PROVIDERS", default="").split(",")
providers = [x for x in get_available_providers() if (x not in excluded_providers)]
Expand Down Expand Up @@ -148,13 +179,21 @@ def run_model(cls, model, inputs, device=None, **kwargs):
"""
Compute the prediction.

:param model: :class:`onnxruntime.InferenceSession` returned
by function *prepare*
:param model: the model to run — accepts a file path (str), serialized
model (bytes), :class:`onnx.ModelProto`, :class:`onnxruntime.InferenceSession`,
or :class:`onnxruntime.backend.backend_rep.OnnxRuntimeBackendRep`
:param inputs: inputs
:param device: requested device for the computation,
None means the default one which depends on
the compilation settings
:param kwargs: see :class:`onnxruntime.RunOptions`
:param kwargs: ``run_model()`` forwards kwargs to both ``prepare()`` and ``rep.run()``.
``prepare()`` validates and applies ``_ALLOWED_SESSION_OPTIONS`` only when creating
a new session from a model path or bytes; if ``model`` is already an
``InferenceSession`` or ``OnnxRuntimeBackendRep``, session-option kwargs are
silently ignored. ``rep.run()`` always validates against ``_ALLOWED_RUN_OPTIONS``
and raises ``RuntimeError`` for known-but-blocked run attributes.
Logging-related kwargs (``log_severity_level``, ``log_verbosity_level``, ``logid``)
appear in both allowlists.
:return: predictions
"""
rep = cls.prepare(model, device, **kwargs)
Expand Down
30 changes: 27 additions & 3 deletions onnxruntime/python/backend/backend_rep.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,23 @@

from onnxruntime import RunOptions

# Allowlist of RunOptions attributes that are safe to set via the backend API.
# 'terminate' excluded: setting it True would deny the current inference call.
# 'training_mode' excluded: silently switches inference behavior in training builds.
_ALLOWED_RUN_OPTIONS = frozenset(
{
"log_severity_level",
"log_verbosity_level",
"logid",
"only_execute_path_to_fetches",
}
)


class OnnxRuntimeBackendRep(BackendRep):
"""
Computes the prediction for a pipeline converted into
an :class:`onnxruntime.InferenceSession` node.
Wraps an :class:`onnxruntime.InferenceSession` to implement ONNX's
:class:`onnx.backend.base.BackendRep` interface for running predictions.
"""

def __init__(self, session):
Expand All @@ -27,12 +39,24 @@ def run(self, inputs, **kwargs): # type: (Any, **Any) -> Tuple[Any, ...]
"""
Computes the prediction.
See :meth:`onnxruntime.InferenceSession.run`.

:param inputs: a list of input arrays (one per model input) or a single
array when the model has exactly one input
:param kwargs: only a safe subset of :class:`onnxruntime.RunOptions` attributes are
accepted; see ``_ALLOWED_RUN_OPTIONS`` for the list
:return: list of output arrays
"""

options = RunOptions()
for k, v in kwargs.items():
if hasattr(options, k):
if k in _ALLOWED_RUN_OPTIONS:
setattr(options, k, v)
elif hasattr(options, k):
raise RuntimeError(
f"RunOptions attribute '{k}' is not permitted via the backend API. "
f"Allowed attributes: {', '.join(sorted(_ALLOWED_RUN_OPTIONS))}"
)
Comment thread
tianleiwu marked this conversation as resolved.
# else: silently ignore unknown keys

if isinstance(inputs, list):
inps = {}
Expand Down
125 changes: 125 additions & 0 deletions onnxruntime/test/python/onnxruntime_test_python_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# Licensed under the MIT License.

# -*- coding: UTF-8 -*-
import os
import tempfile
import unittest

import numpy as np
Expand Down Expand Up @@ -64,5 +66,128 @@ def test_allocation_plan_works_with_only_execute_path_to_fetches_option(self):
assert_allclose(session_run_results[0], -(inp0 - inp1))


class TestBackendKwargsAllowlist(unittest.TestCase):
"""Tests that the SessionOptions/RunOptions kwargs allowlist correctly blocks
dangerous attributes and allows safe ones, preventing arbitrary file writes
through user-controlled kwargs."""
Comment thread
tianleiwu marked this conversation as resolved.

def test_blocked_session_option_optimized_model_filepath_raises(self):
"""optimized_model_filepath is a known SessionOptions attr but is not in the allowlist.
It must raise RuntimeError to prevent arbitrary file overwrites."""
name = get_name("mul_1.onnx")
with tempfile.NamedTemporaryFile(suffix=".bin") as tmp:
with self.assertRaises(RuntimeError) as ctx:
backend.prepare(name, optimized_model_filepath=tmp.name)
self.assertIn("not permitted", str(ctx.exception))

def test_blocked_session_option_profile_file_prefix_raises(self):
"""profile_file_prefix is a known SessionOptions attr but is not in the allowlist.
It must raise RuntimeError to prevent arbitrary file writes via profiling output."""
name = get_name("mul_1.onnx")
with tempfile.TemporaryDirectory() as tmpdir:
prefix = os.path.join(tmpdir, "profile")
with self.assertRaises(RuntimeError) as ctx:
backend.prepare(name, profile_file_prefix=prefix)
self.assertIn("not permitted", str(ctx.exception))

def test_blocked_session_option_enable_profiling_raises(self):
"""enable_profiling is excluded from the allowlist because it causes uncontrolled
file writes (profiling JSON) to the current working directory."""
name = get_name("mul_1.onnx")
with self.assertRaises(RuntimeError) as ctx:
backend.prepare(name, enable_profiling=True)
self.assertIn("not permitted", str(ctx.exception))

def test_unknown_kwarg_is_silently_ignored(self):
"""A kwarg that is not a SessionOptions attribute at all must be silently ignored.
This preserves backward compatibility for callers who pass extra kwargs."""
name = get_name("mul_1.onnx")
rep = backend.prepare(name, totally_unknown_kwarg="foo")
self.assertIsNotNone(rep)
x = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=np.float32)
res = rep.run(x)
output_expected = np.array([[1.0, 4.0], [9.0, 16.0], [25.0, 36.0]], dtype=np.float32)
np.testing.assert_allclose(res[0], output_expected, rtol=1e-05, atol=1e-08)
Comment thread
titaiwangms marked this conversation as resolved.

def test_safe_session_option_graph_optimization_level_is_accepted(self):
"""graph_optimization_level is in the allowlist and must be accepted without error."""
name = get_name("mul_1.onnx")
rep = backend.prepare(name, graph_optimization_level=onnxrt.GraphOptimizationLevel.ORT_DISABLE_ALL)
self.assertIsNotNone(rep)
x = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=np.float32)
res = rep.run(x)
output_expected = np.array([[1.0, 4.0], [9.0, 16.0], [25.0, 36.0]], dtype=np.float32)
np.testing.assert_allclose(res[0], output_expected, rtol=1e-05, atol=1e-08)

def test_safe_session_option_intra_op_num_threads_is_accepted(self):
"""intra_op_num_threads is in the allowlist and must be accepted without error."""
name = get_name("mul_1.onnx")
rep = backend.prepare(name, intra_op_num_threads=1)
self.assertIsNotNone(rep)
x = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=np.float32)
res = rep.run(x)
output_expected = np.array([[1.0, 4.0], [9.0, 16.0], [25.0, 36.0]], dtype=np.float32)
np.testing.assert_allclose(res[0], output_expected, rtol=1e-05, atol=1e-08)

def test_blocked_run_option_terminate_raises(self):
"""terminate is a known RunOptions attr excluded from the allowlist; BackendRep.run() must raise RuntimeError when it is passed."""
name = get_name("mul_1.onnx")
rep = backend.prepare(name)
x = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=np.float32)
with self.assertRaises(RuntimeError) as ctx:
rep.run(x, terminate=True)
self.assertIn("not permitted", str(ctx.exception))

def test_run_model_with_safe_session_option(self):
"""run_model() must accept safe SessionOptions kwargs and produce correct output."""
name = get_name("mul_1.onnx")
x = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=np.float32)
res = backend.run(name, [x], graph_optimization_level=onnxrt.GraphOptimizationLevel.ORT_DISABLE_ALL)
output_expected = np.array([[1.0, 4.0], [9.0, 16.0], [25.0, 36.0]], dtype=np.float32)
np.testing.assert_allclose(res[0], output_expected, rtol=1e-05, atol=1e-08)

def test_run_model_with_safe_run_option(self):
"""run_model() must accept safe RunOptions kwargs and produce correct output."""
name = get_name("mul_1.onnx")
x = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=np.float32)
res = backend.run(name, [x], only_execute_path_to_fetches=True)
output_expected = np.array([[1.0, 4.0], [9.0, 16.0], [25.0, 36.0]], dtype=np.float32)
np.testing.assert_allclose(res[0], output_expected, rtol=1e-05, atol=1e-08)

def test_run_model_with_blocked_run_option_raises(self):
"""run_model() must raise RuntimeError when given a blocked RunOptions attribute."""
name = get_name("mul_1.onnx")
x = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=np.float32)
with self.assertRaises(RuntimeError) as ctx:
backend.run(name, [x], terminate=True)
self.assertIn("not permitted", str(ctx.exception))

def test_unknown_kwarg_is_silently_ignored_in_run(self):
"""A kwarg unknown to RunOptions must be silently ignored by rep.run()."""
name = get_name("mul_1.onnx")
rep = backend.prepare(name)
x = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=np.float32)
res = rep.run(x, completely_unknown_key="bar")
output_expected = np.array([[1.0, 4.0], [9.0, 16.0], [25.0, 36.0]], dtype=np.float32)
np.testing.assert_allclose(res[0], output_expected, rtol=1e-05, atol=1e-08)

def test_unknown_kwarg_is_silently_ignored_in_run_model(self):
"""An unknown kwarg must be silently ignored by both prepare() and rep.run() in run_model()."""
name = get_name("mul_1.onnx")
x = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=np.float32)
res = backend.run(name, [x], completely_unknown_key="baz")
output_expected = np.array([[1.0, 4.0], [9.0, 16.0], [25.0, 36.0]], dtype=np.float32)
np.testing.assert_allclose(res[0], output_expected, rtol=1e-05, atol=1e-08)

def test_run_model_with_blocked_session_option_raises(self):
"""run_model() must raise RuntimeError when given a blocked SessionOptions attribute."""
name = get_name("mul_1.onnx")
x = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]], dtype=np.float32)
with tempfile.NamedTemporaryFile(suffix=".bin") as tmp:
with self.assertRaises(RuntimeError) as ctx:
backend.run(name, [x], optimized_model_filepath=tmp.name)
self.assertIn("not permitted", str(ctx.exception))


if __name__ == "__main__":
unittest.main(module=__name__, buffer=True)
Loading