61e96a69e4
Remove the addresses field from TransactionDetails.Output. It is assumed that the caller is able to deserialize the transaction and encode the output scripts to addresses, so this is unnecessary server overhead and conflicts with the current API philosophy of not duplicating data already included in another field. Since there is no additional data included for outputs not controlled by the wallet, remove the `mine` specifier from the Output message and replace it with an output index. Only include messages for controlled outputs, rather than creating messages for both controlled and uncontrolled outputs. Rename the repeated field from `outputs` to `credits` to be consistent with the `debits` field. Bump major API version as this is a breaking change. Closes #408.
623 lines
19 KiB
Go
623 lines
19 KiB
Go
// Copyright (c) 2015-2016 The btcsuite developers
|
|
// Use of this source code is governed by an ISC
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package wallet
|
|
|
|
import (
|
|
"bytes"
|
|
"sync"
|
|
|
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
|
"github.com/btcsuite/btcd/txscript"
|
|
"github.com/btcsuite/btcd/wire"
|
|
"github.com/btcsuite/btcutil"
|
|
"github.com/btcsuite/btcwallet/waddrmgr"
|
|
"github.com/btcsuite/btcwallet/wtxmgr"
|
|
)
|
|
|
|
// TODO: It would be good to send errors during notification creation to the rpc
|
|
// server instead of just logging them here so the client is aware that wallet
|
|
// isn't working correctly and notifications are missing.
|
|
|
|
// TODO: Anything dealing with accounts here is expensive because the database
|
|
// is not organized correctly for true account support, but do the slow thing
|
|
// instead of the easy thing since the db can be fixed later, and we want the
|
|
// api correct now.
|
|
|
|
// NotificationServer is a server that interested clients may hook into to
|
|
// receive notifications of changes in a wallet. A client is created for each
|
|
// registered notification. Clients are guaranteed to receive messages in the
|
|
// order wallet created them, but there is no guaranteed synchronization between
|
|
// different clients.
|
|
type NotificationServer struct {
|
|
transactions []chan *TransactionNotifications
|
|
currentTxNtfn *TransactionNotifications // coalesce this since wallet does not add mined txs together
|
|
spentness map[uint32][]chan *SpentnessNotifications
|
|
accountClients []chan *AccountNotification
|
|
mu sync.Mutex // Only protects registered client channels
|
|
wallet *Wallet // smells like hacks
|
|
}
|
|
|
|
func newNotificationServer(wallet *Wallet) *NotificationServer {
|
|
return &NotificationServer{
|
|
spentness: make(map[uint32][]chan *SpentnessNotifications),
|
|
wallet: wallet,
|
|
}
|
|
}
|
|
|
|
func lookupInputAccount(w *Wallet, details *wtxmgr.TxDetails, deb wtxmgr.DebitRecord) uint32 {
|
|
// TODO: Debits should record which account(s?) they
|
|
// debit from so this doesn't need to be looked up.
|
|
prevOP := &details.MsgTx.TxIn[deb.Index].PreviousOutPoint
|
|
prev, err := w.TxStore.TxDetails(&prevOP.Hash)
|
|
if err != nil {
|
|
log.Errorf("Cannot query previous transaction details for %v: %v", prevOP.Hash, err)
|
|
return 0
|
|
}
|
|
if prev == nil {
|
|
log.Errorf("Missing previous transaction %v", prevOP.Hash)
|
|
return 0
|
|
}
|
|
prevOut := prev.MsgTx.TxOut[prevOP.Index]
|
|
_, addrs, _, err := txscript.ExtractPkScriptAddrs(prevOut.PkScript, w.chainParams)
|
|
var inputAcct uint32
|
|
if err == nil && len(addrs) > 0 {
|
|
inputAcct, err = w.Manager.AddrAccount(addrs[0])
|
|
}
|
|
if err != nil {
|
|
log.Errorf("Cannot fetch account for previous output %v: %v", prevOP, err)
|
|
inputAcct = 0
|
|
}
|
|
return inputAcct
|
|
}
|
|
|
|
func lookupOutputChain(w *Wallet, details *wtxmgr.TxDetails, cred wtxmgr.CreditRecord) (account uint32, internal bool) {
|
|
output := details.MsgTx.TxOut[cred.Index]
|
|
_, addrs, _, err := txscript.ExtractPkScriptAddrs(output.PkScript, w.chainParams)
|
|
var ma waddrmgr.ManagedAddress
|
|
if err == nil && len(addrs) > 0 {
|
|
ma, err = w.Manager.Address(addrs[0])
|
|
}
|
|
if err != nil {
|
|
log.Errorf("Cannot fetch account for wallet output: %v", err)
|
|
} else {
|
|
account = ma.Account()
|
|
internal = ma.Internal()
|
|
}
|
|
return
|
|
}
|
|
|
|
func makeTxSummary(w *Wallet, details *wtxmgr.TxDetails) TransactionSummary {
|
|
serializedTx := details.SerializedTx
|
|
if serializedTx == nil {
|
|
var buf bytes.Buffer
|
|
err := details.MsgTx.Serialize(&buf)
|
|
if err != nil {
|
|
log.Errorf("Transaction serialization: %v", err)
|
|
}
|
|
serializedTx = buf.Bytes()
|
|
}
|
|
var fee btcutil.Amount
|
|
if len(details.Debits) == len(details.MsgTx.TxIn) {
|
|
for _, deb := range details.Debits {
|
|
fee += deb.Amount
|
|
}
|
|
for _, txOut := range details.MsgTx.TxOut {
|
|
fee -= btcutil.Amount(txOut.Value)
|
|
}
|
|
}
|
|
var inputs []TransactionSummaryInput
|
|
if len(details.Debits) != 0 {
|
|
inputs = make([]TransactionSummaryInput, len(details.Debits))
|
|
for i, d := range details.Debits {
|
|
inputs[i] = TransactionSummaryInput{
|
|
Index: d.Index,
|
|
PreviousAccount: lookupInputAccount(w, details, d),
|
|
PreviousAmount: d.Amount,
|
|
}
|
|
}
|
|
}
|
|
outputs := make([]TransactionSummaryOutput, 0, len(details.MsgTx.TxOut))
|
|
var credIndex int
|
|
for i := range details.MsgTx.TxOut {
|
|
mine := len(details.Credits) > credIndex && details.Credits[credIndex].Index == uint32(i)
|
|
if !mine {
|
|
continue
|
|
}
|
|
acct, internal := lookupOutputChain(w, details, details.Credits[credIndex])
|
|
output := TransactionSummaryOutput{
|
|
Index: uint32(i),
|
|
Account: acct,
|
|
Internal: internal,
|
|
}
|
|
outputs = append(outputs, output)
|
|
}
|
|
return TransactionSummary{
|
|
Hash: &details.Hash,
|
|
Transaction: serializedTx,
|
|
MyInputs: inputs,
|
|
MyOutputs: outputs,
|
|
Fee: fee,
|
|
Timestamp: details.Received.Unix(),
|
|
}
|
|
}
|
|
|
|
func totalBalances(w *Wallet, m map[uint32]btcutil.Amount) error {
|
|
unspent, err := w.TxStore.UnspentOutputs()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for i := range unspent {
|
|
output := &unspent[i]
|
|
var outputAcct uint32
|
|
_, addrs, _, err := txscript.ExtractPkScriptAddrs(
|
|
output.PkScript, w.chainParams)
|
|
if err == nil && len(addrs) > 0 {
|
|
outputAcct, err = w.Manager.AddrAccount(addrs[0])
|
|
}
|
|
if err == nil {
|
|
_, ok := m[outputAcct]
|
|
if ok {
|
|
m[outputAcct] += output.Amount
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func flattenBalanceMap(m map[uint32]btcutil.Amount) []AccountBalance {
|
|
s := make([]AccountBalance, 0, len(m))
|
|
for k, v := range m {
|
|
s = append(s, AccountBalance{Account: k, TotalBalance: v})
|
|
}
|
|
return s
|
|
}
|
|
|
|
func relevantAccounts(w *Wallet, m map[uint32]btcutil.Amount, txs []TransactionSummary) {
|
|
for _, tx := range txs {
|
|
for _, d := range tx.MyInputs {
|
|
m[d.PreviousAccount] = 0
|
|
}
|
|
for _, c := range tx.MyOutputs {
|
|
m[c.Account] = 0
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *NotificationServer) notifyUnminedTransaction(details *wtxmgr.TxDetails) {
|
|
// Sanity check: should not be currently coalescing a notification for
|
|
// mined transactions at the same time that an unmined tx is notified.
|
|
if s.currentTxNtfn != nil {
|
|
log.Errorf("Notifying unmined tx notification while creating notification for blocks")
|
|
}
|
|
|
|
defer s.mu.Unlock()
|
|
s.mu.Lock()
|
|
clients := s.transactions
|
|
if len(clients) == 0 {
|
|
return
|
|
}
|
|
|
|
unminedTxs := []TransactionSummary{makeTxSummary(s.wallet, details)}
|
|
unminedHashes, err := s.wallet.TxStore.UnminedTxHashes()
|
|
if err != nil {
|
|
log.Errorf("Cannot fetch unmined transaction hashes: %v", err)
|
|
return
|
|
}
|
|
bals := make(map[uint32]btcutil.Amount)
|
|
relevantAccounts(s.wallet, bals, unminedTxs)
|
|
err = totalBalances(s.wallet, bals)
|
|
if err != nil {
|
|
log.Errorf("Cannot determine balances for relevant accounts: %v", err)
|
|
return
|
|
}
|
|
n := &TransactionNotifications{
|
|
UnminedTransactions: unminedTxs,
|
|
UnminedTransactionHashes: unminedHashes,
|
|
NewBalances: flattenBalanceMap(bals),
|
|
}
|
|
for _, c := range clients {
|
|
c <- n
|
|
}
|
|
}
|
|
|
|
func (s *NotificationServer) notifyDetachedBlock(hash *chainhash.Hash) {
|
|
if s.currentTxNtfn == nil {
|
|
s.currentTxNtfn = &TransactionNotifications{}
|
|
}
|
|
s.currentTxNtfn.DetachedBlocks = append(s.currentTxNtfn.DetachedBlocks, hash)
|
|
}
|
|
|
|
func (s *NotificationServer) notifyMinedTransaction(details *wtxmgr.TxDetails, block *wtxmgr.BlockMeta) {
|
|
if s.currentTxNtfn == nil {
|
|
s.currentTxNtfn = &TransactionNotifications{}
|
|
}
|
|
n := len(s.currentTxNtfn.AttachedBlocks)
|
|
if n == 0 || *s.currentTxNtfn.AttachedBlocks[n-1].Hash != block.Hash {
|
|
s.currentTxNtfn.AttachedBlocks = append(s.currentTxNtfn.AttachedBlocks, Block{
|
|
Hash: &block.Hash,
|
|
Height: block.Height,
|
|
Timestamp: block.Time.Unix(),
|
|
})
|
|
n++
|
|
}
|
|
txs := s.currentTxNtfn.AttachedBlocks[n-1].Transactions
|
|
s.currentTxNtfn.AttachedBlocks[n-1].Transactions = append(txs, makeTxSummary(s.wallet, details))
|
|
}
|
|
|
|
func (s *NotificationServer) notifyAttachedBlock(block *wtxmgr.BlockMeta) {
|
|
if s.currentTxNtfn == nil {
|
|
s.currentTxNtfn = &TransactionNotifications{}
|
|
}
|
|
|
|
// Add block details if it wasn't already included for previously
|
|
// notified mined transactions.
|
|
n := len(s.currentTxNtfn.AttachedBlocks)
|
|
if n == 0 || *s.currentTxNtfn.AttachedBlocks[n-1].Hash != block.Hash {
|
|
s.currentTxNtfn.AttachedBlocks = append(s.currentTxNtfn.AttachedBlocks, Block{
|
|
Hash: &block.Hash,
|
|
Height: block.Height,
|
|
Timestamp: block.Time.Unix(),
|
|
})
|
|
}
|
|
|
|
// For now (until notification coalescing isn't necessary) just use
|
|
// chain length to determine if this is the new best block.
|
|
if s.wallet.ChainSynced() {
|
|
if len(s.currentTxNtfn.DetachedBlocks) >= len(s.currentTxNtfn.AttachedBlocks) {
|
|
return
|
|
}
|
|
}
|
|
|
|
defer s.mu.Unlock()
|
|
s.mu.Lock()
|
|
clients := s.transactions
|
|
if len(clients) == 0 {
|
|
s.currentTxNtfn = nil
|
|
return
|
|
}
|
|
|
|
// The UnminedTransactions field is intentionally not set. Since the
|
|
// hashes of all detached blocks are reported, and all transactions
|
|
// moved from a mined block back to unconfirmed are either in the
|
|
// UnminedTransactionHashes slice or don't exist due to conflicting with
|
|
// a mined transaction in the new best chain, there is no possiblity of
|
|
// a new, previously unseen transaction appearing in unconfirmed.
|
|
|
|
unminedHashes, err := s.wallet.TxStore.UnminedTxHashes()
|
|
if err != nil {
|
|
log.Errorf("Cannot fetch unmined transaction hashes: %v", err)
|
|
return
|
|
}
|
|
s.currentTxNtfn.UnminedTransactionHashes = unminedHashes
|
|
|
|
bals := make(map[uint32]btcutil.Amount)
|
|
for _, b := range s.currentTxNtfn.AttachedBlocks {
|
|
relevantAccounts(s.wallet, bals, b.Transactions)
|
|
}
|
|
err = totalBalances(s.wallet, bals)
|
|
if err != nil {
|
|
log.Errorf("Cannot determine balances for relevant accounts: %v", err)
|
|
return
|
|
}
|
|
s.currentTxNtfn.NewBalances = flattenBalanceMap(bals)
|
|
|
|
for _, c := range clients {
|
|
c <- s.currentTxNtfn
|
|
}
|
|
s.currentTxNtfn = nil
|
|
}
|
|
|
|
// TransactionNotifications is a notification of changes to the wallet's
|
|
// transaction set and the current chain tip that wallet is considered to be
|
|
// synced with. All transactions added to the blockchain are organized by the
|
|
// block they were mined in.
|
|
//
|
|
// During a chain switch, all removed block hashes are included. Detached
|
|
// blocks are sorted in the reverse order they were mined. Attached blocks are
|
|
// sorted in the order mined.
|
|
//
|
|
// All newly added unmined transactions are included. Removed unmined
|
|
// transactions are not explicitly included. Instead, the hashes of all
|
|
// transactions still unmined are included.
|
|
//
|
|
// If any transactions were involved, each affected account's new total balance
|
|
// is included.
|
|
//
|
|
// TODO: Because this includes stuff about blocks and can be fired without any
|
|
// changes to transactions, it needs a better name.
|
|
type TransactionNotifications struct {
|
|
AttachedBlocks []Block
|
|
DetachedBlocks []*chainhash.Hash
|
|
UnminedTransactions []TransactionSummary
|
|
UnminedTransactionHashes []*chainhash.Hash
|
|
NewBalances []AccountBalance
|
|
}
|
|
|
|
// Block contains the properties and all relevant transactions of an attached
|
|
// block.
|
|
type Block struct {
|
|
Hash *chainhash.Hash
|
|
Height int32
|
|
Timestamp int64
|
|
Transactions []TransactionSummary
|
|
}
|
|
|
|
// TransactionSummary contains a transaction relevant to the wallet and marks
|
|
// which inputs and outputs were relevant.
|
|
type TransactionSummary struct {
|
|
Hash *chainhash.Hash
|
|
Transaction []byte
|
|
MyInputs []TransactionSummaryInput
|
|
MyOutputs []TransactionSummaryOutput
|
|
Fee btcutil.Amount
|
|
Timestamp int64
|
|
}
|
|
|
|
// TransactionSummaryInput describes a transaction input that is relevant to the
|
|
// wallet. The Index field marks the transaction input index of the transaction
|
|
// (not included here). The PreviousAccount and PreviousAmount fields describe
|
|
// how much this input debits from a wallet account.
|
|
type TransactionSummaryInput struct {
|
|
Index uint32
|
|
PreviousAccount uint32
|
|
PreviousAmount btcutil.Amount
|
|
}
|
|
|
|
// TransactionSummaryOutput describes wallet properties of a transaction output
|
|
// controlled by the wallet. The Index field marks the transaction output index
|
|
// of the transaction (not included here).
|
|
type TransactionSummaryOutput struct {
|
|
Index uint32
|
|
Account uint32
|
|
Internal bool
|
|
}
|
|
|
|
// AccountBalance associates a total (zero confirmation) balance with an
|
|
// account. Balances for other minimum confirmation counts require more
|
|
// expensive logic and it is not clear which minimums a client is interested in,
|
|
// so they are not included.
|
|
type AccountBalance struct {
|
|
Account uint32
|
|
TotalBalance btcutil.Amount
|
|
}
|
|
|
|
// TransactionNotificationsClient receives TransactionNotifications from the
|
|
// NotificationServer over the channel C.
|
|
type TransactionNotificationsClient struct {
|
|
C <-chan *TransactionNotifications
|
|
server *NotificationServer
|
|
}
|
|
|
|
// TransactionNotifications returns a client for receiving
|
|
// TransactionNotifiations notifications over a channel. The channel is
|
|
// unbuffered.
|
|
//
|
|
// When finished, the Done method should be called on the client to disassociate
|
|
// it from the server.
|
|
func (s *NotificationServer) TransactionNotifications() TransactionNotificationsClient {
|
|
c := make(chan *TransactionNotifications)
|
|
s.mu.Lock()
|
|
s.transactions = append(s.transactions, c)
|
|
s.mu.Unlock()
|
|
return TransactionNotificationsClient{
|
|
C: c,
|
|
server: s,
|
|
}
|
|
}
|
|
|
|
// Done deregisters the client from the server and drains any remaining
|
|
// messages. It must be called exactly once when the client is finished
|
|
// receiving notifications.
|
|
func (c *TransactionNotificationsClient) Done() {
|
|
go func() {
|
|
// Drain notifications until the client channel is removed from
|
|
// the server and closed.
|
|
for range c.C {
|
|
}
|
|
}()
|
|
go func() {
|
|
s := c.server
|
|
s.mu.Lock()
|
|
clients := s.transactions
|
|
for i, ch := range clients {
|
|
if c.C == ch {
|
|
clients[i] = clients[len(clients)-1]
|
|
s.transactions = clients[:len(clients)-1]
|
|
close(ch)
|
|
break
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
}()
|
|
}
|
|
|
|
// SpentnessNotifications is a notification that is fired for transaction
|
|
// outputs controlled by some account's keys. The notification may be about a
|
|
// newly added unspent transaction output or that a previously unspent output is
|
|
// now spent. When spent, the notification includes the spending transaction's
|
|
// hash and input index.
|
|
type SpentnessNotifications struct {
|
|
hash *chainhash.Hash
|
|
spenderHash *chainhash.Hash
|
|
index uint32
|
|
spenderIndex uint32
|
|
}
|
|
|
|
// Hash returns the transaction hash of the spent output.
|
|
func (n *SpentnessNotifications) Hash() *chainhash.Hash {
|
|
return n.hash
|
|
}
|
|
|
|
// Index returns the transaction output index of the spent output.
|
|
func (n *SpentnessNotifications) Index() uint32 {
|
|
return n.index
|
|
}
|
|
|
|
// Spender returns the spending transction's hash and input index, if any. If
|
|
// the output is unspent, the final bool return is false.
|
|
func (n *SpentnessNotifications) Spender() (*chainhash.Hash, uint32, bool) {
|
|
return n.spenderHash, n.spenderIndex, n.spenderHash != nil
|
|
}
|
|
|
|
// notifyUnspentOutput notifies registered clients of a new unspent output that
|
|
// is controlled by the wallet.
|
|
func (s *NotificationServer) notifyUnspentOutput(account uint32, hash *chainhash.Hash, index uint32) {
|
|
defer s.mu.Unlock()
|
|
s.mu.Lock()
|
|
clients := s.spentness[account]
|
|
if len(clients) == 0 {
|
|
return
|
|
}
|
|
n := &SpentnessNotifications{
|
|
hash: hash,
|
|
index: index,
|
|
}
|
|
for _, c := range clients {
|
|
c <- n
|
|
}
|
|
}
|
|
|
|
// notifySpentOutput notifies registered clients that a previously-unspent
|
|
// output is now spent, and includes the spender hash and input index in the
|
|
// notification.
|
|
func (s *NotificationServer) notifySpentOutput(account uint32, op *wire.OutPoint, spenderHash *chainhash.Hash, spenderIndex uint32) {
|
|
defer s.mu.Unlock()
|
|
s.mu.Lock()
|
|
clients := s.spentness[account]
|
|
if len(clients) == 0 {
|
|
return
|
|
}
|
|
n := &SpentnessNotifications{
|
|
hash: &op.Hash,
|
|
index: op.Index,
|
|
spenderHash: spenderHash,
|
|
spenderIndex: spenderIndex,
|
|
}
|
|
for _, c := range clients {
|
|
c <- n
|
|
}
|
|
}
|
|
|
|
// SpentnessNotificationsClient receives SpentnessNotifications from the
|
|
// NotificationServer over the channel C.
|
|
type SpentnessNotificationsClient struct {
|
|
C <-chan *SpentnessNotifications
|
|
account uint32
|
|
server *NotificationServer
|
|
}
|
|
|
|
// AccountSpentnessNotifications registers a client for spentness changes of
|
|
// outputs controlled by the account.
|
|
func (s *NotificationServer) AccountSpentnessNotifications(account uint32) SpentnessNotificationsClient {
|
|
c := make(chan *SpentnessNotifications)
|
|
s.mu.Lock()
|
|
s.spentness[account] = append(s.spentness[account], c)
|
|
s.mu.Unlock()
|
|
return SpentnessNotificationsClient{
|
|
C: c,
|
|
account: account,
|
|
server: s,
|
|
}
|
|
}
|
|
|
|
// Done deregisters the client from the server and drains any remaining
|
|
// messages. It must be called exactly once when the client is finished
|
|
// receiving notifications.
|
|
func (c *SpentnessNotificationsClient) Done() {
|
|
go func() {
|
|
// Drain notifications until the client channel is removed from
|
|
// the server and closed.
|
|
for range c.C {
|
|
}
|
|
}()
|
|
go func() {
|
|
s := c.server
|
|
s.mu.Lock()
|
|
clients := s.spentness[c.account]
|
|
for i, ch := range clients {
|
|
if c.C == ch {
|
|
clients[i] = clients[len(clients)-1]
|
|
s.spentness[c.account] = clients[:len(clients)-1]
|
|
close(ch)
|
|
break
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
}()
|
|
}
|
|
|
|
// AccountNotification contains properties regarding an account, such as its
|
|
// name and the number of derived and imported keys. When any of these
|
|
// properties change, the notification is fired.
|
|
type AccountNotification struct {
|
|
AccountNumber uint32
|
|
AccountName string
|
|
ExternalKeyCount uint32
|
|
InternalKeyCount uint32
|
|
ImportedKeyCount uint32
|
|
}
|
|
|
|
func (s *NotificationServer) notifyAccountProperties(props *waddrmgr.AccountProperties) {
|
|
defer s.mu.Unlock()
|
|
s.mu.Lock()
|
|
clients := s.accountClients
|
|
if len(clients) == 0 {
|
|
return
|
|
}
|
|
n := &AccountNotification{
|
|
AccountNumber: props.AccountNumber,
|
|
AccountName: props.AccountName,
|
|
ExternalKeyCount: props.ExternalKeyCount,
|
|
InternalKeyCount: props.InternalKeyCount,
|
|
ImportedKeyCount: props.ImportedKeyCount,
|
|
}
|
|
for _, c := range clients {
|
|
c <- n
|
|
}
|
|
}
|
|
|
|
// AccountNotificationsClient receives AccountNotifications over the channel C.
|
|
type AccountNotificationsClient struct {
|
|
C chan *AccountNotification
|
|
server *NotificationServer
|
|
}
|
|
|
|
// AccountNotifications returns a client for receiving AccountNotifications over
|
|
// a channel. The channel is unbuffered. When finished, the client's Done
|
|
// method should be called to disassociate the client from the server.
|
|
func (s *NotificationServer) AccountNotifications() AccountNotificationsClient {
|
|
c := make(chan *AccountNotification)
|
|
s.mu.Lock()
|
|
s.accountClients = append(s.accountClients, c)
|
|
s.mu.Unlock()
|
|
return AccountNotificationsClient{
|
|
C: c,
|
|
server: s,
|
|
}
|
|
}
|
|
|
|
// Done deregisters the client from the server and drains any remaining
|
|
// messages. It must be called exactly once when the client is finished
|
|
// receiving notifications.
|
|
func (c *AccountNotificationsClient) Done() {
|
|
go func() {
|
|
for range c.C {
|
|
}
|
|
}()
|
|
go func() {
|
|
s := c.server
|
|
s.mu.Lock()
|
|
clients := s.accountClients
|
|
for i, ch := range clients {
|
|
if c.C == ch {
|
|
clients[i] = clients[len(clients)-1]
|
|
s.accountClients = clients[:len(clients)-1]
|
|
close(ch)
|
|
break
|
|
}
|
|
}
|
|
s.mu.Unlock()
|
|
}()
|
|
}
|