Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "cedar"]
path = third_party/cedar
url = https://github.com/cedar-policy/cedar.git
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ quick:
maturin develop ;\
pytest

submodule-cedar: third_party/cedar/cedar-integration-tests/
git submodule update --init --recursive

submodules: submodule-cedar

.PHONY: integration-tests
integration-tests: submodules
@echo Running integration tests
@echo Running official Cedar integration test cases
set -e ;\
pytest tests/integration/test_cedar_integration_tests.py

.PHONY: release
release:
Expand Down
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ tests/unit/test_import_module.py::InvokeModuleTestFunctionTestCase::test_invoke_
================================================================================================ 10 passed in 0.51s =================================================================================================
```

### Integration tests
This project supports validating correctness with official Cedar integration tests. To run those tests you'll need to retrieve the `cedar-integration-tests` data with:

```shell
make submodules
```

Then you can run:
```shell
make integration-tests
```

`cedar-py` currently passes 46 of the 50 'example_use_cases_doc' tests. We will support executing more tests shortly. See [test_cedar_integration_tests.py](tests/integration/test_cedar_integration_tests.py) for details.

## Using the library
Releases of `cedarpolicy` will be available on PyPi soon. For now, if you'd like to use the library, you can build a release locally and install it with `pip`.

Expand Down Expand Up @@ -125,8 +139,6 @@ authz_resp: dict = cedarpolicy.is_authorized(request, policies, entities)
assert "Allow" == authz_resp['decision']
```

###

## Contributing

This project is very early stage. This project uses GitHub [issues](https://github.com/k9securityio/cedar-py/issues). Contributions are welcome.
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ classifiers = [
[project.optional-dependencies]
dev = [
'maturin==1.1.0',
'parameterized==0.9.0',
'pip-tools==6.13.0',
'pytest == 7.4.0',
]
Expand Down
4 changes: 3 additions & 1 deletion requirements.dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ build==0.10.0
# via pip-tools
click==8.1.3
# via pip-tools
exceptiongroup==1.1.1
exceptiongroup==1.1.2
# via pytest
iniconfig==2.0.0
# via pytest
Expand All @@ -18,6 +18,8 @@ packaging==23.1
# via
# build
# pytest
parameterized==0.9.0
# via cedarpolicy (pyproject.toml)
pip-tools==6.13.0
# via cedarpolicy (pyproject.toml)
pluggy==1.2.0
Expand Down
13 changes: 13 additions & 0 deletions tests/integration/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import Union


def load_file_as_json(relative_file_path: str) -> Union[object, list, dict]:
import shared
return shared.load_file_as_json(relative_file_path=relative_file_path,
base_file=__file__)


def load_file_as_str(relative_file_path: str) -> str:
import shared
return shared.load_file_as_str(relative_file_path=relative_file_path,
base_file=__file__)
243 changes: 243 additions & 0 deletions tests/integration/test_cedar_integration_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
from parameterized import parameterized
import unittest
from typing import List

import cedarpolicy

from shared import pretty_format, load_file_as_json, load_file_as_str


def custom_name_func(testcase_func, param_num, param):
# print(f'{type(param.args)} {param.args}')
return "%s_%s__%s" %(
testcase_func.__name__,
param_num,
parameterized.to_safe_name("__".join(parameterized.to_safe_name(str(x)) for x in param.args)),
)


def get_authz_test_params_for_use_case(use_case_id: str) -> list:
# Load the test data
cedar_int_tests_base = "resources/cedar-integration-tests"
test_def: dict = load_file_as_json(f"{cedar_int_tests_base}/tests/example_use_cases_doc/{use_case_id}.json")
print(f'loading tests defined for use case: {use_case_id}')

policies_file_name: str = test_def['policies']
entities_file_name: str = test_def['entities']
schema_file_name: str = test_def['schema']
should_validate: bool = test_def['should_validate']
queries: List[dict] = test_def['queries']
policies: str = load_file_as_str(f"{cedar_int_tests_base}/{policies_file_name}")
entities: list = load_file_as_json(f"{cedar_int_tests_base}/{entities_file_name}")
schema: object = load_file_as_json(f"{cedar_int_tests_base}/{schema_file_name}")

testing_params = []

for query in queries:
testing_params.append((policies,
entities,
schema,
should_validate,
query))

print(f'selected {len(testing_params)} test cases for {use_case_id}:\n{pretty_format(testing_params)}')
return testing_params


class CedarExampleUseCasesIntegrationTestCase(unittest.TestCase):

@parameterized.expand(get_authz_test_params_for_use_case("1a"),
name_func=custom_name_func)
def test_example_use_cases_doc_1a(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict):

self.exec_authz_query_with_assertions(policies=policies, entities=entities, schema=schema,
should_validate=should_validate,
query=query)

@parameterized.expand(get_authz_test_params_for_use_case("2a"),
name_func=custom_name_func)
def test_example_use_cases_doc_2a(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict):

self.exec_authz_query_with_assertions(policies=policies, entities=entities, schema=schema,
should_validate=should_validate,
query=query)

@parameterized.expand(get_authz_test_params_for_use_case("2b"),
name_func=custom_name_func)
def test_example_use_cases_doc_2b(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict):

self.exec_authz_query_with_assertions(policies=policies, entities=entities, schema=schema,
should_validate=should_validate,
query=query)

@parameterized.expand(get_authz_test_params_for_use_case("2c"),
name_func=custom_name_func)
def test_example_use_cases_doc_2c(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict):

self.exec_authz_query_with_assertions(policies=policies, entities=entities, schema=schema,
should_validate=should_validate,
query=query)

@parameterized.expand(get_authz_test_params_for_use_case("3a"),
name_func=custom_name_func)
def test_example_use_cases_doc_3a(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict):

self.exec_authz_query_with_assertions(policies=policies, entities=entities, schema=schema,
should_validate=should_validate,
query=query)

@parameterized.expand(get_authz_test_params_for_use_case("3b"),
name_func=custom_name_func)
def test_example_use_cases_doc_3b(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict):

self.exec_authz_query_with_assertions(policies=policies, entities=entities, schema=schema,
should_validate=should_validate,
query=query)

@parameterized.expand(get_authz_test_params_for_use_case("3c"),
name_func=custom_name_func)
def test_example_use_cases_doc_3c(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict):

self.exec_authz_query_with_assertions(policies=policies, entities=entities, schema=schema,
should_validate=should_validate,
query=query)

@parameterized.expand(get_authz_test_params_for_use_case("4a"),
name_func=custom_name_func)
def test_example_use_cases_doc_4a(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict):

self.exec_authz_query_with_assertions(policies=policies, entities=entities, schema=schema,
should_validate=should_validate,
query=query)

@parameterized.expand(get_authz_test_params_for_use_case("4c"),
name_func=custom_name_func)
@unittest.skip(reason="A couple of requests failing here; true reason TBD")
def test_example_use_cases_doc_4c(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict):

self.exec_authz_query_with_assertions(policies=policies, entities=entities, schema=schema,
should_validate=should_validate,
query=query)

@parameterized.expand(get_authz_test_params_for_use_case("4d"),
name_func=custom_name_func)
def test_example_use_cases_doc_4d(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict):

self.exec_authz_query_with_assertions(policies=policies, entities=entities, schema=schema,
should_validate=should_validate,
query=query)

@parameterized.expand(get_authz_test_params_for_use_case("4e"),
name_func=custom_name_func)
def test_example_use_cases_doc_4e(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict):

self.exec_authz_query_with_assertions(policies=policies, entities=entities, schema=schema,
should_validate=should_validate,
query=query)

@parameterized.expand(get_authz_test_params_for_use_case("4f"),
name_func=custom_name_func)
def test_example_use_cases_doc_4f(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict):

self.exec_authz_query_with_assertions(policies=policies, entities=entities, schema=schema,
should_validate=should_validate,
query=query)

@parameterized.expand(get_authz_test_params_for_use_case("5b"),
name_func=custom_name_func)
def test_example_use_cases_doc_5b(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict):

self.exec_authz_query_with_assertions(policies=policies, entities=entities, schema=schema,
should_validate=should_validate,
query=query)

def exec_authz_query_with_assertions(self,
policies: str,
entities: list,
schema: dict,
should_validate: bool, # ignored; currently don't have the equivalent
query: dict) -> None:
print(f"executing authz query:\n{pretty_format(query)}")
request = {
'principal': query['principal'],
'action': query['action'],
'resource': query['resource'],
'context': query.get('context', {}),
}
authz_resp: dict = cedarpolicy.is_authorized(request=request, policies=policies, entities=entities,
schema=schema)

description = query['desc']
self.assertEqual(query['decision'], authz_resp['decision'],
msg=f'unexpected decision for query desc: {description}')
# 'reason' spelling is correct here, but a debatable choice as it's a list
# 'reason' matches the (Rust) Decision enum but Java API has exposed as reasons (plural)
self.assertEqual(query['reasons'], authz_resp['diagnostics']['reason'],
msg=f'unexpected errors for query desc: {description}')
self.assertEqual(query['errors'], authz_resp['diagnostics']['errors'],
msg=f'unexpected errors for query desc: {description}')
38 changes: 38 additions & 0 deletions tests/shared/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import json
import os
import pprint
from typing import Union


def pretty_format(o: object) -> str:
"""Pretty print an object representation"""
return pprint.pformat(object=o, indent=2, width=120)


def load_file_as_json(relative_file_path: str, base_file=__file__) -> Union[object, list, dict]:
path = construct_path_relative_to_current_module(relative_file_path, base_file)

try:
with open(path) as json_file:
obj = json.load(json_file)
return obj
except FileNotFoundError:
print("File could not be found at: {}".format(path))
raise


def load_file_as_str(relative_file_path: str, base_file=__file__) -> str:
path = construct_path_relative_to_current_module(relative_file_path, base_file)

try:
with open(path) as f:
return f.read()
except FileNotFoundError:
print("File could not be found at: {}".format(path))
raise


def construct_path_relative_to_current_module(relative_file_path, base_file=__file__):
my_path = os.path.abspath(os.path.dirname(base_file))
path = os.path.join(my_path, relative_file_path)
return path
1 change: 1 addition & 0 deletions tests/shared/resources/cedar-integration-tests
Loading