From 9453018fdc8f02d42832374bcf1d6e3a1df02281 Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Tue, 19 Mar 2019 18:06:35 -0700 Subject: [PATCH 1/3] Simplify orphan processing in preparation for interruptibility --- src/net_processing.cpp | 108 ++++++++++++++++++++--------------------- 1 file changed, 53 insertions(+), 55 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 0247b9cc7..d97ad4a3a 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2342,8 +2342,8 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr return true; } - std::deque vWorkQueue; - std::vector vEraseQueue; + std::set orphan_work_set; + CTransactionRef ptx; vRecv >> ptx; const CTransaction& tx = *ptx; @@ -2368,7 +2368,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr mempool.check(pcoinsTip.get()); RelayTransaction(tx, connman); for (unsigned int i = 0; i < tx.vout.size(); i++) { - vWorkQueue.emplace_back(inv.hash, i); + auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i)); + if (it_by_prev != mapOrphanTransactionsByPrev.end()) { + for (const auto& elem : it_by_prev->second) { + orphan_work_set.insert(elem->first); + } + } } pfrom->nLastTXTime = GetTime(); @@ -2380,64 +2385,57 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // Recursively process any orphan transactions that depended on this one std::set setMisbehaving; - while (!vWorkQueue.empty()) { - auto itByPrev = mapOrphanTransactionsByPrev.find(vWorkQueue.front()); - vWorkQueue.pop_front(); - if (itByPrev == mapOrphanTransactionsByPrev.end()) - continue; - for (auto mi = itByPrev->second.begin(); - mi != itByPrev->second.end(); - ++mi) - { - const CTransactionRef& porphanTx = (*mi)->second.tx; - const CTransaction& orphanTx = *porphanTx; - const uint256& orphanHash = orphanTx.GetHash(); - NodeId fromPeer = (*mi)->second.fromPeer; - bool fMissingInputs2 = false; - // Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan - // resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get - // anyone relaying LegitTxX banned) - CValidationState stateDummy; + while (!orphan_work_set.empty()) { + const uint256 orphanHash = *orphan_work_set.begin(); + orphan_work_set.erase(orphan_work_set.begin()); + auto orphan_it = mapOrphanTransactions.find(orphanHash); + if (orphan_it == mapOrphanTransactions.end()) continue; - if (setMisbehaving.count(fromPeer)) - continue; - if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, &fMissingInputs2, &lRemovedTxn, false /* bypass_limits */, 0 /* nAbsurdFee */)) { - LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); - RelayTransaction(orphanTx, connman); - for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { - vWorkQueue.emplace_back(orphanHash, i); - } - vEraseQueue.push_back(orphanHash); - } - else if (!fMissingInputs2) - { - int nDos = 0; - if (stateDummy.IsInvalid(nDos) && nDos > 0) - { - // Punish peer that gave us an invalid orphan tx - Misbehaving(fromPeer, nDos); - setMisbehaving.insert(fromPeer); - LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString()); - } - // Has inputs but not accepted to mempool - // Probably non-standard or insufficient fee - LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString()); - vEraseQueue.push_back(orphanHash); - if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) { - // Do not use rejection cache for witness transactions or - // witness-stripped transactions, as they can have been malleated. - // See https://github.com/bitcoin/bitcoin/issues/8279 for details. - assert(recentRejects); - recentRejects->insert(orphanHash); + const CTransactionRef porphanTx = orphan_it->second.tx; + const CTransaction& orphanTx = *porphanTx; + NodeId fromPeer = orphan_it->second.fromPeer; + bool fMissingInputs2 = false; + // Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan + // resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get + // anyone relaying LegitTxX banned) + CValidationState stateDummy; + + if (setMisbehaving.count(fromPeer)) continue; + if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, &fMissingInputs2, &lRemovedTxn, false /* bypass_limits */, 0 /* nAbsurdFee */)) { + LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); + RelayTransaction(orphanTx, connman); + for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { + auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i)); + if (it_by_prev != mapOrphanTransactionsByPrev.end()) { + for (const auto& elem : it_by_prev->second) { + orphan_work_set.insert(elem->first); + } } } - mempool.check(pcoinsTip.get()); + EraseOrphanTx(orphanHash); + } else if (!fMissingInputs2) { + int nDos = 0; + if (stateDummy.IsInvalid(nDos) && nDos > 0) { + // Punish peer that gave us an invalid orphan tx + Misbehaving(fromPeer, nDos); + setMisbehaving.insert(fromPeer); + LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString()); + } + // Has inputs but not accepted to mempool + // Probably non-standard or insufficient fee + LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString()); + if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) { + // Do not use rejection cache for witness transactions or + // witness-stripped transactions, as they can have been malleated. + // See https://github.com/bitcoin/bitcoin/issues/8279 for details. + assert(recentRejects); + recentRejects->insert(orphanHash); + } + EraseOrphanTx(orphanHash); } + mempool.check(pcoinsTip.get()); } - - for (const uint256& hash : vEraseQueue) - EraseOrphanTx(hash); } else if (fMissingInputs) { From 6e051f3d323af1d209c02e7a4319834f1947ffa7 Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Wed, 20 Mar 2019 15:09:12 -0700 Subject: [PATCH 2/3] [MOVEONLY] Move processing of orphan queue to ProcessOrphanTx --- src/net_processing.cpp | 111 ++++++++++++++++++++++------------------- 1 file changed, 59 insertions(+), 52 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index d97ad4a3a..55ab826c7 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -1713,6 +1713,64 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve return true; } +void static ProcessOrphanTx(CConnman* connman, std::set& orphan_work_set, std::list& removed_txn) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans) +{ + AssertLockHeld(cs_main); + AssertLockHeld(g_cs_orphans); + std::set setMisbehaving; + while (!orphan_work_set.empty()) { + const uint256 orphanHash = *orphan_work_set.begin(); + orphan_work_set.erase(orphan_work_set.begin()); + + auto orphan_it = mapOrphanTransactions.find(orphanHash); + if (orphan_it == mapOrphanTransactions.end()) continue; + + const CTransactionRef porphanTx = orphan_it->second.tx; + const CTransaction& orphanTx = *porphanTx; + NodeId fromPeer = orphan_it->second.fromPeer; + bool fMissingInputs2 = false; + // Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan + // resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get + // anyone relaying LegitTxX banned) + CValidationState stateDummy; + + if (setMisbehaving.count(fromPeer)) continue; + if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, &fMissingInputs2, &removed_txn, false /* bypass_limits */, 0 /* nAbsurdFee */)) { + LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); + RelayTransaction(orphanTx, connman); + for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { + auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i)); + if (it_by_prev != mapOrphanTransactionsByPrev.end()) { + for (const auto& elem : it_by_prev->second) { + orphan_work_set.insert(elem->first); + } + } + } + EraseOrphanTx(orphanHash); + } else if (!fMissingInputs2) { + int nDos = 0; + if (stateDummy.IsInvalid(nDos) && nDos > 0) { + // Punish peer that gave us an invalid orphan tx + Misbehaving(fromPeer, nDos); + setMisbehaving.insert(fromPeer); + LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString()); + } + // Has inputs but not accepted to mempool + // Probably non-standard or insufficient fee + LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString()); + if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) { + // Do not use rejection cache for witness transactions or + // witness-stripped transactions, as they can have been malleated. + // See https://github.com/bitcoin/bitcoin/issues/8279 for details. + assert(recentRejects); + recentRejects->insert(orphanHash); + } + EraseOrphanTx(orphanHash); + } + mempool.check(pcoinsTip.get()); + } +} + bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic& interruptMsgProc, bool enable_bip61) { LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId()); @@ -2384,58 +2442,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr mempool.size(), mempool.DynamicMemoryUsage() / 1000); // Recursively process any orphan transactions that depended on this one - std::set setMisbehaving; - while (!orphan_work_set.empty()) { - const uint256 orphanHash = *orphan_work_set.begin(); - orphan_work_set.erase(orphan_work_set.begin()); - - auto orphan_it = mapOrphanTransactions.find(orphanHash); - if (orphan_it == mapOrphanTransactions.end()) continue; - - const CTransactionRef porphanTx = orphan_it->second.tx; - const CTransaction& orphanTx = *porphanTx; - NodeId fromPeer = orphan_it->second.fromPeer; - bool fMissingInputs2 = false; - // Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan - // resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get - // anyone relaying LegitTxX banned) - CValidationState stateDummy; - - if (setMisbehaving.count(fromPeer)) continue; - if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, &fMissingInputs2, &lRemovedTxn, false /* bypass_limits */, 0 /* nAbsurdFee */)) { - LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); - RelayTransaction(orphanTx, connman); - for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { - auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i)); - if (it_by_prev != mapOrphanTransactionsByPrev.end()) { - for (const auto& elem : it_by_prev->second) { - orphan_work_set.insert(elem->first); - } - } - } - EraseOrphanTx(orphanHash); - } else if (!fMissingInputs2) { - int nDos = 0; - if (stateDummy.IsInvalid(nDos) && nDos > 0) { - // Punish peer that gave us an invalid orphan tx - Misbehaving(fromPeer, nDos); - setMisbehaving.insert(fromPeer); - LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString()); - } - // Has inputs but not accepted to mempool - // Probably non-standard or insufficient fee - LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString()); - if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) { - // Do not use rejection cache for witness transactions or - // witness-stripped transactions, as they can have been malleated. - // See https://github.com/bitcoin/bitcoin/issues/8279 for details. - assert(recentRejects); - recentRejects->insert(orphanHash); - } - EraseOrphanTx(orphanHash); - } - mempool.check(pcoinsTip.get()); - } + ProcessOrphanTx(connman, orphan_work_set, lRemovedTxn); } else if (fMissingInputs) { From 866c8058a706931f025335b3e794ed2f4d287918 Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Wed, 20 Mar 2019 15:26:21 -0700 Subject: [PATCH 3/3] Interrupt orphan processing after every transaction This makes orphan processing work like handling getdata messages: After every actual transaction validation attempt, interrupt processing to deal with messages arriving from other peers. --- src/net.h | 2 ++ src/net_processing.cpp | 21 ++++++++++++++++----- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/net.h b/src/net.h index f4a90e01f..f1d09f593 100644 --- a/src/net.h +++ b/src/net.h @@ -739,6 +739,8 @@ public: CAmount lastSentFeeFilter{0}; int64_t nextSendTimeFeeFilter{0}; + std::set orphan_work_set; + CNode(NodeId id, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn = "", bool fInboundIn = false); ~CNode(); CNode(const CNode&) = delete; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 55ab826c7..0654a96e2 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -1718,7 +1718,8 @@ void static ProcessOrphanTx(CConnman* connman, std::set& orphan_work_se AssertLockHeld(cs_main); AssertLockHeld(g_cs_orphans); std::set setMisbehaving; - while (!orphan_work_set.empty()) { + bool done = false; + while (!done && !orphan_work_set.empty()) { const uint256 orphanHash = *orphan_work_set.begin(); orphan_work_set.erase(orphan_work_set.begin()); @@ -1747,6 +1748,7 @@ void static ProcessOrphanTx(CConnman* connman, std::set& orphan_work_se } } EraseOrphanTx(orphanHash); + done = true; } else if (!fMissingInputs2) { int nDos = 0; if (stateDummy.IsInvalid(nDos) && nDos > 0) { @@ -1766,6 +1768,7 @@ void static ProcessOrphanTx(CConnman* connman, std::set& orphan_work_se recentRejects->insert(orphanHash); } EraseOrphanTx(orphanHash); + done = true; } mempool.check(pcoinsTip.get()); } @@ -2400,8 +2403,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr return true; } - std::set orphan_work_set; - CTransactionRef ptx; vRecv >> ptx; const CTransaction& tx = *ptx; @@ -2429,7 +2430,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i)); if (it_by_prev != mapOrphanTransactionsByPrev.end()) { for (const auto& elem : it_by_prev->second) { - orphan_work_set.insert(elem->first); + pfrom->orphan_work_set.insert(elem->first); } } } @@ -2442,7 +2443,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr mempool.size(), mempool.DynamicMemoryUsage() / 1000); // Recursively process any orphan transactions that depended on this one - ProcessOrphanTx(connman, orphan_work_set, lRemovedTxn); + ProcessOrphanTx(connman, pfrom->orphan_work_set, lRemovedTxn); } else if (fMissingInputs) { @@ -3174,11 +3175,21 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic& inter if (!pfrom->vRecvGetData.empty()) ProcessGetData(pfrom, chainparams, connman, interruptMsgProc); + if (!pfrom->orphan_work_set.empty()) { + std::list removed_txn; + LOCK2(cs_main, g_cs_orphans); + ProcessOrphanTx(connman, pfrom->orphan_work_set, removed_txn); + for (const CTransactionRef& removedTx : removed_txn) { + AddToCompactExtraTransactions(removedTx); + } + } + if (pfrom->fDisconnect) return false; // this maintains the order of responses if (!pfrom->vRecvGetData.empty()) return true; + if (!pfrom->orphan_work_set.empty()) return true; // Don't bother if send buffer is too full to respond anyway if (pfrom->fPauseSend)