Expose a close ntfn channel to all RPC handlers.

This commit modifies the RPC server such that all handlers now receive a
channel which will be notified when a client disconnects.  This
notification can then be used to stop long-running operations early when a
client disconnects.

This capability was already present for websocket clients, but this commit
exposes it to standard HTTP clients as well.
This commit is contained in:
Dave Collins 2014-06-27 11:13:04 -05:00
parent 84af0d500f
commit d40cff64b0
2 changed files with 152 additions and 45 deletions

View file

@ -23,6 +23,7 @@ import (
"github.com/conformal/btcws"
"github.com/conformal/fastsha256"
"github.com/conformal/websocket"
"io"
"io/ioutil"
"math/big"
"math/rand"
@ -70,7 +71,7 @@ var (
ErrBadParamsField = errors.New("bad params field")
)
type commandHandler func(*rpcServer, btcjson.Cmd) (interface{}, error)
type commandHandler func(*rpcServer, btcjson.Cmd, <-chan struct{}) (interface{}, error)
// handlers maps RPC command strings to appropriate handler functions.
// this is copied by init because help references rpcHandlers and thus causes
@ -210,6 +211,8 @@ type rpcServer struct {
ntfnMgr *wsNotificationManager
numClients int
numClientsMutex sync.Mutex
statusLines map[int]string
statusLock sync.RWMutex
wg sync.WaitGroup
listeners []net.Listener
workState *workState
@ -285,6 +288,65 @@ func (s *rpcServer) Start() {
s.ntfnMgr.Start()
}
// httpStatusLine returns a response Status-Line (RFC 2616 Section 6.1)
// for the given request and response status code. This function was lifted and
// adapted from the standard library HTTP server code since it's not exported.
func (s *rpcServer) httpStatusLine(req *http.Request, code int) string {
// Fast path:
key := code
proto11 := req.ProtoAtLeast(1, 1)
if !proto11 {
key = -key
}
s.statusLock.RLock()
line, ok := s.statusLines[key]
s.statusLock.RUnlock()
if ok {
return line
}
// Slow path:
proto := "HTTP/1.0"
if proto11 {
proto = "HTTP/1.1"
}
codeStr := strconv.Itoa(code)
text := http.StatusText(code)
if text != "" {
line = proto + " " + codeStr + " " + text + "\r\n"
s.statusLock.Lock()
s.statusLines[key] = line
s.statusLock.Unlock()
} else {
text = "status code " + codeStr
line = proto + " " + codeStr + " " + text + "\r\n"
}
return line
}
// writeHTTPResponseHeaders writes the necessary response headers prior to
// writing an HTTP body given a request to use for protocol negotiation, headers
// to write, a status code, and a writer.
func (s *rpcServer) writeHTTPResponseHeaders(req *http.Request, headers http.Header, code int, w io.Writer) error {
_, err := io.WriteString(w, s.httpStatusLine(req, code))
if err != nil {
return err
}
err = headers.Write(w)
if err != nil {
return err
}
_, err = io.WriteString(w, "\r\n")
if err != nil {
return err
}
return nil
}
// limitConnections responds with a 503 service unavailable and returns true if
// adding another client would exceed the maximum allow RPC clients.
//
@ -408,6 +470,7 @@ func newRPCServer(listenAddrs []string, s *server) (*rpcServer, error) {
rpc := rpcServer{
authsha: fastsha256.Sum256([]byte(auth)),
server: s,
statusLines: make(map[int]string),
workState: newWorkState(),
quit: make(chan int),
}
@ -484,6 +547,33 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) {
return
}
// Unfortunately, the http server doesn't provide the ability to
// change the read deadline for the new connection and having one breaks
// long polling. However, not having a read deadline on the initial
// connection would mean clients can connect and idle forever. Thus,
// hijack the connecton from the HTTP server, clear the read deadline,
// and handle writing the response manually.
hj, ok := w.(http.Hijacker)
if !ok {
errMsg := "webserver doesn't support hijacking"
rpcsLog.Warnf(errMsg)
errCode := http.StatusInternalServerError
http.Error(w, strconv.FormatInt(int64(errCode), 10)+" "+errMsg,
errCode)
return
}
conn, buf, err := hj.Hijack()
if err != nil {
rpcsLog.Warnf("Failed to hijack HTTP connection: %v", err)
errCode := http.StatusInternalServerError
http.Error(w, strconv.FormatInt(int64(errCode), 10)+" "+
err.Error(), errCode)
return
}
defer conn.Close()
defer buf.Flush()
conn.SetReadDeadline(timeZeroVal)
var reply btcjson.Reply
cmd, jsonErr := parseCmd(body)
if cmd != nil {
@ -495,14 +585,30 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) {
if jsonErr != nil {
reply.Error = jsonErr
} else {
reply = standardCmdReply(cmd, s)
// Setup a close notifier. Since the connection is hijacked,
// the CloseNotifer on the ResponseWriter is not available.
closeChan := make(chan struct{}, 1)
go func() {
_, err := conn.Read(make([]byte, 1))
if err != nil {
close(closeChan)
}
}()
reply = standardCmdReply(cmd, s, closeChan)
}
rpcsLog.Tracef("reply: %v", reply)
msg, err := btcjson.MarshallAndSend(reply, w)
err = s.writeHTTPResponseHeaders(r, w.Header(), http.StatusOK, buf)
if err != nil {
rpcsLog.Errorf(msg)
rpcsLog.Error(err)
return
}
msg, err := btcjson.MarshallAndSend(reply, buf)
if err != nil {
rpcsLog.Error(err)
return
}
rpcsLog.Tracef(msg)
@ -510,19 +616,19 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) {
// handleUnimplemented is a temporary handler for commands that we should
// support but do not.
func handleUnimplemented(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleUnimplemented(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
return nil, btcjson.ErrUnimplemented
}
// handleAskWallet is the handler for commands that we do recognise as valid
// but that we can not answer correctly since it involves wallet state.
// These commands will be implemented in btcwallet.
func handleAskWallet(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleAskWallet(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
return nil, btcjson.ErrNoWallet
}
// handleAddNode handles addnode commands.
func handleAddNode(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleAddNode(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.AddNodeCmd)
addr := normalizeAddress(c.Addr, activeNetParams.DefaultPort)
@ -564,7 +670,7 @@ func messageToHex(msg btcwire.Message) (string, error) {
}
// handleCreateRawTransaction handles createrawtransaction commands.
func handleCreateRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleCreateRawTransaction(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.CreateRawTransactionCmd)
// Add all transaction inputs to a new transaction after performing
@ -650,7 +756,7 @@ func handleCreateRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, err
}
// handleDebugLevel handles debuglevel commands.
func handleDebugLevel(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleDebugLevel(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.DebugLevelCmd)
// Special show command to list supported subsystems.
@ -781,7 +887,7 @@ func createTxRawResult(net *btcnet.Params, txSha string, mtx *btcwire.MsgTx,
}
// handleDecodeRawTransaction handles decoderawtransaction commands.
func handleDecodeRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleDecodeRawTransaction(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.DecodeRawTransactionCmd)
// Deserialize the transaction.
@ -828,7 +934,7 @@ func handleDecodeRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, err
}
// handleDecodeScript handles decodescript commands.
func handleDecodeScript(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleDecodeScript(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.DecodeScriptCmd)
// Convert the hex script to bytes.
@ -876,7 +982,7 @@ func handleDecodeScript(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
}
// handleGetAddedNodeInfo handles getaddednodeinfo commands.
func handleGetAddedNodeInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetAddedNodeInfo(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.GetAddedNodeInfoCmd)
// Retrieve a list of persistent (added) peers from the bitcoin server
@ -959,7 +1065,7 @@ func handleGetAddedNodeInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error)
}
// handleGetBestBlock implements the getbestblock command.
func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
// All other "get block" commands give either the height, the
// hash, or both but require the block SHA. This gets both for
// the best block.
@ -976,7 +1082,7 @@ func handleGetBestBlock(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
}
// handleGetBestBlockHash implements the getbestblockhash command.
func handleGetBestBlockHash(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetBestBlockHash(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
sha, _, err := s.server.db.NewestSha()
if err != nil {
rpcsLog.Errorf("Error getting newest sha: %v", err)
@ -987,7 +1093,7 @@ func handleGetBestBlockHash(s *rpcServer, cmd btcjson.Cmd) (interface{}, error)
}
// handleGetBlock implements the getblock command.
func handleGetBlock(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetBlock(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.GetBlockCmd)
sha, err := btcwire.NewShaHashFromStr(c.Hash)
if err != nil {
@ -1083,7 +1189,7 @@ func handleGetBlock(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
}
// handleGetBlockCount implements the getblockcount command.
func handleGetBlockCount(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetBlockCount(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
_, maxidx, err := s.server.db.NewestSha()
if err != nil {
rpcsLog.Errorf("Error getting newest sha: %v", err)
@ -1094,7 +1200,7 @@ func handleGetBlockCount(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
}
// handleGetBlockHash implements the getblockhash command.
func handleGetBlockHash(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetBlockHash(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.GetBlockHashCmd)
sha, err := s.server.db.FetchBlockShaByHeight(c.Index)
if err != nil {
@ -1106,17 +1212,17 @@ func handleGetBlockHash(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
}
// handleGetConnectionCount implements the getconnectioncount command.
func handleGetConnectionCount(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetConnectionCount(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
return s.server.ConnectedCount(), nil
}
// handleGetCurrentNet implements the getcurrentnet command.
func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetCurrentNet(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
return s.server.netParams.Net, nil
}
// handleGetDifficulty implements the getdifficulty command.
func handleGetDifficulty(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetDifficulty(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
sha, _, err := s.server.db.NewestSha()
if err != nil {
rpcsLog.Errorf("Error getting sha: %v", err)
@ -1131,18 +1237,18 @@ func handleGetDifficulty(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
}
// handleGetGenerate implements the getgenerate command.
func handleGetGenerate(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetGenerate(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
return s.server.cpuMiner.IsMining(), nil
}
// handleGetHashesPerSec implements the gethashespersec command.
func handleGetHashesPerSec(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetHashesPerSec(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
return int64(s.server.cpuMiner.HashesPerSecond()), nil
}
// handleGetInfo implements the getinfo command. We only return the fields
// that are not related to wallet functionality.
func handleGetInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetInfo(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
// We require the current block height and sha.
sha, height, err := s.server.db.NewestSha()
if err != nil {
@ -1172,7 +1278,7 @@ func handleGetInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
// handleGetMiningInfo implements the getmininginfo command. We only return the
// fields that are not related to wallet functionality.
func handleGetMiningInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetMiningInfo(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
sha, height, err := s.server.db.NewestSha()
if err != nil {
rpcsLog.Errorf("Error getting sha: %v", err)
@ -1201,7 +1307,8 @@ func handleGetMiningInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
Message: err.Error(),
}
}
networkHashesPerSecIface, err := handleGetNetworkHashPS(s, gnhpsCmd)
networkHashesPerSecIface, err := handleGetNetworkHashPS(s, gnhpsCmd,
closeChan)
if err != nil {
// This is already a btcjson.Error from the handler.
return nil, err
@ -1230,7 +1337,7 @@ func handleGetMiningInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
}
// handleGetNetTotals implements the getnettotals command.
func handleGetNetTotals(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetNetTotals(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
totalBytesRecv, totalBytesSent := s.server.NetTotals()
reply := &btcjson.GetNetTotalsResult{
TotalBytesRecv: totalBytesRecv,
@ -1241,7 +1348,7 @@ func handleGetNetTotals(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
}
// handleGetNetworkHashPS implements the getnetworkhashps command.
func handleGetNetworkHashPS(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetNetworkHashPS(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.GetNetworkHashPSCmd)
_, newestHeight, err := s.server.db.NewestSha()
@ -1329,12 +1436,12 @@ func handleGetNetworkHashPS(s *rpcServer, cmd btcjson.Cmd) (interface{}, error)
}
// handleGetPeerInfo implements the getpeerinfo command.
func handleGetPeerInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetPeerInfo(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
return s.server.PeerInfo(), nil
}
// handleGetRawMempool implements the getrawmempool command.
func handleGetRawMempool(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetRawMempool(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.GetRawMempoolCmd)
descs := s.server.txMemPool.TxDescs()
@ -1376,7 +1483,7 @@ func handleGetRawMempool(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
}
// handleGetRawTransaction implements the getrawtransaction command.
func handleGetRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetRawTransaction(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.GetRawTransactionCmd)
// Convert the provided transaction hash hex to a ShaHash.
@ -1773,7 +1880,7 @@ func handleGetWorkSubmission(s *rpcServer, hexData string) (interface{}, error)
}
// handleGetWork implements the getwork command.
func handleGetWork(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleGetWork(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.GetWorkCmd)
// Respond with an error if there are no addresses to pay the created
@ -1836,7 +1943,7 @@ func getHelpText(cmdName string) (string, error) {
}
// handleHelp implements the help command.
func handleHelp(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleHelp(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
help := cmd.(*btcjson.HelpCmd)
// if no args we give a list of all known commands
@ -1867,7 +1974,7 @@ func handleHelp(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
}
// handlePing implements the ping command.
func handlePing(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handlePing(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
// Ask server to ping \o_
nonce, err := btcwire.RandomUint64()
if err != nil {
@ -1880,7 +1987,7 @@ func handlePing(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
}
// handleSendRawTransaction implements the sendrawtransaction command.
func handleSendRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleSendRawTransaction(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.SendRawTransactionCmd)
// Deserialize and send off to tx relay
serializedTx, err := hex.DecodeString(c.HexTx)
@ -1929,7 +2036,7 @@ func handleSendRawTransaction(s *rpcServer, cmd btcjson.Cmd) (interface{}, error
}
// handleSetGenerate implements the setgenerate command.
func handleSetGenerate(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleSetGenerate(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.SetGenerateCmd)
// Disable generation regardless of the provided generate flag if the
@ -1961,13 +2068,13 @@ func handleSetGenerate(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
}
// handleStop implements the stop command.
func handleStop(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleStop(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
s.server.Stop()
return "btcd stopping.", nil
}
// handleSubmitBlock implements the submitblock command.
func handleSubmitBlock(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleSubmitBlock(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.SubmitBlockCmd)
// Deserialize and send off to block processor.
serializedBlock, err := hex.DecodeString(c.HexBlock)
@ -2044,7 +2151,7 @@ func verifyChain(db btcdb.Db, level, depth int32) error {
return nil
}
func handleVerifyChain(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
func handleVerifyChain(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}) (interface{}, error) {
c := cmd.(*btcjson.VerifyChainCmd)
err := verifyChain(s.server.db, c.CheckLevel, c.CheckDepth)
@ -2072,7 +2179,7 @@ func parseCmd(b []byte) (btcjson.Cmd, *btcjson.Error) {
// standardCmdReply checks that a parsed command is a standard
// Bitcoin JSON-RPC command and runs the proper handler to reply to the
// command.
func standardCmdReply(cmd btcjson.Cmd, s *rpcServer) (reply btcjson.Reply) {
func standardCmdReply(cmd btcjson.Cmd, s *rpcServer, closeChan <-chan struct{}) (reply btcjson.Reply) {
id := cmd.Id()
reply.Id = &id
@ -2094,7 +2201,7 @@ func standardCmdReply(cmd btcjson.Cmd, s *rpcServer) (reply btcjson.Reply) {
return reply
handled:
result, err := handler(s, cmd)
result, err := handler(s, cmd, closeChan)
if err != nil {
jsonErr, ok := err.(btcjson.Error)
if !ok {

View file

@ -1015,7 +1015,7 @@ func (c *wsClient) handleMessage(msg []byte) {
if !ok {
// No websocket-specific handler so handle like a legacy
// RPC connection.
response := standardCmdReply(cmd, c.server)
response := standardCmdReply(cmd, c.server, nil)
reply, err := json.Marshal(response)
if err != nil {
rpcsLog.Errorf("Failed to marshal reply for <%s> "+