Move all local data to peerhandler into a peerState structure

Pass peerstate around instead of indivdual bits.
This commit is contained in:
Owain G. Ainsworth 2013-10-30 17:22:35 +00:00
parent d8c5222474
commit 4d80750afe

105
server.go
View file

@ -69,9 +69,16 @@ type server struct {
db btcdb.Db db btcdb.Db
} }
type peerState struct {
peers *list.List
banned map[string]time.Time
outboundPeers int
maxOutboundPeers int
}
// handleAddPeerMsg deals with adding new peers. It is invoked from the // handleAddPeerMsg deals with adding new peers. It is invoked from the
// peerHandler goroutine. // peerHandler goroutine.
func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, p *peer) bool { func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool {
if p == nil { if p == nil {
return false return false
} }
@ -91,7 +98,7 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time,
p.Shutdown() p.Shutdown()
return false return false
} }
if banEnd, ok := banned[host]; ok { if banEnd, ok := state.banned[host]; ok {
if time.Now().Before(banEnd) { if time.Now().Before(banEnd) {
log.Debugf("SRVR: Peer %s is banned for another %v - "+ log.Debugf("SRVR: Peer %s is banned for another %v - "+
"disconnecting", host, banEnd.Sub(time.Now())) "disconnecting", host, banEnd.Sub(time.Now()))
@ -100,13 +107,13 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time,
} }
log.Infof("SRVR: Peer %s is no longer banned", host) log.Infof("SRVR: Peer %s is no longer banned", host)
delete(banned, host) delete(state.banned, host)
} }
// TODO: Check for max peers from a single IP. // TODO: Check for max peers from a single IP.
// Limit max number of total peers. // Limit max number of total peers.
if peers.Len() >= cfg.MaxPeers { if state.peers.Len() >= cfg.MaxPeers {
log.Infof("SRVR: Max peers reached [%d] - disconnecting "+ log.Infof("SRVR: Max peers reached [%d] - disconnecting "+
"peer %s", cfg.MaxPeers, p) "peer %s", cfg.MaxPeers, p)
p.Shutdown() p.Shutdown()
@ -117,7 +124,7 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time,
// Add the new peer and start it. // Add the new peer and start it.
log.Debugf("SRVR: New peer %s", p) log.Debugf("SRVR: New peer %s", p)
peers.PushBack(p) state.peers.PushBack(p)
if p.inbound { if p.inbound {
p.Start() p.Start()
} }
@ -127,8 +134,8 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time,
// handleDonePeerMsg deals with peers that have signalled they are done. It is // handleDonePeerMsg deals with peers that have signalled they are done. It is
// invoked from the peerHandler goroutine. // invoked from the peerHandler goroutine.
func (s *server) handleDonePeerMsg(peers *list.List, p *peer) bool { func (s *server) handleDonePeerMsg(state *peerState, p *peer) bool {
for e := peers.Front(); e != nil; e = e.Next() { for e := state.peers.Front(); e != nil; e = e.Next() {
if e.Value == p { if e.Value == p {
// Issue an asynchronous reconnect if the peer was a // Issue an asynchronous reconnect if the peer was a
// persistent outbound connection. // persistent outbound connection.
@ -137,7 +144,7 @@ func (s *server) handleDonePeerMsg(peers *list.List, p *peer) bool {
e.Value = newOutboundPeer(s, p.addr, true) e.Value = newOutboundPeer(s, p.addr, true)
return false return false
} }
peers.Remove(e) state.peers.Remove(e)
log.Debugf("SRVR: Removed peer %s", p) log.Debugf("SRVR: Removed peer %s", p)
return true return true
} }
@ -148,7 +155,7 @@ func (s *server) handleDonePeerMsg(peers *list.List, p *peer) bool {
// handleBanPeerMsg deals with banning peers. It is invoked from the // handleBanPeerMsg deals with banning peers. It is invoked from the
// peerHandler goroutine. // peerHandler goroutine.
func (s *server) handleBanPeerMsg(banned map[string]time.Time, p *peer) { func (s *server) handleBanPeerMsg(state *peerState, p *peer) {
host, _, err := net.SplitHostPort(p.addr) host, _, err := net.SplitHostPort(p.addr)
if err != nil { if err != nil {
log.Debugf("SRVR: can't split ban peer %s %v", p.addr, err) log.Debugf("SRVR: can't split ban peer %s %v", p.addr, err)
@ -157,16 +164,16 @@ func (s *server) handleBanPeerMsg(banned map[string]time.Time, p *peer) {
direction := directionString(p.inbound) direction := directionString(p.inbound)
log.Infof("SRVR: Banned peer %s (%s) for %v", host, direction, log.Infof("SRVR: Banned peer %s (%s) for %v", host, direction,
cfg.BanDuration) cfg.BanDuration)
banned[host] = time.Now().Add(cfg.BanDuration) state.banned[host] = time.Now().Add(cfg.BanDuration)
} }
// handleRelayInvMsg deals with relaying inventory to peers that are not already // handleRelayInvMsg deals with relaying inventory to peers that are not already
// known to have it. It is invoked from the peerHandler goroutine. // known to have it. It is invoked from the peerHandler goroutine.
func (s *server) handleRelayInvMsg(peers *list.List, iv *btcwire.InvVect) { func (s *server) handleRelayInvMsg(state *peerState, iv *btcwire.InvVect) {
// Loop through all connected peers and relay the inventory to those // Loop through all connected peers and relay the inventory to those
// which are not already known to have it. // which are not already known to have it.
for e := peers.Front(); e != nil; e = e.Next() { for e := state.peers.Front(); e != nil; e = e.Next() {
p := e.Value.(*peer) p := e.Value.(*peer)
if !p.Connected() { if !p.Connected() {
continue continue
@ -181,8 +188,8 @@ func (s *server) handleRelayInvMsg(peers *list.List, iv *btcwire.InvVect) {
// handleBroadcastMsg deals with broadcasting messages to peers. It is invoked // handleBroadcastMsg deals with broadcasting messages to peers. It is invoked
// from the peerHandler goroutine. // from the peerHandler goroutine.
func (s *server) handleBroadcastMsg(peers *list.List, bmsg *broadcastMsg) { func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) {
for e := peers.Front(); e != nil; e = e.Next() { for e := state.peers.Front(); e != nil; e = e.Next() {
excluded := false excluded := false
for _, p := range bmsg.excludePeers { for _, p := range bmsg.excludePeers {
if e.Value == p { if e.Value == p {
@ -238,11 +245,11 @@ type delNodeMsg struct {
// handleQuery is the central handler for all queries and commands from other // handleQuery is the central handler for all queries and commands from other
// goroutines related to peer state. // goroutines related to peer state.
func (s *server) handleQuery(querymsg interface{}, peers *list.List, bannedPeers map[string]time.Time) { func (s *server) handleQuery(querymsg interface{}, state *peerState) {
switch msg := querymsg.(type) { switch msg := querymsg.(type) {
case getConnCountMsg: case getConnCountMsg:
nconnected := 0 nconnected := 0
for e := peers.Front(); e != nil; e = e.Next() { for e := state.peers.Front(); e != nil; e = e.Next() {
peer := e.Value.(*peer) peer := e.Value.(*peer)
if peer.Connected() { if peer.Connected() {
nconnected++ nconnected++
@ -251,8 +258,8 @@ func (s *server) handleQuery(querymsg interface{}, peers *list.List, bannedPeers
msg.reply <- nconnected msg.reply <- nconnected
case getPeerInfoMsg: case getPeerInfoMsg:
infos := make([]*PeerInfo, 0, peers.Len()) infos := make([]*PeerInfo, 0, state.peers.Len())
for e := peers.Front(); e != nil; e = e.Next() { for e := state.peers.Front(); e != nil; e = e.Next() {
peer := e.Value.(*peer) peer := e.Value.(*peer)
if !peer.Connected() { if !peer.Connected() {
continue continue
@ -281,7 +288,7 @@ func (s *server) handleQuery(querymsg interface{}, peers *list.List, bannedPeers
msg.reply <- infos msg.reply <- infos
case addNodeMsg: case addNodeMsg:
// TODO(oga) really these checks only apply to permanent peers. // TODO(oga) really these checks only apply to permanent peers.
for e := peers.Front(); e != nil; e = e.Next() { for e := state.peers.Front(); e != nil; e = e.Next() {
peer := e.Value.(*peer) peer := e.Value.(*peer)
if peer.addr == msg.addr { if peer.addr == msg.addr {
msg.reply <- errors.New("peer already connected") msg.reply <- errors.New("peer already connected")
@ -289,7 +296,7 @@ func (s *server) handleQuery(querymsg interface{}, peers *list.List, bannedPeers
} }
} }
// TODO(oga) if too many, nuke a non-perm peer. // TODO(oga) if too many, nuke a non-perm peer.
if s.handleAddPeerMsg(peers, bannedPeers, if s.handleAddPeerMsg(state,
newOutboundPeer(s, msg.addr, msg.permanent)) { newOutboundPeer(s, msg.addr, msg.permanent)) {
msg.reply <- nil msg.reply <- nil
} else { } else {
@ -299,7 +306,7 @@ func (s *server) handleQuery(querymsg interface{}, peers *list.List, bannedPeers
case delNodeMsg: case delNodeMsg:
found := false found := false
// TODO(oga) really these checks only apply to permanent peers. // TODO(oga) really these checks only apply to permanent peers.
for e := peers.Front(); e != nil; e = e.Next() { for e := state.peers.Front(); e != nil; e = e.Next() {
peer := e.Value.(*peer) peer := e.Value.(*peer)
if peer.addr == msg.addr { if peer.addr == msg.addr {
peer.persistent = false // XXX hack! peer.persistent = false // XXX hack!
@ -391,12 +398,14 @@ func (s *server) peerHandler() {
s.blockManager.Start() s.blockManager.Start()
log.Tracef("SRVR: Starting peer handler") log.Tracef("SRVR: Starting peer handler")
peers := list.New() state := &peerState{
bannedPeers := make(map[string]time.Time) peers: list.New(),
outboundPeers := 0 persistentPeers: list.New(),
maxOutbound := defaultMaxOutbound banned: make(map[string]time.Time),
if cfg.MaxPeers < maxOutbound { maxOutboundPeers: defaultMaxOutbound,
maxOutbound = cfg.MaxPeers }
if cfg.MaxPeers < state.maxOutboundPeers {
state.maxOutboundPeers = cfg.MaxPeers
} }
// Add peers discovered through DNS to the address manager. // Add peers discovered through DNS to the address manager.
@ -408,9 +417,8 @@ func (s *server) peerHandler() {
permanentPeers = cfg.AddPeers permanentPeers = cfg.AddPeers
} }
for _, addr := range permanentPeers { for _, addr := range permanentPeers {
if s.handleAddPeerMsg(peers, bannedPeers, if s.handleAddPeerMsg(state, newOutboundPeer(s, addr, true)) {
newOutboundPeer(s, addr, true)) { state.outboundPeers++
outboundPeers++
} }
} }
@ -422,42 +430,41 @@ out:
select { select {
// New peers connected to the server. // New peers connected to the server.
case p := <-s.newPeers: case p := <-s.newPeers:
if s.handleAddPeerMsg(peers, bannedPeers, p) && if s.handleAddPeerMsg(state, p) && !p.inbound {
!p.inbound { state.outboundPeers++
outboundPeers++
} }
// Disconnected peers. // Disconnected peers.
case p := <-s.donePeers: case p := <-s.donePeers:
// handleDonePeerMsg return true if it removed a peer // handleDonePeerMsg return true if it removed a peer
if s.handleDonePeerMsg(peers, p) { if s.handleDonePeerMsg(state, p) {
outboundPeers-- state.outboundPeers--
} }
// Peer to ban. // Peer to ban.
case p := <-s.banPeers: case p := <-s.banPeers:
s.handleBanPeerMsg(bannedPeers, p) s.handleBanPeerMsg(state, p)
// New inventory to potentially be relayed to other peers. // New inventory to potentially be relayed to other peers.
case invMsg := <-s.relayInv: case invMsg := <-s.relayInv:
s.handleRelayInvMsg(peers, invMsg) s.handleRelayInvMsg(state, invMsg)
// Message to broadcast to all connected peers except those // Message to broadcast to all connected peers except those
// which are excluded by the message. // which are excluded by the message.
case bmsg := <-s.broadcast: case bmsg := <-s.broadcast:
s.handleBroadcastMsg(peers, &bmsg) s.handleBroadcastMsg(state, &bmsg)
// Used by timers below to wake us back up. // Used by timers below to wake us back up.
case <-s.wakeup: case <-s.wakeup:
// this page left intentionally blank // this page left intentionally blank
case qmsg := <-s.query: case qmsg := <-s.query:
s.handleQuery(qmsg, peers, bannedPeers) s.handleQuery(qmsg, state)
// Shutdown the peer handler. // Shutdown the peer handler.
case <-s.quit: case <-s.quit:
// Shutdown peers. // Shutdown peers.
for e := peers.Front(); e != nil; e = e.Next() { for e := state.peers.Front(); e != nil; e = e.Next() {
p := e.Value.(*peer) p := e.Value.(*peer)
p.Shutdown() p.Shutdown()
} }
@ -465,12 +472,13 @@ out:
} }
// Only try connect to more peers if we actually need more // Only try connect to more peers if we actually need more
if outboundPeers >= maxOutbound || len(cfg.ConnectPeers) > 0 || if state.outboundPeers >= state.maxOutboundPeers ||
len(cfg.ConnectPeers) > 0 ||
atomic.LoadInt32(&s.shutdown) != 0 { atomic.LoadInt32(&s.shutdown) != 0 {
continue continue
} }
groups := make(map[string]int) groups := make(map[string]int)
for e := peers.Front(); e != nil; e = e.Next() { for e := state.peers.Front(); e != nil; e = e.Next() {
peer := e.Value.(*peer) peer := e.Value.(*peer)
if !peer.inbound { if !peer.inbound {
groups[GroupKey(peer.na)]++ groups[GroupKey(peer.na)]++
@ -478,14 +486,14 @@ out:
} }
tries := 0 tries := 0
for outboundPeers < maxOutbound && for state.outboundPeers < state.maxOutboundPeers &&
peers.Len() < cfg.MaxPeers && state.peers.Len() < cfg.MaxPeers &&
atomic.LoadInt32(&s.shutdown) == 0 { atomic.LoadInt32(&s.shutdown) == 0 {
// We bias like bitcoind does, 10 for no outgoing // We bias like bitcoind does, 10 for no outgoing
// up to 90 (8) for the selection of new vs tried // up to 90 (8) for the selection of new vs tried
//addresses. //addresses.
nPeers := outboundPeers nPeers := state.outboundPeers
if nPeers > 8 { if nPeers > 8 {
nPeers = 8 nPeers = 8
} }
@ -532,15 +540,16 @@ out:
tries = 0 tries = 0
// any failure will be due to banned peers etc. we have // any failure will be due to banned peers etc. we have
// already checked that we have room for more peers. // already checked that we have room for more peers.
if s.handleAddPeerMsg(peers, bannedPeers, if s.handleAddPeerMsg(state,
newOutboundPeer(s, addrStr, false)) { newOutboundPeer(s, addrStr, false)) {
outboundPeers++ state.outboundPeers++
groups[key]++ groups[key]++
} }
} }
// We we need more peers, wake up in ten seconds and try again. // We we need more peers, wake up in ten seconds and try again.
if outboundPeers < maxOutbound && peers.Len() < cfg.MaxPeers { if state.outboundPeers < state.maxOutboundPeers &&
state.peers.Len() < cfg.MaxPeers {
time.AfterFunc(10*time.Second, func() { time.AfterFunc(10*time.Second, func() {
s.wakeup <- true s.wakeup <- true
}) })