-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconfiguration.py
202 lines (154 loc) · 6.55 KB
/
configuration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import logging
from binascii import hexlify
from functools import lru_cache, singledispatch
from pathlib import Path
from typing import Any, Dict, Optional, Union
import yaml
from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
X25519PublicKey,
)
from cryptography.hazmat.primitives.serialization import (
Encoding,
load_der_private_key,
load_der_public_key,
load_pem_private_key,
load_pem_public_key,
)
from cryptography.x509 import load_pem_x509_certificate
from pydantic import root_validator # type: ignore
from pydantic import BaseModel, Extra, FilePath, StrictBytes
from ..configuration_file_model import ModelConfigurationModel
from .logging_utils import LogDict, LogIter
DEFAULT_MODEL_CONFIG: Dict[Any, Any] = {
"i_pairing_keys": {
0: {
"value": b"\x84/\xe3!\xa8$t\x0877\xff+\x9b\x88\xa2\xafBD-\xb0\xd8\xaa\xccm\xc6\x9e\x99S3D\xb2F" # noqa E501
}
},
"s_t_priv": b"H\xb9/\x05\x0b\xfb\x82@\"\xec\xef{\xc5\xec\xbc\xa4R\xd3\xfd'p\xe8\xb5T\x9e\x93g)\xacx\xc4m", # noqa E501
"s_t_pub": b"\x07z\xad\x06\x0b\xbb8F-:\xa5.\x9e\xef\xe8\xfa\xa7\x84\x16\x9b,g;\xe0n\xf3\xfe\x1f\xd1\xc1\x93G", # noqa E501
}
class KeyTypeError(TypeError):
def __init__(self, key_type: str, type_: type, expected_type: type) -> None:
super().__init__(f"{key_type} key type is not {expected_type}: {type_}")
def merge_dicts(*dcts: Dict[Any, Any]) -> Dict[Any, Any]:
"""Merge dictionaries, the first has the least priority."""
def _combine_into(d: Dict[Any, Any], combined: Dict[Any, Any]) -> None:
for k, v in d.items():
if isinstance(v, dict):
_combine_into(v, combined.setdefault(k, {})) # type: ignore
else:
combined[k] = v
result: Dict[Any, Any] = {}
for dct in dcts:
_combine_into(dct, result)
return result
@singledispatch
def load_private_key(__value: Any) -> X25519PrivateKey:
"""Load a X25519 private key."""
raise NotImplementedError(f"{type(__value)} not supported")
@load_private_key.register
def _(__value: bytes) -> X25519PrivateKey:
return X25519PrivateKey.from_private_bytes(__value)
@load_private_key.register
def _(__value: Path) -> X25519PrivateKey:
if __value.suffix == ".pem":
private_key = load_pem_private_key(__value.read_bytes(), None)
if __value.suffix == ".der":
private_key = load_der_private_key(__value.read_bytes(), None)
else:
raise TypeError("Private key not in DER nor PEM format")
if not isinstance(private_key, (expected := X25519PrivateKey)):
raise KeyTypeError("Private", type(private_key), expected)
return private_key
@singledispatch
def load_public_key(__value: Any) -> X25519PublicKey:
"""Load a X25519 public key."""
raise NotImplementedError(f"{type(__value)} not supported")
@load_public_key.register
def _(__value: bytes) -> X25519PublicKey:
return X25519PublicKey.from_public_bytes(__value)
@load_public_key.register
def _(__value: Path) -> X25519PublicKey:
if __value.suffix == ".pem":
public_key = load_pem_public_key(__value.read_bytes(), None)
if __value.suffix == ".der":
public_key = load_der_public_key(__value.read_bytes(), None)
else:
raise TypeError("Private key not in DER nor PEM format")
if not isinstance(public_key, (expected := X25519PublicKey)):
raise KeyTypeError("Private", type(public_key), expected)
return public_key
@singledispatch
def load_certificate(__value: Any) -> bytes:
"""Load a x509 certificate in bytes format."""
raise NotImplementedError(f"{type(__value)} not supported")
@load_certificate.register
def _(__value: bytes) -> bytes:
return __value
@load_certificate.register
def _(__value: Path) -> bytes:
if __value.suffix == ".pem":
return load_pem_x509_certificate(__value.read_bytes()).public_bytes(
Encoding.DER
)
if __value.suffix == ".der":
return __value.read_bytes()
raise TypeError("Private key not in DER nor PEM format")
class ConfigurationModel(BaseModel, extra=Extra.allow):
"""Pydantic model to validate the configuration file"""
s_t_priv: Union[StrictBytes, FilePath]
s_t_pub: Optional[Union[StrictBytes, FilePath]]
x509_certificate: Union[StrictBytes, FilePath]
@root_validator # type: ignore
def process_values(cls, values: Dict[str, Union[bytes, Path]]):
# process public key
if (s_t_pub := values.get("s_t_pub")) is not None:
public_key = load_public_key(s_t_pub)
values["s_t_pub"] = public_key.public_bytes_raw()
# process private key
if (s_t_priv := values.get("s_t_priv")) is not None:
private_key = load_private_key(s_t_priv)
values["s_t_priv"] = private_key.private_bytes_raw()
# define tropic public key if not defined
if values.get("s_t_pub") is None:
values["s_t_pub"] = private_key.public_key().public_bytes_raw()
# process certificate
if (cert := values.get("x509_certificate")) is not None:
values["x509_certificate"] = load_certificate(cert)
return values
@lru_cache
def _open_yaml_file(filepath: Path) -> Dict[Any, Any]:
return yaml.safe_load(filepath.read_bytes())
def _format(config: Dict[Any, Any]) -> Dict[Any, Any]:
formatted_config: Dict[Any, Any] = {}
for k, v in config.items():
if isinstance(v, dict):
formatted_config[k] = _format(v) # type: ignore
elif isinstance(v, bytes):
formatted_config[k] = hexlify(v)
else:
formatted_config[k] = v
return formatted_config
def load_configuration(
filepath: Optional[Path], logger: logging.Logger
) -> Dict[Any, Any]:
if filepath is None:
config: Dict[Any, Any] = {}
else:
logger.info("Loading target configuration from %s.", filepath)
config = _open_yaml_file(filepath)
logger.debug("Configuration from file:%s", LogDict(_format(config)))
# Checking file content
config = ConfigurationModel.parse_obj(config).dict()
# Merging file configuration with default configuration
config = merge_dicts(DEFAULT_MODEL_CONFIG, config)
logger.debug("Configuration after merge:%s", LogDict(_format(config)))
# Checking configuration
config = ModelConfigurationModel.parse_obj(config).dict(exclude_none=True)
logger.info(
"Target Configuration loaded and validated:%s", LogDict(_format(config))
)
logger.debug("STPUB[] = {%s};", LogIter(config["s_t_pub"], "%#04x"))
return config