Merge pull request #1101 from jgarzik/http11

Multithreaded JSON-RPC with HTTP 1.1 Keep-Alive support
This commit is contained in:
Jeff Garzik 2012-05-11 09:57:08 -07:00
commit b34c5f3c0f
4 changed files with 102 additions and 39 deletions

View file

@ -46,6 +46,8 @@ extern Value importprivkey(const Array& params, bool fHelp);
const Object emptyobj; const Object emptyobj;
void ThreadRPCServer3(void* parg);
Object JSONRPCError(int code, const string& message) Object JSONRPCError(int code, const string& message)
{ {
Object error; Object error;
@ -2021,7 +2023,7 @@ Value getwork(const Array& params, bool fHelp)
throw JSONRPCError(-10, "Bitcoin is downloading blocks..."); throw JSONRPCError(-10, "Bitcoin is downloading blocks...");
typedef map<uint256, pair<CBlock*, CScript> > mapNewBlock_t; typedef map<uint256, pair<CBlock*, CScript> > mapNewBlock_t;
static mapNewBlock_t mapNewBlock; static mapNewBlock_t mapNewBlock; // FIXME: thread safety
static vector<CBlock*> vNewBlock; static vector<CBlock*> vNewBlock;
static CReserveKey reservekey(pwalletMain); static CReserveKey reservekey(pwalletMain);
@ -2355,7 +2357,7 @@ string rfc1123Time()
return string(buffer); return string(buffer);
} }
static string HTTPReply(int nStatus, const string& strMsg) static string HTTPReply(int nStatus, const string& strMsg, bool keepalive)
{ {
if (nStatus == 401) if (nStatus == 401)
return strprintf("HTTP/1.0 401 Authorization Required\r\n" return strprintf("HTTP/1.0 401 Authorization Required\r\n"
@ -2384,7 +2386,7 @@ static string HTTPReply(int nStatus, const string& strMsg)
return strprintf( return strprintf(
"HTTP/1.1 %d %s\r\n" "HTTP/1.1 %d %s\r\n"
"Date: %s\r\n" "Date: %s\r\n"
"Connection: close\r\n" "Connection: %s\r\n"
"Content-Length: %d\r\n" "Content-Length: %d\r\n"
"Content-Type: application/json\r\n" "Content-Type: application/json\r\n"
"Server: bitcoin-json-rpc/%s\r\n" "Server: bitcoin-json-rpc/%s\r\n"
@ -2393,12 +2395,13 @@ static string HTTPReply(int nStatus, const string& strMsg)
nStatus, nStatus,
cStatus, cStatus,
rfc1123Time().c_str(), rfc1123Time().c_str(),
keepalive ? "keep-alive" : "close",
strMsg.size(), strMsg.size(),
FormatFullVersion().c_str(), FormatFullVersion().c_str(),
strMsg.c_str()); strMsg.c_str());
} }
int ReadHTTPStatus(std::basic_istream<char>& stream) int ReadHTTPStatus(std::basic_istream<char>& stream, int &proto)
{ {
string str; string str;
getline(stream, str); getline(stream, str);
@ -2406,6 +2409,10 @@ int ReadHTTPStatus(std::basic_istream<char>& stream)
boost::split(vWords, str, boost::is_any_of(" ")); boost::split(vWords, str, boost::is_any_of(" "));
if (vWords.size() < 2) if (vWords.size() < 2)
return 500; return 500;
proto = 0;
const char *ver = strstr(str.c_str(), "HTTP/1.");
if (ver != NULL)
proto = atoi(ver+7);
return atoi(vWords[1].c_str()); return atoi(vWords[1].c_str());
} }
@ -2440,7 +2447,8 @@ int ReadHTTP(std::basic_istream<char>& stream, map<string, string>& mapHeadersRe
strMessageRet = ""; strMessageRet = "";
// Read status // Read status
int nStatus = ReadHTTPStatus(stream); int nProto;
int nStatus = ReadHTTPStatus(stream, nProto);
// Read header // Read header
int nLen = ReadHTTPHeader(stream, mapHeadersRet); int nLen = ReadHTTPHeader(stream, mapHeadersRet);
@ -2455,6 +2463,16 @@ int ReadHTTP(std::basic_istream<char>& stream, map<string, string>& mapHeadersRe
strMessageRet = string(vch.begin(), vch.end()); strMessageRet = string(vch.begin(), vch.end());
} }
string sConHdr = mapHeadersRet["connection"];
if ((sConHdr != "close") && (sConHdr != "keep-alive"))
{
if (nProto >= 1)
mapHeadersRet["connection"] = "keep-alive";
else
mapHeadersRet["connection"] = "close";
}
return nStatus; return nStatus;
} }
@ -2507,7 +2525,7 @@ void ErrorReply(std::ostream& stream, const Object& objError, const Value& id)
if (code == -32600) nStatus = 400; if (code == -32600) nStatus = 400;
else if (code == -32601) nStatus = 404; else if (code == -32601) nStatus = 404;
string strReply = JSONRPCReply(Value::null, objError, id); string strReply = JSONRPCReply(Value::null, objError, id);
stream << HTTPReply(nStatus, strReply) << std::flush; stream << HTTPReply(nStatus, strReply, false) << std::flush;
} }
bool ClientAllowed(const string& strAddress) bool ClientAllowed(const string& strAddress)
@ -2573,20 +2591,34 @@ private:
SSLStream& stream; SSLStream& stream;
}; };
class AcceptedConnection
{
public:
SSLStream sslStream;
SSLIOStreamDevice d;
iostreams::stream<SSLIOStreamDevice> stream;
ip::tcp::endpoint peer;
AcceptedConnection(asio::io_service &io_service, ssl::context &context,
bool fUseSSL) : sslStream(io_service, context), d(sslStream, fUseSSL),
stream(d) { ; }
};
void ThreadRPCServer(void* parg) void ThreadRPCServer(void* parg)
{ {
IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer(parg)); IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer(parg));
try try
{ {
vnThreadsRunning[THREAD_RPCSERVER]++; vnThreadsRunning[THREAD_RPCLISTENER]++;
ThreadRPCServer2(parg); ThreadRPCServer2(parg);
vnThreadsRunning[THREAD_RPCSERVER]--; vnThreadsRunning[THREAD_RPCLISTENER]--;
} }
catch (std::exception& e) { catch (std::exception& e) {
vnThreadsRunning[THREAD_RPCSERVER]--; vnThreadsRunning[THREAD_RPCLISTENER]--;
PrintException(&e, "ThreadRPCServer()"); PrintException(&e, "ThreadRPCServer()");
} catch (...) { } catch (...) {
vnThreadsRunning[THREAD_RPCSERVER]--; vnThreadsRunning[THREAD_RPCLISTENER]--;
PrintException(NULL, "ThreadRPCServer()"); PrintException(NULL, "ThreadRPCServer()");
} }
printf("ThreadRPCServer exiting\n"); printf("ThreadRPCServer exiting\n");
@ -2664,55 +2696,78 @@ void ThreadRPCServer2(void* parg)
loop loop
{ {
// Accept connection // Accept connection
SSLStream sslStream(io_service, context); AcceptedConnection *conn =
SSLIOStreamDevice d(sslStream, fUseSSL); new AcceptedConnection(io_service, context, fUseSSL);
iostreams::stream<SSLIOStreamDevice> stream(d);
vnThreadsRunning[THREAD_RPCLISTENER]--;
acceptor.accept(conn->sslStream.lowest_layer(), conn->peer);
vnThreadsRunning[THREAD_RPCLISTENER]++;
ip::tcp::endpoint peer;
vnThreadsRunning[THREAD_RPCSERVER]--;
acceptor.accept(sslStream.lowest_layer(), peer);
vnThreadsRunning[4]++;
if (fShutdown) if (fShutdown)
{
delete conn;
return; return;
}
// Restrict callers by IP // Restrict callers by IP. It is important to
if (!ClientAllowed(peer.address().to_string())) // do this before starting client thread, to filter out
// certain DoS and misbehaving clients.
if (!ClientAllowed(conn->peer.address().to_string()))
{ {
// Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake. // Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake.
if (!fUseSSL) if (!fUseSSL)
stream << HTTPReply(403, "") << std::flush; conn->stream << HTTPReply(403, "", false) << std::flush;
continue; delete conn;
} }
// start HTTP client thread
else if (!CreateThread(ThreadRPCServer3, conn)) {
printf("Failed to create RPC server client thread\n");
delete conn;
}
}
}
void ThreadRPCServer3(void* parg)
{
IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer3(parg));
vnThreadsRunning[THREAD_RPCHANDLER]++;
AcceptedConnection *conn = (AcceptedConnection *) parg;
bool fRun = true;
loop {
if (fShutdown || !fRun)
{
conn->stream.close();
delete conn;
--vnThreadsRunning[THREAD_RPCHANDLER];
return;
}
map<string, string> mapHeaders; map<string, string> mapHeaders;
string strRequest; string strRequest;
boost::thread api_caller(ReadHTTP, boost::ref(stream), boost::ref(mapHeaders), boost::ref(strRequest)); ReadHTTP(conn->stream, mapHeaders, strRequest);
if (!api_caller.timed_join(boost::posix_time::seconds(GetArg("-rpctimeout", 30))))
{ // Timed out:
acceptor.cancel();
printf("ThreadRPCServer ReadHTTP timeout\n");
continue;
}
// Check authorization // Check authorization
if (mapHeaders.count("authorization") == 0) if (mapHeaders.count("authorization") == 0)
{ {
stream << HTTPReply(401, "") << std::flush; conn->stream << HTTPReply(401, "", false) << std::flush;
continue; break;
} }
if (!HTTPAuthorized(mapHeaders)) if (!HTTPAuthorized(mapHeaders))
{ {
printf("ThreadRPCServer incorrect password attempt from %s\n",peer.address().to_string().c_str()); printf("ThreadRPCServer incorrect password attempt from %s\n", conn->peer.address().to_string().c_str());
/* Deter brute-forcing short passwords. /* Deter brute-forcing short passwords.
If this results in a DOS the user really If this results in a DOS the user really
shouldn't have their RPC port exposed.*/ shouldn't have their RPC port exposed.*/
if (mapArgs["-rpcpassword"].size() < 20) if (mapArgs["-rpcpassword"].size() < 20)
Sleep(250); Sleep(250);
stream << HTTPReply(401, "") << std::flush; conn->stream << HTTPReply(401, "", false) << std::flush;
continue; break;
} }
if (mapHeaders["connection"] == "close")
fRun = false;
Value id = Value::null; Value id = Value::null;
try try
@ -2750,17 +2805,22 @@ void ThreadRPCServer2(void* parg)
// Send reply // Send reply
string strReply = JSONRPCReply(result, Value::null, id); string strReply = JSONRPCReply(result, Value::null, id);
stream << HTTPReply(200, strReply) << std::flush; conn->stream << HTTPReply(200, strReply, fRun) << std::flush;
} }
catch (Object& objError) catch (Object& objError)
{ {
ErrorReply(stream, objError, id); ErrorReply(conn->stream, objError, id);
break;
} }
catch (std::exception& e) catch (std::exception& e)
{ {
ErrorReply(stream, JSONRPCError(-32700, e.what()), id); ErrorReply(conn->stream, JSONRPCError(-32700, e.what()), id);
break;
} }
} }
delete conn;
vnThreadsRunning[THREAD_RPCHANDLER]--;
} }
json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_spirit::Array &params) const json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_spirit::Array &params) const

View file

@ -9,6 +9,7 @@
#include <string> #include <string>
#include <map> #include <map>
#define BOOST_SPIRIT_THREADSAFE
#include "json/json_spirit_reader_template.h" #include "json/json_spirit_reader_template.h"
#include "json/json_spirit_writer_template.h" #include "json/json_spirit_writer_template.h"
#include "json/json_spirit_utils.h" #include "json/json_spirit_utils.h"

View file

@ -1839,12 +1839,13 @@ bool StopNode()
if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n"); if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n");
if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n"); if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n");
if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n"); if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n");
if (vnThreadsRunning[THREAD_RPCSERVER] > 0) printf("ThreadRPCServer still running\n"); if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n");
if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n");
if (fHaveUPnP && vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n"); if (fHaveUPnP && vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n");
if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n"); if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n");
if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n"); if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n");
if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n"); if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n");
while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCSERVER] > 0) while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0)
Sleep(20); Sleep(20);
Sleep(50); Sleep(50);
DumpAddresses(); DumpAddresses();

View file

@ -92,11 +92,12 @@ enum threadId
THREAD_OPENCONNECTIONS, THREAD_OPENCONNECTIONS,
THREAD_MESSAGEHANDLER, THREAD_MESSAGEHANDLER,
THREAD_MINER, THREAD_MINER,
THREAD_RPCSERVER, THREAD_RPCLISTENER,
THREAD_UPNP, THREAD_UPNP,
THREAD_DNSSEED, THREAD_DNSSEED,
THREAD_ADDEDCONNECTIONS, THREAD_ADDEDCONNECTIONS,
THREAD_DUMPADDRESS, THREAD_DUMPADDRESS,
THREAD_RPCHANDLER,
THREAD_MAX THREAD_MAX
}; };