Merge #15644: Make orphan processing interruptible

866c8058a7 Interrupt orphan processing after every transaction (Pieter Wuille)
6e051f3d32 [MOVEONLY] Move processing of orphan queue to ProcessOrphanTx (Pieter Wuille)
9453018fdc Simplify orphan processing in preparation for interruptibility (Pieter Wuille)

Pull request description:

  As individual orphan transactions can be relatively expensive to handle, it's undesirable to process all of them (max 100) as soon as the parent becomes available, as it pegs the net processing the whole time.

  Change this by interrupting orphan handling after every transactions, and continue in the next processing slot of the peer that gave us the parent - similar to how getdata processing works now. Messages from other peers arriving in the mean time are processed normally, but other messages from the peer that gave us the parent have to wait until all orphan processing is done.

ACKs for commit 866c80:
  sdaftuar:
    ACK 866c8058a7
  MarcoFalke:
    utACK 866c8058a7
  promag:
    utACK 866c805. Verified refactor in 9453018fdc and moved code in 6e051f3d32. Not so sure about change in 866c8058a7 just because I'm not familiar with net processing.

Tree-SHA512: d8e8a1ee5f2999446cdeb8fc9756ed9c24f3d5cd769a7774ec4c317fc8d463fdfceec88de97266f389b715a5dfcc2b0a3abaa573955ea451786cc43b870e8cde
This commit is contained in:
MarcoFalke 2019-04-01 11:09:32 -04:00
commit 35477e9e4e
No known key found for this signature in database
GPG key ID: D2EA4850E7528B25
2 changed files with 80 additions and 62 deletions

View file

@ -739,6 +739,8 @@ public:
CAmount lastSentFeeFilter{0}; CAmount lastSentFeeFilter{0};
int64_t nextSendTimeFeeFilter{0}; int64_t nextSendTimeFeeFilter{0};
std::set<uint256> orphan_work_set;
CNode(NodeId id, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn = "", bool fInboundIn = false); CNode(NodeId id, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn = "", bool fInboundIn = false);
~CNode(); ~CNode();
CNode(const CNode&) = delete; CNode(const CNode&) = delete;

View file

@ -1713,6 +1713,67 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve
return true; return true;
} }
void static ProcessOrphanTx(CConnman* connman, std::set<uint256>& orphan_work_set, std::list<CTransactionRef>& removed_txn) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans)
{
AssertLockHeld(cs_main);
AssertLockHeld(g_cs_orphans);
std::set<NodeId> setMisbehaving;
bool done = false;
while (!done && !orphan_work_set.empty()) {
const uint256 orphanHash = *orphan_work_set.begin();
orphan_work_set.erase(orphan_work_set.begin());
auto orphan_it = mapOrphanTransactions.find(orphanHash);
if (orphan_it == mapOrphanTransactions.end()) continue;
const CTransactionRef porphanTx = orphan_it->second.tx;
const CTransaction& orphanTx = *porphanTx;
NodeId fromPeer = orphan_it->second.fromPeer;
bool fMissingInputs2 = false;
// Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
// resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
// anyone relaying LegitTxX banned)
CValidationState stateDummy;
if (setMisbehaving.count(fromPeer)) continue;
if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, &fMissingInputs2, &removed_txn, false /* bypass_limits */, 0 /* nAbsurdFee */)) {
LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
RelayTransaction(orphanTx, connman);
for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i));
if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
for (const auto& elem : it_by_prev->second) {
orphan_work_set.insert(elem->first);
}
}
}
EraseOrphanTx(orphanHash);
done = true;
} else if (!fMissingInputs2) {
int nDos = 0;
if (stateDummy.IsInvalid(nDos) && nDos > 0) {
// Punish peer that gave us an invalid orphan tx
Misbehaving(fromPeer, nDos);
setMisbehaving.insert(fromPeer);
LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString());
}
// Has inputs but not accepted to mempool
// Probably non-standard or insufficient fee
LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString());
if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) {
// Do not use rejection cache for witness transactions or
// witness-stripped transactions, as they can have been malleated.
// See https://github.com/bitcoin/bitcoin/issues/8279 for details.
assert(recentRejects);
recentRejects->insert(orphanHash);
}
EraseOrphanTx(orphanHash);
done = true;
}
mempool.check(pcoinsTip.get());
}
}
bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc, bool enable_bip61) bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc, bool enable_bip61)
{ {
LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId()); LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId());
@ -2342,8 +2403,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
return true; return true;
} }
std::deque<COutPoint> vWorkQueue;
std::vector<uint256> vEraseQueue;
CTransactionRef ptx; CTransactionRef ptx;
vRecv >> ptx; vRecv >> ptx;
const CTransaction& tx = *ptx; const CTransaction& tx = *ptx;
@ -2368,7 +2427,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
mempool.check(pcoinsTip.get()); mempool.check(pcoinsTip.get());
RelayTransaction(tx, connman); RelayTransaction(tx, connman);
for (unsigned int i = 0; i < tx.vout.size(); i++) { for (unsigned int i = 0; i < tx.vout.size(); i++) {
vWorkQueue.emplace_back(inv.hash, i); auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i));
if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
for (const auto& elem : it_by_prev->second) {
pfrom->orphan_work_set.insert(elem->first);
}
}
} }
pfrom->nLastTXTime = GetTime(); pfrom->nLastTXTime = GetTime();
@ -2379,65 +2443,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
mempool.size(), mempool.DynamicMemoryUsage() / 1000); mempool.size(), mempool.DynamicMemoryUsage() / 1000);
// Recursively process any orphan transactions that depended on this one // Recursively process any orphan transactions that depended on this one
std::set<NodeId> setMisbehaving; ProcessOrphanTx(connman, pfrom->orphan_work_set, lRemovedTxn);
while (!vWorkQueue.empty()) {
auto itByPrev = mapOrphanTransactionsByPrev.find(vWorkQueue.front());
vWorkQueue.pop_front();
if (itByPrev == mapOrphanTransactionsByPrev.end())
continue;
for (auto mi = itByPrev->second.begin();
mi != itByPrev->second.end();
++mi)
{
const CTransactionRef& porphanTx = (*mi)->second.tx;
const CTransaction& orphanTx = *porphanTx;
const uint256& orphanHash = orphanTx.GetHash();
NodeId fromPeer = (*mi)->second.fromPeer;
bool fMissingInputs2 = false;
// Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
// resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
// anyone relaying LegitTxX banned)
CValidationState stateDummy;
if (setMisbehaving.count(fromPeer))
continue;
if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, &fMissingInputs2, &lRemovedTxn, false /* bypass_limits */, 0 /* nAbsurdFee */)) {
LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
RelayTransaction(orphanTx, connman);
for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
vWorkQueue.emplace_back(orphanHash, i);
}
vEraseQueue.push_back(orphanHash);
}
else if (!fMissingInputs2)
{
int nDos = 0;
if (stateDummy.IsInvalid(nDos) && nDos > 0)
{
// Punish peer that gave us an invalid orphan tx
Misbehaving(fromPeer, nDos);
setMisbehaving.insert(fromPeer);
LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString());
}
// Has inputs but not accepted to mempool
// Probably non-standard or insufficient fee
LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString());
vEraseQueue.push_back(orphanHash);
if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) {
// Do not use rejection cache for witness transactions or
// witness-stripped transactions, as they can have been malleated.
// See https://github.com/bitcoin/bitcoin/issues/8279 for details.
assert(recentRejects);
recentRejects->insert(orphanHash);
}
}
mempool.check(pcoinsTip.get());
}
}
for (const uint256& hash : vEraseQueue)
EraseOrphanTx(hash);
} }
else if (fMissingInputs) else if (fMissingInputs)
{ {
@ -3169,11 +3175,21 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
if (!pfrom->vRecvGetData.empty()) if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams, connman, interruptMsgProc); ProcessGetData(pfrom, chainparams, connman, interruptMsgProc);
if (!pfrom->orphan_work_set.empty()) {
std::list<CTransactionRef> removed_txn;
LOCK2(cs_main, g_cs_orphans);
ProcessOrphanTx(connman, pfrom->orphan_work_set, removed_txn);
for (const CTransactionRef& removedTx : removed_txn) {
AddToCompactExtraTransactions(removedTx);
}
}
if (pfrom->fDisconnect) if (pfrom->fDisconnect)
return false; return false;
// this maintains the order of responses // this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return true; if (!pfrom->vRecvGetData.empty()) return true;
if (!pfrom->orphan_work_set.empty()) return true;
// Don't bother if send buffer is too full to respond anyway // Don't bother if send buffer is too full to respond anyway
if (pfrom->fPauseSend) if (pfrom->fPauseSend)