12519bf62b
Make a copy of the boost time-point to wait for, otherwise the head of the queue may be deleted by another thread while this one is waiting, while the boost function still has a reference to it. Although this problem is in non-test code, this is not an actual problem outside of the tests because we use the thread scheduler with only one service thread, so there will never be threads fighting at the head of the queue. The old boost fallback escapes this problem because it passes a scalar value to wait_until instead of a const object reference. Found by running the tests in LLVM-4.0-master asan.
133 lines
4.2 KiB
C++
133 lines
4.2 KiB
C++
// Copyright (c) 2015 The Bitcoin Core developers
|
|
// Distributed under the MIT software license, see the accompanying
|
|
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|
|
|
#include "scheduler.h"
|
|
|
|
#include "reverselock.h"
|
|
|
|
#include <assert.h>
|
|
#include <boost/bind.hpp>
|
|
#include <utility>
|
|
|
|
CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false)
|
|
{
|
|
}
|
|
|
|
CScheduler::~CScheduler()
|
|
{
|
|
assert(nThreadsServicingQueue == 0);
|
|
}
|
|
|
|
|
|
#if BOOST_VERSION < 105000
|
|
static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t)
|
|
{
|
|
return boost::posix_time::from_time_t(boost::chrono::system_clock::to_time_t(t));
|
|
}
|
|
#endif
|
|
|
|
void CScheduler::serviceQueue()
|
|
{
|
|
boost::unique_lock<boost::mutex> lock(newTaskMutex);
|
|
++nThreadsServicingQueue;
|
|
|
|
// newTaskMutex is locked throughout this loop EXCEPT
|
|
// when the thread is waiting or when the user's function
|
|
// is called.
|
|
while (!shouldStop()) {
|
|
try {
|
|
while (!shouldStop() && taskQueue.empty()) {
|
|
// Wait until there is something to do.
|
|
newTaskScheduled.wait(lock);
|
|
}
|
|
|
|
// Wait until either there is a new task, or until
|
|
// the time of the first item on the queue:
|
|
|
|
// wait_until needs boost 1.50 or later; older versions have timed_wait:
|
|
#if BOOST_VERSION < 105000
|
|
while (!shouldStop() && !taskQueue.empty() &&
|
|
newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
|
|
// Keep waiting until timeout
|
|
}
|
|
#else
|
|
// Some boost versions have a conflicting overload of wait_until that returns void.
|
|
// Explicitly use a template here to avoid hitting that overload.
|
|
while (!shouldStop() && !taskQueue.empty()) {
|
|
boost::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
|
|
if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == boost::cv_status::timeout)
|
|
break; // Exit loop after timeout, it means we reached the time of the event
|
|
}
|
|
#endif
|
|
// If there are multiple threads, the queue can empty while we're waiting (another
|
|
// thread may service the task we were waiting on).
|
|
if (shouldStop() || taskQueue.empty())
|
|
continue;
|
|
|
|
Function f = taskQueue.begin()->second;
|
|
taskQueue.erase(taskQueue.begin());
|
|
|
|
{
|
|
// Unlock before calling f, so it can reschedule itself or another task
|
|
// without deadlocking:
|
|
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
|
|
f();
|
|
}
|
|
} catch (...) {
|
|
--nThreadsServicingQueue;
|
|
throw;
|
|
}
|
|
}
|
|
--nThreadsServicingQueue;
|
|
newTaskScheduled.notify_one();
|
|
}
|
|
|
|
void CScheduler::stop(bool drain)
|
|
{
|
|
{
|
|
boost::unique_lock<boost::mutex> lock(newTaskMutex);
|
|
if (drain)
|
|
stopWhenEmpty = true;
|
|
else
|
|
stopRequested = true;
|
|
}
|
|
newTaskScheduled.notify_all();
|
|
}
|
|
|
|
void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
|
|
{
|
|
{
|
|
boost::unique_lock<boost::mutex> lock(newTaskMutex);
|
|
taskQueue.insert(std::make_pair(t, f));
|
|
}
|
|
newTaskScheduled.notify_one();
|
|
}
|
|
|
|
void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaSeconds)
|
|
{
|
|
schedule(f, boost::chrono::system_clock::now() + boost::chrono::seconds(deltaSeconds));
|
|
}
|
|
|
|
static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaSeconds)
|
|
{
|
|
f();
|
|
s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaSeconds), deltaSeconds);
|
|
}
|
|
|
|
void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaSeconds)
|
|
{
|
|
scheduleFromNow(boost::bind(&Repeat, this, f, deltaSeconds), deltaSeconds);
|
|
}
|
|
|
|
size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
|
|
boost::chrono::system_clock::time_point &last) const
|
|
{
|
|
boost::unique_lock<boost::mutex> lock(newTaskMutex);
|
|
size_t result = taskQueue.size();
|
|
if (!taskQueue.empty()) {
|
|
first = taskQueue.begin()->first;
|
|
last = taskQueue.rbegin()->first;
|
|
}
|
|
return result;
|
|
}
|