From 8974e789f7b0599cc17e5e90bfff11e41f446e3d Mon Sep 17 00:00:00 2001 From: "Owain G. Ainsworth" Date: Thu, 3 Oct 2013 00:33:42 +0100 Subject: [PATCH] Convert the rest of the subsystems to use atomics for shutdown vars. --- addrmanager.go | 26 ++++++++++++-------------- blockmanager.go | 25 ++++++++++++------------- peer.go | 5 ++--- rpcserver.go | 13 ++++++------- server.go | 6 ++---- 5 files changed, 34 insertions(+), 41 deletions(-) diff --git a/addrmanager.go b/addrmanager.go index cabe0e2f..35b86514 100644 --- a/addrmanager.go +++ b/addrmanager.go @@ -20,6 +20,7 @@ import ( "path/filepath" "strconv" "sync" + "sync/atomic" "time" ) @@ -325,8 +326,8 @@ type AddrManager struct { addrIndex map[string]*knownAddress // address key to ka for all addrs. addrNew [newBucketCount]map[string]*knownAddress addrTried [triedBucketCount]*list.List - started bool - shutdown bool + started int32 + shutdown int32 wg sync.WaitGroup quit chan bool nTried int @@ -380,19 +381,17 @@ func (a *AddrManager) getTriedBucket(netAddr *btcwire.NetAddress) int { func (a *AddrManager) addressHandler() { dumpAddressTicker := time.NewTicker(dumpAddressInterval) out: - for !a.shutdown { + for { select { case <-dumpAddressTicker.C: - if !a.shutdown { - a.savePeers() - } + a.savePeers() case <-a.quit: - a.savePeers() break out } } dumpAddressTicker.Stop() + a.savePeers() a.wg.Done() log.Trace("[AMGR] Address handler done") } @@ -607,32 +606,31 @@ func deserialiseNetAddress(addr string) (*btcwire.NetAddress, error) { // addresses, timeouts, and interval based writes. func (a *AddrManager) Start() { // Already started? - if a.started { + if atomic.AddInt32(&a.started, 1) != 1 { return } log.Trace("[AMGR] Starting address manager") a.wg.Add(1) - go a.addressHandler() - a.started = true // Load peers we already know about from file. a.loadPeers() + + // Start the address ticker to save addresses periodically. + go a.addressHandler() } // Stop gracefully shuts down the address manager by stopping the main handler. func (a *AddrManager) Stop() error { - if a.shutdown { + if atomic.AddInt32(&a.shutdown, 1) != 1 { log.Warnf("[AMGR] Address manager is already in the process of " + "shutting down") return nil } log.Infof("[AMGR] Address manager shutting down") - a.savePeers() - a.shutdown = true - a.quit <- true + close(a.quit) a.wg.Wait() return nil } diff --git a/blockmanager.go b/blockmanager.go index 3e049e82..76d8fce5 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -13,6 +13,7 @@ import ( "os" "path/filepath" "sync" + "sync/atomic" "time" ) @@ -60,8 +61,8 @@ type txMsg struct { // incoming blocks. type blockManager struct { server *server - started bool - shutdown bool + started int32 + shutdown int32 blockChain *btcchain.BlockChain blockPeer map[btcwire.ShaHash]*peer requestedBlocks map[btcwire.ShaHash]bool @@ -132,7 +133,7 @@ func (b *blockManager) startSync(peers *list.List) { // also starts syncing if needed. It is invoked from the syncHandler goroutine. func (b *blockManager) handleNewPeerMsg(peers *list.List, p *peer) { // Ignore if in the process of shutting down. - if b.shutdown { + if atomic.LoadInt32(&b.shutdown) != 0 { return } @@ -385,7 +386,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { func (b *blockManager) blockHandler() { candidatePeers := list.New() out: - for !b.shutdown { + for { select { case m := <-b.msgChan: switch msg := m.(type) { @@ -486,7 +487,7 @@ func (b *blockManager) chainNotificationHandler() { // when it sends notifications while retaining order. pending := list.New() out: - for !b.shutdown { + for { // Sending on a nil channel always blocks and hence is ignored // by select. Thus enable send only when the list is non-empty. var firstItem *btcchain.Notification @@ -514,7 +515,7 @@ out: // NewPeer informs the blockmanager of a newly active peer. func (b *blockManager) NewPeer(p *peer) { // Ignore if we are shutting down. - if b.shutdown { + if atomic.LoadInt32(&b.shutdown) != 0 { return } b.msgChan <- &newPeerMsg{peer: p} @@ -523,7 +524,7 @@ func (b *blockManager) NewPeer(p *peer) { // QueueBlock adds the passed block message and peer to the block handling queue. func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) { // Don't accept more blocks if we're shutting down. - if b.shutdown { + if atomic.LoadInt32(&b.shutdown) != 0 { p.blockProcessed <- false return } @@ -536,7 +537,7 @@ func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) { func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) { // No channel handling here because peers do not need to block on inv // messages. - if b.shutdown { + if atomic.LoadInt32(&b.shutdown) != 0 { return } @@ -547,7 +548,7 @@ func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) { // DonePeer informs the blockmanager that a peer has disconnected. func (b *blockManager) DonePeer(p *peer) { // Ignore if we are shutting down. - if b.shutdown { + if atomic.LoadInt32(&b.shutdown) != 0 { return } b.msgChan <- &donePeerMsg{peer: p} @@ -556,7 +557,7 @@ func (b *blockManager) DonePeer(p *peer) { // Start begins the core block handler which processes block and inv messages. func (b *blockManager) Start() { // Already started? - if b.started { + if atomic.AddInt32(&b.started, 1) != 1 { return } @@ -565,20 +566,18 @@ func (b *blockManager) Start() { go b.blockHandler() go b.chainNotificationSinkHandler() go b.chainNotificationHandler() - b.started = true } // Stop gracefully shuts down the block manager by stopping all asynchronous // handlers and waiting for them to finish. func (b *blockManager) Stop() error { - if b.shutdown { + if atomic.AddInt32(&b.shutdown, 1) != 1 { log.Warnf("[BMGR] Block manager is already in the process of " + "shutting down") return nil } log.Infof("[BMGR] Block manager shutting down") - b.shutdown = true close(b.quit) b.wg.Wait() return nil diff --git a/peer.go b/peer.go index 523fd7b2..d3077e43 100644 --- a/peer.go +++ b/peer.go @@ -96,7 +96,7 @@ type peer struct { protocolVersion uint32 btcnet btcwire.BitcoinNet services btcwire.ServiceFlag - started bool + started int32 conn net.Conn addr string na *btcwire.NetAddress @@ -993,7 +993,7 @@ func (p *peer) QueueInventory(invVect *btcwire.InvVect) { // version message for outbound connections to start the negotiation process. func (p *peer) Start() error { // Already started? - if p.started { + if atomic.AddInt32(&p.started, 1) != 1 { return nil } @@ -1013,7 +1013,6 @@ func (p *peer) Start() error { // Start processing input and output. go p.inHandler() go p.outHandler() - p.started = true return nil } diff --git a/rpcserver.go b/rpcserver.go index f89ad80f..ea25eb86 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -19,13 +19,14 @@ import ( "strconv" "strings" "sync" + "sync/atomic" ) // rpcServer holds the items the rpc server may need to access (config, // shutdown, main server, etc.) type rpcServer struct { - started bool - shutdown bool + started int32 + shutdown int32 server *server wg sync.WaitGroup rpcport string @@ -36,7 +37,7 @@ type rpcServer struct { // Start is used by server.go to start the rpc listener. func (s *rpcServer) Start() { - if s.started { + if atomic.AddInt32(&s.started, 1) != 1 { return } @@ -61,12 +62,11 @@ func (s *rpcServer) Start() { s.wg.Done() }(listener) } - s.started = true } // Stop is used by server.go to stop the rpc listener. func (s *rpcServer) Stop() error { - if s.shutdown { + if atomic.AddInt32(&s.shutdown, 1) != 1 { log.Infof("[RPCS] RPC server is already in the process of shutting down") return nil } @@ -80,7 +80,6 @@ func (s *rpcServer) Stop() error { } log.Infof("[RPCS] RPC server shutdown complete") s.wg.Wait() - s.shutdown = true return nil } @@ -127,7 +126,7 @@ func jsonAuthFail(w http.ResponseWriter, r *http.Request, s *rpcServer) { func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { _ = spew.Dump r.Close = true - if s.shutdown == true { + if atomic.LoadInt32(&s.shutdown) != 0 { return } var rawReply btcjson.Reply diff --git a/server.go b/server.go index fbcebf4b..1f895230 100644 --- a/server.go +++ b/server.go @@ -48,7 +48,7 @@ type server struct { nonce uint64 listeners []net.Listener btcnet btcwire.BitcoinNet - started bool + started int32 // atomic shutdown int32 // atomic shutdownSched int32 // atomic addrManager *AddrManager @@ -458,7 +458,7 @@ func (s *server) BroadcastMessage(msg btcwire.Message, exclPeers ...*peer) { // Start begins accepting connections from peers. func (s *server) Start() { // Already started? - if s.started { + if atomic.AddInt32(&s.started, 1) != 1 { return } @@ -480,8 +480,6 @@ func (s *server) Start() { if !cfg.DisableRPC { s.rpcServer.Start() } - - s.started = true } // Stop gracefully shuts down the server by stopping and disconnecting all