Added notifyallnewtxs custom websocket command

Changed mempool.MaybeAcceptTransaction to accept an additional parameter
to differentiate betwee new transactions and those added from
disconnected blocks.

Added new fields to requestContexts to indicate which clients want to
receive all new transaction notifications.

Added NotifyForNewTx to rpcServer to deliver approriate transaction
notification.
This commit is contained in:
Francis Lam 2014-02-08 17:15:17 -05:00
parent 642c834ada
commit b89e93e52f
3 changed files with 72 additions and 6 deletions

View file

@ -1014,7 +1014,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
// Reinsert all of the transactions (except the coinbase) into // Reinsert all of the transactions (except the coinbase) into
// the transaction pool. // the transaction pool.
for _, tx := range block.Transactions()[1:] { for _, tx := range block.Transactions()[1:] {
err := b.server.txMemPool.MaybeAcceptTransaction(tx, nil) err := b.server.txMemPool.MaybeAcceptTransaction(tx, nil, false)
if err != nil { if err != nil {
// Remove the transaction and all transactions // Remove the transaction and all transactions
// that depend on it if it wasn't accepted into // that depend on it if it wasn't accepted into

View file

@ -717,7 +717,7 @@ func (mp *txMemPool) FetchTransaction(txHash *btcwire.ShaHash) (*btcutil.Tx, err
// more details. // more details.
// //
// This function MUST be called with the mempool lock held (for writes). // This function MUST be called with the mempool lock held (for writes).
func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool) error { func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool, isNew bool) error {
if isOrphan != nil { if isOrphan != nil {
*isOrphan = false *isOrphan = false
} }
@ -879,6 +879,10 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool) erro
// Notify wallets of mempool transactions to wallet addresses. // Notify wallets of mempool transactions to wallet addresses.
if mp.server.rpcServer != nil { if mp.server.rpcServer != nil {
mp.server.rpcServer.NotifyForTxOuts(tx, nil) mp.server.rpcServer.NotifyForTxOuts(tx, nil)
if isNew {
mp.server.rpcServer.NotifyForNewTx(tx)
}
} }
return nil return nil
@ -892,12 +896,12 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool) erro
// or not the transaction is an orphan. // or not the transaction is an orphan.
// //
// This function is safe for concurrent access. // This function is safe for concurrent access.
func (mp *txMemPool) MaybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool) error { func (mp *txMemPool) MaybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool, isNew bool) error {
// Protect concurrent access. // Protect concurrent access.
mp.Lock() mp.Lock()
defer mp.Unlock() defer mp.Unlock()
return mp.maybeAcceptTransaction(tx, isOrphan) return mp.maybeAcceptTransaction(tx, isOrphan, isNew)
} }
// processOrphans determines if there are any orphans which depend on the passed // processOrphans determines if there are any orphans which depend on the passed
@ -937,7 +941,7 @@ func (mp *txMemPool) processOrphans(hash *btcwire.ShaHash) error {
// Potentially accept the transaction into the // Potentially accept the transaction into the
// transaction pool. // transaction pool.
var isOrphan bool var isOrphan bool
err := mp.maybeAcceptTransaction(tx, &isOrphan) err := mp.maybeAcceptTransaction(tx, &isOrphan, true)
if err != nil { if err != nil {
return err return err
} }
@ -975,7 +979,7 @@ func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx) error {
// Potentially accept the transaction to the memory pool. // Potentially accept the transaction to the memory pool.
var isOrphan bool var isOrphan bool
err := mp.maybeAcceptTransaction(tx, &isOrphan) err := mp.maybeAcceptTransaction(tx, &isOrphan, true)
if err != nil { if err != nil {
return err return err
} }

View file

@ -46,6 +46,7 @@ var wsHandlers = map[string]wsCommandHandler{
"getbestblock": handleGetBestBlock, "getbestblock": handleGetBestBlock,
"notifyblocks": handleNotifyBlocks, "notifyblocks": handleNotifyBlocks,
"notifynewtxs": handleNotifyNewTXs, "notifynewtxs": handleNotifyNewTXs,
"notifyallnewtxs": handleNotifyAllNewTXs,
"notifyspent": handleNotifySpent, "notifyspent": handleNotifySpent,
"rescan": handleRescan, "rescan": handleRescan,
"sendrawtransaction": handleWalletSendRawTransaction, "sendrawtransaction": handleWalletSendRawTransaction,
@ -83,6 +84,15 @@ func (r *wsContext) AddBlockUpdateRequest(n ntfnChan) {
rc.blockUpdates = true rc.blockUpdates = true
} }
func (r *wsContext) AddAllNewTxRequest(n ntfnChan, verbose bool) {
r.Lock()
defer r.Unlock()
rc := r.connections[n]
rc.allTxUpdates = true
rc.verboseTxUpdates = verbose
}
// AddTxRequest adds the request context for new transaction notifications. // AddTxRequest adds the request context for new transaction notifications.
func (r *wsContext) AddTxRequest(n ntfnChan, addr string) { func (r *wsContext) AddTxRequest(n ntfnChan, addr string) {
r.Lock() r.Lock()
@ -251,6 +261,14 @@ type requestContexts struct {
// chain. // chain.
blockUpdates bool blockUpdates bool
// allTxUpdates specifies whether a client has requested notifications
// for all new transactions.
allTxUpdates bool
// verboseTxUpdates specifies whether a client has requested more verbose
// information about all new transactions
verboseTxUpdates bool
// txRequests is a set of addresses a wallet has requested transactions // txRequests is a set of addresses a wallet has requested transactions
// updates for. It is maintained here so all requests can be removed // updates for. It is maintained here so all requests can be removed
// when a wallet disconnects. // when a wallet disconnects.
@ -362,6 +380,18 @@ func handleNotifyNewTXs(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interfa
return nil, nil return nil, nil
} }
// handleNotifyAllNewTXs implements the notifyallnewtxs command extension for
// websocket connections.
func handleNotifyAllNewTXs(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) {
cmd, ok := icmd.(*btcws.NotifyAllNewTXsCmd)
if !ok {
return nil, &btcjson.ErrInternal
}
s.ws.AddAllNewTxRequest(c.n, cmd.Verbose)
return nil, nil
}
// handleNotifySpent implements the notifyspent command extension for // handleNotifySpent implements the notifyspent command extension for
// websocket connections. // websocket connections.
func handleNotifySpent(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { func handleNotifySpent(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) {
@ -951,3 +981,35 @@ func (s *rpcServer) NotifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) {
} }
} }
} }
// NotifyForNewTx sends delivers the new tx to any client that has
// registered for all new TX.
func (s *rpcServer) NotifyForNewTx(tx *btcutil.Tx) {
txId := tx.Sha().String()
mtx := tx.MsgTx()
var amount int64
for _, txOut := range mtx.TxOut {
amount += txOut.Value
}
ntfn := btcws.NewAllTxNtfn(txId, amount)
var verboseNtfn *btcws.AllVerboseTxNtfn
for ntfnChan, rc := range s.ws.connections {
if rc.allTxUpdates {
if rc.verboseTxUpdates {
if verboseNtfn == nil {
rawTx, err := createTxRawResult(s.server.btcnet, txId, mtx, nil, 0, nil)
if err != nil {
return
}
verboseNtfn = btcws.NewAllVerboseTxNtfn(rawTx)
}
ntfnChan <- verboseNtfn
} else {
ntfnChan <- ntfn
}
}
}
}