Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ repos:
- subprocess-tee>=0.3.5
- types-PyYAML
- types-pkg_resources
- types-jsonschema
- repo: https://github.com/pycqa/pylint
rev: v2.13.8
hooks:
Expand Down
58 changes: 0 additions & 58 deletions constraints.txt

This file was deleted.

8 changes: 7 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ ansible-pygments==0.1.1
argh==0.26.2
# via sphinx-autobuild
attrs==21.4.0
# via pytest
# via
# jsonschema
# pytest
babel==2.10.1
# via sphinx
certifi==2022.5.18.1
Expand Down Expand Up @@ -43,6 +45,8 @@ jinja2==3.1.2
# via
# myst-parser
# sphinx
jsonschema==4.5.1
# via ansible-compat (setup.cfg)
livereload==2.6.3
# via sphinx-autobuild
markdown-it-py==2.1.0
Expand Down Expand Up @@ -81,6 +85,8 @@ pygments==2.12.0
# sphinx
pyparsing==3.0.9
# via packaging
pyrsistent==0.18.1
# via jsonschema
pytest==6.2.5
# via
# ansible-compat (setup.cfg)
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ zip_safe = False
install_requires =
PyYAML
subprocess-tee >= 0.3.5
jsonschema >= 4.5.1

[options.extras_require]
docs =
Expand Down
128 changes: 128 additions & 0 deletions src/ansible_compat/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""Utils for JSON Schema validation."""
import json
from dataclasses import dataclass
from typing import Any, Deque, Dict, List, Mapping, Union

import jsonschema
from jsonschema.validators import validator_for


def to_path(schema_path: Deque[str]) -> str:
"""Flatten a path to a dot delimited string.

:param schema_path: The schema path
:returns: The dot delimited path
"""
return ".".join(str(index) for index in schema_path)


def json_path(absolute_path: Deque[str]) -> str:
"""Flatten a data path to a dot delimited string.

:param absolute_path: The path
:returns: The dot delimited string
"""
path = "$"
for elem in absolute_path:
if isinstance(elem, int):
path += "[" + str(elem) + "]"
else:
path += "." + elem
return path


@dataclass
class JsonSchemaError:
# pylint: disable=too-many-instance-attributes
"""Data structure to hold a json schema validation error."""

message: str
data_path: str
json_path: str
schema_path: str
relative_schema: str
expected: Union[bool, int, str]
validator: str
found: str

@property
def _hash_key(self) -> Any:
# line attr is knowingly excluded, as dict is not hashable
return (
self.schema_path,
self.data_path,
self.json_path,
self.message,
self.expected,
)

def __hash__(self) -> int:
"""Return a hash value of the instance."""
return hash(self._hash_key)

def __eq__(self, other: object) -> bool:
"""Identify whether the other object represents the same rule match."""
if not isinstance(other, self.__class__):
return NotImplemented
return self.__hash__() == other.__hash__()

def __lt__(self, other: object) -> bool:
"""Return whether the current object is less than the other."""
if not isinstance(other, self.__class__):
return NotImplemented
return bool(self._hash_key < other._hash_key)

def to_friendly(self) -> str:
"""Provide a friendly explanation of the error.

:returns: The error message
"""
return f"In '{self.data_path}': {self.message}."


def validate(
schema: Union[str, Mapping[str, Any]], data: Dict[str, Any]
) -> List[JsonSchemaError]:
"""Validate some data against a JSON schema.

:param schema: the JSON schema to use for validation
:param data: The data to validate
:returns: Any errors encountered
"""
errors: List[JsonSchemaError] = []

if isinstance(schema, str):
schema = json.loads(schema)
try:
if not isinstance(schema, Mapping):
raise jsonschema.SchemaError("Invalid schema, must be a mapping")
validator = validator_for(schema)
validator.check_schema(schema)
except jsonschema.SchemaError as exc:
error = JsonSchemaError(
message=str(exc),
data_path="schema sanity check",
json_path="",
schema_path="",
relative_schema="",
expected="",
validator="",
found="",
)
errors.append(error)
return errors

for validation_error in validator(schema).iter_errors(data):
if isinstance(validation_error, jsonschema.ValidationError):
error = JsonSchemaError(
message=validation_error.message,
data_path=to_path(validation_error.absolute_path),
json_path=json_path(validation_error.absolute_path),
schema_path=to_path(validation_error.schema_path),
relative_schema=validation_error.schema,
expected=validation_error.validator_value,
validator=validation_error.validator,
found=str(validation_error.instance),
)
errors.append(error)
return sorted(errors)
48 changes: 48 additions & 0 deletions test/test_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Tests for schema utilities."""
from ansible_compat.schema import JsonSchemaError, validate

schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"environment": {"type": "object", "additionalProperties": {"type": "string"}}
},
}

instance = {"environment": {"a": False, "b": True, "c": "foo"}}

expected_results = [
JsonSchemaError(
message="False is not of type 'string'",
data_path="environment.a",
json_path="$.environment.a",
schema_path="properties.environment.additionalProperties.type",
relative_schema='{"type": "string"}',
expected="string",
validator="type",
found="False",
),
JsonSchemaError(
message="True is not of type 'string'",
data_path="environment.b",
json_path="$.environment.b",
schema_path="properties.environment.additionalProperties.type",
relative_schema='{"type": "string"}',
expected="string",
validator="type",
found="True",
),
]


def test_schema() -> None:
"""Test the schema validator."""
results = validate(schema=schema, data=instance)
# ensure we produce consistent results between runs
for _ in range(1, 100):
new_results = validate(schema=schema, data=instance)
assert results == new_results, "inconsistent returns"
# print(result)
assert len(results) == len(expected_results)
assert sorted(results) == results, "multiple errors not sorted"
for i, result in enumerate(results):
assert result == expected_results[i]
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ setenv =
PIP_DISABLE_PIP_VERSION_CHECK = 1
PIP_CONSTRAINT = {toxinidir}/requirements.txt
PRE_COMMIT_COLOR = always
PYTEST_REQPASS = 75
PYTEST_REQPASS = 76
FORCE_COLOR = 1
allowlist_externals =
ansible
Expand Down