Merge pull request #3514
f59d8f0
Per-peer block download tracking and stalled download detection. (Pieter Wuille)
This commit is contained in:
commit
b76733d8e8
4 changed files with 131 additions and 29 deletions
149
src/main.cpp
149
src/main.cpp
|
@ -111,6 +111,16 @@ uint32_t nBlockSequenceId = 1;
|
|||
// Sources of received blocks, to be able to send them reject messages or ban
|
||||
// them, if processing happens afterwards. Protected by cs_main.
|
||||
map<uint256, NodeId> mapBlockSource;
|
||||
|
||||
// Blocks that are in flight, and that are in the queue to be downloaded.
|
||||
// Protected by cs_main.
|
||||
struct QueuedBlock {
|
||||
uint256 hash;
|
||||
int64_t nTime; // Time of "getdata" request in microseconds.
|
||||
int nQueuedBefore; // Number of blocks in flight at the time of request.
|
||||
};
|
||||
map<uint256, pair<NodeId, list<QueuedBlock>::iterator> > mapBlocksInFlight;
|
||||
map<uint256, pair<NodeId, list<uint256>::iterator> > mapBlocksToDownload;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -194,10 +204,20 @@ struct CNodeState {
|
|||
std::string name;
|
||||
// List of asynchronously-determined block rejections to notify this peer about.
|
||||
std::vector<CBlockReject> rejects;
|
||||
list<QueuedBlock> vBlocksInFlight;
|
||||
int nBlocksInFlight;
|
||||
list<uint256> vBlocksToDownload;
|
||||
int nBlocksToDownload;
|
||||
int64_t nLastBlockReceive;
|
||||
int64_t nLastBlockProcess;
|
||||
|
||||
CNodeState() {
|
||||
nMisbehavior = 0;
|
||||
fShouldBan = false;
|
||||
nBlocksToDownload = 0;
|
||||
nBlocksInFlight = 0;
|
||||
nLastBlockReceive = 0;
|
||||
nLastBlockProcess = 0;
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -226,8 +246,71 @@ void InitializeNode(NodeId nodeid, const CNode *pnode) {
|
|||
|
||||
void FinalizeNode(NodeId nodeid) {
|
||||
LOCK(cs_main);
|
||||
CNodeState *state = State(nodeid);
|
||||
|
||||
BOOST_FOREACH(const QueuedBlock& entry, state->vBlocksInFlight)
|
||||
mapBlocksInFlight.erase(entry.hash);
|
||||
BOOST_FOREACH(const uint256& hash, state->vBlocksToDownload)
|
||||
mapBlocksToDownload.erase(hash);
|
||||
|
||||
mapNodeState.erase(nodeid);
|
||||
}
|
||||
|
||||
// Requires cs_main.
|
||||
void MarkBlockAsReceived(const uint256 &hash, NodeId nodeFrom = -1) {
|
||||
map<uint256, pair<NodeId, list<uint256>::iterator> >::iterator itToDownload = mapBlocksToDownload.find(hash);
|
||||
if (itToDownload != mapBlocksToDownload.end()) {
|
||||
CNodeState *state = State(itToDownload->second.first);
|
||||
state->vBlocksToDownload.erase(itToDownload->second.second);
|
||||
state->nBlocksToDownload--;
|
||||
mapBlocksToDownload.erase(itToDownload);
|
||||
}
|
||||
|
||||
map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash);
|
||||
if (itInFlight != mapBlocksInFlight.end()) {
|
||||
CNodeState *state = State(itInFlight->second.first);
|
||||
state->vBlocksInFlight.erase(itInFlight->second.second);
|
||||
state->nBlocksInFlight--;
|
||||
if (itInFlight->second.first == nodeFrom)
|
||||
state->nLastBlockReceive = GetTimeMicros();
|
||||
mapBlocksInFlight.erase(itInFlight);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Requires cs_main.
|
||||
bool AddBlockToQueue(NodeId nodeid, const uint256 &hash) {
|
||||
if (mapBlocksToDownload.count(hash) || mapBlocksInFlight.count(hash))
|
||||
return false;
|
||||
|
||||
CNodeState *state = State(nodeid);
|
||||
if (state == NULL)
|
||||
return false;
|
||||
|
||||
list<uint256>::iterator it = state->vBlocksToDownload.insert(state->vBlocksToDownload.end(), hash);
|
||||
state->nBlocksToDownload++;
|
||||
if (state->nBlocksToDownload > 5000)
|
||||
Misbehaving(nodeid, 10);
|
||||
mapBlocksToDownload[hash] = std::make_pair(nodeid, it);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Requires cs_main.
|
||||
void MarkBlockAsInFlight(NodeId nodeid, const uint256 &hash) {
|
||||
CNodeState *state = State(nodeid);
|
||||
assert(state != NULL);
|
||||
|
||||
// Make sure it's not listed somewhere already.
|
||||
MarkBlockAsReceived(hash);
|
||||
|
||||
QueuedBlock newentry = {hash, GetTimeMicros(), state->nBlocksInFlight};
|
||||
if (state->nBlocksInFlight == 0)
|
||||
state->nLastBlockReceive = newentry.nTime; // Reset when a first request is sent.
|
||||
list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), newentry);
|
||||
state->nBlocksInFlight++;
|
||||
mapBlocksInFlight[hash] = std::make_pair(nodeid, it);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
|
||||
|
@ -1310,6 +1393,7 @@ void CheckForkWarningConditionsOnNewFork(CBlockIndex* pindexNewForkTip)
|
|||
CheckForkWarningConditions();
|
||||
}
|
||||
|
||||
// Requires cs_main.
|
||||
void Misbehaving(NodeId pnode, int howmuch)
|
||||
{
|
||||
if (howmuch == 0)
|
||||
|
@ -2049,7 +2133,6 @@ bool AddToBlockIndex(CBlock& block, CValidationState& state, const CDiskBlockPos
|
|||
pindexNew->nSequenceId = nBlockSequenceId++;
|
||||
}
|
||||
assert(pindexNew);
|
||||
mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
|
||||
map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.insert(make_pair(hash, pindexNew)).first;
|
||||
pindexNew->phashBlock = &((*mi).first);
|
||||
map<uint256, CBlockIndex*>::iterator miPrev = mapBlockIndex.find(block.hashPrevBlock);
|
||||
|
@ -2400,11 +2483,8 @@ bool ProcessBlock(CValidationState &state, CNode* pfrom, CBlock* pblock, CDiskBl
|
|||
return state.Invalid(error("ProcessBlock() : already have block (orphan) %s", hash.ToString()), 0, "duplicate");
|
||||
|
||||
// Preliminary checks
|
||||
if (!CheckBlock(*pblock, state)) {
|
||||
if (state.CorruptionPossible())
|
||||
mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
|
||||
if (!CheckBlock(*pblock, state))
|
||||
return error("ProcessBlock() : CheckBlock FAILED");
|
||||
}
|
||||
|
||||
CBlockIndex* pcheckpoint = Checkpoints::GetLastCheckpoint(mapBlockIndex);
|
||||
if (pcheckpoint && pblock->hashPrevBlock != (chainActive.Tip() ? chainActive.Tip()->GetBlockHash() : uint256(0)))
|
||||
|
@ -3274,7 +3354,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
|
|||
return true;
|
||||
}
|
||||
|
||||
|
||||
State(pfrom->GetId())->nLastBlockProcess = GetTimeMicros();
|
||||
|
||||
|
||||
|
||||
|
@ -3477,15 +3557,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
|
|||
return error("message inv size() = %"PRIszu"", vInv.size());
|
||||
}
|
||||
|
||||
// find last block in inv vector
|
||||
unsigned int nLastBlock = (unsigned int)(-1);
|
||||
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) {
|
||||
if (vInv[vInv.size() - 1 - nInv].type == MSG_BLOCK) {
|
||||
nLastBlock = vInv.size() - 1 - nInv;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
LOCK(cs_main);
|
||||
|
||||
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++)
|
||||
|
@ -3499,17 +3570,14 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
|
|||
LogPrint("net", " got inventory: %s %s\n", inv.ToString(), fAlreadyHave ? "have" : "new");
|
||||
|
||||
if (!fAlreadyHave) {
|
||||
if (!fImporting && !fReindex)
|
||||
pfrom->AskFor(inv);
|
||||
if (!fImporting && !fReindex) {
|
||||
if (inv.type == MSG_BLOCK)
|
||||
AddBlockToQueue(pfrom->GetId(), inv.hash);
|
||||
else
|
||||
pfrom->AskFor(inv);
|
||||
}
|
||||
} else if (inv.type == MSG_BLOCK && mapOrphanBlocks.count(inv.hash)) {
|
||||
PushGetBlocks(pfrom, chainActive.Tip(), GetOrphanRoot(inv.hash));
|
||||
} else if (nInv == nLastBlock) {
|
||||
// In case we are on a very long side-chain, it is possible that we already have
|
||||
// the last block in an inv bundle sent in response to getblocks. Try to detect
|
||||
// this situation and push another getblocks to continue.
|
||||
PushGetBlocks(pfrom, mapBlockIndex[inv.hash], uint256(0));
|
||||
if (fDebug)
|
||||
LogPrintf("force request: %s\n", inv.ToString());
|
||||
}
|
||||
|
||||
// Track requests for our stuff
|
||||
|
@ -3716,6 +3784,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
|
|||
LOCK(cs_main);
|
||||
// Remember who we got this block from.
|
||||
mapBlockSource[inv.hash] = pfrom->GetId();
|
||||
MarkBlockAsReceived(inv.hash, pfrom->GetId());
|
||||
|
||||
CValidationState state;
|
||||
ProcessBlock(state, pfrom, &block);
|
||||
|
@ -4243,12 +4312,38 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
|
|||
pto->PushMessage("inv", vInv);
|
||||
|
||||
|
||||
// Detect stalled peers. Require that blocks are in flight, we haven't
|
||||
// received a (requested) block in one minute, and that all blocks are
|
||||
// in flight for over two minutes, since we first had a chance to
|
||||
// process an incoming block.
|
||||
int64_t nNow = GetTimeMicros();
|
||||
if (!pto->fDisconnect && state.nBlocksInFlight &&
|
||||
state.nLastBlockReceive < state.nLastBlockProcess - BLOCK_DOWNLOAD_TIMEOUT*1000000 &&
|
||||
state.vBlocksInFlight.front().nTime < state.nLastBlockProcess - 2*BLOCK_DOWNLOAD_TIMEOUT*1000000) {
|
||||
LogPrintf("Peer %s is stalling block download, disconnecting\n", state.name.c_str());
|
||||
pto->fDisconnect = true;
|
||||
}
|
||||
|
||||
//
|
||||
// Message: getdata
|
||||
// Message: getdata (blocks)
|
||||
//
|
||||
vector<CInv> vGetData;
|
||||
int64_t nNow = GetTime() * 1000000;
|
||||
while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
|
||||
while (!pto->fDisconnect && state.nBlocksToDownload && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
||||
uint256 hash = state.vBlocksToDownload.front();
|
||||
vGetData.push_back(CInv(MSG_BLOCK, hash));
|
||||
MarkBlockAsInFlight(pto->GetId(), hash);
|
||||
LogPrint("net", "Requesting block %s from %s\n", hash.ToString().c_str(), state.name.c_str());
|
||||
if (vGetData.size() >= 1000)
|
||||
{
|
||||
pto->PushMessage("getdata", vGetData);
|
||||
vGetData.clear();
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Message: getdata (non-blocks)
|
||||
//
|
||||
while (!pto->fDisconnect && !pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
|
||||
{
|
||||
const CInv& inv = (*pto->mapAskFor.begin()).second;
|
||||
if (!AlreadyHave(inv))
|
||||
|
|
|
@ -59,6 +59,11 @@ static const int COINBASE_MATURITY = 100;
|
|||
static const unsigned int LOCKTIME_THRESHOLD = 500000000; // Tue Nov 5 00:53:20 1985 UTC
|
||||
/** Maximum number of script-checking threads allowed */
|
||||
static const int MAX_SCRIPTCHECK_THREADS = 16;
|
||||
/** Number of blocks that can be requested at any given time from a single peer. */
|
||||
static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 128;
|
||||
/** Timeout in seconds before considering a block download peer unresponsive. */
|
||||
static const unsigned int BLOCK_DOWNLOAD_TIMEOUT = 60;
|
||||
|
||||
#ifdef USE_UPNP
|
||||
static const int fHaveUPnP = true;
|
||||
#else
|
||||
|
@ -175,6 +180,9 @@ bool VerifySignature(const CCoins& txFrom, const CTransaction& txTo, unsigned in
|
|||
bool AbortNode(const std::string &msg);
|
||||
/** Get statistics from node state */
|
||||
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats);
|
||||
/** Increase a node's misbehavior score. */
|
||||
void Misbehaving(NodeId nodeid, int howmuch);
|
||||
|
||||
|
||||
/** (try to) add transaction to memory pool **/
|
||||
bool AcceptToMemoryPool(CTxMemPool& pool, CValidationState &state, const CTransaction &tx, bool fLimitFree,
|
||||
|
|
|
@ -426,7 +426,7 @@ public:
|
|||
LogPrint("net", "askfor %s %d (%s)\n", inv.ToString().c_str(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000).c_str());
|
||||
|
||||
// Make sure not to reuse time indexes to keep things in the same order
|
||||
int64_t nNow = (GetTime() - 1) * 1000000;
|
||||
int64_t nNow = GetTimeMicros() - 1000000;
|
||||
static int64_t nLastTime;
|
||||
++nLastTime;
|
||||
nNow = std::max(nNow, nLastTime);
|
||||
|
|
|
@ -21,7 +21,6 @@
|
|||
// Tests this internal-to-main.cpp method:
|
||||
extern bool AddOrphanTx(const CTransaction& tx);
|
||||
extern unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans);
|
||||
extern void Misbehaving(NodeId nodeid, int howmuch);
|
||||
extern std::map<uint256, CTransaction> mapOrphanTransactions;
|
||||
extern std::map<uint256, std::set<uint256> > mapOrphanTransactionsByPrev;
|
||||
|
||||
|
|
Loading…
Reference in a new issue