-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathverification_methods.py
70 lines (55 loc) · 2.73 KB
/
verification_methods.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import base64
import logging
from typing import List, Tuple, Optional
import base58
from aries_cloudagent.core.profile import Profile
from aries_cloudagent.storage.base import BaseStorage
from aries_cloudagent.wallet.base import BaseWallet
from aries_cloudagent.wallet.default_verification_key_strategy import BaseVerificationKeyStrategy
from aries_cloudagent.wallet.error import WalletNotFoundError
from aries_cloudagent.wallet.key_type import KeyType
from pydid.verification_method import JsonWebKey2020, Ed25519VerificationKey2018
from didmanagement.retention import StorageBackendStorageStrategy
Did = str
logger = logging.getLogger(__name__)
def json_web_key_2020(did_value: Did, key_index: int, key: bytes) -> Tuple[JsonWebKey2020, List[str]]:
return JsonWebKey2020(
id=_verification_method_id(did_value, key_index),
type=JsonWebKey2020.__name__,
controller=did_value,
public_key_jwk={
"kty": "OKP",
# TODO: remove hard-coding if we want to support more key types
"crv": "Ed25519",
"x": base64.b64encode(key),
},
), ["https://w3id.org/security/suite/jws-2020/v1"]
def ed25519_verification_key_2018(
did_value: Did, key_index: int, key: bytes
) -> Tuple[Ed25519VerificationKey2018, List[str]]:
return Ed25519VerificationKey2018(
id=_verification_method_id(did_value, key_index),
type=Ed25519VerificationKey2018.__name__,
controller=did_value,
public_key_base58=base58.b58encode(key),
), ["https://w3id.org/security/suites/ed25519-2018/v1"]
class LatestVerificationKeyStrategy(BaseVerificationKeyStrategy):
async def get_verification_method_id_for_did(self, did: str,
profile: Optional[Profile],
allowed_verification_method_types: Optional[List[KeyType]] = None,
proof_purpose: Optional[str] = None) -> Optional[str]:
async with profile.session() as session:
wallet = session.inject(BaseWallet)
try:
# Check is DID is known
await wallet.get_local_did(did.replace("did:sov:", ""))
# DID is known, get current keys count and derive key ID
storage = session.inject(BaseStorage)
storage_strategy = StorageBackendStorageStrategy(storage)
curr_idx = await storage_strategy.current_index(did)
return _verification_method_id(did, curr_idx)
except WalletNotFoundError:
# DID is unknown
return None
def _verification_method_id(did_value: Did, key_index: int) -> str:
return f"{did_value}#key-{key_index}"