Merge pull request #2461 from sipa/syncnode

Make sure we always have a node to do IBD from
This commit is contained in:
Gavin Andresen 2013-04-09 10:51:06 -07:00
commit 1fd3ed25be
4 changed files with 76 additions and 19 deletions

View file

@ -3235,18 +3235,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
} }
} }
// Ask the first connected node for block updates
static int nAskedForBlocks = 0;
if (!pfrom->fClient && !pfrom->fOneShot && !fImporting && !fReindex &&
(pfrom->nStartingHeight > (nBestHeight - 144)) &&
(pfrom->nVersion < NOBLKS_VERSION_START ||
pfrom->nVersion >= NOBLKS_VERSION_END) &&
(nAskedForBlocks < 1 || vNodes.size() <= 1))
{
nAskedForBlocks++;
pfrom->PushGetBlocks(pindexBest, uint256(0));
}
// Relay alerts // Relay alerts
{ {
LOCK(cs_mapAlerts); LOCK(cs_mapAlerts);
@ -3855,6 +3843,12 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
pto->PushMessage("ping"); pto->PushMessage("ping");
} }
// Start block sync
if (pto->fStartSync && !fImporting && !fReindex) {
pto->fStartSync = false;
pto->PushGetBlocks(pindexBest, uint256(0));
}
// Resend wallet transactions that haven't gotten in a block yet // Resend wallet transactions that haven't gotten in a block yet
// Except during reindex, importing and IBD, when old wallet // Except during reindex, importing and IBD, when old wallet
// transactions become unconfirmed and spams other nodes. // transactions become unconfirmed and spams other nodes.

View file

@ -44,6 +44,7 @@ static map<CNetAddr, LocalServiceInfo> mapLocalHost;
static bool vfReachable[NET_MAX] = {}; static bool vfReachable[NET_MAX] = {};
static bool vfLimited[NET_MAX] = {}; static bool vfLimited[NET_MAX] = {};
static CNode* pnodeLocalHost = NULL; static CNode* pnodeLocalHost = NULL;
static CNode* pnodeSync = NULL;
uint64 nLocalHostNonce = 0; uint64 nLocalHostNonce = 0;
static std::vector<SOCKET> vhListenSocket; static std::vector<SOCKET> vhListenSocket;
CAddrMan addrman; CAddrMan addrman;
@ -515,12 +516,16 @@ void CNode::CloseSocketDisconnect()
printf("disconnecting node %s\n", addrName.c_str()); printf("disconnecting node %s\n", addrName.c_str());
closesocket(hSocket); closesocket(hSocket);
hSocket = INVALID_SOCKET; hSocket = INVALID_SOCKET;
// in case this fails, we'll empty the recv buffer when the CNode is deleted
TRY_LOCK(cs_vRecvMsg, lockRecv);
if (lockRecv)
vRecvMsg.clear();
} }
// in case this fails, we'll empty the recv buffer when the CNode is deleted
TRY_LOCK(cs_vRecvMsg, lockRecv);
if (lockRecv)
vRecvMsg.clear();
// if this was the sync node, we'll need a new one
if (this == pnodeSync)
pnodeSync = NULL;
} }
void CNode::Cleanup() void CNode::Cleanup()
@ -607,6 +612,9 @@ void CNode::copyStats(CNodeStats &stats)
X(fInbound); X(fInbound);
X(nStartingHeight); X(nStartingHeight);
X(nMisbehavior); X(nMisbehavior);
X(nSendBytes);
X(nRecvBytes);
stats.fSyncNode = (this == pnodeSync);
} }
#undef X #undef X
@ -701,6 +709,7 @@ void SocketSendData(CNode *pnode)
int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
if (nBytes > 0) { if (nBytes > 0) {
pnode->nLastSend = GetTime(); pnode->nLastSend = GetTime();
pnode->nSendBytes += nBytes;
pnode->nSendOffset += nBytes; pnode->nSendOffset += nBytes;
if (pnode->nSendOffset == data.size()) { if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0; pnode->nSendOffset = 0;
@ -963,6 +972,7 @@ void ThreadSocketHandler()
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes)) if (!pnode->ReceiveMsgBytes(pchBuf, nBytes))
pnode->CloseSocketDisconnect(); pnode->CloseSocketDisconnect();
pnode->nLastRecv = GetTime(); pnode->nLastRecv = GetTime();
pnode->nRecvBytes += nBytes;
} }
else if (nBytes == 0) else if (nBytes == 0)
{ {
@ -1538,24 +1548,64 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu
} }
// for now, use a very simple selection metric: the node from which we received
// most recently
double static NodeSyncScore(const CNode *pnode) {
return -pnode->nLastRecv;
}
void static StartSync(const vector<CNode*> &vNodes) {
CNode *pnodeNewSync = NULL;
double dBestScore = 0;
// fImporting and fReindex are accessed out of cs_main here, but only
// as an optimization - they are checked again in SendMessages.
if (fImporting || fReindex)
return;
// Iterate over all nodes
BOOST_FOREACH(CNode* pnode, vNodes) {
// check preconditions for allowing a sync
if (!pnode->fClient && !pnode->fOneShot &&
!pnode->fDisconnect && pnode->fSuccessfullyConnected &&
(pnode->nStartingHeight > (nBestHeight - 144)) &&
(pnode->nVersion < NOBLKS_VERSION_START || pnode->nVersion >= NOBLKS_VERSION_END)) {
// if ok, compare node's score with the best so far
double dScore = NodeSyncScore(pnode);
if (pnodeNewSync == NULL || dScore > dBestScore) {
pnodeNewSync = pnode;
dBestScore = dScore;
}
}
}
// if a new sync candidate was found, start sync!
if (pnodeNewSync) {
pnodeNewSync->fStartSync = true;
pnodeSync = pnodeNewSync;
}
}
void ThreadMessageHandler() void ThreadMessageHandler()
{ {
SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL); SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL);
while (true) while (true)
{ {
bool fHaveSyncNode = false;
vector<CNode*> vNodesCopy; vector<CNode*> vNodesCopy;
{ {
LOCK(cs_vNodes); LOCK(cs_vNodes);
vNodesCopy = vNodes; vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy) BOOST_FOREACH(CNode* pnode, vNodesCopy) {
pnode->AddRef(); pnode->AddRef();
if (pnode == pnodeSync)
fHaveSyncNode = true;
}
} }
if (!fHaveSyncNode)
StartSync(vNodesCopy);
// Poll the connected nodes for messages // Poll the connected nodes for messages
CNode* pnodeTrickle = NULL; CNode* pnodeTrickle = NULL;
if (!vNodesCopy.empty()) if (!vNodesCopy.empty())

View file

@ -101,6 +101,9 @@ public:
bool fInbound; bool fInbound;
int nStartingHeight; int nStartingHeight;
int nMisbehavior; int nMisbehavior;
uint64 nSendBytes;
uint64 nRecvBytes;
bool fSyncNode;
}; };
@ -155,12 +158,14 @@ public:
CDataStream ssSend; CDataStream ssSend;
size_t nSendSize; // total size of all vSendMsg entries size_t nSendSize; // total size of all vSendMsg entries
size_t nSendOffset; // offset inside the first vSendMsg already sent size_t nSendOffset; // offset inside the first vSendMsg already sent
uint64 nSendBytes;
std::deque<CSerializeData> vSendMsg; std::deque<CSerializeData> vSendMsg;
CCriticalSection cs_vSend; CCriticalSection cs_vSend;
std::deque<CInv> vRecvGetData; std::deque<CInv> vRecvGetData;
std::deque<CNetMessage> vRecvMsg; std::deque<CNetMessage> vRecvMsg;
CCriticalSection cs_vRecvMsg; CCriticalSection cs_vRecvMsg;
uint64 nRecvBytes;
int nRecvVersion; int nRecvVersion;
int64 nLastSend; int64 nLastSend;
@ -200,6 +205,7 @@ public:
CBlockIndex* pindexLastGetBlocksBegin; CBlockIndex* pindexLastGetBlocksBegin;
uint256 hashLastGetBlocksEnd; uint256 hashLastGetBlocksEnd;
int nStartingHeight; int nStartingHeight;
bool fStartSync;
// flood relay // flood relay
std::vector<CAddress> vAddrToSend; std::vector<CAddress> vAddrToSend;
@ -220,6 +226,8 @@ public:
nRecvVersion = MIN_PROTO_VERSION; nRecvVersion = MIN_PROTO_VERSION;
nLastSend = 0; nLastSend = 0;
nLastRecv = 0; nLastRecv = 0;
nSendBytes = 0;
nRecvBytes = 0;
nLastSendEmpty = GetTime(); nLastSendEmpty = GetTime();
nTimeConnected = GetTime(); nTimeConnected = GetTime();
addr = addrIn; addr = addrIn;
@ -239,6 +247,7 @@ public:
pindexLastGetBlocksBegin = 0; pindexLastGetBlocksBegin = 0;
hashLastGetBlocksEnd = 0; hashLastGetBlocksEnd = 0;
nStartingHeight = -1; nStartingHeight = -1;
fStartSync = false;
fGetAddr = false; fGetAddr = false;
nMisbehavior = 0; nMisbehavior = 0;
fRelayTxes = false; fRelayTxes = false;

View file

@ -51,12 +51,16 @@ Value getpeerinfo(const Array& params, bool fHelp)
obj.push_back(Pair("services", strprintf("%08"PRI64x, stats.nServices))); obj.push_back(Pair("services", strprintf("%08"PRI64x, stats.nServices)));
obj.push_back(Pair("lastsend", (boost::int64_t)stats.nLastSend)); obj.push_back(Pair("lastsend", (boost::int64_t)stats.nLastSend));
obj.push_back(Pair("lastrecv", (boost::int64_t)stats.nLastRecv)); obj.push_back(Pair("lastrecv", (boost::int64_t)stats.nLastRecv));
obj.push_back(Pair("bytessent", (boost::int64_t)stats.nSendBytes));
obj.push_back(Pair("bytesrecv", (boost::int64_t)stats.nRecvBytes));
obj.push_back(Pair("conntime", (boost::int64_t)stats.nTimeConnected)); obj.push_back(Pair("conntime", (boost::int64_t)stats.nTimeConnected));
obj.push_back(Pair("version", stats.nVersion)); obj.push_back(Pair("version", stats.nVersion));
obj.push_back(Pair("subver", stats.strSubVer)); obj.push_back(Pair("subver", stats.strSubVer));
obj.push_back(Pair("inbound", stats.fInbound)); obj.push_back(Pair("inbound", stats.fInbound));
obj.push_back(Pair("startingheight", stats.nStartingHeight)); obj.push_back(Pair("startingheight", stats.nStartingHeight));
obj.push_back(Pair("banscore", stats.nMisbehavior)); obj.push_back(Pair("banscore", stats.nMisbehavior));
if (stats.fSyncNode)
obj.push_back(Pair("syncnode", true));
ret.push_back(obj); ret.push_back(obj);
} }