-
Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathvulnerability_overwriter.py
190 lines (153 loc) · 6.04 KB
/
vulnerability_overwriter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
"""Contains classes and functions to overwrite OSV vulnerability data.
In certain scenarios, you may want to overwrite vulnerability information. For
example, a severity may be critical for Android phones, but on a smart camera
that doesn't support third-party apps, it might be NSI. The OSV schema allows
you to specify severity per ecosystem, but in some cases, you may need even
further granularity. This is the scenario where you need to overwrite
vulnerability information so that it is reported with the correct severity for
each device.
The overwriting happens after signatures are loaded and before they are
passed to a scanner. The changes happen in memory and are not persisted. They
are applied to a vulnerability JSON object.
'detector_runner' supports a flag '--overwrite_specs' that takes a path to a
JSON file containing a list of OverwriteSpec objects. Each spec defines a
vulnerability ID and one or more paths to field using basic JSONPath syntax
and values to be set.
Users can validate their overwrite specifications by running the
'overwrite_specs_validity_test'.
"""
import dataclasses
import json
from typing import Any, Sequence, Mapping
import jsonpath_rw
@dataclasses.dataclass(frozen=True)
class Replace:
"""Represents a replacement operation on a OSV vulnerability JSON object.
Attributes:
value: The value to replace with.
path: This field specifies the JSONPath expression to locate the field to be
replaced. The expression should yield a single field, wildcards are
supported for modifying arrays in a single operation (e.g.,
affected[*].ecosystem_specific.severity). It is applied per vulnerability,
not file-wide.
"""
value: str
path: jsonpath_rw.jsonpath.JSONPath
@dataclasses.dataclass(frozen=True)
class OverwriteSpec:
"""Specifies how to overwrite vulnerability data in a JSON object.
Attributes:
vuln_id: The ID of the vulnerability to overwrite. Should match exactly the
'id' field in the OSV vulnerability.
replace: A sequence of replace operations to apply to the vulnerability.
reason: A mandatory explanation of the reason for the overwrite.
"""
vuln_id: str
replace: Sequence[Replace]
reason: str
def _parse_replace(
raw_replace: Mapping[str, Any],
vuln_id: str,
) -> Replace:
"""Parses and validates a replace operation from a raw JSON object."""
if 'value' not in raw_replace:
raise ValueError(
'Value is required for each replace operation even if it is empty,'
f' check spec for {vuln_id}'
)
if not raw_replace.get('path'):
raise ValueError(
'JSON path is required for each replace operation, check spec for'
f' {vuln_id}'
)
try:
json_path = jsonpath_rw.parser.parse(raw_replace['path'])
return Replace(
value=raw_replace['value'],
path=json_path,
)
except Exception as e:
raise ValueError(
f'Invalid JSON path in replace operation, check spec for {vuln_id}: {e}'
) from e
def load_overwrite_specs_from_json(
json_content: str,
) -> Sequence[OverwriteSpec]:
"""Loads overwrite specs from a JSON string.
Args:
json_content: JSON string containing a list of overwrite specs in a JSON
format.
Returns:
Sequence of OverwriteSpec objects.
Raises:
ValueError: If the JSON string is invalid or if any of the overwrite specs
is missing required fields.
"""
raw_overwriters_info = json.loads(json_content)
overwrite_specs = []
for raw_spec in raw_overwriters_info:
if not raw_spec.get('vuln_id'):
raise ValueError(
f'Vulnerability ID is required for each overwriter: {raw_spec}'
)
if not raw_spec.get('reason'):
raise ValueError(
'Reason is required for each overwriter, vuln_id:'
f' {raw_spec.get("vuln_id")}'
)
if not raw_spec.get('replace'):
raise ValueError(
'Replace operations are not provided, vuln_id:'
f' {raw_spec.get("vuln_id")}'
)
overwrite_specs.append(
OverwriteSpec(
vuln_id=raw_spec['vuln_id'],
replace=[
_parse_replace(raw_replace, raw_spec['vuln_id'])
for raw_replace in raw_spec['replace']
],
reason=raw_spec['reason'],
)
)
return overwrite_specs
def load_overwrite_specs_from_file(file_path: str) -> Sequence[OverwriteSpec]:
"""Loads overwrite specs from a JSON file.
See an example of a valid overwrite spec file in the
testdata/test_overwrite_specs.json file.
Args:
file_path: Path to the JSON file containing overwrite specs.
Returns:
Sequence of OverwriteSpec objects.
Raises:
ValueError: If the JSON file is invalid or if any of the overwrite specs
is missing required fields.
"""
with open(file_path, 'r') as f:
return load_overwrite_specs_from_json(f.read())
def overwrite(
vulnerabilities: Sequence[dict[str, Any]],
overwrite_specs: Sequence[OverwriteSpec],
):
"""Applies overwrite specs to a list of vulnerabilities.
This function works as following. First it creates a map of overwriter specs
by their vuln_id to quickly find the spec for a given vulnerability. Then it
iterates over the vulnerabilities and for each one it finds the
corresponding spec (if any) and applies the replace operations on it.
The replace operations rely on JSONPath to find the fields to modify.
JSONPath may return multiple matches for a given Replace operation. In this
case all the matches are modified.
Args:
vulnerabilities: Sequence of vulnerabilities in OSV schema format. The
function can modify these objects by applying overwrite specs.
overwrite_specs: Sequence of OverwriteSpec objects to apply.
"""
if not overwrite_specs:
return
overwrite_spec_map = {spec.vuln_id: spec for spec in overwrite_specs}
for vulnerability in vulnerabilities:
vulnerability_id = vulnerability['id']
if vulnerability_id in overwrite_spec_map:
spec = overwrite_spec_map[vulnerability_id]
for replace in spec.replace:
replace.path.update(vulnerability, replace.value)