Cleanly remove disconnected clients.

Also fixes a bug where responses for a single client would be sent to
every connected client.
This commit is contained in:
Josh Rickmar 2014-02-17 22:18:30 -05:00
parent 3a6ae93a4b
commit e837ca5b64
5 changed files with 53 additions and 76 deletions

View file

@ -229,8 +229,8 @@ func (am *AccountManager) BlockNotify(bs *wallet.BlockStamp) {
// changes, or sending these notifications as the utxos are added. // changes, or sending these notifications as the utxos are added.
confirmed := a.CalculateBalance(1) confirmed := a.CalculateBalance(1)
unconfirmed := a.CalculateBalance(0) - confirmed unconfirmed := a.CalculateBalance(0) - confirmed
NotifyWalletBalance(frontendNotificationMaster, a.name, confirmed) NotifyWalletBalance(allClients, a.name, confirmed)
NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, a.name, NotifyWalletBalanceUnconfirmed(allClients, a.name,
unconfirmed) unconfirmed)
// If this is the default account, update the block all accounts // If this is the default account, update the block all accounts

4
cmd.go
View file

@ -227,7 +227,7 @@ func main() {
} }
updateBtcd <- btcd updateBtcd <- btcd
NotifyBtcdConnection(frontendNotificationMaster) NotifyBtcdConnection(allClients)
log.Info("Established connection to btcd") log.Info("Established connection to btcd")
// Perform handshake. // Perform handshake.
@ -246,7 +246,7 @@ func main() {
// Block goroutine until the connection is lost. // Block goroutine until the connection is lost.
<-btcd.closed <-btcd.closed
NotifyBtcdConnection(frontendNotificationMaster) NotifyBtcdConnection(allClients)
log.Info("Lost btcd connection") log.Info("Lost btcd connection")
} }
} }

View file

@ -129,7 +129,7 @@ func NtfnProcessedTx(n btcjson.Cmd) {
} else { } else {
// Notify frontends of new recv tx and mark as notified. // Notify frontends of new recv tx and mark as notified.
NotifiedRecvTxChans.add <- *recvTxOP NotifiedRecvTxChans.add <- *recvTxOP
NotifyNewTxDetails(frontendNotificationMaster, a.Name(), t.TxInfo(a.Name(), NotifyNewTxDetails(allClients, a.Name(), t.TxInfo(a.Name(),
ptn.BlockHeight, a.Wallet.Net())[0]) ptn.BlockHeight, a.Wallet.Net())[0])
} }
@ -151,16 +151,15 @@ func NtfnProcessedTx(n btcjson.Cmd) {
// the blockconnected notifiation is processed. // the blockconnected notifiation is processed.
if u.Height == -1 { if u.Height == -1 {
bal := a.CalculateBalance(0) - a.CalculateBalance(1) bal := a.CalculateBalance(0) - a.CalculateBalance(1)
NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, NotifyWalletBalanceUnconfirmed(allClients, a.name, bal)
a.name, bal)
} }
} }
// Notify frontends of new account balance. // Notify frontends of new account balance.
confirmed := a.CalculateBalance(1) confirmed := a.CalculateBalance(1)
unconfirmed := a.CalculateBalance(0) - confirmed unconfirmed := a.CalculateBalance(0) - confirmed
NotifyWalletBalance(frontendNotificationMaster, a.name, confirmed) NotifyWalletBalance(allClients, a.name, confirmed)
NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, a.name, unconfirmed) NotifyWalletBalanceUnconfirmed(allClients, a.name, unconfirmed)
} }
// NtfnBlockConnected handles btcd notifications resulting from newly // NtfnBlockConnected handles btcd notifications resulting from newly
@ -208,7 +207,7 @@ func NtfnBlockConnected(n btcjson.Cmd) {
// Pass notification to frontends too. // Pass notification to frontends too.
marshaled, _ := n.MarshalJSON() marshaled, _ := n.MarshalJSON()
frontendNotificationMaster <- marshaled allClients <- marshaled
} }
// NtfnBlockDisconnected handles btcd notifications resulting from // NtfnBlockDisconnected handles btcd notifications resulting from
@ -231,7 +230,7 @@ func NtfnBlockDisconnected(n btcjson.Cmd) {
// Pass notification to frontends too. // Pass notification to frontends too.
marshaled, _ := n.MarshalJSON() marshaled, _ := n.MarshalJSON()
frontendNotificationMaster <- marshaled allClients <- marshaled
} }
// NtfnTxMined handles btcd notifications resulting from newly // NtfnTxMined handles btcd notifications resulting from newly

View file

@ -1357,7 +1357,7 @@ func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo *
bs, err := GetCurBlock() bs, err := GetCurBlock()
if err == nil { if err == nil {
for _, details := range sendtx.TxInfo(a.Name(), bs.Height, a.Net()) { for _, details := range sendtx.TxInfo(a.Name(), bs.Height, a.Net()) {
NotifyNewTxDetails(frontendNotificationMaster, a.Name(), NotifyNewTxDetails(allClients, a.Name(),
details) details)
} }
} }
@ -1379,8 +1379,8 @@ func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo *
// confirmed balance. // confirmed balance.
confirmed := a.CalculateBalance(1) confirmed := a.CalculateBalance(1)
unconfirmed := a.CalculateBalance(0) - confirmed unconfirmed := a.CalculateBalance(0) - confirmed
NotifyWalletBalance(frontendNotificationMaster, a.name, confirmed) NotifyWalletBalance(allClients, a.name, confirmed)
NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, a.name, unconfirmed) NotifyWalletBalanceUnconfirmed(allClients, a.name, unconfirmed)
// btcd cannot be trusted to successfully relay the tx to the // btcd cannot be trusted to successfully relay the tx to the
// Bitcoin network. Even if this succeeds, the rawtx must be // Bitcoin network. Even if this succeeds, the rawtx must be
@ -1796,7 +1796,7 @@ type AccountNtfn struct {
func NotifyWalletLockStateChange(account string, locked bool) { func NotifyWalletLockStateChange(account string, locked bool) {
ntfn := btcws.NewWalletLockStateNtfn(account, locked) ntfn := btcws.NewWalletLockStateNtfn(account, locked)
mntfn, _ := ntfn.MarshalJSON() mntfn, _ := ntfn.MarshalJSON()
frontendNotificationMaster <- mntfn allClients <- mntfn
} }
// NotifyWalletBalance sends a confirmed account balance notification // NotifyWalletBalance sends a confirmed account balance notification
@ -1892,7 +1892,7 @@ func NotifyMinedTxSender(in chan *tx.RecvTx) {
recv.BlockHash.String(), recv.BlockHeight, recv.BlockHash.String(), recv.BlockHeight,
recv.BlockTime, int(recv.BlockIndex)) recv.BlockTime, int(recv.BlockIndex))
mntfn, _ := ntfn.MarshalJSON() mntfn, _ := ntfn.MarshalJSON()
frontendNotificationMaster <- mntfn allClients <- mntfn
// Mark as sent. // Mark as sent.
m[recv.TxID] = struct{}{} m[recv.TxID] = struct{}{}

View file

@ -52,13 +52,10 @@ var (
ErrConnLost = errors.New("connection lost") ErrConnLost = errors.New("connection lost")
// Adds a frontend listener channel // Adds a frontend listener channel
addFrontendListener = make(chan (chan []byte)) addClient = make(chan clientContext)
// Removes a frontend listener channel
deleteFrontendListener = make(chan (chan []byte))
// Messages sent to this channel are sent to each connected frontend. // Messages sent to this channel are sent to each connected frontend.
frontendNotificationMaster = make(chan []byte, 100) allClients = make(chan []byte, 100)
) )
// server holds the items the RPC server may need to access (auth, // server holds the items the RPC server may need to access (auth,
@ -69,6 +66,11 @@ type server struct {
authsha [sha256.Size]byte authsha [sha256.Size]byte
} }
type clientContext struct {
send chan []byte
disconnected chan struct{} // closed on disconnect
}
// parseListeners splits the list of listen addresses passed in addrs into // parseListeners splits the list of listen addresses passed in addrs into
// IPv4 and IPv6 slices and returns them. This allows easy creation of the // IPv4 and IPv6 slices and returns them. This allows easy creation of the
// listeners on the correct interface "tcp4" and "tcp6". It also properly // listeners on the correct interface "tcp4" and "tcp6". It also properly
@ -262,55 +264,26 @@ func (s *server) ServeRPCRequest(w http.ResponseWriter, r *http.Request) {
} }
} }
// frontendListenerDuplicator listens for new wallet listener channels // clientResponseDuplicator listens for new wallet listener channels
// and duplicates messages sent to frontendNotificationMaster to all // and duplicates messages sent to allClients to all connected clients.
// connected listeners. func clientResponseDuplicator() {
func frontendListenerDuplicator() { clients := make(map[clientContext]struct{})
// frontendListeners is a map holding each currently connected frontend
// listener as the key. The value is ignored, as this is only used as
// a set.
frontendListeners := make(map[chan []byte]bool)
// Don't want to add or delete a wallet listener while iterating for {
// through each to propigate to every attached wallet. Use a binary select {
// semaphore to prevent this. case cc := <-addClient:
sem := make(chan struct{}, 1) clients[cc] = struct{}{}
sem <- struct{}{}
// Check for listener channels to add or remove from set. case n := <-allClients:
go func() { for cc := range clients {
for { select {
select { case <-cc.disconnected:
case c := <-addFrontendListener: delete(clients, cc)
<-sem default:
frontendListeners[c] = true cc.send <- n
sem <- struct{}{}
NotifyBtcdConnection(c)
bs, err := GetCurBlock()
if err == nil {
NotifyNewBlockChainHeight(c, bs)
NotifyBalances(c)
} }
case c := <-deleteFrontendListener:
<-sem
delete(frontendListeners, c)
sem <- struct{}{}
} }
} }
}()
// Duplicate all messages sent across frontendNotificationMaster, as
// well as internal btcwallet notifications, to each listening wallet.
for {
ntfn := <-frontendNotificationMaster
<-sem
for c := range frontendListeners {
c <- ntfn
}
sem <- struct{}{}
} }
} }
@ -331,11 +304,12 @@ func NotifyBtcdConnection(reply chan []byte) {
func WSSendRecv(ws *websocket.Conn) { func WSSendRecv(ws *websocket.Conn) {
// Add frontend notification channel to set so this handler receives // Add frontend notification channel to set so this handler receives
// updates. // updates.
frontendNotification := make(chan []byte) cc := clientContext{
addFrontendListener <- frontendNotification send: make(chan []byte),
defer func() { disconnected: make(chan struct{}),
deleteFrontendListener <- frontendNotification }
}() addClient <- cc
defer close(cc.disconnected)
// jsonMsgs receives JSON messages from the currently connected frontend. // jsonMsgs receives JSON messages from the currently connected frontend.
jsonMsgs := make(chan []byte) jsonMsgs := make(chan []byte)
@ -363,11 +337,15 @@ func WSSendRecv(ws *websocket.Conn) {
// Handle request here. // Handle request here.
go func(m []byte) { go func(m []byte) {
resp := ReplyToFrontend(m, true) resp := ReplyToFrontend(m, true)
frontendNotification <- resp
select {
case cc.send <- resp:
case <-cc.disconnected:
}
}(m) }(m)
case ntfn, _ := <-frontendNotification: case m := <-cc.send:
if err := websocket.Message.Send(ws, ntfn); err != nil { if err := websocket.Message.Send(ws, m); err != nil {
// Frontend disconnected. // Frontend disconnected.
return return
} }
@ -395,7 +373,7 @@ func (s *server) Start() {
// requests for each channel in the set. // requests for each channel in the set.
// //
// Use a sync.Once to insure no extra duplicators run. // Use a sync.Once to insure no extra duplicators run.
go duplicateOnce.Do(frontendListenerDuplicator) go duplicateOnce.Do(clientResponseDuplicator)
log.Trace("Starting RPC server") log.Trace("Starting RPC server")
@ -554,8 +532,8 @@ func Handshake(rpc ServerConn) error {
if err != nil { if err != nil {
return fmt.Errorf("cannot get best block: %v", err) return fmt.Errorf("cannot get best block: %v", err)
} }
NotifyNewBlockChainHeight(frontendNotificationMaster, bs) NotifyNewBlockChainHeight(allClients, bs)
NotifyBalances(frontendNotificationMaster) NotifyBalances(allClients)
// Get default account. Only the default account is used to // Get default account. Only the default account is used to
// track recently-seen blocks. // track recently-seen blocks.