Cancel outstanding listen ops for RPC when shutting down

Use Boost's signal2 slot tracking mechanism to cancel any (still open)
listening sockets when receiving a shutdown signal.

Signed-off-by: Giel van Schijndel <me@mortis.eu>
This commit is contained in:
Giel van Schijndel 2012-06-24 13:20:17 +02:00
parent 415a87ef36
commit ad25804feb

View file

@ -2741,7 +2741,6 @@ static void RPCListen(boost::shared_ptr< basic_socket_acceptor<Protocol, SocketA
ssl::context& context, ssl::context& context,
const bool fUseSSL) const bool fUseSSL)
{ {
// Accept connection // Accept connection
AcceptedConnectionImpl<Protocol>* conn = new AcceptedConnectionImpl<Protocol>(acceptor->get_io_service(), context, fUseSSL); AcceptedConnectionImpl<Protocol>* conn = new AcceptedConnectionImpl<Protocol>(acceptor->get_io_service(), context, fUseSSL);
@ -2768,8 +2767,10 @@ static void RPCAcceptHandler(boost::shared_ptr< basic_socket_acceptor<Protocol,
{ {
vnThreadsRunning[THREAD_RPCLISTENER]++; vnThreadsRunning[THREAD_RPCLISTENER]++;
// Immediately start accepting new connections // Immediately start accepting new connections, except when we're canceled or our socket is closed.
RPCListen(acceptor, context, fUseSSL); if (error != error::operation_aborted
&& acceptor->is_open())
RPCListen(acceptor, context, fUseSSL);
AcceptedConnectionImpl<ip::tcp>* tcp_conn = dynamic_cast< AcceptedConnectionImpl<ip::tcp>* >(conn); AcceptedConnectionImpl<ip::tcp>* tcp_conn = dynamic_cast< AcceptedConnectionImpl<ip::tcp>* >(conn);
@ -2833,11 +2834,6 @@ void ThreadRPCServer2(void* parg)
asio::io_service io_service; asio::io_service io_service;
// Make sure that we'll get stopped when the application shuts down
boost::signals2::scoped_connection rpc_listen_thread_stop(
uiInterface.QueueShutdown.connect(boost::bind(
&asio::io_service::stop, &io_service)));
ssl::context context(io_service, ssl::context::sslv23); ssl::context context(io_service, ssl::context::sslv23);
if (fUseSSL) if (fUseSSL)
{ {
@ -2862,21 +2858,24 @@ void ThreadRPCServer2(void* parg)
asio::ip::address bindAddress = loopback ? asio::ip::address_v6::loopback() : asio::ip::address_v6::any(); asio::ip::address bindAddress = loopback ? asio::ip::address_v6::loopback() : asio::ip::address_v6::any();
ip::tcp::endpoint endpoint(bindAddress, GetArg("-rpcport", 8332)); ip::tcp::endpoint endpoint(bindAddress, GetArg("-rpcport", 8332));
std::list< boost::shared_ptr<ip::tcp::acceptor> > acceptors;
try try
{ {
acceptors.push_back(boost::shared_ptr<ip::tcp::acceptor>(new ip::tcp::acceptor(io_service))); boost::shared_ptr<ip::tcp::acceptor> acceptor(new ip::tcp::acceptor(io_service));
acceptors.back()->open(endpoint.protocol()); acceptor->open(endpoint.protocol());
acceptors.back()->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); acceptor->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
// Try making the socket dual IPv6/IPv4 (if listening on the "any" address) // Try making the socket dual IPv6/IPv4 (if listening on the "any" address)
boost::system::error_code v6_only_error; boost::system::error_code v6_only_error;
acceptors.back()->set_option(boost::asio::ip::v6_only(loopback), v6_only_error); acceptor->set_option(boost::asio::ip::v6_only(loopback), v6_only_error);
acceptors.back()->bind(endpoint); acceptor->bind(endpoint);
acceptors.back()->listen(socket_base::max_connections); acceptor->listen(socket_base::max_connections);
RPCListen(acceptors.back(), context, fUseSSL); RPCListen(acceptor, context, fUseSSL);
// Cancel outstanding listen-requests for this acceptor when shutting down
uiInterface.QueueShutdown.connect(signals2::slot<void ()>(
static_cast<void (ip::tcp::acceptor::*)()>(&ip::tcp::acceptor::close), acceptor.get())
.track(acceptor));
// If dual IPv6/IPv4 failed (or we're opening loopback interfaces only), open IPv4 separately // If dual IPv6/IPv4 failed (or we're opening loopback interfaces only), open IPv4 separately
if (loopback || v6_only_error) if (loopback || v6_only_error)
@ -2884,13 +2883,17 @@ void ThreadRPCServer2(void* parg)
bindAddress = loopback ? asio::ip::address_v4::loopback() : asio::ip::address_v4::any(); bindAddress = loopback ? asio::ip::address_v4::loopback() : asio::ip::address_v4::any();
endpoint.address(bindAddress); endpoint.address(bindAddress);
acceptors.push_back(boost::shared_ptr<ip::tcp::acceptor>(new ip::tcp::acceptor(io_service))); acceptor.reset(new ip::tcp::acceptor(io_service));
acceptors.back()->open(endpoint.protocol()); acceptor->open(endpoint.protocol());
acceptors.back()->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); acceptor->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
acceptors.back()->bind(endpoint); acceptor->bind(endpoint);
acceptors.back()->listen(socket_base::max_connections); acceptor->listen(socket_base::max_connections);
RPCListen(acceptors.back(), context, fUseSSL); RPCListen(acceptor, context, fUseSSL);
// Cancel outstanding listen-requests for this acceptor when shutting down
uiInterface.QueueShutdown.connect(signals2::slot<void ()>(
static_cast<void (ip::tcp::acceptor::*)()>(&ip::tcp::acceptor::close), acceptor.get())
.track(acceptor));
} }
} }
catch(boost::system::system_error &e) catch(boost::system::system_error &e)
@ -2904,14 +2907,6 @@ void ThreadRPCServer2(void* parg)
vnThreadsRunning[THREAD_RPCLISTENER]--; vnThreadsRunning[THREAD_RPCLISTENER]--;
io_service.run(); io_service.run();
vnThreadsRunning[THREAD_RPCLISTENER]++; vnThreadsRunning[THREAD_RPCLISTENER]++;
// Terminate all outstanding accept-requests
BOOST_FOREACH(boost::shared_ptr<ip::tcp::acceptor>& acceptor, acceptors)
{
acceptor->cancel();
acceptor->close();
}
acceptors.clear();
} }
void ThreadRPCServer3(void* parg) void ThreadRPCServer3(void* parg)