// Copyright (c) 2016 The btcsuite developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. package connmgr import ( "errors" "fmt" "net" "sync" "sync/atomic" "time" ) // maxFailedAttempts is the maximum number of successive failed connection // attempts after which network failure is assumed and new connections will // be delayed by the configured retry duration. const maxFailedAttempts = 25 var ( //ErrDialNil is used to indicate that Dial cannot be nil in the configuration. ErrDialNil = errors.New("Config: Dial cannot be nil") // maxRetryDuration is the max duration of time retrying of a persistent // connection is allowed to grow to. This is necessary since the retry // logic uses a backoff mechanism which increases the interval base times // the number of retries that have been done. maxRetryDuration = time.Minute * 5 // defaultRetryDuration is the default duration of time for retrying // persistent connections. defaultRetryDuration = time.Second * 5 // defaultTargetOutbound is the default number of outbound connections to // maintain. defaultTargetOutbound = uint32(8) ) // ConnState represents the state of the requested connection. type ConnState uint8 // ConnState can be either pending, established, disconnected or failed. When // a new connection is requested, it is attempted and categorized as // established or failed depending on the connection result. An established // connection which was disconnected is categorized as disconnected. const ( ConnPending ConnState = iota ConnFailing ConnCanceled ConnEstablished ConnDisconnected ) // ConnReq is the connection request to a network address. If permanent, the // connection will be retried on disconnection. type ConnReq struct { // The following variables must only be used atomically. id uint64 Addr net.Addr Permanent bool conn net.Conn state ConnState stateMtx sync.RWMutex retryCount uint32 } // updateState updates the state of the connection request. func (c *ConnReq) updateState(state ConnState) { c.stateMtx.Lock() c.state = state c.stateMtx.Unlock() } // ID returns a unique identifier for the connection request. func (c *ConnReq) ID() uint64 { return atomic.LoadUint64(&c.id) } // State is the connection state of the requested connection. func (c *ConnReq) State() ConnState { c.stateMtx.RLock() state := c.state c.stateMtx.RUnlock() return state } // String returns a human-readable string for the connection request. func (c *ConnReq) String() string { if c.Addr == nil || c.Addr.String() == "" { return fmt.Sprintf("reqid %d", atomic.LoadUint64(&c.id)) } return fmt.Sprintf("%s (reqid %d)", c.Addr, atomic.LoadUint64(&c.id)) } // Config holds the configuration options related to the connection manager. type Config struct { // Listeners defines a slice of listeners for which the connection // manager will take ownership of and accept connections. When a // connection is accepted, the OnAccept handler will be invoked with the // connection. Since the connection manager takes ownership of these // listeners, they will be closed when the connection manager is // stopped. // // This field will not have any effect if the OnAccept field is not // also specified. It may be nil if the caller does not wish to listen // for incoming connections. Listeners []net.Listener // OnAccept is a callback that is fired when an inbound connection is // accepted. It is the caller's responsibility to close the connection. // Failure to close the connection will result in the connection manager // believing the connection is still active and thus have undesirable // side effects such as still counting toward maximum connection limits. // // This field will not have any effect if the Listeners field is not // also specified since there couldn't possibly be any accepted // connections in that case. OnAccept func(net.Conn) // TargetOutbound is the number of outbound network connections to // maintain. Defaults to 8. TargetOutbound uint32 // RetryDuration is the duration to wait before retrying connection // requests. Defaults to 5s. RetryDuration time.Duration // OnConnection is a callback that is fired when a new outbound // connection is established. OnConnection func(*ConnReq, net.Conn) // OnDisconnection is a callback that is fired when an outbound // connection is disconnected. OnDisconnection func(*ConnReq) // GetNewAddress is a way to get an address to make a network connection // to. If nil, no new connections will be made automatically. GetNewAddress func() (net.Addr, error) // Dial connects to the address on the named network. It cannot be nil. Dial func(net.Addr) (net.Conn, error) } // registerPending is used to register a pending connection attempt. By // registering pending connection attempts we allow callers to cancel pending // connection attempts before their successful or in the case they're not // longer wanted. type registerPending struct { c *ConnReq done chan struct{} } // handleConnected is used to queue a successful connection. type handleConnected struct { c *ConnReq conn net.Conn } // handleDisconnected is used to remove a connection. type handleDisconnected struct { id uint64 retry bool } // handleFailed is used to remove a pending connection. type handleFailed struct { c *ConnReq err error } // ConnManager provides a manager to handle network connections. type ConnManager struct { // The following variables must only be used atomically. connReqCount uint64 start int32 stop int32 cfg Config wg sync.WaitGroup failedAttempts uint64 requests chan interface{} quit chan struct{} } // handleFailedConn handles a connection failed due to a disconnect or any // other failure. If permanent, it retries the connection after the configured // retry duration. Otherwise, if required, it makes a new connection request. // After maxFailedConnectionAttempts new connections will be retried after the // configured retry duration. func (cm *ConnManager) handleFailedConn(c *ConnReq) { if atomic.LoadInt32(&cm.stop) != 0 { return } if c.Permanent { c.retryCount++ d := time.Duration(c.retryCount) * cm.cfg.RetryDuration if d > maxRetryDuration { d = maxRetryDuration } log.Debugf("Retrying connection to %v in %v", c, d) time.AfterFunc(d, func() { cm.Connect(c) }) } else if cm.cfg.GetNewAddress != nil { cm.failedAttempts++ if cm.failedAttempts >= maxFailedAttempts { log.Debugf("Max failed connection attempts reached: [%d] "+ "-- retrying connection in: %v", maxFailedAttempts, cm.cfg.RetryDuration) time.AfterFunc(cm.cfg.RetryDuration, func() { cm.NewConnReq() }) } else { go cm.NewConnReq() } } } // connHandler handles all connection related requests. It must be run as a // goroutine. // // The connection handler makes sure that we maintain a pool of active outbound // connections so that we remain connected to the network. Connection requests // are processed and mapped by their assigned ids. func (cm *ConnManager) connHandler() { var ( // pending holds all registered conn requests that have yet to // succeed. pending = make(map[uint64]*ConnReq) // conns represents the set of all actively connected peers. conns = make(map[uint64]*ConnReq, cm.cfg.TargetOutbound) ) out: for { select { case req := <-cm.requests: switch msg := req.(type) { case registerPending: connReq := msg.c connReq.updateState(ConnPending) pending[msg.c.id] = connReq close(msg.done) case handleConnected: connReq := msg.c if _, ok := pending[connReq.id]; !ok { if msg.conn != nil { msg.conn.Close() } log.Debugf("Ignoring connection for "+ "canceled connreq=%v", connReq) continue } connReq.updateState(ConnEstablished) connReq.conn = msg.conn conns[connReq.id] = connReq log.Debugf("Connected to %v", connReq) connReq.retryCount = 0 cm.failedAttempts = 0 delete(pending, connReq.id) if cm.cfg.OnConnection != nil { go cm.cfg.OnConnection(connReq, msg.conn) } case handleDisconnected: connReq, ok := conns[msg.id] if !ok { connReq, ok = pending[msg.id] if !ok { log.Errorf("Unknown connid=%d", msg.id) continue } // Pending connection was found, remove // it from pending map if we should // ignore a later, successful // connection. connReq.updateState(ConnCanceled) log.Debugf("Canceling: %v", connReq) delete(pending, msg.id) continue } // An existing connection was located, mark as // disconnected and execute disconnection // callback. log.Debugf("Disconnected from %v", connReq) delete(conns, msg.id) if connReq.conn != nil { connReq.conn.Close() } if cm.cfg.OnDisconnection != nil { go cm.cfg.OnDisconnection(connReq) } // All internal state has been cleaned up, if // this connection is being removed, we will // make no further attempts with this request. if !msg.retry { connReq.updateState(ConnDisconnected) continue } // Otherwise, we will attempt a reconnection if // we do not have enough peers, or if this is a // persistent peer. The connection request is // re added to the pending map, so that // subsequent processing of connections and // failures do not ignore the request. if uint32(len(conns)) < cm.cfg.TargetOutbound || connReq.Permanent { connReq.updateState(ConnPending) log.Debugf("Reconnecting to %v", connReq) pending[msg.id] = connReq cm.handleFailedConn(connReq) } case handleFailed: connReq := msg.c if _, ok := pending[connReq.id]; !ok { log.Debugf("Ignoring connection for "+ "canceled conn req: %v", connReq) continue } connReq.updateState(ConnFailing) log.Debugf("Failed to connect to %v: %v", connReq, msg.err) cm.handleFailedConn(connReq) } case <-cm.quit: break out } } cm.wg.Done() log.Trace("Connection handler done") } // NewConnReq creates a new connection request and connects to the // corresponding address. func (cm *ConnManager) NewConnReq() { if atomic.LoadInt32(&cm.stop) != 0 { return } if cm.cfg.GetNewAddress == nil { return } c := &ConnReq{} atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1)) // Submit a request of a pending connection attempt to the connection // manager. By registering the id before the connection is even // established, we'll be able to later cancel the connection via the // Remove method. done := make(chan struct{}) select { case cm.requests <- registerPending{c, done}: case <-cm.quit: return } // Wait for the registration to successfully add the pending conn req to // the conn manager's internal state. select { case <-done: case <-cm.quit: return } addr, err := cm.cfg.GetNewAddress() if err != nil { select { case cm.requests <- handleFailed{c, err}: case <-cm.quit: } return } c.Addr = addr cm.Connect(c) } // Connect assigns an id and dials a connection to the address of the // connection request. func (cm *ConnManager) Connect(c *ConnReq) { if atomic.LoadInt32(&cm.stop) != 0 { return } // During the time we wait for retry there is a chance that // this connection was already cancelled if c.State() == ConnCanceled { log.Debugf("Ignoring connect for canceled connreq=%v", c) return } if atomic.LoadUint64(&c.id) == 0 { atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1)) // Submit a request of a pending connection attempt to the // connection manager. By registering the id before the // connection is even established, we'll be able to later // cancel the connection via the Remove method. done := make(chan struct{}) select { case cm.requests <- registerPending{c, done}: case <-cm.quit: return } // Wait for the registration to successfully add the pending // conn req to the conn manager's internal state. select { case <-done: case <-cm.quit: return } } log.Debugf("Attempting to connect to %v", c) conn, err := cm.cfg.Dial(c.Addr) if err != nil { select { case cm.requests <- handleFailed{c, err}: case <-cm.quit: } return } select { case cm.requests <- handleConnected{c, conn}: case <-cm.quit: } } // Disconnect disconnects the connection corresponding to the given connection // id. If permanent, the connection will be retried with an increasing backoff // duration. func (cm *ConnManager) Disconnect(id uint64) { if atomic.LoadInt32(&cm.stop) != 0 { return } select { case cm.requests <- handleDisconnected{id, true}: case <-cm.quit: } } // Remove removes the connection corresponding to the given connection id from // known connections. // // NOTE: This method can also be used to cancel a lingering connection attempt // that hasn't yet succeeded. func (cm *ConnManager) Remove(id uint64) { if atomic.LoadInt32(&cm.stop) != 0 { return } select { case cm.requests <- handleDisconnected{id, false}: case <-cm.quit: } } // listenHandler accepts incoming connections on a given listener. It must be // run as a goroutine. func (cm *ConnManager) listenHandler(listener net.Listener) { log.Infof("Server listening on %s", listener.Addr()) for atomic.LoadInt32(&cm.stop) == 0 { conn, err := listener.Accept() if err != nil { // Only log the error if not forcibly shutting down. if atomic.LoadInt32(&cm.stop) == 0 { log.Errorf("Can't accept connection: %v", err) } continue } go cm.cfg.OnAccept(conn) } cm.wg.Done() log.Tracef("Listener handler done for %s", listener.Addr()) } // Start launches the connection manager and begins connecting to the network. func (cm *ConnManager) Start() { // Already started? if atomic.AddInt32(&cm.start, 1) != 1 { return } log.Trace("Connection manager started") cm.wg.Add(1) go cm.connHandler() // Start all the listeners so long as the caller requested them and // provided a callback to be invoked when connections are accepted. if cm.cfg.OnAccept != nil { for _, listner := range cm.cfg.Listeners { cm.wg.Add(1) go cm.listenHandler(listner) } } for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ { go cm.NewConnReq() } } // Wait blocks until the connection manager halts gracefully. func (cm *ConnManager) Wait() { cm.wg.Wait() } // Stop gracefully shuts down the connection manager. func (cm *ConnManager) Stop() { if atomic.AddInt32(&cm.stop, 1) != 1 { log.Warnf("Connection manager already stopped") return } // Stop all the listeners. There will not be any listeners if // listening is disabled. for _, listener := range cm.cfg.Listeners { // Ignore the error since this is shutdown and there is no way // to recover anyways. _ = listener.Close() } close(cm.quit) log.Trace("Connection manager stopped") } // New returns a new connection manager. // Use Start to start connecting to the network. func New(cfg *Config) (*ConnManager, error) { if cfg.Dial == nil { return nil, ErrDialNil } // Default to sane values if cfg.RetryDuration <= 0 { cfg.RetryDuration = defaultRetryDuration } if cfg.TargetOutbound == 0 { cfg.TargetOutbound = defaultTargetOutbound } cm := ConnManager{ cfg: *cfg, // Copy so caller can't mutate requests: make(chan interface{}), quit: make(chan struct{}), } return &cm, nil }