Per-peer block download tracking and stalled download detection.
Keep track of which block is being requested (and to be requested) from each peer, and limit the number of blocks in-flight per peer. In addition, detect stalled downloads, and disconnect if they persist for too long. This means blocks are never requested twice, and should eliminate duplicate downloads during synchronization.
This commit is contained in:
parent
95e66247eb
commit
f59d8f0b64
4 changed files with 131 additions and 29 deletions
149
src/main.cpp
149
src/main.cpp
|
@ -112,6 +112,16 @@ uint32_t nBlockSequenceId = 1;
|
||||||
// Sources of received blocks, to be able to send them reject messages or ban
|
// Sources of received blocks, to be able to send them reject messages or ban
|
||||||
// them, if processing happens afterwards. Protected by cs_main.
|
// them, if processing happens afterwards. Protected by cs_main.
|
||||||
map<uint256, NodeId> mapBlockSource;
|
map<uint256, NodeId> mapBlockSource;
|
||||||
|
|
||||||
|
// Blocks that are in flight, and that are in the queue to be downloaded.
|
||||||
|
// Protected by cs_main.
|
||||||
|
struct QueuedBlock {
|
||||||
|
uint256 hash;
|
||||||
|
int64_t nTime; // Time of "getdata" request in microseconds.
|
||||||
|
int nQueuedBefore; // Number of blocks in flight at the time of request.
|
||||||
|
};
|
||||||
|
map<uint256, pair<NodeId, list<QueuedBlock>::iterator> > mapBlocksInFlight;
|
||||||
|
map<uint256, pair<NodeId, list<uint256>::iterator> > mapBlocksToDownload;
|
||||||
}
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -195,10 +205,20 @@ struct CNodeState {
|
||||||
std::string name;
|
std::string name;
|
||||||
// List of asynchronously-determined block rejections to notify this peer about.
|
// List of asynchronously-determined block rejections to notify this peer about.
|
||||||
std::vector<CBlockReject> rejects;
|
std::vector<CBlockReject> rejects;
|
||||||
|
list<QueuedBlock> vBlocksInFlight;
|
||||||
|
int nBlocksInFlight;
|
||||||
|
list<uint256> vBlocksToDownload;
|
||||||
|
int nBlocksToDownload;
|
||||||
|
int64_t nLastBlockReceive;
|
||||||
|
int64_t nLastBlockProcess;
|
||||||
|
|
||||||
CNodeState() {
|
CNodeState() {
|
||||||
nMisbehavior = 0;
|
nMisbehavior = 0;
|
||||||
fShouldBan = false;
|
fShouldBan = false;
|
||||||
|
nBlocksToDownload = 0;
|
||||||
|
nBlocksInFlight = 0;
|
||||||
|
nLastBlockReceive = 0;
|
||||||
|
nLastBlockProcess = 0;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -227,8 +247,71 @@ void InitializeNode(NodeId nodeid, const CNode *pnode) {
|
||||||
|
|
||||||
void FinalizeNode(NodeId nodeid) {
|
void FinalizeNode(NodeId nodeid) {
|
||||||
LOCK(cs_main);
|
LOCK(cs_main);
|
||||||
|
CNodeState *state = State(nodeid);
|
||||||
|
|
||||||
|
BOOST_FOREACH(const QueuedBlock& entry, state->vBlocksInFlight)
|
||||||
|
mapBlocksInFlight.erase(entry.hash);
|
||||||
|
BOOST_FOREACH(const uint256& hash, state->vBlocksToDownload)
|
||||||
|
mapBlocksToDownload.erase(hash);
|
||||||
|
|
||||||
mapNodeState.erase(nodeid);
|
mapNodeState.erase(nodeid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Requires cs_main.
|
||||||
|
void MarkBlockAsReceived(const uint256 &hash, NodeId nodeFrom = -1) {
|
||||||
|
map<uint256, pair<NodeId, list<uint256>::iterator> >::iterator itToDownload = mapBlocksToDownload.find(hash);
|
||||||
|
if (itToDownload != mapBlocksToDownload.end()) {
|
||||||
|
CNodeState *state = State(itToDownload->second.first);
|
||||||
|
state->vBlocksToDownload.erase(itToDownload->second.second);
|
||||||
|
state->nBlocksToDownload--;
|
||||||
|
mapBlocksToDownload.erase(itToDownload);
|
||||||
|
}
|
||||||
|
|
||||||
|
map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash);
|
||||||
|
if (itInFlight != mapBlocksInFlight.end()) {
|
||||||
|
CNodeState *state = State(itInFlight->second.first);
|
||||||
|
state->vBlocksInFlight.erase(itInFlight->second.second);
|
||||||
|
state->nBlocksInFlight--;
|
||||||
|
if (itInFlight->second.first == nodeFrom)
|
||||||
|
state->nLastBlockReceive = GetTimeMicros();
|
||||||
|
mapBlocksInFlight.erase(itInFlight);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Requires cs_main.
|
||||||
|
bool AddBlockToQueue(NodeId nodeid, const uint256 &hash) {
|
||||||
|
if (mapBlocksToDownload.count(hash) || mapBlocksInFlight.count(hash))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
CNodeState *state = State(nodeid);
|
||||||
|
if (state == NULL)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
list<uint256>::iterator it = state->vBlocksToDownload.insert(state->vBlocksToDownload.end(), hash);
|
||||||
|
state->nBlocksToDownload++;
|
||||||
|
if (state->nBlocksToDownload > 5000)
|
||||||
|
Misbehaving(nodeid, 10);
|
||||||
|
mapBlocksToDownload[hash] = std::make_pair(nodeid, it);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Requires cs_main.
|
||||||
|
void MarkBlockAsInFlight(NodeId nodeid, const uint256 &hash) {
|
||||||
|
CNodeState *state = State(nodeid);
|
||||||
|
assert(state != NULL);
|
||||||
|
|
||||||
|
// Make sure it's not listed somewhere already.
|
||||||
|
MarkBlockAsReceived(hash);
|
||||||
|
|
||||||
|
QueuedBlock newentry = {hash, GetTimeMicros(), state->nBlocksInFlight};
|
||||||
|
if (state->nBlocksInFlight == 0)
|
||||||
|
state->nLastBlockReceive = newentry.nTime; // Reset when a first request is sent.
|
||||||
|
list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), newentry);
|
||||||
|
state->nBlocksInFlight++;
|
||||||
|
mapBlocksInFlight[hash] = std::make_pair(nodeid, it);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
|
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
|
||||||
|
@ -1299,6 +1382,7 @@ void CheckForkWarningConditionsOnNewFork(CBlockIndex* pindexNewForkTip)
|
||||||
CheckForkWarningConditions();
|
CheckForkWarningConditions();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Requires cs_main.
|
||||||
void Misbehaving(NodeId pnode, int howmuch)
|
void Misbehaving(NodeId pnode, int howmuch)
|
||||||
{
|
{
|
||||||
if (howmuch == 0)
|
if (howmuch == 0)
|
||||||
|
@ -2021,7 +2105,6 @@ bool AddToBlockIndex(CBlock& block, CValidationState& state, const CDiskBlockPos
|
||||||
pindexNew->nSequenceId = nBlockSequenceId++;
|
pindexNew->nSequenceId = nBlockSequenceId++;
|
||||||
}
|
}
|
||||||
assert(pindexNew);
|
assert(pindexNew);
|
||||||
mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
|
|
||||||
map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.insert(make_pair(hash, pindexNew)).first;
|
map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.insert(make_pair(hash, pindexNew)).first;
|
||||||
pindexNew->phashBlock = &((*mi).first);
|
pindexNew->phashBlock = &((*mi).first);
|
||||||
map<uint256, CBlockIndex*>::iterator miPrev = mapBlockIndex.find(block.hashPrevBlock);
|
map<uint256, CBlockIndex*>::iterator miPrev = mapBlockIndex.find(block.hashPrevBlock);
|
||||||
|
@ -2367,11 +2450,8 @@ bool ProcessBlock(CValidationState &state, CNode* pfrom, CBlock* pblock, CDiskBl
|
||||||
return state.Invalid(error("ProcessBlock() : already have block (orphan) %s", hash.ToString()), 0, "duplicate");
|
return state.Invalid(error("ProcessBlock() : already have block (orphan) %s", hash.ToString()), 0, "duplicate");
|
||||||
|
|
||||||
// Preliminary checks
|
// Preliminary checks
|
||||||
if (!CheckBlock(*pblock, state)) {
|
if (!CheckBlock(*pblock, state))
|
||||||
if (state.CorruptionPossible())
|
|
||||||
mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
|
|
||||||
return error("ProcessBlock() : CheckBlock FAILED");
|
return error("ProcessBlock() : CheckBlock FAILED");
|
||||||
}
|
|
||||||
|
|
||||||
CBlockIndex* pcheckpoint = Checkpoints::GetLastCheckpoint(mapBlockIndex);
|
CBlockIndex* pcheckpoint = Checkpoints::GetLastCheckpoint(mapBlockIndex);
|
||||||
if (pcheckpoint && pblock->hashPrevBlock != (chainActive.Tip() ? chainActive.Tip()->GetBlockHash() : uint256(0)))
|
if (pcheckpoint && pblock->hashPrevBlock != (chainActive.Tip() ? chainActive.Tip()->GetBlockHash() : uint256(0)))
|
||||||
|
@ -3223,7 +3303,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
State(pfrom->GetId())->nLastBlockProcess = GetTimeMicros();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -3426,15 +3506,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
|
||||||
return error("message inv size() = %"PRIszu"", vInv.size());
|
return error("message inv size() = %"PRIszu"", vInv.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
// find last block in inv vector
|
|
||||||
unsigned int nLastBlock = (unsigned int)(-1);
|
|
||||||
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) {
|
|
||||||
if (vInv[vInv.size() - 1 - nInv].type == MSG_BLOCK) {
|
|
||||||
nLastBlock = vInv.size() - 1 - nInv;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
LOCK(cs_main);
|
LOCK(cs_main);
|
||||||
|
|
||||||
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++)
|
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++)
|
||||||
|
@ -3448,17 +3519,14 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
|
||||||
LogPrint("net", " got inventory: %s %s\n", inv.ToString(), fAlreadyHave ? "have" : "new");
|
LogPrint("net", " got inventory: %s %s\n", inv.ToString(), fAlreadyHave ? "have" : "new");
|
||||||
|
|
||||||
if (!fAlreadyHave) {
|
if (!fAlreadyHave) {
|
||||||
if (!fImporting && !fReindex)
|
if (!fImporting && !fReindex) {
|
||||||
pfrom->AskFor(inv);
|
if (inv.type == MSG_BLOCK)
|
||||||
|
AddBlockToQueue(pfrom->GetId(), inv.hash);
|
||||||
|
else
|
||||||
|
pfrom->AskFor(inv);
|
||||||
|
}
|
||||||
} else if (inv.type == MSG_BLOCK && mapOrphanBlocks.count(inv.hash)) {
|
} else if (inv.type == MSG_BLOCK && mapOrphanBlocks.count(inv.hash)) {
|
||||||
PushGetBlocks(pfrom, chainActive.Tip(), GetOrphanRoot(inv.hash));
|
PushGetBlocks(pfrom, chainActive.Tip(), GetOrphanRoot(inv.hash));
|
||||||
} else if (nInv == nLastBlock) {
|
|
||||||
// In case we are on a very long side-chain, it is possible that we already have
|
|
||||||
// the last block in an inv bundle sent in response to getblocks. Try to detect
|
|
||||||
// this situation and push another getblocks to continue.
|
|
||||||
PushGetBlocks(pfrom, mapBlockIndex[inv.hash], uint256(0));
|
|
||||||
if (fDebug)
|
|
||||||
LogPrintf("force request: %s\n", inv.ToString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track requests for our stuff
|
// Track requests for our stuff
|
||||||
|
@ -3665,6 +3733,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
|
||||||
LOCK(cs_main);
|
LOCK(cs_main);
|
||||||
// Remember who we got this block from.
|
// Remember who we got this block from.
|
||||||
mapBlockSource[inv.hash] = pfrom->GetId();
|
mapBlockSource[inv.hash] = pfrom->GetId();
|
||||||
|
MarkBlockAsReceived(inv.hash, pfrom->GetId());
|
||||||
|
|
||||||
CValidationState state;
|
CValidationState state;
|
||||||
ProcessBlock(state, pfrom, &block);
|
ProcessBlock(state, pfrom, &block);
|
||||||
|
@ -4192,12 +4261,38 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
|
||||||
pto->PushMessage("inv", vInv);
|
pto->PushMessage("inv", vInv);
|
||||||
|
|
||||||
|
|
||||||
|
// Detect stalled peers. Require that blocks are in flight, we haven't
|
||||||
|
// received a (requested) block in one minute, and that all blocks are
|
||||||
|
// in flight for over two minutes, since we first had a chance to
|
||||||
|
// process an incoming block.
|
||||||
|
int64_t nNow = GetTimeMicros();
|
||||||
|
if (!pto->fDisconnect && state.nBlocksInFlight &&
|
||||||
|
state.nLastBlockReceive < state.nLastBlockProcess - BLOCK_DOWNLOAD_TIMEOUT*1000000 &&
|
||||||
|
state.vBlocksInFlight.front().nTime < state.nLastBlockProcess - 2*BLOCK_DOWNLOAD_TIMEOUT*1000000) {
|
||||||
|
LogPrintf("Peer %s is stalling block download, disconnecting\n", state.name.c_str());
|
||||||
|
pto->fDisconnect = true;
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// Message: getdata
|
// Message: getdata (blocks)
|
||||||
//
|
//
|
||||||
vector<CInv> vGetData;
|
vector<CInv> vGetData;
|
||||||
int64_t nNow = GetTime() * 1000000;
|
while (!pto->fDisconnect && state.nBlocksToDownload && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
||||||
while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
|
uint256 hash = state.vBlocksToDownload.front();
|
||||||
|
vGetData.push_back(CInv(MSG_BLOCK, hash));
|
||||||
|
MarkBlockAsInFlight(pto->GetId(), hash);
|
||||||
|
LogPrint("net", "Requesting block %s from %s\n", hash.ToString().c_str(), state.name.c_str());
|
||||||
|
if (vGetData.size() >= 1000)
|
||||||
|
{
|
||||||
|
pto->PushMessage("getdata", vGetData);
|
||||||
|
vGetData.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Message: getdata (non-blocks)
|
||||||
|
//
|
||||||
|
while (!pto->fDisconnect && !pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
|
||||||
{
|
{
|
||||||
const CInv& inv = (*pto->mapAskFor.begin()).second;
|
const CInv& inv = (*pto->mapAskFor.begin()).second;
|
||||||
if (!AlreadyHave(inv))
|
if (!AlreadyHave(inv))
|
||||||
|
|
|
@ -59,6 +59,11 @@ static const int COINBASE_MATURITY = 100;
|
||||||
static const unsigned int LOCKTIME_THRESHOLD = 500000000; // Tue Nov 5 00:53:20 1985 UTC
|
static const unsigned int LOCKTIME_THRESHOLD = 500000000; // Tue Nov 5 00:53:20 1985 UTC
|
||||||
/** Maximum number of script-checking threads allowed */
|
/** Maximum number of script-checking threads allowed */
|
||||||
static const int MAX_SCRIPTCHECK_THREADS = 16;
|
static const int MAX_SCRIPTCHECK_THREADS = 16;
|
||||||
|
/** Number of blocks that can be requested at any given time from a single peer. */
|
||||||
|
static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 128;
|
||||||
|
/** Timeout in seconds before considering a block download peer unresponsive. */
|
||||||
|
static const unsigned int BLOCK_DOWNLOAD_TIMEOUT = 60;
|
||||||
|
|
||||||
#ifdef USE_UPNP
|
#ifdef USE_UPNP
|
||||||
static const int fHaveUPnP = true;
|
static const int fHaveUPnP = true;
|
||||||
#else
|
#else
|
||||||
|
@ -182,6 +187,9 @@ bool VerifySignature(const CCoins& txFrom, const CTransaction& txTo, unsigned in
|
||||||
bool AbortNode(const std::string &msg);
|
bool AbortNode(const std::string &msg);
|
||||||
/** Get statistics from node state */
|
/** Get statistics from node state */
|
||||||
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats);
|
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats);
|
||||||
|
/** Increase a node's misbehavior score. */
|
||||||
|
void Misbehaving(NodeId nodeid, int howmuch);
|
||||||
|
|
||||||
|
|
||||||
/** (try to) add transaction to memory pool **/
|
/** (try to) add transaction to memory pool **/
|
||||||
bool AcceptToMemoryPool(CTxMemPool& pool, CValidationState &state, const CTransaction &tx, bool fLimitFree,
|
bool AcceptToMemoryPool(CTxMemPool& pool, CValidationState &state, const CTransaction &tx, bool fLimitFree,
|
||||||
|
|
|
@ -427,7 +427,7 @@ public:
|
||||||
LogPrint("net", "askfor %s %"PRId64" (%s)\n", inv.ToString().c_str(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000).c_str());
|
LogPrint("net", "askfor %s %"PRId64" (%s)\n", inv.ToString().c_str(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000).c_str());
|
||||||
|
|
||||||
// Make sure not to reuse time indexes to keep things in the same order
|
// Make sure not to reuse time indexes to keep things in the same order
|
||||||
int64_t nNow = (GetTime() - 1) * 1000000;
|
int64_t nNow = GetTimeMicros() - 1000000;
|
||||||
static int64_t nLastTime;
|
static int64_t nLastTime;
|
||||||
++nLastTime;
|
++nLastTime;
|
||||||
nNow = std::max(nNow, nLastTime);
|
nNow = std::max(nNow, nLastTime);
|
||||||
|
|
|
@ -21,7 +21,6 @@
|
||||||
// Tests this internal-to-main.cpp method:
|
// Tests this internal-to-main.cpp method:
|
||||||
extern bool AddOrphanTx(const CTransaction& tx);
|
extern bool AddOrphanTx(const CTransaction& tx);
|
||||||
extern unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans);
|
extern unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans);
|
||||||
extern void Misbehaving(NodeId nodeid, int howmuch);
|
|
||||||
extern std::map<uint256, CTransaction> mapOrphanTransactions;
|
extern std::map<uint256, CTransaction> mapOrphanTransactions;
|
||||||
extern std::map<uint256, std::set<uint256> > mapOrphanTransactionsByPrev;
|
extern std::map<uint256, std::set<uint256> > mapOrphanTransactionsByPrev;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue