From 6949a4f940b717114f296329983c74878af3b802 Mon Sep 17 00:00:00 2001 From: "Owain G. Ainsworth" Date: Thu, 31 Oct 2013 17:51:40 +0000 Subject: [PATCH] Have a list per class of peer instead of just one. persistentpeers and outbound(nonpersistent) peers get their own lists, so iterating over them can be much simpler (and quicker when you have 125 peer of which 8 are outbound). --- server.go | 194 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 113 insertions(+), 81 deletions(-) diff --git a/server.go b/server.go index 469befdb..2403ffdc 100644 --- a/server.go +++ b/server.go @@ -70,12 +70,26 @@ type server struct { } type peerState struct { - peers *list.List - banned map[string]time.Time - outboundPeers int + peers *list.List + outboundPeers *list.List + persistentPeers *list.List + banned map[string]time.Time maxOutboundPeers int } +func (p *peerState) Count() int { + return p.peers.Len() + p.outboundPeers.Len() + p.persistentPeers.Len() +} + +func (p *peerState) OutboundCount() int { + return p.outboundPeers.Len() + p.persistentPeers.Len() +} + +func (p *peerState) NeedMoreOutbound() bool { + return p.OutboundCount() < p.maxOutboundPeers && + p.Count() < cfg.MaxPeers +} + // handleAddPeerMsg deals with adding new peers. It is invoked from the // peerHandler goroutine. func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool { @@ -113,7 +127,7 @@ func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool { // TODO: Check for max peers from a single IP. // Limit max number of total peers. - if state.peers.Len() >= cfg.MaxPeers { + if state.Count() >= cfg.MaxPeers { log.Infof("SRVR: Max peers reached [%d] - disconnecting "+ "peer %s", cfg.MaxPeers, p) p.Shutdown() @@ -124,9 +138,15 @@ func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool { // Add the new peer and start it. log.Debugf("SRVR: New peer %s", p) - state.peers.PushBack(p) - if p.inbound { - p.Start() + if p.persistent { + state.persistentPeers.PushBack(p) + } else { + if p.inbound { + state.peers.PushBack(p) + p.Start() + } else { + state.outboundPeers.PushBack(p) + } } return true @@ -134,23 +154,31 @@ func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool { // handleDonePeerMsg deals with peers that have signalled they are done. It is // invoked from the peerHandler goroutine. -func (s *server) handleDonePeerMsg(state *peerState, p *peer) bool { - for e := state.peers.Front(); e != nil; e = e.Next() { +func (s *server) handleDonePeerMsg(state *peerState, p *peer) { + var list *list.List + if p.persistent { + list = state.persistentPeers + } else if p.inbound { + list = state.peers + } else { + list = state.outboundPeers + } + for e := list.Front(); e != nil; e = e.Next() { if e.Value == p { // Issue an asynchronous reconnect if the peer was a // persistent outbound connection. if !p.inbound && p.persistent && atomic.LoadInt32(&s.shutdown) == 0 { e.Value = newOutboundPeer(s, p.addr, true) - return false + return } - state.peers.Remove(e) + list.Remove(e) log.Debugf("SRVR: Removed peer %s", p) - return true + return } } - log.Warnf("SRVR: Lost peer %v that we never had!", p) - return false + // If we get here it means that either we didn't know about the peer + // or we purposefully deleted it. } // handleBanPeerMsg deals with banning peers. It is invoked from the @@ -168,35 +196,51 @@ func (s *server) handleBanPeerMsg(state *peerState, p *peer) { } +// forAllOutboundPeers is a helper function that runs closure on all outbound +// peers known to peerState. +func forAllOutboundPeers(state *peerState, closure func(p *peer)) { + for e := state.outboundPeers.Front(); e != nil; e = e.Next() { + closure(e.Value.(*peer)) + } + for e := state.persistentPeers.Front(); e != nil; e = e.Next() { + closure(e.Value.(*peer)) + } +} + +// forAllPeers is a helper function that runs closure on all peers known to +// peerSTate. +func forAllPeers(state *peerState, closure func(p *peer)) { + for e := state.peers.Front(); e != nil; e = e.Next() { + closure(e.Value.(*peer)) + } + forAllOutboundPeers(state, closure) +} + // handleRelayInvMsg deals with relaying inventory to peers that are not already // known to have it. It is invoked from the peerHandler goroutine. func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) { - // Loop through all connected peers and relay the inventory to those - // which are not already known to have it. - for e := state.peers.Front(); e != nil; e = e.Next() { - p := e.Value.(*peer) + forAllPeers(state, func(p *peer) { if !p.Connected() { - continue + return } // Queue the inventory to be relayed with the next batch. It // will be ignored if the peer is already known to have the // inventory. p.QueueInventory(iv) - } + }) } // handleBroadcastMsg deals with broadcasting messages to peers. It is invoked // from the peerHandler goroutine. func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) { - for e := state.peers.Front(); e != nil; e = e.Next() { + forAllPeers(state, func(p *peer) { excluded := false - for _, p := range bmsg.excludePeers { - if e.Value == p { + for _, ep := range bmsg.excludePeers { + if p == ep { excluded = true } } - p := e.Value.(*peer) // Don't broadcast to still connecting outbound peers . if !p.Connected() { excluded = true @@ -204,7 +248,7 @@ func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) { if !excluded { p.QueueMessage(bmsg.message, nil) } - } + }) } // PeerInfo represents the information requested by the getpeerinfo rpc command. @@ -249,50 +293,51 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) { switch msg := querymsg.(type) { case getConnCountMsg: nconnected := 0 - for e := state.peers.Front(); e != nil; e = e.Next() { - peer := e.Value.(*peer) - if peer.Connected() { + + forAllPeers(state, func(p *peer) { + if p.Connected() { nconnected++ } - } + }) msg.reply <- nconnected case getPeerInfoMsg: infos := make([]*PeerInfo, 0, state.peers.Len()) - for e := state.peers.Front(); e != nil; e = e.Next() { - peer := e.Value.(*peer) - if !peer.Connected() { - continue + forAllPeers(state, func(p *peer) { + if !p.Connected() { + return } // A lot of this will make the race detector go mad, // however it is statistics for purely informational purposes // and we don't really care if they are raced to get the new // version. info := &PeerInfo{ - Addr: peer.addr, - Services: peer.services, - LastSend: peer.lastSend.Unix(), - LastRecv: peer.lastRecv.Unix(), + Addr: p.addr, + Services: p.services, + LastSend: p.lastSend.Unix(), + LastRecv: p.lastRecv.Unix(), BytesSent: 0, // TODO(oga) we need this from wire. BytesRecv: 0, // TODO(oga) we need this from wire. - ConnTime: peer.timeConnected.Unix(), - Version: peer.protocolVersion, - SubVer: peer.userAgent, - Inbound: peer.inbound, - StartingHeight: peer.lastBlock, + ConnTime: p.timeConnected.Unix(), + Version: p.protocolVersion, + SubVer: p.userAgent, + Inbound: p.inbound, + StartingHeight: p.lastBlock, BanScore: 0, SyncNode: false, // TODO(oga) for now. bm knows this. } infos = append(infos, info) - } + }) msg.reply <- infos case addNodeMsg: - // TODO(oga) really these checks only apply to permanent peers. - for e := state.peers.Front(); e != nil; e = e.Next() { - peer := e.Value.(*peer) - if peer.addr == msg.addr { - msg.reply <- errors.New("peer already connected") - return + // XXX(oga) duplicate oneshots? + if msg.permanent { + for e := state.persistentPeers.Front(); e != nil; e = e.Next() { + peer := e.Value.(*peer) + if peer.addr == msg.addr { + msg.reply <- errors.New("peer already connected") + return + } } } // TODO(oga) if too many, nuke a non-perm peer. @@ -305,11 +350,12 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) { case delNodeMsg: found := false - // TODO(oga) really these checks only apply to permanent peers. - for e := state.peers.Front(); e != nil; e = e.Next() { + for e := state.persistentPeers.Front(); e != nil; e = e.Next() { peer := e.Value.(*peer) if peer.addr == msg.addr { - peer.persistent = false // XXX hack! + // This is ok because we are not continuing + // to iterate so won't corrupt the loop. + state.persistentPeers.Remove(e) peer.Disconnect() found = true break @@ -399,9 +445,10 @@ func (s *server) peerHandler() { log.Tracef("SRVR: Starting peer handler") state := &peerState{ - peers: list.New(), + peers: list.New(), persistentPeers: list.New(), - banned: make(map[string]time.Time), + outboundPeers: list.New(), + banned: make(map[string]time.Time), maxOutboundPeers: defaultMaxOutbound, } if cfg.MaxPeers < state.maxOutboundPeers { @@ -417,9 +464,7 @@ func (s *server) peerHandler() { permanentPeers = cfg.AddPeers } for _, addr := range permanentPeers { - if s.handleAddPeerMsg(state, newOutboundPeer(s, addr, true)) { - state.outboundPeers++ - } + s.handleAddPeerMsg(state, newOutboundPeer(s, addr, true)) } // if nothing else happens, wake us up soon. @@ -430,16 +475,11 @@ out: select { // New peers connected to the server. case p := <-s.newPeers: - if s.handleAddPeerMsg(state, p) && !p.inbound { - state.outboundPeers++ - } + s.handleAddPeerMsg(state, p) // Disconnected peers. case p := <-s.donePeers: - // handleDonePeerMsg return true if it removed a peer - if s.handleDonePeerMsg(state, p) { - state.outboundPeers-- - } + s.handleDonePeerMsg(state, p) // Peer to ban. case p := <-s.banPeers: @@ -464,36 +504,30 @@ out: // Shutdown the peer handler. case <-s.quit: // Shutdown peers. - for e := state.peers.Front(); e != nil; e = e.Next() { - p := e.Value.(*peer) + forAllPeers(state, func(p *peer) { p.Shutdown() - } + }) break out } // Only try connect to more peers if we actually need more - if state.outboundPeers >= state.maxOutboundPeers || - len(cfg.ConnectPeers) > 0 || + if !state.NeedMoreOutbound() || len(cfg.ConnectPeers) > 0 || atomic.LoadInt32(&s.shutdown) != 0 { continue } groups := make(map[string]int) - for e := state.peers.Front(); e != nil; e = e.Next() { - peer := e.Value.(*peer) - if !peer.inbound { - groups[GroupKey(peer.na)]++ - } - } + forAllOutboundPeers(state, func(p *peer) { + groups[GroupKey(p.na)]++ + }) tries := 0 - for state.outboundPeers < state.maxOutboundPeers && - state.peers.Len() < cfg.MaxPeers && + for state.NeedMoreOutbound() && atomic.LoadInt32(&s.shutdown) == 0 { // We bias like bitcoind does, 10 for no outgoing // up to 90 (8) for the selection of new vs tried //addresses. - nPeers := state.outboundPeers + nPeers := state.OutboundCount() if nPeers > 8 { nPeers = 8 } @@ -542,14 +576,12 @@ out: // already checked that we have room for more peers. if s.handleAddPeerMsg(state, newOutboundPeer(s, addrStr, false)) { - state.outboundPeers++ groups[key]++ } } // We we need more peers, wake up in ten seconds and try again. - if state.outboundPeers < state.maxOutboundPeers && - state.peers.Len() < cfg.MaxPeers { + if state.NeedMoreOutbound() { time.AfterFunc(10*time.Second, func() { s.wakeup <- true })