-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprogress_certificate.py
134 lines (102 loc) · 4.88 KB
/
progress_certificate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import logging
import math
import sys
from typing import List, Dict, Tuple
from utils.config import SystemConfig
from utils.types import NodeId, RegencyNumber
from messages import Message, MessageType, parse_message, parse_message_from_dict
class CommitProof:
def __init__(self, system_config: SystemConfig):
self.accepted_values: Dict[NodeId, Message] = {}
self.system_config = system_config
def add_part(self, accepted_message: Message):
assert accepted_message.type == MessageType.ACCEPTED
vk = self.system_config.all_nodes[accepted_message.sender_id].verifying_key
if not accepted_message.verify(vk):
logging.warn(f"Signature verification failed for {accepted_message}")
return
if accepted_message.sender_id not in self.accepted_values or (
self.accepted_values[accepted_message.sender_id].get_field("pnumber")
< accepted_message.get_field("pnumber")
):
self.accepted_values[accepted_message.sender_id] = accepted_message
def valid(self, value: int, pnumber: RegencyNumber) -> bool:
vote_count = 0
for accepted_message in self.accepted_values.values():
accepted = (
accepted_message.get_field("value"),
accepted_message.get_field("pnumber"),
)
if (value, pnumber) == accepted:
vote_count += 1
quorum_needed = math.ceil((self.system_config.A + self.system_config.f + 1) / 2)
return vote_count >= quorum_needed
def as_list(self) -> List:
return [message.__dict__() for message in self.accepted_values.values()]
@classmethod
def decode(cls, system_config: SystemConfig, accepted_message_dicts: List[Dict]):
commit_proof = CommitProof(system_config)
try:
for message_dict in accepted_message_dicts:
message = parse_message_from_dict(message_dict)
if message is not None:
commit_proof.add_part(message)
except Exception as e:
logging.warn(f"Failed to decode commit proof: {e}")
return CommitProof(system_config)
return commit_proof
class ProgressCertificate:
def __init__(self, system_config: SystemConfig):
self.replies: Dict[NodeId, Message] = {}
self.system_config = system_config
def add_part(self, reply_message: Message):
assert reply_message.type == MessageType.REPLY
vk = self.system_config.all_nodes[reply_message.sender_id].verifying_key
if not reply_message.verify(vk):
logging.warn(f"Signature verification failed for {reply_message}")
return
self.replies[reply_message.sender_id] = reply_message
def has_quorum(self) -> bool:
return len(self.replies) >= self.system_config.A - self.system_config.f
def vouches_for(self, value: int, pnumber: RegencyNumber) -> bool:
counts: Dict[int, int] = {}
for reply in self.replies.values():
accepted_value = reply.get_field("accepted_value")
if accepted_value is not None:
counts[accepted_value] = counts.get(accepted_value, 0) + 1
for reply_value, count in counts.items():
if reply_value != value and count >= math.ceil(
(self.system_config.A - self.system_config.f + 1) / 2
):
return False
for reply in self.replies.values():
commit_proof = CommitProof.decode(
self.system_config, reply.get_field("commit_proof")
)
accepted_value = reply.get_field("accepted_value")
if value != accepted_value and commit_proof.valid(accepted_value, pnumber):
return False
return True
def get_value_to_propose(self, original_proposal: int, pnumber: int) -> int:
# print([reply.get_field("accepted_value") for reply in self.replies.values()])
accepted_values = set(
reply.get_field("accepted_value") for reply in self.replies.values()
)
for accepted_value in accepted_values:
if accepted_value is not None and self.vouches_for(accepted_value, pnumber):
return accepted_value
return original_proposal
def as_list(self) -> List[Dict]:
return [message.__dict__() for message in self.replies.values()]
@classmethod
def decode(cls, system_config: SystemConfig, reply_dicts: List[Dict]):
progress_cert = ProgressCertificate(system_config)
try:
for reply_dict in reply_dicts:
message = parse_message_from_dict(reply_dict)
if message is not None:
progress_cert.add_part(message)
except Exception as e:
logging.warn(f"Failed to decode progress certificate: {e}")
return ProgressCertificate(system_config)
return progress_cert