Convert the rest of the subsystems to use atomics for shutdown vars.

This commit is contained in:
Owain G. Ainsworth 2013-10-03 00:33:42 +01:00
parent f333cb4220
commit 8974e789f7
5 changed files with 34 additions and 41 deletions

View file

@ -20,6 +20,7 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -325,8 +326,8 @@ type AddrManager struct {
addrIndex map[string]*knownAddress // address key to ka for all addrs. addrIndex map[string]*knownAddress // address key to ka for all addrs.
addrNew [newBucketCount]map[string]*knownAddress addrNew [newBucketCount]map[string]*knownAddress
addrTried [triedBucketCount]*list.List addrTried [triedBucketCount]*list.List
started bool started int32
shutdown bool shutdown int32
wg sync.WaitGroup wg sync.WaitGroup
quit chan bool quit chan bool
nTried int nTried int
@ -380,19 +381,17 @@ func (a *AddrManager) getTriedBucket(netAddr *btcwire.NetAddress) int {
func (a *AddrManager) addressHandler() { func (a *AddrManager) addressHandler() {
dumpAddressTicker := time.NewTicker(dumpAddressInterval) dumpAddressTicker := time.NewTicker(dumpAddressInterval)
out: out:
for !a.shutdown { for {
select { select {
case <-dumpAddressTicker.C: case <-dumpAddressTicker.C:
if !a.shutdown {
a.savePeers() a.savePeers()
}
case <-a.quit: case <-a.quit:
a.savePeers()
break out break out
} }
} }
dumpAddressTicker.Stop() dumpAddressTicker.Stop()
a.savePeers()
a.wg.Done() a.wg.Done()
log.Trace("[AMGR] Address handler done") log.Trace("[AMGR] Address handler done")
} }
@ -607,32 +606,31 @@ func deserialiseNetAddress(addr string) (*btcwire.NetAddress, error) {
// addresses, timeouts, and interval based writes. // addresses, timeouts, and interval based writes.
func (a *AddrManager) Start() { func (a *AddrManager) Start() {
// Already started? // Already started?
if a.started { if atomic.AddInt32(&a.started, 1) != 1 {
return return
} }
log.Trace("[AMGR] Starting address manager") log.Trace("[AMGR] Starting address manager")
a.wg.Add(1) a.wg.Add(1)
go a.addressHandler()
a.started = true
// Load peers we already know about from file. // Load peers we already know about from file.
a.loadPeers() a.loadPeers()
// Start the address ticker to save addresses periodically.
go a.addressHandler()
} }
// Stop gracefully shuts down the address manager by stopping the main handler. // Stop gracefully shuts down the address manager by stopping the main handler.
func (a *AddrManager) Stop() error { func (a *AddrManager) Stop() error {
if a.shutdown { if atomic.AddInt32(&a.shutdown, 1) != 1 {
log.Warnf("[AMGR] Address manager is already in the process of " + log.Warnf("[AMGR] Address manager is already in the process of " +
"shutting down") "shutting down")
return nil return nil
} }
log.Infof("[AMGR] Address manager shutting down") log.Infof("[AMGR] Address manager shutting down")
a.savePeers() close(a.quit)
a.shutdown = true
a.quit <- true
a.wg.Wait() a.wg.Wait()
return nil return nil
} }

View file

@ -13,6 +13,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -60,8 +61,8 @@ type txMsg struct {
// incoming blocks. // incoming blocks.
type blockManager struct { type blockManager struct {
server *server server *server
started bool started int32
shutdown bool shutdown int32
blockChain *btcchain.BlockChain blockChain *btcchain.BlockChain
blockPeer map[btcwire.ShaHash]*peer blockPeer map[btcwire.ShaHash]*peer
requestedBlocks map[btcwire.ShaHash]bool requestedBlocks map[btcwire.ShaHash]bool
@ -132,7 +133,7 @@ func (b *blockManager) startSync(peers *list.List) {
// also starts syncing if needed. It is invoked from the syncHandler goroutine. // also starts syncing if needed. It is invoked from the syncHandler goroutine.
func (b *blockManager) handleNewPeerMsg(peers *list.List, p *peer) { func (b *blockManager) handleNewPeerMsg(peers *list.List, p *peer) {
// Ignore if in the process of shutting down. // Ignore if in the process of shutting down.
if b.shutdown { if atomic.LoadInt32(&b.shutdown) != 0 {
return return
} }
@ -385,7 +386,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
func (b *blockManager) blockHandler() { func (b *blockManager) blockHandler() {
candidatePeers := list.New() candidatePeers := list.New()
out: out:
for !b.shutdown { for {
select { select {
case m := <-b.msgChan: case m := <-b.msgChan:
switch msg := m.(type) { switch msg := m.(type) {
@ -486,7 +487,7 @@ func (b *blockManager) chainNotificationHandler() {
// when it sends notifications while retaining order. // when it sends notifications while retaining order.
pending := list.New() pending := list.New()
out: out:
for !b.shutdown { for {
// Sending on a nil channel always blocks and hence is ignored // Sending on a nil channel always blocks and hence is ignored
// by select. Thus enable send only when the list is non-empty. // by select. Thus enable send only when the list is non-empty.
var firstItem *btcchain.Notification var firstItem *btcchain.Notification
@ -514,7 +515,7 @@ out:
// NewPeer informs the blockmanager of a newly active peer. // NewPeer informs the blockmanager of a newly active peer.
func (b *blockManager) NewPeer(p *peer) { func (b *blockManager) NewPeer(p *peer) {
// Ignore if we are shutting down. // Ignore if we are shutting down.
if b.shutdown { if atomic.LoadInt32(&b.shutdown) != 0 {
return return
} }
b.msgChan <- &newPeerMsg{peer: p} b.msgChan <- &newPeerMsg{peer: p}
@ -523,7 +524,7 @@ func (b *blockManager) NewPeer(p *peer) {
// QueueBlock adds the passed block message and peer to the block handling queue. // QueueBlock adds the passed block message and peer to the block handling queue.
func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) { func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) {
// Don't accept more blocks if we're shutting down. // Don't accept more blocks if we're shutting down.
if b.shutdown { if atomic.LoadInt32(&b.shutdown) != 0 {
p.blockProcessed <- false p.blockProcessed <- false
return return
} }
@ -536,7 +537,7 @@ func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) {
func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) { func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) {
// No channel handling here because peers do not need to block on inv // No channel handling here because peers do not need to block on inv
// messages. // messages.
if b.shutdown { if atomic.LoadInt32(&b.shutdown) != 0 {
return return
} }
@ -547,7 +548,7 @@ func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) {
// DonePeer informs the blockmanager that a peer has disconnected. // DonePeer informs the blockmanager that a peer has disconnected.
func (b *blockManager) DonePeer(p *peer) { func (b *blockManager) DonePeer(p *peer) {
// Ignore if we are shutting down. // Ignore if we are shutting down.
if b.shutdown { if atomic.LoadInt32(&b.shutdown) != 0 {
return return
} }
b.msgChan <- &donePeerMsg{peer: p} b.msgChan <- &donePeerMsg{peer: p}
@ -556,7 +557,7 @@ func (b *blockManager) DonePeer(p *peer) {
// Start begins the core block handler which processes block and inv messages. // Start begins the core block handler which processes block and inv messages.
func (b *blockManager) Start() { func (b *blockManager) Start() {
// Already started? // Already started?
if b.started { if atomic.AddInt32(&b.started, 1) != 1 {
return return
} }
@ -565,20 +566,18 @@ func (b *blockManager) Start() {
go b.blockHandler() go b.blockHandler()
go b.chainNotificationSinkHandler() go b.chainNotificationSinkHandler()
go b.chainNotificationHandler() go b.chainNotificationHandler()
b.started = true
} }
// Stop gracefully shuts down the block manager by stopping all asynchronous // Stop gracefully shuts down the block manager by stopping all asynchronous
// handlers and waiting for them to finish. // handlers and waiting for them to finish.
func (b *blockManager) Stop() error { func (b *blockManager) Stop() error {
if b.shutdown { if atomic.AddInt32(&b.shutdown, 1) != 1 {
log.Warnf("[BMGR] Block manager is already in the process of " + log.Warnf("[BMGR] Block manager is already in the process of " +
"shutting down") "shutting down")
return nil return nil
} }
log.Infof("[BMGR] Block manager shutting down") log.Infof("[BMGR] Block manager shutting down")
b.shutdown = true
close(b.quit) close(b.quit)
b.wg.Wait() b.wg.Wait()
return nil return nil

View file

@ -96,7 +96,7 @@ type peer struct {
protocolVersion uint32 protocolVersion uint32
btcnet btcwire.BitcoinNet btcnet btcwire.BitcoinNet
services btcwire.ServiceFlag services btcwire.ServiceFlag
started bool started int32
conn net.Conn conn net.Conn
addr string addr string
na *btcwire.NetAddress na *btcwire.NetAddress
@ -993,7 +993,7 @@ func (p *peer) QueueInventory(invVect *btcwire.InvVect) {
// version message for outbound connections to start the negotiation process. // version message for outbound connections to start the negotiation process.
func (p *peer) Start() error { func (p *peer) Start() error {
// Already started? // Already started?
if p.started { if atomic.AddInt32(&p.started, 1) != 1 {
return nil return nil
} }
@ -1013,7 +1013,6 @@ func (p *peer) Start() error {
// Start processing input and output. // Start processing input and output.
go p.inHandler() go p.inHandler()
go p.outHandler() go p.outHandler()
p.started = true
return nil return nil
} }

View file

@ -19,13 +19,14 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
) )
// rpcServer holds the items the rpc server may need to access (config, // rpcServer holds the items the rpc server may need to access (config,
// shutdown, main server, etc.) // shutdown, main server, etc.)
type rpcServer struct { type rpcServer struct {
started bool started int32
shutdown bool shutdown int32
server *server server *server
wg sync.WaitGroup wg sync.WaitGroup
rpcport string rpcport string
@ -36,7 +37,7 @@ type rpcServer struct {
// Start is used by server.go to start the rpc listener. // Start is used by server.go to start the rpc listener.
func (s *rpcServer) Start() { func (s *rpcServer) Start() {
if s.started { if atomic.AddInt32(&s.started, 1) != 1 {
return return
} }
@ -61,12 +62,11 @@ func (s *rpcServer) Start() {
s.wg.Done() s.wg.Done()
}(listener) }(listener)
} }
s.started = true
} }
// Stop is used by server.go to stop the rpc listener. // Stop is used by server.go to stop the rpc listener.
func (s *rpcServer) Stop() error { func (s *rpcServer) Stop() error {
if s.shutdown { if atomic.AddInt32(&s.shutdown, 1) != 1 {
log.Infof("[RPCS] RPC server is already in the process of shutting down") log.Infof("[RPCS] RPC server is already in the process of shutting down")
return nil return nil
} }
@ -80,7 +80,6 @@ func (s *rpcServer) Stop() error {
} }
log.Infof("[RPCS] RPC server shutdown complete") log.Infof("[RPCS] RPC server shutdown complete")
s.wg.Wait() s.wg.Wait()
s.shutdown = true
return nil return nil
} }
@ -127,7 +126,7 @@ func jsonAuthFail(w http.ResponseWriter, r *http.Request, s *rpcServer) {
func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) {
_ = spew.Dump _ = spew.Dump
r.Close = true r.Close = true
if s.shutdown == true { if atomic.LoadInt32(&s.shutdown) != 0 {
return return
} }
var rawReply btcjson.Reply var rawReply btcjson.Reply

View file

@ -48,7 +48,7 @@ type server struct {
nonce uint64 nonce uint64
listeners []net.Listener listeners []net.Listener
btcnet btcwire.BitcoinNet btcnet btcwire.BitcoinNet
started bool started int32 // atomic
shutdown int32 // atomic shutdown int32 // atomic
shutdownSched int32 // atomic shutdownSched int32 // atomic
addrManager *AddrManager addrManager *AddrManager
@ -458,7 +458,7 @@ func (s *server) BroadcastMessage(msg btcwire.Message, exclPeers ...*peer) {
// Start begins accepting connections from peers. // Start begins accepting connections from peers.
func (s *server) Start() { func (s *server) Start() {
// Already started? // Already started?
if s.started { if atomic.AddInt32(&s.started, 1) != 1 {
return return
} }
@ -480,8 +480,6 @@ func (s *server) Start() {
if !cfg.DisableRPC { if !cfg.DisableRPC {
s.rpcServer.Start() s.rpcServer.Start()
} }
s.started = true
} }
// Stop gracefully shuts down the server by stopping and disconnecting all // Stop gracefully shuts down the server by stopping and disconnecting all