Have a list per class of peer instead of just one.

persistentpeers and outbound(nonpersistent) peers get their own lists,
so iterating over them can be much simpler (and quicker when you have
125 peer of which 8 are outbound).
This commit is contained in:
Owain G. Ainsworth 2013-10-31 17:51:40 +00:00
parent 4d80750afe
commit 6949a4f940

194
server.go
View file

@ -70,12 +70,26 @@ type server struct {
} }
type peerState struct { type peerState struct {
peers *list.List peers *list.List
banned map[string]time.Time outboundPeers *list.List
outboundPeers int persistentPeers *list.List
banned map[string]time.Time
maxOutboundPeers int maxOutboundPeers int
} }
func (p *peerState) Count() int {
return p.peers.Len() + p.outboundPeers.Len() + p.persistentPeers.Len()
}
func (p *peerState) OutboundCount() int {
return p.outboundPeers.Len() + p.persistentPeers.Len()
}
func (p *peerState) NeedMoreOutbound() bool {
return p.OutboundCount() < p.maxOutboundPeers &&
p.Count() < cfg.MaxPeers
}
// handleAddPeerMsg deals with adding new peers. It is invoked from the // handleAddPeerMsg deals with adding new peers. It is invoked from the
// peerHandler goroutine. // peerHandler goroutine.
func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool { func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool {
@ -113,7 +127,7 @@ func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool {
// TODO: Check for max peers from a single IP. // TODO: Check for max peers from a single IP.
// Limit max number of total peers. // Limit max number of total peers.
if state.peers.Len() >= cfg.MaxPeers { if state.Count() >= cfg.MaxPeers {
log.Infof("SRVR: Max peers reached [%d] - disconnecting "+ log.Infof("SRVR: Max peers reached [%d] - disconnecting "+
"peer %s", cfg.MaxPeers, p) "peer %s", cfg.MaxPeers, p)
p.Shutdown() p.Shutdown()
@ -124,9 +138,15 @@ func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool {
// Add the new peer and start it. // Add the new peer and start it.
log.Debugf("SRVR: New peer %s", p) log.Debugf("SRVR: New peer %s", p)
state.peers.PushBack(p) if p.persistent {
if p.inbound { state.persistentPeers.PushBack(p)
p.Start() } else {
if p.inbound {
state.peers.PushBack(p)
p.Start()
} else {
state.outboundPeers.PushBack(p)
}
} }
return true return true
@ -134,23 +154,31 @@ func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool {
// handleDonePeerMsg deals with peers that have signalled they are done. It is // handleDonePeerMsg deals with peers that have signalled they are done. It is
// invoked from the peerHandler goroutine. // invoked from the peerHandler goroutine.
func (s *server) handleDonePeerMsg(state *peerState, p *peer) bool { func (s *server) handleDonePeerMsg(state *peerState, p *peer) {
for e := state.peers.Front(); e != nil; e = e.Next() { var list *list.List
if p.persistent {
list = state.persistentPeers
} else if p.inbound {
list = state.peers
} else {
list = state.outboundPeers
}
for e := list.Front(); e != nil; e = e.Next() {
if e.Value == p { if e.Value == p {
// Issue an asynchronous reconnect if the peer was a // Issue an asynchronous reconnect if the peer was a
// persistent outbound connection. // persistent outbound connection.
if !p.inbound && p.persistent && if !p.inbound && p.persistent &&
atomic.LoadInt32(&s.shutdown) == 0 { atomic.LoadInt32(&s.shutdown) == 0 {
e.Value = newOutboundPeer(s, p.addr, true) e.Value = newOutboundPeer(s, p.addr, true)
return false return
} }
state.peers.Remove(e) list.Remove(e)
log.Debugf("SRVR: Removed peer %s", p) log.Debugf("SRVR: Removed peer %s", p)
return true return
} }
} }
log.Warnf("SRVR: Lost peer %v that we never had!", p) // If we get here it means that either we didn't know about the peer
return false // or we purposefully deleted it.
} }
// handleBanPeerMsg deals with banning peers. It is invoked from the // handleBanPeerMsg deals with banning peers. It is invoked from the
@ -168,35 +196,51 @@ func (s *server) handleBanPeerMsg(state *peerState, p *peer) {
} }
// forAllOutboundPeers is a helper function that runs closure on all outbound
// peers known to peerState.
func forAllOutboundPeers(state *peerState, closure func(p *peer)) {
for e := state.outboundPeers.Front(); e != nil; e = e.Next() {
closure(e.Value.(*peer))
}
for e := state.persistentPeers.Front(); e != nil; e = e.Next() {
closure(e.Value.(*peer))
}
}
// forAllPeers is a helper function that runs closure on all peers known to
// peerSTate.
func forAllPeers(state *peerState, closure func(p *peer)) {
for e := state.peers.Front(); e != nil; e = e.Next() {
closure(e.Value.(*peer))
}
forAllOutboundPeers(state, closure)
}
// handleRelayInvMsg deals with relaying inventory to peers that are not already // handleRelayInvMsg deals with relaying inventory to peers that are not already
// known to have it. It is invoked from the peerHandler goroutine. // known to have it. It is invoked from the peerHandler goroutine.
func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) { func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) {
// Loop through all connected peers and relay the inventory to those forAllPeers(state, func(p *peer) {
// which are not already known to have it.
for e := state.peers.Front(); e != nil; e = e.Next() {
p := e.Value.(*peer)
if !p.Connected() { if !p.Connected() {
continue return
} }
// Queue the inventory to be relayed with the next batch. It // Queue the inventory to be relayed with the next batch. It
// will be ignored if the peer is already known to have the // will be ignored if the peer is already known to have the
// inventory. // inventory.
p.QueueInventory(iv) p.QueueInventory(iv)
} })
} }
// handleBroadcastMsg deals with broadcasting messages to peers. It is invoked // handleBroadcastMsg deals with broadcasting messages to peers. It is invoked
// from the peerHandler goroutine. // from the peerHandler goroutine.
func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) { func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) {
for e := state.peers.Front(); e != nil; e = e.Next() { forAllPeers(state, func(p *peer) {
excluded := false excluded := false
for _, p := range bmsg.excludePeers { for _, ep := range bmsg.excludePeers {
if e.Value == p { if p == ep {
excluded = true excluded = true
} }
} }
p := e.Value.(*peer)
// Don't broadcast to still connecting outbound peers . // Don't broadcast to still connecting outbound peers .
if !p.Connected() { if !p.Connected() {
excluded = true excluded = true
@ -204,7 +248,7 @@ func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) {
if !excluded { if !excluded {
p.QueueMessage(bmsg.message, nil) p.QueueMessage(bmsg.message, nil)
} }
} })
} }
// PeerInfo represents the information requested by the getpeerinfo rpc command. // PeerInfo represents the information requested by the getpeerinfo rpc command.
@ -249,50 +293,51 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) {
switch msg := querymsg.(type) { switch msg := querymsg.(type) {
case getConnCountMsg: case getConnCountMsg:
nconnected := 0 nconnected := 0
for e := state.peers.Front(); e != nil; e = e.Next() {
peer := e.Value.(*peer) forAllPeers(state, func(p *peer) {
if peer.Connected() { if p.Connected() {
nconnected++ nconnected++
} }
} })
msg.reply <- nconnected msg.reply <- nconnected
case getPeerInfoMsg: case getPeerInfoMsg:
infos := make([]*PeerInfo, 0, state.peers.Len()) infos := make([]*PeerInfo, 0, state.peers.Len())
for e := state.peers.Front(); e != nil; e = e.Next() { forAllPeers(state, func(p *peer) {
peer := e.Value.(*peer) if !p.Connected() {
if !peer.Connected() { return
continue
} }
// A lot of this will make the race detector go mad, // A lot of this will make the race detector go mad,
// however it is statistics for purely informational purposes // however it is statistics for purely informational purposes
// and we don't really care if they are raced to get the new // and we don't really care if they are raced to get the new
// version. // version.
info := &PeerInfo{ info := &PeerInfo{
Addr: peer.addr, Addr: p.addr,
Services: peer.services, Services: p.services,
LastSend: peer.lastSend.Unix(), LastSend: p.lastSend.Unix(),
LastRecv: peer.lastRecv.Unix(), LastRecv: p.lastRecv.Unix(),
BytesSent: 0, // TODO(oga) we need this from wire. BytesSent: 0, // TODO(oga) we need this from wire.
BytesRecv: 0, // TODO(oga) we need this from wire. BytesRecv: 0, // TODO(oga) we need this from wire.
ConnTime: peer.timeConnected.Unix(), ConnTime: p.timeConnected.Unix(),
Version: peer.protocolVersion, Version: p.protocolVersion,
SubVer: peer.userAgent, SubVer: p.userAgent,
Inbound: peer.inbound, Inbound: p.inbound,
StartingHeight: peer.lastBlock, StartingHeight: p.lastBlock,
BanScore: 0, BanScore: 0,
SyncNode: false, // TODO(oga) for now. bm knows this. SyncNode: false, // TODO(oga) for now. bm knows this.
} }
infos = append(infos, info) infos = append(infos, info)
} })
msg.reply <- infos msg.reply <- infos
case addNodeMsg: case addNodeMsg:
// TODO(oga) really these checks only apply to permanent peers. // XXX(oga) duplicate oneshots?
for e := state.peers.Front(); e != nil; e = e.Next() { if msg.permanent {
peer := e.Value.(*peer) for e := state.persistentPeers.Front(); e != nil; e = e.Next() {
if peer.addr == msg.addr { peer := e.Value.(*peer)
msg.reply <- errors.New("peer already connected") if peer.addr == msg.addr {
return msg.reply <- errors.New("peer already connected")
return
}
} }
} }
// TODO(oga) if too many, nuke a non-perm peer. // TODO(oga) if too many, nuke a non-perm peer.
@ -305,11 +350,12 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) {
case delNodeMsg: case delNodeMsg:
found := false found := false
// TODO(oga) really these checks only apply to permanent peers. for e := state.persistentPeers.Front(); e != nil; e = e.Next() {
for e := state.peers.Front(); e != nil; e = e.Next() {
peer := e.Value.(*peer) peer := e.Value.(*peer)
if peer.addr == msg.addr { if peer.addr == msg.addr {
peer.persistent = false // XXX hack! // This is ok because we are not continuing
// to iterate so won't corrupt the loop.
state.persistentPeers.Remove(e)
peer.Disconnect() peer.Disconnect()
found = true found = true
break break
@ -399,9 +445,10 @@ func (s *server) peerHandler() {
log.Tracef("SRVR: Starting peer handler") log.Tracef("SRVR: Starting peer handler")
state := &peerState{ state := &peerState{
peers: list.New(), peers: list.New(),
persistentPeers: list.New(), persistentPeers: list.New(),
banned: make(map[string]time.Time), outboundPeers: list.New(),
banned: make(map[string]time.Time),
maxOutboundPeers: defaultMaxOutbound, maxOutboundPeers: defaultMaxOutbound,
} }
if cfg.MaxPeers < state.maxOutboundPeers { if cfg.MaxPeers < state.maxOutboundPeers {
@ -417,9 +464,7 @@ func (s *server) peerHandler() {
permanentPeers = cfg.AddPeers permanentPeers = cfg.AddPeers
} }
for _, addr := range permanentPeers { for _, addr := range permanentPeers {
if s.handleAddPeerMsg(state, newOutboundPeer(s, addr, true)) { s.handleAddPeerMsg(state, newOutboundPeer(s, addr, true))
state.outboundPeers++
}
} }
// if nothing else happens, wake us up soon. // if nothing else happens, wake us up soon.
@ -430,16 +475,11 @@ out:
select { select {
// New peers connected to the server. // New peers connected to the server.
case p := <-s.newPeers: case p := <-s.newPeers:
if s.handleAddPeerMsg(state, p) && !p.inbound { s.handleAddPeerMsg(state, p)
state.outboundPeers++
}
// Disconnected peers. // Disconnected peers.
case p := <-s.donePeers: case p := <-s.donePeers:
// handleDonePeerMsg return true if it removed a peer s.handleDonePeerMsg(state, p)
if s.handleDonePeerMsg(state, p) {
state.outboundPeers--
}
// Peer to ban. // Peer to ban.
case p := <-s.banPeers: case p := <-s.banPeers:
@ -464,36 +504,30 @@ out:
// Shutdown the peer handler. // Shutdown the peer handler.
case <-s.quit: case <-s.quit:
// Shutdown peers. // Shutdown peers.
for e := state.peers.Front(); e != nil; e = e.Next() { forAllPeers(state, func(p *peer) {
p := e.Value.(*peer)
p.Shutdown() p.Shutdown()
} })
break out break out
} }
// Only try connect to more peers if we actually need more // Only try connect to more peers if we actually need more
if state.outboundPeers >= state.maxOutboundPeers || if !state.NeedMoreOutbound() || len(cfg.ConnectPeers) > 0 ||
len(cfg.ConnectPeers) > 0 ||
atomic.LoadInt32(&s.shutdown) != 0 { atomic.LoadInt32(&s.shutdown) != 0 {
continue continue
} }
groups := make(map[string]int) groups := make(map[string]int)
for e := state.peers.Front(); e != nil; e = e.Next() { forAllOutboundPeers(state, func(p *peer) {
peer := e.Value.(*peer) groups[GroupKey(p.na)]++
if !peer.inbound { })
groups[GroupKey(peer.na)]++
}
}
tries := 0 tries := 0
for state.outboundPeers < state.maxOutboundPeers && for state.NeedMoreOutbound() &&
state.peers.Len() < cfg.MaxPeers &&
atomic.LoadInt32(&s.shutdown) == 0 { atomic.LoadInt32(&s.shutdown) == 0 {
// We bias like bitcoind does, 10 for no outgoing // We bias like bitcoind does, 10 for no outgoing
// up to 90 (8) for the selection of new vs tried // up to 90 (8) for the selection of new vs tried
//addresses. //addresses.
nPeers := state.outboundPeers nPeers := state.OutboundCount()
if nPeers > 8 { if nPeers > 8 {
nPeers = 8 nPeers = 8
} }
@ -542,14 +576,12 @@ out:
// already checked that we have room for more peers. // already checked that we have room for more peers.
if s.handleAddPeerMsg(state, if s.handleAddPeerMsg(state,
newOutboundPeer(s, addrStr, false)) { newOutboundPeer(s, addrStr, false)) {
state.outboundPeers++
groups[key]++ groups[key]++
} }
} }
// We we need more peers, wake up in ten seconds and try again. // We we need more peers, wake up in ten seconds and try again.
if state.outboundPeers < state.maxOutboundPeers && if state.NeedMoreOutbound() {
state.peers.Len() < cfg.MaxPeers {
time.AfterFunc(10*time.Second, func() { time.AfterFunc(10*time.Second, func() {
s.wakeup <- true s.wakeup <- true
}) })