rpcserver: implement loadtxfilter backported from dcrd

This commit is contained in:
Alex 2017-01-25 11:49:35 -07:00
parent 5c689c8b77
commit 4943ed11b3
3 changed files with 441 additions and 13 deletions

View file

@ -44,9 +44,9 @@ import (
// API version constants
const (
jsonrpcSemverString = "1.1.0"
jsonrpcSemverString = "1.2.0"
jsonrpcSemverMajor = 1
jsonrpcSemverMinor = 1
jsonrpcSemverMinor = 2
jsonrpcSemverPatch = 0
)
@ -232,6 +232,7 @@ var rpcUnimplemented = map[string]struct{}{
// Commands that are available to a limited user
var rpcLimited = map[string]struct{}{
// Websockets commands
"loadtxfilter": {},
"notifyblocks": {},
"notifynewtransactions": {},
"notifyreceived": {},

View file

@ -590,6 +590,12 @@ var helpDescsEnUS = map[string]string{
"stopnotifyspent--synopsis": "Cancel registered spending notifications for each passed outpoint.",
"stopnotifyspent-outpoints": "List of transaction outpoints to stop monitoring.",
// LoadTxFilterCmd help.
"loadtxfilter--synopsis": "Load, add to, or reload a websocket client's transaction filter for mempool transactions, new blocks and rescanblocks.",
"loadtxfilter-reload": "Load a new filter instead of adding data to an existing one",
"loadtxfilter-addresses": "Array of addresses to add to the transaction filter",
"loadtxfilter-outpoints": "Array of outpoints to add to the transaction filter",
// Rescan help.
"rescan--synopsis": "Rescan block chain for transactions to addresses.\n" +
"When the endblock parameter is omitted, the rescan continues through the best block in the main chain.\n" +
@ -663,6 +669,7 @@ var rpcResultTypes = map[string][]interface{}{
"version": {(*map[string]btcjson.VersionResult)(nil)},
// Websocket commands.
"loadtxfilter": nil,
"session": {(*btcjson.SessionResult)(nil)},
"notifyblocks": nil,
"stopnotifyblocks": nil,

View file

@ -1,4 +1,5 @@
// Copyright (c) 2013-2017 The btcsuite developers
// Copyright (c) 2015-2017 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
@ -61,6 +62,7 @@ type wsCommandHandler func(*wsClient, interface{}) (interface{}, error)
// causes a dependency loop.
var wsHandlers map[string]wsCommandHandler
var wsHandlersBeforeInit = map[string]wsCommandHandler{
"loadtxfilter": handleLoadTxFilter,
"help": handleWebsocketHelp,
"notifyblocks": handleNotifyBlocks,
"notifynewtransactions": handleNotifyNewTransactions,
@ -242,6 +244,205 @@ func (m *wsNotificationManager) NotifyMempoolTx(tx *btcutil.Tx, isNew bool) {
}
}
// wsClientFilter tracks relevant addresses for each websocket client for
// the future `rescanblocks` extension. It is modified by the `loadtxfilter`
// command.
//
// NOTE: This extension was ported from github.com/decred/dcrd
type wsClientFilter struct {
mu sync.Mutex
// Implemented fast paths for address lookup.
pubKeyHashes map[[ripemd160.Size]byte]struct{}
scriptHashes map[[ripemd160.Size]byte]struct{}
compressedPubKeys map[[33]byte]struct{}
uncompressedPubKeys map[[65]byte]struct{}
// A fallback address lookup map in case a fast path doesn't exist.
// Only exists for completeness. If using this shows up in a profile,
// there's a good chance a fast path should be added.
otherAddresses map[string]struct{}
// Outpoints of unspent outputs.
unspent map[wire.OutPoint]struct{}
}
// newWSClientFilter creates a new, empty wsClientFilter struct to be used
// for a websocket client.
//
// NOTE: This extension was ported from github.com/decred/dcrd
func newWSClientFilter(addresses []string, unspentOutPoints []wire.OutPoint) *wsClientFilter {
filter := &wsClientFilter{
pubKeyHashes: map[[ripemd160.Size]byte]struct{}{},
scriptHashes: map[[ripemd160.Size]byte]struct{}{},
compressedPubKeys: map[[33]byte]struct{}{},
uncompressedPubKeys: map[[65]byte]struct{}{},
otherAddresses: map[string]struct{}{},
unspent: make(map[wire.OutPoint]struct{}, len(unspentOutPoints)),
}
for _, s := range addresses {
filter.addAddressStr(s)
}
for i := range unspentOutPoints {
filter.addUnspentOutPoint(&unspentOutPoints[i])
}
return filter
}
// addAddress adds an address to a wsClientFilter, treating it correctly based
// on the type of address passed as an argument.
//
// NOTE: This extension was ported from github.com/decred/dcrd
func (f *wsClientFilter) addAddress(a btcutil.Address) {
switch a := a.(type) {
case *btcutil.AddressPubKeyHash:
f.pubKeyHashes[*a.Hash160()] = struct{}{}
return
case *btcutil.AddressScriptHash:
f.scriptHashes[*a.Hash160()] = struct{}{}
return
case *btcutil.AddressPubKey:
serializedPubKey := a.ScriptAddress()
switch len(serializedPubKey) {
case 33: // compressed
var compressedPubKey [33]byte
copy(compressedPubKey[:], serializedPubKey)
f.compressedPubKeys[compressedPubKey] = struct{}{}
return
case 65: // uncompressed
var uncompressedPubKey [65]byte
copy(uncompressedPubKey[:], serializedPubKey)
f.uncompressedPubKeys[uncompressedPubKey] = struct{}{}
return
}
}
f.otherAddresses[a.EncodeAddress()] = struct{}{}
}
// addAddressStr parses an address from a string and then adds it to the
// wsClientFilter using addAddress.
//
// NOTE: This extension was ported from github.com/decred/dcrd
func (f *wsClientFilter) addAddressStr(s string) {
// If address can't be decoded, no point in saving it since it should also
// impossible to create the address from an inspected transaction output
// script.
a, err := btcutil.DecodeAddress(s, activeNetParams.Params)
if err != nil {
return
}
f.addAddress(a)
}
// existsAddress returns true if the passed address has been added to the
// wsClientFilter.
//
// NOTE: This extension was ported from github.com/decred/dcrd
func (f *wsClientFilter) existsAddress(a btcutil.Address) bool {
switch a := a.(type) {
case *btcutil.AddressPubKeyHash:
_, ok := f.pubKeyHashes[*a.Hash160()]
return ok
case *btcutil.AddressScriptHash:
_, ok := f.scriptHashes[*a.Hash160()]
return ok
case *btcutil.AddressPubKey:
serializedPubKey := a.ScriptAddress()
switch len(serializedPubKey) {
case 33: // compressed
var compressedPubKey [33]byte
copy(compressedPubKey[:], serializedPubKey)
_, ok := f.compressedPubKeys[compressedPubKey]
if !ok {
_, ok = f.pubKeyHashes[*a.AddressPubKeyHash().Hash160()]
}
return ok
case 65: // uncompressed
var uncompressedPubKey [65]byte
copy(uncompressedPubKey[:], serializedPubKey)
_, ok := f.uncompressedPubKeys[uncompressedPubKey]
if !ok {
_, ok = f.pubKeyHashes[*a.AddressPubKeyHash().Hash160()]
}
return ok
}
}
_, ok := f.otherAddresses[a.EncodeAddress()]
return ok
}
// removeAddress removes the passed address, if it exists, from the
// wsClientFilter.
//
// NOTE: This extension was ported from github.com/decred/dcrd
func (f *wsClientFilter) removeAddress(a btcutil.Address) {
switch a := a.(type) {
case *btcutil.AddressPubKeyHash:
delete(f.pubKeyHashes, *a.Hash160())
return
case *btcutil.AddressScriptHash:
delete(f.scriptHashes, *a.Hash160())
return
case *btcutil.AddressPubKey:
serializedPubKey := a.ScriptAddress()
switch len(serializedPubKey) {
case 33: // compressed
var compressedPubKey [33]byte
copy(compressedPubKey[:], serializedPubKey)
delete(f.compressedPubKeys, compressedPubKey)
return
case 65: // uncompressed
var uncompressedPubKey [65]byte
copy(uncompressedPubKey[:], serializedPubKey)
delete(f.uncompressedPubKeys, uncompressedPubKey)
return
}
}
delete(f.otherAddresses, a.EncodeAddress())
}
// removeAddressStr parses an address from a string and then removes it from the
// wsClientFilter using removeAddress.
//
// NOTE: This extension was ported from github.com/decred/dcrd
func (f *wsClientFilter) removeAddressStr(s string) {
a, err := btcutil.DecodeAddress(s, activeNetParams.Params)
if err == nil {
f.removeAddress(a)
} else {
delete(f.otherAddresses, s)
}
}
// addUnspentOutPoint adds an outpoint to the wsClientFilter.
//
// NOTE: This extension was ported from github.com/decred/dcrd
func (f *wsClientFilter) addUnspentOutPoint(op *wire.OutPoint) {
f.unspent[*op] = struct{}{}
}
// existsUnspentOutPoint returns true if the passed outpoint has been added to
// the wsClientFilter.
//
// NOTE: This extension was ported from github.com/decred/dcrd
func (f *wsClientFilter) existsUnspentOutPoint(op *wire.OutPoint) bool {
_, ok := f.unspent[*op]
return ok
}
// removeUnspentOutPoint removes the passed outpoint, if it exists, from the
// wsClientFilter.
//
// NOTE: This extension was ported from github.com/decred/dcrd
func (f *wsClientFilter) removeUnspentOutPoint(op *wire.OutPoint) {
delete(f.unspent, *op)
}
// Notification types
type notificationBlockConnected btcutil.Block
type notificationBlockDisconnected btcutil.Block
@ -316,17 +517,26 @@ out:
if len(blockNotifications) != 0 {
m.notifyBlockConnected(blockNotifications,
block)
m.notifyFilteredBlockConnected(blockNotifications,
block)
}
case *notificationBlockDisconnected:
m.notifyBlockDisconnected(blockNotifications,
(*btcutil.Block)(n))
block := (*btcutil.Block)(n)
if len(blockNotifications) != 0 {
m.notifyBlockDisconnected(blockNotifications,
block)
m.notifyFilteredBlockDisconnected(blockNotifications,
block)
}
case *notificationTxAcceptedByMempool:
if n.isNew && len(txNotifications) != 0 {
m.notifyForNewTx(txNotifications, n.tx)
}
m.notifyForTx(watchedOutPoints, watchedAddrs, n.tx, nil)
m.notifyRelevantTxAccepted(n.tx, clients)
case *notificationRegisterBlocks:
wsc := (*wsClient)(n)
@ -414,6 +624,68 @@ func (m *wsNotificationManager) UnregisterBlockUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
}
// subscribedClients returns the set of all websocket client quit channels that
// are registered to receive notifications regarding tx, either due to tx
// spending a watched output or outputting to a watched address. Matching
// client's filters are updated based on this transaction's outputs and output
// addresses that may be relevant for a client.
func (m *wsNotificationManager) subscribedClients(tx *btcutil.Tx,
clients map[chan struct{}]*wsClient) map[chan struct{}]struct{} {
// Use a map of client quit channels as keys to prevent duplicates when
// multiple inputs and/or outputs are relevant to the client.
subscribed := make(map[chan struct{}]struct{})
msgTx := tx.MsgTx()
for _, input := range msgTx.TxIn {
for quitChan, wsc := range clients {
wsc.Lock()
filter := wsc.filterData
wsc.Unlock()
if filter == nil {
continue
}
filter.mu.Lock()
if filter.existsUnspentOutPoint(&input.PreviousOutPoint) {
subscribed[quitChan] = struct{}{}
}
filter.mu.Unlock()
}
}
for i, output := range msgTx.TxOut {
_, addrs, _, err := txscript.ExtractPkScriptAddrs(
output.PkScript, m.server.server.chainParams)
if err != nil {
// Clients are not able to subscribe to
// nonstandard or non-address outputs.
continue
}
for quitChan, wsc := range clients {
wsc.Lock()
filter := wsc.filterData
wsc.Unlock()
if filter == nil {
continue
}
filter.mu.Lock()
for _, a := range addrs {
if filter.existsAddress(a) {
subscribed[quitChan] = struct{}{}
op := wire.OutPoint{
Hash: *tx.Hash(),
Index: uint32(i),
}
filter.addUnspentOutPoint(&op)
}
}
filter.mu.Unlock()
}
}
return subscribed
}
// notifyBlockConnected notifies websocket clients that have registered for
// block updates when a block is connected to the main chain.
func (*wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*wsClient,
@ -424,7 +696,7 @@ func (*wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*ws
block.MsgBlock().Header.Timestamp.Unix())
marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
if err != nil {
rpcsLog.Error("Failed to marshal block connected notification: "+
rpcsLog.Errorf("Failed to marshal block connected notification: "+
"%v", err)
return
}
@ -448,7 +720,85 @@ func (*wsNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]
block.Height(), block.MsgBlock().Header.Timestamp.Unix())
marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
if err != nil {
rpcsLog.Error("Failed to marshal block disconnected "+
rpcsLog.Errorf("Failed to marshal block disconnected "+
"notification: %v", err)
return
}
for _, wsc := range clients {
wsc.QueueNotification(marshalledJSON)
}
}
// notifyFilteredBlockConnected notifies websocket clients that have registered for
// block updates when a block is connected to the main chain.
func (m *wsNotificationManager) notifyFilteredBlockConnected(clients map[chan struct{}]*wsClient,
block *btcutil.Block) {
// Create the common portion of the notification that is the same for
// every client.
var w bytes.Buffer
err := block.MsgBlock().Header.Serialize(&w)
if err != nil {
rpcsLog.Errorf("Failed to serialize header for filtered block "+
"connected notification: %v", err)
return
}
ntfn := btcjson.NewFilteredBlockConnectedNtfn(block.Height(),
hex.EncodeToString(w.Bytes()), nil)
// Search for relevant transactions for each client and save them
// serialized in hex encoding for the notification.
subscribedTxs := make(map[chan struct{}][]string)
for _, tx := range block.Transactions() {
var txHex string
for quitChan := range m.subscribedClients(tx, clients) {
if txHex == "" {
txHex = txHexString(tx.MsgTx())
}
subscribedTxs[quitChan] = append(subscribedTxs[quitChan], txHex)
}
}
for quitChan, wsc := range clients {
// Add all discovered transactions for this client. For clients
// that have no new-style filter, add the empty string slice.
ntfn.SubscribedTxs = subscribedTxs[quitChan]
// Marshal and queue notification.
marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal filtered block "+
"connected notification: %v", err)
return
}
wsc.QueueNotification(marshalledJSON)
}
}
// notifyFilteredBlockDisconnected notifies websocket clients that have registered for
// block updates when a block is disconnected from the main chain (due to a
// reorganize).
func (*wsNotificationManager) notifyFilteredBlockDisconnected(clients map[chan struct{}]*wsClient,
block *btcutil.Block) {
// Skip notification creation if no clients have requested block
// connected/disconnected notifications.
if len(clients) == 0 {
return
}
// Notify interested websocket clients about the disconnected block.
var w bytes.Buffer
err := block.MsgBlock().Header.Serialize(&w)
if err != nil {
rpcsLog.Errorf("Failed to serialize header for filtered block "+
"disconnected notification: %v", err)
return
}
ntfn := btcjson.NewFilteredBlockDisconnectedNtfn(block.Height(),
hex.EncodeToString(w.Bytes()))
marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal filtered block disconnected "+
"notification: %v", err)
return
}
@ -588,10 +938,10 @@ func (*wsNotificationManager) removeSpentRequest(ops map[wire.OutPoint]map[chan
}
// txHexString returns the serialized transaction encoded in hexadecimal.
func txHexString(tx *btcutil.Tx) string {
buf := bytes.NewBuffer(make([]byte, 0, tx.MsgTx().SerializeSize()))
func txHexString(tx *wire.MsgTx) string {
buf := bytes.NewBuffer(make([]byte, 0, tx.SerializeSize()))
// Ignore Serialize's error, as writing to a bytes.buffer cannot fail.
tx.MsgTx().Serialize(buf)
tx.Serialize(buf)
return hex.EncodeToString(buf.Bytes())
}
@ -645,7 +995,7 @@ func (m *wsNotificationManager) notifyForTxOuts(ops map[wire.OutPoint]map[chan s
}
if txHex == "" {
txHex = txHexString(tx)
txHex = txHexString(tx.MsgTx())
}
ntfn := btcjson.NewRecvTxNtfn(txHex, blockDetails(block,
tx.Index()))
@ -669,6 +1019,29 @@ func (m *wsNotificationManager) notifyForTxOuts(ops map[wire.OutPoint]map[chan s
}
}
// notifyRelevantTxAccepted examines the inputs and outputs of the passed
// transaction, notifying websocket clients of outputs spending to a watched
// address and inputs spending a watched outpoint. Any outputs paying to a
// watched address result in the output being watched as well for future
// notifications.
func (m *wsNotificationManager) notifyRelevantTxAccepted(tx *btcutil.Tx,
clients map[chan struct{}]*wsClient) {
clientsToNotify := m.subscribedClients(tx, clients)
if len(clientsToNotify) != 0 {
n := btcjson.NewRelevantTxAcceptedNtfn(txHexString(tx.MsgTx()))
marshalled, err := btcjson.MarshalCmd(nil, n)
if err != nil {
rpcsLog.Errorf("Failed to marshal notification: %v", err)
return
}
for quitChan := range clientsToNotify {
clients[quitChan].QueueNotification(marshalled)
}
}
}
// notifyForTx examines the inputs and outputs of the passed transaction,
// notifying websocket clients of outputs spending to a watched address
// and inputs spending a watched outpoint.
@ -701,7 +1074,7 @@ func (m *wsNotificationManager) notifyForTxIns(ops map[wire.OutPoint]map[chan st
prevOut := &txIn.PreviousOutPoint
if cmap, ok := ops[*prevOut]; ok {
if txHex == "" {
txHex = txHexString(tx)
txHex = txHexString(tx.MsgTx())
}
marshalledJSON, err := newRedeemingTxNotification(txHex, tx.Index(), block)
if err != nil {
@ -898,6 +1271,11 @@ type wsClient struct {
// Owned by the notification manager.
spentRequests map[wire.OutPoint]struct{}
// filterData is the new generation transaction filter backported from
// github.com/decred/dcrd for the new backported `loadtxfilter` and
// future `rescanblocks` methods.
filterData *wsClientFilter
// Networking infrastructure.
serviceRequestSem semaphore
ntfnChan chan []byte
@ -1386,6 +1764,48 @@ func handleWebsocketHelp(wsc *wsClient, icmd interface{}) (interface{}, error) {
return help, nil
}
// handleLoadTxFilter implements the loadtxfilter command extension for
// websocket connections.
//
// NOTE: This extension is ported from github.com/decred/dcrd
func handleLoadTxFilter(wsc *wsClient, icmd interface{}) (interface{}, error) {
cmd := icmd.(*btcjson.LoadTxFilterCmd)
outPoints := make([]wire.OutPoint, len(cmd.OutPoints))
for i := range cmd.OutPoints {
hash, err := chainhash.NewHashFromStr(cmd.OutPoints[i].Hash)
if err != nil {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCInvalidParameter,
Message: err.Error(),
}
}
outPoints[i] = wire.OutPoint{
Hash: *hash,
Index: cmd.OutPoints[i].Index,
}
}
wsc.Lock()
if cmd.Reload || wsc.filterData == nil {
wsc.filterData = newWSClientFilter(cmd.Addresses, outPoints)
wsc.Unlock()
} else {
wsc.Unlock()
wsc.filterData.mu.Lock()
for _, a := range cmd.Addresses {
wsc.filterData.addAddressStr(a)
}
for i := range outPoints {
wsc.filterData.addUnspentOutPoint(&outPoints[i])
}
wsc.filterData.mu.Unlock()
}
return nil, nil
}
// handleNotifyBlocks implements the notifyblocks command extension for
// websocket connections.
func handleNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) {
@ -1589,7 +2009,7 @@ func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *btcutil.Block) {
}
if txHex == "" {
txHex = txHexString(tx)
txHex = txHexString(tx.MsgTx())
}
marshalledJSON, err := newRedeemingTxNotification(txHex, tx.Index(), blk)
if err != nil {
@ -1676,7 +2096,7 @@ func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *btcutil.Block) {
}
if txHex == "" {
txHex = txHexString(tx)
txHex = txHexString(tx.MsgTx())
}
ntfn := btcjson.NewRecvTxNtfn(txHex,
blockDetails(blk, tx.Index()))