net: create generic functor accessors and move vNodes to CConnman

This commit is contained in:
Cory Fields 2016-04-16 19:13:12 -04:00
parent c0569c7fa1
commit 53347f0cb9
7 changed files with 164 additions and 73 deletions

View file

@ -470,7 +470,7 @@ void UpdateBlockAvailability(NodeId nodeid, const uint256 &hash) {
}
}
void MaybeSetPeerAsAnnouncingHeaderAndIDs(const CNodeState* nodestate, CNode* pfrom) {
void MaybeSetPeerAsAnnouncingHeaderAndIDs(const CNodeState* nodestate, CNode* pfrom, CConnman& connman) {
if (nLocalServices & NODE_WITNESS) {
// Don't ever request compact blocks when segwit is enabled.
return;
@ -484,11 +484,12 @@ void MaybeSetPeerAsAnnouncingHeaderAndIDs(const CNodeState* nodestate, CNode* pf
if (lNodesAnnouncingHeaderAndIDs.size() >= 3) {
// As per BIP152, we only get 3 of our peers to announce
// blocks using compact encodings.
CNode* pnodeStop = FindNode(lNodesAnnouncingHeaderAndIDs.front());
if (pnodeStop) {
bool found = connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion](CNode* pnodeStop){
pnodeStop->PushMessage(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
return true;
});
if(found)
lNodesAnnouncingHeaderAndIDs.pop_front();
}
}
fAnnounceUsingCMPCTBLOCK = true;
pfrom->PushMessage(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
@ -3089,15 +3090,15 @@ bool ActivateBestChain(CValidationState &state, const CChainParams& chainparams,
int nBlockEstimate = 0;
if (fCheckpointsEnabled)
nBlockEstimate = Checkpoints::GetTotalBlocksEstimate(chainparams.Checkpoints());
{
LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes) {
if(connman) {
connman->ForEachNode([nNewHeight, nBlockEstimate, &vHashes](CNode* pnode) {
if (nNewHeight > (pnode->nStartingHeight != -1 ? pnode->nStartingHeight - 2000 : nBlockEstimate)) {
BOOST_REVERSE_FOREACH(const uint256& hash, vHashes) {
pnode->PushBlockHash(hash);
}
}
}
return true;
});
}
// Notify external listeners about the new tip.
if (!vHashes.empty()) {
@ -4726,6 +4727,45 @@ bool static AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
return true;
}
static void RelayTransaction(const CTransaction& tx, CConnman& connman)
{
CInv inv(MSG_TX, tx.GetHash());
connman.ForEachNode([&inv](CNode* pnode)
{
pnode->PushInventory(inv);
return true;
});
}
static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connman)
{
int nRelayNodes = fReachable ? 2 : 1; // limited relaying of addresses outside our network(s)
// Relay to a limited number of other nodes
// Use deterministic randomness to send to the same nodes for 24 hours
// at a time so the addrKnowns of the chosen nodes prevent repeats
static const uint64_t salt0 = GetRand(std::numeric_limits<uint64_t>::max());
static const uint64_t salt1 = GetRand(std::numeric_limits<uint64_t>::max());
uint64_t hashAddr = addr.GetHash();
std::multimap<uint64_t, CNode*> mapMix;
const CSipHasher hasher = CSipHasher(salt0, salt1).Write(hashAddr << 32).Write((GetTime() + hashAddr) / (24*60*60));
auto sortfunc = [&mapMix, &hasher](CNode* pnode) {
if (pnode->nVersion >= CADDR_TIME_VERSION) {
uint64_t hashKey = CSipHasher(hasher).Write(pnode->id).Finalize();
mapMix.emplace(hashKey, pnode);
}
return true;
};
auto pushfunc = [&addr, &mapMix, &nRelayNodes] {
for (auto mi = mapMix.begin(); mi != mapMix.end() && nRelayNodes-- > 0; ++mi)
mi->second->PushAddress(addr);
};
connman.ForEachNodeThen(std::move(sortfunc), std::move(pushfunc));
}
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams)
{
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
@ -5135,26 +5175,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
if (addr.nTime > nSince && !pfrom->fGetAddr && vAddr.size() <= 10 && addr.IsRoutable())
{
// Relay to a limited number of other nodes
{
LOCK(cs_vNodes);
// Use deterministic randomness to send to the same nodes for 24 hours
// at a time so the addrKnowns of the chosen nodes prevent repeats
static const uint64_t salt0 = GetRand(std::numeric_limits<uint64_t>::max());
static const uint64_t salt1 = GetRand(std::numeric_limits<uint64_t>::max());
uint64_t hashAddr = addr.GetHash();
multimap<uint64_t, CNode*> mapMix;
const CSipHasher hasher = CSipHasher(salt0, salt1).Write(hashAddr << 32).Write((GetTime() + hashAddr) / (24*60*60));
BOOST_FOREACH(CNode* pnode, vNodes)
{
if (pnode->nVersion < CADDR_TIME_VERSION)
continue;
uint64_t hashKey = CSipHasher(hasher).Write(pnode->id).Finalize();
mapMix.insert(make_pair(hashKey, pnode));
}
int nRelayNodes = fReachable ? 2 : 1; // limited relaying of addresses outside our network(s)
for (multimap<uint64_t, CNode*>::iterator mi = mapMix.begin(); mi != mapMix.end() && nRelayNodes-- > 0; ++mi)
((*mi).second)->PushAddress(addr);
}
RelayAddress(addr, fReachable, connman);
}
// Do not store addresses outside our network
if (fReachable)
@ -5448,7 +5469,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, tx, true, &fMissingInputs)) {
mempool.check(pcoinsTip);
RelayTransaction(tx);
RelayTransaction(tx, connman);
for (unsigned int i = 0; i < tx.vout.size(); i++) {
vWorkQueue.emplace_back(inv.hash, i);
}
@ -5485,7 +5506,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
continue;
if (AcceptToMemoryPool(mempool, stateDummy, orphanTx, true, &fMissingInputs2)) {
LogPrint("mempool", " accepted orphan tx %s\n", orphanHash.ToString());
RelayTransaction(orphanTx);
RelayTransaction(orphanTx, connman);
for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
vWorkQueue.emplace_back(orphanHash, i);
}
@ -5560,7 +5581,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
int nDoS = 0;
if (!state.IsInvalid(nDoS) || nDoS == 0) {
LogPrintf("Force relaying tx %s from whitelisted peer=%d\n", tx.GetHash().ToString(), pfrom->id);
RelayTransaction(tx);
RelayTransaction(tx, connman);
} else {
LogPrintf("Not relaying invalid transaction %s from whitelisted peer=%d (%s)\n", tx.GetHash().ToString(), pfrom->id, FormatStateMessage(state));
}
@ -5886,7 +5907,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
if (nodestate->fProvidesHeaderAndIDs && vGetData.size() == 1 && mapBlocksInFlight.size() == 1 && pindexLast->pprev->IsValid(BLOCK_VALID_CHAIN) && !(nLocalServices & NODE_WITNESS)) {
// We seem to be rather well-synced, so it appears pfrom was the first to provide us
// with this block! Let's get them to announce using compact blocks in the future.
MaybeSetPeerAsAnnouncingHeaderAndIDs(nodestate, pfrom);
MaybeSetPeerAsAnnouncingHeaderAndIDs(nodestate, pfrom, connman);
// In any case, we want to download using a compact block, not a regular one
vGetData[0] = CInv(MSG_CMPCT_BLOCK, vGetData[0].hash);
}

View file

@ -87,8 +87,6 @@ uint64_t nLocalHostNonce = 0;
int nMaxConnections = DEFAULT_MAX_PEER_CONNECTIONS;
std::string strSubVersion;
std::vector<CNode*> vNodes;
CCriticalSection cs_vNodes;
limitedmap<uint256, int64_t> mapAlreadyAskedFor(MAX_INV_SZ);
NodeId nLastNodeId = 0;
@ -315,7 +313,7 @@ uint64_t CNode::nMaxOutboundTotalBytesSentInCycle = 0;
uint64_t CNode::nMaxOutboundTimeframe = 60*60*24; //1 day
uint64_t CNode::nMaxOutboundCycleStartTime = 0;
CNode* FindNode(const CNetAddr& ip)
CNode* CConnman::FindNode(const CNetAddr& ip)
{
LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
@ -324,7 +322,7 @@ CNode* FindNode(const CNetAddr& ip)
return NULL;
}
CNode* FindNode(const CSubNet& subNet)
CNode* CConnman::FindNode(const CSubNet& subNet)
{
LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
@ -333,7 +331,7 @@ CNode* FindNode(const CSubNet& subNet)
return NULL;
}
CNode* FindNode(const std::string& addrName)
CNode* CConnman::FindNode(const std::string& addrName)
{
LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
@ -342,7 +340,7 @@ CNode* FindNode(const std::string& addrName)
return NULL;
}
CNode* FindNode(const CService& addr)
CNode* CConnman::FindNode(const CService& addr)
{
LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
@ -351,16 +349,6 @@ CNode* FindNode(const CService& addr)
return NULL;
}
//TODO: This is used in only one place in main, and should be removed
CNode* FindNode(const NodeId nodeid)
{
LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
if (pnode->GetId() == nodeid)
return (pnode);
return NULL;
}
CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure)
{
if (pszDest == NULL) {
@ -899,7 +887,8 @@ static bool CompareNodeTXTime(const NodeEvictionCandidate &a, const NodeEviction
* to forge. In order to partition a node the attacker must be
* simultaneously better at all of them than honest peers.
*/
static bool AttemptToEvictConnection() {
bool CConnman::AttemptToEvictConnection()
{
std::vector<NodeEvictionCandidate> vEvictionCandidates;
{
LOCK(cs_vNodes);
@ -2320,7 +2309,6 @@ bool CConnman::DisconnectNode(const std::string& strNode)
}
return false;
}
bool CConnman::DisconnectNode(NodeId id)
{
LOCK(cs_vNodes);
@ -2333,7 +2321,7 @@ bool CConnman::DisconnectNode(NodeId id)
return false;
}
void RelayTransaction(const CTransaction& tx)
void CConnman::RelayTransaction(const CTransaction& tx)
{
CInv inv(MSG_TX, tx.GetHash());
LOCK(cs_vNodes);
@ -2671,6 +2659,63 @@ void CNode::EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend)
LEAVE_CRITICAL_SECTION(cs_vSend);
}
bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
{
CNode* found = nullptr;
LOCK(cs_vNodes);
for (auto&& pnode : vNodes) {
if(pnode->id == id) {
found = pnode;
break;
}
}
return found != nullptr && func(found);
}
bool CConnman::ForEachNode(std::function<bool(CNode* pnode)> func)
{
LOCK(cs_vNodes);
for (auto&& node : vNodes)
if(!func(node))
return false;
return true;
}
bool CConnman::ForEachNode(std::function<bool(const CNode* pnode)> func) const
{
LOCK(cs_vNodes);
for (const auto& node : vNodes)
if(!func(node))
return false;
return true;
}
bool CConnman::ForEachNodeThen(std::function<bool(CNode* pnode)> pre, std::function<void()> post)
{
bool ret = true;
LOCK(cs_vNodes);
for (auto&& node : vNodes)
if(!pre(node)) {
ret = false;
break;
}
post();
return ret;
}
bool CConnman::ForEachNodeThen(std::function<bool(const CNode* pnode)> pre, std::function<void()> post) const
{
bool ret = true;
LOCK(cs_vNodes);
for (const auto& node : vNodes)
if(!pre(node)) {
ret = false;
break;
}
post();
return ret;
}
int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds) {
return nNow + (int64_t)(log1p(GetRand(1ULL << 48) * -0.0000000000000035527136788 /* -1/2^48 */) * average_interval_seconds * -1000000.0 + 0.5);
}

View file

@ -95,12 +95,7 @@ struct AddedNodeInfo
bool fInbound;
};
CNode* FindNode(const CNetAddr& ip);
CNode* FindNode(const CSubNet& subNet);
CNode* FindNode(const std::string& addrName);
CNode* FindNode(const CService& ip);
CNode* FindNode(const NodeId id); //TODO: Remove this
class CTransaction;
class CNodeStats;
class CConnman
{
@ -120,6 +115,14 @@ public:
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
bool OpenNetworkConnection(const CAddress& addrConnect, bool fCountFailure, CSemaphoreGrant *grantOutbound = NULL, const char *strDest = NULL, bool fOneShot = false, bool fFeeler = false);
bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
bool ForEachNode(std::function<bool(CNode* pnode)> func);
bool ForEachNode(std::function<bool(const CNode* pnode)> func) const;
bool ForEachNodeThen(std::function<bool(CNode* pnode)> pre, std::function<void()> post);
bool ForEachNodeThen(std::function<bool(const CNode* pnode)> pre, std::function<void()> post) const;
void RelayTransaction(const CTransaction& tx);
// Addrman functions
size_t GetAddressCount() const;
void SetServices(const CService &addr, ServiceFlags nServices);
@ -182,6 +185,12 @@ private:
void ThreadSocketHandler();
void ThreadDNSAddressSeed();
CNode* FindNode(const CNetAddr& ip);
CNode* FindNode(const CSubNet& subNet);
CNode* FindNode(const std::string& addrName);
CNode* FindNode(const CService& addr);
bool AttemptToEvictConnection();
CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure);
void DeleteNode(CNode* pnode);
//!check is the banlist has unwritten changes
@ -204,6 +213,8 @@ private:
CCriticalSection cs_vOneShots;
std::vector<std::string> vAddedNodes;
CCriticalSection cs_vAddedNodes;
std::vector<CNode*> vNodes;
mutable CCriticalSection cs_vNodes;
};
extern std::unique_ptr<CConnman> g_connman;
void MapPort(bool fUseUPnP);
@ -279,8 +290,6 @@ extern uint64_t nLocalHostNonce;
/** Maximum number of connections to simultaneously allow (aka connection slots) */
extern int nMaxConnections;
extern std::vector<CNode*> vNodes;
extern CCriticalSection cs_vNodes;
extern limitedmap<uint256, int64_t> mapAlreadyAskedFor;
extern NodeId nLastNodeId;
@ -828,8 +837,6 @@ public:
class CTransaction;
void RelayTransaction(const CTransaction& tx);
/** Return a timestamp in the future (in microseconds) for exponentially distributed events. */

View file

@ -472,14 +472,17 @@ UniValue setmocktime(const UniValue& params, bool fHelp)
// atomically with the time change to prevent peers from being
// disconnected because we think we haven't communicated with them
// in a long time.
LOCK2(cs_main, cs_vNodes);
LOCK(cs_main);
RPCTypeCheck(params, boost::assign::list_of(UniValue::VNUM));
SetMockTime(params[0].get_int64());
uint64_t t = GetTime();
BOOST_FOREACH(CNode* pnode, vNodes) {
pnode->nLastSend = pnode->nLastRecv = t;
if(g_connman) {
g_connman->ForEachNode([t](CNode* pnode) {
pnode->nLastSend = pnode->nLastRecv = t;
return true;
});
}
return NullUniValue;

View file

@ -55,13 +55,14 @@ UniValue ping(const UniValue& params, bool fHelp)
+ HelpExampleRpc("ping", "")
);
if(!g_connman)
throw JSONRPCError(RPC_CLIENT_P2P_DISABLED, "Error: Peer-to-peer functionality missing or disabled");
// Request that each node send a ping during next message processing pass
LOCK2(cs_main, cs_vNodes);
BOOST_FOREACH(CNode* pNode, vNodes) {
pNode->fPingQueued = true;
}
g_connman->ForEachNode([](CNode* pnode) {
pnode->fPingQueued = true;
return true;
});
return NullUniValue;
}

View file

@ -891,8 +891,15 @@ UniValue sendrawtransaction(const UniValue& params, bool fHelp)
} else if (fHaveChain) {
throw JSONRPCError(RPC_TRANSACTION_ALREADY_IN_CHAIN, "transaction already in block chain");
}
RelayTransaction(tx);
if(!g_connman)
throw JSONRPCError(RPC_CLIENT_P2P_DISABLED, "Error: Peer-to-peer functionality missing or disabled");
CInv inv(MSG_TX, hashTx);
g_connman->ForEachNode([&inv](CNode* pnode)
{
pnode->PushInventory(inv);
return true;
});
return hashTx.GetHex();
}

View file

@ -1460,8 +1460,15 @@ bool CWalletTx::RelayWalletTransaction(CConnman* connman)
{
if (GetDepthInMainChain() == 0 && !isAbandoned() && InMempool()) {
LogPrintf("Relaying wtx %s\n", GetHash().ToString());
RelayTransaction((CTransaction)*this);
return true;
if (connman) {
CInv inv(MSG_TX, GetHash());
connman->ForEachNode([&inv](CNode* pnode)
{
pnode->PushInventory(inv);
return true;
});
return true;
}
}
}
return false;