Fix #122 by allowing clients to cancel websockets notifications.

This commit adds 4 new websockets JSON-RPC methods for canceling
notifications:
  * stopnotifyspent
  * stopnotifyreceived
  * stopnotifyblocks
  * stopnotifynewtransactions
This commit is contained in:
Olaoluwa Osuntokun 2015-03-03 12:37:02 -08:00
parent 415d7742a2
commit 6801c0000a
5 changed files with 285 additions and 26 deletions

View file

@ -31,6 +31,15 @@ func NewNotifyBlocksCmd() *NotifyBlocksCmd {
return &NotifyBlocksCmd{}
}
// StopNotifyBlocksCmd defines the stopnotifyblocks JSON-RPC command.
type StopNotifyBlocksCmd struct{}
// NewStopNotifyBlocksCmd returns a new instance which can be used to issue a
// stopnotifyblocks JSON-RPC command.
func NewStopNotifyBlocksCmd() *StopNotifyBlocksCmd {
return &StopNotifyBlocksCmd{}
}
// NotifyNewTransactionsCmd defines the notifynewtransactions JSON-RPC command.
type NotifyNewTransactionsCmd struct {
Verbose *bool `jsonrpcdefault:"false"`
@ -47,6 +56,18 @@ func NewNotifyNewTransactionsCmd(verbose *bool) *NotifyNewTransactionsCmd {
}
}
// StopNotifyNewTransactionsCmd defines the stopnotifynewtransactions JSON-RPC command.
type StopNotifyNewTransactionsCmd struct{}
// NewStopNotifyNewTransactionsCmd returns a new instance which can be used to issue
// a stopnotifynewtransactions JSON-RPC command.
//
// The parameters which are pointers indicate they are optional. Passing nil
// for optional parameters will use the default value.
func NewStopNotifyNewTransactionsCmd() *StopNotifyNewTransactionsCmd {
return &StopNotifyNewTransactionsCmd{}
}
// NotifyReceivedCmd defines the notifyreceived JSON-RPC command.
type NotifyReceivedCmd struct {
Addresses []string
@ -80,6 +101,32 @@ func NewNotifySpentCmd(outPoints []OutPoint) *NotifySpentCmd {
}
}
// StopNotifyReceivedCmd defines the stopnotifyreceived JSON-RPC command.
type StopNotifyReceivedCmd struct {
Addresses []string
}
// NewStopNotifyReceivedCmd returns a new instance which can be used to issue a
// stopnotifyreceived JSON-RPC command.
func NewStopNotifyReceivedCmd(addresses []string) *StopNotifyReceivedCmd {
return &StopNotifyReceivedCmd{
Addresses: addresses,
}
}
// StopNotifySpentCmd defines the stopnotifyspent JSON-RPC command.
type StopNotifySpentCmd struct {
OutPoints []OutPoint
}
// NewStopNotifySpentCmd returns a new instance which can be used to issue a
// stopnotifyspent JSON-RPC command.
func NewStopNotifySpentCmd(outPoints []OutPoint) *StopNotifySpentCmd {
return &StopNotifySpentCmd{
OutPoints: outPoints,
}
}
// RescanCmd defines the rescan JSON-RPC command.
type RescanCmd struct {
BeginBlock string
@ -111,5 +158,9 @@ func init() {
MustRegisterCmd("notifynewtransactions", (*NotifyNewTransactionsCmd)(nil), flags)
MustRegisterCmd("notifyreceived", (*NotifyReceivedCmd)(nil), flags)
MustRegisterCmd("notifyspent", (*NotifySpentCmd)(nil), flags)
MustRegisterCmd("stopnotifyblocks", (*StopNotifyBlocksCmd)(nil), flags)
MustRegisterCmd("stopnotifynewtransactions", (*StopNotifyNewTransactionsCmd)(nil), flags)
MustRegisterCmd("stopnotifyspent", (*StopNotifySpentCmd)(nil), flags)
MustRegisterCmd("stopnotifyreceived", (*StopNotifyReceivedCmd)(nil), flags)
MustRegisterCmd("rescan", (*RescanCmd)(nil), flags)
}

View file

@ -51,6 +51,17 @@ func TestChainSvrWsCmds(t *testing.T) {
marshalled: `{"jsonrpc":"1.0","method":"notifyblocks","params":[],"id":1}`,
unmarshalled: &btcjson.NotifyBlocksCmd{},
},
{
name: "stopnotifyblocks",
newCmd: func() (interface{}, error) {
return btcjson.NewCmd("stopnotifyblocks")
},
staticCmd: func() interface{} {
return btcjson.NewStopNotifyBlocksCmd()
},
marshalled: `{"jsonrpc":"1.0","method":"stopnotifyblocks","params":[],"id":1}`,
unmarshalled: &btcjson.StopNotifyBlocksCmd{},
},
{
name: "notifynewtransactions",
newCmd: func() (interface{}, error) {
@ -77,6 +88,17 @@ func TestChainSvrWsCmds(t *testing.T) {
Verbose: btcjson.Bool(true),
},
},
{
name: "stopnotifynewtransactions",
newCmd: func() (interface{}, error) {
return btcjson.NewCmd("stopnotifynewtransactions")
},
staticCmd: func() interface{} {
return btcjson.NewStopNotifyNewTransactionsCmd()
},
marshalled: `{"jsonrpc":"1.0","method":"stopnotifynewtransactions","params":[],"id":1}`,
unmarshalled: &btcjson.StopNotifyNewTransactionsCmd{},
},
{
name: "notifyreceived",
newCmd: func() (interface{}, error) {
@ -90,6 +112,19 @@ func TestChainSvrWsCmds(t *testing.T) {
Addresses: []string{"1Address"},
},
},
{
name: "stopnotifyreceived",
newCmd: func() (interface{}, error) {
return btcjson.NewCmd("stopnotifyreceived", []string{"1Address"})
},
staticCmd: func() interface{} {
return btcjson.NewStopNotifyReceivedCmd([]string{"1Address"})
},
marshalled: `{"jsonrpc":"1.0","method":"stopnotifyreceived","params":[["1Address"]],"id":1}`,
unmarshalled: &btcjson.StopNotifyReceivedCmd{
Addresses: []string{"1Address"},
},
},
{
name: "notifyspent",
newCmd: func() (interface{}, error) {
@ -104,6 +139,20 @@ func TestChainSvrWsCmds(t *testing.T) {
OutPoints: []btcjson.OutPoint{{Hash: "123", Index: 0}},
},
},
{
name: "stopnotifyspent",
newCmd: func() (interface{}, error) {
return btcjson.NewCmd("stopnotifyspent", `[{"hash":"123","index":0}]`)
},
staticCmd: func() interface{} {
ops := []btcjson.OutPoint{{Hash: "123", Index: 0}}
return btcjson.NewStopNotifySpentCmd(ops)
},
marshalled: `{"jsonrpc":"1.0","method":"stopnotifyspent","params":[[{"hash":"123","index":0}]],"id":1}`,
unmarshalled: &btcjson.StopNotifySpentCmd{
OutPoints: []btcjson.OutPoint{{Hash: "123", Index: 0}},
},
},
{
name: "rescan",
newCmd: func() (interface{}, error) {

View file

@ -649,10 +649,14 @@ user. Click the method name for further details such as parameter and return in
|---|------|-----------|-------------|
|1|[authenticate](#authenticate)|Authenticate the connection against the username and passphrase configured for the RPC server.<br /><font color="orange">NOTE: This is only required if an HTTP Authorization header is not being used.</font>|None|
|2|[notifyblocks](#notifyblocks)|Send notifications when a block is connected or disconnected from the best chain.|[blockconnected](#blockconnected) and [blockdisconnected](#blockdisconnected)|
|3|[notifyreceived](#notifyreceived)|Send notifications when a txout spends to an address.|[recvtx](#recvtx) and [redeemingtx](#redeemingtx)|
|4|[notifyspent](#notifyspent)|Send notification when a txout is spent.|[redeemingtx](#redeemingtx)|
|5|[rescan](#rescan)|Rescan block chain for transactions to addresses and spent transaction outpoints.|[recvtx](#recvtx), [redeemingtx](#redeemingtx), [rescanprogress](#rescanprogress), and [rescanfinished](#rescanfinished) |
|6|[notifynewtransactions](#notifynewtransactions)|Send notifications for all new transactions as they are accepted into the mempool.|[txaccepted](#txaccepted) or [txacceptedverbose](#txacceptedverbose)|
|3|[stopnotifyblocks](#stopnotifyblocks)|Cancel registered notifications for whenever a block is connected or disconnected from the main (best) chain. |None|
|4|[notifyreceived](#notifyreceived)|Send notifications when a txout spends to an address.|[recvtx](#recvtx) and [redeemingtx](#redeemingtx)|
|5|[stopnotifyreceived](#stopnotifyreceived)|Cancel registered notifications for when a txout spends to any of the passed addresses.|None|
|6|[notifyspent](#notifyspent)|Send notification when a txout is spent.|[redeemingtx](#redeemingtx)|
|7|[stopnotifyspent](#stopnotifyspent)|Cancel registered spending notifications for each passed outpoint.|None|
|8|[rescan](#rescan)|Rescan block chain for transactions to addresses and spent transaction outpoints.|[recvtx](#recvtx), [redeemingtx](#redeemingtx), [rescanprogress](#rescanprogress), and [rescanfinished](#rescanfinished) |
|9|[notifynewtransactions](#notifynewtransactions)|Send notifications for all new transactions as they are accepted into the mempool.|[txaccepted](#txaccepted) or [txacceptedverbose](#txacceptedverbose)|
|10|[stopnotifynewtransactions](#stopnotifynewtransactions)|Stop sending either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.|None|
<a name="WSExtMethodDetails" />
**7.2 Method Details**<br />
@ -667,6 +671,8 @@ user. Click the method name for further details such as parameter and return in
|Returns|Success: Nothing<br />Failure: Nothing (websocket disconnected)|
[Return to Overview](#ExtensionRequestOverview)<br />
***
<a name="notifyblocks"/>
| | |
@ -678,6 +684,18 @@ user. Click the method name for further details such as parameter and return in
|Returns|Nothing|
[Return to Overview](#ExtensionRequestOverview)<br />
***
<a name="stopnotifyblocks"/>
| | |
|---|---|
|Method|stopnotifyblocks|
|Notifications|None|
|Parameters|None|
|Description|Cancel sending notifications for whenever a block is connected or disconnected from the main (best) chain.|
|Returns|Nothing|
[Return to Overview](#ExtensionRequestOverview)<br />
***
<a name="notifyreceived"/>
@ -693,6 +711,19 @@ user. Click the method name for further details such as parameter and return in
***
<a name="stopnotifyreceived"/>
| | |
|---|---|
|Method|stopnotifyreceived|
|Notifications|None|
|Parameters|1. Addresses (JSON array, required)<br />&nbsp;`[ (json array of strings)`<br />&nbsp;&nbsp;`"bitcoinaddress", (string) the bitcoin address`<br />&nbsp;&nbsp;`...`<br />&nbsp;`]`|
|Description|Cancel registered receive notifications for each passed address.|
|Returns|Nothing|
[Return to Overview](#ExtensionRequestOverview)<br />
***
<a name="notifyspent"/>
| | |
@ -706,6 +737,19 @@ user. Click the method name for further details such as parameter and return in
***
<a name="stopnotifyspent"/>
| | |
|---|---|
|Method|stopnotifyspent|
|Notifications|None|
|Parameters|1. Outpoints (JSON array, required)<br />&nbsp;`[ (JSON array)`<br />&nbsp;&nbsp;`{ (JSON object)`<br />&nbsp;&nbsp;&nbsp;`"hash":"data", (string) the hex-encoded bytes of the outpoint hash`<br />&nbsp;&nbsp;&nbsp;`"index":n (numeric) the txout index of the outpoint`<br />&nbsp;&nbsp;`},`<br />&nbsp;&nbsp;`...`<br />&nbsp;`]`|
|Description|Cancel registered spending notifications for each passed outpoint.|
|Returns|Nothing|
[Return to Overview](#ExtensionRequestOverview)<br />
***
<a name="rescan"/>
| | |
@ -730,6 +774,19 @@ user. Click the method name for further details such as parameter and return in
|Returns|Nothing|
[Return to Overview](#ExtensionRequestOverview)<br />
***
<a name="stopnotifynewtransactions"/>
| | |
|---|---|
|Method|stopnotifynewtransactions|
|Notifications|None|
|Parameters|None|
|Description|Stop sending either a [txaccepted](#txaccepted) or a [txacceptedverbose](#txacceptedverbose) notification when a new transaction is accepted into the mempool.|
|Returns|Nothing|
[Return to Overview](#ExtensionRequestOverview)<br />
<a name="Notifications" />
### 8. Notifications (Websocket-specific)

View file

@ -475,15 +475,25 @@ var helpDescsEnUS = map[string]string{
// NotifyBlocksCmd help.
"notifyblocks--synopsis": "Request notifications for whenever a block is connected or disconnected from the main (best) chain.",
// StopNotifyBlocksCmd help.
"stopnotifyblocks--synopsis": "Cancel registered notifications for whenever a block is connected or disconnected from the main (best) chain.",
// NotifyNewTransactionsCmd help.
"notifynewtransactions--synopsis": "Send either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.",
"notifynewtransactions-verbose": "Specifies which type of notification to receive. If verbose is true, then the caller receives txacceptedverbose, otherwise the caller receives txaccepted",
// StopNotifyNewTransactionsCmd help.
"stopnotifynewtransactions--synopsis": "Stop sending either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.",
// NotifyReceivedCmd help.
"notifyreceived--synopsis": "Send a recvtx notification when a transaction added to mempool or appears in a newly-attached block contains a txout pkScript sending to any of the passed addresses.\n" +
"Matching outpoints are automatically registered for redeemingtx notifications.",
"notifyreceived-addresses": "List of address to receive notifications about",
// StopNotifyReceivedCmd help.
"stopnotifyreceived--synopsis": "Cancel registered receive notifications for each passed address.",
"stopnotifyreceived-addresses": "List of address to cancel receive notifications for",
// OutPoint help.
"outpoint-hash": "The hex-encoded bytes of the outpoint hash",
"outpoint-index": "The index of the outpoint",
@ -492,6 +502,10 @@ var helpDescsEnUS = map[string]string{
"notifyspent--synopsis": "Send a redeemingtx notification when a transaction spending an outpoint appears in mempool (if relayed to this btcd instance) and when such a transaction first appears in a newly-attached block.",
"notifyspent-outpoints": "List of transaction outpoints to monitor.",
// StopNotifySpentCmd help.
"stopnotifyspent--synopsis": "Cancel registered spending notifications for each passed outpoint.",
"stopnotifyspent-outpoints": "List of transaction outpoints to stop monitoring.",
// Rescan help.
"rescan--synopsis": "Rescan block chain for transactions to addresses.\n" +
"When the endblock parameter is omitted, the rescan continues through the best block in the main chain.\n" +
@ -547,11 +561,15 @@ var rpcResultTypes = map[string][]interface{}{
"verifymessage": []interface{}{(*bool)(nil)},
// Websocket commands.
"notifyblocks": nil,
"notifynewtransactions": nil,
"notifyreceived": nil,
"notifyspent": nil,
"rescan": nil,
"notifyblocks": nil,
"stopnotifyblocks": nil,
"notifynewtransactions": nil,
"stopnotifynewtransactions": nil,
"notifyreceived": nil,
"stopnotifyreceived": nil,
"notifyspent": nil,
"stopnotifyspent": nil,
"rescan": nil,
}
// helpCacher provides a concurrent safe type that provides help and usage for

View file

@ -49,12 +49,16 @@ type wsCommandHandler func(*wsClient, interface{}) (interface{}, error)
// causes a dependency loop.
var wsHandlers map[string]wsCommandHandler
var wsHandlersBeforeInit = map[string]wsCommandHandler{
"help": handleWebsocketHelp,
"notifyblocks": handleNotifyBlocks,
"notifynewtransactions": handleNotifyNewTransactions,
"notifyreceived": handleNotifyReceived,
"notifyspent": handleNotifySpent,
"rescan": handleRescan,
"help": handleWebsocketHelp,
"notifyblocks": handleNotifyBlocks,
"notifynewtransactions": handleNotifyNewTransactions,
"notifyreceived": handleNotifyReceived,
"notifyspent": handleNotifySpent,
"stopnotifyblocks": handleStopNotifyBlocks,
"stopnotifynewtransactions": handleStopNotifyNewTransactions,
"stopnotifyspent": handleStopNotifySpent,
"stopnotifyreceived": handleStopNotifyReceived,
"rescan": handleRescan,
}
// wsAsyncHandlers holds the websocket commands which should be run
@ -1451,6 +1455,13 @@ func handleNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) {
return nil, nil
}
// handleStopNotifyBlocks implements the stopnotifyblocks command extension for
// websocket connections.
func handleStopNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) {
wsc.server.ntfnMgr.UnregisterBlockUpdates(wsc)
return nil, nil
}
// handleNotifySpent implements the notifyspent command extension for
// websocket connections.
func handleNotifySpent(wsc *wsClient, icmd interface{}) (interface{}, error) {
@ -1459,15 +1470,11 @@ func handleNotifySpent(wsc *wsClient, icmd interface{}) (interface{}, error) {
return nil, btcjson.ErrRPCInternal
}
outpoints := make([]*wire.OutPoint, 0, len(cmd.OutPoints))
for i := range cmd.OutPoints {
blockHash, err := wire.NewShaHashFromStr(cmd.OutPoints[i].Hash)
if err != nil {
return nil, rpcDecodeHexError(cmd.OutPoints[i].Hash)
}
index := cmd.OutPoints[i].Index
outpoints = append(outpoints, wire.NewOutPoint(blockHash, index))
outpoints, err := deserializeOutpoints(cmd.OutPoints)
if err != nil {
return nil, err
}
wsc.server.ntfnMgr.RegisterSpentRequests(wsc, outpoints)
return nil, nil
}
@ -1485,6 +1492,13 @@ func handleNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{},
return nil, nil
}
// handleStopNotifyNewTransations implements the stopnotifynewtransactions
// command extension for websocket connections.
func handleStopNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, error) {
wsc.server.ntfnMgr.UnregisterNewMempoolTxsUpdates(wsc)
return nil, nil
}
// handleNotifyReceived implements the notifyreceived command extension for
// websocket connections.
func handleNotifyReceived(wsc *wsClient, icmd interface{}) (interface{}, error) {
@ -1495,18 +1509,88 @@ func handleNotifyReceived(wsc *wsClient, icmd interface{}) (interface{}, error)
// Decode addresses to validate input, but the strings slice is used
// directly if these are all ok.
err := checkAddressValidity(cmd.Addresses)
if err != nil {
return nil, err
}
wsc.server.ntfnMgr.RegisterTxOutAddressRequests(wsc, cmd.Addresses)
return nil, nil
}
// handleStopNotifySpent implements the stopnotifyspent command extension for
// websocket connections.
func handleStopNotifySpent(wsc *wsClient, icmd interface{}) (interface{}, error) {
cmd, ok := icmd.(*btcjson.StopNotifySpentCmd)
if !ok {
return nil, btcjson.ErrRPCInternal
}
outpoints, err := deserializeOutpoints(cmd.OutPoints)
if err != nil {
return nil, err
}
for _, outpoint := range outpoints {
wsc.server.ntfnMgr.UnregisterSpentRequest(wsc, outpoint)
}
return nil, nil
}
// handleStopNotifyReceived implements the stopnotifyreceived command extension
// for websocket connections.
func handleStopNotifyReceived(wsc *wsClient, icmd interface{}) (interface{}, error) {
cmd, ok := icmd.(*btcjson.StopNotifyReceivedCmd)
if !ok {
return nil, btcjson.ErrRPCInternal
}
// Decode addresses to validate input, but the strings slice is used
// directly if these are all ok.
err := checkAddressValidity(cmd.Addresses)
if err != nil {
return nil, err
}
for _, addr := range cmd.Addresses {
wsc.server.ntfnMgr.UnregisterTxOutAddressRequest(wsc, addr)
}
return nil, nil
}
// checkAddressValidity checks the validity of each address in the passed
// string slice. It does this by attempting to decode each address using the
// current active network parameters. If any single address fails to decode
// properly, the function returns an error. Otherwise, nil is returned.
func checkAddressValidity(addrs []string) error {
for _, addr := range addrs {
_, err := btcutil.DecodeAddress(addr, activeNetParams.Params)
if err != nil {
return nil, &btcjson.RPCError{
return &btcjson.RPCError{
Code: btcjson.ErrRPCInvalidAddressOrKey,
Message: fmt.Sprintf("Invalid address or key: %v",
addr),
}
}
}
wsc.server.ntfnMgr.RegisterTxOutAddressRequests(wsc, cmd.Addresses)
return nil, nil
return nil
}
// deserializeOutpoints deserializes each serialized outpoint.
func deserializeOutpoints(serializedOuts []btcjson.OutPoint) ([]*wire.OutPoint, error) {
outpoints := make([]*wire.OutPoint, 0, len(serializedOuts))
for i := range serializedOuts {
blockHash, err := wire.NewShaHashFromStr(serializedOuts[i].Hash)
if err != nil {
return nil, rpcDecodeHexError(serializedOuts[i].Hash)
}
index := serializedOuts[i].Index
outpoints = append(outpoints, wire.NewOutPoint(blockHash, index))
}
return outpoints, nil
}
type rescanKeys struct {