Merge pull request #1033 from sipa/wait

Condition variables instead of polling
This commit is contained in:
Pieter Wuille 2012-04-06 04:11:14 -07:00
commit 9362da78b0
3 changed files with 146 additions and 136 deletions

View file

@ -64,6 +64,9 @@ map<CInv, int64> mapAlreadyAskedFor;
set<CNetAddr> setservAddNodeAddresses; set<CNetAddr> setservAddNodeAddresses;
CCriticalSection cs_setservAddNodeAddresses; CCriticalSection cs_setservAddNodeAddresses;
static CWaitableCriticalSection csOutbound;
static int nOutbound = 0;
static CConditionVariable condOutbound;
unsigned short GetListenPort() unsigned short GetListenPort()
@ -361,6 +364,8 @@ CNode* ConnectNode(CAddress addrConnect, int64 nTimeout)
pnode->AddRef(); pnode->AddRef();
CRITICAL_BLOCK(cs_vNodes) CRITICAL_BLOCK(cs_vNodes)
vNodes.push_back(pnode); vNodes.push_back(pnode);
WAITABLE_CRITICAL_BLOCK(csOutbound)
nOutbound++;
pnode->nTimeConnected = GetTime(); pnode->nTimeConnected = GetTime();
return pnode; return pnode;
@ -504,6 +509,15 @@ void ThreadSocketHandler2(void* parg)
// remove from vNodes // remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
if (!pnode->fInbound)
WAITABLE_CRITICAL_BLOCK(csOutbound)
{
nOutbound--;
// Connection slot(s) were removed, notify connection creator(s)
NOTIFY(condOutbound);
}
// close socket and cleanup // close socket and cleanup
pnode->CloseSocketDisconnect(); pnode->CloseSocketDisconnect();
pnode->Cleanup(); pnode->Cleanup();
@ -1172,8 +1186,6 @@ void ThreadOpenConnections2(void* parg)
int64 nStart = GetTime(); int64 nStart = GetTime();
loop loop
{ {
int nOutbound = 0;
vnThreadsRunning[THREAD_OPENCONNECTIONS]--; vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
Sleep(500); Sleep(500);
vnThreadsRunning[THREAD_OPENCONNECTIONS]++; vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
@ -1181,23 +1193,13 @@ void ThreadOpenConnections2(void* parg)
return; return;
// Limit outbound connections // Limit outbound connections
loop int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125));
{ vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
nOutbound = 0; WAITABLE_CRITICAL_BLOCK(csOutbound)
CRITICAL_BLOCK(cs_vNodes) WAIT(condOutbound, fShutdown || nOutbound < nMaxOutbound);
BOOST_FOREACH(CNode* pnode, vNodes) vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
if (!pnode->fInbound) if (fShutdown)
nOutbound++; return;
int nMaxOutboundConnections = MAX_OUTBOUND_CONNECTIONS;
nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125));
if (nOutbound < nMaxOutboundConnections)
break;
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
Sleep(2000);
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
if (fShutdown)
return;
}
bool fAddSeeds = false; bool fAddSeeds = false;
@ -1646,6 +1648,7 @@ bool StopNode()
fShutdown = true; fShutdown = true;
nTransactionsUpdated++; nTransactionsUpdated++;
int64 nStart = GetTime(); int64 nStart = GetTime();
NOTIFY_ALL(condOutbound);
do do
{ {
int nThreadsRunning = 0; int nThreadsRunning = 0;

View file

@ -1183,62 +1183,14 @@ static void pop_lock()
dd_mutex.unlock(); dd_mutex.unlock();
} }
void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine) void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs)
{ {
push_lock(this, CLockLocation(pszName, pszFile, nLine)); push_lock(cs, CLockLocation(pszName, pszFile, nLine));
#ifdef DEBUG_LOCKCONTENTION
bool result = mutex.try_lock();
if (!result)
{
printf("LOCKCONTENTION: %s\n", pszName);
printf("Locker: %s:%d\n", pszFile, nLine);
mutex.lock();
printf("Locked\n");
}
#else
mutex.lock();
#endif
} }
void CCriticalSection::Leave()
void LeaveCritical()
{ {
mutex.unlock();
pop_lock(); pop_lock();
} }
bool CCriticalSection::TryEnter(const char* pszName, const char* pszFile, int nLine)
{
push_lock(this, CLockLocation(pszName, pszFile, nLine));
bool result = mutex.try_lock();
if (!result) pop_lock();
return result;
}
#else
void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine)
{
#ifdef DEBUG_LOCKCONTENTION
bool result = mutex.try_lock();
if (!result)
{
printf("LOCKCONTENTION: %s\n", pszName);
printf("Locker: %s:%d\n", pszFile, nLine);
mutex.lock();
}
#else
mutex.lock();
#endif
}
void CCriticalSection::Leave()
{
mutex.unlock();
}
bool CCriticalSection::TryEnter(const char*, const char*, int)
{
bool result = mutex.try_lock();
return result;
}
#endif /* DEBUG_LOCKORDER */ #endif /* DEBUG_LOCKORDER */

View file

@ -20,6 +20,9 @@ typedef int pid_t; /* define for windows compatiblity */
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/interprocess/sync/interprocess_recursive_mutex.hpp> #include <boost/interprocess/sync/interprocess_recursive_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/lock_options.hpp>
#include <boost/date_time/gregorian/gregorian_types.hpp> #include <boost/date_time/gregorian/gregorian_types.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp> #include <boost/date_time/posix_time/posix_time_types.hpp>
@ -180,82 +183,134 @@ void AddTimeData(const CNetAddr& ip, int64 nTime);
/** Wrapped boost mutex: supports recursive locking, but no waiting */
typedef boost::interprocess::interprocess_recursive_mutex CCriticalSection;
/** Wrapper to automatically initialize mutex. */ /** Wrapped boost mutex: supports waiting but not recursive locking */
class CCriticalSection typedef boost::interprocess::interprocess_mutex CWaitableCriticalSection;
#ifdef DEBUG_LOCKORDER
void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs);
void LeaveCritical();
#else
void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs) {}
void static inline LeaveCritical() {}
#endif
/** Wrapper around boost::interprocess::scoped_lock */
template<typename Mutex>
class CMutexLock
{ {
protected: private:
boost::interprocess::interprocess_recursive_mutex mutex; boost::interprocess::scoped_lock<Mutex> lock;
public: public:
explicit CCriticalSection() { }
~CCriticalSection() { }
void Enter(const char* pszName, const char* pszFile, int nLine);
void Leave();
bool TryEnter(const char* pszName, const char* pszFile, int nLine);
};
/** RAII object that acquires mutex. Needed for exception safety. */ void Enter(const char* pszName, const char* pszFile, int nLine)
class CCriticalBlock
{
protected:
CCriticalSection* pcs;
public:
CCriticalBlock(CCriticalSection& csIn, const char* pszName, const char* pszFile, int nLine)
{ {
pcs = &csIn; if (!lock.owns())
pcs->Enter(pszName, pszFile, nLine);
}
operator bool() const
{
return true;
}
~CCriticalBlock()
{
pcs->Leave();
}
};
#define CRITICAL_BLOCK(cs) \
if (CCriticalBlock criticalblock = CCriticalBlock(cs, #cs, __FILE__, __LINE__))
#define ENTER_CRITICAL_SECTION(cs) \
(cs).Enter(#cs, __FILE__, __LINE__)
#define LEAVE_CRITICAL_SECTION(cs) \
(cs).Leave()
/** RAII object that tries to acquire mutex. Needed for exception safety. */
class CTryCriticalBlock
{
protected:
CCriticalSection* pcs;
public:
CTryCriticalBlock(CCriticalSection& csIn, const char* pszName, const char* pszFile, int nLine)
{
pcs = (csIn.TryEnter(pszName, pszFile, nLine) ? &csIn : NULL);
}
operator bool() const
{
return Entered();
}
~CTryCriticalBlock()
{
if (pcs)
{ {
pcs->Leave(); EnterCritical(pszName, pszFile, nLine, (void*)(lock.mutex()));
#ifdef DEBUG_LOCKCONTENTION
if (!lock.try_lock())
{
printf("LOCKCONTENTION: %s\n", pszName);
printf("Locker: %s:%d\n", pszFile, nLine);
}
#endif
lock.lock();
} }
} }
bool Entered() const { return pcs != NULL; }
void Leave()
{
if (lock.owns())
{
lock.unlock();
LeaveCritical();
}
}
bool TryEnter(const char* pszName, const char* pszFile, int nLine)
{
if (!lock.owns())
{
EnterCritical(pszName, pszFile, nLine, (void*)(lock.mutex()));
lock.try_lock();
if (!lock.owns())
LeaveCritical();
}
return lock.owns();
}
CMutexLock(Mutex& mutexIn, const char* pszName, const char* pszFile, int nLine, bool fTry = false) : lock(mutexIn, boost::interprocess::defer_lock)
{
if (fTry)
TryEnter(pszName, pszFile, nLine);
else
Enter(pszName, pszFile, nLine);
}
~CMutexLock()
{
if (lock.owns())
LeaveCritical();
}
operator bool()
{
return lock.owns();
}
boost::interprocess::scoped_lock<Mutex> &GetLock()
{
return lock;
}
}; };
typedef CMutexLock<CCriticalSection> CCriticalBlock;
typedef CMutexLock<CWaitableCriticalSection> CWaitableCriticalBlock;
typedef boost::interprocess::interprocess_condition CConditionVariable;
/** Wait for a given condition inside a WAITABLE_CRITICAL_BLOCK */
#define WAIT(name,condition) \
do { while(!(condition)) { (name).wait(waitablecriticalblock.GetLock()); } } while(0)
/** Notify waiting threads that a condition may hold now */
#define NOTIFY(name) \
do { (name).notify_one(); } while(0)
#define NOTIFY_ALL(name) \
do { (name).notify_all(); } while(0)
#define CRITICAL_BLOCK(cs) \
for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \
for (CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__); fcriticalblockonce; fcriticalblockonce=false)
#define WAITABLE_CRITICAL_BLOCK(cs) \
for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by WAITABLE_CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \
for (CWaitableCriticalBlock waitablecriticalblock(cs, #cs, __FILE__, __LINE__); fcriticalblockonce; fcriticalblockonce=false)
#define ENTER_CRITICAL_SECTION(cs) \
{ \
EnterCritical(#cs, __FILE__, __LINE__, (void*)(&cs)); \
(cs).lock(); \
}
#define LEAVE_CRITICAL_SECTION(cs) \
{ \
(cs).unlock(); \
LeaveCritical(); \
}
#define TRY_CRITICAL_BLOCK(cs) \ #define TRY_CRITICAL_BLOCK(cs) \
if (CTryCriticalBlock criticalblock = CTryCriticalBlock(cs, #cs, __FILE__, __LINE__)) for (bool fcriticalblockonce=true; fcriticalblockonce; assert(("break caught by TRY_CRITICAL_BLOCK!" && !fcriticalblockonce)), fcriticalblockonce=false) \
for (CCriticalBlock criticalblock(cs, #cs, __FILE__, __LINE__, true); fcriticalblockonce && (fcriticalblockonce = criticalblock); fcriticalblockonce=false)
// This is exactly like std::string, but with a custom allocator.
// (secure_allocator<> is defined in serialize.h)
typedef std::basic_string<char, std::char_traits<char>, secure_allocator<char> > SecureString;