diff --git a/server.go b/server.go index 469befdb..2403ffdc 100644 --- a/server.go +++ b/server.go @@ -70,12 +70,26 @@ type server struct { } type peerState struct { - peers *list.List - banned map[string]time.Time - outboundPeers int + peers *list.List + outboundPeers *list.List + persistentPeers *list.List + banned map[string]time.Time maxOutboundPeers int } +func (p *peerState) Count() int { + return p.peers.Len() + p.outboundPeers.Len() + p.persistentPeers.Len() +} + +func (p *peerState) OutboundCount() int { + return p.outboundPeers.Len() + p.persistentPeers.Len() +} + +func (p *peerState) NeedMoreOutbound() bool { + return p.OutboundCount() < p.maxOutboundPeers && + p.Count() < cfg.MaxPeers +} + // handleAddPeerMsg deals with adding new peers. It is invoked from the // peerHandler goroutine. func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool { @@ -113,7 +127,7 @@ func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool { // TODO: Check for max peers from a single IP. // Limit max number of total peers. - if state.peers.Len() >= cfg.MaxPeers { + if state.Count() >= cfg.MaxPeers { log.Infof("SRVR: Max peers reached [%d] - disconnecting "+ "peer %s", cfg.MaxPeers, p) p.Shutdown() @@ -124,9 +138,15 @@ func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool { // Add the new peer and start it. log.Debugf("SRVR: New peer %s", p) - state.peers.PushBack(p) - if p.inbound { - p.Start() + if p.persistent { + state.persistentPeers.PushBack(p) + } else { + if p.inbound { + state.peers.PushBack(p) + p.Start() + } else { + state.outboundPeers.PushBack(p) + } } return true @@ -134,23 +154,31 @@ func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool { // handleDonePeerMsg deals with peers that have signalled they are done. It is // invoked from the peerHandler goroutine. -func (s *server) handleDonePeerMsg(state *peerState, p *peer) bool { - for e := state.peers.Front(); e != nil; e = e.Next() { +func (s *server) handleDonePeerMsg(state *peerState, p *peer) { + var list *list.List + if p.persistent { + list = state.persistentPeers + } else if p.inbound { + list = state.peers + } else { + list = state.outboundPeers + } + for e := list.Front(); e != nil; e = e.Next() { if e.Value == p { // Issue an asynchronous reconnect if the peer was a // persistent outbound connection. if !p.inbound && p.persistent && atomic.LoadInt32(&s.shutdown) == 0 { e.Value = newOutboundPeer(s, p.addr, true) - return false + return } - state.peers.Remove(e) + list.Remove(e) log.Debugf("SRVR: Removed peer %s", p) - return true + return } } - log.Warnf("SRVR: Lost peer %v that we never had!", p) - return false + // If we get here it means that either we didn't know about the peer + // or we purposefully deleted it. } // handleBanPeerMsg deals with banning peers. It is invoked from the @@ -168,35 +196,51 @@ func (s *server) handleBanPeerMsg(state *peerState, p *peer) { } +// forAllOutboundPeers is a helper function that runs closure on all outbound +// peers known to peerState. +func forAllOutboundPeers(state *peerState, closure func(p *peer)) { + for e := state.outboundPeers.Front(); e != nil; e = e.Next() { + closure(e.Value.(*peer)) + } + for e := state.persistentPeers.Front(); e != nil; e = e.Next() { + closure(e.Value.(*peer)) + } +} + +// forAllPeers is a helper function that runs closure on all peers known to +// peerSTate. +func forAllPeers(state *peerState, closure func(p *peer)) { + for e := state.peers.Front(); e != nil; e = e.Next() { + closure(e.Value.(*peer)) + } + forAllOutboundPeers(state, closure) +} + // handleRelayInvMsg deals with relaying inventory to peers that are not already // known to have it. It is invoked from the peerHandler goroutine. func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) { - // Loop through all connected peers and relay the inventory to those - // which are not already known to have it. - for e := state.peers.Front(); e != nil; e = e.Next() { - p := e.Value.(*peer) + forAllPeers(state, func(p *peer) { if !p.Connected() { - continue + return } // Queue the inventory to be relayed with the next batch. It // will be ignored if the peer is already known to have the // inventory. p.QueueInventory(iv) - } + }) } // handleBroadcastMsg deals with broadcasting messages to peers. It is invoked // from the peerHandler goroutine. func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) { - for e := state.peers.Front(); e != nil; e = e.Next() { + forAllPeers(state, func(p *peer) { excluded := false - for _, p := range bmsg.excludePeers { - if e.Value == p { + for _, ep := range bmsg.excludePeers { + if p == ep { excluded = true } } - p := e.Value.(*peer) // Don't broadcast to still connecting outbound peers . if !p.Connected() { excluded = true @@ -204,7 +248,7 @@ func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) { if !excluded { p.QueueMessage(bmsg.message, nil) } - } + }) } // PeerInfo represents the information requested by the getpeerinfo rpc command. @@ -249,50 +293,51 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) { switch msg := querymsg.(type) { case getConnCountMsg: nconnected := 0 - for e := state.peers.Front(); e != nil; e = e.Next() { - peer := e.Value.(*peer) - if peer.Connected() { + + forAllPeers(state, func(p *peer) { + if p.Connected() { nconnected++ } - } + }) msg.reply <- nconnected case getPeerInfoMsg: infos := make([]*PeerInfo, 0, state.peers.Len()) - for e := state.peers.Front(); e != nil; e = e.Next() { - peer := e.Value.(*peer) - if !peer.Connected() { - continue + forAllPeers(state, func(p *peer) { + if !p.Connected() { + return } // A lot of this will make the race detector go mad, // however it is statistics for purely informational purposes // and we don't really care if they are raced to get the new // version. info := &PeerInfo{ - Addr: peer.addr, - Services: peer.services, - LastSend: peer.lastSend.Unix(), - LastRecv: peer.lastRecv.Unix(), + Addr: p.addr, + Services: p.services, + LastSend: p.lastSend.Unix(), + LastRecv: p.lastRecv.Unix(), BytesSent: 0, // TODO(oga) we need this from wire. BytesRecv: 0, // TODO(oga) we need this from wire. - ConnTime: peer.timeConnected.Unix(), - Version: peer.protocolVersion, - SubVer: peer.userAgent, - Inbound: peer.inbound, - StartingHeight: peer.lastBlock, + ConnTime: p.timeConnected.Unix(), + Version: p.protocolVersion, + SubVer: p.userAgent, + Inbound: p.inbound, + StartingHeight: p.lastBlock, BanScore: 0, SyncNode: false, // TODO(oga) for now. bm knows this. } infos = append(infos, info) - } + }) msg.reply <- infos case addNodeMsg: - // TODO(oga) really these checks only apply to permanent peers. - for e := state.peers.Front(); e != nil; e = e.Next() { - peer := e.Value.(*peer) - if peer.addr == msg.addr { - msg.reply <- errors.New("peer already connected") - return + // XXX(oga) duplicate oneshots? + if msg.permanent { + for e := state.persistentPeers.Front(); e != nil; e = e.Next() { + peer := e.Value.(*peer) + if peer.addr == msg.addr { + msg.reply <- errors.New("peer already connected") + return + } } } // TODO(oga) if too many, nuke a non-perm peer. @@ -305,11 +350,12 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) { case delNodeMsg: found := false - // TODO(oga) really these checks only apply to permanent peers. - for e := state.peers.Front(); e != nil; e = e.Next() { + for e := state.persistentPeers.Front(); e != nil; e = e.Next() { peer := e.Value.(*peer) if peer.addr == msg.addr { - peer.persistent = false // XXX hack! + // This is ok because we are not continuing + // to iterate so won't corrupt the loop. + state.persistentPeers.Remove(e) peer.Disconnect() found = true break @@ -399,9 +445,10 @@ func (s *server) peerHandler() { log.Tracef("SRVR: Starting peer handler") state := &peerState{ - peers: list.New(), + peers: list.New(), persistentPeers: list.New(), - banned: make(map[string]time.Time), + outboundPeers: list.New(), + banned: make(map[string]time.Time), maxOutboundPeers: defaultMaxOutbound, } if cfg.MaxPeers < state.maxOutboundPeers { @@ -417,9 +464,7 @@ func (s *server) peerHandler() { permanentPeers = cfg.AddPeers } for _, addr := range permanentPeers { - if s.handleAddPeerMsg(state, newOutboundPeer(s, addr, true)) { - state.outboundPeers++ - } + s.handleAddPeerMsg(state, newOutboundPeer(s, addr, true)) } // if nothing else happens, wake us up soon. @@ -430,16 +475,11 @@ out: select { // New peers connected to the server. case p := <-s.newPeers: - if s.handleAddPeerMsg(state, p) && !p.inbound { - state.outboundPeers++ - } + s.handleAddPeerMsg(state, p) // Disconnected peers. case p := <-s.donePeers: - // handleDonePeerMsg return true if it removed a peer - if s.handleDonePeerMsg(state, p) { - state.outboundPeers-- - } + s.handleDonePeerMsg(state, p) // Peer to ban. case p := <-s.banPeers: @@ -464,36 +504,30 @@ out: // Shutdown the peer handler. case <-s.quit: // Shutdown peers. - for e := state.peers.Front(); e != nil; e = e.Next() { - p := e.Value.(*peer) + forAllPeers(state, func(p *peer) { p.Shutdown() - } + }) break out } // Only try connect to more peers if we actually need more - if state.outboundPeers >= state.maxOutboundPeers || - len(cfg.ConnectPeers) > 0 || + if !state.NeedMoreOutbound() || len(cfg.ConnectPeers) > 0 || atomic.LoadInt32(&s.shutdown) != 0 { continue } groups := make(map[string]int) - for e := state.peers.Front(); e != nil; e = e.Next() { - peer := e.Value.(*peer) - if !peer.inbound { - groups[GroupKey(peer.na)]++ - } - } + forAllOutboundPeers(state, func(p *peer) { + groups[GroupKey(p.na)]++ + }) tries := 0 - for state.outboundPeers < state.maxOutboundPeers && - state.peers.Len() < cfg.MaxPeers && + for state.NeedMoreOutbound() && atomic.LoadInt32(&s.shutdown) == 0 { // We bias like bitcoind does, 10 for no outgoing // up to 90 (8) for the selection of new vs tried //addresses. - nPeers := state.outboundPeers + nPeers := state.OutboundCount() if nPeers > 8 { nPeers = 8 } @@ -542,14 +576,12 @@ out: // already checked that we have room for more peers. if s.handleAddPeerMsg(state, newOutboundPeer(s, addrStr, false)) { - state.outboundPeers++ groups[key]++ } } // We we need more peers, wake up in ten seconds and try again. - if state.outboundPeers < state.maxOutboundPeers && - state.peers.Len() < cfg.MaxPeers { + if state.NeedMoreOutbound() { time.AfterFunc(10*time.Second, func() { s.wakeup <- true })