diff --git a/blockmanager.go b/blockmanager.go index 53e17ca1..48ca4236 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -653,7 +653,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { // Notify frontends if r := b.server.rpcServer; r != nil { go r.NotifyBlockConnected(block) - go r.NotifyNewTxListeners(b.server.db, block) + go r.NotifyBlockTXs(b.server.db, block) } // A block has been disconnected from the main block chain. diff --git a/rpcserver.go b/rpcserver.go index 41b7a9a6..f442e786 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -57,15 +57,17 @@ type rpcServer struct { // wsContext holds the items the RPC server needs to handle websocket // connections for wallets. type wsContext struct { + // walletListeners holds a map of each currently connected wallet + // listener as the key. The value is ignored, as this is only used as + // a set. A mutex is used to prevent incorrect multiple access. + walletListeners struct { + sync.RWMutex + m map[chan []byte]bool + } + // requests holds all wallet notification requests. requests wsRequests - // Channel to add a wallet listener. - addWalletListener chan (chan []byte) - - // Channel to removes a wallet listener. - removeWalletListener chan (chan []byte) - // Any chain notifications meant to be received by every connected // wallet are sent across this channel. walletNotificationMaster chan []byte @@ -85,8 +87,9 @@ func (r *wsRequests) getOrCreateContexts(walletNotification chan []byte) *reques rc, ok := r.m[walletNotification] if !ok { rc = &requestContexts{ - txRequests: make(map[addressHash]interface{}), - spentRequests: make(map[btcwire.OutPoint]interface{}), + txRequests: make(map[addressHash]interface{}), + spentRequests: make(map[btcwire.OutPoint]interface{}), + minedTxRequests: make(map[btcwire.ShaHash]bool), } r.m[walletNotification] = rc } @@ -122,6 +125,26 @@ func (r *wsRequests) RemoveSpentRequest(walletNotification chan []byte, op *btcw delete(rc.spentRequests, *op) } +// AddMinedTxRequest adds request contexts for notifications of a +// mined transaction. +func (r *wsRequests) AddMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) { + r.Lock() + defer r.Unlock() + + rc := r.getOrCreateContexts(walletNotification) + rc.minedTxRequests[*txID] = true +} + +// RemoveMinedTxRequest removes request contexts for notifications of a +// mined transaction. +func (r *wsRequests) RemoveMinedTxRequest(walletNotification chan []byte, txID *btcwire.ShaHash) { + r.Lock() + defer r.Unlock() + + rc := r.getOrCreateContexts(walletNotification) + delete(rc.minedTxRequests, *txID) +} + // CloseListeners removes all request contexts for notifications sent // to a wallet notification channel and closes the channel to stop all // goroutines currently serving that wallet. @@ -147,6 +170,14 @@ type requestContexts struct { // replies can be correctly routed back to the correct // btcwallet callback. spentRequests map[btcwire.OutPoint]interface{} + + // minedTxRequests holds a set of transaction IDs (tx hashes) of + // transactions created by a wallet. A wallet may request + // notifications of when a tx it created is mined into a block and + // removed from the mempool. Once a tx has been mined into a + // block, wallet may remove the raw transaction from its unmined tx + // pool. + minedTxRequests map[btcwire.ShaHash]bool } // Start is used by server.go to start the rpc listener. @@ -217,8 +248,7 @@ func newRPCServer(s *server) (*rpcServer, error) { // initialize memory for websocket connections rpc.ws.requests.m = make(map[chan []byte]*requestContexts) - rpc.ws.addWalletListener = make(chan (chan []byte)) - rpc.ws.removeWalletListener = make(chan (chan []byte)) + rpc.ws.walletListeners.m = make(map[chan []byte]bool) rpc.ws.walletNotificationMaster = make(chan []byte) // IPv4 listener. @@ -267,7 +297,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { // websocket handler to tell when a method is not supported by // the standard RPC API, and is not needed here. Error logging // is done inside jsonRead, so no need to log the error here. - reply, _ := jsonRead(body, s) + reply, _ := jsonRead(body, s, nil) log.Tracef("[RPCS] reply: %v", reply) msg, err := btcjson.MarshallAndSend(reply, w) @@ -279,8 +309,10 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { } // jsonRead abstracts the JSON unmarshalling and reply handling used -// by both RPC and websockets. -func jsonRead(body []byte, s *rpcServer) (reply btcjson.Reply, err error) { +// by both RPC and websockets. If called from websocket code, a non-nil +// wallet notification channel can be used to automatically register +// various notifications for the wallet. +func jsonRead(body []byte, s *rpcServer, walletNotification chan []byte) (reply btcjson.Reply, err error) { var message btcjson.Message if err := json.Unmarshal(body, &message); err != nil { jsonError := btcjson.ErrParse @@ -708,6 +740,13 @@ func jsonRead(body []byte, s *rpcServer) (reply btcjson.Reply, err error) { if err == nil { result = txsha.String() } + + // If called from websocket code, add a mined tx hashes + // request. + if walletNotification != nil { + s.ws.requests.AddMinedTxRequest(walletNotification, &txsha) + } + reply = btcjson.Reply{ Result: result, Error: nil, @@ -990,58 +1029,32 @@ func getDifficultyRatio(bits uint32) float64 { // AddWalletListener adds a channel to listen for new messages from a // wallet. func (s *rpcServer) AddWalletListener(c chan []byte) { - s.ws.addWalletListener <- c + s.ws.walletListeners.Lock() + s.ws.walletListeners.m[c] = true + s.ws.walletListeners.Unlock() } // RemoveWalletListener removes a wallet listener channel. func (s *rpcServer) RemoveWalletListener(c chan []byte) { - s.ws.removeWalletListener <- c + s.ws.walletListeners.Lock() + delete(s.ws.walletListeners.m, c) + s.ws.walletListeners.Unlock() } // walletListenerDuplicator listens for new wallet listener channels // and duplicates messages sent to walletNotificationMaster to all // connected listeners. func (s *rpcServer) walletListenerDuplicator() { - // walletListeners is a map holding each currently connected wallet - // listener as the key. The value is ignored, as this is only used as - // a set. - walletListeners := make(map[chan []byte]bool) - - // Don't want to add or delete a wallet listener while iterating - // through each to propigate to every attached wallet. Use a mutex to - // prevent this. - var mtx sync.Mutex - - // Check for listener channels to add or remove from set. - go func() { - for { - select { - case c := <-s.ws.addWalletListener: - mtx.Lock() - walletListeners[c] = true - mtx.Unlock() - - case c := <-s.ws.removeWalletListener: - mtx.Lock() - delete(walletListeners, c) - mtx.Unlock() - - case <-s.quit: - return - } - } - }() - // Duplicate all messages sent across walletNotificationMaster to each // listening wallet. for { select { case ntfn := <-s.ws.walletNotificationMaster: - mtx.Lock() - for c := range walletListeners { + s.ws.walletListeners.RLock() + for c := range s.ws.walletListeners.m { c <- ntfn } - mtx.Unlock() + s.ws.walletListeners.RUnlock() case <-s.quit: return @@ -1108,7 +1121,7 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) { // sending the marshalled reply to a wallet notification channel. func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []byte) { s.wg.Add(1) - reply, err := jsonRead(msg, s) + reply, err := jsonRead(msg, s, walletNotification) s.wg.Done() if err != ErrMethodNotImplemented { @@ -1162,27 +1175,60 @@ func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []b // of a new block connected to the main chain. The notification is sent // to each connected wallet. func (s *rpcServer) NotifyBlockConnected(block *btcutil.Block) { - var id interface{} = "btcd:blockconnected" - hash, err := block.Sha() - if err != nil { - log.Error("Bad block; connected block notification dropped.") - return - } - ntfn := btcjson.Reply{ - Result: struct { - Hash string `json:"hash"` - Height int64 `json:"height"` + s.ws.walletListeners.RLock() + for wltNtfn := range s.ws.walletListeners.m { + // Create notification with basic information filled in. + // This data is the same for every connected wallet. + hash, err := block.Sha() + if err != nil { + log.Error("Bad block; connected block notification dropped.") + return + } + ntfnResult := struct { + Hash string `json:"hash"` + Height int64 `json:"height"` + MinedTXs []string `json:"minedtxs"` }{ Hash: hash.String(), Height: block.Height(), - }, - Id: &id, + } + + // Fill in additional wallet-specific notifications. If there + // is no request context for this wallet, no need to give this + // wallet any extra notifications. + if cxt := s.ws.requests.m[wltNtfn]; cxt != nil { + // Create list of all txs created by this wallet that appear in this + // block. + minedTxShas := make([]string, 0, len(cxt.minedTxRequests)) + + // TxShas does not return a non-nil error. + txShaList, _ := block.TxShas() + txList := s.server.db.FetchTxByShaList(txShaList) + for _, txReply := range txList { + if txReply.Err != nil { + continue + } + if _, ok := cxt.minedTxRequests[*txReply.Sha]; ok { + minedTxShas = append(minedTxShas, txReply.Sha.String()) + s.ws.requests.RemoveMinedTxRequest(wltNtfn, txReply.Sha) + } + } + + ntfnResult.MinedTXs = minedTxShas + } + + var id interface{} = "btcd:blockconnected" + ntfn := btcjson.Reply{ + Result: ntfnResult, + Id: &id, + } + m, _ := json.Marshal(ntfn) + wltNtfn <- m } - m, _ := json.Marshal(ntfn) - s.ws.walletNotificationMaster <- m + s.ws.walletListeners.RUnlock() } -// NotifyBlockDisconnected creates and marshalls a JSON message to notify +// NotifyBlockDisconnected creates and marshals a JSON message to notify // of a new block disconnected from the main chain. The notification is sent // to each connected wallet. func (s *rpcServer) NotifyBlockDisconnected(block *btcutil.Block) { @@ -1206,10 +1252,10 @@ func (s *rpcServer) NotifyBlockDisconnected(block *btcutil.Block) { s.ws.walletNotificationMaster <- m } -// NotifyNewTxListeners creates and marshals a JSON message to notify wallets +// NotifyBlockTXs creates and marshals a JSON message to notify wallets // of new transactions (with both spent and unspent outputs) for a watched // address. -func (s *rpcServer) NotifyNewTxListeners(db btcdb.Db, block *btcutil.Block) { +func (s *rpcServer) NotifyBlockTXs(db btcdb.Db, block *btcutil.Block) { txShaList, err := block.TxShas() if err != nil { log.Error("Bad block; All notifications for block dropped.")