Remove data races from switching lock impls.

sync.Locker cannot be safely used to switch a sync.Mutex to a noop
locker since other goroutines that attempt to lock the mutex will race
on the changing interface.  Instead, just statically dispatch
sync.Mutex methods.
This commit is contained in:
Josh Rickmar 2015-06-12 11:40:04 -04:00
parent 9d5abaf14e
commit 411eacbeea
2 changed files with 38 additions and 85 deletions

View file

@ -256,7 +256,7 @@ type rpcServer struct {
chainSvr *chain.Client chainSvr *chain.Client
createOK bool createOK bool
handlerLookup func(string) (requestHandler, bool) handlerLookup func(string) (requestHandler, bool)
handlerLock sync.Locker handlerMu sync.Mutex
listeners []net.Listener listeners []net.Listener
authsha [sha256.Size]byte authsha [sha256.Size]byte
@ -303,7 +303,6 @@ func newRPCServer(listenAddrs []string, maxPost, maxWebsockets int64) (*rpcServe
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
s := rpcServer{ s := rpcServer{
handlerLookup: unloadedWalletHandlerFunc, handlerLookup: unloadedWalletHandlerFunc,
handlerLock: new(sync.Mutex),
authsha: sha256.Sum256([]byte(auth)), authsha: sha256.Sum256([]byte(auth)),
maxPostClients: maxPost, maxPostClients: maxPost,
maxWebsocketClients: maxWebsockets, maxWebsocketClients: maxWebsockets,
@ -474,14 +473,14 @@ func (s *rpcServer) Stop() {
log.Warn("Server shutting down") log.Warn("Server shutting down")
// Stop the connected wallet and chain server, if any. // Stop the connected wallet and chain server, if any.
s.handlerLock.Lock() s.handlerMu.Lock()
if s.wallet != nil { if s.wallet != nil {
s.wallet.Stop() s.wallet.Stop()
} }
if s.chainSvr != nil { if s.chainSvr != nil {
s.chainSvr.Stop() s.chainSvr.Stop()
} }
s.handlerLock.Unlock() s.handlerMu.Unlock()
// Stop all the listeners. // Stop all the listeners.
for _, listener := range s.listeners { for _, listener := range s.listeners {
@ -499,30 +498,25 @@ func (s *rpcServer) Stop() {
func (s *rpcServer) WaitForShutdown() { func (s *rpcServer) WaitForShutdown() {
// First wait for the wallet and chain server to stop, if they // First wait for the wallet and chain server to stop, if they
// were ever set. // were ever set.
s.handlerLock.Lock() s.handlerMu.Lock()
if s.wallet != nil { if s.wallet != nil {
s.wallet.WaitForShutdown() s.wallet.WaitForShutdown()
} }
if s.chainSvr != nil { if s.chainSvr != nil {
s.chainSvr.WaitForShutdown() s.chainSvr.WaitForShutdown()
} }
s.handlerLock.Unlock() s.handlerMu.Unlock()
s.wg.Wait() s.wg.Wait()
} }
type noopLocker struct{}
func (noopLocker) Lock() {}
func (noopLocker) Unlock() {}
// SetWallet sets the wallet dependency component needed to run a fully // SetWallet sets the wallet dependency component needed to run a fully
// functional bitcoin wallet RPC server. If wallet is nil, this informs the // functional bitcoin wallet RPC server. If wallet is nil, this informs the
// server that the createencryptedwallet RPC method is valid and must be called // server that the createencryptedwallet RPC method is valid and must be called
// by a client before any other wallet methods are allowed. // by a client before any other wallet methods are allowed.
func (s *rpcServer) SetWallet(wallet *wallet.Wallet) { func (s *rpcServer) SetWallet(wallet *wallet.Wallet) {
s.handlerLock.Lock() defer s.handlerMu.Unlock()
defer s.handlerLock.Unlock() s.handlerMu.Lock()
if wallet == nil { if wallet == nil {
s.handlerLookup = missingWalletHandlerFunc s.handlerLookup = missingWalletHandlerFunc
@ -534,11 +528,6 @@ func (s *rpcServer) SetWallet(wallet *wallet.Wallet) {
s.registerWalletNtfns <- struct{}{} s.registerWalletNtfns <- struct{}{}
if s.chainSvr != nil { if s.chainSvr != nil {
// If the chain server rpc client is also set, there's no reason
// to keep the mutex around. Make the locker simply execute
// noops instead.
s.handlerLock = noopLocker{}
// With both the wallet and chain server set, all handlers are // With both the wallet and chain server set, all handlers are
// ok to run. // ok to run.
s.handlerLookup = lookupAnyHandler s.handlerLookup = lookupAnyHandler
@ -551,17 +540,12 @@ func (s *rpcServer) SetWallet(wallet *wallet.Wallet) {
// a never connected client, rather than panicking (or never being looked up) // a never connected client, rather than panicking (or never being looked up)
// if the client was never conneceted and added. // if the client was never conneceted and added.
func (s *rpcServer) SetChainServer(chainSvr *chain.Client) { func (s *rpcServer) SetChainServer(chainSvr *chain.Client) {
s.handlerLock.Lock() defer s.handlerMu.Unlock()
defer s.handlerLock.Unlock() s.handlerMu.Lock()
s.chainSvr = chainSvr s.chainSvr = chainSvr
if s.wallet != nil { if s.wallet != nil {
// If the wallet had already been set, there's no reason to keep
// the mutex around. Make the locker simply execute noops
// instead.
s.handlerLock = noopLocker{}
// With both the chain server and wallet set, all handlers are // With both the chain server and wallet set, all handlers are
// ok to run. // ok to run.
s.handlerLookup = lookupAnyHandler s.handlerLookup = lookupAnyHandler
@ -576,8 +560,8 @@ func (s *rpcServer) SetChainServer(chainSvr *chain.Client) {
// method. Each of these must be checked beforehand (the method is already // method. Each of these must be checked beforehand (the method is already
// known) and handled accordingly. // known) and handled accordingly.
func (s *rpcServer) HandlerClosure(method string) requestHandlerClosure { func (s *rpcServer) HandlerClosure(method string) requestHandlerClosure {
s.handlerLock.Lock() defer s.handlerMu.Unlock()
defer s.handlerLock.Unlock() s.handlerMu.Lock()
// With the lock held, make copies of these pointers for the closure. // With the lock held, make copies of these pointers for the closure.
wallet := s.wallet wallet := s.wallet

View file

@ -56,11 +56,6 @@ var (
wtxmgrNamespaceKey = []byte("wtxmgr") wtxmgrNamespaceKey = []byte("wtxmgr")
) )
type noopLocker struct{}
func (noopLocker) Lock() {}
func (noopLocker) Unlock() {}
// Wallet is a structure containing all the components for a // Wallet is a structure containing all the components for a
// complete wallet. It contains the Armory-style key store // complete wallet. It contains the Armory-style key store
// addresses and keys), // addresses and keys),
@ -71,7 +66,7 @@ type Wallet struct {
TxStore *wtxmgr.Store TxStore *wtxmgr.Store
chainSvr *chain.Client chainSvr *chain.Client
chainSvrLock sync.Locker chainSvrLock sync.Mutex
chainSvrSynced bool chainSvrSynced bool
chainSvrSyncMtx sync.Mutex chainSvrSyncMtx sync.Mutex
@ -107,7 +102,7 @@ type Wallet struct {
lockStateChanges chan bool // true when locked lockStateChanges chan bool // true when locked
confirmedBalance chan btcutil.Amount confirmedBalance chan btcutil.Amount
unconfirmedBalance chan btcutil.Amount unconfirmedBalance chan btcutil.Amount
notificationLock sync.Locker notificationMu sync.Mutex
chainParams *chaincfg.Params chainParams *chaincfg.Params
wg sync.WaitGroup wg sync.WaitGroup
@ -119,36 +114,19 @@ type Wallet struct {
// multiple places, they must broadcast it themself. // multiple places, they must broadcast it themself.
var ErrDuplicateListen = errors.New("duplicate listen") var ErrDuplicateListen = errors.New("duplicate listen")
func (w *Wallet) updateNotificationLock() {
switch {
case w.connectedBlocks == nil:
fallthrough
case w.disconnectedBlocks == nil:
fallthrough
case w.lockStateChanges == nil:
fallthrough
case w.confirmedBalance == nil:
fallthrough
case w.unconfirmedBalance == nil:
return
}
w.notificationLock = noopLocker{}
}
// ListenConnectedBlocks returns a channel that passes all blocks that a wallet // ListenConnectedBlocks returns a channel that passes all blocks that a wallet
// has been marked in sync with. The channel must be read, or other wallet // has been marked in sync with. The channel must be read, or other wallet
// methods will block. // methods will block.
// //
// If this is called twice, ErrDuplicateListen is returned. // If this is called twice, ErrDuplicateListen is returned.
func (w *Wallet) ListenConnectedBlocks() (<-chan waddrmgr.BlockStamp, error) { func (w *Wallet) ListenConnectedBlocks() (<-chan waddrmgr.BlockStamp, error) {
w.notificationLock.Lock() defer w.notificationMu.Unlock()
defer w.notificationLock.Unlock() w.notificationMu.Lock()
if w.connectedBlocks != nil { if w.connectedBlocks != nil {
return nil, ErrDuplicateListen return nil, ErrDuplicateListen
} }
w.connectedBlocks = make(chan waddrmgr.BlockStamp) w.connectedBlocks = make(chan waddrmgr.BlockStamp)
w.updateNotificationLock()
return w.connectedBlocks, nil return w.connectedBlocks, nil
} }
@ -158,14 +136,13 @@ func (w *Wallet) ListenConnectedBlocks() (<-chan waddrmgr.BlockStamp, error) {
// //
// If this is called twice, ErrDuplicateListen is returned. // If this is called twice, ErrDuplicateListen is returned.
func (w *Wallet) ListenDisconnectedBlocks() (<-chan waddrmgr.BlockStamp, error) { func (w *Wallet) ListenDisconnectedBlocks() (<-chan waddrmgr.BlockStamp, error) {
w.notificationLock.Lock() defer w.notificationMu.Unlock()
defer w.notificationLock.Unlock() w.notificationMu.Lock()
if w.disconnectedBlocks != nil { if w.disconnectedBlocks != nil {
return nil, ErrDuplicateListen return nil, ErrDuplicateListen
} }
w.disconnectedBlocks = make(chan waddrmgr.BlockStamp) w.disconnectedBlocks = make(chan waddrmgr.BlockStamp)
w.updateNotificationLock()
return w.disconnectedBlocks, nil return w.disconnectedBlocks, nil
} }
@ -176,14 +153,13 @@ func (w *Wallet) ListenDisconnectedBlocks() (<-chan waddrmgr.BlockStamp, error)
// //
// If this is called twice, ErrDuplicateListen is returned. // If this is called twice, ErrDuplicateListen is returned.
func (w *Wallet) ListenLockStatus() (<-chan bool, error) { func (w *Wallet) ListenLockStatus() (<-chan bool, error) {
w.notificationLock.Lock() defer w.notificationMu.Unlock()
defer w.notificationLock.Unlock() w.notificationMu.Lock()
if w.lockStateChanges != nil { if w.lockStateChanges != nil {
return nil, ErrDuplicateListen return nil, ErrDuplicateListen
} }
w.lockStateChanges = make(chan bool) w.lockStateChanges = make(chan bool)
w.updateNotificationLock()
return w.lockStateChanges, nil return w.lockStateChanges, nil
} }
@ -193,14 +169,13 @@ func (w *Wallet) ListenLockStatus() (<-chan bool, error) {
// //
// If this is called twice, ErrDuplicateListen is returned. // If this is called twice, ErrDuplicateListen is returned.
func (w *Wallet) ListenConfirmedBalance() (<-chan btcutil.Amount, error) { func (w *Wallet) ListenConfirmedBalance() (<-chan btcutil.Amount, error) {
w.notificationLock.Lock() defer w.notificationMu.Unlock()
defer w.notificationLock.Unlock() w.notificationMu.Lock()
if w.confirmedBalance != nil { if w.confirmedBalance != nil {
return nil, ErrDuplicateListen return nil, ErrDuplicateListen
} }
w.confirmedBalance = make(chan btcutil.Amount) w.confirmedBalance = make(chan btcutil.Amount)
w.updateNotificationLock()
return w.confirmedBalance, nil return w.confirmedBalance, nil
} }
@ -210,14 +185,13 @@ func (w *Wallet) ListenConfirmedBalance() (<-chan btcutil.Amount, error) {
// //
// If this is called twice, ErrDuplicateListen is returned. // If this is called twice, ErrDuplicateListen is returned.
func (w *Wallet) ListenUnconfirmedBalance() (<-chan btcutil.Amount, error) { func (w *Wallet) ListenUnconfirmedBalance() (<-chan btcutil.Amount, error) {
w.notificationLock.Lock() defer w.notificationMu.Unlock()
defer w.notificationLock.Unlock() w.notificationMu.Lock()
if w.unconfirmedBalance != nil { if w.unconfirmedBalance != nil {
return nil, ErrDuplicateListen return nil, ErrDuplicateListen
} }
w.unconfirmedBalance = make(chan btcutil.Amount) w.unconfirmedBalance = make(chan btcutil.Amount)
w.updateNotificationLock()
return w.unconfirmedBalance, nil return w.unconfirmedBalance, nil
} }
@ -227,63 +201,62 @@ func (w *Wallet) ListenUnconfirmedBalance() (<-chan btcutil.Amount, error) {
// //
// If this is called twice, ErrDuplicateListen is returned. // If this is called twice, ErrDuplicateListen is returned.
func (w *Wallet) ListenRelevantTxs() (<-chan chain.RelevantTx, error) { func (w *Wallet) ListenRelevantTxs() (<-chan chain.RelevantTx, error) {
defer w.notificationLock.Unlock() defer w.notificationMu.Unlock()
w.notificationLock.Lock() w.notificationMu.Lock()
if w.relevantTxs != nil { if w.relevantTxs != nil {
return nil, ErrDuplicateListen return nil, ErrDuplicateListen
} }
w.relevantTxs = make(chan chain.RelevantTx) w.relevantTxs = make(chan chain.RelevantTx)
w.updateNotificationLock()
return w.relevantTxs, nil return w.relevantTxs, nil
} }
func (w *Wallet) notifyConnectedBlock(block waddrmgr.BlockStamp) { func (w *Wallet) notifyConnectedBlock(block waddrmgr.BlockStamp) {
w.notificationLock.Lock() w.notificationMu.Lock()
if w.connectedBlocks != nil { if w.connectedBlocks != nil {
w.connectedBlocks <- block w.connectedBlocks <- block
} }
w.notificationLock.Unlock() w.notificationMu.Unlock()
} }
func (w *Wallet) notifyDisconnectedBlock(block waddrmgr.BlockStamp) { func (w *Wallet) notifyDisconnectedBlock(block waddrmgr.BlockStamp) {
w.notificationLock.Lock() w.notificationMu.Lock()
if w.disconnectedBlocks != nil { if w.disconnectedBlocks != nil {
w.disconnectedBlocks <- block w.disconnectedBlocks <- block
} }
w.notificationLock.Unlock() w.notificationMu.Unlock()
} }
func (w *Wallet) notifyLockStateChange(locked bool) { func (w *Wallet) notifyLockStateChange(locked bool) {
w.notificationLock.Lock() w.notificationMu.Lock()
if w.lockStateChanges != nil { if w.lockStateChanges != nil {
w.lockStateChanges <- locked w.lockStateChanges <- locked
} }
w.notificationLock.Unlock() w.notificationMu.Unlock()
} }
func (w *Wallet) notifyConfirmedBalance(bal btcutil.Amount) { func (w *Wallet) notifyConfirmedBalance(bal btcutil.Amount) {
w.notificationLock.Lock() w.notificationMu.Lock()
if w.confirmedBalance != nil { if w.confirmedBalance != nil {
w.confirmedBalance <- bal w.confirmedBalance <- bal
} }
w.notificationLock.Unlock() w.notificationMu.Unlock()
} }
func (w *Wallet) notifyUnconfirmedBalance(bal btcutil.Amount) { func (w *Wallet) notifyUnconfirmedBalance(bal btcutil.Amount) {
w.notificationLock.Lock() w.notificationMu.Lock()
if w.unconfirmedBalance != nil { if w.unconfirmedBalance != nil {
w.unconfirmedBalance <- bal w.unconfirmedBalance <- bal
} }
w.notificationLock.Unlock() w.notificationMu.Unlock()
} }
func (w *Wallet) notifyRelevantTx(relevantTx chain.RelevantTx) { func (w *Wallet) notifyRelevantTx(relevantTx chain.RelevantTx) {
w.notificationLock.Lock() w.notificationMu.Lock()
if w.relevantTxs != nil { if w.relevantTxs != nil {
w.relevantTxs <- relevantTx w.relevantTxs <- relevantTx
} }
w.notificationLock.Unlock() w.notificationMu.Unlock()
} }
// Start starts the goroutines necessary to manage a wallet. // Start starts the goroutines necessary to manage a wallet.
@ -294,11 +267,9 @@ func (w *Wallet) Start(chainServer *chain.Client) {
default: default:
} }
w.chainSvrLock.Lock()
defer w.chainSvrLock.Unlock() defer w.chainSvrLock.Unlock()
w.chainSvrLock.Lock()
w.chainSvr = chainServer w.chainSvr = chainServer
w.chainSvrLock = noopLocker{}
w.wg.Add(6) w.wg.Add(6)
go w.handleChainNotifications() go w.handleChainNotifications()
@ -1655,7 +1626,6 @@ func Open(pubPass []byte, params *chaincfg.Params, db walletdb.DB, waddrmgrNS, w
db: db, db: db,
Manager: addrMgr, Manager: addrMgr,
TxStore: txMgr, TxStore: txMgr,
chainSvrLock: new(sync.Mutex),
lockedOutpoints: map[wire.OutPoint]struct{}{}, lockedOutpoints: map[wire.OutPoint]struct{}{},
FeeIncrement: defaultFeeIncrement, FeeIncrement: defaultFeeIncrement,
rescanAddJob: make(chan *RescanJob), rescanAddJob: make(chan *RescanJob),
@ -1669,7 +1639,6 @@ func Open(pubPass []byte, params *chaincfg.Params, db walletdb.DB, waddrmgrNS, w
holdUnlockRequests: make(chan chan HeldUnlock), holdUnlockRequests: make(chan chan HeldUnlock),
lockState: make(chan bool), lockState: make(chan bool),
changePassphrase: make(chan changePassphraseRequest), changePassphrase: make(chan changePassphraseRequest),
notificationLock: new(sync.Mutex),
chainParams: params, chainParams: params,
quit: make(chan struct{}), quit: make(chan struct{}),
} }