Rework server startup and shutdown sequence.
The commit reworks the server statup and shutdown sequence to ensure the server can always shutdown cleanly. The peer code depends on being able to send messages to the address and block managers, so they need to have their lifecycle tied to the peer handler to prevent issues with asynchronous shutdown order.
This commit is contained in:
parent
f80bc8c8f9
commit
a3531957f4
1 changed files with 30 additions and 9 deletions
39
server.go
39
server.go
|
@ -184,6 +184,14 @@ func (s *server) listenHandler(listener net.Listener) {
|
|||
// peers to and from the server, banning peers, and broadcasting messages to
|
||||
// peers. It must be run a a goroutine.
|
||||
func (s *server) peerHandler() {
|
||||
// Start the address manager and block manager, both of which are needed
|
||||
// by peers. This is done here since their lifecycle is closely tied
|
||||
// to this handler and rather than adding more channels to sychronize
|
||||
// things, it's easier and slightly faster to simply start and stop them
|
||||
// in this handler.
|
||||
s.addrManager.Start()
|
||||
s.blockManager.Start()
|
||||
|
||||
log.Tracef("[SRVR] Starting peer handler")
|
||||
peers := list.New()
|
||||
bannedPeers := make(map[string]time.Time)
|
||||
|
@ -191,7 +199,6 @@ func (s *server) peerHandler() {
|
|||
// Live while we're not shutting down or there are still connected peers.
|
||||
for !s.shutdown || peers.Len() != 0 {
|
||||
select {
|
||||
|
||||
// New peers connected to the server.
|
||||
case p := <-s.newPeers:
|
||||
s.handleAddPeerMsg(peers, bannedPeers, p)
|
||||
|
@ -218,6 +225,9 @@ func (s *server) peerHandler() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
s.blockManager.Stop()
|
||||
s.addrManager.Stop()
|
||||
s.wg.Done()
|
||||
log.Tracef("[SRVR] Peer handler done")
|
||||
}
|
||||
|
@ -295,14 +305,20 @@ func (s *server) Start() {
|
|||
}
|
||||
|
||||
log.Trace("[SRVR] Starting server")
|
||||
|
||||
// Start all the listeners. There will not be any if listening is
|
||||
// disabled.
|
||||
for _, listener := range s.listeners {
|
||||
go s.listenHandler(listener)
|
||||
s.wg.Add(1)
|
||||
}
|
||||
|
||||
// Start the peer handler which in turn starts the address and block
|
||||
// managers.
|
||||
go s.peerHandler()
|
||||
s.wg.Add(1)
|
||||
s.addrManager.Start()
|
||||
s.blockManager.Start()
|
||||
|
||||
// Start the RPC server if it's not disabled.
|
||||
if !cfg.DisableRpc {
|
||||
s.rpcServer.Start()
|
||||
}
|
||||
|
@ -319,19 +335,24 @@ func (s *server) Stop() error {
|
|||
}
|
||||
|
||||
log.Warnf("[SRVR] Server shutting down")
|
||||
|
||||
// Set the shutdown flag and stop all the listeners. There will not be
|
||||
// any listeners if listening is disabled.
|
||||
s.shutdown = true
|
||||
close(s.quit)
|
||||
if !cfg.DisableRpc {
|
||||
s.rpcServer.Stop()
|
||||
}
|
||||
s.addrManager.Stop()
|
||||
s.blockManager.Stop()
|
||||
for _, listener := range s.listeners {
|
||||
err := listener.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown the RPC server if it's not disabled.
|
||||
if !cfg.DisableRpc {
|
||||
s.rpcServer.Stop()
|
||||
}
|
||||
|
||||
// Signal the remaining goroutines to quit.
|
||||
close(s.quit)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue