Improve websocket transaction notifications.

This change improves the mechanism by which btcd notifies a websocket
client of transaction relating to watched address and unspent outputs
in the following ways:

1. The old processedtx notification has been replaced with the new
   recvtx notification.  This notification, rather than parsing out
   details used by wallet clients, sends the serialized transaction
   (as hexadecimal) and any block details (if mined) if any transaction
   output sends to one of the websocket client's watched addresses.

2. The old txspent notification has been replaced with the new
   redeemingtx notification.  This notification, rather than parsing
   out details used by wallet clients, sends the serialized transaction
   (as hexadecimal) and any block details (if mined) if any transaction
   input spends a watched output.

3. When processing notifications for transaction outputs, if any output
   spends to a client's watched address, a corresponding spent request
   is automatically registered.

4. Transaction notifications originating from mempool now include both
   transaction inputs and outputs, rather than only processing

5. When processing notifications for transaction inputs, a client's
   output spent request is only removed if the transaction being
   processed has also been mined into a block.  In combination with the
   4th change, this results in two redeemingtx notifications for
   transactions which first appear in mempool and are subsequently mined
   into a block.
This commit is contained in:
Josh Rickmar 2014-02-24 09:10:59 -05:00
parent a3ccc25e5a
commit 0c6d7bbeae
2 changed files with 161 additions and 119 deletions

View file

@ -912,7 +912,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool, isNe
// Notify websocket clients about mempool transactions.
if mp.server.rpcServer != nil {
go func() {
mp.server.rpcServer.ntfnMgr.NotifyForTxOuts(tx, nil)
mp.server.rpcServer.ntfnMgr.NotifyForTx(tx, nil)
if isNew {
mp.server.rpcServer.ntfnMgr.NotifyForNewTx(tx)

View file

@ -13,8 +13,8 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btcjson"
"github.com/conformal/btcscript"
"github.com/conformal/btcutil"
@ -312,16 +312,11 @@ func (m *wsNotificationManager) NotifyForNewTx(tx *btcutil.Tx) {
}
}
// AddSpentRequest requests an notification when the passed outpoint is
// confirmed spent (contained in a block connected to the main chain) for the
// passed websocket client. The request is automatically removed once the
// notification has been sent.
// addSpentRequest is the internal function which implements the public
// AddSpentRequest. See the comment for AddSpentRequest for more details.
//
// This function is safe for concurrent access.
func (m *wsNotificationManager) AddSpentRequest(wsc *wsClient, op *btcwire.OutPoint) {
m.Lock()
defer m.Unlock()
// This function MUST be called with the notification manager lock held.
func (m *wsNotificationManager) addSpentRequest(wsc *wsClient, op *btcwire.OutPoint) {
// Track the request in the client as well so it can be quickly be
// removed on disconnect.
wsc.spentRequests[*op] = struct{}{}
@ -336,6 +331,19 @@ func (m *wsNotificationManager) AddSpentRequest(wsc *wsClient, op *btcwire.OutPo
cmap[wsc.quit] = wsc
}
// AddSpentRequest requests an notification when the passed outpoint is
// confirmed spent (contained in a block connected to the main chain) for the
// passed websocket client. The request is automatically removed once the
// notification has been sent.
//
// This function is safe for concurrent access.
func (m *wsNotificationManager) AddSpentRequest(wsc *wsClient, op *btcwire.OutPoint) {
m.Lock()
defer m.Unlock()
m.addSpentRequest(wsc, op)
}
// removeSpentRequest is the internal function which implements the public
// RemoveSpentRequest. See the comment for RemoveSpentRequest for more details.
//
@ -372,8 +380,18 @@ func (m *wsNotificationManager) RemoveSpentRequest(wsc *wsClient, op *btcwire.Ou
m.removeSpentRequest(wsc, op)
}
// notifyForTxOuts is the internal function which implements the public
// NotifyForTxOuts. See the comment for NotifyForTxOuts for more details.
// txHexString returns the serialized transaction encoded in hexadecimal.
func txHexString(tx *btcutil.Tx) string {
var buf bytes.Buffer
// Ignore Serialize's error, as writing to a bytes.buffer cannot fail.
tx.MsgTx().Serialize(&buf)
return hex.EncodeToString(buf.Bytes())
}
// notifyForTxOuts examines each transaction output, notifying interested
// websocket clients of the transaction if an output spends to a watched
// address. A spent notification request is automatically registered for
// the client for each matching output.
//
// This function MUST be called with the notification manager lock held.
func (m *wsNotificationManager) notifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) {
@ -382,9 +400,11 @@ func (m *wsNotificationManager) notifyForTxOuts(tx *btcutil.Tx, block *btcutil.B
return
}
for i, txout := range tx.MsgTx().TxOut {
txHex := ""
wscNotified := make(map[chan bool]bool)
for i, txOut := range tx.MsgTx().TxOut {
_, addrs, _, err := btcscript.ExtractPkScriptAddrs(
txout.PkScript, m.server.server.btcnet)
txOut.PkScript, m.server.server.btcnet)
if err != nil {
continue
}
@ -396,95 +416,85 @@ func (m *wsNotificationManager) notifyForTxOuts(tx *btcutil.Tx, block *btcutil.B
continue
}
ntfn := &btcws.ProcessedTxNtfn{
Receiver: encodedAddr,
TxID: tx.Sha().String(),
TxOutIndex: uint32(i),
Amount: txout.Value,
PkScript: hex.EncodeToString(txout.PkScript),
// TODO(jrick): hardcoding unspent is WRONG and needs
// to be either calculated from other block txs, or dropped.
Spent: false,
}
if block != nil {
blkhash, err := block.Sha()
if err != nil {
rpcsLog.Error("Error getting block sha; dropping Tx notification")
break
}
ntfn.BlockHeight = int32(block.Height())
ntfn.BlockHash = blkhash.String()
ntfn.BlockIndex = tx.Index()
ntfn.BlockTime = block.MsgBlock().Header.Timestamp.Unix()
} else {
ntfn.BlockHeight = -1
ntfn.BlockIndex = -1
if txHex == "" {
txHex = txHexString(tx)
}
ntfn := btcws.NewRecvTxNtfn(txHex, blockDetails(block, tx.Index()))
marshalledJSON, err := json.Marshal(ntfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal processedtx notification: %v", err)
continue
}
for _, wsc := range cmap {
wsc.QueueNotification(marshalledJSON)
op := btcwire.NewOutPoint(tx.Sha(), uint32(i))
for wscQuit, wsc := range cmap {
m.addSpentRequest(wsc, op)
if !wscNotified[wscQuit] {
wscNotified[wscQuit] = true
wsc.QueueNotification(marshalledJSON)
}
}
}
}
}
// NotifyForTxOuts examines the outputs of the passed transaction and sends a
// notification to any websocket clients that are interested in an address the
// transaction pays to.
// NotifyForTx examines the inputs and outputs of the passed transaction,
// notifying websocket clients of outputs spending to a watched address
// and inputs spending a watched outpoint.
//
// This function is safe for concurrent access.
func (m *wsNotificationManager) NotifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) {
func (m *wsNotificationManager) NotifyForTx(tx *btcutil.Tx, block *btcutil.Block) {
m.Lock()
defer m.Unlock()
m.notifyForTxIns(tx, block)
m.notifyForTxOuts(tx, block)
}
// newSpentNotification returns a new marshalled spent notification with the
// passed parameters.
func newSpentNotification(prevOut *btcwire.OutPoint, spender *btcutil.Tx) []byte {
// Ignore Serialize's error, as writing to a bytes.buffer cannot fail.
var serializedTx bytes.Buffer
spender.MsgTx().Serialize(&serializedTx)
txHex := hex.EncodeToString(serializedTx.Bytes())
// Create and marsh the notification.
ntfn := btcws.NewTxSpentNtfn(prevOut.Hash.String(), int(prevOut.Index),
txHex)
marshalledJSON, err := json.Marshal(ntfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal spent notification: %v", err)
return nil
}
return marshalledJSON
// newRedeemingTxNotification returns a new marshalled redeemingtx notification
// with the passed parameters.
func newRedeemingTxNotification(txHex string, index int, block *btcutil.Block) ([]byte, error) {
// Create and marshal the notification.
ntfn := btcws.NewRedeemingTxNtfn(txHex, blockDetails(block, index))
return json.Marshal(ntfn)
}
// notifySpent examines the inputs of the passed transaction and sends
// interested websocket clients a notification.
// notifyForTxIns examines the inputs of the passed transaction and sends
// interested websocket clients a redeemingtx notification if any inputs
// spend a watched output. If block is non-nil, any matching spent
// requests are removed.
//
// This function MUST be called with the notification manager lock held.
func (m *wsNotificationManager) notifySpent(tx *btcutil.Tx) {
func (m *wsNotificationManager) notifyForTxIns(tx *btcutil.Tx, block *btcutil.Block) {
// Nothing to do if nobody is listening for spent notifications.
if len(m.spentNotifications) == 0 {
return
}
txHex := ""
wscNotified := make(map[chan bool]bool)
for _, txIn := range tx.MsgTx().TxIn {
prevOut := &txIn.PreviousOutpoint
if cmap, ok := m.spentNotifications[*prevOut]; ok {
marshalledJSON := newSpentNotification(prevOut, tx)
if marshalledJSON == nil {
if txHex == "" {
txHex = txHexString(tx)
}
marshalledJSON, err := newRedeemingTxNotification(txHex, tx.Index(), block)
if err != nil {
rpcsLog.Warnf("Failed to marshal redeemingtx notification: %v", err)
continue
}
for _, wsc := range cmap {
wsc.QueueNotification(marshalledJSON)
m.removeSpentRequest(wsc, prevOut)
for wscQuit, wsc := range cmap {
if block != nil {
m.removeSpentRequest(wsc, prevOut)
}
if !wscNotified[wscQuit] {
wscNotified[wscQuit] = true
wsc.QueueNotification(marshalledJSON)
}
}
}
}
@ -505,11 +515,24 @@ func (m *wsNotificationManager) NotifyBlockTXs(block *btcutil.Block) {
}
for _, tx := range block.Transactions() {
m.notifySpent(tx)
m.notifyForTxIns(tx, block)
m.notifyForTxOuts(tx, block)
}
}
func blockDetails(block *btcutil.Block, txIndex int) *btcws.BlockDetails {
if block == nil {
return nil
}
blockSha, _ := block.Sha() // never errors
return &btcws.BlockDetails{
Height: int32(block.Height()),
Hash: blockSha.String(),
Index: txIndex,
Time: block.MsgBlock().Header.Timestamp.Unix(),
}
}
// AddAddrRequest requests notifications to the passed websocket client when
// a transaction pays to the passed address.
//
@ -1086,20 +1109,29 @@ func (c *wsClient) SendMessage(marshalledJSON []byte, doneChan chan bool) {
c.sendChan <- wsResponse{msg: marshalledJSON, doneChan: doneChan}
}
// ErrClientQuit describes the error where a client send is not processed due
// to the client having already been disconnected or dropped.
var ErrClientQuit = errors.New("client quit")
// QueueMessage queues the passed notification to be sent to the websocket
// client. This function, as the name implies, is only intended for
// notifications since it has additional logic to prevent other subsystems, such
// as the memory pool and block manager, from blocking even when the send
// channel is full.
func (c *wsClient) QueueNotification(marshalledJSON []byte) {
//
// If the client is in the process of shutting down, this function returns
// ErrClientQuit. This is intended to be checked by long-running notification
// handlers to stop processing if there is no more work needed to be done.
func (c *wsClient) QueueNotification(marshalledJSON []byte) error {
// Don't queue the message if in the process of shutting down.
select {
case <-c.quit:
return
return ErrClientQuit
default:
}
c.ntfnChan <- marshalledJSON
return nil
}
// Disconnect disconnects the websocket client.
@ -1239,73 +1271,82 @@ func handleNotifyNewTXs(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.
// rescanBlock rescans all transactions in a single block. This is a helper
// function for handleRescan.
func rescanBlock(wsc *wsClient, cmd *btcws.RescanCmd, blk *btcutil.Block) {
db := wsc.server.server.db
func rescanBlock(wsc *wsClient, cmd *btcws.RescanCmd, blk *btcutil.Block,
unspent map[btcwire.OutPoint]struct{}) {
for _, tx := range blk.Transactions() {
var txReply *btcdb.TxListReply
txouts:
for txOutIdx, txout := range tx.MsgTx().TxOut {
_, addrs, _, err := btcscript.ExtractPkScriptAddrs(
txout.PkScript, wsc.server.server.btcnet)
if err != nil {
continue txouts
// Hexadecimal representation of this tx. Only created if
// needed, and reused for later notifications if already made.
var txHex string
// All inputs and outputs must be iterated through to correctly
// modify the unspent map, however, just a single notification
// for any matching transaction inputs or outputs should be
// created and sent.
spentNotified := false
recvNotified := false
for _, txin := range tx.MsgTx().TxIn {
if _, ok := unspent[txin.PreviousOutpoint]; ok {
delete(unspent, txin.PreviousOutpoint)
if spentNotified {
continue
}
if txHex == "" {
txHex = txHexString(tx)
}
marshalledJSON, err := newRedeemingTxNotification(txHex, tx.Index(), blk)
if err != nil {
rpcsLog.Errorf("Failed to marshal redeemingtx notification: %v", err)
continue
}
err = wsc.QueueNotification(marshalledJSON)
// Stop the rescan early if the websocket client
// disconnected.
if err == ErrClientQuit {
return
}
spentNotified = true
}
}
for txOutIdx, txout := range tx.MsgTx().TxOut {
_, addrs, _, _ := btcscript.ExtractPkScriptAddrs(
txout.PkScript, wsc.server.server.btcnet)
for _, addr := range addrs {
encodedAddr := addr.EncodeAddress()
if _, ok := cmd.Addresses[encodedAddr]; !ok {
continue
}
// TODO(jrick): This lookup is expensive and can be avoided
// if the wallet is sent the previous outpoints for all inputs
// of the tx, so any can removed from the utxo set (since
// they are, as of this tx, now spent).
if txReply == nil {
txReplyList, err := db.FetchTxBySha(tx.Sha())
if err != nil {
rpcsLog.Errorf("Tx Sha %v not found by db", tx.Sha())
continue txouts
}
for i := range txReplyList {
if txReplyList[i].Height == blk.Height() {
txReply = txReplyList[i]
break
}
}
unspent[*btcwire.NewOutPoint(tx.Sha(), uint32(txOutIdx))] = struct{}{}
if recvNotified {
continue
}
// Sha never errors.
blksha, _ := blk.Sha()
ntfn := &btcws.ProcessedTxNtfn{
Receiver: encodedAddr,
Amount: txout.Value,
TxID: tx.Sha().String(),
TxOutIndex: uint32(txOutIdx),
PkScript: hex.EncodeToString(txout.PkScript),
BlockHash: blksha.String(),
BlockHeight: int32(blk.Height()),
BlockIndex: tx.Index(),
BlockTime: blk.MsgBlock().Header.Timestamp.Unix(),
Spent: txReply.TxSpent[txOutIdx],
if txHex == "" {
txHex = txHexString(tx)
}
ntfn := btcws.NewRecvTxNtfn(txHex, blockDetails(blk, tx.Index()))
marshalledJSON, err := json.Marshal(ntfn)
if err != nil {
rpcsLog.Errorf("Failed to marshal processedtx notification: %v", err)
rpcsLog.Errorf("Failed to marshal recvtx notification: %v", err)
return
}
err = wsc.QueueNotification(marshalledJSON)
// Stop the rescan early if the websocket client
// disconnected.
select {
case <-wsc.quit:
if err == ErrClientQuit {
return
default:
wsc.SendMessage(marshalledJSON, nil)
}
recvNotified = true
}
}
}
@ -1328,6 +1369,7 @@ func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error)
minBlock := int64(cmd.BeginBlock)
maxBlock := int64(cmd.EndBlock)
unspent := make(map[btcwire.OutPoint]struct{})
// FetchHeightRange may not return a complete list of block shas for
// the given range, so fetch range as many times as necessary.
@ -1357,7 +1399,7 @@ func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error)
blk.Height())
return nil, nil
default:
rescanBlock(wsc, cmd, blk)
rescanBlock(wsc, cmd, blk, unspent)
}
}