From 712fd182b72b0f5a1bcf843f171c29ec0a49b50f Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Mon, 2 Apr 2012 02:40:41 +0200 Subject: [PATCH 1/2] Locking system overhaul, add condition variables This commit simplifies the locking system: CCriticalSection becomes a simple typedef for boost::interprocess::interprocess_recursive_mutex, and CCriticalBlock and CTryCriticalBlock are replaced by a templated CMutexLock, which wraps boost::interprocess::scoped_lock. By making the lock type a template parameter, some critical sections can now be changed to non-recursive locks, which support waiting via condition variables. These are implemented in CWaitableCriticalSection and WAITABLE_CRITICAL_BLOCK. CWaitableCriticalSection is a wrapper for a different Boost mutex, which supports waiting/notification via condition variables. This should enable us to remove much of the used polling code. Important is that this mutex is not recursive, so functions that perform the locking must not call eachother. Because boost::interprocess::scoped_lock does not support assigning and copying, I had to revert to the older CRITICAL_BLOCK macros that use a nested for loop instead of a simple if. --- src/util.cpp | 56 ++-------------- src/util.h | 185 +++++++++++++++++++++++++++++++++------------------ 2 files changed, 124 insertions(+), 117 deletions(-) diff --git a/src/util.cpp b/src/util.cpp index 4f34ab616..09361ef8f 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -1193,62 +1193,14 @@ static void pop_lock() dd_mutex.unlock(); } -void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine) +void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs) { - push_lock(this, CLockLocation(pszName, pszFile, nLine)); -#ifdef DEBUG_LOCKCONTENTION - bool result = mutex.try_lock(); - if (!result) - { - printf("LOCKCONTENTION: %s\n", pszName); - printf("Locker: %s:%d\n", pszFile, nLine); - mutex.lock(); - printf("Locked\n"); - } -#else - mutex.lock(); -#endif + push_lock(cs, CLockLocation(pszName, pszFile, nLine)); } -void CCriticalSection::Leave() + +void LeaveCritical() { - mutex.unlock(); pop_lock(); } -bool CCriticalSection::TryEnter(const char* pszName, const char* pszFile, int nLine) -{ - push_lock(this, CLockLocation(pszName, pszFile, nLine)); - bool result = mutex.try_lock(); - if (!result) pop_lock(); - return result; -} - -#else - -void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine) -{ -#ifdef DEBUG_LOCKCONTENTION - bool result = mutex.try_lock(); - if (!result) - { - printf("LOCKCONTENTION: %s\n", pszName); - printf("Locker: %s:%d\n", pszFile, nLine); - mutex.lock(); - } -#else - mutex.lock(); -#endif -} - -void CCriticalSection::Leave() -{ - mutex.unlock(); -} - -bool CCriticalSection::TryEnter(const char*, const char*, int) -{ - bool result = mutex.try_lock(); - return result; -} #endif /* DEBUG_LOCKORDER */ - diff --git a/src/util.h b/src/util.h index e4cf83f43..635790b71 100644 --- a/src/util.h +++ b/src/util.h @@ -20,6 +20,9 @@ typedef int pid_t; /* define for windows compatiblity */ #include #include +#include +#include +#include #include #include @@ -180,82 +183,134 @@ void AddTimeData(const CNetAddr& ip, int64 nTime); +/** Wrapped boost mutex: supports recursive locking, but no waiting */ +typedef boost::interprocess::interprocess_recursive_mutex CCriticalSection; -/** Wrapper to automatically initialize mutex. */ -class CCriticalSection +/** Wrapped boost mutex: supports waiting but not recursive locking */ +typedef boost::interprocess::interprocess_mutex CWaitableCriticalSection; + +#ifdef DEBUG_LOCKORDER +void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs); +void LeaveCritical(); +#else +void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs) {} +void static inline LeaveCritical() {} +#endif + +/** Wrapper around boost::interprocess::scoped_lock */ +template +class CMutexLock { -protected: - boost::interprocess::interprocess_recursive_mutex mutex; +private: + boost::interprocess::scoped_lock lock; public: - explicit CCriticalSection() { } - ~CCriticalSection() { } - void Enter(const char* pszName, const char* pszFile, int nLine); - void Leave(); - bool TryEnter(const char* pszName, const char* pszFile, int nLine); -}; -/** RAII object that acquires mutex. Needed for exception safety. */ -class CCriticalBlock -{ -protected: - CCriticalSection* pcs; - -public: - CCriticalBlock(CCriticalSection& csIn, const char* pszName, const char* pszFile, int nLine) + void Enter(const char* pszName, const char* pszFile, int nLine) { - pcs = &csIn; - pcs->Enter(pszName, pszFile, nLine); - } - - operator bool() const - { - return true; - } - - ~CCriticalBlock() - { - pcs->Leave(); - } -}; - -#define CRITICAL_BLOCK(cs) \ - if (CCriticalBlock criticalblock = CCriticalBlock(cs, #cs, __FILE__, __LINE__)) - -#define ENTER_CRITICAL_SECTION(cs) \ - (cs).Enter(#cs, __FILE__, __LINE__) - -#define LEAVE_CRITICAL_SECTION(cs) \ - (cs).Leave() - -/** RAII object that tries to acquire mutex. Needed for exception safety. */ -class CTryCriticalBlock -{ -protected: - CCriticalSection* pcs; - -public: - CTryCriticalBlock(CCriticalSection& csIn, const char* pszName, const char* pszFile, int nLine) - { - pcs = (csIn.TryEnter(pszName, pszFile, nLine) ? &csIn : NULL); - } - - operator bool() const - { - return Entered(); - } - - ~CTryCriticalBlock() - { - if (pcs) + if (!lock.owns()) { - pcs->Leave(); + EnterCritical(pszName, pszFile, nLine, (void*)(lock.mutex())); +#ifdef DEBUG_LOCKCONTENTION + if (!lock.try_lock()) + { + printf("LOCKCONTENTION: %s\n", pszName); + printf("Locker: %s:%d\n", pszFile, nLine); + } +#endif + lock.lock(); } } - bool Entered() const { return pcs != NULL; } + + void Leave() + { + if (lock.owns()) + { + lock.unlock(); + LeaveCritical(); + } + } + + bool TryEnter(const char* pszName, const char* pszFile, int nLine) + { + if (!lock.owns()) + { + EnterCritical(pszName, pszFile, nLine, (void*)(lock.mutex())); + lock.try_lock(); + if (!lock.owns()) + LeaveCritical(); + } + return lock.owns(); + } + + CMutexLock(Mutex& mutexIn, const char* pszName, const char* pszFile, int nLine, bool fTry = false) : lock(mutexIn, boost::interprocess::defer_lock) + { + if (fTry) + TryEnter(pszName, pszFile, nLine); + else + Enter(pszName, pszFile, nLine); + } + + ~CMutexLock() + { + if (lock.owns()) + LeaveCritical(); + } + + operator bool() + { + return lock.owns(); + } + + boost::interprocess::scoped_lock &GetLock() + { + return lock; + } }; +typedef CMutexLock CCriticalBlock; +typedef CMutexLock CWaitableCriticalBlock; +typedef boost::interprocess::interprocess_condition CConditionVariable; + +/** Wait for a given condition inside a WAITABLE_CRITICAL_BLOCK */ +#define WAIT(name,condition) \ + do { while(!(condition)) { (name).wait(waitablecriticalblock.GetLock()); } } while(0) + +/** Notify waiting threads that a condition may hold now */ +#define NOTIFY(name) \ + do { (name).notify_one(); } while(0) + +#define NOTIFY_ALL(name) \ + do { (name).notify_all(); } while(0) + +#define CRITICAL_BLOCK(cs) \ + for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \ + for (CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__); fcriticalblockonce; fcriticalblockonce=false) + +#define WAITABLE_CRITICAL_BLOCK(cs) \ + for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by WAITABLE_CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \ + for (CWaitableCriticalBlock waitablecriticalblock(cs, #cs, __FILE__, __LINE__); fcriticalblockonce; fcriticalblockonce=false) + +#define ENTER_CRITICAL_SECTION(cs) \ + { \ + EnterCritical(#cs, __FILE__, __LINE__, (void*)(&cs)); \ + (cs).lock(); \ + } + +#define LEAVE_CRITICAL_SECTION(cs) \ + { \ + (cs).unlock(); \ + LeaveCritical(); \ + } + #define TRY_CRITICAL_BLOCK(cs) \ - if (CTryCriticalBlock criticalblock = CTryCriticalBlock(cs, #cs, __FILE__, __LINE__)) + for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by TRY_CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \ + for (CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__, true); fcriticalblockonce && (fcriticalblockonce = criticalblock); fcriticalblockonce=false) + + +// This is exactly like std::string, but with a custom allocator. +// (secure_allocator<> is defined in serialize.h) +typedef std::basic_string, secure_allocator > SecureString; + From 092631f0ba0030dec1ccfa66e206fb2a6c9bf73b Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Wed, 4 Apr 2012 16:01:57 +0200 Subject: [PATCH 2/2] Condition variable for outbound connection slots Keep a global counter for nOutbound, protected with its own waitable critical section, and wait when all outbound slots are filled, rather than polling. This removes the (on average) 1 second delay between a lost connection and a new connection attempt, and may speed up shutdowns. --- src/net.cpp | 41 ++++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 37e73c421..2d9cf6ede 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -64,6 +64,9 @@ map mapAlreadyAskedFor; set setservAddNodeAddresses; CCriticalSection cs_setservAddNodeAddresses; +static CWaitableCriticalSection csOutbound; +static int nOutbound = 0; +static CConditionVariable condOutbound; unsigned short GetListenPort() @@ -460,6 +463,8 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout) pnode->AddRef(); CRITICAL_BLOCK(cs_vNodes) vNodes.push_back(pnode); + WAITABLE_CRITICAL_BLOCK(csOutbound) + nOutbound++; pnode->nTimeConnected = GetTime(); return pnode; @@ -610,6 +615,15 @@ void ThreadSocketHandler2(void* parg) // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); + if (!pnode->fInbound) + WAITABLE_CRITICAL_BLOCK(csOutbound) + { + nOutbound--; + + // Connection slot(s) were removed, notify connection creator(s) + NOTIFY(condOutbound); + } + // close socket and cleanup pnode->CloseSocketDisconnect(); pnode->Cleanup(); @@ -1278,8 +1292,6 @@ void ThreadOpenConnections2(void* parg) int64 nStart = GetTime(); loop { - int nOutbound = 0; - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; Sleep(500); vnThreadsRunning[THREAD_OPENCONNECTIONS]++; @@ -1287,23 +1299,13 @@ void ThreadOpenConnections2(void* parg) return; // Limit outbound connections - loop - { - nOutbound = 0; - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (!pnode->fInbound) - nOutbound++; - int nMaxOutboundConnections = MAX_OUTBOUND_CONNECTIONS; - nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125)); - if (nOutbound < nMaxOutboundConnections) - break; - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - Sleep(2000); - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - if (fShutdown) - return; - } + int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125)); + vnThreadsRunning[THREAD_OPENCONNECTIONS]--; + WAITABLE_CRITICAL_BLOCK(csOutbound) + WAIT(condOutbound, fShutdown || nOutbound < nMaxOutbound); + vnThreadsRunning[THREAD_OPENCONNECTIONS]++; + if (fShutdown) + return; bool fAddSeeds = false; @@ -1752,6 +1754,7 @@ bool StopNode() fShutdown = true; nTransactionsUpdated++; int64 nStart = GetTime(); + NOTIFY_ALL(condOutbound); do { int nThreadsRunning = 0;