From c9127e1990de99cf7d99f743e72af0cb063bfac1 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Sun, 17 Feb 2019 12:38:56 +0100 Subject: [PATCH 01/10] Avoid using ordered maps in LLMQ signing code (#2708) * Implement and use SigShareMap instead of ordered map with helper methods The old implementation was relying on the maps being ordered, which allowed us to grab all sig shares for the same signHash by doing range queries on the map. This has the disadvantage of being unnecessarily slow when the maps get larger. Using an unordered map would be the naive solution, but then it's not possible to query by range anymore. The solution now is to have a specialized map "SigShareMap" which is indexed by "SigShareKey". It's internally just an unordered map, indexed by the sign hash and another unordered map for the value, indexed by the quorum member index. * Only use unordered maps/sets in CSigSharesManager These are faster when maps/sets get larger. * Use unorderes sets/maps in CSigningManager --- src/llmq/quorums_signing.cpp | 18 +-- src/llmq/quorums_signing.h | 22 +-- src/llmq/quorums_signing_shares.cpp | 238 +++++++++++++--------------- src/llmq/quorums_signing_shares.h | 200 +++++++++++++++++++++-- 4 files changed, 310 insertions(+), 168 deletions(-) diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index 896e38a14d743..95588d56d4ee3 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -17,6 +17,7 @@ #include #include +#include namespace llmq { @@ -155,7 +156,7 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig) } } -template +template static void TruncateCacheMap(std::unordered_map>& m, size_t maxSize, size_t truncateThreshold) { typedef typename std::unordered_map> Map; @@ -355,8 +356,8 @@ bool CSigningManager::PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& void CSigningManager::CollectPendingRecoveredSigsToVerify( size_t maxUniqueSessions, - std::map>& retSigShares, - std::map, CQuorumCPtr>& retQuorums) + std::unordered_map>& retSigShares, + std::unordered_map, CQuorumCPtr>& retQuorums) { { LOCK(cs); @@ -364,9 +365,8 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify( return; } - std::set> uniqueSignHashes; - llmq::utils::IterateNodesRandom( - pendingRecoveredSigs, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, std::list& ns) { + std::unordered_set> uniqueSignHashes; + llmq::utils::IterateNodesRandom(pendingRecoveredSigs, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, std::list& ns) { if (ns.empty()) { return false; } @@ -419,8 +419,8 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify( bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman) { - std::map> recSigsByNode; - std::map, CQuorumCPtr> quorums; + std::unordered_map> recSigsByNode; + std::unordered_map, CQuorumCPtr> quorums; CollectPendingRecoveredSigsToVerify(32, recSigsByNode, quorums); if (recSigsByNode.empty()) { @@ -449,7 +449,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman) LogPrintf("llmq", "CSigningManager::%s -- verified recovered sig(s). count=%d, vt=%d, nodes=%d\n", __func__, verifyCount, verifyTimer.count(), recSigsByNode.size()); - std::set processed; + std::unordered_set processed; for (auto& p : recSigsByNode) { NodeId nodeId = p.first; auto& v = p.second; diff --git a/src/llmq/quorums_signing.h b/src/llmq/quorums_signing.h index c76a4a3e8b2b2..ca81caffac80d 100644 --- a/src/llmq/quorums_signing.h +++ b/src/llmq/quorums_signing.h @@ -14,16 +14,16 @@ #include -namespace std { - template <> - struct hash> +namespace std +{ +template <> +struct hash> { + std::size_t operator()(const std::pair& k) const { - std::size_t operator()(const std::pair& k) const - { - return (std::size_t)((k.first + 1) * k.second.GetCheapHash()); - } - }; -} + return (std::size_t)((k.first + 1) * k.second.GetCheapHash()); + } +}; +} // namespace std namespace llmq { @@ -136,7 +136,7 @@ class CSigningManager CRecoveredSigsDb db; // Incoming and not verified yet - std::map> pendingRecoveredSigs; + std::unordered_map> pendingRecoveredSigs; // must be protected by cs FastRandomContext rnd; @@ -157,7 +157,7 @@ class CSigningManager void ProcessMessageRecoveredSig(CNode* pfrom, const CRecoveredSig& recoveredSig, CConnman& connman); bool PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, bool& retBan); - void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, std::map>& retSigShares, std::map, CQuorumCPtr>& retQuorums); + void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, std::unordered_map>& retSigShares, std::unordered_map, CQuorumCPtr>& retQuorums); bool ProcessPendingRecoveredSigs(CConnman& connman); // called from the worker thread of CSigSharesManager void ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, const CQuorumCPtr& quorum, CConnman& connman); void Cleanup(); // called from the worker thread of CSigSharesManager diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index cfed70077686e..6f0314b6a2318 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -24,32 +24,6 @@ namespace llmq std::unique_ptr quorumSigSharesManager{nullptr}; -template -static std::pair FindBySignHash(const M& m, const uint256& signHash) -{ - return std::make_pair( - m.lower_bound(std::make_pair(signHash, (uint16_t)0)), - m.upper_bound(std::make_pair(signHash, std::numeric_limits::max()))); -} -template -static size_t CountBySignHash(const M& m, const uint256& signHash) -{ - auto itPair = FindBySignHash(m, signHash); - size_t count = 0; - while (itPair.first != itPair.second) { - count++; - ++itPair.first; - } - return count; -} - -template -static void RemoveBySignHash(M& m, const uint256& signHash) -{ - auto itPair = FindBySignHash(m, signHash); - m.erase(itPair.first, itPair.second); -} - void CSigShare::UpdateKey() { key.first = llmq::utils::BuildSignHash(*this); @@ -160,9 +134,8 @@ void CSigSharesNodeState::MarkKnows(Consensus::LLMQType llmqType, const uint256& void CSigSharesNodeState::RemoveSession(const uint256& signHash) { sessions.erase(signHash); - // pendingIncomingRecSigs.erase(signHash); - RemoveBySignHash(requestedSigShares, signHash); - RemoveBySignHash(pendingIncomingSigShares, signHash); + requestedSigShares.EraseAllForSignHash(signHash); + pendingIncomingSigShares.EraseAllForSignHash(signHash); } CSigSharesInv CBatchedSigShares::ToInv() const @@ -304,13 +277,13 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc for (size_t i = 0; i < batchedSigShares.sigShares.size(); i++) { CSigShare sigShare = batchedSigShares.RebuildSigShare(i); - nodeState.requestedSigShares.erase(sigShare.GetKey()); + nodeState.requestedSigShares.Erase(sigShare.GetKey()); // TODO track invalid sig shares received for PoSe? // It's important to only skip seen *valid* sig shares here. If a node sends us a // batch of mostly valid sig shares with a single invalid one and thus batched // verification fails, we'd skip the valid ones in the future if received from other nodes - if (this->sigShares.count(sigShare.GetKey())) { + if (this->sigShares.Has(sigShare.GetKey())) { continue; } @@ -333,7 +306,7 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc LOCK(cs); auto& nodeState = nodeStates[pfrom->GetId()]; for (auto& s : sigShares) { - nodeState.pendingIncomingSigShares.emplace(s.GetKey(), s); + nodeState.pendingIncomingSigShares.Add(s.GetKey(), s); } } @@ -370,7 +343,7 @@ bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedS return false; } - std::set dupMembers; + std::unordered_set dupMembers; for (size_t i = 0; i < batchedSigShares.sigShares.size(); i++) { auto quorumMember = batchedSigShares.sigShares[i].first; @@ -395,8 +368,8 @@ bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedS void CSigSharesManager::CollectPendingSigSharesToVerify( size_t maxUniqueSessions, - std::map>& retSigShares, - std::map, CQuorumCPtr>& retQuorums) + std::unordered_map>& retSigShares, + std::unordered_map, CQuorumCPtr>& retQuorums) { { LOCK(cs); @@ -410,21 +383,20 @@ void CSigSharesManager::CollectPendingSigSharesToVerify( // invalid, making batch verification fail and revert to per-share verification, which in turn would slow down // the whole verification process - std::set> uniqueSignHashes; - llmq::utils::IterateNodesRandom( - nodeStates, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, CSigSharesNodeState& ns) { - if (ns.pendingIncomingSigShares.empty()) { + std::unordered_set> uniqueSignHashes; + llmq::utils::IterateNodesRandom(nodeStates, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, CSigSharesNodeState& ns) { + if (ns.pendingIncomingSigShares.Empty()) { return false; } - auto& sigShare = ns.pendingIncomingSigShares.begin()->second; + auto& sigShare = *ns.pendingIncomingSigShares.GetFirst(); - bool alreadyHave = this->sigShares.count(sigShare.GetKey()) != 0; + bool alreadyHave = this->sigShares.Has(sigShare.GetKey()); if (!alreadyHave) { uniqueSignHashes.emplace(nodeId, sigShare.GetSignHash()); retSigShares[nodeId].emplace_back(sigShare); } - ns.pendingIncomingSigShares.erase(ns.pendingIncomingSigShares.begin()); - return !ns.pendingIncomingSigShares.empty(); }, rnd); + ns.pendingIncomingSigShares.Erase(sigShare.GetKey()); + return !ns.pendingIncomingSigShares.Empty(); }, rnd); if (retSigShares.empty()) { return; @@ -455,8 +427,8 @@ void CSigSharesManager::CollectPendingSigSharesToVerify( bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman) { - std::map> sigSharesByNodes; - std::map, CQuorumCPtr> quorums; + std::unordered_map> sigSharesByNodes; + std::unordered_map, CQuorumCPtr> quorums; CollectPendingSigSharesToVerify(32, sigSharesByNodes, quorums); if (sigSharesByNodes.empty()) { @@ -525,7 +497,7 @@ bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman) } // It's ensured that no duplicates are passed to this method -void CSigSharesManager::ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector& sigShares, const std::map, CQuorumCPtr>& quorums, CConnman& connman) +void CSigSharesManager::ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector& sigShares, const std::unordered_map, CQuorumCPtr>& quorums, CConnman& connman) { LOCK(cs); auto& nodeState = nodeStates[nodeId]; @@ -571,11 +543,11 @@ void CSigSharesManager::ProcessSigShare(NodeId nodeId, const CSigShare& sigShare { LOCK(cs); - if (!sigShares.emplace(sigShare.GetKey(), sigShare).second) { + if (!sigShares.Add(sigShare.GetKey(), sigShare)) { return; } - sigSharesToAnnounce.emplace(sigShare.GetKey()); + sigSharesToAnnounce.Add(sigShare.GetKey(), true); firstSeenForSessions.emplace(sigShare.GetSignHash(), GetTimeMillis()); if (!quorumNodes.empty()) { @@ -591,7 +563,7 @@ void CSigSharesManager::ProcessSigShare(NodeId nodeId, const CSigShare& sigShare } } - size_t sigShareCount = CountBySignHash(sigShares, sigShare.GetSignHash()); + size_t sigShareCount = sigShares.CountForSignHash(sigShare.GetSignHash()); if (sigShareCount >= quorum->params.threshold) { canTryRecovery = true; } @@ -616,11 +588,14 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& auto k = std::make_pair(quorum->params.type, id); auto signHash = llmq::utils::BuildSignHash(quorum->params.type, quorum->pindexQuorum->GetBlockHash(), id, msgHash); - auto itPair = FindBySignHash(sigShares, signHash); + auto sigShares = this->sigShares.GetAllForSignHash(signHash); + if (!sigShares) { + return; + } sigSharesForRecovery.reserve((size_t)quorum->params.threshold); idsForRecovery.reserve((size_t)quorum->params.threshold); - for (auto it = itPair.first; it != itPair.second && sigSharesForRecovery.size() < quorum->params.threshold; ++it) { + for (auto it = sigShares->begin(); it != sigShares->end() && sigSharesForRecovery.size() < quorum->params.threshold; ++it) { auto& sigShare = it->second; sigSharesForRecovery.emplace_back(sigShare.sigShare.Get()); idsForRecovery.emplace_back(CBLSId(quorum->members[sigShare.quorumMember]->proTxHash)); @@ -665,10 +640,10 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& } // cs must be held -void CSigSharesManager::CollectSigSharesToRequest(std::map>& sigSharesToRequest) +void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest) { int64_t now = GetTimeMillis(); - std::map> nodesBySigShares; + std::unordered_map> nodesBySigShares; const size_t maxRequestsForNode = 32; @@ -690,18 +665,17 @@ void CSigSharesManager::CollectSigSharesToRequest(std::mapsecond >= SIG_SHARE_REQUEST_TIMEOUT) { + nodeState.requestedSigShares.EraseIf([&](const SigShareKey& k, int64_t t) { + if (now - t >= SIG_SHARE_REQUEST_TIMEOUT) { // timeout while waiting for this one, so retry it with another node - LogPrintf("llmq", "CSigSharesManager::%s -- timeout while waiting for %s-%d, node=%d\n", __func__, - it->first.first.ToString(), it->first.second, nodeId); - it = nodeState.requestedSigShares.erase(it); - } else { - ++it; + LogPrintf("llmq", "CSigSharesManager::CollectSigSharesToRequest -- timeout while waiting for %s-%d, node=%d\n", + k.first.ToString(), k.second, nodeId); + return true; } - } + return false; + }); - std::map* invMap = nullptr; + std::unordered_map* invMap = nullptr; for (auto& p2 : nodeState.sessions) { auto& signHash = p2.first; @@ -716,21 +690,21 @@ void CSigSharesManager::CollectSigSharesToRequest(std::map= maxRequestsForNode) { + if (nodeState.requestedSigShares.Size() >= maxRequestsForNode) { // too many pending requests for this node break; } - auto it = sigSharesRequested.find(k); - if (it != sigSharesRequested.end()) { - if (now - it->second.second >= SIG_SHARE_REQUEST_TIMEOUT && nodeId != it->second.first) { + auto p = sigSharesRequested.Get(k); + if (p) { + if (now - p->second >= SIG_SHARE_REQUEST_TIMEOUT && nodeId != p->first) { // other node timed out, re-request from this node LogPrintf("llmq", "CSigSharesManager::%s -- other node timeout while waiting for %s-%d, re-request from=%d, node=%d\n", __func__, - it->first.first.ToString(), it->first.second, nodeId, it->second.first); + k.first.ToString(), k.second, nodeId, p->first); } else { continue; } @@ -738,10 +712,10 @@ void CSigSharesManager::CollectSigSharesToRequest(std::map>& sigSharesToSend) +void CSigSharesManager::CollectSigSharesToSend(std::unordered_map>& sigSharesToSend) { for (auto& p : nodeStates) { auto nodeId = p.first; @@ -772,7 +746,7 @@ void CSigSharesManager::CollectSigSharesToSend(std::map* sigSharesToSend2 = nullptr; + std::unordered_map* sigSharesToSend2 = nullptr; for (auto& p2 : nodeState.sessions) { auto& signHash = p2.first; @@ -791,21 +765,20 @@ void CSigSharesManager::CollectSigSharesToSend(std::mapsecond; if (batchedSigShares.sigShares.empty()) { - batchedSigShares.llmqType = sigShare.llmqType; - batchedSigShares.quorumHash = sigShare.quorumHash; - batchedSigShares.id = sigShare.id; - batchedSigShares.msgHash = sigShare.msgHash; + batchedSigShares.llmqType = sigShare->llmqType; + batchedSigShares.quorumHash = sigShare->quorumHash; + batchedSigShares.id = sigShare->id; + batchedSigShares.msgHash = sigShare->msgHash; } - batchedSigShares.sigShares.emplace_back((uint16_t)i, sigShare.sigShare); + batchedSigShares.sigShares.emplace_back((uint16_t)i, sigShare->sigShare); } if (!batchedSigShares.sigShares.empty()) { @@ -820,20 +793,19 @@ void CSigSharesManager::CollectSigSharesToSend(std::map>& sigSharesToAnnounce) +void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce) { - std::set> quorumNodesPrepared; + std::unordered_set> quorumNodesPrepared; - for (auto& sigShareKey : this->sigSharesToAnnounce) { + this->sigSharesToAnnounce.ForEach([&](const SigShareKey& sigShareKey, bool) { auto& signHash = sigShareKey.first; auto quorumMember = sigShareKey.second; - auto sigShareIt = sigShares.find(sigShareKey); - if (sigShareIt == sigShares.end()) { - continue; + const CSigShare* sigShare = sigShares.Get(sigShareKey); + if (!sigShare) { + return; } - auto& sigShare = sigShareIt->second; - auto quorumKey = std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash); + auto quorumKey = std::make_pair((Consensus::LLMQType)sigShare->llmqType, sigShare->quorumHash); if (quorumNodesPrepared.emplace(quorumKey).second) { // make sure we announce to at least the nodes which we know through the inter-quorum-communication system @@ -861,7 +833,7 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::mapllmqType, signHash); if (session.knows.inv[quorumMember]) { // he already knows that one @@ -870,18 +842,18 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::mapllmqType, signHash); } inv.inv[quorumMember] = true; session.knows.inv[quorumMember] = true; } - } + }); // don't announce these anymore // nodes which did not send us a valid sig share before were left out now, but this is ok as it only results in slower // propagation for the first signing session of a fresh quorum. The sig shares should still arrive on all nodes due to // the deterministic inter-quorum-communication system - this->sigSharesToAnnounce.clear(); + this->sigSharesToAnnounce.Clear(); } bool CSigSharesManager::SendMessages() @@ -891,9 +863,9 @@ bool CSigSharesManager::SendMessages() nodesByAddress.emplace(pnode->addr, pnode->GetId()); }); - std::map> sigSharesToRequest; - std::map> sigSharesToSend; - std::map> sigSharesToAnnounce; + std::unordered_map> sigSharesToRequest; + std::unordered_map> sigSharesToSend; + std::unordered_map> sigSharesToAnnounce; { LOCK(cs); @@ -953,13 +925,13 @@ void CSigSharesManager::Cleanup() return; } - std::set> quorumsToCheck; + std::unordered_set> quorumsToCheck; { LOCK(cs); // Remove sessions which timed out - std::set timeoutSessions; + std::unordered_set timeoutSessions; for (auto& p : firstSeenForSessions) { auto& signHash = p.first; int64_t time = p.second; @@ -969,13 +941,15 @@ void CSigSharesManager::Cleanup() } } for (auto& signHash : timeoutSessions) { - size_t count = CountBySignHash(sigShares, signHash); + size_t count = sigShares.CountForSignHash(signHash); if (count > 0) { - auto itPair = FindBySignHash(sigShares, signHash); - auto& firstSigShare = itPair.first->second; + auto m = sigShares.GetAllForSignHash(signHash); + assert(m); + + auto& oneSigShare = m->begin()->second; LogPrintf("CSigSharesManager::%s -- signing session timed out. signHash=%s, id=%s, msgHash=%s, sigShareCount=%d\n", __func__, - signHash.ToString(), firstSigShare.id.ToString(), firstSigShare.msgHash.ToString(), count); + signHash.ToString(), oneSigShare.id.ToString(), oneSigShare.msgHash.ToString(), count); } else { LogPrintf("CSigSharesManager::%s -- signing session timed out. signHash=%s, sigShareCount=%d\n", __func__, signHash.ToString(), count); @@ -984,22 +958,22 @@ void CSigSharesManager::Cleanup() } // Remove sessions which were successfully recovered - std::set doneSessions; - for (auto& p : sigShares) { - if (doneSessions.count(p.second.GetSignHash())) { - continue; + std::unordered_set doneSessions; + sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { + if (doneSessions.count(sigShare.GetSignHash())) { + return; } - if (quorumSigningManager->HasRecoveredSigForSession(p.second.GetSignHash())) { - doneSessions.emplace(p.second.GetSignHash()); + if (quorumSigningManager->HasRecoveredSigForSession(sigShare.GetSignHash())) { + doneSessions.emplace(sigShare.GetSignHash()); } - } + }); for (auto& signHash : doneSessions) { RemoveSigSharesForSession(signHash); } - for (auto& p : sigShares) { - quorumsToCheck.emplace((Consensus::LLMQType)p.second.llmqType, p.second.quorumHash); - } + sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { + quorumsToCheck.emplace((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash); + }); } // Find quorums which became inactive @@ -1014,19 +988,19 @@ void CSigSharesManager::Cleanup() { // Now delete sessions which are for inactive quorums LOCK(cs); - std::set inactiveQuorumSessions; - for (auto& p : sigShares) { - if (quorumsToCheck.count(std::make_pair((Consensus::LLMQType)p.second.llmqType, p.second.quorumHash))) { - inactiveQuorumSessions.emplace(p.second.GetSignHash()); + std::unordered_set inactiveQuorumSessions; + sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { + if (quorumsToCheck.count(std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash))) { + inactiveQuorumSessions.emplace(sigShare.GetSignHash()); } - } + }); for (auto& signHash : inactiveQuorumSessions) { RemoveSigSharesForSession(signHash); } } // Find node states for peers that disappeared from CConnman - std::set nodeStatesToDelete; + std::unordered_set nodeStatesToDelete; for (auto& p : nodeStates) { nodeStatesToDelete.emplace(p.first); } @@ -1039,9 +1013,9 @@ void CSigSharesManager::Cleanup() for (auto nodeId : nodeStatesToDelete) { auto& nodeState = nodeStates[nodeId]; // remove global requested state to force a re-request from another node - for (auto& p : nodeState.requestedSigShares) { - sigSharesRequested.erase(p.first); - } + nodeState.requestedSigShares.ForEach([&](const SigShareKey& k, bool) { + sigSharesRequested.Erase(k); + }); nodeStates.erase(nodeId); } @@ -1055,9 +1029,9 @@ void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash) ns.RemoveSession(signHash); } - RemoveBySignHash(sigSharesRequested, signHash); - RemoveBySignHash(sigSharesToAnnounce, signHash); - RemoveBySignHash(sigShares, signHash); + sigSharesRequested.EraseAllForSignHash(signHash); + sigSharesToAnnounce.EraseAllForSignHash(signHash); + sigShares.EraseAllForSignHash(signHash); firstSeenForSessions.erase(signHash); } @@ -1066,13 +1040,13 @@ void CSigSharesManager::RemoveBannedNodeStates() // Called regularly to cleanup local node states for banned nodes LOCK2(cs, cs_main); - std::set toRemove; + std::unordered_set toRemove; for (auto it = nodeStates.begin(); it != nodeStates.end();) { if (IsBanned(it->first)) { // re-request sigshares from other nodes - for (auto& p : it->second.requestedSigShares) { - sigSharesRequested.erase(p.first); - } + it->second.requestedSigShares.ForEach([&](const SigShareKey& k, int64_t) { + sigSharesRequested.Erase(k); + }); it = nodeStates.erase(it); } else { ++it; @@ -1099,10 +1073,10 @@ void CSigSharesManager::BanNode(NodeId nodeId) auto& nodeState = it->second; // Whatever we requested from him, let's request it from someone else now - for (auto& p : nodeState.requestedSigShares) { - sigSharesRequested.erase(p.first); - } - nodeState.requestedSigShares.clear(); + nodeState.requestedSigShares.ForEach([&](const SigShareKey& k, int64_t) { + sigSharesRequested.Erase(k); + }); + nodeState.requestedSigShares.Clear(); nodeState.banned = true; } @@ -1127,7 +1101,7 @@ void CSigSharesManager::WorkThreadMain() quorumSigningManager->Cleanup(); // TODO Wakeup when pending signing is needed? - if(!didWork) { + if (!didWork) { if (!interruptSigningShare.sleep_for(std::chrono::milliseconds(100))) { return; } diff --git a/src/llmq/quorums_signing_shares.h b/src/llmq/quorums_signing_shares.h index 23e403517b846..7768261c3a234 100644 --- a/src/llmq/quorums_signing_shares.h +++ b/src/llmq/quorums_signing_shares.h @@ -19,15 +19,38 @@ #include #include +#include +#include class CEvoDB; class CScheduler; namespace llmq { - // typedef std::pair SigShareKey; +} // namespace llmq + +namespace std +{ +template <> +struct hash { + std::size_t operator()(const llmq::SigShareKey& k) const + { + return (std::size_t)((k.second + 1) * k.first.GetCheapHash()); + } +}; +template <> +struct hash> { + std::size_t operator()(const std::pair& k) const + { + return (std::size_t)((k.first + 1) * k.second.GetCheapHash()); + } +}; +} // namespace std + +namespace llmq +{ // this one does not get transmitted over the wire as it is batched inside CBatchedSigShares class CSigShare @@ -124,6 +147,151 @@ class CBatchedSigShares CSigSharesInv ToInv() const; }; +template +class SigShareMap +{ +private: + std::unordered_map> internalMap; + +public: + bool Add(const SigShareKey& k, const T& v) + { + auto& m = internalMap[k.first]; + return m.emplace(k.second, v).second; + } + + void Erase(const SigShareKey& k) + { + auto it = internalMap.find(k.first); + if (it == internalMap.end()) { + return; + } + it->second.erase(k.second); + if (it->second.empty()) { + internalMap.erase(it); + } + } + + void Clear() + { + internalMap.clear(); + } + + bool Has(const SigShareKey& k) const + { + auto it = internalMap.find(k.first); + if (it == internalMap.end()) { + return false; + } + return it->second.count(k.second) != 0; + } + + T* Get(const SigShareKey& k) + { + auto it = internalMap.find(k.first); + if (it == internalMap.end()) { + return nullptr; + } + + auto jt = it->second.find(k.second); + if (jt == it->second.end()) { + return nullptr; + } + + return &jt->second; + } + + T& GetOrAdd(const SigShareKey& k) + { + T* v = Get(k); + if (!v) { + Add(k, T()); + v = Get(k); + } + return *v; + } + + const T* GetFirst() const + { + if (internalMap.empty()) { + return nullptr; + } + return &internalMap.begin()->second.begin()->second; + } + + size_t Size() const + { + size_t s = 0; + for (auto& p : internalMap) { + s += p.second.size(); + } + return s; + } + + size_t CountForSignHash(const uint256& signHash) const + { + auto it = internalMap.find(signHash); + if (it == internalMap.end()) { + return 0; + } + return it->second.size(); + } + + bool Empty() const + { + return internalMap.empty(); + } + + const std::unordered_map* GetAllForSignHash(const uint256& signHash) + { + auto it = internalMap.find(signHash); + if (it == internalMap.end()) { + return nullptr; + } + return &it->second; + } + + void EraseAllForSignHash(const uint256& signHash) + { + internalMap.erase(signHash); + } + + template + void EraseIf(F&& f) + { + for (auto it = internalMap.begin(); it != internalMap.end();) { + SigShareKey k; + k.first = it->first; + for (auto jt = it->second.begin(); jt != it->second.end();) { + k.second = jt->first; + if (f(k, jt->second)) { + jt = it->second.erase(jt); + } else { + ++jt; + } + } + if (it->second.empty()) { + it = internalMap.erase(it); + } else { + ++it; + } + } + } + + template + void ForEach(F&& f) + { + for (auto& p : internalMap) { + SigShareKey k; + k.first = p.first; + for (auto& p2 : p.second) { + k.second = p2.first; + f(k, p2.second); + } + } + } +}; + class CSigSharesNodeState { public: @@ -132,15 +300,15 @@ class CSigSharesNodeState CSigSharesInv requested; CSigSharesInv knows; }; - // TODO limit number of sessions per node; signHash Session - std::map sessions; + // TODO limit number of sessions per node + std::unordered_map sessions; - std::map pendingIncomingSigShares; - std::map requestedSigShares; + SigShareMap pendingIncomingSigShares; + SigShareMap requestedSigShares; // elements are added whenever we receive a valid sig share from this node // this triggers us to send inventory items to him as he seems to be interested in these - std::set> interestedIn; + std::unordered_set, StaticSaltedHasher> interestedIn; bool banned{false}; @@ -168,12 +336,12 @@ class CSigSharesManager std::thread workThread; CThreadInterrupt interruptSigningShare; - std::map sigShares; - std::map firstSeenForSessions; + SigShareMap sigShares; + std::unordered_map firstSeenForSessions; - std::map nodeStates; - std::map> sigSharesRequested; - std::set sigSharesToAnnounce; + std::unordered_map nodeStates; + SigShareMap> sigSharesRequested; + SigShareMap sigSharesToAnnounce; std::vector> pendingSigns; @@ -204,10 +372,10 @@ class CSigSharesManager bool VerifySigSharesInv(NodeId from, const CSigSharesInv& inv); bool PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedSigShares& batchedSigShares, bool& retBan); - void CollectPendingSigSharesToVerify(size_t maxUniqueSessions, std::map>& retSigShares, std::map, CQuorumCPtr>& retQuorums); + void CollectPendingSigSharesToVerify(size_t maxUniqueSessions, std::unordered_map>& retSigShares, std::unordered_map, CQuorumCPtr>& retQuorums); bool ProcessPendingSigShares(CConnman& connman); - void ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector& sigShares, const std::map, CQuorumCPtr>& quorums, CConnman& connman); + void ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector& sigShares, const std::unordered_map, CQuorumCPtr>& quorums, CConnman& connman); void ProcessSigShare(NodeId nodeId, const CSigShare& sigShare, CConnman& connman, const CQuorumCPtr& quorum); void TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash, CConnman& connman); @@ -220,9 +388,9 @@ class CSigSharesManager void BanNode(NodeId nodeId); bool SendMessages(); - void CollectSigSharesToRequest(std::map>& sigSharesToRequest); - void CollectSigSharesToSend(std::map>& sigSharesToSend); - void CollectSigSharesToAnnounce(std::map>& sigSharesToAnnounce); + void CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest); + void CollectSigSharesToSend(std::unordered_map>& sigSharesToSend); + void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce); bool SignPendingSigShares(); void WorkThreadMain(); }; From 0613978bfd9a6d712587426533229ffe1cc56ad7 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 18 Feb 2019 12:07:57 +0100 Subject: [PATCH 02/10] Cleanup successful sessions before doing timeout check (#2712) Otherwise we get some false-positive timeout messages in logs. --- src/llmq/quorums_signing_shares.cpp | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 6f0314b6a2318..78782e2e60045 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -930,6 +930,20 @@ void CSigSharesManager::Cleanup() { LOCK(cs); + // Remove sessions which were successfully recovered + std::unordered_set doneSessions; + sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { + if (doneSessions.count(sigShare.GetSignHash())) { + return; + } + if (quorumSigningManager->HasRecoveredSigForSession(sigShare.GetSignHash())) { + doneSessions.emplace(sigShare.GetSignHash()); + } + }); + for (auto& signHash : doneSessions) { + RemoveSigSharesForSession(signHash); + } + // Remove sessions which timed out std::unordered_set timeoutSessions; for (auto& p : firstSeenForSessions) { @@ -957,20 +971,6 @@ void CSigSharesManager::Cleanup() RemoveSigSharesForSession(signHash); } - // Remove sessions which were successfully recovered - std::unordered_set doneSessions; - sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { - if (doneSessions.count(sigShare.GetSignHash())) { - return; - } - if (quorumSigningManager->HasRecoveredSigForSession(sigShare.GetSignHash())) { - doneSessions.emplace(sigShare.GetSignHash()); - } - }); - for (auto& signHash : doneSessions) { - RemoveSigSharesForSession(signHash); - } - sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { quorumsToCheck.emplace((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash); }); From a0084f53b30eae7dcb815bd022af378ed13f600c Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Wed, 27 Feb 2019 14:10:12 +0100 Subject: [PATCH 03/10] Multiple fixes and optimizations for LLMQs and ChainLocks (#2724) * Indicate success when signing was unnecessary * Fix typo in name of LLMQ_400_60 * Move RemoveAskFor call for CLSIGs into ProcessNewChainLock In case we got INV items for the same CLSIG that we recreated through HandleNewRecoveredSig, (re-)requesting of the CLSIG from other peers becomes unnecessary. * Move Cleanup() call in CChainLocksHandler::UpdatedBlockTip up We bail out early in a few situations from this method, so that Cleanup() might not be called while its at the bottom. * Bail out from CChainLocksHandler::UpdatedBlockTip if we already got the CLSIG * Call RemoveAskFor when QFCOMMITMENT was received Otherwise we might end up re-requesting it for a very long time when the commitment INV was received shortly before it got mined. * Call RemoveSigSharesForSession when a recovered sig is received Otherwise we end up with session data in node states lingering around until a fake "timeout" occurs (can be seen in the logs). * Better handling of false-positive conflicts in CSigningManager The old code was emitting a lot of messages in logs as it treated sigs for exactly the same session as a conflict. This commit fixes this by looking at the signHash before logging. Also handle a corner-case where a recovered sig might be deleted between the HasRecoveredSigForId and GetRecoveredSigById call. * Don't run into session timeout when sig shares come in slow Instead of just tracking when the first share was received, we now also track when the last (non-duplicate) share was received. Sessios will now timeout 5 minutes after the first share arrives, or 1 minute after the last one arrived. --- src/llmq/quorums_chainlocks.cpp | 20 +++++++++----- src/llmq/quorums_init.cpp | 2 ++ src/llmq/quorums_signing.cpp | 35 ++++++++++++++++++------ src/llmq/quorums_signing_shares.cpp | 42 ++++++++++++++++++++++++----- src/llmq/quorums_signing_shares.h | 13 ++++++--- 5 files changed, 87 insertions(+), 25 deletions(-) diff --git a/src/llmq/quorums_chainlocks.cpp b/src/llmq/quorums_chainlocks.cpp index e907b5bd4b3b0..d06a083565ba6 100644 --- a/src/llmq/quorums_chainlocks.cpp +++ b/src/llmq/quorums_chainlocks.cpp @@ -77,17 +77,17 @@ void CChainLocksHandler::ProcessMessage(CNode* pfrom, const std::string& strComm auto hash = ::SerializeHash(clsig); - { - LOCK(cs_main); - connman.RemoveAskFor(hash, MSG_CLSIG); - } - ProcessNewChainLock(pfrom->GetId(), clsig, hash); } } void CChainLocksHandler::ProcessNewChainLock(NodeId from, const llmq::CChainLockSig& clsig, const uint256& hash) { + { + LOCK(cs_main); + g_connman->RemoveAskFor(hash, MSG_CLSIG); + } + { LOCK(cs); if (!seenChainLocks.emplace(hash, GetTimeMillis()).second) { @@ -190,6 +190,8 @@ void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBl return; } + Cleanup(); + // DIP8 defines a process called "Signing attempts" which should run before the CLSIG is finalized // To simplify the initial implementation, we skip this process and directly try to create a CLSIG // This will fail when multiple blocks compete, but we accept this for the initial implementation. @@ -201,6 +203,12 @@ void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBl { LOCK(cs); + if (bestChainLockBlockIndex == pindexNew) { + // we first got the CLSIG, then the header, and then the block was connected. + // In this case there is no need to continue here. + return; + } + if (InternalHasConflictingChainLock(pindexNew->nHeight, pindexNew->GetBlockHash())) { if (!inEnforceBestChainLock) { // we accepted this block when there was no lock yet, but now a conflicting lock appeared. Invalidate it. @@ -226,8 +234,6 @@ void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBl } quorumSigningManager->AsyncSignIfMember(Params().GetConsensus().llmqChainLocks, requestId, msgHash); - - Cleanup(); } // WARNING: cs_main and cs should not be held! diff --git a/src/llmq/quorums_init.cpp b/src/llmq/quorums_init.cpp index eb9964e61888c..87bd3320baa1b 100644 --- a/src/llmq/quorums_init.cpp +++ b/src/llmq/quorums_init.cpp @@ -54,6 +54,7 @@ void StartLLMQSystem() quorumDKGSessionManager->StartThreads(); } if (quorumSigSharesManager) { + quorumSigSharesManager->RegisterAsRecoveredSigsListener(); quorumSigSharesManager->StartWorkerThread(); } if (chainLocksHandler) { @@ -68,6 +69,7 @@ void StopLLMQSystem() } if (quorumSigSharesManager) { quorumSigSharesManager->StopWorkerThread(); + quorumSigSharesManager->UnregisterAsRecoveredSigsListener(); } if (quorumDKGSessionManager) { quorumDKGSessionManager->StopThreads(); diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index 95588d56d4ee3..f9f792e0cca47 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -494,11 +494,25 @@ void CSigningManager::ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& re signHash.ToString(), recoveredSig.id.ToString(), recoveredSig.msgHash.ToString(), nodeId); if (db.HasRecoveredSigForId(llmqType, recoveredSig.id)) { - // this should really not happen, as each masternode is participating in only one vote, - // even if it's a member of multiple quorums. so a majority is only possible on one quorum and one msgHash per id - LogPrintf("CSigningManager::%s -- conflicting recoveredSig for id=%s, msgHash=%s\n", __func__, - recoveredSig.id.ToString(), recoveredSig.msgHash.ToString()); - return; + CRecoveredSig otherRecoveredSig; + if (db.GetRecoveredSigById(llmqType, recoveredSig.id, otherRecoveredSig)) { + auto otherSignHash = llmq::utils::BuildSignHash(recoveredSig); + if (signHash != otherSignHash) { + // this should really not happen, as each masternode is participating in only one vote, + // even if it's a member of multiple quorums. so a majority is only possible on one quorum and one msgHash per id + LogPrintf("CSigningManager::%s -- conflicting recoveredSig for signHash=%s, id=%s, msgHash=%s, otherSignHash=%s\n", __func__, + signHash.ToString(), recoveredSig.id.ToString(), recoveredSig.msgHash.ToString(), otherSignHash.ToString()); + } else { + // Looks like we're trying to process a recSig that is already known. This might happen if the same + // recSig comes in through regular QRECSIG messages and at the same time through some other message + // which allowed to reconstruct a recSig (e.g. IXLOCK). In this case, just bail out. + } + return; + } else { + // This case is very unlikely. It can only happen when cleanup caused this specific recSig to vanish + // between the HasRecoveredSigForId and GetRecoveredSigById call. If that happens, treat it as if we + // never had that recSig + } } db.WriteRecoveredSig(recoveredSig); @@ -552,14 +566,19 @@ bool CSigningManager::AsyncSignIfMember(Consensus::LLMQType llmqType, const uint if (db.HasVotedOnId(llmqType, id)) { uint256 prevMsgHash; db.GetVoteForId(llmqType, id, prevMsgHash); - LogPrintf("CSigningManager::%s -- already voted for id=%s and msgHash=%s. Not voting on conflicting msgHash=%s\n", __func__, - id.ToString(), prevMsgHash.ToString(), msgHash.ToString()); + if (msgHash != prevMsgHash) { + LogPrintf("CSigningManager::%s -- already voted for id=%s and msgHash=%s. Not voting on conflicting msgHash=%s\n", __func__, + id.ToString(), prevMsgHash.ToString(), msgHash.ToString()); + } else { + LogPrintf("CSigningManager::%s -- already voted for id=%s and msgHash=%s. Not voting again.\n", __func__, + id.ToString(), prevMsgHash.ToString()); + } return false; } if (db.HasRecoveredSigForId(llmqType, id)) { // no need to sign it if we already have a recovered sig - return false; + return true; } db.WriteVoteForId(llmqType, id, msgHash); } diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 78782e2e60045..617b456e048d4 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -3,7 +3,8 @@ // Distributed under the MIT/X11 software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. -#include "quorums_signing_shares.h" +#include "quorums_signing.h" + #include "activemasternode.h" #include "bls/bls_batchverifier.h" #include "cxxtimer.h" @@ -11,7 +12,7 @@ #include "net.h" #include "net_processing.h" #include "netmessagemaker.h" -#include "quorums_signing.h" +#include "quorums_signing_shares.h" #include "quorums_utils.h" #include "random.h" #include "shutdown.h" @@ -171,6 +172,16 @@ void CSigSharesManager::StopWorkerThread() } } +void CSigSharesManager::RegisterAsRecoveredSigsListener() +{ + quorumSigningManager->RegisterRecoveredSigsListener(this); +} + +void CSigSharesManager::UnregisterAsRecoveredSigsListener() +{ + quorumSigningManager->UnregisterRecoveredSigsListener(this); +} + void CSigSharesManager::Interrupt() { interruptSigningShare(); @@ -548,7 +559,16 @@ void CSigSharesManager::ProcessSigShare(NodeId nodeId, const CSigShare& sigShare } sigSharesToAnnounce.Add(sigShare.GetKey(), true); - firstSeenForSessions.emplace(sigShare.GetSignHash(), GetTimeMillis()); + + auto it = timeSeenForSessions.find(sigShare.GetSignHash()); + if (it == timeSeenForSessions.end()) { + auto t = GetTimeMillis(); + // insert first-seen and last-seen time + timeSeenForSessions.emplace(sigShare.GetSignHash(), std::make_pair(t, t)); + } else { + // update last-seen time + it->second.second = GetTimeMillis(); + } if (!quorumNodes.empty()) { // don't announce and wait for other nodes to request this share and directly send it to them @@ -946,11 +966,12 @@ void CSigSharesManager::Cleanup() // Remove sessions which timed out std::unordered_set timeoutSessions; - for (auto& p : firstSeenForSessions) { + for (auto& p : timeSeenForSessions) { auto& signHash = p.first; - int64_t time = p.second; + int64_t firstSeenTime = p.second.first; + int64_t lastSeenTime = p.second.second; - if (now - time >= SIGNING_SESSION_TIMEOUT) { + if (now - firstSeenTime >= SESSION_TOTAL_TIMEOUT || now - lastSeenTime >= SESSION_NEW_SHARES_TIMEOUT) { timeoutSessions.emplace(signHash); } } @@ -1032,7 +1053,7 @@ void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash) sigSharesRequested.EraseAllForSignHash(signHash); sigSharesToAnnounce.EraseAllForSignHash(signHash); sigShares.EraseAllForSignHash(signHash); - firstSeenForSessions.erase(signHash); + timeSeenForSessions.erase(signHash); } void CSigSharesManager::RemoveBannedNodeStates() @@ -1171,4 +1192,11 @@ void CSigSharesManager::Sign(const CQuorumCPtr& quorum, const uint256& id, const sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); ProcessSigShare(-1, sigShare, *g_connman, quorum); } + +void CSigSharesManager::HandleNewRecoveredSig(const llmq::CRecoveredSig& recoveredSig) +{ + LOCK(cs); + RemoveSigSharesForSession(llmq::utils::BuildSignHash(recoveredSig)); +} + } // namespace llmq \ No newline at end of file diff --git a/src/llmq/quorums_signing_shares.h b/src/llmq/quorums_signing_shares.h index 7768261c3a234..1dbdc166f6b39 100644 --- a/src/llmq/quorums_signing_shares.h +++ b/src/llmq/quorums_signing_shares.h @@ -325,9 +325,10 @@ class CSigSharesNodeState void RemoveSession(const uint256& signHash); }; -class CSigSharesManager +class CSigSharesManager : public CRecoveredSigsListener { - static const int64_t SIGNING_SESSION_TIMEOUT = 60 * 1000; + static const int64_t SESSION_NEW_SHARES_TIMEOUT = 60 * 1000; + static const int64_t SESSION_TOTAL_TIMEOUT = 5 * 60 * 1000; static const int64_t SIG_SHARE_REQUEST_TIMEOUT = 5 * 1000; private: @@ -337,7 +338,9 @@ class CSigSharesManager CThreadInterrupt interruptSigningShare; SigShareMap sigShares; - std::unordered_map firstSeenForSessions; + + // stores time of first and last receivedSigShare. Used to detect timeouts + std::unordered_map> timeSeenForSessions; std::unordered_map nodeStates; SigShareMap> sigSharesRequested; @@ -357,6 +360,8 @@ class CSigSharesManager void StartWorkerThread(); void StopWorkerThread(); void Interrupt(); + void RegisterAsRecoveredSigsListener(); + void UnregisterAsRecoveredSigsListener(); public: void ProcessMessage(CNode* pnode, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); @@ -364,6 +369,8 @@ class CSigSharesManager void AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); void Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); + void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig); + private: void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); From 7ccd790d445c4b25a7a0bdb8941bd155a6b644c3 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Wed, 27 Feb 2019 17:32:50 +0100 Subject: [PATCH 04/10] Merge pull request #2725 from codablock/pr_llmq_hashmaps Add more caching to CRecoveredSigsDb and use salted hashing for externally provided keys --- src/llmq/quorums_signing.cpp | 34 +++++++++++++++++------ src/llmq/quorums_signing.h | 22 +++++---------- src/llmq/quorums_signing_shares.cpp | 39 ++++++++++++++------------- src/llmq/quorums_signing_shares.h | 42 +++++++++-------------------- 4 files changed, 66 insertions(+), 71 deletions(-) diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index f9f792e0cca47..016cf324b5da4 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -80,8 +80,23 @@ bool CRecoveredSigsDb::HasRecoveredSigForSession(const uint256& signHash) bool CRecoveredSigsDb::HasRecoveredSigForHash(const uint256& hash) { + int64_t t = GetTimeMillis(); + + { + LOCK(cs); + auto it = hasSigForHashCache.find(hash); + if (it != hasSigForHashCache.end()) { + it->second.second = t; + return it->second.first; + } + } + auto k = std::make_tuple('h', hash); - return db.Exists(k); + bool ret = db.Exists(k); + + LOCK(cs); + hasSigForHashCache.emplace(hash, std::make_pair(ret, t)); + return ret; } bool CRecoveredSigsDb::ReadRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, CRecoveredSig& ret) @@ -153,13 +168,14 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig) LOCK(cs); hasSigForIdCache[std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id)] = std::make_pair(true, t); hasSigForSessionCache[signHash] = std::make_pair(true, t); + hasSigForHashCache[recSig.GetHash()] = std::make_pair(true, t); } } -template -static void TruncateCacheMap(std::unordered_map>& m, size_t maxSize, size_t truncateThreshold) +template +static void TruncateCacheMap(std::unordered_map, H>& m, size_t maxSize, size_t truncateThreshold) { - typedef typename std::unordered_map> Map; + typedef typename std::unordered_map, H> Map; typedef typename Map::iterator Iterator; if (m.size() <= truncateThreshold) { @@ -238,10 +254,12 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge) hasSigForIdCache.erase(std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id)); hasSigForSessionCache.erase(signHash); + hasSigForHashCache.erase(recSig.GetHash()); } TruncateCacheMap(hasSigForIdCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD); TruncateCacheMap(hasSigForSessionCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD); + TruncateCacheMap(hasSigForHashCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD); } for (auto& e : toDelete2) { @@ -357,7 +375,7 @@ bool CSigningManager::PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& void CSigningManager::CollectPendingRecoveredSigsToVerify( size_t maxUniqueSessions, std::unordered_map>& retSigShares, - std::unordered_map, CQuorumCPtr>& retQuorums) + std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums) { { LOCK(cs); @@ -365,7 +383,7 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify( return; } - std::unordered_set> uniqueSignHashes; + std::unordered_set, StaticSaltedHasher> uniqueSignHashes; llmq::utils::IterateNodesRandom(pendingRecoveredSigs, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, std::list& ns) { if (ns.empty()) { return false; @@ -420,7 +438,7 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify( bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman) { std::unordered_map> recSigsByNode; - std::unordered_map, CQuorumCPtr> quorums; + std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; CollectPendingRecoveredSigsToVerify(32, recSigsByNode, quorums); if (recSigsByNode.empty()) { @@ -449,7 +467,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman) LogPrintf("llmq", "CSigningManager::%s -- verified recovered sig(s). count=%d, vt=%d, nodes=%d\n", __func__, verifyCount, verifyTimer.count(), recSigsByNode.size()); - std::unordered_set processed; + std::unordered_set processed; for (auto& p : recSigsByNode) { NodeId nodeId = p.first; auto& v = p.second; diff --git a/src/llmq/quorums_signing.h b/src/llmq/quorums_signing.h index ca81caffac80d..22ef82f297c2f 100644 --- a/src/llmq/quorums_signing.h +++ b/src/llmq/quorums_signing.h @@ -10,21 +10,11 @@ #include "chainparams.h" #include "net.h" +#include "saltedhasher.h" #include "sync.h" #include -namespace std -{ -template <> -struct hash> { - std::size_t operator()(const std::pair& k) const - { - return (std::size_t)((k.first + 1) * k.second.GetCheapHash()); - } -}; -} // namespace std - namespace llmq { @@ -77,7 +67,6 @@ class CRecoveredSig } }; -// TODO implement caching to speed things up class CRecoveredSigsDb { static const size_t MAX_CACHE_SIZE = 30000; @@ -87,8 +76,9 @@ class CRecoveredSigsDb CDBWrapper db; RecursiveMutex cs; - std::unordered_map, std::pair> hasSigForIdCache; - std::unordered_map> hasSigForSessionCache; + std::unordered_map, std::pair, StaticSaltedHasher> hasSigForIdCache; + std::unordered_map, StaticSaltedHasher> hasSigForSessionCache; + std::unordered_map, StaticSaltedHasher> hasSigForHashCache; public: CRecoveredSigsDb(bool fMemory); @@ -157,7 +147,9 @@ class CSigningManager void ProcessMessageRecoveredSig(CNode* pfrom, const CRecoveredSig& recoveredSig, CConnman& connman); bool PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, bool& retBan); - void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, std::unordered_map>& retSigShares, std::unordered_map, CQuorumCPtr>& retQuorums); + void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, + std::unordered_map>& retSigShares, + std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums); bool ProcessPendingRecoveredSigs(CConnman& connman); // called from the worker thread of CSigSharesManager void ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, const CQuorumCPtr& quorum, CConnman& connman); void Cleanup(); // called from the worker thread of CSigSharesManager diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 617b456e048d4..079fd260c45f7 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -380,7 +380,7 @@ bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedS void CSigSharesManager::CollectPendingSigSharesToVerify( size_t maxUniqueSessions, std::unordered_map>& retSigShares, - std::unordered_map, CQuorumCPtr>& retQuorums) + std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums) { { LOCK(cs); @@ -394,7 +394,7 @@ void CSigSharesManager::CollectPendingSigSharesToVerify( // invalid, making batch verification fail and revert to per-share verification, which in turn would slow down // the whole verification process - std::unordered_set> uniqueSignHashes; + std::unordered_set, StaticSaltedHasher> uniqueSignHashes; llmq::utils::IterateNodesRandom(nodeStates, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, CSigSharesNodeState& ns) { if (ns.pendingIncomingSigShares.Empty()) { return false; @@ -439,7 +439,7 @@ void CSigSharesManager::CollectPendingSigSharesToVerify( bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman) { std::unordered_map> sigSharesByNodes; - std::unordered_map, CQuorumCPtr> quorums; + std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; CollectPendingSigSharesToVerify(32, sigSharesByNodes, quorums); if (sigSharesByNodes.empty()) { @@ -508,7 +508,10 @@ bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman) } // It's ensured that no duplicates are passed to this method -void CSigSharesManager::ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector& sigShares, const std::unordered_map, CQuorumCPtr>& quorums, CConnman& connman) +void CSigSharesManager::ProcessPendingSigSharesFromNode(NodeId nodeId, + const std::vector& sigShares, + const std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& quorums, + CConnman& connman) { LOCK(cs); auto& nodeState = nodeStates[nodeId]; @@ -660,11 +663,9 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& } // cs must be held -void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest) +void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest) { int64_t now = GetTimeMillis(); - std::unordered_map> nodesBySigShares; - const size_t maxRequestsForNode = 32; // avoid requesting from same nodes all the time @@ -695,7 +696,7 @@ void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map* invMap = nullptr; + decltype(sigSharesToRequest.begin()->second)* invMap = nullptr; for (auto& p2 : nodeState.sessions) { auto& signHash = p2.first; @@ -756,7 +757,7 @@ void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map>& sigSharesToSend) +void CSigSharesManager::CollectSigSharesToSend(std::unordered_map>& sigSharesToSend) { for (auto& p : nodeStates) { auto nodeId = p.first; @@ -766,7 +767,7 @@ void CSigSharesManager::CollectSigSharesToSend(std::unordered_map* sigSharesToSend2 = nullptr; + decltype(sigSharesToSend.begin()->second)* sigSharesToSend2 = nullptr; for (auto& p2 : nodeState.sessions) { auto& signHash = p2.first; @@ -813,9 +814,9 @@ void CSigSharesManager::CollectSigSharesToSend(std::unordered_map>& sigSharesToAnnounce) +void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce) { - std::unordered_set> quorumNodesPrepared; + std::unordered_set, StaticSaltedHasher> quorumNodesPrepared; this->sigSharesToAnnounce.ForEach([&](const SigShareKey& sigShareKey, bool) { auto& signHash = sigShareKey.first; @@ -883,9 +884,9 @@ bool CSigSharesManager::SendMessages() nodesByAddress.emplace(pnode->addr, pnode->GetId()); }); - std::unordered_map> sigSharesToRequest; - std::unordered_map> sigSharesToSend; - std::unordered_map> sigSharesToAnnounce; + std::unordered_map> sigSharesToRequest; + std::unordered_map> sigSharesToSend; + std::unordered_map> sigSharesToAnnounce; { LOCK(cs); @@ -945,13 +946,13 @@ void CSigSharesManager::Cleanup() return; } - std::unordered_set> quorumsToCheck; + std::unordered_set, StaticSaltedHasher> quorumsToCheck; { LOCK(cs); // Remove sessions which were successfully recovered - std::unordered_set doneSessions; + std::unordered_set doneSessions; sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { if (doneSessions.count(sigShare.GetSignHash())) { return; @@ -965,7 +966,7 @@ void CSigSharesManager::Cleanup() } // Remove sessions which timed out - std::unordered_set timeoutSessions; + std::unordered_set timeoutSessions; for (auto& p : timeSeenForSessions) { auto& signHash = p.first; int64_t firstSeenTime = p.second.first; @@ -1009,7 +1010,7 @@ void CSigSharesManager::Cleanup() { // Now delete sessions which are for inactive quorums LOCK(cs); - std::unordered_set inactiveQuorumSessions; + std::unordered_set inactiveQuorumSessions; sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { if (quorumsToCheck.count(std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash))) { inactiveQuorumSessions.emplace(sigShare.GetSignHash()); diff --git a/src/llmq/quorums_signing_shares.h b/src/llmq/quorums_signing_shares.h index 1dbdc166f6b39..9d9954cedf839 100644 --- a/src/llmq/quorums_signing_shares.h +++ b/src/llmq/quorums_signing_shares.h @@ -10,6 +10,7 @@ #include "consensus/params.h" #include "net.h" #include "random.h" +#include "saltedhasher.h" #include "serialize.h" #include "sync.h" #include "tinyformat.h" @@ -29,28 +30,6 @@ namespace llmq { // typedef std::pair SigShareKey; -} // namespace llmq - -namespace std -{ -template <> -struct hash { - std::size_t operator()(const llmq::SigShareKey& k) const - { - return (std::size_t)((k.second + 1) * k.first.GetCheapHash()); - } -}; -template <> -struct hash> { - std::size_t operator()(const std::pair& k) const - { - return (std::size_t)((k.first + 1) * k.second.GetCheapHash()); - } -}; -} // namespace std - -namespace llmq -{ // this one does not get transmitted over the wire as it is batched inside CBatchedSigShares class CSigShare @@ -151,7 +130,7 @@ template class SigShareMap { private: - std::unordered_map> internalMap; + std::unordered_map, StaticSaltedHasher> internalMap; public: bool Add(const SigShareKey& k, const T& v) @@ -340,7 +319,7 @@ class CSigSharesManager : public CRecoveredSigsListener SigShareMap sigShares; // stores time of first and last receivedSigShare. Used to detect timeouts - std::unordered_map> timeSeenForSessions; + std::unordered_map, StaticSaltedHasher> timeSeenForSessions; std::unordered_map nodeStates; SigShareMap> sigSharesRequested; @@ -379,10 +358,15 @@ class CSigSharesManager : public CRecoveredSigsListener bool VerifySigSharesInv(NodeId from, const CSigSharesInv& inv); bool PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedSigShares& batchedSigShares, bool& retBan); - void CollectPendingSigSharesToVerify(size_t maxUniqueSessions, std::unordered_map>& retSigShares, std::unordered_map, CQuorumCPtr>& retQuorums); + void CollectPendingSigSharesToVerify(size_t maxUniqueSessions, + std::unordered_map>& retSigShares, + std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums); bool ProcessPendingSigShares(CConnman& connman); - void ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector& sigShares, const std::unordered_map, CQuorumCPtr>& quorums, CConnman& connman); + void ProcessPendingSigSharesFromNode(NodeId nodeId, + const std::vector& sigShares, + const std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& quorums, + CConnman& connman); void ProcessSigShare(NodeId nodeId, const CSigShare& sigShare, CConnman& connman, const CQuorumCPtr& quorum); void TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash, CConnman& connman); @@ -395,9 +379,9 @@ class CSigSharesManager : public CRecoveredSigsListener void BanNode(NodeId nodeId); bool SendMessages(); - void CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest); - void CollectSigSharesToSend(std::unordered_map>& sigSharesToSend); - void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce); + void CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest); + void CollectSigSharesToSend(std::unordered_map>& sigSharesToSend); + void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce); bool SignPendingSigShares(); void WorkThreadMain(); }; From e73c238c0c402b4343f89bab979c5da6089b2932 Mon Sep 17 00:00:00 2001 From: UdjinM6 Date: Wed, 27 Feb 2019 20:41:45 +0300 Subject: [PATCH 05/10] Merge pull request #2726 from codablock/pr_llmq_sessionids Introduce "session announcements" and session IDs used in LLMQ P2P messages --- src/llmq/quorums_signing_shares.cpp | 341 +++++++++++++++++++--------- src/llmq/quorums_signing_shares.h | 117 ++++++---- src/protocol.cpp | 2 + src/protocol.h | 1 + src/tiertwo_networksync.cpp | 2 +- 5 files changed, 313 insertions(+), 150 deletions(-) diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 079fd260c45f7..56d95e164793a 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -31,10 +31,14 @@ void CSigShare::UpdateKey() key.second = quorumMember; } +std::string CSigSesAnn::ToString() const +{ + return strprintf("sessionId=%d, llmqType=%d, quorumHash=%s, id=%s, msgHash=%s", + sessionId, llmqType, quorumHash.ToString(), id.ToString(), msgHash.ToString()); +} + void CSigSharesInv::Merge(const CSigSharesInv& inv2) { - assert(llmqType == inv2.llmqType); - assert(signHash == inv2.signHash); for (size_t i = 0; i < inv.size(); i++) { if (inv2.inv[i]) { inv[i] = inv2.inv[i]; @@ -49,7 +53,7 @@ size_t CSigSharesInv::CountSet() const std::string CSigSharesInv::ToString() const { - std::string str = strprintf("signHash=%s, inv=(", signHash.ToString()); + std::string str = "("; bool first = true; for (size_t i = 0; i < inv.size(); i++) { if (!inv[i]) { @@ -66,11 +70,8 @@ std::string CSigSharesInv::ToString() const return str; } -void CSigSharesInv::Init(Consensus::LLMQType _llmqType, const uint256& _signHash) +void CSigSharesInv::Init(Consensus::LLMQType _llmqType) { - llmqType = _llmqType; - signHash = _signHash; - size_t llmqSize = (size_t)(Params().GetConsensus().llmqs.at(_llmqType).size); inv.resize(llmqSize, false); } @@ -87,68 +88,94 @@ void CSigSharesInv::Set(uint16_t quorumMember, bool v) inv[quorumMember] = v; } -CSigSharesNodeState::Session& CSigSharesNodeState::GetOrCreateSession(Consensus::LLMQType llmqType, const uint256& signHash) +CSigSharesInv CBatchedSigShares::ToInv(Consensus::LLMQType llmqType) const { - auto& s = sessions[signHash]; - if (s.announced.inv.empty()) { - s.announced.Init(llmqType, signHash); - s.requested.Init(llmqType, signHash); - s.knows.Init(llmqType, signHash); - } else { - assert(s.announced.llmqType == llmqType); - assert(s.requested.llmqType == llmqType); - assert(s.knows.llmqType == llmqType); + CSigSharesInv inv; + inv.Init(llmqType); + for (size_t i = 0; i < sigShares.size(); i++) { + inv.inv[sigShares[i].first] = true; } - return s; + return inv; } -void CSigSharesNodeState::MarkAnnounced(const uint256& signHash, const CSigSharesInv& inv) +template +static void InitSession(CSigSharesNodeState::Session& s, const uint256& signHash, T& from) { - GetOrCreateSession((Consensus::LLMQType)inv.llmqType, signHash).announced.Merge(inv); + s.llmqType = (Consensus::LLMQType)from.llmqType; + s.quorumHash = from.quorumHash; + s.id = from.id; + s.msgHash = from.msgHash; + s.signHash = signHash; + s.announced.Init((Consensus::LLMQType)from.llmqType); + s.requested.Init((Consensus::LLMQType)from.llmqType); + s.knows.Init((Consensus::LLMQType)from.llmqType); } -void CSigSharesNodeState::MarkRequested(const uint256& signHash, const CSigSharesInv& inv) +CSigSharesNodeState::Session& CSigSharesNodeState::GetOrCreateSessionFromShare(const llmq::CSigShare& sigShare) { - GetOrCreateSession((Consensus::LLMQType)inv.llmqType, signHash).requested.Merge(inv); + auto& s = sessions[sigShare.GetSignHash()]; + if (s.announced.inv.empty()) { + InitSession(s, sigShare.GetSignHash(), sigShare); + } + return s; } -void CSigSharesNodeState::MarkKnows(const uint256& signHash, const CSigSharesInv& inv) +CSigSharesNodeState::Session& CSigSharesNodeState::GetOrCreateSessionFromAnn(const llmq::CSigSesAnn& ann) { - GetOrCreateSession((Consensus::LLMQType)inv.llmqType, signHash).knows.Merge(inv); + auto signHash = llmq::utils::BuildSignHash((Consensus::LLMQType)ann.llmqType, ann.quorumHash, ann.id, ann.msgHash); + auto& s = sessions[signHash]; + if (s.announced.inv.empty()) { + InitSession(s, signHash, ann); + } + return s; } -void CSigSharesNodeState::MarkAnnounced(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember) +CSigSharesNodeState::Session* CSigSharesNodeState::GetSessionBySignHash(const uint256& signHash) { - GetOrCreateSession(llmqType, signHash).announced.Set(quorumMember, true); + auto it = sessions.find(signHash); + if (it == sessions.end()) { + return nullptr; + } + return &it->second; } -void CSigSharesNodeState::MarkRequested(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember) +CSigSharesNodeState::Session* CSigSharesNodeState::GetSessionByRecvId(uint32_t sessionId) { - GetOrCreateSession(llmqType, signHash).requested.Set(quorumMember, true); + auto it = sessionByRecvId.find(sessionId); + if (it == sessionByRecvId.end()) { + return nullptr; + } + return it->second; } -void CSigSharesNodeState::MarkKnows(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember) +bool CSigSharesNodeState::GetSessionInfoByRecvId(uint32_t sessionId, SessionInfo& retInfo) { - GetOrCreateSession(llmqType, signHash).knows.Set(quorumMember, true); + auto s = GetSessionByRecvId(sessionId); + if (!s) { + return false; + } + retInfo.recvSessionId = sessionId; + retInfo.llmqType = s->llmqType; + retInfo.quorumHash = s->quorumHash; + retInfo.id = s->id; + retInfo.msgHash = s->msgHash; + retInfo.signHash = s->signHash; + retInfo.quorum = s->quorum; + + return true; } void CSigSharesNodeState::RemoveSession(const uint256& signHash) { - sessions.erase(signHash); + auto it = sessions.find(signHash); + if (it != sessions.end()) { + sessionByRecvId.erase(it->second.recvSessionId); + sessions.erase(it); + } requestedSigShares.EraseAllForSignHash(signHash); pendingIncomingSigShares.EraseAllForSignHash(signHash); } -CSigSharesInv CBatchedSigShares::ToInv() const -{ - CSigSharesInv inv; - inv.Init((Consensus::LLMQType)llmqType, llmq::utils::BuildSignHash(*this)); - for (size_t i = 0; i < sigShares.size(); i++) { - inv.inv[sigShares[i].first] = true; - } - return inv; -} - ////////////////////// CSigSharesManager::CSigSharesManager() @@ -194,7 +221,11 @@ void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strComma return; } - if (strCommand == NetMsgType::QSIGSHARESINV) { + if (strCommand == NetMsgType::QSIGSESANN) { + CSigSesAnn ann; + vRecv >> ann; + ProcessMessageSigSesAnn(pfrom, ann, connman); + } else if (strCommand == NetMsgType::QSIGSHARESINV) { CSigSharesInv inv; vRecv >> inv; ProcessMessageSigSharesInv(pfrom, inv, connman); @@ -209,15 +240,43 @@ void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strComma } } -bool CSigSharesManager::VerifySigSharesInv(NodeId from, const CSigSharesInv& inv) +void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman) { - Consensus::LLMQType llmqType = (Consensus::LLMQType)inv.llmqType; - if (!Params().GetConsensus().llmqs.count(llmqType) || inv.signHash.IsNull()) { - BanNode(from); - return false; + auto llmqType = (Consensus::LLMQType)ann.llmqType; + if (!Params().GetConsensus().llmqs.count(llmqType)) { + BanNode(pfrom->GetId()); + return; + } + if (ann.sessionId == (uint32_t)-1 || ann.quorumHash.IsNull() || ann.id.IsNull() || ann.msgHash.IsNull()) { + BanNode(pfrom->GetId()); + return; } - if (!activeMasternodeManager) { + LogPrintf("llmq", "CSigSharesManager::%s -- ann={%s}, node=%d\n", __func__, ann.ToString(), pfrom->GetId()); + + auto quorum = quorumManager->GetQuorum(llmqType, ann.quorumHash); + if (!quorum) { + // TODO should we ban here? + LogPrintf("CSigSharesManager::%s -- quorum %s not found, node=%d\n", __func__, + ann.quorumHash.ToString(), pfrom->GetId()); + return; + } + + auto signHash = llmq::utils::BuildSignHash(llmqType, ann.quorumHash, ann.id, ann.msgHash); + + LOCK(cs); + auto& nodeState = nodeStates[pfrom->GetId()]; + auto& session = nodeState.GetOrCreateSessionFromAnn(ann); + nodeState.sessionByRecvId.erase(session.recvSessionId); + nodeState.sessionByRecvId.erase(ann.sessionId); + session.recvSessionId = ann.sessionId; + session.quorum = quorum; + nodeState.sessionByRecvId.emplace(ann.sessionId, &session); +} + +bool CSigSharesManager::VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv) +{ + if (!fMasterNode || activeMasternodeManager->GetProTx().IsNull()) { return false; } @@ -232,46 +291,71 @@ bool CSigSharesManager::VerifySigSharesInv(NodeId from, const CSigSharesInv& inv void CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) { - if (!VerifySigSharesInv(pfrom->GetId(), inv)) { + CSigSharesNodeState::SessionInfo sessionInfo; + if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) { + return; + } + + if (!VerifySigSharesInv(pfrom->GetId(), sessionInfo.llmqType, inv)) { return; } // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig - if (quorumSigningManager->HasRecoveredSigForSession(inv.signHash)) { + if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) { return; } - LogPrintf("llmq", "CSigSharesManager::%s -- inv={%s}, node=%d\n", __func__, inv.ToString(), pfrom->GetId()); + LogPrintf("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, + sessionInfo.signHash.ToString(), inv.ToString(), pfrom->GetId()); LOCK(cs); auto& nodeState = nodeStates[pfrom->GetId()]; - nodeState.MarkAnnounced(inv.signHash, inv); - nodeState.MarkKnows(inv.signHash, inv); + auto session = nodeState.GetSessionByRecvId(inv.sessionId); + if (!session) { + return; + } + session->announced.Merge(inv); + session->knows.Merge(inv); } void CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) { - if (!VerifySigSharesInv(pfrom->GetId(), inv)) { + CSigSharesNodeState::SessionInfo sessionInfo; + if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) { + return; + } + + if (!VerifySigSharesInv(pfrom->GetId(), sessionInfo.llmqType, inv)) { return; } // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig - if (quorumSigningManager->HasRecoveredSigForSession(inv.signHash)) { + if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) { return; } - LogPrintf("llmq", "CSigSharesManager::%s -- inv={%s}, node=%d\n", __func__, inv.ToString(), pfrom->GetId()); + LogPrintf("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, + sessionInfo.signHash.ToString(), inv.ToString(), pfrom->GetId()); LOCK(cs); auto& nodeState = nodeStates[pfrom->GetId()]; - nodeState.MarkRequested(inv.signHash, inv); - nodeState.MarkKnows(inv.signHash, inv); + auto session = nodeState.GetSessionByRecvId(inv.sessionId); + if (!session) { + return; + } + session->requested.Merge(inv); + session->knows.Merge(inv); } void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman) { + CSigSharesNodeState::SessionInfo sessionInfo; + if (!GetSessionInfoByRecvId(pfrom->GetId(), batchedSigShares.sessionId, sessionInfo)) { + return; + } + bool ban = false; - if (!PreVerifyBatchedSigShares(pfrom->GetId(), batchedSigShares, ban)) { + if (!PreVerifyBatchedSigShares(pfrom->GetId(), sessionInfo, batchedSigShares, ban)) { if (ban) { BanNode(pfrom->GetId()); return; @@ -287,7 +371,7 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc auto& nodeState = nodeStates[pfrom->GetId()]; for (size_t i = 0; i < batchedSigShares.sigShares.size(); i++) { - CSigShare sigShare = batchedSigShares.RebuildSigShare(i); + CSigShare sigShare = RebuildSigShare(sessionInfo, batchedSigShares, i); nodeState.requestedSigShares.Erase(sigShare.GetKey()); // TODO track invalid sig shares received for PoSe? @@ -307,8 +391,8 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc } } - LogPrintf("llmq", "CSigSharesManager::%s -- shares=%d, new=%d, inv={%s}, node=%d\n", __func__, - batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInv().ToString(), pfrom->GetId()); + LogPrintf("llmq", "CSigSharesManager::%s -- signHash=%s, shares=%d, new=%d, inv={%s}, node=%d\n", __func__, + sessionInfo.signHash.ToString(), batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInv(sessionInfo.llmqType).ToString(), pfrom->GetId()); if (sigShares.empty()) { return; @@ -321,36 +405,22 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc } } -bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedSigShares& batchedSigShares, bool& retBan) +bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan) { retBan = false; - auto llmqType = (Consensus::LLMQType)batchedSigShares.llmqType; - if (!Params().GetConsensus().llmqs.count(llmqType)) { - retBan = true; - return false; - } - - CQuorumCPtr quorum = quorumManager->GetQuorum(llmqType, batchedSigShares.quorumHash); - - if (!quorum) { - // TODO should we ban here? - LogPrintf("CSigSharesManager::%s -- quorum %s not found, node=%d\n", __func__, - batchedSigShares.quorumHash.ToString(), nodeId); - return false; - } - if (!llmq::utils::IsQuorumActive(llmqType, quorum->pindexQuorum->GetBlockHash())) { + if (!llmq::utils::IsQuorumActive(session.llmqType, session.quorum->pindexQuorum->GetBlockHash())) { // quorum is too old return false; } - if (!quorum->IsMember(activeMasternodeManager->GetProTx())) { + if (!session.quorum->IsMember(activeMasternodeManager->GetProTx())) { // we're not a member so we can't verify it (we actually shouldn't have received it) return false; } - if (quorum->quorumVvec == nullptr) { + if (session.quorum->quorumVvec == nullptr) { // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG LogPrintf("CSigSharesManager::%s -- we don't have the quorum vvec for %s, no verification possible. node=%d\n", __func__, - batchedSigShares.quorumHash.ToString(), nodeId); + session.quorumHash.ToString(), nodeId); return false; } @@ -363,12 +433,12 @@ bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedS return false; } - if (quorumMember >= quorum->members.size()) { + if (quorumMember >= session.quorum->members.size()) { LogPrintf("CSigSharesManager::%s -- quorumMember out of bounds\n", __func__); retBan = true; return false; } - if (!quorum->validMembers[quorumMember]) { + if (!session.quorum->validMembers[quorumMember]) { LogPrintf("CSigSharesManager::%s -- quorumMember not valid\n", __func__); retBan = true; return false; @@ -581,8 +651,10 @@ void CSigSharesManager::ProcessSigShare(NodeId nodeId, const CSigShare& sigShare if (!quorumNodes.count(p.first) && !p.second.interestedIn.count(std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash))) { continue; } - p.second.MarkRequested((Consensus::LLMQType)sigShare.llmqType, sigShare.GetSignHash(), sigShare.quorumMember); - p.second.MarkKnows((Consensus::LLMQType)sigShare.llmqType, sigShare.GetSignHash(), sigShare.quorumMember); + auto& session = p.second.GetOrCreateSessionFromShare(sigShare); + session.quorum = quorum; + session.requested.Set(sigShare.quorumMember, true); + session.knows.Set(sigShare.quorumMember, true); } } @@ -745,7 +817,7 @@ void CSigSharesManager::CollectSigSharesToRequest(std::unordered_mapllmqType; - batchedSigShares.quorumHash = sigShare->quorumHash; - batchedSigShares.id = sigShare->id; - batchedSigShares.msgHash = sigShare->msgHash; - } batchedSigShares.sigShares.emplace_back((uint16_t)i, sigShare->sigShare); } @@ -854,7 +920,7 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_mapllmqType, signHash); + auto& session = nodeState.GetOrCreateSessionFromShare(*sigShare); if (session.knows.inv[quorumMember]) { // he already knows that one @@ -863,7 +929,7 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_mapllmqType, signHash); + inv.Init((Consensus::LLMQType)sigShare->llmqType); } inv.inv[quorumMember] = true; session.knows.inv[quorumMember] = true; @@ -887,12 +953,48 @@ bool CSigSharesManager::SendMessages() std::unordered_map> sigSharesToRequest; std::unordered_map> sigSharesToSend; std::unordered_map> sigSharesToAnnounce; + std::unordered_map> sigSessionAnnouncements; + + auto addSigSesAnnIfNeeded = [&](NodeId nodeId, const uint256& signHash) { + auto& nodeState = nodeStates[nodeId]; + auto session = nodeState.GetSessionBySignHash(signHash); + assert(session); + if (session->sendSessionId == (uint32_t)-1) { + session->sendSessionId = nodeState.nextSendSessionId++; + + CSigSesAnn sigSesAnn; + sigSesAnn.sessionId = session->sendSessionId; + sigSesAnn.llmqType = (uint8_t)session->llmqType; + sigSesAnn.quorumHash = session->quorumHash; + sigSesAnn.id = session->id; + sigSesAnn.msgHash = session->msgHash; + + sigSessionAnnouncements[nodeId].emplace_back(sigSesAnn); + } + return session->sendSessionId; + }; { LOCK(cs); CollectSigSharesToRequest(sigSharesToRequest); CollectSigSharesToSend(sigSharesToSend); CollectSigSharesToAnnounce(sigSharesToAnnounce); + + for (auto& p : sigSharesToRequest) { + for (auto& p2 : p.second) { + p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first); + } + } + for (auto& p : sigSharesToSend) { + for (auto& p2 : p.second) { + p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first); + } + } + for (auto& p : sigSharesToAnnounce) { + for (auto& p2 : p.second) { + p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first); + } + } } bool didSend = false; @@ -900,6 +1002,16 @@ bool CSigSharesManager::SendMessages() g_connman->ForEachNode([&](CNode* pnode) { CNetMsgMaker msgMaker(pnode->GetSendVersion()); + auto it1 = sigSessionAnnouncements.find(pnode->GetId()); + if (it1 != sigSessionAnnouncements.end()) { + for (auto& sigSesAnn : it1->second) { + LogPrintf("llmq", "CSigSharesManager::SendMessages -- QSIGSESANN signHash=%s, sessionId=%d, node=%d\n", + llmq::utils::BuildSignHash(sigSesAnn).ToString(), sigSesAnn.sessionId, pnode->GetId()); + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, sigSesAnn)); + didSend = true; + } + } + auto it = sigSharesToRequest.find(pnode->GetId()); if (it != sigSharesToRequest.end()) { for (auto& p : it->second) { @@ -915,8 +1027,12 @@ bool CSigSharesManager::SendMessages() if (jt != sigSharesToSend.end()) { for (auto& p : jt->second) { assert(!p.second.sigShares.empty()); - LogPrintf("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES inv={%s}, node=%d\n", - p.second.ToInv().ToString(), pnode->GetId()); + if (LogAcceptCategory(BCLog::LogFlags::LLMQ)) { + LOCK(cs); + auto session = nodeStates[pnode->GetId()].GetSessionBySignHash(p.first); + LogPrintf("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n", + p.first.ToString(), p.second.ToInv(session->llmqType).ToString(), pnode->GetId()); + } g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second)); didSend = true; } @@ -926,8 +1042,8 @@ bool CSigSharesManager::SendMessages() if (kt != sigSharesToAnnounce.end()) { for (auto& p : kt->second) { assert(p.second.CountSet() != 0); - LogPrintf("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV inv={%s}, node=%d\n", - p.second.ToString(), pnode->GetId()); + LogPrintf("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV signHash=%s, inv={%s}, node=%d\n", + p.first.ToString(), p.second.ToString(), pnode->GetId()); g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second)); didSend = true; } @@ -939,6 +1055,27 @@ bool CSigSharesManager::SendMessages() return didSend; } +bool CSigSharesManager::GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo) +{ + LOCK(cs); + return nodeStates[nodeId].GetSessionInfoByRecvId(sessionId, retInfo); +} + +CSigShare CSigSharesManager::RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, size_t idx) +{ + assert(idx < batchedSigShares.sigShares.size()); + auto& s = batchedSigShares.sigShares[idx]; + CSigShare sigShare; + sigShare.llmqType = session.llmqType; + sigShare.quorumHash = session.quorumHash; + sigShare.quorumMember = s.first; + sigShare.id = session.id; + sigShare.msgHash = session.msgHash; + sigShare.sigShare = s.second; + sigShare.UpdateKey(); + return sigShare; +} + void CSigSharesManager::Cleanup() { int64_t now = GetTimeMillis(); @@ -1182,15 +1319,15 @@ void CSigSharesManager::Sign(const CQuorumCPtr& quorum, const uint256& id, const sigShare.sigShare.Set(skShare.Sign(signHash)); if (!sigShare.sigShare.Get().IsValid()) { - LogPrintf("CSigSharesManager::%s -- failed to sign sigShare. id=%s, msgHash=%s, time=%s\n", __func__, - sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); + LogPrintf("CSigSharesManager::%s -- failed to sign sigShare. signHahs=%s, id=%s, msgHash=%s, time=%s\n", __func__, + signHash.ToString(), sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); return; } sigShare.UpdateKey(); - LogPrintf("CSigSharesManager::%s -- signed sigShare. id=%s, msgHash=%s, time=%s\n", __func__, - sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); + LogPrintf("CSigSharesManager::%s -- signed sigShare. signHash=%s, id=%s, msgHash=%s, time=%s\n", __func__, + signHash.ToString(), sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); ProcessSigShare(-1, sigShare, *g_connman, quorum); } diff --git a/src/llmq/quorums_signing_shares.h b/src/llmq/quorums_signing_shares.h index 9d9954cedf839..d83e6da8e9995 100644 --- a/src/llmq/quorums_signing_shares.h +++ b/src/llmq/quorums_signing_shares.h @@ -57,29 +57,47 @@ class CSigShare } }; -class CSigSharesInv +// Nodes will first announce a signing session with a sessionId to be used in all future P2P messages related to that +// session. We locally keep track of the mapping for each node. We also assign new sessionIds for outgoing sessions +// and send QSIGSESANN messages appropriately. All values except the max value for uint32_t are valid as sessionId +class CSigSesAnn { public: + uint32_t sessionId{(uint32_t)-1}; uint8_t llmqType; - uint256 signHash; + uint256 quorumHash; + uint256 id; + uint256 msgHash; + + SERIALIZE_METHODS(CSigSesAnn, obj) + { + READWRITE(VARINT(obj.sessionId)); + READWRITE(obj.llmqType); + READWRITE(obj.quorumHash); + READWRITE(obj.id); + READWRITE(obj.msgHash); + } + + std::string ToString() const; +}; + +class CSigSharesInv +{ +public: + uint32_t sessionId{(uint32_t)-1}; std::vector inv; public: SERIALIZE_METHODS(CSigSharesInv, obj) { - READWRITE(obj.llmqType); + uint64_t invSize = obj.inv.size(); - auto& consensus = Params().GetConsensus(); - auto it = consensus.llmqs.find((Consensus::LLMQType)obj.llmqType); - if (it == consensus.llmqs.end()) { - throw std::ios_base::failure("invalid llmqType"); - } - const auto& params = it->second; - READWRITE(obj.signHash); - READWRITE(AUTOBITSET(obj.inv, (size_t)params.size)); + READWRITE(VARINT(obj.sessionId)); + READWRITE(COMPACTSIZE(invSize)); + READWRITE(AUTOBITSET(obj.inv, (size_t)invSize)); } - void Init(Consensus::LLMQType _llmqType, const uint256& _signHash); + void Init(Consensus::LLMQType _llmqType); bool IsSet(uint16_t quorumMember) const; void Set(uint16_t quorumMember, bool v); void Merge(const CSigSharesInv& inv2); @@ -92,38 +110,17 @@ class CSigSharesInv class CBatchedSigShares { public: - uint8_t llmqType; - uint256 quorumHash; - uint256 id; - uint256 msgHash; + uint32_t sessionId{(uint32_t)-1}; std::vector> sigShares; public: SERIALIZE_METHODS(CBatchedSigShares, obj) { - READWRITE(obj.llmqType); - READWRITE(obj.quorumHash); - READWRITE(obj.id); - READWRITE(obj.msgHash); + READWRITE(VARINT(obj.sessionId)); READWRITE(obj.sigShares); } - CSigShare RebuildSigShare(size_t idx) const - { - assert(idx < sigShares.size()); - auto& s = sigShares[idx]; - CSigShare sigShare; - sigShare.llmqType = llmqType; - sigShare.quorumHash = quorumHash; - sigShare.quorumMember = s.first; - sigShare.id = id; - sigShare.msgHash = msgHash; - sigShare.sigShare = s.second; - sigShare.UpdateKey(); - return sigShare; - } - - CSigSharesInv ToInv() const; + CSigSharesInv ToInv(Consensus::LLMQType llmqType) const; }; template @@ -274,7 +271,30 @@ class SigShareMap class CSigSharesNodeState { public: + // Used to avoid holding locks too long + struct SessionInfo { + uint32_t recvSessionId; + Consensus::LLMQType llmqType; + uint256 quorumHash; + uint256 id; + uint256 msgHash; + uint256 signHash; + + CQuorumCPtr quorum; + }; + struct Session { + uint32_t recvSessionId{(uint32_t)-1}; + uint32_t sendSessionId{(uint32_t)-1}; + + Consensus::LLMQType llmqType; + uint256 quorumHash; + uint256 id; + uint256 msgHash; + uint256 signHash; + + CQuorumCPtr quorum; + CSigSharesInv announced; CSigSharesInv requested; CSigSharesInv knows; @@ -282,6 +302,9 @@ class CSigSharesNodeState // TODO limit number of sessions per node std::unordered_map sessions; + std::unordered_map sessionByRecvId; + uint32_t nextSendSessionId{1}; + SigShareMap pendingIncomingSigShares; SigShareMap requestedSigShares; @@ -291,15 +314,11 @@ class CSigSharesNodeState bool banned{false}; - Session& GetOrCreateSession(Consensus::LLMQType llmqType, const uint256& signHash); - - void MarkAnnounced(const uint256& signHash, const CSigSharesInv& inv); - void MarkRequested(const uint256& signHash, const CSigSharesInv& inv); - void MarkKnows(const uint256& signHash, const CSigSharesInv& inv); - - void MarkAnnounced(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember); - void MarkRequested(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember); - void MarkKnows(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember); + Session& GetOrCreateSessionFromShare(const CSigShare& sigShare); + Session& GetOrCreateSessionFromAnn(const CSigSesAnn& ann); + Session* GetSessionBySignHash(const uint256& signHash); + Session* GetSessionByRecvId(uint32_t sessionId); + bool GetSessionInfoByRecvId(uint32_t sessionId, SessionInfo& retInfo); void RemoveSession(const uint256& signHash); }; @@ -351,12 +370,13 @@ class CSigSharesManager : public CRecoveredSigsListener void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig); private: + void ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman); void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman); - bool VerifySigSharesInv(NodeId from, const CSigSharesInv& inv); - bool PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedSigShares& batchedSigShares, bool& retBan); + bool VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv); + bool PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan); void CollectPendingSigSharesToVerify(size_t maxUniqueSessions, std::unordered_map>& retSigShares, @@ -372,6 +392,9 @@ class CSigSharesManager : public CRecoveredSigsListener void TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash, CConnman& connman); private: + bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo); + CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, size_t idx); + void Cleanup(); void RemoveSigSharesForSession(const uint256& signHash); void RemoveBannedNodeStates(); diff --git a/src/protocol.cpp b/src/protocol.cpp index 5a332a920a2cb..b327b3daeb352 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -60,6 +60,7 @@ const char* QCONTRIB = "qcontrib"; const char* QCOMPLAINT = "qcomplaint"; const char* QJUSTIFICATION = "qjustify"; const char* QPCOMMITMENT = "qpcommit"; +const char* QSIGSESANN = "qsigsesann"; const char* QSIGSHARESINV = "qsigsinv"; const char* QGETSIGSHARES = "qgetsigs"; const char* QBSIGSHARES = "qbsigs"; @@ -122,6 +123,7 @@ const static std::string allNetMessageTypes[] = { NetMsgType::QCOMPLAINT, NetMsgType::QJUSTIFICATION, NetMsgType::QPCOMMITMENT, + NetMsgType::QSIGSESANN, NetMsgType::QSIGSHARESINV, NetMsgType::QGETSIGSHARES, NetMsgType::QBSIGSHARES, diff --git a/src/protocol.h b/src/protocol.h index fd378219492ab..c241e11b8ffb3 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -293,6 +293,7 @@ extern const char* QCONTRIB; extern const char* QCOMPLAINT; extern const char* QJUSTIFICATION; extern const char* QPCOMMITMENT; +extern const char* QSIGSESANN; extern const char* QSIGSHARESINV; extern const char* QGETSIGSHARES; extern const char* QBSIGSHARES; diff --git a/src/tiertwo_networksync.cpp b/src/tiertwo_networksync.cpp index 296e1824863d2..383e8b67caaa3 100644 --- a/src/tiertwo_networksync.cpp +++ b/src/tiertwo_networksync.cpp @@ -72,7 +72,7 @@ bool CMasternodeSync::MessageDispatcher(CNode* pfrom, std::string& strCommand, C } return true; } - if (strCommand == NetMsgType::QSIGSHARESINV || strCommand == NetMsgType::QGETSIGSHARES || strCommand == NetMsgType::QBSIGSHARES) { + if (strCommand == NetMsgType::QSIGSHARESINV || strCommand == NetMsgType::QGETSIGSHARES || strCommand == NetMsgType::QBSIGSHARES || strCommand == NetMsgType::QSIGSESANN) { llmq::quorumSigSharesManager->ProcessMessage(pfrom, strCommand, vRecv, *g_connman); return true; } From 71092e0f2542e7aa8d008e05f61751139d4a3238 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Fri, 1 Mar 2019 08:21:09 +0100 Subject: [PATCH 06/10] Send/Receive multiple messages as part of one P2P message in CSigSharesManager (#2729) * Return bool in ProcessMessageXXX methods to indicate misbehaviour * Send/Receive multiple messages as part of one P2P message in CSigSharesManager Many messages, especially QSIGSHARESINV and QGETSIGSHARES, are very small by nature (5-14 bytes for a 50 members LLMQ). The message headers are 24 bytes, meaning that we produce a lot of overhead for these small messages. This sums up quite a bit when thousands of signing sessions are happening in parallel. This commit changes all related P2P messages to send a vector of messages instead of a single message. * Remove bogus lines Included these by accident * Unify handling of BanNode in ProcessMessageXXX methods * Remove bogus check for fMasternodeMode * Properly use == instead of misleading >= in SendMessages * Put "didSend = true" near PushMessage --- src/llmq/quorums_signing_shares.cpp | 174 ++++++++++++++++++++-------- src/llmq/quorums_signing_shares.h | 16 ++- 2 files changed, 139 insertions(+), 51 deletions(-) diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 56d95e164793a..6404a556debca 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -222,34 +222,76 @@ void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strComma } if (strCommand == NetMsgType::QSIGSESANN) { - CSigSesAnn ann; - vRecv >> ann; - ProcessMessageSigSesAnn(pfrom, ann, connman); + std::vector msgs; + vRecv >> msgs; + if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) { + LogPrintf("llmq", "CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom->GetId()); + BanNode(pfrom->GetId()); + return; + } + for (auto& ann : msgs) { + if (!ProcessMessageSigSesAnn(pfrom, ann, connman)) { + BanNode(pfrom->GetId()); + return; + } + } } else if (strCommand == NetMsgType::QSIGSHARESINV) { - CSigSharesInv inv; - vRecv >> inv; - ProcessMessageSigSharesInv(pfrom, inv, connman); + std::vector msgs; + vRecv >> msgs; + if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) { + LogPrintf("llmq", "CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom->GetId()); + BanNode(pfrom->GetId()); + return; + } + for (auto& inv : msgs) { + if (!ProcessMessageSigSharesInv(pfrom, inv, connman)) { + BanNode(pfrom->GetId()); + return; + } + } } else if (strCommand == NetMsgType::QGETSIGSHARES) { - CSigSharesInv inv; - vRecv >> inv; - ProcessMessageGetSigShares(pfrom, inv, connman); + std::vector msgs; + vRecv >> msgs; + if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) { + LogPrintf("llmq", "CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom->GetId()); + BanNode(pfrom->GetId()); + return; + } + for (auto& inv : msgs) { + if (!ProcessMessageGetSigShares(pfrom, inv, connman)) { + BanNode(pfrom->GetId()); + return; + } + } } else if (strCommand == NetMsgType::QBSIGSHARES) { - CBatchedSigShares batchedSigShares; - vRecv >> batchedSigShares; - ProcessMessageBatchedSigShares(pfrom, batchedSigShares, connman); + std::vector msgs; + vRecv >> msgs; + size_t totalSigsCount = 0; + for (auto& bs : msgs) { + totalSigsCount += bs.sigShares.size(); + } + if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) { + LogPrintf("llmq", "CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom->GetId()); + BanNode(pfrom->GetId()); + return; + } + for (auto& bs : msgs) { + if (!ProcessMessageBatchedSigShares(pfrom, bs, connman)) { + BanNode(pfrom->GetId()); + return; + } + } } } -void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman) +bool CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman) { auto llmqType = (Consensus::LLMQType)ann.llmqType; if (!Params().GetConsensus().llmqs.count(llmqType)) { - BanNode(pfrom->GetId()); - return; + return false; } if (ann.sessionId == (uint32_t)-1 || ann.quorumHash.IsNull() || ann.id.IsNull() || ann.msgHash.IsNull()) { - BanNode(pfrom->GetId()); - return; + return false; } LogPrintf("llmq", "CSigSharesManager::%s -- ann={%s}, node=%d\n", __func__, ann.ToString(), pfrom->GetId()); @@ -259,7 +301,7 @@ void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& // TODO should we ban here? LogPrintf("CSigSharesManager::%s -- quorum %s not found, node=%d\n", __func__, ann.quorumHash.ToString(), pfrom->GetId()); - return; + return true; // let's still try other announcements from the same message } auto signHash = llmq::utils::BuildSignHash(llmqType, ann.quorumHash, ann.id, ann.msgHash); @@ -272,37 +314,34 @@ void CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& session.recvSessionId = ann.sessionId; session.quorum = quorum; nodeState.sessionByRecvId.emplace(ann.sessionId, &session); + + return true; } bool CSigSharesManager::VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv) { - if (!fMasterNode || activeMasternodeManager->GetProTx().IsNull()) { - return false; - } - size_t quorumSize = (size_t)Params().GetConsensus().llmqs.at(llmqType).size; if (inv.inv.size() != quorumSize) { - BanNode(from); return false; } return true; } -void CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) +bool CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) { CSigSharesNodeState::SessionInfo sessionInfo; if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) { - return; + return true; } if (!VerifySigSharesInv(pfrom->GetId(), sessionInfo.llmqType, inv)) { - return; + return false; } // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) { - return; + return true; } LogPrintf("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, @@ -312,26 +351,27 @@ void CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigShare auto& nodeState = nodeStates[pfrom->GetId()]; auto session = nodeState.GetSessionByRecvId(inv.sessionId); if (!session) { - return; + return true; } session->announced.Merge(inv); session->knows.Merge(inv); + return true; } -void CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) +bool CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) { CSigSharesNodeState::SessionInfo sessionInfo; if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) { - return; + return true; } if (!VerifySigSharesInv(pfrom->GetId(), sessionInfo.llmqType, inv)) { - return; + return false; } // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) { - return; + return true; } LogPrintf("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, @@ -341,26 +381,23 @@ void CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigShare auto& nodeState = nodeStates[pfrom->GetId()]; auto session = nodeState.GetSessionByRecvId(inv.sessionId); if (!session) { - return; + return true; } session->requested.Merge(inv); session->knows.Merge(inv); + return true; } -void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman) +bool CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman) { CSigSharesNodeState::SessionInfo sessionInfo; if (!GetSessionInfoByRecvId(pfrom->GetId(), batchedSigShares.sessionId, sessionInfo)) { - return; + return true; } bool ban = false; if (!PreVerifyBatchedSigShares(pfrom->GetId(), sessionInfo, batchedSigShares, ban)) { - if (ban) { - BanNode(pfrom->GetId()); - return; - } - return; + return ban; } std::vector sigShares; @@ -395,7 +432,7 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc sessionInfo.signHash.ToString(), batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInv(sessionInfo.llmqType).ToString(), pfrom->GetId()); if (sigShares.empty()) { - return; + return true; } LOCK(cs); @@ -403,6 +440,7 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc for (auto& s : sigShares) { nodeState.pendingIncomingSigShares.Add(s.GetKey(), s); } + return true; } bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan) @@ -1004,47 +1042,89 @@ bool CSigSharesManager::SendMessages() auto it1 = sigSessionAnnouncements.find(pnode->GetId()); if (it1 != sigSessionAnnouncements.end()) { + std::vector msgs; + msgs.reserve(it1->second.size()); for (auto& sigSesAnn : it1->second) { LogPrintf("llmq", "CSigSharesManager::SendMessages -- QSIGSESANN signHash=%s, sessionId=%d, node=%d\n", llmq::utils::BuildSignHash(sigSesAnn).ToString(), sigSesAnn.sessionId, pnode->GetId()); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, sigSesAnn)); + msgs.emplace_back(sigSesAnn); + if (msgs.size() == MAX_MSGS_CNT_QSIGSESANN) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs)); + msgs.clear(); + didSend = true; + } + } + if (!msgs.empty()) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs)); didSend = true; } } auto it = sigSharesToRequest.find(pnode->GetId()); if (it != sigSharesToRequest.end()) { + std::vector msgs; for (auto& p : it->second) { assert(p.second.CountSet() != 0); - LogPrintf("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES inv={%s}, node=%d\n", - p.second.ToString(), pnode->GetId()); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second)); + LogPrintf("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES signHash=%s, inv={%s}, node=%d\n", + p.first.ToString(), p.second.ToString(), pnode->GetId()); + msgs.emplace_back(std::move(p.second)); + if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); + msgs.clear(); + didSend = true; + } + } + if (!msgs.empty()) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); didSend = true; } } auto jt = sigSharesToSend.find(pnode->GetId()); if (jt != sigSharesToSend.end()) { + size_t totalSigsCount = 0; + std::vector msgs; for (auto& p : jt->second) { assert(!p.second.sigShares.empty()); if (LogAcceptCategory(BCLog::LogFlags::LLMQ)) { LOCK(cs); auto session = nodeStates[pnode->GetId()].GetSessionBySignHash(p.first); + assert(session); LogPrintf("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n", p.first.ToString(), p.second.ToInv(session->llmqType).ToString(), pnode->GetId()); } - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second)); + + if (totalSigsCount + p.second.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs)); + msgs.clear(); + totalSigsCount = 0; + didSend = true; + } + totalSigsCount += p.second.sigShares.size(); + msgs.emplace_back(std::move(p.second)); + } + if (!msgs.empty()) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs))); didSend = true; } } auto kt = sigSharesToAnnounce.find(pnode->GetId()); if (kt != sigSharesToAnnounce.end()) { + std::vector msgs; for (auto& p : kt->second) { assert(p.second.CountSet() != 0); LogPrintf("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV signHash=%s, inv={%s}, node=%d\n", p.first.ToString(), p.second.ToString(), pnode->GetId()); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second)); + msgs.emplace_back(std::move(p.second)); + if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); + msgs.clear(); + didSend = true; + } + } + if (!msgs.empty()) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); didSend = true; } } diff --git a/src/llmq/quorums_signing_shares.h b/src/llmq/quorums_signing_shares.h index d83e6da8e9995..ed47f92b9193a 100644 --- a/src/llmq/quorums_signing_shares.h +++ b/src/llmq/quorums_signing_shares.h @@ -329,6 +329,13 @@ class CSigSharesManager : public CRecoveredSigsListener static const int64_t SESSION_TOTAL_TIMEOUT = 5 * 60 * 1000; static const int64_t SIG_SHARE_REQUEST_TIMEOUT = 5 * 1000; + // we try to keep total message size below 10k + const size_t MAX_MSGS_CNT_QSIGSESANN = 100; + const size_t MAX_MSGS_CNT_QGETSIGSHARES = 200; + const size_t MAX_MSGS_CNT_QSIGSHARESINV = 200; + // 400 is the maximum quorum size, so this is also the maximum number of sigs we need to support + const size_t MAX_MSGS_TOTAL_BATCHED_SIGS = 400; + private: RecursiveMutex cs; @@ -370,10 +377,11 @@ class CSigSharesManager : public CRecoveredSigsListener void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig); private: - void ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman); - void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); - void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); - void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman); + // all of these return false when the currently processed message should be aborted (as each message actually contains multiple messages) + bool ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman); + bool ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); + bool ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); + bool ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman); bool VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv); bool PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan); From d1084e09e291613bc1ee44ff64e0794fcaddce5b Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Fri, 1 Mar 2019 08:21:28 +0100 Subject: [PATCH 07/10] Actually start the timers for sig share and recSig verification (#2730) Was wondering why verification was always 0ms...this explains it :) --- src/llmq/quorums_signing.cpp | 2 +- src/llmq/quorums_signing_shares.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index 016cf324b5da4..34a1a9816bfc4 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -461,7 +461,7 @@ bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman) } } - cxxtimer::Timer verifyTimer; + cxxtimer::Timer verifyTimer(true); batchVerifier.Verify(); verifyTimer.stop(); diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 6404a556debca..9ecaddabf6d07 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -591,7 +591,7 @@ bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman) } } - cxxtimer::Timer verifyTimer; + cxxtimer::Timer verifyTimer(true); batchVerifier.Verify(); verifyTimer.stop(); From a2fb2763c25db3a4285f6d91c2816d27b077e038 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Fri, 1 Mar 2019 08:21:42 +0100 Subject: [PATCH 08/10] On timeout, print members proTxHashes from members which did not send a share (#2731) * On timeout, print members proTxHashes from members which did not send a share * Move inactive quorums check above timeout checks This allows to reuse things in the next commit * Avoid locking cs_main through GetQuorum by using a pre-filled map * Use find() instead of [] to access quorums map --- src/llmq/quorums_signing_shares.cpp | 83 ++++++++++++++++++----------- 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 9ecaddabf6d07..92e9c8a17c115 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -1163,7 +1163,42 @@ void CSigSharesManager::Cleanup() return; } - std::unordered_set, StaticSaltedHasher> quorumsToCheck; + // This map is first filled with all quorums found in all sig shares. Then we remove all inactive quorums and + // loop through all sig shares again to find the ones belonging to the inactive quorums. We then delete the + // sessions belonging to the sig shares. At the same time, we use this map as a cache when we later need to resolve + // quorumHash -> quorumPtr (as GetQuorum() requires cs_main, leading to deadlocks with cs held) + std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; + + { + LOCK(cs); + sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { + quorums.emplace(std::make_pair((Consensus::LLMQType) sigShare.llmqType, sigShare.quorumHash), nullptr); + }); + } + + // Find quorums which became inactive + for (auto it = quorums.begin(); it != quorums.end(); ) { + if (llmq::utils::IsQuorumActive(it->first.first, it->first.second)) { + it->second = quorumManager->GetQuorum(it->first.first, it->first.second); + ++it; + } else { + it = quorums.erase(it); + } + } + + { + // Now delete sessions which are for inactive quorums + LOCK(cs); + std::unordered_set inactiveQuorumSessions; + sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { + if (!quorums.count(std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash))) { + inactiveQuorumSessions.emplace(sigShare.GetSignHash()); + } + }); + for (auto& signHash : inactiveQuorumSessions) { + RemoveSigSharesForSession(signHash); + } + } { LOCK(cs); @@ -1201,41 +1236,29 @@ void CSigSharesManager::Cleanup() assert(m); auto& oneSigShare = m->begin()->second; - LogPrintf("CSigSharesManager::%s -- signing session timed out. signHash=%s, id=%s, msgHash=%s, sigShareCount=%d\n", __func__, - signHash.ToString(), oneSigShare.id.ToString(), oneSigShare.msgHash.ToString(), count); + + std::string strMissingMembers; + if (LogAcceptCategory(BCLog::LogFlags::LLMQ)) { + auto quorumIt = quorums.find(std::make_pair((Consensus::LLMQType)oneSigShare.llmqType, oneSigShare.quorumHash)); + if (quorumIt != quorums.end()) { + auto& quorum = quorumIt->second; + for (size_t i = 0; i < quorum->members.size(); i++) { + if (!m->count((uint16_t)i)) { + auto& dmn = quorum->members[i]; + strMissingMembers += strprintf("\n %s", dmn->proTxHash.ToString()); + } + } + } + } + + LogPrintf("CSigSharesManager::%s -- signing session timed out. signHash=%s, id=%s, msgHash=%s, sigShareCount=%d, missingMembers=%s\n", __func__, + signHash.ToString(), oneSigShare.id.ToString(), oneSigShare.msgHash.ToString(), count, strMissingMembers); } else { LogPrintf("CSigSharesManager::%s -- signing session timed out. signHash=%s, sigShareCount=%d\n", __func__, signHash.ToString(), count); } RemoveSigSharesForSession(signHash); } - - sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { - quorumsToCheck.emplace((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash); - }); - } - - // Find quorums which became inactive - for (auto it = quorumsToCheck.begin(); it != quorumsToCheck.end();) { - if (llmq::utils::IsQuorumActive(it->first, it->second)) { - it = quorumsToCheck.erase(it); - } else { - ++it; - } - } - - { - // Now delete sessions which are for inactive quorums - LOCK(cs); - std::unordered_set inactiveQuorumSessions; - sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { - if (quorumsToCheck.count(std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash))) { - inactiveQuorumSessions.emplace(sigShare.GetSignHash()); - } - }); - for (auto& signHash : inactiveQuorumSessions) { - RemoveSigSharesForSession(signHash); - } } // Find node states for peers that disappeared from CConnman From b4a4e09c2c03a9a92f9518fb922ca47060e4cb74 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Fri, 1 Mar 2019 16:30:11 +0100 Subject: [PATCH 09/10] Ignore sig share inv messages when we don't have the quorum vvec (#2733) * Ignore sig share inv messages when we don't have the quorum vvec * Update src/llmq/quorums_signing_shares.cpp Co-Authored-By: codablock --- src/llmq/quorums_signing_shares.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 92e9c8a17c115..2b6fb28f88a27 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -347,6 +347,13 @@ bool CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigShare LogPrintf("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, sessionInfo.signHash.ToString(), inv.ToString(), pfrom->GetId()); + if (sessionInfo.quorum->quorumVvec == nullptr) { + // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG + LogPrintf("CSigSharesManager::%s -- we don't have the quorum vvec for %s, not requesting sig shares. node=%d\n", __func__, + sessionInfo.quorumHash.ToString(), pfrom->GetId()); + return true; + } + LOCK(cs); auto& nodeState = nodeStates[pfrom->GetId()]; auto session = nodeState.GetSessionByRecvId(inv.sessionId); From a29d29474934099bd32b73258dc9d263669db66c Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 11 Mar 2019 14:31:51 +0100 Subject: [PATCH 10/10] Fix deadlock in CSigSharesManager::SendMessages (#2757) * Fix deadlock in CSigSharesManager::SendMessages Locking "cs" at this location caused a (potential) deadlock due to changed order of cs and cs_vNodes locking. This changes the method to not require the session object anymore which removes the need for locking. * Pass size of LLMQ instead of llmqType into CSigSharesInv::Init This allows use of sizes which are not supported in chainparams. --- src/llmq/quorums_signing_shares.cpp | 40 ++++++++++++++--------------- src/llmq/quorums_signing_shares.h | 4 +-- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 2b6fb28f88a27..538ef90734f97 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -70,10 +70,9 @@ std::string CSigSharesInv::ToString() const return str; } -void CSigSharesInv::Init(Consensus::LLMQType _llmqType) +void CSigSharesInv::Init(size_t size) { - size_t llmqSize = (size_t)(Params().GetConsensus().llmqs.at(_llmqType).size); - inv.resize(llmqSize, false); + inv.resize(size, false); } bool CSigSharesInv::IsSet(uint16_t quorumMember) const @@ -88,27 +87,30 @@ void CSigSharesInv::Set(uint16_t quorumMember, bool v) inv[quorumMember] = v; } -CSigSharesInv CBatchedSigShares::ToInv(Consensus::LLMQType llmqType) const +std::string CBatchedSigShares::ToInvString() const { CSigSharesInv inv; - inv.Init(llmqType); + // we use 400 here no matter what the real size is. We don't really care about that size as we just want to call ToString() + inv.Init(400); for (size_t i = 0; i < sigShares.size(); i++) { inv.inv[sigShares[i].first] = true; } - return inv; + return inv.ToString(); } template static void InitSession(CSigSharesNodeState::Session& s, const uint256& signHash, T& from) { + const auto& params = Params().GetConsensus().llmqs.at((Consensus::LLMQType)from.llmqType); + s.llmqType = (Consensus::LLMQType)from.llmqType; s.quorumHash = from.quorumHash; s.id = from.id; s.msgHash = from.msgHash; s.signHash = signHash; - s.announced.Init((Consensus::LLMQType)from.llmqType); - s.requested.Init((Consensus::LLMQType)from.llmqType); - s.knows.Init((Consensus::LLMQType)from.llmqType); + s.announced.Init((size_t)params.size); + s.requested.Init((size_t)params.size); + s.knows.Init((size_t)params.size); } CSigSharesNodeState::Session& CSigSharesNodeState::GetOrCreateSessionFromShare(const llmq::CSigShare& sigShare) @@ -435,8 +437,8 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc } } - LogPrintf("llmq", "CSigSharesManager::%s -- signHash=%s, shares=%d, new=%d, inv={%s}, node=%d\n", __func__, - sessionInfo.signHash.ToString(), batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInv(sessionInfo.llmqType).ToString(), pfrom->GetId()); + LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- signHash=%s, shares=%d, new=%d, inv={%s}, node=%d\n", __func__, + sessionInfo.signHash.ToString(), batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInvString(), pfrom->GetId()); if (sigShares.empty()) { return true; @@ -862,7 +864,8 @@ void CSigSharesManager::CollectSigSharesToRequest(std::unordered_mapllmqType); + const auto& params = Params().GetConsensus().llmqs.at((Consensus::LLMQType)sigShare->llmqType); + inv.Init((size_t)params.size); } inv.inv[quorumMember] = true; session.knows.inv[quorumMember] = true; @@ -1093,14 +1097,8 @@ bool CSigSharesManager::SendMessages() std::vector msgs; for (auto& p : jt->second) { assert(!p.second.sigShares.empty()); - if (LogAcceptCategory(BCLog::LogFlags::LLMQ)) { - LOCK(cs); - auto session = nodeStates[pnode->GetId()].GetSessionBySignHash(p.first); - assert(session); - LogPrintf("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n", - p.first.ToString(), p.second.ToInv(session->llmqType).ToString(), pnode->GetId()); - } - + LogPrint(BCLog::LLMQ, "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n", + p.first.ToString(), p.second.ToInvString(), pnode->GetId()); if (totalSigsCount + p.second.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) { g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs)); msgs.clear(); diff --git a/src/llmq/quorums_signing_shares.h b/src/llmq/quorums_signing_shares.h index ed47f92b9193a..da53b93ae2ca2 100644 --- a/src/llmq/quorums_signing_shares.h +++ b/src/llmq/quorums_signing_shares.h @@ -97,7 +97,7 @@ class CSigSharesInv READWRITE(AUTOBITSET(obj.inv, (size_t)invSize)); } - void Init(Consensus::LLMQType _llmqType); + void Init(size_t size); bool IsSet(uint16_t quorumMember) const; void Set(uint16_t quorumMember, bool v); void Merge(const CSigSharesInv& inv2); @@ -120,7 +120,7 @@ class CBatchedSigShares READWRITE(obj.sigShares); } - CSigSharesInv ToInv(Consensus::LLMQType llmqType) const; + std::string ToInvString() const; }; template