From 4943ed11b3c5008f79231a14a5e82c32b09f7667 Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 25 Jan 2017 11:49:35 -0700 Subject: [PATCH] rpcserver: implement `loadtxfilter` backported from dcrd --- rpcserver.go | 5 +- rpcserverhelp.go | 7 + rpcwebsocket.go | 442 +++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 441 insertions(+), 13 deletions(-) diff --git a/rpcserver.go b/rpcserver.go index 87666983..9b4da54f 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -44,9 +44,9 @@ import ( // API version constants const ( - jsonrpcSemverString = "1.1.0" + jsonrpcSemverString = "1.2.0" jsonrpcSemverMajor = 1 - jsonrpcSemverMinor = 1 + jsonrpcSemverMinor = 2 jsonrpcSemverPatch = 0 ) @@ -232,6 +232,7 @@ var rpcUnimplemented = map[string]struct{}{ // Commands that are available to a limited user var rpcLimited = map[string]struct{}{ // Websockets commands + "loadtxfilter": {}, "notifyblocks": {}, "notifynewtransactions": {}, "notifyreceived": {}, diff --git a/rpcserverhelp.go b/rpcserverhelp.go index 3e9e4151..7e92e32c 100644 --- a/rpcserverhelp.go +++ b/rpcserverhelp.go @@ -590,6 +590,12 @@ var helpDescsEnUS = map[string]string{ "stopnotifyspent--synopsis": "Cancel registered spending notifications for each passed outpoint.", "stopnotifyspent-outpoints": "List of transaction outpoints to stop monitoring.", + // LoadTxFilterCmd help. + "loadtxfilter--synopsis": "Load, add to, or reload a websocket client's transaction filter for mempool transactions, new blocks and rescanblocks.", + "loadtxfilter-reload": "Load a new filter instead of adding data to an existing one", + "loadtxfilter-addresses": "Array of addresses to add to the transaction filter", + "loadtxfilter-outpoints": "Array of outpoints to add to the transaction filter", + // Rescan help. "rescan--synopsis": "Rescan block chain for transactions to addresses.\n" + "When the endblock parameter is omitted, the rescan continues through the best block in the main chain.\n" + @@ -663,6 +669,7 @@ var rpcResultTypes = map[string][]interface{}{ "version": {(*map[string]btcjson.VersionResult)(nil)}, // Websocket commands. + "loadtxfilter": nil, "session": {(*btcjson.SessionResult)(nil)}, "notifyblocks": nil, "stopnotifyblocks": nil, diff --git a/rpcwebsocket.go b/rpcwebsocket.go index 00d74653..b56a1e14 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -1,4 +1,5 @@ // Copyright (c) 2013-2017 The btcsuite developers +// Copyright (c) 2015-2017 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -61,6 +62,7 @@ type wsCommandHandler func(*wsClient, interface{}) (interface{}, error) // causes a dependency loop. var wsHandlers map[string]wsCommandHandler var wsHandlersBeforeInit = map[string]wsCommandHandler{ + "loadtxfilter": handleLoadTxFilter, "help": handleWebsocketHelp, "notifyblocks": handleNotifyBlocks, "notifynewtransactions": handleNotifyNewTransactions, @@ -242,6 +244,205 @@ func (m *wsNotificationManager) NotifyMempoolTx(tx *btcutil.Tx, isNew bool) { } } +// wsClientFilter tracks relevant addresses for each websocket client for +// the future `rescanblocks` extension. It is modified by the `loadtxfilter` +// command. +// +// NOTE: This extension was ported from github.com/decred/dcrd +type wsClientFilter struct { + mu sync.Mutex + + // Implemented fast paths for address lookup. + pubKeyHashes map[[ripemd160.Size]byte]struct{} + scriptHashes map[[ripemd160.Size]byte]struct{} + compressedPubKeys map[[33]byte]struct{} + uncompressedPubKeys map[[65]byte]struct{} + + // A fallback address lookup map in case a fast path doesn't exist. + // Only exists for completeness. If using this shows up in a profile, + // there's a good chance a fast path should be added. + otherAddresses map[string]struct{} + + // Outpoints of unspent outputs. + unspent map[wire.OutPoint]struct{} +} + +// newWSClientFilter creates a new, empty wsClientFilter struct to be used +// for a websocket client. +// +// NOTE: This extension was ported from github.com/decred/dcrd +func newWSClientFilter(addresses []string, unspentOutPoints []wire.OutPoint) *wsClientFilter { + filter := &wsClientFilter{ + pubKeyHashes: map[[ripemd160.Size]byte]struct{}{}, + scriptHashes: map[[ripemd160.Size]byte]struct{}{}, + compressedPubKeys: map[[33]byte]struct{}{}, + uncompressedPubKeys: map[[65]byte]struct{}{}, + otherAddresses: map[string]struct{}{}, + unspent: make(map[wire.OutPoint]struct{}, len(unspentOutPoints)), + } + + for _, s := range addresses { + filter.addAddressStr(s) + } + for i := range unspentOutPoints { + filter.addUnspentOutPoint(&unspentOutPoints[i]) + } + + return filter +} + +// addAddress adds an address to a wsClientFilter, treating it correctly based +// on the type of address passed as an argument. +// +// NOTE: This extension was ported from github.com/decred/dcrd +func (f *wsClientFilter) addAddress(a btcutil.Address) { + switch a := a.(type) { + case *btcutil.AddressPubKeyHash: + f.pubKeyHashes[*a.Hash160()] = struct{}{} + return + case *btcutil.AddressScriptHash: + f.scriptHashes[*a.Hash160()] = struct{}{} + return + case *btcutil.AddressPubKey: + serializedPubKey := a.ScriptAddress() + switch len(serializedPubKey) { + case 33: // compressed + var compressedPubKey [33]byte + copy(compressedPubKey[:], serializedPubKey) + f.compressedPubKeys[compressedPubKey] = struct{}{} + return + case 65: // uncompressed + var uncompressedPubKey [65]byte + copy(uncompressedPubKey[:], serializedPubKey) + f.uncompressedPubKeys[uncompressedPubKey] = struct{}{} + return + } + } + + f.otherAddresses[a.EncodeAddress()] = struct{}{} +} + +// addAddressStr parses an address from a string and then adds it to the +// wsClientFilter using addAddress. +// +// NOTE: This extension was ported from github.com/decred/dcrd +func (f *wsClientFilter) addAddressStr(s string) { + // If address can't be decoded, no point in saving it since it should also + // impossible to create the address from an inspected transaction output + // script. + a, err := btcutil.DecodeAddress(s, activeNetParams.Params) + if err != nil { + return + } + f.addAddress(a) +} + +// existsAddress returns true if the passed address has been added to the +// wsClientFilter. +// +// NOTE: This extension was ported from github.com/decred/dcrd +func (f *wsClientFilter) existsAddress(a btcutil.Address) bool { + switch a := a.(type) { + case *btcutil.AddressPubKeyHash: + _, ok := f.pubKeyHashes[*a.Hash160()] + return ok + case *btcutil.AddressScriptHash: + _, ok := f.scriptHashes[*a.Hash160()] + return ok + case *btcutil.AddressPubKey: + serializedPubKey := a.ScriptAddress() + switch len(serializedPubKey) { + case 33: // compressed + var compressedPubKey [33]byte + copy(compressedPubKey[:], serializedPubKey) + _, ok := f.compressedPubKeys[compressedPubKey] + if !ok { + _, ok = f.pubKeyHashes[*a.AddressPubKeyHash().Hash160()] + } + return ok + case 65: // uncompressed + var uncompressedPubKey [65]byte + copy(uncompressedPubKey[:], serializedPubKey) + _, ok := f.uncompressedPubKeys[uncompressedPubKey] + if !ok { + _, ok = f.pubKeyHashes[*a.AddressPubKeyHash().Hash160()] + } + return ok + } + } + + _, ok := f.otherAddresses[a.EncodeAddress()] + return ok +} + +// removeAddress removes the passed address, if it exists, from the +// wsClientFilter. +// +// NOTE: This extension was ported from github.com/decred/dcrd +func (f *wsClientFilter) removeAddress(a btcutil.Address) { + switch a := a.(type) { + case *btcutil.AddressPubKeyHash: + delete(f.pubKeyHashes, *a.Hash160()) + return + case *btcutil.AddressScriptHash: + delete(f.scriptHashes, *a.Hash160()) + return + case *btcutil.AddressPubKey: + serializedPubKey := a.ScriptAddress() + switch len(serializedPubKey) { + case 33: // compressed + var compressedPubKey [33]byte + copy(compressedPubKey[:], serializedPubKey) + delete(f.compressedPubKeys, compressedPubKey) + return + case 65: // uncompressed + var uncompressedPubKey [65]byte + copy(uncompressedPubKey[:], serializedPubKey) + delete(f.uncompressedPubKeys, uncompressedPubKey) + return + } + } + + delete(f.otherAddresses, a.EncodeAddress()) +} + +// removeAddressStr parses an address from a string and then removes it from the +// wsClientFilter using removeAddress. +// +// NOTE: This extension was ported from github.com/decred/dcrd +func (f *wsClientFilter) removeAddressStr(s string) { + a, err := btcutil.DecodeAddress(s, activeNetParams.Params) + if err == nil { + f.removeAddress(a) + } else { + delete(f.otherAddresses, s) + } +} + +// addUnspentOutPoint adds an outpoint to the wsClientFilter. +// +// NOTE: This extension was ported from github.com/decred/dcrd +func (f *wsClientFilter) addUnspentOutPoint(op *wire.OutPoint) { + f.unspent[*op] = struct{}{} +} + +// existsUnspentOutPoint returns true if the passed outpoint has been added to +// the wsClientFilter. +// +// NOTE: This extension was ported from github.com/decred/dcrd +func (f *wsClientFilter) existsUnspentOutPoint(op *wire.OutPoint) bool { + _, ok := f.unspent[*op] + return ok +} + +// removeUnspentOutPoint removes the passed outpoint, if it exists, from the +// wsClientFilter. +// +// NOTE: This extension was ported from github.com/decred/dcrd +func (f *wsClientFilter) removeUnspentOutPoint(op *wire.OutPoint) { + delete(f.unspent, *op) +} + // Notification types type notificationBlockConnected btcutil.Block type notificationBlockDisconnected btcutil.Block @@ -316,17 +517,26 @@ out: if len(blockNotifications) != 0 { m.notifyBlockConnected(blockNotifications, block) + m.notifyFilteredBlockConnected(blockNotifications, + block) } case *notificationBlockDisconnected: - m.notifyBlockDisconnected(blockNotifications, - (*btcutil.Block)(n)) + block := (*btcutil.Block)(n) + + if len(blockNotifications) != 0 { + m.notifyBlockDisconnected(blockNotifications, + block) + m.notifyFilteredBlockDisconnected(blockNotifications, + block) + } case *notificationTxAcceptedByMempool: if n.isNew && len(txNotifications) != 0 { m.notifyForNewTx(txNotifications, n.tx) } m.notifyForTx(watchedOutPoints, watchedAddrs, n.tx, nil) + m.notifyRelevantTxAccepted(n.tx, clients) case *notificationRegisterBlocks: wsc := (*wsClient)(n) @@ -414,6 +624,68 @@ func (m *wsNotificationManager) UnregisterBlockUpdates(wsc *wsClient) { m.queueNotification <- (*notificationUnregisterBlocks)(wsc) } +// subscribedClients returns the set of all websocket client quit channels that +// are registered to receive notifications regarding tx, either due to tx +// spending a watched output or outputting to a watched address. Matching +// client's filters are updated based on this transaction's outputs and output +// addresses that may be relevant for a client. +func (m *wsNotificationManager) subscribedClients(tx *btcutil.Tx, + clients map[chan struct{}]*wsClient) map[chan struct{}]struct{} { + + // Use a map of client quit channels as keys to prevent duplicates when + // multiple inputs and/or outputs are relevant to the client. + subscribed := make(map[chan struct{}]struct{}) + + msgTx := tx.MsgTx() + for _, input := range msgTx.TxIn { + for quitChan, wsc := range clients { + wsc.Lock() + filter := wsc.filterData + wsc.Unlock() + if filter == nil { + continue + } + filter.mu.Lock() + if filter.existsUnspentOutPoint(&input.PreviousOutPoint) { + subscribed[quitChan] = struct{}{} + } + filter.mu.Unlock() + } + } + + for i, output := range msgTx.TxOut { + _, addrs, _, err := txscript.ExtractPkScriptAddrs( + output.PkScript, m.server.server.chainParams) + if err != nil { + // Clients are not able to subscribe to + // nonstandard or non-address outputs. + continue + } + for quitChan, wsc := range clients { + wsc.Lock() + filter := wsc.filterData + wsc.Unlock() + if filter == nil { + continue + } + filter.mu.Lock() + for _, a := range addrs { + if filter.existsAddress(a) { + subscribed[quitChan] = struct{}{} + op := wire.OutPoint{ + Hash: *tx.Hash(), + Index: uint32(i), + } + filter.addUnspentOutPoint(&op) + } + } + filter.mu.Unlock() + } + } + + return subscribed +} + // notifyBlockConnected notifies websocket clients that have registered for // block updates when a block is connected to the main chain. func (*wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*wsClient, @@ -424,7 +696,7 @@ func (*wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*ws block.MsgBlock().Header.Timestamp.Unix()) marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) if err != nil { - rpcsLog.Error("Failed to marshal block connected notification: "+ + rpcsLog.Errorf("Failed to marshal block connected notification: "+ "%v", err) return } @@ -448,7 +720,85 @@ func (*wsNotificationManager) notifyBlockDisconnected(clients map[chan struct{}] block.Height(), block.MsgBlock().Header.Timestamp.Unix()) marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) if err != nil { - rpcsLog.Error("Failed to marshal block disconnected "+ + rpcsLog.Errorf("Failed to marshal block disconnected "+ + "notification: %v", err) + return + } + for _, wsc := range clients { + wsc.QueueNotification(marshalledJSON) + } +} + +// notifyFilteredBlockConnected notifies websocket clients that have registered for +// block updates when a block is connected to the main chain. +func (m *wsNotificationManager) notifyFilteredBlockConnected(clients map[chan struct{}]*wsClient, + block *btcutil.Block) { + + // Create the common portion of the notification that is the same for + // every client. + var w bytes.Buffer + err := block.MsgBlock().Header.Serialize(&w) + if err != nil { + rpcsLog.Errorf("Failed to serialize header for filtered block "+ + "connected notification: %v", err) + return + } + ntfn := btcjson.NewFilteredBlockConnectedNtfn(block.Height(), + hex.EncodeToString(w.Bytes()), nil) + + // Search for relevant transactions for each client and save them + // serialized in hex encoding for the notification. + subscribedTxs := make(map[chan struct{}][]string) + for _, tx := range block.Transactions() { + var txHex string + for quitChan := range m.subscribedClients(tx, clients) { + if txHex == "" { + txHex = txHexString(tx.MsgTx()) + } + subscribedTxs[quitChan] = append(subscribedTxs[quitChan], txHex) + } + } + for quitChan, wsc := range clients { + + // Add all discovered transactions for this client. For clients + // that have no new-style filter, add the empty string slice. + ntfn.SubscribedTxs = subscribedTxs[quitChan] + + // Marshal and queue notification. + marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) + if err != nil { + rpcsLog.Errorf("Failed to marshal filtered block "+ + "connected notification: %v", err) + return + } + wsc.QueueNotification(marshalledJSON) + } +} + +// notifyFilteredBlockDisconnected notifies websocket clients that have registered for +// block updates when a block is disconnected from the main chain (due to a +// reorganize). +func (*wsNotificationManager) notifyFilteredBlockDisconnected(clients map[chan struct{}]*wsClient, + block *btcutil.Block) { + // Skip notification creation if no clients have requested block + // connected/disconnected notifications. + if len(clients) == 0 { + return + } + + // Notify interested websocket clients about the disconnected block. + var w bytes.Buffer + err := block.MsgBlock().Header.Serialize(&w) + if err != nil { + rpcsLog.Errorf("Failed to serialize header for filtered block "+ + "disconnected notification: %v", err) + return + } + ntfn := btcjson.NewFilteredBlockDisconnectedNtfn(block.Height(), + hex.EncodeToString(w.Bytes())) + marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) + if err != nil { + rpcsLog.Errorf("Failed to marshal filtered block disconnected "+ "notification: %v", err) return } @@ -588,10 +938,10 @@ func (*wsNotificationManager) removeSpentRequest(ops map[wire.OutPoint]map[chan } // txHexString returns the serialized transaction encoded in hexadecimal. -func txHexString(tx *btcutil.Tx) string { - buf := bytes.NewBuffer(make([]byte, 0, tx.MsgTx().SerializeSize())) +func txHexString(tx *wire.MsgTx) string { + buf := bytes.NewBuffer(make([]byte, 0, tx.SerializeSize())) // Ignore Serialize's error, as writing to a bytes.buffer cannot fail. - tx.MsgTx().Serialize(buf) + tx.Serialize(buf) return hex.EncodeToString(buf.Bytes()) } @@ -645,7 +995,7 @@ func (m *wsNotificationManager) notifyForTxOuts(ops map[wire.OutPoint]map[chan s } if txHex == "" { - txHex = txHexString(tx) + txHex = txHexString(tx.MsgTx()) } ntfn := btcjson.NewRecvTxNtfn(txHex, blockDetails(block, tx.Index())) @@ -669,6 +1019,29 @@ func (m *wsNotificationManager) notifyForTxOuts(ops map[wire.OutPoint]map[chan s } } +// notifyRelevantTxAccepted examines the inputs and outputs of the passed +// transaction, notifying websocket clients of outputs spending to a watched +// address and inputs spending a watched outpoint. Any outputs paying to a +// watched address result in the output being watched as well for future +// notifications. +func (m *wsNotificationManager) notifyRelevantTxAccepted(tx *btcutil.Tx, + clients map[chan struct{}]*wsClient) { + + clientsToNotify := m.subscribedClients(tx, clients) + + if len(clientsToNotify) != 0 { + n := btcjson.NewRelevantTxAcceptedNtfn(txHexString(tx.MsgTx())) + marshalled, err := btcjson.MarshalCmd(nil, n) + if err != nil { + rpcsLog.Errorf("Failed to marshal notification: %v", err) + return + } + for quitChan := range clientsToNotify { + clients[quitChan].QueueNotification(marshalled) + } + } +} + // notifyForTx examines the inputs and outputs of the passed transaction, // notifying websocket clients of outputs spending to a watched address // and inputs spending a watched outpoint. @@ -701,7 +1074,7 @@ func (m *wsNotificationManager) notifyForTxIns(ops map[wire.OutPoint]map[chan st prevOut := &txIn.PreviousOutPoint if cmap, ok := ops[*prevOut]; ok { if txHex == "" { - txHex = txHexString(tx) + txHex = txHexString(tx.MsgTx()) } marshalledJSON, err := newRedeemingTxNotification(txHex, tx.Index(), block) if err != nil { @@ -898,6 +1271,11 @@ type wsClient struct { // Owned by the notification manager. spentRequests map[wire.OutPoint]struct{} + // filterData is the new generation transaction filter backported from + // github.com/decred/dcrd for the new backported `loadtxfilter` and + // future `rescanblocks` methods. + filterData *wsClientFilter + // Networking infrastructure. serviceRequestSem semaphore ntfnChan chan []byte @@ -1386,6 +1764,48 @@ func handleWebsocketHelp(wsc *wsClient, icmd interface{}) (interface{}, error) { return help, nil } +// handleLoadTxFilter implements the loadtxfilter command extension for +// websocket connections. +// +// NOTE: This extension is ported from github.com/decred/dcrd +func handleLoadTxFilter(wsc *wsClient, icmd interface{}) (interface{}, error) { + cmd := icmd.(*btcjson.LoadTxFilterCmd) + + outPoints := make([]wire.OutPoint, len(cmd.OutPoints)) + for i := range cmd.OutPoints { + hash, err := chainhash.NewHashFromStr(cmd.OutPoints[i].Hash) + if err != nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCInvalidParameter, + Message: err.Error(), + } + } + outPoints[i] = wire.OutPoint{ + Hash: *hash, + Index: cmd.OutPoints[i].Index, + } + } + + wsc.Lock() + if cmd.Reload || wsc.filterData == nil { + wsc.filterData = newWSClientFilter(cmd.Addresses, outPoints) + wsc.Unlock() + } else { + wsc.Unlock() + + wsc.filterData.mu.Lock() + for _, a := range cmd.Addresses { + wsc.filterData.addAddressStr(a) + } + for i := range outPoints { + wsc.filterData.addUnspentOutPoint(&outPoints[i]) + } + wsc.filterData.mu.Unlock() + } + + return nil, nil +} + // handleNotifyBlocks implements the notifyblocks command extension for // websocket connections. func handleNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) { @@ -1589,7 +2009,7 @@ func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *btcutil.Block) { } if txHex == "" { - txHex = txHexString(tx) + txHex = txHexString(tx.MsgTx()) } marshalledJSON, err := newRedeemingTxNotification(txHex, tx.Index(), blk) if err != nil { @@ -1676,7 +2096,7 @@ func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *btcutil.Block) { } if txHex == "" { - txHex = txHexString(tx) + txHex = txHexString(tx.MsgTx()) } ntfn := btcjson.NewRecvTxNtfn(txHex, blockDetails(blk, tx.Index()))