Continue work on addrmgr and multi-peer.

- Remove leftover debug log prints
- Increment waitgroup outside of goroutine
- Various comment and log message consistency
- Combine peer setup and newPeer -> newInboundPeer
- Save and load peers.json to/from cfg.DataDir
- Only claim addrmgr needs more addresses when it has less than 1000
- Add warning if unkown peer on orphan block.
This commit is contained in:
Dave Collins 2013-09-12 14:19:10 -05:00
parent 6c05e9d475
commit 92a8605b24
4 changed files with 93 additions and 82 deletions

View file

@ -12,6 +12,7 @@ import (
"math/rand" "math/rand"
"net" "net"
"os" "os"
"path/filepath"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -20,7 +21,12 @@ import (
const ( const (
// maxAddresses identifies the maximum number of addresses that the // maxAddresses identifies the maximum number of addresses that the
// address manager will track. // address manager will track.
maxAddresses = 2500 maxAddresses = 2500
// needAddressThreshold is the number of addresses under which the
// address manager will claim to need more addresses.
needAddressThreshold = 1000
newAddressBufferSize = 50 newAddressBufferSize = 50
// dumpAddressInterval is the interval used to dump the address // dumpAddressInterval is the interval used to dump the address
@ -94,12 +100,11 @@ func (a *AddrManager) updateAddress(netAddr, srcAddr *btcwire.NetAddress) {
} }
// bad returns true if the address in question has not been tried in the last // bad returns true if the address in question has not been tried in the last
// minute and meets one of the following // minute and meets one of the following criteria:
// criteria: // 1) It claims to be from the future
// 1) It claims to be from the future. // 2) It hasn't been seen in over a month
// 2) It hasn't been seen in over a month. // 3) It has failed at least three times and never succeeded
// 3) It has failed at least three times and never succeeded. // 4) It has failed ten times in the last week
// 4) It has failed ten times in the last week.
// All addresses that meet these criteria are assumed to be worthless and not // All addresses that meet these criteria are assumed to be worthless and not
// worth keeping hold of. // worth keeping hold of.
func bad(ka *knownAddress) bool { func bad(ka *knownAddress) bool {
@ -132,7 +137,7 @@ func bad(ka *knownAddress) bool {
return false return false
} }
// chance returns the selection probability for a known address. The priority // chance returns the selection probability for a known address. The priority
// depends upon how recent the address has been seen, how recent it was last // depends upon how recent the address has been seen, how recent it was last
// attempted and how often attempts to connect to it have failed. // attempted and how often attempts to connect to it have failed.
func chance(ka *knownAddress) float64 { func chance(ka *knownAddress) float64 {
@ -146,7 +151,7 @@ func chance(ka *knownAddress) float64 {
if ka.na.Timestamp.IsZero() { if ka.na.Timestamp.IsZero() {
// use unix epoch to match bitcoind. // use unix epoch to match bitcoind.
dur = now.Sub(time.Unix(0, 0)) dur = now.Sub(time.Unix(0, 0))
} else { } else {
dur = now.Sub(ka.na.Timestamp) dur = now.Sub(ka.na.Timestamp)
} }
@ -165,12 +170,12 @@ func chance(ka *knownAddress) float64 {
c = 600.0 / (600.0 + lastSeen) c = 600.0 / (600.0 + lastSeen)
// very recent attempts are less likely to be retried. // Very recent attempts are less likely to be retried.
if lastTry > 60.0*10.0 { if lastTry > 60.0*10.0 {
c *= 0.01 c *= 0.01
} }
// failed attempts deprioritise // Failed attempts deprioritise.
if ka.attempts > 0 { if ka.attempts > 0 {
c /= (float64(ka.attempts) * 1.5) c /= (float64(ka.attempts) * 1.5)
} }
@ -213,7 +218,7 @@ func (a *AddrManager) expireNew() {
// pickTried selects an address from the tried bucket to be evicted. // pickTried selects an address from the tried bucket to be evicted.
// We just choose the eldest. // We just choose the eldest.
func (a *AddrManager) pickTried() *list.Element { func (a *AddrManager) pickTried() *list.Element {
var oldest *knownAddress var oldest *knownAddress
var oldestElem *list.Element var oldestElem *list.Element
for e := a.addrTried.Front(); e != nil; e = e.Next() { for e := a.addrTried.Front(); e != nil; e = e.Next() {
ka := e.Value.(*knownAddress) ka := e.Value.(*knownAddress)
@ -226,6 +231,8 @@ func (a *AddrManager) pickTried() *list.Element {
return oldestElem return oldestElem
} }
// knownAddress tracks information about a known network address that is used
// to determine how viable an address is.
type knownAddress struct { type knownAddress struct {
na *btcwire.NetAddress na *btcwire.NetAddress
attempts int attempts int
@ -280,38 +287,36 @@ out:
func (a *AddrManager) savePeers() { func (a *AddrManager) savePeers() {
// May give some way to specify this later. // May give some way to specify this later.
filename := "peers.json" filename := "peers.json"
filePath := filepath.Join(cfg.DataDir, filename)
var toSave JsonSave var toSave JsonSave
list := a.AddressCacheFlat() list := a.AddressCacheFlat()
log.Info("LIST ", list)
toSave.AddrList = list toSave.AddrList = list
w, err := os.Create(filename) w, err := os.Create(filePath)
if err != nil { if err != nil {
log.Error("Error opening file: ", filename, err) log.Error("Error opening file: ", filePath, err)
} }
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
defer w.Close() defer w.Close()
enc.Encode(&toSave) enc.Encode(&toSave)
log.Info("Saving peer list.")
} }
// loadPeers loads the known address from the saved file. If empty, missing, or // loadPeers loads the known address from the saved file. If empty, missing, or
// malformed file, just don't load anything and start fresh // malformed file, just don't load anything and start fresh
func (a *AddrManager) loadPeers() { func (a *AddrManager) loadPeers() {
log.Info("Loading saved peers")
// May give some way to specify this later. // May give some way to specify this later.
filename := "peers.json" filename := "peers.json"
filePath := filepath.Join(cfg.DataDir, filename)
_, err := os.Stat(filename) _, err := os.Stat(filePath)
if os.IsNotExist(err) { if os.IsNotExist(err) {
log.Debugf("%s does not exist.\n", filename) log.Debugf("%s does not exist.\n", filePath)
} else { } else {
r, err := os.Open(filename) r, err := os.Open(filePath)
if err != nil { if err != nil {
log.Error("Error opening file: ", filename, err) log.Error("Error opening file: ", filePath, err)
return return
} }
defer r.Close() defer r.Close()
@ -320,7 +325,7 @@ func (a *AddrManager) loadPeers() {
dec := json.NewDecoder(r) dec := json.NewDecoder(r)
err = dec.Decode(&inList) err = dec.Decode(&inList)
if err != nil { if err != nil {
log.Error("Error reading:", filename, err) log.Error("Error reading:", filePath, err)
return return
} }
log.Debug("Adding ", len(inList.AddrList), " saved peers.") log.Debug("Adding ", len(inList.AddrList), " saved peers.")
@ -420,7 +425,7 @@ func (a *AddrManager) AddAddressByIP(addrIP string) {
func (a *AddrManager) NeedMoreAddresses() bool { func (a *AddrManager) NeedMoreAddresses() bool {
// NumAddresses handles concurrent access for us. // NumAddresses handles concurrent access for us.
return a.NumAddresses()+1 <= maxAddresses return a.NumAddresses() < needAddressThreshold
} }
// NumAddresses returns the number of addresses known to the address manager. // NumAddresses returns the number of addresses known to the address manager.
@ -474,7 +479,7 @@ func (a *AddrManager) AddressCacheFlat() []string {
func NewAddrManager() *AddrManager { func NewAddrManager() *AddrManager {
am := AddrManager{ am := AddrManager{
rand: rand.New(rand.NewSource(time.Now().UnixNano())), rand: rand.New(rand.NewSource(time.Now().UnixNano())),
addrIndex: make(map[string]*knownAddress), addrIndex: make(map[string]*knownAddress),
addrNew: make(map[string]*knownAddress), addrNew: make(map[string]*knownAddress),
addrTried: list.New(), addrTried: list.New(),
quit: make(chan bool), quit: make(chan bool),
@ -519,8 +524,7 @@ func (a *AddrManager) GetAddress(class string, newBias int) *knownAddress {
for { for {
// Pick a random entry in the list // Pick a random entry in the list
e := a.addrTried.Front() e := a.addrTried.Front()
for i := a.rand.Int63n(int64(a.addrTried.Len())); for i := a.rand.Int63n(int64(a.addrTried.Len())); i > 0; i-- {
i > 0; i-- {
e = e.Next() e = e.Next()
} }
ka := e.Value.(*knownAddress) ka := e.Value.(*knownAddress)
@ -601,8 +605,8 @@ func (a *AddrManager) Connected(addr *btcwire.NetAddress) {
} }
} }
// Good marks the given address as good. To be called after a successful // Good marks the given address as good. To be called after a successful
// connection and version exchange. If the address is unkownto the addresss // connection and version exchange. If the address is unknown to the addresss
// manager it will be ignored. // manager it will be ignored.
func (a *AddrManager) Good(addr *btcwire.NetAddress) { func (a *AddrManager) Good(addr *btcwire.NetAddress) {
a.mtx.Lock() a.mtx.Lock()

View file

@ -297,6 +297,9 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
peer.pushGetBlocksMsg(locator, orphanRoot) peer.pushGetBlocksMsg(locator, orphanRoot)
delete(b.blockPeer, *orphanRoot) delete(b.blockPeer, *orphanRoot)
break break
} else {
log.Warnf("Notification for orphan %v with no peer",
orphanHash)
} }
// A block has been accepted into the block chain. // A block has been accepted into the block chain.

108
peer.go
View file

@ -232,14 +232,14 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
} }
} }
var err error // Set up a NetAddress for the peer to be used with AddrManager.
// Set up a netaddress for the peer to be used with addrmanager.. na, err := newNetAddress(p.conn.RemoteAddr(), p.services)
p.na, err = newNetAddress(p.conn.RemoteAddr(), p.services)
if err != nil { if err != nil {
log.Errorf("[PEER] %v", err) log.Errorf("[PEER] %v", err)
p.Disconnect() p.Disconnect()
return return
} }
p.na = na
// Send verack. // Send verack.
p.outputQueue <- btcwire.NewMsgVerAck() p.outputQueue <- btcwire.NewMsgVerAck()
@ -265,15 +265,18 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
// Request known addresses if the server address manager needs // Request known addresses if the server address manager needs
// more and the peer has a protocol version new enough to // more and the peer has a protocol version new enough to
// include a timestamp with addresses. // include a timestamp with addresses.
// XXX bitcoind only does this if we have < 1000 addresses, not
// the max of 2400
hasTimestamp := p.protocolVersion >= btcwire.NetAddressTimeVersion hasTimestamp := p.protocolVersion >= btcwire.NetAddressTimeVersion
if p.server.addrManager.NeedMoreAddresses() && hasTimestamp { if p.server.addrManager.NeedMoreAddresses() && hasTimestamp {
p.outputQueue <- btcwire.NewMsgGetAddr() p.outputQueue <- btcwire.NewMsgGetAddr()
} }
// Add inbound peer address to the server address manager.
// Mark the address as a known good address.
p.server.addrManager.Good(p.na) p.server.addrManager.Good(p.na)
} else { } else {
// A peer might not be advertising the same address that it
// actually connected from. One example of why this can happen
// is with NAT. Only add the address to the address manager if
// the addresses agree.
if NetAddressKey(&msg.AddrMe) == NetAddressKey(p.na) { if NetAddressKey(&msg.AddrMe) == NetAddressKey(p.na) {
p.server.addrManager.AddAddress(p.na, p.na) p.server.addrManager.AddAddress(p.na, p.na)
p.server.addrManager.Good(p.na) p.server.addrManager.Good(p.na)
@ -893,8 +896,8 @@ out:
break out break out
} }
markConnected := false
// Handle each supported message type. // Handle each supported message type.
markConnected := false
switch msg := rmsg.(type) { switch msg := rmsg.(type) {
case *btcwire.MsgVersion: case *btcwire.MsgVersion:
p.handleVersionMsg(msg) p.handleVersionMsg(msg)
@ -942,6 +945,9 @@ out:
log.Debugf("[PEER] Received unhandled message of type %v: Fix Me", log.Debugf("[PEER] Received unhandled message of type %v: Fix Me",
rmsg.Command()) rmsg.Command())
} }
// Mark the address as currently connected and working as of
// now if one of the messages that trigger
if markConnected && !p.disconnect { if markConnected && !p.disconnect {
if p.na == nil { if p.na == nil {
log.Warnf("we're getting stuff before we " + log.Warnf("we're getting stuff before we " +
@ -1085,39 +1091,17 @@ func (p *peer) Shutdown() {
p.wg.Wait() p.wg.Wait()
} }
// newPeer returns a new bitcoin peer for the provided server and connection. // newPeerBase returns a new base bitcoin peer for the provided server and
// Use start to begin processing incoming and outgoing messages. // inbound flag. This is used by the newInboundPeer and newOutboundPeer
func newPeer(s *server, conn net.Conn) *peer { // functions to perform base setup needed by both types of peers.
func newPeerBase(s *server, inbound bool) *peer {
p := peer{ p := peer{
server: s, server: s,
protocolVersion: btcwire.ProtocolVersion, protocolVersion: btcwire.ProtocolVersion,
btcnet: s.btcnet, btcnet: s.btcnet,
services: btcwire.SFNodeNetwork, services: btcwire.SFNodeNetwork,
conn: conn,
addr: conn.RemoteAddr().String(),
timeConnected: time.Now(), timeConnected: time.Now(),
inbound: true, inbound: inbound,
persistent: false,
knownAddresses: make(map[string]bool),
outputQueue: make(chan btcwire.Message, outputBufferSize),
quit: make(chan bool),
}
return &p
}
// newOutbountPeer returns a new bitcoin peer for the provided server and
// address and connects to it asynchronously. If the connetion is successful
// then the peer will also be started.
func newOutboundPeer(s *server, addr string, persistent bool) *peer {
p := peer{
server: s,
protocolVersion: btcwire.ProtocolVersion,
btcnet: s.btcnet,
services: btcwire.SFNodeNetwork,
addr: addr,
timeConnected: time.Now(),
inbound: false,
persistent: persistent,
knownAddresses: make(map[string]bool), knownAddresses: make(map[string]bool),
knownInventory: NewMruInventoryMap(maxKnownInventory), knownInventory: NewMruInventoryMap(maxKnownInventory),
requestQueue: list.New(), requestQueue: list.New(),
@ -1127,29 +1111,50 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer {
blockProcessed: make(chan bool, 1), blockProcessed: make(chan bool, 1),
quit: make(chan bool), quit: make(chan bool),
} }
// set up p.na with a temporary address that we are connecting to with return &p
// faked up service flags. We will replace this with the real one after }
// version negotiation is successful. The only failure case here would
// newPeer returns a new inbound bitcoin peer for the provided server and
// connection. Use Start to begin processing incoming and outgoing messages.
func newInboundPeer(s *server, conn net.Conn) *peer {
p := newPeerBase(s, true)
p.conn = conn
p.addr = conn.RemoteAddr().String()
return p
}
// newOutbountPeer returns a new outbound bitcoin peer for the provided server and
// address and connects to it asynchronously. If the connection is successful
// then the peer will also be started.
func newOutboundPeer(s *server, addr string, persistent bool) *peer {
p := newPeerBase(s, false)
p.addr = addr
p.persistent = persistent
// Setup p.na with a temporary address that we are connecting to with
// faked up service flags. We will replace this with the real one after
// version negotiation is successful. The only failure case here would
// be if the string was incomplete for connection so can't be split // be if the string was incomplete for connection so can't be split
// into address and port, and thus this would be invalid anyway. In // into address and port, and thus this would be invalid anyway. In
// which case we return nil to be handled by the caller. // which case we return nil to be handled by the caller. This must be
// This must be done before we fork off the goroutine because as soon // done before we fork off the goroutine because as soon as this
// as this function returns the peer must have a valid netaddress. // function returns the peer must have a valid netaddress.
ip, portStr, err := net.SplitHostPort(addr) ip, portStr, err := net.SplitHostPort(addr)
if err != nil { if err != nil {
log.Errorf("tried to create a new outbound peer with invalid "+ log.Errorf("Tried to create a new outbound peer with invalid "+
"address %s: %v", addr, err) "address %s: %v", addr, err)
return nil return nil
} }
port, err := strconv.ParseUint(portStr, 10, 16) port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil { if err != nil {
log.Errorf("tried to create a new outbound peer with invalid "+ log.Errorf("Tried to create a new outbound peer with invalid "+
"port %s: %v", portStr, err) "port %s: %v", portStr, err)
return nil return nil
} }
p.na = btcwire.NewNetAddressIPPort(net.ParseIP(ip), uint16(port), 0) p.na = btcwire.NewNetAddressIPPort(net.ParseIP(ip), uint16(port), 0)
p.wg.Add(1)
go func() { go func() {
// Select which dial method to call depending on whether or // Select which dial method to call depending on whether or
// not a proxy is configured. Also, add proxy information to // not a proxy is configured. Also, add proxy information to
@ -1161,7 +1166,6 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer {
dial = proxy.Dial dial = proxy.Dial
faddr = fmt.Sprintf("%s via proxy %s", addr, cfg.Proxy) faddr = fmt.Sprintf("%s via proxy %s", addr, cfg.Proxy)
} }
p.wg.Add(1)
// Attempt to connect to the peer. If the connection fails and // Attempt to connect to the peer. If the connection fails and
// this is a persistent connection, retry after the retry // this is a persistent connection, retry after the retry
@ -1170,10 +1174,10 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer {
log.Debugf("[SRVR] Attempting to connect to %s", faddr) log.Debugf("[SRVR] Attempting to connect to %s", faddr)
conn, err := dial("tcp", addr) conn, err := dial("tcp", addr)
if err != nil { if err != nil {
log.Errorf("[SRVR] failed to connect to %s: %v", log.Errorf("[SRVR] Failed to connect to %s: %v",
faddr, err) faddr, err)
if !persistent { if !persistent {
p.server.donePeers <- &p p.server.donePeers <- p
p.wg.Done() p.wg.Done()
return return
} }
@ -1183,11 +1187,10 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer {
continue continue
} }
// while we were sleeping trying to get connect then // While we were sleeping trying to connect, the server
// the server may have scheduled a shutdown. In that // may have scheduled a shutdown. In that case ditch
// case we ditch the connection immediately. // the peer immediately.
if !s.shutdown { if !s.shutdown {
p.server.addrManager.Attempt(p.na) p.server.addrManager.Attempt(p.na)
// Connection was successful so log it and start peer. // Connection was successful so log it and start peer.
@ -1195,8 +1198,9 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer {
p.conn = conn p.conn = conn
p.Start() p.Start()
} else { } else {
p.server.donePeers <- &p p.server.donePeers <- p
} }
// We are done here, Start() will have grabbed // We are done here, Start() will have grabbed
// additional waitgroup entries if we are not shutting // additional waitgroup entries if we are not shutting
// down. // down.
@ -1204,5 +1208,5 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer {
return return
} }
}() }()
return &p return p
} }

View file

@ -213,7 +213,7 @@ func (s *server) listenHandler(listener net.Listener) {
} }
continue continue
} }
s.AddPeer(newPeer(s, conn)) s.AddPeer(newInboundPeer(s, conn))
} }
s.wg.Done() s.wg.Done()
log.Tracef("[SRVR] Listener handler done for %s", listener.Addr()) log.Tracef("[SRVR] Listener handler done for %s", listener.Addr())