Clean up shutdown process

This commit is contained in:
Gavin Andresen 2013-03-09 12:02:57 -05:00
parent 21eb5adadb
commit b31499ec72
15 changed files with 147 additions and 178 deletions

View file

@ -1221,13 +1221,14 @@ int CommandLineRPC(int argc, char *argv[])
strPrint = write_string(result, true);
}
}
catch (std::exception& e)
{
catch (boost::thread_interrupted) {
throw;
}
catch (std::exception& e) {
strPrint = string("error: ") + e.what();
nRet = 87;
}
catch (...)
{
catch (...) {
PrintException(NULL, "CommandLineRPC()");
}
@ -1265,6 +1266,9 @@ int main(int argc, char *argv[])
return CommandLineRPC(argc, argv);
}
}
catch (boost::thread_interrupted) {
throw;
}
catch (std::exception& e) {
PrintException(&e, "main()");
} catch (...) {

View file

@ -62,8 +62,7 @@ bool CDBEnv::Open(const boost::filesystem::path& path)
if (fDbEnvInit)
return true;
if (fShutdown)
return false;
boost::this_thread::interruption_point();
strPath = path.string();
filesystem::path pathLogDir = path / "database";
@ -108,8 +107,7 @@ void CDBEnv::MakeMock()
if (fDbEnvInit)
throw runtime_error("CDBEnv::MakeMock(): already initialized");
if (fShutdown)
throw runtime_error("CDBEnv::MakeMock(): during shutdown");
boost::this_thread::interruption_point();
printf("CDBEnv::MakeMock()\n");
@ -327,7 +325,7 @@ bool CDBEnv::RemoveDb(const string& strFile)
bool CDB::Rewrite(const string& strFile, const char* pszSkip)
{
while (!fShutdown)
while (true)
{
{
LOCK(bitdb.cs_db);

View file

@ -24,7 +24,7 @@ class CWalletTx;
extern unsigned int nWalletDBUpdated;
void ThreadFlushWalletDB(void* parg);
void ThreadFlushWalletDB(const std::string& strWalletFile);
bool BackupWallet(const CWallet& wallet, const std::string& strDest);

View file

@ -40,75 +40,65 @@ enum BindFlags {
// Shutdown
//
void ExitTimeout(void* parg)
{
#ifdef WIN32
MilliSleep(5000);
ExitProcess(0);
#endif
}
//
// Thread management and startup/shutdown:
//
// The network-processing threads are all part of a thread group
// created by AppInit() or the Qt main() function.
//
// A clean exit happens when StartShutdown() or the SIGTERM
// signal handler sets fRequestShutdown, which triggers
// the DetectShutdownThread(), which interrupts the main thread group.
// DetectShutdownThread() then exits, which causes AppInit() to
// continue (it .joins the shutdown thread).
// Shutdown() is then
// called to clean up database connections, and stop other
// threads that should only be stopped after the main network-processing
// threads have exited.
//
// Note that if running -daemon the parent process returns from AppInit2
// before adding any threads to the threadGroup, so .join_all() returns
// immediately and the parent exits from main().
//
// Shutdown for Qt is very similar, only it uses a QTimer to detect
// fRequestShutdown getting set (either by RPC stop or SIGTERM)
// and then does the normal Qt shutdown thing.
//
volatile bool fRequestShutdown = false;
void StartShutdown()
{
#ifdef QT_GUI
// ensure we leave the Qt main loop for a clean GUI exit (Shutdown() is called in bitcoin.cpp afterwards)
uiInterface.QueueShutdown();
#else
// Without UI, Shutdown() can simply be started in a new thread
NewThread(Shutdown, NULL);
#endif
fRequestShutdown = true;
}
static CCoinsViewDB *pcoinsdbview;
void Shutdown(void* parg)
void Shutdown()
{
static CCriticalSection cs_Shutdown;
static bool fTaken;
TRY_LOCK(cs_Shutdown, lockShutdown);
if (!lockShutdown) return;
// Make this thread recognisable as the shutdown thread
RenameThread("bitcoin-shutoff");
nTransactionsUpdated++;
StopRPCThreads();
bitdb.Flush(false);
StopNode();
{
fShutdown = true;
fRequestShutdown = true;
nTransactionsUpdated++;
StopRPCThreads();
bitdb.Flush(false);
StopNode();
{
LOCK(cs_main);
if (pblocktree)
pblocktree->Flush();
if (pcoinsTip)
pcoinsTip->Flush();
delete pcoinsTip;
delete pcoinsdbview;
delete pblocktree;
delete pcoinsTip; pcoinsTip = NULL;
delete pcoinsdbview; pcoinsdbview = NULL;
delete pblocktree; pblocktree = NULL;
}
bitdb.Flush(true);
boost::filesystem::remove(GetPidFile());
UnregisterWallet(pwalletMain);
delete pwalletMain;
NewThread(ExitTimeout, NULL);
MilliSleep(50);
printf("Bitcoin exited\n\n");
fExit = true;
#ifndef QT_GUI
// ensure non-UI client gets exited here, but let Bitcoin-Qt reach 'return 0;' in bitcoin.cpp
exit(0);
#endif
}
else
{
while (!fExit)
MilliSleep(500);
MilliSleep(100);
ExitThread(0);
}
}
//
@ -116,9 +106,13 @@ void Shutdown(void* parg)
//
void DetectShutdownThread(boost::thread_group* threadGroup)
{
while (fRequestShutdown == false)
// Tell the main threads to shutdown.
while (!fRequestShutdown)
{
MilliSleep(200);
if (fRequestShutdown)
threadGroup->interrupt_all();
}
}
void HandleSIGTERM(int)
@ -143,6 +137,8 @@ void HandleSIGHUP(int)
bool AppInit(int argc, char* argv[])
{
boost::thread_group threadGroup;
boost::thread* detectShutdownThread = NULL;
bool fRet = false;
try
{
@ -154,7 +150,7 @@ bool AppInit(int argc, char* argv[])
if (!boost::filesystem::is_directory(GetDataDir(false)))
{
fprintf(stderr, "Error: Specified directory does not exist\n");
Shutdown(NULL);
Shutdown();
}
ReadConfigFile(mapArgs, mapMultiArgs);
@ -184,7 +180,31 @@ bool AppInit(int argc, char* argv[])
int ret = CommandLineRPC(argc, argv);
exit(ret);
}
#if !defined(WIN32)
fDaemon = GetBoolArg("-daemon");
if (fDaemon)
{
// Daemonize
pid_t pid = fork();
if (pid < 0)
{
fprintf(stderr, "Error: fork() returned %d errno %d\n", pid, errno);
return false;
}
if (pid > 0) // Parent process, pid is child process id
{
CreatePidFile(GetPidFile(), pid);
return true;
}
// Child process falls through to rest of initialization
pid_t sid = setsid();
if (sid < 0)
fprintf(stderr, "Error: setsid() returned %d errno %d\n", sid, errno);
}
#endif
detectShutdownThread = new boost::thread(boost::bind(&DetectShutdownThread, &threadGroup));
fRet = AppInit2(threadGroup);
}
catch (std::exception& e) {
@ -192,12 +212,20 @@ bool AppInit(int argc, char* argv[])
} catch (...) {
PrintExceptionContinue(NULL, "AppInit()");
}
if (!fRet)
{
Shutdown(NULL);
if (!fRet) {
if (detectShutdownThread)
detectShutdownThread->interrupt();
threadGroup.interrupt_all();
threadGroup.join_all();
}
if (detectShutdownThread)
{
detectShutdownThread->join();
delete detectShutdownThread;
detectShutdownThread = NULL;
}
Shutdown();
return fRet;
}
@ -214,7 +242,7 @@ int main(int argc, char* argv[])
if (fRet && fDaemon)
return 0;
return 1;
return (fRet ? 0 : 1);
}
#endif
@ -302,7 +330,7 @@ std::string HelpMessage()
" -rpcport=<port> " + _("Listen for JSON-RPC connections on <port> (default: 8332 or testnet: 18332)") + "\n" +
" -rpcallowip=<ip> " + _("Allow JSON-RPC connections from specified IP address") + "\n" +
" -rpcconnect=<ip> " + _("Send commands to node running on <ip> (default: 127.0.0.1)") + "\n" +
" -rpcthreads=<n> " + _("Use this mean threads to service RPC calls (default: 4)") + "\n" +
" -rpcthreads=<n> " + _("Use this many threads to service RPC calls (default: 4)") + "\n" +
" -blocknotify=<cmd> " + _("Execute command when the best block changes (%s in cmd is replaced by block hash)") + "\n" +
" -walletnotify=<cmd> " + _("Execute command when a wallet transaction changes (%s in cmd is replaced by TxID)") + "\n" +
" -alertnotify=<cmd> " + _("Execute command when a relevant alert is received (%s in cmd is replaced by message)") + "\n" +
@ -440,8 +468,6 @@ bool AppInit2(boost::thread_group& threadGroup)
sigaction(SIGHUP, &sa_hup, NULL);
#endif
threadGroup.create_thread(boost::bind(&DetectShutdownThread, &threadGroup));
// ********************************************************* Step 2: parameter interactions
fTestNet = GetBoolArg("-testnet");
@ -499,12 +525,6 @@ bool AppInit2(boost::thread_group& threadGroup)
else
fDebugNet = GetBoolArg("-debugnet");
#if !defined(WIN32) && !defined(QT_GUI)
fDaemon = GetBoolArg("-daemon");
#else
fDaemon = false;
#endif
if (fDaemon)
fServer = true;
else
@ -552,28 +572,6 @@ bool AppInit2(boost::thread_group& threadGroup)
if (!lock.try_lock())
return InitError(strprintf(_("Cannot obtain a lock on data directory %s. Bitcoin is probably already running."), strDataDir.c_str()));
#if !defined(WIN32) && !defined(QT_GUI)
if (fDaemon)
{
// Daemonize
pid_t pid = fork();
if (pid < 0)
{
fprintf(stderr, "Error: fork() returned %d errno %d\n", pid, errno);
return false;
}
if (pid > 0)
{
CreatePidFile(GetPidFile(), pid);
return true;
}
pid_t sid = setsid();
if (sid < 0)
fprintf(stderr, "Error: setsid() returned %d errno %d\n", sid, errno);
}
#endif
if (GetBoolArg("-shrinkdebugfile", !fDebug))
ShrinkDebugFile();
printf("\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n");
@ -1011,8 +1009,7 @@ bool AppInit2(boost::thread_group& threadGroup)
printf("mapWallet.size() = %"PRIszu"\n", pwalletMain->mapWallet.size());
printf("mapAddressBook.size() = %"PRIszu"\n", pwalletMain->mapAddressBook.size());
if (!NewThread(StartNode, (void*)&threadGroup))
InitError(_("Error: could not start node"));
StartNode(threadGroup);
if (fServer)
StartRPCThreads();
@ -1030,12 +1027,8 @@ bool AppInit2(boost::thread_group& threadGroup)
// Add wallet transactions that aren't already in a block to mapTransactions
pwalletMain->ReacceptWalletTransactions();
#if !defined(QT_GUI)
// Loop until process is exit()ed from shutdown() function,
// called from ThreadRPCServer thread when a "stop" command is received.
while (1)
MilliSleep(5000);
#endif
// Run a thread to flush wallet periodically
threadGroup.create_thread(boost::bind(&ThreadFlushWalletDB, boost::ref(pwalletMain->strWalletFile)));
return true;
return !fRequestShutdown;
}

View file

@ -7,11 +7,10 @@
#include "wallet.h"
class boost::thread_group;
extern CWallet* pwalletMain;
void StartShutdown();
void Shutdown(void* parg);
void Shutdown();
bool AppInit2(boost::thread_group& threadGroup);
std::string HelpMessage();

View file

@ -1256,8 +1256,7 @@ bool ConnectBestBlock(CValidationState &state) {
if (pindexTest->pprev == NULL || pindexTest->pnext != NULL) {
reverse(vAttach.begin(), vAttach.end());
BOOST_FOREACH(CBlockIndex *pindexSwitch, vAttach) {
if (fRequestShutdown)
break;
boost::this_thread::interruption_point();
try {
if (!SetBestChain(state, pindexSwitch))
return false;
@ -2457,7 +2456,6 @@ uint256 CPartialMerkleTree::ExtractMatches(std::vector<uint256> &vMatch) {
bool AbortNode(const std::string &strMessage) {
fRequestShutdown = true;
strMiscWarning = strMessage;
printf("*** %s\n", strMessage.c_str());
uiInterface.ThreadSafeMessageBox(strMessage, "", CClientUIInterface::MSG_ERROR);
@ -2536,8 +2534,7 @@ bool static LoadBlockIndexDB()
if (!pblocktree->LoadBlockIndexGuts())
return false;
if (fRequestShutdown)
return true;
boost::this_thread::interruption_point();
// Calculate bnChainWork
vector<pair<int, CBlockIndex*> > vSortedByHeight;
@ -2617,7 +2614,8 @@ bool VerifyDB() {
CValidationState state;
for (CBlockIndex* pindex = pindexBest; pindex && pindex->pprev; pindex = pindex->pprev)
{
if (fRequestShutdown || pindex->nHeight < nBestHeight-nCheckDepth)
boost::this_thread::interruption_point();
if (pindex->nHeight < nBestHeight-nCheckDepth)
break;
CBlock block;
// check level 0: read from disk
@ -2654,7 +2652,8 @@ bool VerifyDB() {
// check level 4: try reconnecting blocks
if (nCheckLevel >= 4) {
CBlockIndex *pindex = pindexState;
while (pindex != pindexBest && !fRequestShutdown) {
while (pindex != pindexBest) {
boost::this_thread::interruption_point();
pindex = pindex->pnext;
CBlock block;
if (!block.ReadFromDisk(pindex))
@ -3038,8 +3037,7 @@ void static ProcessGetData(CNode* pfrom)
const CInv &inv = *it;
{
if (fShutdown)
break;
boost::this_thread::interruption_point();
it++;
if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK)
@ -3297,8 +3295,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
int64 nSince = nNow - 10 * 60;
BOOST_FOREACH(CAddress& addr, vAddr)
{
if (fShutdown)
return true;
boost::this_thread::interruption_point();
if (addr.nTime <= 100000000 || addr.nTime > nNow + 10 * 60)
addr.nTime = nNow - 5 * 24 * 60 * 60;
pfrom->AddAddressKnown(addr);
@ -3366,8 +3364,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
{
const CInv &inv = vInv[nInv];
if (fShutdown)
return true;
boost::this_thread::interruption_point();
pfrom->AddInventoryKnown(inv);
bool fAlreadyHave = AlreadyHave(inv);
@ -3799,8 +3796,7 @@ bool ProcessMessages(CNode* pfrom)
LOCK(cs_main);
fRet = ProcessMessage(pfrom, strCommand, vRecv);
}
if (fShutdown)
break;
boost::this_thread::interruption_point();
}
catch (std::ios_base::failure& e)
{
@ -3819,6 +3815,9 @@ bool ProcessMessages(CNode* pfrom)
PrintExceptionContinue(&e, "ProcessMessages()");
}
}
catch (boost::thread_interrupted) {
throw;
}
catch (std::exception& e) {
PrintExceptionContinue(&e, "ProcessMessages()");
} catch (...) {

View file

@ -45,7 +45,6 @@ static bool vfReachable[NET_MAX] = {};
static bool vfLimited[NET_MAX] = {};
static CNode* pnodeLocalHost = NULL;
uint64 nLocalHostNonce = 0;
array<int, THREAD_MAX> vnThreadsRunning;
static std::vector<SOCKET> vhListenSocket;
CAddrMan addrman;
@ -147,8 +146,7 @@ bool RecvLine(SOCKET hSocket, string& strLine)
}
else if (nBytes <= 0)
{
if (fShutdown)
return false;
boost::this_thread::interruption_point();
if (nBytes < 0)
{
int nErr = WSAGetLastError();
@ -1775,10 +1773,8 @@ void static Discover()
NewThread(ThreadGetMyExternalIP, NULL);
}
void StartNode(void* parg)
void StartNode(boost::thread_group& threadGroup)
{
boost::thread_group* threadGroup = (boost::thread_group*)parg;
// Make this thread recognisable as the startup thread
RenameThread("bitcoin-start");
@ -1800,25 +1796,27 @@ void StartNode(void* parg)
if (!GetBoolArg("-dnsseed", true))
printf("DNS seeding disabled\n");
else
threadGroup->create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", &ThreadDNSAddressSeed));
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", &ThreadDNSAddressSeed));
#ifdef USE_UPNP
// Map ports with UPnP
MapPort(GetBoolArg("-upnp", USE_UPNP));
#endif
// Send and receive from sockets, accept connections
threadGroup->create_thread(boost::bind(&TraceThread<void (*)()>, "net", &ThreadSocketHandler));
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "net", &ThreadSocketHandler));
// Initiate outbound connections from -addnode
threadGroup->create_thread(boost::bind(&TraceThread<void (*)()>, "addcon", &ThreadOpenAddedConnections));
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "addcon", &ThreadOpenAddedConnections));
// Initiate outbound connections
threadGroup->create_thread(boost::bind(&TraceThread<void (*)()>, "opencon", &ThreadOpenConnections));
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "opencon", &ThreadOpenConnections));
// Process messages
threadGroup->create_thread(boost::bind(&TraceThread<void (*)()>, "msghand", &ThreadMessageHandler));
threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "msghand", &ThreadMessageHandler));
// Dump network addresses
threadGroup->create_thread(boost::bind(&LoopForever<void (*)()>, "dumpaddr", &DumpAddresses, 10000));
threadGroup.create_thread(boost::bind(&LoopForever<void (*)()>, "dumpaddr", &DumpAddresses, 10000));
}
bool StopNode()
@ -1826,23 +1824,10 @@ bool StopNode()
printf("StopNode()\n");
GenerateBitcoins(false, NULL);
MapPort(false);
fShutdown = true;
nTransactionsUpdated++;
int64 nStart = GetTime();
if (semOutbound)
for (int i=0; i<MAX_OUTBOUND_CONNECTIONS; i++)
semOutbound->post();
do
{
int nThreadsRunning = 0;
for (int n = 0; n < THREAD_MAX; n++)
nThreadsRunning += vnThreadsRunning[n];
if (nThreadsRunning == 0)
break;
if (GetTime() - nStart > 20)
break;
MilliSleep(20);
} while(true);
MilliSleep(50);
DumpAddresses();

View file

@ -40,7 +40,7 @@ CNode* ConnectNode(CAddress addrConnect, const char *strDest = NULL, int64 nTime
void MapPort(bool fUseUPnP);
unsigned short GetListenPort();
bool BindListenPort(const CService &bindAddr, std::string& strError=REF(std::string()));
void StartNode(void* parg);
void StartNode(boost::thread_group& threadGroup);
bool StopNode();
void SocketSendData(CNode *pnode);
@ -69,16 +69,9 @@ void SetReachable(enum Network net, bool fFlag = true);
CAddress GetLocalAddress(const CNetAddr *paddrPeer = NULL);
/** Thread types */
enum threadId
{
THREAD_MAX
};
extern bool fDiscover;
extern uint64 nLocalServices;
extern uint64 nLocalHostNonce;
extern boost::array<int, THREAD_MAX> vnThreadsRunning;
extern CAddrMan addrman;
extern std::vector<CNode*> vNodes;

View file

@ -260,9 +260,9 @@ int main(int argc, char *argv[])
guiref = 0;
}
// Shutdown the core and its threads, but don't exit Bitcoin-Qt here
Shutdown(NULL);
threadGroup.interrupt_all();
threadGroup.join_all();
Shutdown();
}
else
{

View file

@ -115,12 +115,13 @@ bool CCoinsViewDB::GetStats(CCoinsStats &stats) {
pcursor->SeekToFirst();
while (pcursor->Valid()) {
boost::this_thread::interruption_point();
try {
leveldb::Slice slKey = pcursor->key();
CDataStream ssKey(slKey.data(), slKey.data()+slKey.size(), SER_DISK, CLIENT_VERSION);
char chType;
ssKey >> chType;
if (chType == 'c' && !fRequestShutdown) {
if (chType == 'c') {
leveldb::Slice slValue = pcursor->value();
CDataStream ssValue(slValue.data(), slValue.data()+slValue.size(), SER_DISK, CLIENT_VERSION);
CCoins coins;
@ -178,12 +179,13 @@ bool CBlockTreeDB::LoadBlockIndexGuts()
// Load mapBlockIndex
while (pcursor->Valid()) {
boost::this_thread::interruption_point();
try {
leveldb::Slice slKey = pcursor->key();
CDataStream ssKey(slKey.data(), slKey.data()+slKey.size(), SER_DISK, CLIENT_VERSION);
char chType;
ssKey >> chType;
if (chType == 'b' && !fRequestShutdown) {
if (chType == 'b') {
leveldb::Slice slValue = pcursor->value();
CDataStream ssValue(slValue.data(), slValue.data()+slValue.size(), SER_DISK, CLIENT_VERSION);
CDiskBlockIndex diskindex;

View file

@ -73,8 +73,6 @@ bool fDebug = false;
bool fDebugNet = false;
bool fPrintToConsole = false;
bool fPrintToDebugger = false;
volatile bool fRequestShutdown = false;
bool fShutdown = false;
bool fDaemon = false;
bool fServer = false;
bool fCommandLine = false;

View file

@ -134,8 +134,6 @@ extern bool fDebug;
extern bool fDebugNet;
extern bool fPrintToConsole;
extern bool fPrintToDebugger;
extern volatile bool fRequestShutdown;
extern bool fShutdown;
extern bool fDaemon;
extern bool fServer;
extern bool fCommandLine;

View file

@ -1383,7 +1383,6 @@ DBErrors CWallet::LoadWallet(bool& fFirstRunRet)
return nLoadWalletRet;
fFirstRunRet = !vchDefaultKey.IsValid();
NewThread(ThreadFlushWalletDB, &strWalletFile);
return DB_LOAD_OK;
}

View file

@ -327,7 +327,6 @@ public:
~CReserveKey()
{
if (!fShutdown)
ReturnKey();
}

View file

@ -451,8 +451,10 @@ DBErrors CWalletDB::LoadWallet(CWallet* pwallet)
}
pcursor->close();
}
catch (...)
{
catch (boost::thread_interrupted) {
throw;
}
catch (...) {
result = DB_CORRUPT;
}
@ -482,12 +484,11 @@ DBErrors CWalletDB::LoadWallet(CWallet* pwallet)
return result;
}
void ThreadFlushWalletDB(void* parg)
void ThreadFlushWalletDB(const string& strFile)
{
// Make this thread recognisable as the wallet flushing thread
RenameThread("bitcoin-wallet");
const string& strFile = ((const string*)parg)[0];
static bool fOneThread;
if (fOneThread)
return;
@ -498,7 +499,7 @@ void ThreadFlushWalletDB(void* parg)
unsigned int nLastSeen = nWalletDBUpdated;
unsigned int nLastFlushed = nWalletDBUpdated;
int64 nLastWalletUpdate = GetTime();
while (!fShutdown)
while (true)
{
MilliSleep(500);
@ -522,8 +523,9 @@ void ThreadFlushWalletDB(void* parg)
mi++;
}
if (nRefCount == 0 && !fShutdown)
if (nRefCount == 0)
{
boost::this_thread::interruption_point();
map<string, int>::iterator mi = bitdb.mapFileUseCount.find(strFile);
if (mi != bitdb.mapFileUseCount.end())
{
@ -548,7 +550,7 @@ bool BackupWallet(const CWallet& wallet, const string& strDest)
{
if (!wallet.fFileBacked)
return false;
while (!fShutdown)
while (true)
{
{
LOCK(bitdb.cs_db);