From 4de0dbac9b286c42a9b10132b7c2d76712f1a319 Mon Sep 17 00:00:00 2001 From: Suhas Daftuar Date: Fri, 8 Mar 2019 14:26:36 -0500 Subject: [PATCH] [refactor] Move tx relay state to separate structure --- src/net.cpp | 15 +++--- src/net.h | 64 ++++++++++++++----------- src/net_processing.cpp | 105 +++++++++++++++++++++-------------------- 3 files changed, 95 insertions(+), 89 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 0464a6e9e..527c00130 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -500,8 +500,8 @@ void CNode::copyStats(CNodeStats &stats) X(addr); X(addrBind); { - LOCK(cs_filter); - X(fRelayTxes); + LOCK(m_tx_relay.cs_filter); + stats.fRelayTxes = m_tx_relay.fRelayTxes; } X(nLastSend); X(nLastRecv); @@ -529,8 +529,8 @@ void CNode::copyStats(CNodeStats &stats) X(m_legacyWhitelisted); X(m_permissionFlags); { - LOCK(cs_feeFilter); - X(minFeeFilter); + LOCK(m_tx_relay.cs_feeFilter); + stats.minFeeFilter = m_tx_relay.minFeeFilter; } // It is common for nodes with good ping times to suddenly become lagged, @@ -818,11 +818,11 @@ bool CConnman::AttemptToEvictConnection() continue; if (node->fDisconnect) continue; - LOCK(node->cs_filter); + LOCK(node->m_tx_relay.cs_filter); NodeEvictionCandidate candidate = {node->GetId(), node->nTimeConnected, node->nMinPingUsecTime, node->nLastBlockTime, node->nLastTXTime, HasAllDesirableServiceFlags(node->nServices), - node->fRelayTxes, node->pfilter != nullptr, node->addr, node->nKeyedNetGroup, + node->m_tx_relay.fRelayTxes, node->m_tx_relay.pfilter != nullptr, node->addr, node->nKeyedNetGroup, node->m_prefer_evict}; vEvictionCandidates.push_back(candidate); } @@ -2625,7 +2625,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn fInbound(fInboundIn), nKeyedNetGroup(nKeyedNetGroupIn), addrKnown(5000, 0.001), - filterInventoryKnown(50000, 0.000001), id(idIn), nLocalHostNonce(nLocalHostNonceIn), nLocalServices(nLocalServicesIn), @@ -2634,8 +2633,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn hSocket = hSocketIn; addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn; hashContinue = uint256(); - filterInventoryKnown.reset(); - pfilter = MakeUnique(); for (const std::string &msg : getAllNetMessageTypes()) mapRecvBytesPerMsgCmd[msg] = 0; diff --git a/src/net.h b/src/net.h index 4e8a96497..bd46d995e 100644 --- a/src/net.h +++ b/src/net.h @@ -676,15 +676,8 @@ public: // Setting fDisconnect to true will cause the node to be disconnected the // next time DisconnectNodes() runs std::atomic_bool fDisconnect{false}; - // We use fRelayTxes for two purposes - - // a) it allows us to not relay tx invs before receiving the peer's version message - // b) the peer may tell us in its version message that we should not relay tx invs - // unless it loads a bloom filter. - bool fRelayTxes GUARDED_BY(cs_filter){false}; bool fSentAddr{false}; CSemaphoreGrant grantOutbound; - mutable CCriticalSection cs_filter; - std::unique_ptr pfilter PT_GUARDED_BY(cs_filter); std::atomic nRefCount{0}; const uint64_t nKeyedNetGroup; @@ -706,24 +699,43 @@ public: int64_t nNextAddrSend GUARDED_BY(cs_sendProcessing){0}; int64_t nNextLocalAddrSend GUARDED_BY(cs_sendProcessing){0}; - // inventory based relay - CRollingBloomFilter filterInventoryKnown GUARDED_BY(cs_inventory); - // Set of transaction ids we still have to announce. - // They are sorted by the mempool before relay, so the order is not important. - std::set setInventoryTxToSend; // List of block ids we still have announce. // There is no final sorting before sending, as they are always sent immediately // and in the order requested. std::vector vInventoryBlockToSend GUARDED_BY(cs_inventory); CCriticalSection cs_inventory; - int64_t nNextInvSend{0}; + + struct TxRelay { + TxRelay() { pfilter = MakeUnique(); } + mutable CCriticalSection cs_filter; + // We use fRelayTxes for two purposes - + // a) it allows us to not relay tx invs before receiving the peer's version message + // b) the peer may tell us in its version message that we should not relay tx invs + // unless it loads a bloom filter. + bool fRelayTxes GUARDED_BY(cs_filter){false}; + std::unique_ptr pfilter PT_GUARDED_BY(cs_filter) GUARDED_BY(cs_filter); + + mutable CCriticalSection cs_tx_inventory; + CRollingBloomFilter filterInventoryKnown GUARDED_BY(cs_tx_inventory){50000, 0.000001}; + // Set of transaction ids we still have to announce. + // They are sorted by the mempool before relay, so the order is not important. + std::set setInventoryTxToSend; + // Used for BIP35 mempool sending + bool fSendMempool GUARDED_BY(cs_tx_inventory){false}; + // Last time a "MEMPOOL" request was serviced. + std::atomic timeLastMempoolReq{0}; + int64_t nNextInvSend{0}; + + CCriticalSection cs_feeFilter; + // Minimum fee rate with which to filter inv's to this node + CAmount minFeeFilter GUARDED_BY(cs_feeFilter){0}; + CAmount lastSentFeeFilter{0}; + int64_t nextSendTimeFeeFilter{0}; + }; + + TxRelay m_tx_relay; // Used for headers announcements - unfiltered blocks to relay std::vector vBlockHashesToAnnounce GUARDED_BY(cs_inventory); - // Used for BIP35 mempool sending - bool fSendMempool GUARDED_BY(cs_inventory){false}; - - // Last time a "MEMPOOL" request was serviced. - std::atomic timeLastMempoolReq{0}; // Block and TXN accept times std::atomic nLastBlockTime{0}; @@ -740,11 +752,6 @@ public: std::atomic nMinPingUsecTime{std::numeric_limits::max()}; // Whether a ping is requested. std::atomic fPingQueued{false}; - // Minimum fee rate with which to filter inv's to this node - CAmount minFeeFilter GUARDED_BY(cs_feeFilter){0}; - CCriticalSection cs_feeFilter; - CAmount lastSentFeeFilter{0}; - int64_t nextSendTimeFeeFilter{0}; std::set orphan_work_set; @@ -842,19 +849,20 @@ public: void AddInventoryKnown(const CInv& inv) { { - LOCK(cs_inventory); - filterInventoryKnown.insert(inv.hash); + LOCK(m_tx_relay.cs_tx_inventory); + m_tx_relay.filterInventoryKnown.insert(inv.hash); } } void PushInventory(const CInv& inv) { - LOCK(cs_inventory); if (inv.type == MSG_TX) { - if (!filterInventoryKnown.contains(inv.hash)) { - setInventoryTxToSend.insert(inv.hash); + LOCK(m_tx_relay.cs_tx_inventory); + if (!m_tx_relay.filterInventoryKnown.contains(inv.hash)) { + m_tx_relay.setInventoryTxToSend.insert(inv.hash); } } else if (inv.type == MSG_BLOCK) { + LOCK(cs_inventory); vInventoryBlockToSend.push_back(inv.hash); } } diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 3db460d44..c88d17b6e 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -1449,10 +1449,10 @@ void static ProcessGetBlockData(CNode* pfrom, const CChainParams& chainparams, c bool sendMerkleBlock = false; CMerkleBlock merkleBlock; { - LOCK(pfrom->cs_filter); - if (pfrom->pfilter) { + LOCK(pfrom->m_tx_relay.cs_filter); + if (pfrom->m_tx_relay.pfilter) { sendMerkleBlock = true; - merkleBlock = CMerkleBlock(*pblock, *pfrom->pfilter); + merkleBlock = CMerkleBlock(*pblock, *pfrom->m_tx_relay.pfilter); } } if (sendMerkleBlock) { @@ -1532,11 +1532,11 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm if (mi != mapRelay.end()) { connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *mi->second)); push = true; - } else if (pfrom->timeLastMempoolReq) { + } else if (pfrom->m_tx_relay.timeLastMempoolReq) { auto txinfo = mempool.info(inv.hash); // To protect privacy, do not answer getdata using the mempool when // that TX couldn't have been INVed in reply to a MEMPOOL request. - if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) { + if (txinfo.tx && txinfo.nTime <= pfrom->m_tx_relay.timeLastMempoolReq) { connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *txinfo.tx)); push = true; } @@ -1996,8 +1996,8 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr pfrom->m_limited_node = (!(nServices & NODE_NETWORK) && (nServices & NODE_NETWORK_LIMITED)); { - LOCK(pfrom->cs_filter); - pfrom->fRelayTxes = fRelay; // set to true after we get the first filter* message + LOCK(pfrom->m_tx_relay.cs_filter); + pfrom->m_tx_relay.fRelayTxes = fRelay; // set to true after we get the first filter* message } // Change version @@ -3030,8 +3030,8 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr return true; } - LOCK(pfrom->cs_inventory); - pfrom->fSendMempool = true; + LOCK(pfrom->m_tx_relay.cs_tx_inventory); + pfrom->m_tx_relay.fSendMempool = true; return true; } @@ -3124,10 +3124,10 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr } else { - LOCK(pfrom->cs_filter); - pfrom->pfilter.reset(new CBloomFilter(filter)); - pfrom->pfilter->UpdateEmptyFull(); - pfrom->fRelayTxes = true; + LOCK(pfrom->m_tx_relay.cs_filter); + pfrom->m_tx_relay.pfilter.reset(new CBloomFilter(filter)); + pfrom->m_tx_relay.pfilter->UpdateEmptyFull(); + pfrom->m_tx_relay.fRelayTxes = true; } return true; } @@ -3142,9 +3142,9 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr if (vData.size() > MAX_SCRIPT_ELEMENT_SIZE) { bad = true; } else { - LOCK(pfrom->cs_filter); - if (pfrom->pfilter) { - pfrom->pfilter->insert(vData); + LOCK(pfrom->m_tx_relay.cs_filter); + if (pfrom->m_tx_relay.pfilter) { + pfrom->m_tx_relay.pfilter->insert(vData); } else { bad = true; } @@ -3157,11 +3157,11 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr } if (strCommand == NetMsgType::FILTERCLEAR) { - LOCK(pfrom->cs_filter); + LOCK(pfrom->m_tx_relay.cs_filter); if (pfrom->GetLocalServices() & NODE_BLOOM) { - pfrom->pfilter.reset(new CBloomFilter()); + pfrom->m_tx_relay.pfilter.reset(new CBloomFilter()); } - pfrom->fRelayTxes = true; + pfrom->m_tx_relay.fRelayTxes = true; return true; } @@ -3170,8 +3170,8 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr vRecv >> newFeeFilter; if (MoneyRange(newFeeFilter)) { { - LOCK(pfrom->cs_feeFilter); - pfrom->minFeeFilter = newFeeFilter; + LOCK(pfrom->m_tx_relay.cs_feeFilter); + pfrom->m_tx_relay.minFeeFilter = newFeeFilter; } LogPrint(BCLog::NET, "received: feefilter of %s from peer=%d\n", CFeeRate(newFeeFilter).ToString(), pfrom->GetId()); } @@ -3791,69 +3791,70 @@ bool PeerLogicValidation::SendMessages(CNode* pto) } pto->vInventoryBlockToSend.clear(); + LOCK(pto->m_tx_relay.cs_tx_inventory); // Check whether periodic sends should happen bool fSendTrickle = pto->HasPermission(PF_NOBAN); - if (pto->nNextInvSend < nNow) { + if (pto->m_tx_relay.nNextInvSend < nNow) { fSendTrickle = true; if (pto->fInbound) { - pto->nNextInvSend = connman->PoissonNextSendInbound(nNow, INVENTORY_BROADCAST_INTERVAL); + pto->m_tx_relay.nNextInvSend = connman->PoissonNextSendInbound(nNow, INVENTORY_BROADCAST_INTERVAL); } else { // Use half the delay for outbound peers, as there is less privacy concern for them. - pto->nNextInvSend = PoissonNextSend(nNow, INVENTORY_BROADCAST_INTERVAL >> 1); + pto->m_tx_relay.nNextInvSend = PoissonNextSend(nNow, INVENTORY_BROADCAST_INTERVAL >> 1); } } // Time to send but the peer has requested we not relay transactions. if (fSendTrickle) { - LOCK(pto->cs_filter); - if (!pto->fRelayTxes) pto->setInventoryTxToSend.clear(); + LOCK(pto->m_tx_relay.cs_filter); + if (!pto->m_tx_relay.fRelayTxes) pto->m_tx_relay.setInventoryTxToSend.clear(); } // Respond to BIP35 mempool requests - if (fSendTrickle && pto->fSendMempool) { + if (fSendTrickle && pto->m_tx_relay.fSendMempool) { auto vtxinfo = mempool.infoAll(); - pto->fSendMempool = false; + pto->m_tx_relay.fSendMempool = false; CAmount filterrate = 0; { - LOCK(pto->cs_feeFilter); - filterrate = pto->minFeeFilter; + LOCK(pto->m_tx_relay.cs_feeFilter); + filterrate = pto->m_tx_relay.minFeeFilter; } - LOCK(pto->cs_filter); + LOCK(pto->m_tx_relay.cs_filter); for (const auto& txinfo : vtxinfo) { const uint256& hash = txinfo.tx->GetHash(); CInv inv(MSG_TX, hash); - pto->setInventoryTxToSend.erase(hash); + pto->m_tx_relay.setInventoryTxToSend.erase(hash); if (filterrate) { if (txinfo.feeRate.GetFeePerK() < filterrate) continue; } - if (pto->pfilter) { - if (!pto->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue; + if (pto->m_tx_relay.pfilter) { + if (!pto->m_tx_relay.pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue; } - pto->filterInventoryKnown.insert(hash); + pto->m_tx_relay.filterInventoryKnown.insert(hash); vInv.push_back(inv); if (vInv.size() == MAX_INV_SZ) { connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); vInv.clear(); } } - pto->timeLastMempoolReq = GetTime(); + pto->m_tx_relay.timeLastMempoolReq = GetTime(); } // Determine transactions to relay if (fSendTrickle) { // Produce a vector with all candidates for sending std::vector::iterator> vInvTx; - vInvTx.reserve(pto->setInventoryTxToSend.size()); - for (std::set::iterator it = pto->setInventoryTxToSend.begin(); it != pto->setInventoryTxToSend.end(); it++) { + vInvTx.reserve(pto->m_tx_relay.setInventoryTxToSend.size()); + for (std::set::iterator it = pto->m_tx_relay.setInventoryTxToSend.begin(); it != pto->m_tx_relay.setInventoryTxToSend.end(); it++) { vInvTx.push_back(it); } CAmount filterrate = 0; { - LOCK(pto->cs_feeFilter); - filterrate = pto->minFeeFilter; + LOCK(pto->m_tx_relay.cs_feeFilter); + filterrate = pto->m_tx_relay.minFeeFilter; } // Topologically and fee-rate sort the inventory we send for privacy and priority reasons. // A heap is used so that not all items need sorting if only a few are being sent. @@ -3862,7 +3863,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) // No reason to drain out at many times the network's capacity, // especially since we have many peers and some will draw much shorter delays. unsigned int nRelayedTransactions = 0; - LOCK(pto->cs_filter); + LOCK(pto->m_tx_relay.cs_filter); while (!vInvTx.empty() && nRelayedTransactions < INVENTORY_BROADCAST_MAX) { // Fetch the top element from the heap std::pop_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder); @@ -3870,9 +3871,9 @@ bool PeerLogicValidation::SendMessages(CNode* pto) vInvTx.pop_back(); uint256 hash = *it; // Remove it from the to-be-sent set - pto->setInventoryTxToSend.erase(it); + pto->m_tx_relay.setInventoryTxToSend.erase(it); // Check if not in the filter already - if (pto->filterInventoryKnown.contains(hash)) { + if (pto->m_tx_relay.filterInventoryKnown.contains(hash)) { continue; } // Not in the mempool anymore? don't bother sending it. @@ -3883,7 +3884,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) if (filterrate && txinfo.feeRate.GetFeePerK() < filterrate) { continue; } - if (pto->pfilter && !pto->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue; + if (pto->m_tx_relay.pfilter && !pto->m_tx_relay.pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue; // Send vInv.push_back(CInv(MSG_TX, hash)); nRelayedTransactions++; @@ -3904,7 +3905,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); vInv.clear(); } - pto->filterInventoryKnown.insert(hash); + pto->m_tx_relay.filterInventoryKnown.insert(hash); } } } @@ -4069,23 +4070,23 @@ bool PeerLogicValidation::SendMessages(CNode* pto) !pto->HasPermission(PF_FORCERELAY)) { CAmount currentFilter = mempool.GetMinFee(gArgs.GetArg("-maxmempool", DEFAULT_MAX_MEMPOOL_SIZE) * 1000000).GetFeePerK(); int64_t timeNow = GetTimeMicros(); - if (timeNow > pto->nextSendTimeFeeFilter) { + if (timeNow > pto->m_tx_relay.nextSendTimeFeeFilter) { static CFeeRate default_feerate(DEFAULT_MIN_RELAY_TX_FEE); static FeeFilterRounder filterRounder(default_feerate); CAmount filterToSend = filterRounder.round(currentFilter); // We always have a fee filter of at least minRelayTxFee filterToSend = std::max(filterToSend, ::minRelayTxFee.GetFeePerK()); - if (filterToSend != pto->lastSentFeeFilter) { + if (filterToSend != pto->m_tx_relay.lastSentFeeFilter) { connman->PushMessage(pto, msgMaker.Make(NetMsgType::FEEFILTER, filterToSend)); - pto->lastSentFeeFilter = filterToSend; + pto->m_tx_relay.lastSentFeeFilter = filterToSend; } - pto->nextSendTimeFeeFilter = PoissonNextSend(timeNow, AVG_FEEFILTER_BROADCAST_INTERVAL); + pto->m_tx_relay.nextSendTimeFeeFilter = PoissonNextSend(timeNow, AVG_FEEFILTER_BROADCAST_INTERVAL); } // If the fee filter has changed substantially and it's still more than MAX_FEEFILTER_CHANGE_DELAY // until scheduled broadcast, then move the broadcast to within MAX_FEEFILTER_CHANGE_DELAY. - else if (timeNow + MAX_FEEFILTER_CHANGE_DELAY * 1000000 < pto->nextSendTimeFeeFilter && - (currentFilter < 3 * pto->lastSentFeeFilter / 4 || currentFilter > 4 * pto->lastSentFeeFilter / 3)) { - pto->nextSendTimeFeeFilter = timeNow + GetRandInt(MAX_FEEFILTER_CHANGE_DELAY) * 1000000; + else if (timeNow + MAX_FEEFILTER_CHANGE_DELAY * 1000000 < pto->m_tx_relay.nextSendTimeFeeFilter && + (currentFilter < 3 * pto->m_tx_relay.lastSentFeeFilter / 4 || currentFilter > 4 * pto->m_tx_relay.lastSentFeeFilter / 3)) { + pto->m_tx_relay.nextSendTimeFeeFilter = timeNow + GetRandInt(MAX_FEEFILTER_CHANGE_DELAY) * 1000000; } } }