diff --git a/src/net.cpp b/src/net.cpp index 7dc2d4c22..59bace41b 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -64,6 +64,9 @@ map mapAlreadyAskedFor; set setservAddNodeAddresses; CCriticalSection cs_setservAddNodeAddresses; +static CWaitableCriticalSection csOutbound; +static int nOutbound = 0; +static CConditionVariable condOutbound; unsigned short GetListenPort() @@ -361,6 +364,8 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout) pnode->AddRef(); CRITICAL_BLOCK(cs_vNodes) vNodes.push_back(pnode); + WAITABLE_CRITICAL_BLOCK(csOutbound) + nOutbound++; pnode->nTimeConnected = GetTime(); return pnode; @@ -504,6 +509,15 @@ void ThreadSocketHandler2(void* parg) // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); + if (!pnode->fInbound) + WAITABLE_CRITICAL_BLOCK(csOutbound) + { + nOutbound--; + + // Connection slot(s) were removed, notify connection creator(s) + NOTIFY(condOutbound); + } + // close socket and cleanup pnode->CloseSocketDisconnect(); pnode->Cleanup(); @@ -1172,8 +1186,6 @@ void ThreadOpenConnections2(void* parg) int64 nStart = GetTime(); loop { - int nOutbound = 0; - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; Sleep(500); vnThreadsRunning[THREAD_OPENCONNECTIONS]++; @@ -1181,23 +1193,13 @@ void ThreadOpenConnections2(void* parg) return; // Limit outbound connections - loop - { - nOutbound = 0; - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (!pnode->fInbound) - nOutbound++; - int nMaxOutboundConnections = MAX_OUTBOUND_CONNECTIONS; - nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125)); - if (nOutbound < nMaxOutboundConnections) - break; - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - Sleep(2000); - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - if (fShutdown) - return; - } + int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125)); + vnThreadsRunning[THREAD_OPENCONNECTIONS]--; + WAITABLE_CRITICAL_BLOCK(csOutbound) + WAIT(condOutbound, fShutdown || nOutbound < nMaxOutbound); + vnThreadsRunning[THREAD_OPENCONNECTIONS]++; + if (fShutdown) + return; bool fAddSeeds = false; @@ -1646,6 +1648,7 @@ bool StopNode() fShutdown = true; nTransactionsUpdated++; int64 nStart = GetTime(); + NOTIFY_ALL(condOutbound); do { int nThreadsRunning = 0; diff --git a/src/util.cpp b/src/util.cpp index 5c4755152..d55e7ae10 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -1183,62 +1183,14 @@ static void pop_lock() dd_mutex.unlock(); } -void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine) +void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs) { - push_lock(this, CLockLocation(pszName, pszFile, nLine)); -#ifdef DEBUG_LOCKCONTENTION - bool result = mutex.try_lock(); - if (!result) - { - printf("LOCKCONTENTION: %s\n", pszName); - printf("Locker: %s:%d\n", pszFile, nLine); - mutex.lock(); - printf("Locked\n"); - } -#else - mutex.lock(); -#endif + push_lock(cs, CLockLocation(pszName, pszFile, nLine)); } -void CCriticalSection::Leave() + +void LeaveCritical() { - mutex.unlock(); pop_lock(); } -bool CCriticalSection::TryEnter(const char* pszName, const char* pszFile, int nLine) -{ - push_lock(this, CLockLocation(pszName, pszFile, nLine)); - bool result = mutex.try_lock(); - if (!result) pop_lock(); - return result; -} - -#else - -void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine) -{ -#ifdef DEBUG_LOCKCONTENTION - bool result = mutex.try_lock(); - if (!result) - { - printf("LOCKCONTENTION: %s\n", pszName); - printf("Locker: %s:%d\n", pszFile, nLine); - mutex.lock(); - } -#else - mutex.lock(); -#endif -} - -void CCriticalSection::Leave() -{ - mutex.unlock(); -} - -bool CCriticalSection::TryEnter(const char*, const char*, int) -{ - bool result = mutex.try_lock(); - return result; -} #endif /* DEBUG_LOCKORDER */ - diff --git a/src/util.h b/src/util.h index e4cf83f43..635790b71 100644 --- a/src/util.h +++ b/src/util.h @@ -20,6 +20,9 @@ typedef int pid_t; /* define for windows compatiblity */ #include #include +#include +#include +#include #include #include @@ -180,82 +183,134 @@ void AddTimeData(const CNetAddr& ip, int64 nTime); +/** Wrapped boost mutex: supports recursive locking, but no waiting */ +typedef boost::interprocess::interprocess_recursive_mutex CCriticalSection; -/** Wrapper to automatically initialize mutex. */ -class CCriticalSection +/** Wrapped boost mutex: supports waiting but not recursive locking */ +typedef boost::interprocess::interprocess_mutex CWaitableCriticalSection; + +#ifdef DEBUG_LOCKORDER +void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs); +void LeaveCritical(); +#else +void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs) {} +void static inline LeaveCritical() {} +#endif + +/** Wrapper around boost::interprocess::scoped_lock */ +template +class CMutexLock { -protected: - boost::interprocess::interprocess_recursive_mutex mutex; +private: + boost::interprocess::scoped_lock lock; public: - explicit CCriticalSection() { } - ~CCriticalSection() { } - void Enter(const char* pszName, const char* pszFile, int nLine); - void Leave(); - bool TryEnter(const char* pszName, const char* pszFile, int nLine); -}; -/** RAII object that acquires mutex. Needed for exception safety. */ -class CCriticalBlock -{ -protected: - CCriticalSection* pcs; - -public: - CCriticalBlock(CCriticalSection& csIn, const char* pszName, const char* pszFile, int nLine) + void Enter(const char* pszName, const char* pszFile, int nLine) { - pcs = &csIn; - pcs->Enter(pszName, pszFile, nLine); - } - - operator bool() const - { - return true; - } - - ~CCriticalBlock() - { - pcs->Leave(); - } -}; - -#define CRITICAL_BLOCK(cs) \ - if (CCriticalBlock criticalblock = CCriticalBlock(cs, #cs, __FILE__, __LINE__)) - -#define ENTER_CRITICAL_SECTION(cs) \ - (cs).Enter(#cs, __FILE__, __LINE__) - -#define LEAVE_CRITICAL_SECTION(cs) \ - (cs).Leave() - -/** RAII object that tries to acquire mutex. Needed for exception safety. */ -class CTryCriticalBlock -{ -protected: - CCriticalSection* pcs; - -public: - CTryCriticalBlock(CCriticalSection& csIn, const char* pszName, const char* pszFile, int nLine) - { - pcs = (csIn.TryEnter(pszName, pszFile, nLine) ? &csIn : NULL); - } - - operator bool() const - { - return Entered(); - } - - ~CTryCriticalBlock() - { - if (pcs) + if (!lock.owns()) { - pcs->Leave(); + EnterCritical(pszName, pszFile, nLine, (void*)(lock.mutex())); +#ifdef DEBUG_LOCKCONTENTION + if (!lock.try_lock()) + { + printf("LOCKCONTENTION: %s\n", pszName); + printf("Locker: %s:%d\n", pszFile, nLine); + } +#endif + lock.lock(); } } - bool Entered() const { return pcs != NULL; } + + void Leave() + { + if (lock.owns()) + { + lock.unlock(); + LeaveCritical(); + } + } + + bool TryEnter(const char* pszName, const char* pszFile, int nLine) + { + if (!lock.owns()) + { + EnterCritical(pszName, pszFile, nLine, (void*)(lock.mutex())); + lock.try_lock(); + if (!lock.owns()) + LeaveCritical(); + } + return lock.owns(); + } + + CMutexLock(Mutex& mutexIn, const char* pszName, const char* pszFile, int nLine, bool fTry = false) : lock(mutexIn, boost::interprocess::defer_lock) + { + if (fTry) + TryEnter(pszName, pszFile, nLine); + else + Enter(pszName, pszFile, nLine); + } + + ~CMutexLock() + { + if (lock.owns()) + LeaveCritical(); + } + + operator bool() + { + return lock.owns(); + } + + boost::interprocess::scoped_lock &GetLock() + { + return lock; + } }; +typedef CMutexLock CCriticalBlock; +typedef CMutexLock CWaitableCriticalBlock; +typedef boost::interprocess::interprocess_condition CConditionVariable; + +/** Wait for a given condition inside a WAITABLE_CRITICAL_BLOCK */ +#define WAIT(name,condition) \ + do { while(!(condition)) { (name).wait(waitablecriticalblock.GetLock()); } } while(0) + +/** Notify waiting threads that a condition may hold now */ +#define NOTIFY(name) \ + do { (name).notify_one(); } while(0) + +#define NOTIFY_ALL(name) \ + do { (name).notify_all(); } while(0) + +#define CRITICAL_BLOCK(cs) \ + for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \ + for (CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__); fcriticalblockonce; fcriticalblockonce=false) + +#define WAITABLE_CRITICAL_BLOCK(cs) \ + for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by WAITABLE_CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \ + for (CWaitableCriticalBlock waitablecriticalblock(cs, #cs, __FILE__, __LINE__); fcriticalblockonce; fcriticalblockonce=false) + +#define ENTER_CRITICAL_SECTION(cs) \ + { \ + EnterCritical(#cs, __FILE__, __LINE__, (void*)(&cs)); \ + (cs).lock(); \ + } + +#define LEAVE_CRITICAL_SECTION(cs) \ + { \ + (cs).unlock(); \ + LeaveCritical(); \ + } + #define TRY_CRITICAL_BLOCK(cs) \ - if (CTryCriticalBlock criticalblock = CTryCriticalBlock(cs, #cs, __FILE__, __LINE__)) + for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by TRY_CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \ + for (CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__, true); fcriticalblockonce && (fcriticalblockonce = criticalblock); fcriticalblockonce=false) + + +// This is exactly like std::string, but with a custom allocator. +// (secure_allocator<> is defined in serialize.h) +typedef std::basic_string, secure_allocator > SecureString; +