Notify wallets of mined transactions.
This change allows btcwallet to keep a pool of transactions that have not yet been mined into a block, notifying wallet when transactions are mined, as well as introducing a new way to send the btcd:blockconnected notification with wallet-specific information as part of the same notification. When a transaction is sent using the RPC call 'sendrawtransaction', a notification request will be automatically registered with the connected wallet (if using websockets) to notify the wallet when the transaction first appears in a block. To perform this notification, and to avoid requiring wallets from waiting for seperate mined tx notifications (and resend after a timeout) or from sending an additional tx mined request for every tx in the pool after each new block, the blockconnected notification is now created seperately for each wallet. If the notified wallet has sent a transaction, an additional JSON field "minedtxs" will include an array of transaction IDs that the wallet has created and which are included in the new block. This new unique blockconnected notification can also be used for additional notifications that may happen each new block in the future, and to cut down on existing notification handlers in btcwallet, such as for transactions to a watched address.
This commit is contained in:
parent
2bb8c5d5cc
commit
618b885e9e
2 changed files with 113 additions and 67 deletions
|
@ -653,7 +653,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
|
|||
// Notify frontends
|
||||
if r := b.server.rpcServer; r != nil {
|
||||
go r.NotifyBlockConnected(block)
|
||||
go r.NotifyNewTxListeners(b.server.db, block)
|
||||
go r.NotifyBlockTXs(b.server.db, block)
|
||||
}
|
||||
|
||||
// A block has been disconnected from the main block chain.
|
||||
|
|
178
rpcserver.go
178
rpcserver.go
|
@ -57,15 +57,17 @@ type rpcServer struct {
|
|||
// wsContext holds the items the RPC server needs to handle websocket
|
||||
// connections for wallets.
|
||||
type wsContext struct {
|
||||
// walletListeners holds a map of each currently connected wallet
|
||||
// listener as the key. The value is ignored, as this is only used as
|
||||
// a set. A mutex is used to prevent incorrect multiple access.
|
||||
walletListeners struct {
|
||||
sync.RWMutex
|
||||
m map[chan []byte]bool
|
||||
}
|
||||
|
||||
// requests holds all wallet notification requests.
|
||||
requests wsRequests
|
||||
|
||||
// Channel to add a wallet listener.
|
||||
addWalletListener chan (chan []byte)
|
||||
|
||||
// Channel to removes a wallet listener.
|
||||
removeWalletListener chan (chan []byte)
|
||||
|
||||
// Any chain notifications meant to be received by every connected
|
||||
// wallet are sent across this channel.
|
||||
walletNotificationMaster chan []byte
|
||||
|
@ -85,8 +87,9 @@ func (r *wsRequests) getOrCreateContexts(walletNotification chan []byte) *reques
|
|||
rc, ok := r.m[walletNotification]
|
||||
if !ok {
|
||||
rc = &requestContexts{
|
||||
txRequests: make(map[addressHash]interface{}),
|
||||
spentRequests: make(map[btcwire.OutPoint]interface{}),
|
||||
txRequests: make(map[addressHash]interface{}),
|
||||
spentRequests: make(map[btcwire.OutPoint]interface{}),
|
||||
minedTxRequests: make(map[btcwire.ShaHash]bool),
|
||||
}
|
||||
r.m[walletNotification] = rc
|
||||
}
|
||||
|
@ -122,6 +125,26 @@ func (r *wsRequests) RemoveSpentRequest(walletNotification chan []byte, op *btcw
|
|||
delete(rc.spentRequests, *op)
|
||||
}
|
||||
|
||||
// AddMinedTxRequest adds request contexts for notifications of a
|
||||
// mined transaction.
|
||||
func (r *wsRequests) AddMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
rc := r.getOrCreateContexts(walletNotification)
|
||||
rc.minedTxRequests[*txID] = true
|
||||
}
|
||||
|
||||
// RemoveMinedTxRequest removes request contexts for notifications of a
|
||||
// mined transaction.
|
||||
func (r *wsRequests) RemoveMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
rc := r.getOrCreateContexts(walletNotification)
|
||||
delete(rc.minedTxRequests, *txID)
|
||||
}
|
||||
|
||||
// CloseListeners removes all request contexts for notifications sent
|
||||
// to a wallet notification channel and closes the channel to stop all
|
||||
// goroutines currently serving that wallet.
|
||||
|
@ -147,6 +170,14 @@ type requestContexts struct {
|
|||
// replies can be correctly routed back to the correct
|
||||
// btcwallet callback.
|
||||
spentRequests map[btcwire.OutPoint]interface{}
|
||||
|
||||
// minedTxRequests holds a set of transaction IDs (tx hashes) of
|
||||
// transactions created by a wallet. A wallet may request
|
||||
// notifications of when a tx it created is mined into a block and
|
||||
// removed from the mempool. Once a tx has been mined into a
|
||||
// block, wallet may remove the raw transaction from its unmined tx
|
||||
// pool.
|
||||
minedTxRequests map[btcwire.ShaHash]bool
|
||||
}
|
||||
|
||||
// Start is used by server.go to start the rpc listener.
|
||||
|
@ -217,8 +248,7 @@ func newRPCServer(s *server) (*rpcServer, error) {
|
|||
|
||||
// initialize memory for websocket connections
|
||||
rpc.ws.requests.m = make(map[chan []byte]*requestContexts)
|
||||
rpc.ws.addWalletListener = make(chan (chan []byte))
|
||||
rpc.ws.removeWalletListener = make(chan (chan []byte))
|
||||
rpc.ws.walletListeners.m = make(map[chan []byte]bool)
|
||||
rpc.ws.walletNotificationMaster = make(chan []byte)
|
||||
|
||||
// IPv4 listener.
|
||||
|
@ -267,7 +297,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) {
|
|||
// websocket handler to tell when a method is not supported by
|
||||
// the standard RPC API, and is not needed here. Error logging
|
||||
// is done inside jsonRead, so no need to log the error here.
|
||||
reply, _ := jsonRead(body, s)
|
||||
reply, _ := jsonRead(body, s, nil)
|
||||
log.Tracef("[RPCS] reply: %v", reply)
|
||||
|
||||
msg, err := btcjson.MarshallAndSend(reply, w)
|
||||
|
@ -279,8 +309,10 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) {
|
|||
}
|
||||
|
||||
// jsonRead abstracts the JSON unmarshalling and reply handling used
|
||||
// by both RPC and websockets.
|
||||
func jsonRead(body []byte, s *rpcServer) (reply btcjson.Reply, err error) {
|
||||
// by both RPC and websockets. If called from websocket code, a non-nil
|
||||
// wallet notification channel can be used to automatically register
|
||||
// various notifications for the wallet.
|
||||
func jsonRead(body []byte, s *rpcServer, walletNotification chan []byte) (reply btcjson.Reply, err error) {
|
||||
var message btcjson.Message
|
||||
if err := json.Unmarshal(body, &message); err != nil {
|
||||
jsonError := btcjson.ErrParse
|
||||
|
@ -708,6 +740,13 @@ func jsonRead(body []byte, s *rpcServer) (reply btcjson.Reply, err error) {
|
|||
if err == nil {
|
||||
result = txsha.String()
|
||||
}
|
||||
|
||||
// If called from websocket code, add a mined tx hashes
|
||||
// request.
|
||||
if walletNotification != nil {
|
||||
s.ws.requests.AddMinedTxRequest(walletNotification, &txsha)
|
||||
}
|
||||
|
||||
reply = btcjson.Reply{
|
||||
Result: result,
|
||||
Error: nil,
|
||||
|
@ -990,58 +1029,32 @@ func getDifficultyRatio(bits uint32) float64 {
|
|||
// AddWalletListener adds a channel to listen for new messages from a
|
||||
// wallet.
|
||||
func (s *rpcServer) AddWalletListener(c chan []byte) {
|
||||
s.ws.addWalletListener <- c
|
||||
s.ws.walletListeners.Lock()
|
||||
s.ws.walletListeners.m[c] = true
|
||||
s.ws.walletListeners.Unlock()
|
||||
}
|
||||
|
||||
// RemoveWalletListener removes a wallet listener channel.
|
||||
func (s *rpcServer) RemoveWalletListener(c chan []byte) {
|
||||
s.ws.removeWalletListener <- c
|
||||
s.ws.walletListeners.Lock()
|
||||
delete(s.ws.walletListeners.m, c)
|
||||
s.ws.walletListeners.Unlock()
|
||||
}
|
||||
|
||||
// walletListenerDuplicator listens for new wallet listener channels
|
||||
// and duplicates messages sent to walletNotificationMaster to all
|
||||
// connected listeners.
|
||||
func (s *rpcServer) walletListenerDuplicator() {
|
||||
// walletListeners is a map holding each currently connected wallet
|
||||
// listener as the key. The value is ignored, as this is only used as
|
||||
// a set.
|
||||
walletListeners := make(map[chan []byte]bool)
|
||||
|
||||
// Don't want to add or delete a wallet listener while iterating
|
||||
// through each to propigate to every attached wallet. Use a mutex to
|
||||
// prevent this.
|
||||
var mtx sync.Mutex
|
||||
|
||||
// Check for listener channels to add or remove from set.
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case c := <-s.ws.addWalletListener:
|
||||
mtx.Lock()
|
||||
walletListeners[c] = true
|
||||
mtx.Unlock()
|
||||
|
||||
case c := <-s.ws.removeWalletListener:
|
||||
mtx.Lock()
|
||||
delete(walletListeners, c)
|
||||
mtx.Unlock()
|
||||
|
||||
case <-s.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Duplicate all messages sent across walletNotificationMaster to each
|
||||
// listening wallet.
|
||||
for {
|
||||
select {
|
||||
case ntfn := <-s.ws.walletNotificationMaster:
|
||||
mtx.Lock()
|
||||
for c := range walletListeners {
|
||||
s.ws.walletListeners.RLock()
|
||||
for c := range s.ws.walletListeners.m {
|
||||
c <- ntfn
|
||||
}
|
||||
mtx.Unlock()
|
||||
s.ws.walletListeners.RUnlock()
|
||||
|
||||
case <-s.quit:
|
||||
return
|
||||
|
@ -1108,7 +1121,7 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) {
|
|||
// sending the marshalled reply to a wallet notification channel.
|
||||
func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []byte) {
|
||||
s.wg.Add(1)
|
||||
reply, err := jsonRead(msg, s)
|
||||
reply, err := jsonRead(msg, s, walletNotification)
|
||||
s.wg.Done()
|
||||
|
||||
if err != ErrMethodNotImplemented {
|
||||
|
@ -1162,27 +1175,60 @@ func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []b
|
|||
// of a new block connected to the main chain. The notification is sent
|
||||
// to each connected wallet.
|
||||
func (s *rpcServer) NotifyBlockConnected(block *btcutil.Block) {
|
||||
var id interface{} = "btcd:blockconnected"
|
||||
hash, err := block.Sha()
|
||||
if err != nil {
|
||||
log.Error("Bad block; connected block notification dropped.")
|
||||
return
|
||||
}
|
||||
ntfn := btcjson.Reply{
|
||||
Result: struct {
|
||||
Hash string `json:"hash"`
|
||||
Height int64 `json:"height"`
|
||||
s.ws.walletListeners.RLock()
|
||||
for wltNtfn := range s.ws.walletListeners.m {
|
||||
// Create notification with basic information filled in.
|
||||
// This data is the same for every connected wallet.
|
||||
hash, err := block.Sha()
|
||||
if err != nil {
|
||||
log.Error("Bad block; connected block notification dropped.")
|
||||
return
|
||||
}
|
||||
ntfnResult := struct {
|
||||
Hash string `json:"hash"`
|
||||
Height int64 `json:"height"`
|
||||
MinedTXs []string `json:"minedtxs"`
|
||||
}{
|
||||
Hash: hash.String(),
|
||||
Height: block.Height(),
|
||||
},
|
||||
Id: &id,
|
||||
}
|
||||
|
||||
// Fill in additional wallet-specific notifications. If there
|
||||
// is no request context for this wallet, no need to give this
|
||||
// wallet any extra notifications.
|
||||
if cxt := s.ws.requests.m[wltNtfn]; cxt != nil {
|
||||
// Create list of all txs created by this wallet that appear in this
|
||||
// block.
|
||||
minedTxShas := make([]string, 0, len(cxt.minedTxRequests))
|
||||
|
||||
// TxShas does not return a non-nil error.
|
||||
txShaList, _ := block.TxShas()
|
||||
txList := s.server.db.FetchTxByShaList(txShaList)
|
||||
for _, txReply := range txList {
|
||||
if txReply.Err != nil {
|
||||
continue
|
||||
}
|
||||
if _, ok := cxt.minedTxRequests[*txReply.Sha]; ok {
|
||||
minedTxShas = append(minedTxShas, txReply.Sha.String())
|
||||
s.ws.requests.RemoveMinedTxRequest(wltNtfn, txReply.Sha)
|
||||
}
|
||||
}
|
||||
|
||||
ntfnResult.MinedTXs = minedTxShas
|
||||
}
|
||||
|
||||
var id interface{} = "btcd:blockconnected"
|
||||
ntfn := btcjson.Reply{
|
||||
Result: ntfnResult,
|
||||
Id: &id,
|
||||
}
|
||||
m, _ := json.Marshal(ntfn)
|
||||
wltNtfn <- m
|
||||
}
|
||||
m, _ := json.Marshal(ntfn)
|
||||
s.ws.walletNotificationMaster <- m
|
||||
s.ws.walletListeners.RUnlock()
|
||||
}
|
||||
|
||||
// NotifyBlockDisconnected creates and marshalls a JSON message to notify
|
||||
// NotifyBlockDisconnected creates and marshals a JSON message to notify
|
||||
// of a new block disconnected from the main chain. The notification is sent
|
||||
// to each connected wallet.
|
||||
func (s *rpcServer) NotifyBlockDisconnected(block *btcutil.Block) {
|
||||
|
@ -1206,10 +1252,10 @@ func (s *rpcServer) NotifyBlockDisconnected(block *btcutil.Block) {
|
|||
s.ws.walletNotificationMaster <- m
|
||||
}
|
||||
|
||||
// NotifyNewTxListeners creates and marshals a JSON message to notify wallets
|
||||
// NotifyBlockTXs creates and marshals a JSON message to notify wallets
|
||||
// of new transactions (with both spent and unspent outputs) for a watched
|
||||
// address.
|
||||
func (s *rpcServer) NotifyNewTxListeners(db btcdb.Db, block *btcutil.Block) {
|
||||
func (s *rpcServer) NotifyBlockTXs(db btcdb.Db, block *btcutil.Block) {
|
||||
txShaList, err := block.TxShas()
|
||||
if err != nil {
|
||||
log.Error("Bad block; All notifications for block dropped.")
|
||||
|
|
Loading…
Add table
Reference in a new issue