Move address manager to its own package.

This commit does just enough to move the address manager into its own
package.  Since it was not originally written as a package, it will
require a bit of refactoring and cleanup to turn it into a robust
package with a friendly API.
This commit is contained in:
Dave Collins 2014-07-06 01:04:24 -05:00
parent ebc5db2710
commit 62f21d3600
5 changed files with 227 additions and 203 deletions

View file

@ -2,7 +2,7 @@
// Use of this source code is governed by an ISC // Use of this source code is governed by an ISC
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package main package addrmgr
import ( import (
"container/list" "container/list"
@ -94,84 +94,27 @@ const (
serialisationVersion = 1 serialisationVersion = 1
) )
// updateAddress is a helper function to either update an address already known // knownAddress tracks information about a known network address that is used
// to the address manager, or to add the address if not already known. // to determine how viable an address is.
func (a *AddrManager) updateAddress(netAddr, srcAddr *btcwire.NetAddress) { type knownAddress struct {
// Filter out non-routable addresses. Note that non-routable na *btcwire.NetAddress
// also includes invalid and local addresses. srcAddr *btcwire.NetAddress
if !Routable(netAddr) { attempts int
return lastattempt time.Time
lastsuccess time.Time
tried bool
refs int // reference count of new buckets
} }
// Protect concurrent access. // NetAddress returns the underlying btcwire.NetAddress associated with the
a.mtx.Lock() // known address.
defer a.mtx.Unlock() func (ka *knownAddress) NetAddress() *btcwire.NetAddress {
return ka.na
addr := NetAddressKey(netAddr)
ka := a.find(netAddr)
if ka != nil {
// TODO(oga) only update adresses periodically.
// Update the last seen time and services.
// note that to prevent causing excess garbage on getaddr
// messages the netaddresses in addrmaanger are *immutable*,
// if we need to change them then we replace the pointer with a
// new copy so that we don't have to copy every na for getaddr.
if netAddr.Timestamp.After(ka.na.Timestamp) ||
(ka.na.Services&netAddr.Services) !=
netAddr.Services {
naCopy := *ka.na
naCopy.Timestamp = netAddr.Timestamp
naCopy.AddService(netAddr.Services)
ka.na = &naCopy
} }
// If already in tried, we have nothing to do here. // LastAttempt returns the last time the known address was attempted.
if ka.tried { func (ka *knownAddress) LastAttempt() time.Time {
return return ka.lastattempt
}
// Already at our max?
if ka.refs == newBucketsPerAddress {
return
}
// The more entries we have, the less likely we are to add more.
// likelyhood is 2N.
factor := int32(2 * ka.refs)
if a.rand.Int31n(factor) != 0 {
return
}
} else {
// Make a copy of the net address to avoid races since it is
// updated elsewhere in the addrmanager code and would otherwise
// change the actual netaddress on the peer.
netAddrCopy := *netAddr
ka = &knownAddress{na: &netAddrCopy, srcAddr: srcAddr}
a.addrIndex[addr] = ka
a.nNew++
// XXX time penalty?
}
bucket := a.getNewBucket(netAddr, srcAddr)
// Already exists?
if _, ok := a.addrNew[bucket][addr]; ok {
return
}
// Enforce max addresses.
if len(a.addrNew[bucket]) > newBucketSize {
amgrLog.Tracef("new bucket is full, expiring old ")
a.expireNew(bucket)
}
// Add to new bucket.
ka.refs++
a.addrNew[bucket][addr] = ka
amgrLog.Tracef("Added new address %s for a total of %d addresses",
addr, a.nTried+a.nNew)
} }
// bad returns true if the address in question has not been tried in the last // bad returns true if the address in question has not been tried in the last
@ -257,6 +200,107 @@ func chance(ka *knownAddress) float64 {
return c return c
} }
// AddrManager provides a concurrency safe address manager for caching potential
// peers on the bitcoin network.
type AddrManager struct {
mtx sync.Mutex
dataDir string
lookupFunc func(string) ([]net.IP, error)
rand *rand.Rand
key [32]byte
addrIndex map[string]*knownAddress // address key to ka for all addrs.
addrNew [newBucketCount]map[string]*knownAddress
addrTried [triedBucketCount]*list.List
started int32
shutdown int32
wg sync.WaitGroup
quit chan struct{}
nTried int
nNew int
lamtx sync.Mutex
localAddresses map[string]*localAddress
}
// updateAddress is a helper function to either update an address already known
// to the address manager, or to add the address if not already known.
func (a *AddrManager) updateAddress(netAddr, srcAddr *btcwire.NetAddress) {
// Filter out non-routable addresses. Note that non-routable
// also includes invalid and local addresses.
if !Routable(netAddr) {
return
}
// Protect concurrent access.
a.mtx.Lock()
defer a.mtx.Unlock()
addr := NetAddressKey(netAddr)
ka := a.find(netAddr)
if ka != nil {
// TODO(oga) only update adresses periodically.
// Update the last seen time and services.
// note that to prevent causing excess garbage on getaddr
// messages the netaddresses in addrmaanger are *immutable*,
// if we need to change them then we replace the pointer with a
// new copy so that we don't have to copy every na for getaddr.
if netAddr.Timestamp.After(ka.na.Timestamp) ||
(ka.na.Services&netAddr.Services) !=
netAddr.Services {
naCopy := *ka.na
naCopy.Timestamp = netAddr.Timestamp
naCopy.AddService(netAddr.Services)
ka.na = &naCopy
}
// If already in tried, we have nothing to do here.
if ka.tried {
return
}
// Already at our max?
if ka.refs == newBucketsPerAddress {
return
}
// The more entries we have, the less likely we are to add more.
// likelyhood is 2N.
factor := int32(2 * ka.refs)
if a.rand.Int31n(factor) != 0 {
return
}
} else {
// Make a copy of the net address to avoid races since it is
// updated elsewhere in the addrmanager code and would otherwise
// change the actual netaddress on the peer.
netAddrCopy := *netAddr
ka = &knownAddress{na: &netAddrCopy, srcAddr: srcAddr}
a.addrIndex[addr] = ka
a.nNew++
// XXX time penalty?
}
bucket := a.getNewBucket(netAddr, srcAddr)
// Already exists?
if _, ok := a.addrNew[bucket][addr]; ok {
return
}
// Enforce max addresses.
if len(a.addrNew[bucket]) > newBucketSize {
log.Tracef("new bucket is full, expiring old ")
a.expireNew(bucket)
}
// Add to new bucket.
ka.refs++
a.addrNew[bucket][addr] = ka
log.Tracef("Added new address %s for a total of %d addresses", addr,
a.nTried+a.nNew)
}
// expireNew makes space in the new buckets by expiring the really bad entries. // expireNew makes space in the new buckets by expiring the really bad entries.
// If no bad entries are available we look at a few and remove the oldest. // If no bad entries are available we look at a few and remove the oldest.
func (a *AddrManager) expireNew(bucket int) { func (a *AddrManager) expireNew(bucket int) {
@ -268,7 +312,7 @@ func (a *AddrManager) expireNew(bucket int) {
var oldest *knownAddress var oldest *knownAddress
for k, v := range a.addrNew[bucket] { for k, v := range a.addrNew[bucket] {
if bad(v) { if bad(v) {
amgrLog.Tracef("expiring bad address %v", k) log.Tracef("expiring bad address %v", k)
delete(a.addrNew[bucket], k) delete(a.addrNew[bucket], k)
v.refs-- v.refs--
if v.refs == 0 { if v.refs == 0 {
@ -286,7 +330,7 @@ func (a *AddrManager) expireNew(bucket int) {
if oldest != nil { if oldest != nil {
key := NetAddressKey(oldest.na) key := NetAddressKey(oldest.na)
amgrLog.Tracef("expiring oldest address %v", key) log.Tracef("expiring oldest address %v", key)
delete(a.addrNew[bucket], key) delete(a.addrNew[bucket], key)
oldest.refs-- oldest.refs--
@ -314,37 +358,6 @@ func (a *AddrManager) pickTried(bucket int) *list.Element {
return oldestElem return oldestElem
} }
// knownAddress tracks information about a known network address that is used
// to determine how viable an address is.
type knownAddress struct {
na *btcwire.NetAddress
srcAddr *btcwire.NetAddress
attempts int
lastattempt time.Time
lastsuccess time.Time
tried bool
refs int // reference count of new buckets
}
// AddrManager provides a concurrency safe address manager for caching potential
// peers on the bitcoin network.
type AddrManager struct {
mtx sync.Mutex
rand *rand.Rand
key [32]byte
addrIndex map[string]*knownAddress // address key to ka for all addrs.
addrNew [newBucketCount]map[string]*knownAddress
addrTried [triedBucketCount]*list.List
started int32
shutdown int32
wg sync.WaitGroup
quit chan struct{}
nTried int
nNew int
lamtx sync.Mutex
localAddresses map[string]*localAddress
}
func (a *AddrManager) getNewBucket(netAddr, srcAddr *btcwire.NetAddress) int { func (a *AddrManager) getNewBucket(netAddr, srcAddr *btcwire.NetAddress) int {
// bitcoind: // bitcoind:
// doublesha256(key + sourcegroup + int64(doublesha256(key + group + sourcegroup))%bucket_per_source_group) % num_new_buckes // doublesha256(key + sourcegroup + int64(doublesha256(key + group + sourcegroup))%bucket_per_source_group) % num_new_buckes
@ -404,7 +417,7 @@ out:
dumpAddressTicker.Stop() dumpAddressTicker.Stop()
a.savePeers() a.savePeers()
a.wg.Done() a.wg.Done()
amgrLog.Trace("Address handler done") log.Trace("Address handler done")
} }
type serialisedKnownAddress struct { type serialisedKnownAddress struct {
@ -472,17 +485,17 @@ func (a *AddrManager) savePeers() {
// May give some way to specify this later. // May give some way to specify this later.
filename := "peers.json" filename := "peers.json"
filePath := filepath.Join(cfg.DataDir, filename) filePath := filepath.Join(a.dataDir, filename)
w, err := os.Create(filePath) w, err := os.Create(filePath)
if err != nil { if err != nil {
amgrLog.Error("Error opening file: ", filePath, err) log.Error("Error opening file: ", filePath, err)
return return
} }
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
defer w.Close() defer w.Close()
if err := enc.Encode(&sam); err != nil { if err := enc.Encode(&sam); err != nil {
amgrLog.Errorf("Failed to encode %s: %v", filePath, err) log.Errorf("Failed to encode %s: %v", filePath, err)
return return
} }
} }
@ -495,21 +508,20 @@ func (a *AddrManager) loadPeers() {
// May give some way to specify this later. // May give some way to specify this later.
filename := "peers.json" filename := "peers.json"
filePath := filepath.Join(cfg.DataDir, filename) filePath := filepath.Join(a.dataDir, filename)
err := a.deserialisePeers(filePath) err := a.deserialisePeers(filePath)
if err != nil { if err != nil {
amgrLog.Errorf("Failed to parse %s: %v", filePath, err) log.Errorf("Failed to parse %s: %v", filePath, err)
// if it is invalid we nuke the old one unconditionally. // if it is invalid we nuke the old one unconditionally.
err = os.Remove(filePath) err = os.Remove(filePath)
if err != nil { if err != nil {
amgrLog.Warn("Failed to remove corrupt peers "+ log.Warn("Failed to remove corrupt peers file: ", err)
"file: ", err)
} }
a.reset() a.reset()
return return
} }
amgrLog.Infof("Loaded %d addresses from '%s'", a.nNew+a.nTried, filePath) log.Infof("Loaded %d addresses from '%s'", a.nNew+a.nTried, filePath)
} }
func (a *AddrManager) deserialisePeers(filePath string) error { func (a *AddrManager) deserialisePeers(filePath string) error {
@ -539,12 +551,12 @@ func (a *AddrManager) deserialisePeers(filePath string) error {
for _, v := range sam.Addresses { for _, v := range sam.Addresses {
ka := new(knownAddress) ka := new(knownAddress)
ka.na, err = deserialiseNetAddress(v.Addr) ka.na, err = a.DeserialiseNetAddress(v.Addr)
if err != nil { if err != nil {
return fmt.Errorf("failed to deserialise netaddress "+ return fmt.Errorf("failed to deserialise netaddress "+
"%s: %v", v.Addr, err) "%s: %v", v.Addr, err)
} }
ka.srcAddr, err = deserialiseNetAddress(v.Src) ka.srcAddr, err = a.DeserialiseNetAddress(v.Src)
if err != nil { if err != nil {
return fmt.Errorf("failed to deserialise netaddress "+ return fmt.Errorf("failed to deserialise netaddress "+
"%s: %v", v.Src, err) "%s: %v", v.Src, err)
@ -600,7 +612,7 @@ func (a *AddrManager) deserialisePeers(filePath string) error {
return nil return nil
} }
func deserialiseNetAddress(addr string) (*btcwire.NetAddress, error) { func (a *AddrManager) DeserialiseNetAddress(addr string) (*btcwire.NetAddress, error) {
host, portStr, err := net.SplitHostPort(addr) host, portStr, err := net.SplitHostPort(addr)
if err != nil { if err != nil {
return nil, err return nil, err
@ -610,7 +622,7 @@ func deserialiseNetAddress(addr string) (*btcwire.NetAddress, error) {
return nil, err return nil, err
} }
return hostToNetAddress(host, uint16(port), btcwire.SFNodeNetwork) return a.HostToNetAddress(host, uint16(port), btcwire.SFNodeNetwork)
} }
// Start begins the core address handler which manages a pool of known // Start begins the core address handler which manages a pool of known
@ -621,7 +633,7 @@ func (a *AddrManager) Start() {
return return
} }
amgrLog.Trace("Starting address manager") log.Trace("Starting address manager")
a.wg.Add(1) a.wg.Add(1)
@ -635,12 +647,12 @@ func (a *AddrManager) Start() {
// Stop gracefully shuts down the address manager by stopping the main handler. // Stop gracefully shuts down the address manager by stopping the main handler.
func (a *AddrManager) Stop() error { func (a *AddrManager) Stop() error {
if atomic.AddInt32(&a.shutdown, 1) != 1 { if atomic.AddInt32(&a.shutdown, 1) != 1 {
amgrLog.Warnf("Address manager is already in the process of " + log.Warnf("Address manager is already in the process of " +
"shutting down") "shutting down")
return nil return nil
} }
amgrLog.Infof("Address manager shutting down") log.Infof("Address manager shutting down")
close(a.quit) close(a.quit)
a.wg.Wait() a.wg.Wait()
return nil return nil
@ -649,8 +661,7 @@ func (a *AddrManager) Stop() error {
// AddAddresses adds new addresses to the address manager. It enforces a max // AddAddresses adds new addresses to the address manager. It enforces a max
// number of addresses and silently ignores duplicate addresses. It is // number of addresses and silently ignores duplicate addresses. It is
// safe for concurrent access. // safe for concurrent access.
func (a *AddrManager) AddAddresses(addrs []*btcwire.NetAddress, func (a *AddrManager) AddAddresses(addrs []*btcwire.NetAddress, srcAddr *btcwire.NetAddress) {
srcAddr *btcwire.NetAddress) {
for _, na := range addrs { for _, na := range addrs {
a.updateAddress(na, srcAddr) a.updateAddress(na, srcAddr)
} }
@ -670,8 +681,8 @@ func (a *AddrManager) AddAddressByIP(addrIP string) {
// Split IP and port // Split IP and port
addr, portStr, err := net.SplitHostPort(addrIP) addr, portStr, err := net.SplitHostPort(addrIP)
if err != nil { if err != nil {
amgrLog.Warnf("AddADddressByIP given bullshit adddress"+ log.Warnf("AddADddressByIP given bullshit adddress (%s): %v",
"(%s): %v", err) err)
return return
} }
// Put it in btcwire.Netaddress // Put it in btcwire.Netaddress
@ -679,12 +690,12 @@ func (a *AddrManager) AddAddressByIP(addrIP string) {
na.Timestamp = time.Now() na.Timestamp = time.Now()
na.IP = net.ParseIP(addr) na.IP = net.ParseIP(addr)
if na.IP == nil { if na.IP == nil {
amgrLog.Error("Invalid ip address:", addr) log.Error("Invalid ip address:", addr)
return return
} }
port, err := strconv.ParseUint(portStr, 10, 0) port, err := strconv.ParseUint(portStr, 10, 0)
if err != nil { if err != nil {
amgrLog.Error("Invalid port: ", portStr, err) log.Error("Invalid port: ", portStr, err)
return return
} }
na.Port = uint16(port) na.Port = uint16(port)
@ -757,22 +768,10 @@ func (a *AddrManager) reset() {
} }
} }
// NewAddrManager returns a new bitcoin address manager. // HostToNetAddress returns a netaddress given a host address. If the address is
// Use Start to begin processing asynchronous address updates.
func NewAddrManager() *AddrManager {
am := AddrManager{
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
quit: make(chan struct{}),
localAddresses: make(map[string]*localAddress),
}
am.reset()
return &am
}
// hostToNetAddress returns a netaddress given a host address. If the address is
// a tor .onion address this will be taken care of. else if the host is not an // a tor .onion address this will be taken care of. else if the host is not an
// IP address it will be resolved (via tor if required). // IP address it will be resolved (via tor if required).
func hostToNetAddress(host string, port uint16, services btcwire.ServiceFlag) (*btcwire.NetAddress, error) { func (a *AddrManager) HostToNetAddress(host string, port uint16, services btcwire.ServiceFlag) (*btcwire.NetAddress, error) {
// tor address is 16 char base32 + ".onion" // tor address is 16 char base32 + ".onion"
var ip net.IP var ip net.IP
if len(host) == 22 && host[16:] == ".onion" { if len(host) == 22 && host[16:] == ".onion" {
@ -787,7 +786,7 @@ func hostToNetAddress(host string, port uint16, services btcwire.ServiceFlag) (*
prefix := []byte{0xfd, 0x87, 0xd8, 0x7e, 0xeb, 0x43} prefix := []byte{0xfd, 0x87, 0xd8, 0x7e, 0xeb, 0x43}
ip = net.IP(append(prefix, data...)) ip = net.IP(append(prefix, data...))
} else if ip = net.ParseIP(host); ip == nil { } else if ip = net.ParseIP(host); ip == nil {
ips, err := btcdLookup(host) ips, err := a.lookupFunc(host)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -866,7 +865,7 @@ func (a *AddrManager) GetAddress(class string, newBias int) *knownAddress {
ka := e.Value.(*knownAddress) ka := e.Value.(*knownAddress)
randval := a.rand.Intn(large) randval := a.rand.Intn(large)
if float64(randval) < (factor * chance(ka) * float64(large)) { if float64(randval) < (factor * chance(ka) * float64(large)) {
amgrLog.Tracef("Selected %v from tried bucket", log.Tracef("Selected %v from tried bucket",
NetAddressKey(ka.na)) NetAddressKey(ka.na))
return ka return ka
} }
@ -894,7 +893,7 @@ func (a *AddrManager) GetAddress(class string, newBias int) *knownAddress {
} }
randval := a.rand.Intn(large) randval := a.rand.Intn(large)
if float64(randval) < (factor * chance(ka) * float64(large)) { if float64(randval) < (factor * chance(ka) * float64(large)) {
amgrLog.Tracef("Selected %v from new bucket", log.Tracef("Selected %v from new bucket",
NetAddressKey(ka.na)) NetAddressKey(ka.na))
return ka return ka
} }
@ -1034,7 +1033,7 @@ func (a *AddrManager) Good(addr *btcwire.NetAddress) {
a.nNew++ a.nNew++
rmkey := NetAddressKey(rmka.na) rmkey := NetAddressKey(rmka.na)
amgrLog.Tracef("Replacing %s with %s in tried", rmkey, addrKey) log.Tracef("Replacing %s with %s in tried", rmkey, addrKey)
// We made sure there is space here just above. // We made sure there is space here just above.
a.addrNew[newBucket][rmkey] = rmka a.addrNew[newBucket][rmkey] = rmka
@ -1240,18 +1239,17 @@ type localAddress struct {
score addressPrio score addressPrio
} }
// addLocalAddress adds na to the list of known local addresses to advertise // AddLocalAddress adds na to the list of known local addresses to advertise
// with the given priority. // with the given priority.
func (a *AddrManager) addLocalAddress(na *btcwire.NetAddress, func (a *AddrManager) AddLocalAddress(na *btcwire.NetAddress,
priority addressPrio) { priority addressPrio) {
// sanity check. // sanity check.
if !Routable(na) { if !Routable(na) {
amgrLog.Debugf("rejecting address %s:%d due to routability", log.Debugf("rejecting address %s:%d due to routability",
na.IP, na.Port) na.IP, na.Port)
return return
} }
amgrLog.Debugf("adding address %s:%d", log.Debugf("adding address %s:%d", na.IP, na.Port)
na.IP, na.Port)
a.lamtx.Lock() a.lamtx.Lock()
defer a.lamtx.Unlock() defer a.lamtx.Unlock()
@ -1349,8 +1347,8 @@ func getReachabilityFrom(na, fromna *btcwire.NetAddress) int {
} }
// getBestLocalAddress returns the most appropriate local address that we know // getBestLocalAddress returns the most appropriate local address that we know
// of to be contacted by rna // of to be contacted by rna.
func (a *AddrManager) getBestLocalAddress(rna *btcwire.NetAddress) *btcwire.NetAddress { func (a *AddrManager) GetBestLocalAddress(rna *btcwire.NetAddress) *btcwire.NetAddress {
a.lamtx.Lock() a.lamtx.Lock()
defer a.lamtx.Unlock() defer a.lamtx.Unlock()
@ -1367,11 +1365,10 @@ func (a *AddrManager) getBestLocalAddress(rna *btcwire.NetAddress) *btcwire.NetA
} }
} }
if bestna != nil { if bestna != nil {
amgrLog.Debugf("Suggesting address %s:%d for %s:%d", log.Debugf("Suggesting address %s:%d for %s:%d", bestna.IP,
bestna.IP, bestna.Port, rna.IP, rna.Port) bestna.Port, rna.IP, rna.Port)
} else { } else {
amgrLog.Debugf("No worthy address for %s:%d", log.Debugf("No worthy address for %s:%d", rna.IP, rna.Port)
rna.IP, rna.Port)
// Send something unroutable if nothing suitable. // Send something unroutable if nothing suitable.
bestna = &btcwire.NetAddress{ bestna = &btcwire.NetAddress{
Timestamp: time.Now(), Timestamp: time.Now(),
@ -1383,3 +1380,17 @@ func (a *AddrManager) getBestLocalAddress(rna *btcwire.NetAddress) *btcwire.NetA
return bestna return bestna
} }
// New returns a new bitcoin address manager.
// Use Start to begin processing asynchronous address updates.
func New(dataDir string, lookupFunc func(string) ([]net.IP, error)) *AddrManager {
am := AddrManager{
dataDir: dataDir,
lookupFunc: lookupFunc,
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
quit: make(chan struct{}),
localAddresses: make(map[string]*localAddress),
}
am.reset()
return &am
}

View file

@ -2,13 +2,15 @@
// Use of this source code is governed by an ISC // Use of this source code is governed by an ISC
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
package main package addrmgr_test
import ( import (
"errors"
"net" "net"
"testing" "testing"
"time" "time"
"github.com/conformal/btcd/addrmgr"
"github.com/conformal/btcwire" "github.com/conformal/btcwire"
) )
@ -163,8 +165,12 @@ func addNaTests() {
addNaTest("fef3::4:4", 8336, "[fef3::4:4]:8336") addNaTest("fef3::4:4", 8336, "[fef3::4:4]:8336")
} }
func lookupFunc(host string) ([]net.IP, error) {
return nil, errors.New("not implemented")
}
func TestGetAddress(t *testing.T) { func TestGetAddress(t *testing.T) {
n := NewAddrManager() n := addrmgr.New("testdir", lookupFunc)
if rv := n.GetAddress("any", 10); rv != nil { if rv := n.GetAddress("any", 10); rv != nil {
t.Errorf("GetAddress failed: got: %v want: %v\n", rv, nil) t.Errorf("GetAddress failed: got: %v want: %v\n", rv, nil)
} }
@ -175,91 +181,91 @@ func TestIPTypes(t *testing.T) {
t.Logf("Running %d tests", len(ipTests)) t.Logf("Running %d tests", len(ipTests))
for _, test := range ipTests { for _, test := range ipTests {
rv := RFC1918(&test.in) rv := addrmgr.RFC1918(&test.in)
if rv != test.rfc1918 { if rv != test.rfc1918 {
t.Errorf("RFC1918 %s\n got: %v want: %v", test.in.IP, rv, test.rfc1918) t.Errorf("RFC1918 %s\n got: %v want: %v", test.in.IP, rv, test.rfc1918)
continue continue
} }
} }
for _, test := range ipTests { for _, test := range ipTests {
rv := RFC3849(&test.in) rv := addrmgr.RFC3849(&test.in)
if rv != test.rfc3849 { if rv != test.rfc3849 {
t.Errorf("RFC3849 %s\n got: %v want: %v", test.in.IP, rv, test.rfc3849) t.Errorf("RFC3849 %s\n got: %v want: %v", test.in.IP, rv, test.rfc3849)
continue continue
} }
} }
for _, test := range ipTests { for _, test := range ipTests {
rv := RFC3927(&test.in) rv := addrmgr.RFC3927(&test.in)
if rv != test.rfc3927 { if rv != test.rfc3927 {
t.Errorf("RFC3927 %s\n got: %v want: %v", test.in.IP, rv, test.rfc3927) t.Errorf("RFC3927 %s\n got: %v want: %v", test.in.IP, rv, test.rfc3927)
continue continue
} }
} }
for _, test := range ipTests { for _, test := range ipTests {
rv := RFC3964(&test.in) rv := addrmgr.RFC3964(&test.in)
if rv != test.rfc3964 { if rv != test.rfc3964 {
t.Errorf("RFC3964 %s\n got: %v want: %v", test.in.IP, rv, test.rfc3964) t.Errorf("RFC3964 %s\n got: %v want: %v", test.in.IP, rv, test.rfc3964)
continue continue
} }
} }
for _, test := range ipTests { for _, test := range ipTests {
rv := RFC4193(&test.in) rv := addrmgr.RFC4193(&test.in)
if rv != test.rfc4193 { if rv != test.rfc4193 {
t.Errorf("RFC4193 %s\n got: %v want: %v", test.in.IP, rv, test.rfc4193) t.Errorf("RFC4193 %s\n got: %v want: %v", test.in.IP, rv, test.rfc4193)
continue continue
} }
} }
for _, test := range ipTests { for _, test := range ipTests {
rv := RFC4380(&test.in) rv := addrmgr.RFC4380(&test.in)
if rv != test.rfc4380 { if rv != test.rfc4380 {
t.Errorf("RFC4380 %s\n got: %v want: %v", test.in.IP, rv, test.rfc4380) t.Errorf("RFC4380 %s\n got: %v want: %v", test.in.IP, rv, test.rfc4380)
continue continue
} }
} }
for _, test := range ipTests { for _, test := range ipTests {
rv := RFC4843(&test.in) rv := addrmgr.RFC4843(&test.in)
if rv != test.rfc4843 { if rv != test.rfc4843 {
t.Errorf("RFC4843 %s\n got: %v want: %v", test.in.IP, rv, test.rfc4843) t.Errorf("RFC4843 %s\n got: %v want: %v", test.in.IP, rv, test.rfc4843)
continue continue
} }
} }
for _, test := range ipTests { for _, test := range ipTests {
rv := RFC4862(&test.in) rv := addrmgr.RFC4862(&test.in)
if rv != test.rfc4862 { if rv != test.rfc4862 {
t.Errorf("RFC4862 %s\n got: %v want: %v", test.in.IP, rv, test.rfc4862) t.Errorf("RFC4862 %s\n got: %v want: %v", test.in.IP, rv, test.rfc4862)
continue continue
} }
} }
for _, test := range ipTests { for _, test := range ipTests {
rv := RFC6052(&test.in) rv := addrmgr.RFC6052(&test.in)
if rv != test.rfc6052 { if rv != test.rfc6052 {
t.Errorf("RFC6052 %s\n got: %v want: %v", test.in.IP, rv, test.rfc6052) t.Errorf("RFC6052 %s\n got: %v want: %v", test.in.IP, rv, test.rfc6052)
continue continue
} }
} }
for _, test := range ipTests { for _, test := range ipTests {
rv := RFC6145(&test.in) rv := addrmgr.RFC6145(&test.in)
if rv != test.rfc6145 { if rv != test.rfc6145 {
t.Errorf("RFC1918 %s\n got: %v want: %v", test.in.IP, rv, test.rfc6145) t.Errorf("RFC1918 %s\n got: %v want: %v", test.in.IP, rv, test.rfc6145)
continue continue
} }
} }
for _, test := range ipTests { for _, test := range ipTests {
rv := Local(&test.in) rv := addrmgr.Local(&test.in)
if rv != test.local { if rv != test.local {
t.Errorf("Local %s\n got: %v want: %v", test.in.IP, rv, test.local) t.Errorf("Local %s\n got: %v want: %v", test.in.IP, rv, test.local)
continue continue
} }
} }
for _, test := range ipTests { for _, test := range ipTests {
rv := Valid(&test.in) rv := addrmgr.Valid(&test.in)
if rv != test.valid { if rv != test.valid {
t.Errorf("Valid %s\n got: %v want: %v", test.in.IP, rv, test.valid) t.Errorf("Valid %s\n got: %v want: %v", test.in.IP, rv, test.valid)
continue continue
} }
} }
for _, test := range ipTests { for _, test := range ipTests {
rv := Routable(&test.in) rv := addrmgr.Routable(&test.in)
if rv != test.routable { if rv != test.routable {
t.Errorf("Routable %s\n got: %v want: %v", test.in.IP, rv, test.routable) t.Errorf("Routable %s\n got: %v want: %v", test.in.IP, rv, test.routable)
continue continue
@ -272,7 +278,7 @@ func TestNetAddressKey(t *testing.T) {
t.Logf("Running %d tests", len(naTests)) t.Logf("Running %d tests", len(naTests))
for i, test := range naTests { for i, test := range naTests {
key := NetAddressKey(&test.in) key := addrmgr.NetAddressKey(&test.in)
if key != test.want { if key != test.want {
t.Errorf("NetAddressKey #%d\n got: %s want: %s", i, key, test.want) t.Errorf("NetAddressKey #%d\n got: %s want: %s", i, key, test.want)
continue continue

3
log.go
View file

@ -9,6 +9,8 @@ import (
"os" "os"
"time" "time"
"github.com/conformal/btcd/addrmgr"
"github.com/conformal/btcchain" "github.com/conformal/btcchain"
"github.com/conformal/btcdb" "github.com/conformal/btcdb"
"github.com/conformal/btclog" "github.com/conformal/btclog"
@ -89,6 +91,7 @@ func useLogger(subsystemID string, logger btclog.Logger) {
switch subsystemID { switch subsystemID {
case "AMGR": case "AMGR":
amgrLog = logger amgrLog = logger
addrmgr.UseLogger(logger)
case "BCDB": case "BCDB":
bcdbLog = logger bcdbLog = logger

15
peer.go
View file

@ -15,6 +15,7 @@ import (
"time" "time"
"github.com/conformal/btcchain" "github.com/conformal/btcchain"
"github.com/conformal/btcd/addrmgr"
"github.com/conformal/btcdb" "github.com/conformal/btcdb"
"github.com/conformal/btcutil" "github.com/conformal/btcutil"
"github.com/conformal/btcwire" "github.com/conformal/btcwire"
@ -253,7 +254,7 @@ func (p *peer) pushVersionMsg() error {
// Version message. // Version message.
msg := btcwire.NewMsgVersion( msg := btcwire.NewMsgVersion(
p.server.addrManager.getBestLocalAddress(p.na), theirNa, p.server.addrManager.GetBestLocalAddress(p.na), theirNa,
p.server.nonce, int32(blockNum)) p.server.nonce, int32(blockNum))
msg.AddUserAgent(userAgentName, userAgentVersion) msg.AddUserAgent(userAgentName, userAgentVersion)
@ -296,8 +297,8 @@ func (p *peer) updateAddresses(msg *btcwire.MsgVersion) {
// download and the local address is routable. // download and the local address is routable.
if !cfg.DisableListen /* && isCurrent? */ { if !cfg.DisableListen /* && isCurrent? */ {
// Get address that best matches. // Get address that best matches.
lna := p.server.addrManager.getBestLocalAddress(p.na) lna := p.server.addrManager.GetBestLocalAddress(p.na)
if Routable(lna) { if addrmgr.Routable(lna) {
addresses := []*btcwire.NetAddress{lna} addresses := []*btcwire.NetAddress{lna}
p.pushAddrMsg(addresses) p.pushAddrMsg(addresses)
} }
@ -319,7 +320,7 @@ func (p *peer) updateAddresses(msg *btcwire.MsgVersion) {
// actually connected from. One example of why this can happen // actually connected from. One example of why this can happen
// is with NAT. Only add the address to the address manager if // is with NAT. Only add the address to the address manager if
// the addresses agree. // the addresses agree.
if NetAddressKey(&msg.AddrMe) == NetAddressKey(p.na) { if addrmgr.NetAddressKey(&msg.AddrMe) == addrmgr.NetAddressKey(p.na) {
p.server.addrManager.AddAddress(p.na, p.na) p.server.addrManager.AddAddress(p.na, p.na)
p.server.addrManager.Good(p.na) p.server.addrManager.Good(p.na)
} }
@ -925,7 +926,7 @@ func (p *peer) pushAddrMsg(addresses []*btcwire.NetAddress) error {
msg := btcwire.NewMsgAddr() msg := btcwire.NewMsgAddr()
for _, na := range addresses { for _, na := range addresses {
// Filter addresses the peer already knows about. // Filter addresses the peer already knows about.
if _, ok := p.knownAddresses[NetAddressKey(na)]; ok { if _, ok := p.knownAddresses[addrmgr.NetAddressKey(na)]; ok {
continue continue
} }
@ -993,7 +994,7 @@ func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) {
} }
// Add address to known addresses for this peer. // Add address to known addresses for this peer.
p.knownAddresses[NetAddressKey(na)] = struct{}{} p.knownAddresses[addrmgr.NetAddressKey(na)] = struct{}{}
} }
// Add addresses to server address manager. The address manager handles // Add addresses to server address manager. The address manager handles
@ -1687,7 +1688,7 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer {
return nil return nil
} }
p.na, err = hostToNetAddress(host, uint16(port), 0) p.na, err = s.addrManager.HostToNetAddress(host, uint16(port), 0)
if err != nil { if err != nil {
p.logError("Can not turn host %s into netaddress: %v", p.logError("Can not turn host %s into netaddress: %v",
host, err) host, err)

View file

@ -11,6 +11,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"math" "math"
mrand "math/rand"
"net" "net"
"runtime" "runtime"
"strconv" "strconv"
@ -18,6 +19,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/conformal/btcd/addrmgr"
"github.com/conformal/btcdb" "github.com/conformal/btcdb"
"github.com/conformal/btcjson" "github.com/conformal/btcjson"
"github.com/conformal/btcnet" "github.com/conformal/btcnet"
@ -71,7 +73,7 @@ type server struct {
bytesMutex sync.Mutex // For the following two fields. bytesMutex sync.Mutex // For the following two fields.
bytesReceived uint64 // Total bytes received from all peers since start. bytesReceived uint64 // Total bytes received from all peers since start.
bytesSent uint64 // Total bytes sent by all peers since start. bytesSent uint64 // Total bytes sent by all peers since start.
addrManager *AddrManager addrManager *addrmgr.AddrManager
rpcServer *rpcServer rpcServer *rpcServer
blockManager *blockManager blockManager *blockManager
txMemPool *txMemPool txMemPool *txMemPool
@ -224,7 +226,7 @@ func (s *server) handleAddPeerMsg(state *peerState, p *peer) bool {
state.peers.PushBack(p) state.peers.PushBack(p)
p.Start() p.Start()
} else { } else {
state.outboundGroups[GroupKey(p.na)]++ state.outboundGroups[addrmgr.GroupKey(p.na)]++
if p.persistent { if p.persistent {
state.persistentPeers.PushBack(p) state.persistentPeers.PushBack(p)
} else { } else {
@ -256,7 +258,7 @@ func (s *server) handleDonePeerMsg(state *peerState, p *peer) {
return return
} }
if !p.inbound { if !p.inbound {
state.outboundGroups[GroupKey(p.na)]-- state.outboundGroups[addrmgr.GroupKey(p.na)]--
} }
list.Remove(e) list.Remove(e)
srvrLog.Debugf("Removed peer %s", p) srvrLog.Debugf("Removed peer %s", p)
@ -418,7 +420,7 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) {
if peer.addr == msg.addr { if peer.addr == msg.addr {
// Keep group counts ok since we remove from // Keep group counts ok since we remove from
// the list now. // the list now.
state.outboundGroups[GroupKey(peer.na)]-- state.outboundGroups[addrmgr.GroupKey(peer.na)]--
// This is ok because we are not continuing // This is ok because we are not continuing
// to iterate so won't corrupt the loop. // to iterate so won't corrupt the loop.
state.persistentPeers.Remove(e) state.persistentPeers.Remove(e)
@ -473,6 +475,7 @@ func (s *server) seedFromDNS() {
return return
} }
randSource := mrand.New(mrand.NewSource(time.Now().UnixNano()))
for _, seeder := range activeNetParams.dnsSeeds { for _, seeder := range activeNetParams.dnsSeeds {
go func(seeder string) { go func(seeder string) {
seedpeers, err := dnsDiscover(seeder) seedpeers, err := dnsDiscover(seeder)
@ -498,7 +501,7 @@ func (s *server) seedFromDNS() {
// and 7 days ago. // and 7 days ago.
addresses[i].Timestamp = time.Now().Add(-1 * addresses[i].Timestamp = time.Now().Add(-1 *
time.Second * time.Duration(secondsIn3Days+ time.Second * time.Duration(secondsIn3Days+
s.addrManager.rand.Int31n(secondsIn4Days))) randSource.Int31n(secondsIn4Days)))
} }
// Bitcoind uses a lookup of the dns seeder here. This // Bitcoind uses a lookup of the dns seeder here. This
@ -619,7 +622,7 @@ out:
if addr == nil { if addr == nil {
break break
} }
key := GroupKey(addr.na) key := addrmgr.GroupKey(addr.NetAddress())
// Address will not be invalid, local or unroutable // Address will not be invalid, local or unroutable
// because addrmanager rejects those on addition. // because addrmanager rejects those on addition.
// Just check that we don't already have an address // Just check that we don't already have an address
@ -641,18 +644,18 @@ out:
// only allow recent nodes (10mins) after we failed 30 // only allow recent nodes (10mins) after we failed 30
// times // times
if time.Now().After(addr.lastattempt.Add(10*time.Minute)) && if time.Now().After(addr.LastAttempt().Add(10*time.Minute)) &&
tries < 30 { tries < 30 {
continue continue
} }
// allow nondefault ports after 50 failed tries. // allow nondefault ports after 50 failed tries.
if fmt.Sprintf("%d", addr.na.Port) != if fmt.Sprintf("%d", addr.NetAddress().Port) !=
activeNetParams.DefaultPort && tries < 50 { activeNetParams.DefaultPort && tries < 50 {
continue continue
} }
addrStr := NetAddressKey(addr.na) addrStr := addrmgr.NetAddressKey(addr.NetAddress())
tries = 0 tries = 0
// any failure will be due to banned peers etc. we have // any failure will be due to banned peers etc. we have
@ -1027,8 +1030,8 @@ out:
} }
na := btcwire.NewNetAddressIPPort(externalip, uint16(listenPort), na := btcwire.NewNetAddressIPPort(externalip, uint16(listenPort),
btcwire.SFNodeNetwork) btcwire.SFNodeNetwork)
s.addrManager.addLocalAddress(na, UpnpPrio) s.addrManager.AddLocalAddress(na, addrmgr.UpnpPrio)
srvrLog.Warnf("Successfully bound via UPnP to %s", NetAddressKey(na)) srvrLog.Warnf("Successfully bound via UPnP to %s", addrmgr.NetAddressKey(na))
first = false first = false
} }
timer.Reset(time.Minute * 15) timer.Reset(time.Minute * 15)
@ -1057,7 +1060,7 @@ func newServer(listenAddrs []string, db btcdb.Db, netParams *btcnet.Params) (*se
return nil, err return nil, err
} }
amgr := NewAddrManager() amgr := addrmgr.New(cfg.DataDir, btcdLookup)
var listeners []net.Listener var listeners []net.Listener
var nat NAT var nat NAT
@ -1093,7 +1096,7 @@ func newServer(listenAddrs []string, db btcdb.Db, netParams *btcnet.Params) (*se
} }
eport = uint16(port) eport = uint16(port)
} }
na, err := hostToNetAddress(host, eport, na, err := amgr.HostToNetAddress(host, eport,
btcwire.SFNodeNetwork) btcwire.SFNodeNetwork)
if err != nil { if err != nil {
srvrLog.Warnf("Not adding %s as "+ srvrLog.Warnf("Not adding %s as "+
@ -1101,7 +1104,7 @@ func newServer(listenAddrs []string, db btcdb.Db, netParams *btcnet.Params) (*se
continue continue
} }
amgr.addLocalAddress(na, ManualPrio) amgr.AddLocalAddress(na, addrmgr.ManualPrio)
} }
} else if discover && cfg.Upnp { } else if discover && cfg.Upnp {
nat, err = Discover() nat, err = Discover()
@ -1129,7 +1132,7 @@ func newServer(listenAddrs []string, db btcdb.Db, netParams *btcnet.Params) (*se
na := btcwire.NewNetAddressIPPort(ip, na := btcwire.NewNetAddressIPPort(ip,
uint16(port), btcwire.SFNodeNetwork) uint16(port), btcwire.SFNodeNetwork)
if discover { if discover {
amgr.addLocalAddress(na, InterfacePrio) amgr.AddLocalAddress(na, addrmgr.InterfacePrio)
} }
} }
} }
@ -1145,8 +1148,8 @@ func newServer(listenAddrs []string, db btcdb.Db, netParams *btcnet.Params) (*se
listeners = append(listeners, listener) listeners = append(listeners, listener)
if discover { if discover {
if na, err := deserialiseNetAddress(addr); err == nil { if na, err := amgr.DeserialiseNetAddress(addr); err == nil {
amgr.addLocalAddress(na, BoundPrio) amgr.AddLocalAddress(na, addrmgr.BoundPrio)
} }
} }
} }
@ -1160,8 +1163,8 @@ func newServer(listenAddrs []string, db btcdb.Db, netParams *btcnet.Params) (*se
} }
listeners = append(listeners, listener) listeners = append(listeners, listener)
if discover { if discover {
if na, err := deserialiseNetAddress(addr); err == nil { if na, err := amgr.DeserialiseNetAddress(addr); err == nil {
amgr.addLocalAddress(na, BoundPrio) amgr.AddLocalAddress(na, addrmgr.BoundPrio)
} }
} }
} }