Skip to content

Commit 89f45e7

Browse files
authored
Add audit (#17951)
Changelog: Omit Docs: Omit All merit goes to @AbrilRBS! I had to squash her commits because of the CLA supersedes #17950 (review) that had a problem with the CLA
2 parents d1f92a8 + 79261b2 commit 89f45e7

File tree

8 files changed

+1002
-0
lines changed

8 files changed

+1002
-0
lines changed

conan/api/conan_api.py

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import sys
33

44
from conan.api.output import init_colorama
5+
from conan.api.subapi.audit import AuditAPI
56
from conan.api.subapi.cache import CacheAPI
67
from conan.api.subapi.command import CommandAPI
78
from conan.api.subapi.local import LocalAPI
@@ -69,6 +70,7 @@ def __init__(self, cache_folder=None):
6970
self.cache = CacheAPI(self)
7071
self.lockfile = LockfileAPI(self)
7172
self.local = LocalAPI(self)
73+
self.audit = AuditAPI(self)
7274

7375
_check_conan_version(self)
7476

conan/api/subapi/audit.py

+159
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
import binascii
2+
import json
3+
import os
4+
import base64
5+
6+
from conan.internal.api.audit.providers import ConanCenterProvider, PrivateProvider
7+
from conan.errors import ConanException
8+
from conan.internal.api.remotes.encrypt import encode, decode
9+
from conan.internal.model.recipe_ref import RecipeReference
10+
from conans.util.files import save, load
11+
12+
CONAN_CENTER_AUDIT_PROVIDER_NAME = "conancenter"
13+
CYPHER_KEY = "private"
14+
15+
class AuditAPI:
16+
"""
17+
This class provides the functionality to scan references for vulnerabilities.
18+
"""
19+
20+
def __init__(self, conan_api):
21+
self.conan_api = conan_api
22+
self._home_folder = conan_api.home_folder
23+
self._providers_path = os.path.join(self._home_folder, "audit_providers.json")
24+
self._provider_cls = {
25+
"conan-center-proxy": ConanCenterProvider,
26+
"private": PrivateProvider,
27+
}
28+
29+
def scan(self, deps_graph, provider):
30+
"""
31+
Scan a given recipe for vulnerabilities in its dependencies.
32+
"""
33+
refs = sorted(set(RecipeReference.loads(f"{node.ref.name}/{node.ref.version}")
34+
for node in deps_graph.nodes[1:]), key= lambda ref: ref.name)
35+
return provider.get_cves(refs)
36+
37+
def list(self, references, provider):
38+
"""
39+
List the vulnerabilities of the given reference.
40+
"""
41+
refs = [RecipeReference.loads(ref) for ref in references]
42+
for ref in refs:
43+
ref.validate_ref()
44+
return provider.get_cves(refs)
45+
46+
def get_provider(self, provider_name):
47+
"""
48+
Get the provider by name.
49+
"""
50+
# TODO: More work remains to be done here, hardcoded for now for testing
51+
providers = _load_providers(self._providers_path)
52+
if provider_name not in providers:
53+
add_arguments = (
54+
"--url=https://audit.conan.io/ --type=conan-center-proxy"
55+
if provider_name == CONAN_CENTER_AUDIT_PROVIDER_NAME
56+
else "--url=<url> --type=<type>"
57+
)
58+
59+
register_message = (
60+
f"If you don't have a valid token, register at: https://audit.conan.io/register."
61+
if provider_name == CONAN_CENTER_AUDIT_PROVIDER_NAME
62+
else ""
63+
)
64+
65+
raise ConanException(
66+
f"Provider '{provider_name}' not found. Please specify a valid provider name or add it using: "
67+
f"'conan audit provider add {provider_name} {add_arguments} --token=<token>'\n"
68+
f"{register_message}"
69+
)
70+
71+
provider_data = providers[provider_name]
72+
safe_provider_name = provider_name.replace("-", "_")
73+
env_token = os.getenv(f"CONAN_AUDIT_PROVIDER_TOKEN_{safe_provider_name.upper()}")
74+
75+
if env_token:
76+
# Always override the token with the environment variable
77+
provider_data["token"] = env_token
78+
elif "token" in provider_data:
79+
try:
80+
provider_data["token"] = decode(base64.standard_b64decode(provider_data["token"]).decode(), CYPHER_KEY)
81+
except binascii.Error as e:
82+
raise ConanException(f"Invalid token format for provider '{provider_name}'. The token might be corrupt.")
83+
84+
provider_cls = self._provider_cls.get(provider_data["type"])
85+
86+
return provider_cls(self.conan_api, provider_name, provider_data)
87+
88+
def list_providers(self):
89+
"""
90+
Get all available providers.
91+
"""
92+
providers = _load_providers(self._providers_path)
93+
result = []
94+
for name, provider_data in providers.items():
95+
provider_cls = self._provider_cls.get(provider_data["type"])
96+
result.append(provider_cls(self.conan_api, name, provider_data))
97+
return result
98+
99+
def add_provider(self, name, url, provider_type):
100+
"""
101+
Add a provider.
102+
"""
103+
providers = _load_providers(self._providers_path)
104+
if name in providers:
105+
raise ConanException(f"Provider '{name}' already exists")
106+
107+
if provider_type not in self._provider_cls:
108+
raise ConanException(f"Provider type '{provider_type}' not found")
109+
110+
providers[name] = {
111+
"name": name,
112+
"url": url,
113+
"type": provider_type
114+
}
115+
116+
_save_providers(self._providers_path, providers)
117+
118+
def remove_provider(self, provider_name):
119+
"""
120+
Remove a provider.
121+
"""
122+
providers = _load_providers(self._providers_path)
123+
if provider_name not in providers:
124+
raise ConanException(f"Provider '{provider_name}' not found")
125+
126+
del providers[provider_name]
127+
128+
_save_providers(self._providers_path, providers)
129+
130+
def auth_provider(self, provider, token):
131+
"""
132+
Authenticate a provider.
133+
"""
134+
if not provider:
135+
raise ConanException("Provider not found")
136+
137+
providers = _load_providers(self._providers_path)
138+
139+
assert provider.name in providers
140+
providers[provider.name]["token"] = base64.standard_b64encode(encode(token, CYPHER_KEY).encode()).decode()
141+
setattr(provider, "token", token)
142+
_save_providers(self._providers_path, providers)
143+
144+
145+
def _load_providers(providers_path):
146+
if not os.path.exists(providers_path):
147+
default_providers = {
148+
CONAN_CENTER_AUDIT_PROVIDER_NAME: {
149+
"url": "https://audit.conan.io/",
150+
"type": "conan-center-proxy"
151+
}
152+
}
153+
save(providers_path, json.dumps(default_providers, indent=4))
154+
155+
return json.loads(load(providers_path))
156+
157+
158+
def _save_providers(providers_path, providers):
159+
save(providers_path, json.dumps(providers, indent=4))

conan/cli/commands/audit.py

+195
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
import json
2+
import os
3+
4+
from conan.api.conan_api import ConanAPI
5+
from conan.api.input import UserInput
6+
from conan.api.model import MultiPackagesList
7+
from conan.api.output import cli_out_write, ConanOutput
8+
from conan.api.subapi.audit import CONAN_CENTER_AUDIT_PROVIDER_NAME
9+
from conan.cli import make_abs_path
10+
from conan.cli.args import common_graph_args, validate_common_graph_args
11+
from conan.cli.command import conan_command, conan_subcommand
12+
from conan.cli.formatters.audit.vulnerabilities import text_vuln_formatter, json_vuln_formatter, \
13+
html_vuln_formatter
14+
from conan.cli.printers import print_profiles
15+
from conan.cli.printers.graph import print_graph_basic
16+
from conan.errors import ConanException
17+
18+
19+
def _add_provider_arg(subparser):
20+
subparser.add_argument("-p", "--provider", help="Provider to use for scanning")
21+
22+
23+
@conan_subcommand(formatters={"text": text_vuln_formatter,
24+
"json": json_vuln_formatter,
25+
"html": html_vuln_formatter})
26+
def audit_scan(conan_api: ConanAPI, parser, subparser, *args):
27+
"""
28+
Scan a given recipe for vulnerabilities in its dependencies.
29+
"""
30+
common_graph_args(subparser)
31+
# Needed for the validation of args, but this should usually be left as False here
32+
# TODO: Do we then want to hide it in the --help?
33+
subparser.add_argument("--build-require", action='store_true', default=False,
34+
help='Whether the provided reference is a build-require')
35+
36+
_add_provider_arg(subparser)
37+
args = parser.parse_args(*args)
38+
39+
# This comes from install command
40+
41+
validate_common_graph_args(args)
42+
# basic paths
43+
cwd = os.getcwd()
44+
path = conan_api.local.get_conanfile_path(args.path, cwd, py=None) if args.path else None
45+
46+
# Basic collaborators: remotes, lockfile, profiles
47+
remotes = conan_api.remotes.list(args.remote) if not args.no_remote else []
48+
overrides = eval(args.lockfile_overrides) if args.lockfile_overrides else None
49+
lockfile = conan_api.lockfile.get_lockfile(lockfile=args.lockfile, conanfile_path=path, cwd=cwd,
50+
partial=args.lockfile_partial, overrides=overrides)
51+
profile_host, profile_build = conan_api.profiles.get_profiles_from_args(args)
52+
print_profiles(profile_host, profile_build)
53+
54+
# Graph computation (without installation of binaries)
55+
gapi = conan_api.graph
56+
if path:
57+
deps_graph = gapi.load_graph_consumer(path, args.name, args.version, args.user, args.channel,
58+
profile_host, profile_build, lockfile, remotes,
59+
# TO DISCUSS: defaulting to False the is_build_require
60+
args.update, is_build_require=args.build_require)
61+
else:
62+
deps_graph = gapi.load_graph_requires(args.requires, args.tool_requires, profile_host,
63+
profile_build, lockfile, remotes, args.update)
64+
print_graph_basic(deps_graph)
65+
deps_graph.report_graph_error()
66+
67+
if deps_graph.error:
68+
return {"error": deps_graph.error}
69+
70+
provider = conan_api.audit.get_provider(args.provider or CONAN_CENTER_AUDIT_PROVIDER_NAME)
71+
72+
return conan_api.audit.scan(deps_graph, provider)
73+
74+
75+
@conan_subcommand(formatters={"text": text_vuln_formatter,
76+
"json": json_vuln_formatter,
77+
"html": html_vuln_formatter})
78+
def audit_list(conan_api: ConanAPI, parser, subparser, *args):
79+
"""
80+
List the vulnerabilities of the given reference.
81+
"""
82+
subparser.add_argument("reference", help="Reference to list vulnerabilities for", nargs="?")
83+
subparser.add_argument("-l", "--list", help="pkglist file to list vulnerabilities for", default=None)
84+
subparser.add_argument("-r", "--remote", help="Remote to use for listing", default=None)
85+
_add_provider_arg(subparser)
86+
args = parser.parse_args(*args)
87+
88+
if not args.reference and not args.list:
89+
raise ConanException("Please specify a reference or a pkglist file")
90+
91+
if args.reference and args.list:
92+
raise ConanException("Please specify a reference or a pkglist file, not both")
93+
94+
provider = conan_api.audit.get_provider(args.provider or CONAN_CENTER_AUDIT_PROVIDER_NAME)
95+
96+
references = []
97+
if args.list:
98+
listfile = make_abs_path(args.list)
99+
multi_package_list = MultiPackagesList.load(listfile)
100+
cache_name = "Local Cache" if not args.remote else args.remote
101+
package_list = multi_package_list[cache_name]
102+
refs_to_list = package_list.serialize()
103+
references = list(refs_to_list.keys())
104+
if not references: # the package list might contain only refs, no revs
105+
ConanOutput().warning("Nothing to list, package list do not contain recipe revisions")
106+
else:
107+
references = [args.reference]
108+
return conan_api.audit.list(references, provider)
109+
110+
def text_provider_formatter(providers_action):
111+
providers = providers_action[0]
112+
action = providers_action[1]
113+
114+
if action == "remove":
115+
cli_out_write("Provider removed successfully.")
116+
elif action == "add":
117+
cli_out_write("Provider added successfully.")
118+
elif action == "auth":
119+
cli_out_write("Provider authentication added.")
120+
elif action == "list":
121+
if not providers:
122+
cli_out_write("No providers found.")
123+
else:
124+
for provider in providers:
125+
if provider:
126+
cli_out_write(f"{provider.name} (type: {provider.type}) - {provider.url}")
127+
128+
def json_provider_formatter(providers_action):
129+
ret = []
130+
for provider in providers_action[0]:
131+
if provider:
132+
ret.append({"name": provider.name, "url": provider.url, "type": provider.type})
133+
cli_out_write(json.dumps(ret, indent=4))
134+
135+
136+
@conan_subcommand(formatters={"text": text_provider_formatter, "json": json_provider_formatter})
137+
def audit_provider(conan_api, parser, subparser, *args):
138+
"""
139+
Manage security providers for the 'conan audit' command.
140+
"""
141+
142+
subparser.add_argument("action", choices=["add", "list", "auth", "remove"], help="Action to perform from 'add', 'list' , 'remove' or 'auth'")
143+
subparser.add_argument("name", help="Provider name", nargs="?")
144+
145+
subparser.add_argument("--url", help="Provider URL")
146+
subparser.add_argument("--type", help="Provider type", choices=["conan-center-proxy", "private"])
147+
subparser.add_argument("--token", help="Provider token")
148+
args = parser.parse_args(*args)
149+
150+
if args.action == "add":
151+
if not args.name or not args.url or not args.type:
152+
raise ConanException("Name, URL and type are required to add a provider")
153+
if " " in args.name:
154+
raise ConanException("Name cannot contain spaces")
155+
conan_api.audit.add_provider(args.name, args.url, args.type)
156+
157+
if not args.token:
158+
user_input = UserInput(conan_api.config.get("core:non_interactive"))
159+
ConanOutput().write(f"Please enter a token for {args.name} the provider: ")
160+
token = user_input.get_password()
161+
else:
162+
token = args.token
163+
164+
provider = conan_api.audit.get_provider(args.name)
165+
if token:
166+
conan_api.audit.auth_provider(provider, token)
167+
168+
return [provider], args.action
169+
elif args.action == "remove":
170+
if not args.name:
171+
raise ConanException("Name required to remove a provider")
172+
conan_api.audit.remove_provider(args.name)
173+
return [], args.action
174+
elif args.action == "list":
175+
providers = conan_api.audit.list_providers()
176+
return providers, args.action
177+
elif args.action == "auth":
178+
if not args.name:
179+
raise ConanException("Name is required to authenticate on a provider")
180+
if not args.token:
181+
user_input = UserInput(conan_api.config.get("core:non_interactive"))
182+
ConanOutput().write(f"Please enter a token for {args.name} the provider: ")
183+
token = user_input.get_password()
184+
else:
185+
token = args.token
186+
187+
provider = conan_api.audit.get_provider(args.name)
188+
conan_api.audit.auth_provider(provider, token)
189+
return [provider], args.action
190+
191+
@conan_command(group="Security")
192+
def audit(conan_api, parser, *args):
193+
"""
194+
Find vulnerabilities in your dependencies.
195+
"""

conan/cli/formatters/audit/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)