connmgr: Implement inbound connection handling.

This modifies the connection manager to support accepting inbound connections
on a caller-provided set of listeners and notifying the caller of each accepted
connection via a callback.

This is only the minimum work necessary to get inbound support into the
connection manager. The intention is for future commits to move more
connection-related logic, such as enforcing the overall maximum connection
limit and tracking banned peers, into the connection manager.
Dave Collins 2016-11-03 22:51:07 -05:00
parent ea9bf748bb
commit d98430d8ca
GPG key ID: B8904D9D9C93D1F2
2 changed files with 72 additions and 38 deletions
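
Before the diff itself, here is a minimal, self-contained sketch of the pattern the commit message describes: the caller hands over a set of listeners plus an accept callback, and the manager owns the accept loops and closes the listeners on shutdown. This is an illustration only, not the connmgr API; the acceptManager type and its method names below are hypothetical.

package main

import (
	"fmt"
	"net"
	"sync"
	"sync/atomic"
)

// acceptManager mirrors the shape of the change: it takes ownership of
// caller-provided listeners, runs one accept loop per listener, and hands
// every accepted connection to an OnAccept-style callback.
type acceptManager struct {
	listeners []net.Listener
	onAccept  func(net.Conn)
	stop      int32
	wg        sync.WaitGroup
}

func (m *acceptManager) start() {
	// Only spin up accept loops when a callback was provided, matching the
	// contract described for the Listeners/OnAccept pair.
	if m.onAccept == nil {
		return
	}
	for _, l := range m.listeners {
		m.wg.Add(1)
		go m.listen(l)
	}
}

func (m *acceptManager) listen(l net.Listener) {
	defer m.wg.Done()
	for atomic.LoadInt32(&m.stop) == 0 {
		conn, err := l.Accept()
		if err != nil {
			// Accept fails once the listener is closed during
			// shutdown; only report errors seen before that point.
			if atomic.LoadInt32(&m.stop) == 0 {
				fmt.Println("accept error:", err)
			}
			continue
		}
		go m.onAccept(conn)
	}
}

func (m *acceptManager) stopAll() {
	atomic.StoreInt32(&m.stop, 1)
	// Closing the listeners unblocks any goroutine parked in Accept.
	for _, l := range m.listeners {
		_ = l.Close()
	}
	m.wg.Wait()
}

func main() {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}

	done := make(chan struct{})
	m := &acceptManager{
		listeners: []net.Listener{l},
		onAccept: func(c net.Conn) {
			// The callback owns the connection and must close it.
			fmt.Println("accepted from", c.RemoteAddr())
			c.Close()
			close(done)
		},
	}
	m.start()

	// Dial once to exercise the accept path, then shut everything down.
	c, err := net.Dial("tcp", l.Addr().String())
	if err != nil {
		panic(err)
	}
	c.Close()
	<-done
	m.stopAll()
}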

connmgr/connmgr.go

@@ -96,6 +96,29 @@ func (c *ConnReq) String() string {
 
 // Config holds the configuration options related to the connection manager.
 type Config struct {
+	// Listeners defines a slice of listeners for which the connection
+	// manager will take ownership of and accept connections.  When a
+	// connection is accepted, the OnAccept handler will be invoked with the
+	// connection.  Since the connection manager takes ownership of these
+	// listeners, they will be closed when the connection manager is
+	// stopped.
+	//
+	// This field will not have any effect if the OnAccept field is not
+	// also specified.  It may be nil if the caller does not wish to listen
+	// for incoming connections.
+	Listeners []net.Listener
+
+	// OnAccept is a callback that is fired when an inbound connection is
+	// accepted.  It is the caller's responsibility to close the connection.
+	// Failure to close the connection will result in the connection manager
+	// believing the connection is still active and thus have undesirable
+	// side effects such as still counting toward maximum connection limits.
+	//
+	// This field will not have any effect if the Listeners field is not
+	// also specified since there couldn't possibly be any accepted
+	// connections in that case.
+	OnAccept func(net.Conn)
+
 	// TargetOutbound is the number of outbound network connections to
 	// maintain.  Defaults to 8.
 	TargetOutbound uint32
@@ -306,6 +329,26 @@ func (cm *ConnManager) Remove(id uint64) {
 	cm.requests <- handleDisconnected{id, false}
 }
 
+// listenHandler accepts incoming connections on a given listener.  It must be
+// run as a goroutine.
+func (cm *ConnManager) listenHandler(listener net.Listener) {
+	log.Infof("Server listening on %s", listener.Addr())
+	for atomic.LoadInt32(&cm.stop) == 0 {
+		conn, err := listener.Accept()
+		if err != nil {
+			// Only log the error if not forcibly shutting down.
+			if atomic.LoadInt32(&cm.stop) == 0 {
+				log.Errorf("Can't accept connection: %v", err)
+			}
+			continue
+		}
+		go cm.cfg.OnAccept(conn)
+	}
+
+	cm.wg.Done()
+	log.Tracef("Listener handler done for %s", listener.Addr())
+}
+
 // Start launches the connection manager and begins connecting to the network.
 func (cm *ConnManager) Start() {
 	// Already started?
@@ -317,6 +360,15 @@ func (cm *ConnManager) Start() {
 	cm.wg.Add(1)
 	go cm.connHandler()
 
+	// Start all the listeners so long as the caller requested them and
+	// provided a callback to be invoked when connections are accepted.
+	if cm.cfg.OnAccept != nil {
+		for _, listener := range cm.cfg.Listeners {
+			cm.wg.Add(1)
+			go cm.listenHandler(listener)
+		}
+	}
+
 	for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
 		go cm.NewConnReq()
 	}
@@ -333,6 +385,15 @@ func (cm *ConnManager) Stop() {
 		log.Warnf("Connection manager already stopped")
 		return
 	}
+
+	// Stop all the listeners.  There will not be any listeners if
+	// listening is disabled.
+	for _, listener := range cm.cfg.Listeners {
+		// Ignore the error since this is shutdown and there is no way
+		// to recover anyways.
+		_ = listener.Close()
+	}
+
 	close(cm.quit)
 	log.Trace("Connection manager stopped")
 }
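
One detail worth calling out in the listenHandler/Stop pair above: Stop closes the listeners, which is what unblocks any goroutine parked in Accept, and the second check of the stop flag is what keeps that expected error out of the log. A tiny standalone snippet of that interaction (plain net package, no connmgr; errors.Is and net.ErrClosed require a modern Go release, whereas the code above predates them and relies on the atomic flag instead):

package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

func main() {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}

	// Simulate Stop: close the listener while Accept is blocked.
	go func() {
		time.Sleep(100 * time.Millisecond)
		l.Close()
	}()

	// The blocked Accept returns an error once the listener is closed.
	// Checking a shutdown flag (or errors.Is(err, net.ErrClosed)) is how
	// the accept loop tells this apart from a genuine failure.
	_, err = l.Accept()
	fmt.Println("accept returned:", err, "closed:", errors.Is(err, net.ErrClosed))
}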

server.go

@@ -145,7 +145,6 @@ type server struct {
 	shutdown      int32
 	shutdownSched int32
 
-	listeners            []net.Listener
 	chainParams          *chaincfg.Params
 	addrManager          *addrmgr.AddrManager
 	connManager          *connmgr.ConnManager
@@ -1582,26 +1581,15 @@ func newPeerConfig(sp *serverPeer) *peer.Config {
 	}
 }
 
-// listenHandler is the main listener which accepts incoming connections for the
-// server.  It must be run as a goroutine.
-func (s *server) listenHandler(listener net.Listener) {
-	srvrLog.Infof("Server listening on %s", listener.Addr())
-	for atomic.LoadInt32(&s.shutdown) == 0 {
-		conn, err := listener.Accept()
-		if err != nil {
-			// Only log the error if we're not forcibly shutting down.
-			if atomic.LoadInt32(&s.shutdown) == 0 {
-				srvrLog.Errorf("Can't accept connection: %v", err)
-			}
-			continue
-		}
-		sp := newServerPeer(s, false)
-		sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
-		sp.AssociateConnection(conn)
-		go s.peerDoneHandler(sp)
-	}
-	s.wg.Done()
-	srvrLog.Tracef("Listener handler done for %s", listener.Addr())
+// inboundPeerConnected is invoked by the connection manager when a new inbound
+// connection is established.  It initializes a new inbound server peer
+// instance, associates it with the connection, and starts a goroutine to wait
+// for disconnection.
+func (s *server) inboundPeerConnected(conn net.Conn) {
+	sp := newServerPeer(s, false)
+	sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
+	sp.AssociateConnection(conn)
+	go s.peerDoneHandler(sp)
 }
 
 // outboundPeerConnected is invoked by the connection manager when a new
@@ -1966,13 +1954,6 @@ func (s *server) Start() {
 
 	srvrLog.Trace("Starting server")
 
-	// Start all the listeners.  There will not be any if listening is
-	// disabled.
-	for _, listener := range s.listeners {
-		s.wg.Add(1)
-		go s.listenHandler(listener)
-	}
-
 	// Start the peer handler which in turn starts the address and block
 	// managers.
 	s.wg.Add(1)
@@ -2010,15 +1991,6 @@ func (s *server) Stop() error {
 
 	srvrLog.Warnf("Server shutting down")
 
-	// Stop all the listeners.  There will not be any listeners if
-	// listening is disabled.
-	for _, listener := range s.listeners {
-		err := listener.Close()
-		if err != nil {
-			return err
-		}
-	}
-
 	// Stop the CPU miner if needed
 	s.cpuMiner.Stop()
 
@@ -2317,7 +2289,6 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
 	}
 
 	s := server{
-		listeners:            listeners,
 		chainParams:          chainParams,
 		addrManager:          amgr,
 		newPeers:             make(chan *serverPeer, cfg.MaxPeers),
@@ -2469,6 +2440,8 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
 		targetOutbound = cfg.MaxPeers
 	}
 	cmgr, err := connmgr.New(&connmgr.Config{
+		Listeners:      listeners,
+		OnAccept:       s.inboundPeerConnected,
 		RetryDuration:  connectionRetryInterval,
 		TargetOutbound: uint32(targetOutbound),
 		Dial:           btcdDial,