common_checks.py
import json
import re
import typing
from textwrap import dedent

from jsonschema.exceptions import ValidationError, _RefResolutionError
from libcove.lib.common import common_checks_context, get_additional_codelist_values, unique_ids, validator
from referencing.exceptions import Unresolvable

from libcoveocds.exceptions import LibCoveOCDSError
from libcoveocds.lib.additional_checks import CHECKS, run_additional_checks
from libcoveocds.lib.common_checks import get_bad_ocid_prefixes

try:
    import bleach
    from django.utils.html import conditional_escape, escape, format_html, mark_safe
    from markdown_it import MarkdownIt

    md = MarkdownIt()

    validation_error_lookup = {
        "date-time": mark_safe(
            "Incorrect date format. Dates should use the form YYYY-MM-DDT00:00:00Z. Learn more about "
            '<a href="https://standard.open-contracting.org/latest/en/schema/reference/#date">dates in OCDS</a>.'
        ),
    }

    WEB_EXTRA_INSTALLED = True
except ImportError:
    WEB_EXTRA_INSTALLED = False
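# bleach, Django and markdown-it-py are only needed to format error messages as HTML.
# If the libcoveocds[web] extra is not installed, WEB_EXTRA_INSTALLED stays False and
# common_checks_ocds() below raises LibCoveOCDSError when called in a web context.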


def unique_ids_or_ocids(validator, ui, instance, schema):
    # The `records` key from the JSON schema doesn't get passed through to here, so we look out for this $ref;
    # this may change if the way the schema files are structured changes.
    if schema.get("items") == {"$ref": "#/definitions/record"}:
        return unique_ids(validator, ui, instance, schema, id_names=["ocid"])
    if "$ref" in schema.get("items", {}) and schema["items"]["$ref"].endswith("release-schema.json"):
        return unique_ids(validator, ui, instance, schema, id_names=["ocid", "id"])
    return unique_ids(validator, ui, instance, schema, id_names=["id"])
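# Illustrative sketch (an assumption about libcove's unique_ids, not part of this module):
# with this check installed as the "uniqueItems" validator below, a record package whose
# `records` array repeats an ocid, e.g.
#
#     {"records": [{"ocid": "ocds-213czf-000-00001"}, {"ocid": "ocds-213czf-000-00001"}]}
#
# should be reported as a duplicate by "ocid" (and releases by "ocid" plus "id"), rather
# than only when two array entries are identical objects.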


def one_of_draft4(validator, one_of, instance, schema):
    """
    Modify oneOf_draft4 from https://github.com/Julian/jsonschema/blob/d16713a/jsonschema/_validators.py#L337.

    - Sort the instance JSON, so we get a reproducible output that we can test more easily.
    - Yield all the individual errors for linked or embedded releases within a record.
    - Return more information on the ValidationError, to allow us to use a translated message in cove-ocds.
    """
    subschemas = enumerate(one_of)
    all_errors = []
    for index, subschema in subschemas:
        errs = list(validator.descend(instance, subschema, schema_path=index))
        if not errs:
            first_valid = subschema
            break

        # We check the title, because we don't have access to the field name, as it lives in the parent.
        # It will not match the releases array in a release package, because there is no oneOf.
        if (
            schema.get("title") == "Releases"
            or schema.get("description") == "An array of linking identifiers or releases"
        ):
            # If instance is not a list, or is a list of zero length, then validating against either
            # subschema will work.
            # Assume instance is an array of Linked releases, if there are no "id"s in any of the releases.
            if type(instance) is not list or all("id" not in release for release in instance):
                if "properties" in subschema.get("items", {}) and "id" not in subschema["items"]["properties"]:
                    for err in errs:
                        err.assumption = "linked_releases"
                        yield err
                    return
            # Assume instance is an array of Embedded releases, if there is an "id" in each of the releases.
            elif all("id" in release for release in instance):
                if "id" in subschema.get("items", {}).get("properties", {}) or subschema.get("items", {}).get(
                    "$ref", ""
                ).endswith("release-schema.json"):
                    for err in errs:
                        err.assumption = "embedded_releases"
                        yield err
                    return
            else:
                err = ValidationError(
                    "This array should contain either entirely embedded releases or "
                    "linked releases. Embedded releases contain an 'id' whereas linked "
                    "releases do not. Your releases contain a mixture."
                )
                err.error_id = "releases_both_embedded_and_linked"
                yield err
                return

        all_errors.extend(errs)
    else:
        err = ValidationError(
            f"{json.dumps(instance, sort_keys=True)} is not valid under any of the given schemas",
            context=all_errors,
        )
        err.error_id = "oneOf_any"
        yield err

    more_valid = [s for i, s in subschemas if validator.evolve(schema=s).is_valid(instance)]
    if more_valid:
        more_valid.append(first_valid)
        reprs = ", ".join(repr(schema) for schema in more_valid)
        err = ValidationError(f"{instance!r} is valid under each of {reprs}")
        err.error_id = "oneOf_each"
        err.reprs = reprs
        yield err


validator.VALIDATORS["uniqueItems"] = unique_ids_or_ocids
validator.VALIDATORS["oneOf"] = one_of_draft4
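# Note: `validator` is the jsonschema validator class shared via libcove, so mutating its
# VALIDATORS dict here changes the "uniqueItems" and "oneOf" behaviour for every validation
# run that uses this class in the process, not only for calls made through this module.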


# ref_info is used to calculate the HTML anchor for the field in the OCDS documentation.
def _lookup_schema(schema, path, ref_info=None):
    if not path:
        return schema, ref_info
    if hasattr(schema, "__reference__"):
        ref_info = {"path": path, "reference": schema.__reference__}
    if "items" in schema:
        return _lookup_schema(schema["items"], path, ref_info)
    if "properties" in schema:
        head, *tail = path
        if head in schema["properties"]:
            return _lookup_schema(schema["properties"][head], tail, ref_info)
    return None, None
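# Illustrative sketch (hypothetical values, not part of this module): with the dereferenced
# package schema used below (called pkg_schema here for short,
# i.e. schema_obj.get_pkg_schema_obj(deref=True, proxies=True)), a call like
#
#     schema_block, ref_info = _lookup_schema(pkg_schema, ["releases", "tender", "value", "amount"])
#
# is expected to descend through "items"/"properties" and return the `amount` subschema,
# plus ref_info for the deepest $ref crossed on the way, which common_checks_ocds uses to
# build the "docs_ref" anchor into the OCDS documentation.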


def common_checks_ocds(
    context,
    upload_dir,
    json_data,
    schema_obj,
    *,
    cache=True,
):
    """
    Perform all checks.

    "count" and "unique_ocids_count" are skipped if the "skip_aggregates" configuration option is set.
    """
    skip_aggregates = schema_obj.config.config["skip_aggregates"]
    additional_checks = CHECKS[schema_obj.config.config["additional_checks"]]

    # Pass "-" as the schema name. The associated logic is not required by lib-cove-ocds.
    try:
        common_checks = common_checks_context(
            upload_dir, json_data, schema_obj, "-", context, fields_regex=True, api=schema_obj.api, cache=cache
        )
    except (Unresolvable, _RefResolutionError) as e:
        # For example: "PointerToNowhere: '/definitions/Unresolvable' does not exist within {big JSON blob}"
        schema_obj.json_deref_error = re.sub(r" within .+", "", str(e))
        return context

    # Note: Pelican checks whether the OCID prefix is registered.
    ocds_prefixes_bad_format = get_bad_ocid_prefixes(json_data)
    if ocds_prefixes_bad_format:
        context["conformance_errors"] = {"ocds_prefixes_bad_format": ocds_prefixes_bad_format}

    if not schema_obj.api and not WEB_EXTRA_INSTALLED:
        raise LibCoveOCDSError(
            dedent(
                """
                Cannot format errors for web context if the libcoveocds[web] extra is not installed.

                To use libcoveocds in a web context, run:

                    pip install libcoveocds[web]

                To use libcoveocds in an API context, set the context on the configuration:

                    lib_cove_ocds_config = libcoveocds.config.LibCoveOCDSConfig()
                    lib_cove_ocds_config.config["context"] = "api"
                    schema_obj = libcoveocds.schema.SchemaOCDS(lib_cove_ocds_config=lib_cove_ocds_config)
                """
            )
        )

    # If called in an API context:
    # - Skip the schema description and reference URL for OCID prefix conformance errors.
    # - Skip the formatted message, schema title, schema description and reference URL for validation errors.
    if not schema_obj.api:
        if "conformance_errors" in context:
            ocid_description = schema_obj.get_schema_obj()["properties"]["ocid"]["description"]
            # The last sentence of the `ocid` description is assumed to contain a guidance URL in all OCDS versions.
            index = ocid_description.rindex(". ") + 1
            context["conformance_errors"]["ocid_description"] = ocid_description[:index]
            context["conformance_errors"]["ocid_info_url"] = re.search(r"\((\S+)\)", ocid_description[index:]).group(1)

        new_validation_errors = []
        for json_key, values in common_checks["context"]["validation_errors"]:
            error = json.loads(json_key)

            new_message = validation_error_lookup.get(error["message_type"])
            if new_message:
                error["message_safe"] = conditional_escape(new_message)
            elif "message_safe" in error:
                error["message_safe"] = mark_safe(error["message_safe"])
            else:
                error["message_safe"] = conditional_escape(error["message"])

            schema_block, ref_info = _lookup_schema(
                schema_obj.get_pkg_schema_obj(deref=True, proxies=True), error["path_no_number"].split("/")
            )

            if schema_block and error["message_type"] != "required":
                if "description" in schema_block:
                    error["schema_title"] = escape(schema_block.get("title", ""))
                    error["schema_description_safe"] = mark_safe(
                        bleach.clean(
                            md.render(schema_block["description"]), tags=bleach.sanitizer.ALLOWED_TAGS | {"p"}
                        )
                    )
                if ref_info:
                    ref = ref_info["reference"]["$ref"]
                    ref = "" if ref.endswith("release-schema.json") else ref.strip("#")
                    ref_path = "/".join(ref_info["path"])
                    schema = "record-package-schema.json" if ref == "/definitions/record" else "release-schema.json"
                else:
                    ref = ""
                    ref_path = error["path_no_number"]
                    schema = schema_obj.package_schema_name
                error["docs_ref"] = format_html("{},{},{}", schema, ref, ref_path)

            new_validation_errors.append([json.dumps(error, sort_keys=True), values])
        common_checks["context"]["validation_errors"] = new_validation_errors

    context.update(common_checks["context"])

    if not skip_aggregates and isinstance(json_data, dict):
        count = 0
        unique_ocids = set()

        releases_or_records = json_data.get("records", []) or json_data.get("releases", [])
        if isinstance(releases_or_records, list):
            for release_or_record in releases_or_records:
                if not isinstance(release_or_record, dict):
                    continue
                count += 1
                if (ocid := release_or_record.get("ocid")) and isinstance(ocid, typing.Hashable):
                    unique_ocids.add(release_or_record["ocid"])

        context["count"] = count
        context["unique_ocids_count"] = len(unique_ocids)

    additional_codelist_values = get_additional_codelist_values(schema_obj, json_data)
    context["additional_closed_codelist_values"] = {
        key: value for key, value in additional_codelist_values.items() if not value["isopen"]
    }
    context["additional_open_codelist_values"] = {
        key: value for key, value in additional_codelist_values.items() if value["isopen"]
    }

    if additional_checks:
        context["additional_checks"] = run_additional_checks(json_data, additional_checks)

    return context
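# Illustrative usage sketch (assumptions: the API-context configuration shown in the error
# message above, a hypothetical release_package.json on disk, and any writable directory as
# upload_dir; the exact keys returned depend on libcove's common_checks_context):
#
#     import json
#     import tempfile
#
#     import libcoveocds.config
#     import libcoveocds.schema
#
#     lib_cove_ocds_config = libcoveocds.config.LibCoveOCDSConfig()
#     lib_cove_ocds_config.config["context"] = "api"
#     schema_obj = libcoveocds.schema.SchemaOCDS(lib_cove_ocds_config=lib_cove_ocds_config)
#
#     with open("release_package.json") as f:
#         json_data = json.load(f)
#
#     with tempfile.TemporaryDirectory() as upload_dir:
#         results = common_checks_ocds({}, upload_dir, json_data, schema_obj)
#
#     print(results.get("count"), sorted(results))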