lbrycrd/src/rpc/server.cpp
Wladimir J. van der Laan a88bd3186d
Merge #14670: http: Fix HTTP server shutdown
28479f926f qa: Test bitcond shutdown (João Barbosa)
8d3f46ec39 http: Remove timeout to exit event loop (João Barbosa)
e98a9eede2 http: Remove unnecessary event_base_loopexit call (João Barbosa)
6b13580f4e http: Unlisten sockets after all workers quit (João Barbosa)
18e9685816 http: Send "Connection: close" header if shutdown is requested (João Barbosa)
02e1e4eff6 rpc: Add wait argument to stop (João Barbosa)

Pull request description:

  Fixes #11777. Reverts #11006. Replaces #13501.

  With this change the HTTP server will exit gracefully, meaning that all requests will finish processing and sending the response, even if this means to wait more than 2 seconds (current time allowed to exit the event loop).

  Another small change is that connections are accepted even when the server is stopping, but HTTP requests are rejected. This can be improved later, especially if chunked replies are implemented.

  Briefly, before this PR, this is the order or events when a request arrives (RPC `stop`):
   1. `bufferevent_disable(..., EV_READ)`
   2. `StartShutdown()`
   3. `evhttp_del_accept_socket(...)`
   4. `ThreadHTTP` terminates (event loop exits) because there are no active or pending events thanks to 1. and 3.
   5. client doesn't get the response thanks to 4.

  This can be verified by applying
  ```diff
       // Event loop will exit after current HTTP requests have been handled, so
       // this reply will get back to the client.
       StartShutdown();
  +    MilliSleep(2000);
       return "Bitcoin server stopping";
   }
  ```
  and checking the log output:
  ```
      Received a POST request for / from 127.0.0.1:62443
      ThreadRPCServer method=stop user=__cookie__
      Interrupting HTTP server
  **  Exited http event loop
      Interrupting HTTP RPC server
      Interrupting RPC
      tor: Thread interrupt
      Shutdown: In progress...
      torcontrol thread exit
      Stopping HTTP RPC server
      addcon thread exit
      opencon thread exit
      Unregistering HTTP handler for / (exactmatch 1)
      Unregistering HTTP handler for /wallet/ (exactmatch 0)
      Stopping RPC
      RPC stopped.
      Stopping HTTP server
      Waiting for HTTP worker threads to exit
      msghand thread exit
      net thread exit

      ... sleep 2 seconds ...

      Waiting for HTTP event thread to exit
      Stopped HTTP server
  ```

  For this reason point 3. is moved right after all HTTP workers quit. In that moment HTTP replies are queued in the event loop which keeps spinning util all connections are closed. In order to trigger the server side close with keep alive connections (implicit in HTTP/1.1) the header `Connection: close` is sent if shutdown was requested. This can be tested by
  ```
  bitcoind -regtest
  nc localhost 18443
  POST / HTTP/1.1
  Authorization: Basic ...
  Content-Type: application/json
  Content-Length: 44

  {"jsonrpc": "2.0","method":"stop","id":123}
  ```

  Summing up, this PR:
   - removes explicit event loop exit — event loop exits once there are no active or pending events
   - changes the moment the listening sockets are removed — explained above
   - sends header `Connection: close` on active requests when shutdown was requested which is relevant when it's a persistent connection (default in HTTP 1.1) — libevent is aware of this header and closes the connection gracefully
   - removes event loop explicit break after 2 seconds timeout

Tree-SHA512: 4dac1e86abe388697c1e2dedbf31fb36a394cfafe5e64eadbf6ed01d829542785a8c3b91d1ab680d3f03f912d14fc87176428041141441d25dcb6c98a1e069d8
2018-12-06 17:43:07 +01:00

560 lines
18 KiB
C++

// Copyright (c) 2010 Satoshi Nakamoto
// Copyright (c) 2009-2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <rpc/server.h>
#include <fs.h>
#include <key_io.h>
#include <random.h>
#include <rpc/util.h>
#include <shutdown.h>
#include <sync.h>
#include <ui_interface.h>
#include <util/strencodings.h>
#include <util/system.h>
#include <boost/bind.hpp>
#include <boost/signals2/signal.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include <memory> // for unique_ptr
#include <unordered_map>
static CCriticalSection cs_rpcWarmup;
static bool fRPCRunning = false;
static bool fRPCInWarmup GUARDED_BY(cs_rpcWarmup) = true;
static std::string rpcWarmupStatus GUARDED_BY(cs_rpcWarmup) = "RPC server started";
/* Timer-creating functions */
static RPCTimerInterface* timerInterface = nullptr;
/* Map of name to timer. */
static std::map<std::string, std::unique_ptr<RPCTimerBase> > deadlineTimers;
static struct CRPCSignals
{
boost::signals2::signal<void ()> Started;
boost::signals2::signal<void ()> Stopped;
boost::signals2::signal<void (const CRPCCommand&)> PreCommand;
} g_rpcSignals;
void RPCServer::OnStarted(std::function<void ()> slot)
{
g_rpcSignals.Started.connect(slot);
}
void RPCServer::OnStopped(std::function<void ()> slot)
{
g_rpcSignals.Stopped.connect(slot);
}
void RPCTypeCheck(const UniValue& params,
const std::list<UniValueType>& typesExpected,
bool fAllowNull)
{
unsigned int i = 0;
for (const UniValueType& t : typesExpected) {
if (params.size() <= i)
break;
const UniValue& v = params[i];
if (!(fAllowNull && v.isNull())) {
RPCTypeCheckArgument(v, t);
}
i++;
}
}
void RPCTypeCheckArgument(const UniValue& value, const UniValueType& typeExpected)
{
if (!typeExpected.typeAny && value.type() != typeExpected.type) {
throw JSONRPCError(RPC_TYPE_ERROR, strprintf("Expected type %s, got %s", uvTypeName(typeExpected.type), uvTypeName(value.type())));
}
}
void RPCTypeCheckObj(const UniValue& o,
const std::map<std::string, UniValueType>& typesExpected,
bool fAllowNull,
bool fStrict)
{
for (const auto& t : typesExpected) {
const UniValue& v = find_value(o, t.first);
if (!fAllowNull && v.isNull())
throw JSONRPCError(RPC_TYPE_ERROR, strprintf("Missing %s", t.first));
if (!(t.second.typeAny || v.type() == t.second.type || (fAllowNull && v.isNull()))) {
std::string err = strprintf("Expected type %s for %s, got %s",
uvTypeName(t.second.type), t.first, uvTypeName(v.type()));
throw JSONRPCError(RPC_TYPE_ERROR, err);
}
}
if (fStrict)
{
for (const std::string& k : o.getKeys())
{
if (typesExpected.count(k) == 0)
{
std::string err = strprintf("Unexpected key %s", k);
throw JSONRPCError(RPC_TYPE_ERROR, err);
}
}
}
}
CAmount AmountFromValue(const UniValue& value)
{
if (!value.isNum() && !value.isStr())
throw JSONRPCError(RPC_TYPE_ERROR, "Amount is not a number or string");
CAmount amount;
if (!ParseFixedPoint(value.getValStr(), 8, &amount))
throw JSONRPCError(RPC_TYPE_ERROR, "Invalid amount");
if (!MoneyRange(amount))
throw JSONRPCError(RPC_TYPE_ERROR, "Amount out of range");
return amount;
}
uint256 ParseHashV(const UniValue& v, std::string strName)
{
std::string strHex(v.get_str());
if (64 != strHex.length())
throw JSONRPCError(RPC_INVALID_PARAMETER, strprintf("%s must be of length %d (not %d, for '%s')", strName, 64, strHex.length(), strHex));
if (!IsHex(strHex)) // Note: IsHex("") is false
throw JSONRPCError(RPC_INVALID_PARAMETER, strName+" must be hexadecimal string (not '"+strHex+"')");
return uint256S(strHex);
}
uint256 ParseHashO(const UniValue& o, std::string strKey)
{
return ParseHashV(find_value(o, strKey), strKey);
}
std::vector<unsigned char> ParseHexV(const UniValue& v, std::string strName)
{
std::string strHex;
if (v.isStr())
strHex = v.get_str();
if (!IsHex(strHex))
throw JSONRPCError(RPC_INVALID_PARAMETER, strName+" must be hexadecimal string (not '"+strHex+"')");
return ParseHex(strHex);
}
std::vector<unsigned char> ParseHexO(const UniValue& o, std::string strKey)
{
return ParseHexV(find_value(o, strKey), strKey);
}
std::string CRPCTable::help(const std::string& strCommand, const JSONRPCRequest& helpreq) const
{
std::string strRet;
std::string category;
std::set<rpcfn_type> setDone;
std::vector<std::pair<std::string, const CRPCCommand*> > vCommands;
for (const auto& entry : mapCommands)
vCommands.push_back(make_pair(entry.second->category + entry.first, entry.second));
sort(vCommands.begin(), vCommands.end());
JSONRPCRequest jreq(helpreq);
jreq.fHelp = true;
jreq.params = UniValue();
for (const std::pair<std::string, const CRPCCommand*>& command : vCommands)
{
const CRPCCommand *pcmd = command.second;
std::string strMethod = pcmd->name;
if ((strCommand != "" || pcmd->category == "hidden") && strMethod != strCommand)
continue;
jreq.strMethod = strMethod;
try
{
rpcfn_type pfn = pcmd->actor;
if (setDone.insert(pfn).second)
(*pfn)(jreq);
}
catch (const std::exception& e)
{
// Help text is returned in an exception
std::string strHelp = std::string(e.what());
if (strCommand == "")
{
if (strHelp.find('\n') != std::string::npos)
strHelp = strHelp.substr(0, strHelp.find('\n'));
if (category != pcmd->category)
{
if (!category.empty())
strRet += "\n";
category = pcmd->category;
strRet += "== " + Capitalize(category) + " ==\n";
}
}
strRet += strHelp + "\n";
}
}
if (strRet == "")
strRet = strprintf("help: unknown command: %s\n", strCommand);
strRet = strRet.substr(0,strRet.size()-1);
return strRet;
}
UniValue help(const JSONRPCRequest& jsonRequest)
{
if (jsonRequest.fHelp || jsonRequest.params.size() > 1)
throw std::runtime_error(
RPCHelpMan{"help",
"\nList all commands, or get help for a specified command.\n",
{
{"command", RPCArg::Type::STR, /* opt */ true, /* default_val */ "", "The command to get help on"},
}}
.ToString() +
"\nResult:\n"
"\"text\" (string) The help text\n"
);
std::string strCommand;
if (jsonRequest.params.size() > 0)
strCommand = jsonRequest.params[0].get_str();
return tableRPC.help(strCommand, jsonRequest);
}
UniValue stop(const JSONRPCRequest& jsonRequest)
{
// Accept the deprecated and ignored 'detach' boolean argument
// Also accept the hidden 'wait' integer argument (milliseconds)
// For instance, 'stop 1000' makes the call wait 1 second before returning
// to the client (intended for testing)
if (jsonRequest.fHelp || jsonRequest.params.size() > 1)
throw std::runtime_error(
RPCHelpMan{"stop",
"\nStop Bitcoin server.", {}}
.ToString());
// Event loop will exit after current HTTP requests have been handled, so
// this reply will get back to the client.
StartShutdown();
if (jsonRequest.params[0].isNum()) {
MilliSleep(jsonRequest.params[0].get_int());
}
return "Bitcoin server stopping";
}
static UniValue uptime(const JSONRPCRequest& jsonRequest)
{
if (jsonRequest.fHelp || jsonRequest.params.size() > 0)
throw std::runtime_error(
RPCHelpMan{"uptime",
"\nReturns the total uptime of the server.\n", {}}
.ToString() +
"\nResult:\n"
"ttt (numeric) The number of seconds that the server has been running\n"
"\nExamples:\n"
+ HelpExampleCli("uptime", "")
+ HelpExampleRpc("uptime", "")
);
return GetTime() - GetStartupTime();
}
// clang-format off
static const CRPCCommand vRPCCommands[] =
{ // category name actor (function) argNames
// --------------------- ------------------------ ----------------------- ----------
/* Overall control/query calls */
{ "control", "help", &help, {"command"} },
{ "control", "stop", &stop, {"wait"} },
{ "control", "uptime", &uptime, {} },
};
// clang-format on
CRPCTable::CRPCTable()
{
unsigned int vcidx;
for (vcidx = 0; vcidx < (sizeof(vRPCCommands) / sizeof(vRPCCommands[0])); vcidx++)
{
const CRPCCommand *pcmd;
pcmd = &vRPCCommands[vcidx];
mapCommands[pcmd->name] = pcmd;
}
}
const CRPCCommand *CRPCTable::operator[](const std::string &name) const
{
std::map<std::string, const CRPCCommand*>::const_iterator it = mapCommands.find(name);
if (it == mapCommands.end())
return nullptr;
return (*it).second;
}
bool CRPCTable::appendCommand(const std::string& name, const CRPCCommand* pcmd)
{
if (IsRPCRunning())
return false;
// don't allow overwriting for now
std::map<std::string, const CRPCCommand*>::const_iterator it = mapCommands.find(name);
if (it != mapCommands.end())
return false;
mapCommands[name] = pcmd;
return true;
}
void StartRPC()
{
LogPrint(BCLog::RPC, "Starting RPC\n");
fRPCRunning = true;
g_rpcSignals.Started();
}
void InterruptRPC()
{
LogPrint(BCLog::RPC, "Interrupting RPC\n");
// Interrupt e.g. running longpolls
fRPCRunning = false;
}
void StopRPC()
{
LogPrint(BCLog::RPC, "Stopping RPC\n");
deadlineTimers.clear();
DeleteAuthCookie();
g_rpcSignals.Stopped();
}
bool IsRPCRunning()
{
return fRPCRunning;
}
void SetRPCWarmupStatus(const std::string& newStatus)
{
LOCK(cs_rpcWarmup);
rpcWarmupStatus = newStatus;
}
void SetRPCWarmupFinished()
{
LOCK(cs_rpcWarmup);
assert(fRPCInWarmup);
fRPCInWarmup = false;
}
bool RPCIsInWarmup(std::string *outStatus)
{
LOCK(cs_rpcWarmup);
if (outStatus)
*outStatus = rpcWarmupStatus;
return fRPCInWarmup;
}
void JSONRPCRequest::parse(const UniValue& valRequest)
{
// Parse request
if (!valRequest.isObject())
throw JSONRPCError(RPC_INVALID_REQUEST, "Invalid Request object");
const UniValue& request = valRequest.get_obj();
// Parse id now so errors from here on will have the id
id = find_value(request, "id");
// Parse method
UniValue valMethod = find_value(request, "method");
if (valMethod.isNull())
throw JSONRPCError(RPC_INVALID_REQUEST, "Missing method");
if (!valMethod.isStr())
throw JSONRPCError(RPC_INVALID_REQUEST, "Method must be a string");
strMethod = valMethod.get_str();
if (fLogIPs)
LogPrint(BCLog::RPC, "ThreadRPCServer method=%s user=%s peeraddr=%s\n", SanitizeString(strMethod),
this->authUser, this->peerAddr);
else
LogPrint(BCLog::RPC, "ThreadRPCServer method=%s user=%s\n", SanitizeString(strMethod), this->authUser);
// Parse params
UniValue valParams = find_value(request, "params");
if (valParams.isArray() || valParams.isObject())
params = valParams;
else if (valParams.isNull())
params = UniValue(UniValue::VARR);
else
throw JSONRPCError(RPC_INVALID_REQUEST, "Params must be an array or object");
}
bool IsDeprecatedRPCEnabled(const std::string& method)
{
const std::vector<std::string> enabled_methods = gArgs.GetArgs("-deprecatedrpc");
return find(enabled_methods.begin(), enabled_methods.end(), method) != enabled_methods.end();
}
static UniValue JSONRPCExecOne(JSONRPCRequest jreq, const UniValue& req)
{
UniValue rpc_result(UniValue::VOBJ);
try {
jreq.parse(req);
UniValue result = tableRPC.execute(jreq);
rpc_result = JSONRPCReplyObj(result, NullUniValue, jreq.id);
}
catch (const UniValue& objError)
{
rpc_result = JSONRPCReplyObj(NullUniValue, objError, jreq.id);
}
catch (const std::exception& e)
{
rpc_result = JSONRPCReplyObj(NullUniValue,
JSONRPCError(RPC_PARSE_ERROR, e.what()), jreq.id);
}
return rpc_result;
}
std::string JSONRPCExecBatch(const JSONRPCRequest& jreq, const UniValue& vReq)
{
UniValue ret(UniValue::VARR);
for (unsigned int reqIdx = 0; reqIdx < vReq.size(); reqIdx++)
ret.push_back(JSONRPCExecOne(jreq, vReq[reqIdx]));
return ret.write() + "\n";
}
/**
* Process named arguments into a vector of positional arguments, based on the
* passed-in specification for the RPC call's arguments.
*/
static inline JSONRPCRequest transformNamedArguments(const JSONRPCRequest& in, const std::vector<std::string>& argNames)
{
JSONRPCRequest out = in;
out.params = UniValue(UniValue::VARR);
// Build a map of parameters, and remove ones that have been processed, so that we can throw a focused error if
// there is an unknown one.
const std::vector<std::string>& keys = in.params.getKeys();
const std::vector<UniValue>& values = in.params.getValues();
std::unordered_map<std::string, const UniValue*> argsIn;
for (size_t i=0; i<keys.size(); ++i) {
argsIn[keys[i]] = &values[i];
}
// Process expected parameters.
int hole = 0;
for (const std::string &argNamePattern: argNames) {
std::vector<std::string> vargNames;
boost::algorithm::split(vargNames, argNamePattern, boost::algorithm::is_any_of("|"));
auto fr = argsIn.end();
for (const std::string & argName : vargNames) {
fr = argsIn.find(argName);
if (fr != argsIn.end()) {
break;
}
}
if (fr != argsIn.end()) {
for (int i = 0; i < hole; ++i) {
// Fill hole between specified parameters with JSON nulls,
// but not at the end (for backwards compatibility with calls
// that act based on number of specified parameters).
out.params.push_back(UniValue());
}
hole = 0;
out.params.push_back(*fr->second);
argsIn.erase(fr);
} else {
hole += 1;
}
}
// If there are still arguments in the argsIn map, this is an error.
if (!argsIn.empty()) {
throw JSONRPCError(RPC_INVALID_PARAMETER, "Unknown named parameter " + argsIn.begin()->first);
}
// Return request with named arguments transformed to positional arguments
return out;
}
UniValue CRPCTable::execute(const JSONRPCRequest &request) const
{
// Return immediately if in warmup
{
LOCK(cs_rpcWarmup);
if (fRPCInWarmup)
throw JSONRPCError(RPC_IN_WARMUP, rpcWarmupStatus);
}
// Find method
const CRPCCommand *pcmd = tableRPC[request.strMethod];
if (!pcmd)
throw JSONRPCError(RPC_METHOD_NOT_FOUND, "Method not found");
g_rpcSignals.PreCommand(*pcmd);
try
{
// Execute, convert arguments to array if necessary
if (request.params.isObject()) {
return pcmd->actor(transformNamedArguments(request, pcmd->argNames));
} else {
return pcmd->actor(request);
}
}
catch (const std::exception& e)
{
throw JSONRPCError(RPC_MISC_ERROR, e.what());
}
}
std::vector<std::string> CRPCTable::listCommands() const
{
std::vector<std::string> commandList;
typedef std::map<std::string, const CRPCCommand*> commandMap;
std::transform( mapCommands.begin(), mapCommands.end(),
std::back_inserter(commandList),
boost::bind(&commandMap::value_type::first,_1) );
return commandList;
}
std::string HelpExampleCli(const std::string& methodname, const std::string& args)
{
return "> bitcoin-cli " + methodname + " " + args + "\n";
}
std::string HelpExampleRpc(const std::string& methodname, const std::string& args)
{
return "> curl --user myusername --data-binary '{\"jsonrpc\": \"1.0\", \"id\":\"curltest\", "
"\"method\": \"" + methodname + "\", \"params\": [" + args + "] }' -H 'content-type: text/plain;' http://127.0.0.1:8332/\n";
}
void RPCSetTimerInterfaceIfUnset(RPCTimerInterface *iface)
{
if (!timerInterface)
timerInterface = iface;
}
void RPCSetTimerInterface(RPCTimerInterface *iface)
{
timerInterface = iface;
}
void RPCUnsetTimerInterface(RPCTimerInterface *iface)
{
if (timerInterface == iface)
timerInterface = nullptr;
}
void RPCRunLater(const std::string& name, std::function<void()> func, int64_t nSeconds)
{
if (!timerInterface)
throw JSONRPCError(RPC_INTERNAL_ERROR, "No timer handler registered for RPC");
deadlineTimers.erase(name);
LogPrint(BCLog::RPC, "queue run of timer %s in %i seconds (using %s)\n", name, nSeconds, timerInterface->Name());
deadlineTimers.emplace(name, std::unique_ptr<RPCTimerBase>(timerInterface->NewTimer(func, nSeconds*1000)));
}
int RPCSerializationFlags()
{
int flag = 0;
if (gArgs.GetArg("-rpcserialversion", DEFAULT_RPC_SERIALIZE_VERSION) == 0)
flag |= SERIALIZE_TRANSACTION_NO_WITNESS;
return flag;
}
CRPCTable tableRPC;