Support more than one CScheduler thread for serial clients
This will be used by CValidationInterface soon. This requires a bit of work as we need to ensure that most of our callbacks happen in-order (to avoid synchronization issues in wallet) - we keep our own internal queue and push things onto it, scheduling a queue-draining function immediately upon new callbacks.
This commit is contained in:
parent
2fbf2dbe15
commit
08096bbbc6
4 changed files with 90 additions and 10 deletions
|
@ -139,3 +139,55 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
|
|||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
|
||||
{
|
||||
LOCK(m_cs_callbacks_pending);
|
||||
// Try to avoid scheduling too many copies here, but if we
|
||||
// accidentally have two ProcessQueue's scheduled at once its
|
||||
// not a big deal.
|
||||
if (m_are_callbacks_running) return;
|
||||
if (m_callbacks_pending.empty()) return;
|
||||
}
|
||||
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
|
||||
}
|
||||
|
||||
void SingleThreadedSchedulerClient::ProcessQueue() {
|
||||
std::function<void (void)> callback;
|
||||
{
|
||||
LOCK(m_cs_callbacks_pending);
|
||||
if (m_are_callbacks_running) return;
|
||||
if (m_callbacks_pending.empty()) return;
|
||||
m_are_callbacks_running = true;
|
||||
|
||||
callback = std::move(m_callbacks_pending.front());
|
||||
m_callbacks_pending.pop_front();
|
||||
}
|
||||
|
||||
// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
|
||||
// to ensure both happen safely even if callback() throws.
|
||||
struct RAIICallbacksRunning {
|
||||
SingleThreadedSchedulerClient* instance;
|
||||
RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
|
||||
~RAIICallbacksRunning() {
|
||||
{
|
||||
LOCK(instance->m_cs_callbacks_pending);
|
||||
instance->m_are_callbacks_running = false;
|
||||
}
|
||||
instance->MaybeScheduleProcessQueue();
|
||||
}
|
||||
} raiicallbacksrunning(this);
|
||||
|
||||
callback();
|
||||
}
|
||||
|
||||
void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
|
||||
assert(m_pscheduler);
|
||||
|
||||
{
|
||||
LOCK(m_cs_callbacks_pending);
|
||||
m_callbacks_pending.emplace_back(std::move(func));
|
||||
}
|
||||
MaybeScheduleProcessQueue();
|
||||
}
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
#include <boost/thread.hpp>
|
||||
#include <map>
|
||||
|
||||
#include "sync.h"
|
||||
|
||||
//
|
||||
// Simple class for background tasks that should be run
|
||||
// periodically or once "after a while"
|
||||
|
@ -79,4 +81,26 @@ private:
|
|||
bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
|
||||
};
|
||||
|
||||
/**
|
||||
* Class used by CScheduler clients which may schedule multiple jobs
|
||||
* which are required to be run serially. Does not require such jobs
|
||||
* to be executed on the same thread, but no two jobs will be executed
|
||||
* at the same time.
|
||||
*/
|
||||
class SingleThreadedSchedulerClient {
|
||||
private:
|
||||
CScheduler *m_pscheduler;
|
||||
|
||||
CCriticalSection m_cs_callbacks_pending;
|
||||
std::list<std::function<void (void)>> m_callbacks_pending;
|
||||
bool m_are_callbacks_running = false;
|
||||
|
||||
void MaybeScheduleProcessQueue();
|
||||
void ProcessQueue();
|
||||
|
||||
public:
|
||||
SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
|
||||
void AddToProcessQueue(std::function<void (void)> func);
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -6,6 +6,11 @@
|
|||
#include "validationinterface.h"
|
||||
#include "init.h"
|
||||
#include "scheduler.h"
|
||||
#include "sync.h"
|
||||
#include "util.h"
|
||||
|
||||
#include <list>
|
||||
#include <atomic>
|
||||
|
||||
#include <boost/signals2/signal.hpp>
|
||||
|
||||
|
@ -20,22 +25,23 @@ struct MainSignalsInstance {
|
|||
boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked;
|
||||
boost::signals2::signal<void (const CBlockIndex *, const std::shared_ptr<const CBlock>&)> NewPoWValidBlock;
|
||||
|
||||
CScheduler *m_scheduler = NULL;
|
||||
// We are not allowed to assume the scheduler only runs in one thread,
|
||||
// but must ensure all callbacks happen in-order, so we end up creating
|
||||
// our own queue here :(
|
||||
SingleThreadedSchedulerClient m_schedulerClient;
|
||||
|
||||
MainSignalsInstance(CScheduler *pscheduler) : m_schedulerClient(pscheduler) {}
|
||||
};
|
||||
|
||||
static CMainSignals g_signals;
|
||||
|
||||
CMainSignals::CMainSignals() {
|
||||
m_internals.reset(new MainSignalsInstance());
|
||||
}
|
||||
|
||||
void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler) {
|
||||
assert(!m_internals->m_scheduler);
|
||||
m_internals->m_scheduler = &scheduler;
|
||||
assert(!m_internals);
|
||||
m_internals.reset(new MainSignalsInstance(&scheduler));
|
||||
}
|
||||
|
||||
void CMainSignals::UnregisterBackgroundSignalScheduler() {
|
||||
m_internals->m_scheduler = NULL;
|
||||
m_internals.reset(nullptr);
|
||||
}
|
||||
|
||||
CMainSignals& GetMainSignals()
|
||||
|
|
|
@ -75,8 +75,6 @@ private:
|
|||
friend void ::UnregisterAllValidationInterfaces();
|
||||
|
||||
public:
|
||||
CMainSignals();
|
||||
|
||||
/** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
|
||||
void RegisterBackgroundSignalScheduler(CScheduler& scheduler);
|
||||
/** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */
|
||||
|
|
Loading…
Add table
Reference in a new issue