Merge #9605: Use CScheduler for wallet flushing, remove ThreadFlushWalletDB

0235be1 Rename FlushWalletDB -> CompactWalletDB, add function description (Matt Corallo)
735d9b5 Use CScheduler for wallet flushing, remove ThreadFlushWalletDB (Matt Corallo)
73296f5 CScheduler boost->std::function, use millisecs for times, not secs (Matt Corallo)

Tree-SHA512: c04f97beab65706c444c126be229d02887df9b0972d8fb15ca1f779ef0e628cf7ecef2bf533c650d9b44645b63e01de22f17266a05907e778938d64cc6e19de6
This commit is contained in:
Wladimir J. van der Laan 2017-03-07 11:00:46 +01:00
commit 779f2f9747
No known key found for this signature in database
GPG key ID: 74810B012346C9A6
8 changed files with 46 additions and 50 deletions

View file

@ -1639,7 +1639,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
#ifdef ENABLE_WALLET #ifdef ENABLE_WALLET
if (pwalletMain) if (pwalletMain)
pwalletMain->postInitProcess(threadGroup); pwalletMain->postInitProcess(scheduler);
#endif #endif
return !fRequestShutdown; return !fRequestShutdown;

View file

@ -2288,7 +2288,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this))); threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
// Dump network addresses // Dump network addresses
scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL); scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000);
return true; return true;
} }

View file

@ -104,20 +104,20 @@ void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::t
newTaskScheduled.notify_one(); newTaskScheduled.notify_one();
} }
void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaSeconds) void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds)
{ {
schedule(f, boost::chrono::system_clock::now() + boost::chrono::seconds(deltaSeconds)); schedule(f, boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds));
} }
static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaSeconds) static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaMilliSeconds)
{ {
f(); f();
s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaSeconds), deltaSeconds); s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaMilliSeconds), deltaMilliSeconds);
} }
void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaSeconds) void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds)
{ {
scheduleFromNow(boost::bind(&Repeat, this, f, deltaSeconds), deltaSeconds); scheduleFromNow(boost::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds);
} }
size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,

View file

@ -10,7 +10,6 @@
// boost::thread / boost::function / boost::chrono should be ported to // boost::thread / boost::function / boost::chrono should be ported to
// std::thread / std::function / std::chrono when we support C++11. // std::thread / std::function / std::chrono when we support C++11.
// //
#include <boost/function.hpp>
#include <boost/chrono/chrono.hpp> #include <boost/chrono/chrono.hpp>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <map> #include <map>
@ -23,7 +22,7 @@
// //
// CScheduler* s = new CScheduler(); // CScheduler* s = new CScheduler();
// s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { } // s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { }
// s->scheduleFromNow(boost::bind(Class::func, this, argument), 3); // s->scheduleFromNow(std::bind(Class::func, this, argument), 3);
// boost::thread* t = new boost::thread(boost::bind(CScheduler::serviceQueue, s)); // boost::thread* t = new boost::thread(boost::bind(CScheduler::serviceQueue, s));
// //
// ... then at program shutdown, clean up the thread running serviceQueue: // ... then at program shutdown, clean up the thread running serviceQueue:
@ -39,20 +38,20 @@ public:
CScheduler(); CScheduler();
~CScheduler(); ~CScheduler();
typedef boost::function<void(void)> Function; typedef std::function<void(void)> Function;
// Call func at/after time t // Call func at/after time t
void schedule(Function f, boost::chrono::system_clock::time_point t); void schedule(Function f, boost::chrono::system_clock::time_point t);
// Convenience method: call f once deltaSeconds from now // Convenience method: call f once deltaSeconds from now
void scheduleFromNow(Function f, int64_t deltaSeconds); void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
// Another convenience method: call f approximately // Another convenience method: call f approximately
// every deltaSeconds forever, starting deltaSeconds from now. // every deltaSeconds forever, starting deltaSeconds from now.
// To be more precise: every time f is finished, it // To be more precise: every time f is finished, it
// is rescheduled to run deltaSeconds later. If you // is rescheduled to run deltaSeconds later. If you
// need more accurate scheduling, don't use this method. // need more accurate scheduling, don't use this method.
void scheduleEvery(Function f, int64_t deltaSeconds); void scheduleEvery(Function f, int64_t deltaMilliSeconds);
// To keep things as simple as possible, there is no unschedule. // To keep things as simple as possible, there is no unschedule.

View file

@ -20,6 +20,7 @@
#include "primitives/transaction.h" #include "primitives/transaction.h"
#include "script/script.h" #include "script/script.h"
#include "script/sign.h" #include "script/sign.h"
#include "scheduler.h"
#include "timedata.h" #include "timedata.h"
#include "txmempool.h" #include "txmempool.h"
#include "util.h" #include "util.h"
@ -3754,17 +3755,17 @@ bool CWallet::InitLoadWallet()
return true; return true;
} }
std::atomic<bool> CWallet::fFlushThreadRunning(false); std::atomic<bool> CWallet::fFlushScheduled(false);
void CWallet::postInitProcess(boost::thread_group& threadGroup) void CWallet::postInitProcess(CScheduler& scheduler)
{ {
// Add wallet transactions that aren't already in a block to mempool // Add wallet transactions that aren't already in a block to mempool
// Do this here as mempool requires genesis block to be loaded // Do this here as mempool requires genesis block to be loaded
ReacceptWalletTransactions(); ReacceptWalletTransactions();
// Run a thread to flush wallet periodically // Run a thread to flush wallet periodically
if (!CWallet::fFlushThreadRunning.exchange(true)) { if (!CWallet::fFlushScheduled.exchange(true)) {
threadGroup.create_thread(ThreadFlushWalletDB); scheduler.scheduleEvery(MaybeCompactWalletDB, 500);
} }
} }

View file

@ -29,7 +29,6 @@
#include <vector> #include <vector>
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
extern CWallet* pwalletMain; extern CWallet* pwalletMain;
@ -79,6 +78,7 @@ class CCoinControl;
class COutput; class COutput;
class CReserveKey; class CReserveKey;
class CScript; class CScript;
class CScheduler;
class CTxMemPool; class CTxMemPool;
class CWalletTx; class CWalletTx;
@ -593,7 +593,7 @@ private:
class CWallet : public CCryptoKeyStore, public CValidationInterface class CWallet : public CCryptoKeyStore, public CValidationInterface
{ {
private: private:
static std::atomic<bool> fFlushThreadRunning; static std::atomic<bool> fFlushScheduled;
/** /**
* Select a set of coins such that nValueRet >= nTargetValue and at least * Select a set of coins such that nValueRet >= nTargetValue and at least
@ -1001,7 +1001,7 @@ public:
* Wallet post-init setup * Wallet post-init setup
* Gives the wallet a chance to register repetitive tasks and complete post-init tasks * Gives the wallet a chance to register repetitive tasks and complete post-init tasks
*/ */
void postInitProcess(boost::thread_group& threadGroup); void postInitProcess(CScheduler& scheduler);
/* Wallets parameter interaction */ /* Wallets parameter interaction */
static bool ParameterInteraction(); static bool ParameterInteraction();

View file

@ -777,38 +777,33 @@ DBErrors CWalletDB::ZapWalletTx(vector<CWalletTx>& vWtx)
return DB_LOAD_OK; return DB_LOAD_OK;
} }
void ThreadFlushWalletDB() void MaybeCompactWalletDB()
{ {
// Make this thread recognisable as the wallet flushing thread static std::atomic<bool> fOneThread;
RenameThread("bitcoin-wallet"); if (fOneThread.exchange(true)) {
static bool fOneThread;
if (fOneThread)
return; return;
fOneThread = true;
if (!GetBoolArg("-flushwallet", DEFAULT_FLUSHWALLET))
return;
unsigned int nLastSeen = CWalletDB::GetUpdateCounter();
unsigned int nLastFlushed = CWalletDB::GetUpdateCounter();
int64_t nLastWalletUpdate = GetTime();
while (true)
{
MilliSleep(500);
if (nLastSeen != CWalletDB::GetUpdateCounter())
{
nLastSeen = CWalletDB::GetUpdateCounter();
nLastWalletUpdate = GetTime();
}
if (nLastFlushed != CWalletDB::GetUpdateCounter() && GetTime() - nLastWalletUpdate >= 2)
{
const std::string& strFile = pwalletMain->strWalletFile;
if (CDB::PeriodicFlush(strFile))
nLastFlushed = CWalletDB::GetUpdateCounter();
}
} }
if (!GetBoolArg("-flushwallet", DEFAULT_FLUSHWALLET)) {
return;
}
static unsigned int nLastSeen = CWalletDB::GetUpdateCounter();
static unsigned int nLastFlushed = CWalletDB::GetUpdateCounter();
static int64_t nLastWalletUpdate = GetTime();
if (nLastSeen != CWalletDB::GetUpdateCounter())
{
nLastSeen = CWalletDB::GetUpdateCounter();
nLastWalletUpdate = GetTime();
}
if (nLastFlushed != CWalletDB::GetUpdateCounter() && GetTime() - nLastWalletUpdate >= 2)
{
const std::string& strFile = pwalletMain->strWalletFile;
if (CDB::PeriodicFlush(strFile))
nLastFlushed = CWalletDB::GetUpdateCounter();
}
fOneThread = false;
} }
// //

View file

@ -193,6 +193,7 @@ private:
void operator=(const CWalletDB&); void operator=(const CWalletDB&);
}; };
void ThreadFlushWalletDB(); //! Compacts BDB state so that wallet.dat is self-contained (if there are changes)
void MaybeCompactWalletDB();
#endif // BITCOIN_WALLET_WALLETDB_H #endif // BITCOIN_WALLET_WALLETDB_H