peer: Refactor peer code into its own package.

This commit introduces package peer which contains peer related features
refactored from peer.go.

The following is an overview of the features the package provides:

- Provides a basic concurrent safe bitcoin peer for handling bitcoin
  communications via the peer-to-peer protocol
- Full duplex reading and writing of bitcoin protocol messages
- Automatic handling of the initial handshake process including protocol
  version negotiation
- Automatic periodic keep-alive pinging and pong responses
- Asynchronous message queueing of outbound messages with optional
  channel for notification when the message is actually sent
- Inventory message batching and send trickling with known inventory
  detection and avoidance
- Ability to wait for shutdown/disconnect
- Flexible peer configuration
  - Caller is responsible for creating outgoing connections and listening
    for incoming connections so they have flexibility to establish
    connections as they see fit (proxies, etc.)
  - User agent name and version
  - Bitcoin network
  - Service support signalling (full nodes, bloom filters, etc.)
  - Maximum supported protocol version
  - Ability to register callbacks for handling bitcoin protocol messages
- Proper handling of bloom filter related commands when the caller does
  not specify the related flag to signal support
  - Disconnects the peer when the protocol version is high enough
  - Does not invoke the related callbacks for older protocol versions
- Snapshottable peer statistics such as the total number of bytes read
  and written, the remote address, user agent, and negotiated protocol
  version
- Helper functions for pushing addresses, getblocks, getheaders, and
  reject messages
  - These could all be sent manually via the standard message output
    function, but the helpers provide additional nice functionality such
    as duplicate filtering and address randomization
- Full documentation with example usage
- Test coverage

In addition to the addition of the new package, btcd has been refactored
to make use of the new package by extending the basic peer it provides to
work with the blockmanager and server to act as a full node.  The
following is a broad overview of the changes to integrate the package:

- The server is responsible for all connection management including
  persistent peers and banning
- Callbacks for all messages that are required to implement a full node
  are registered
- Logic necessary to serve data and behave as a full node is now in the
  callback registered with the peer

Finally, the following peer-related things have been improved as a part
of this refactor:

- Don't log or send reject message due to peer disconnects
- Remove trace logs that aren't particularly helpful
- Finish an old TODO to switch the queue WaitGroup over to a channel
- Improve various comments and fix some code consistency cases
- Improve a few logging bits
- Implement a most-recently-used nonce tracking for detecting self
  connections and generate a unique nonce for each peer
This commit is contained in:
Javed Khan 2015-10-02 01:03:20 -05:00
parent 2e6e896aa6
commit 00bddf7540
19 changed files with 4559 additions and 2365 deletions

View file

@ -34,48 +34,51 @@ const (
blockDbNamePrefix = "blocks"
)
// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
var zeroHash wire.ShaHash
// newPeerMsg signifies a newly connected peer to the block handler.
type newPeerMsg struct {
peer *peer
peer *serverPeer
}
// blockMsg packages a bitcoin block message and the peer it came from together
// so the block handler has access to that information.
type blockMsg struct {
block *btcutil.Block
peer *peer
peer *serverPeer
}
// invMsg packages a bitcoin inv message and the peer it came from together
// so the block handler has access to that information.
type invMsg struct {
inv *wire.MsgInv
peer *peer
peer *serverPeer
}
// headersMsg packages a bitcoin headers message and the peer it came from
// together so the block handler has access to that information.
type headersMsg struct {
headers *wire.MsgHeaders
peer *peer
peer *serverPeer
}
// donePeerMsg signifies a newly disconnected peer to the block handler.
type donePeerMsg struct {
peer *peer
peer *serverPeer
}
// txMsg packages a bitcoin tx message and the peer it came from together
// so the block handler has access to that information.
type txMsg struct {
tx *btcutil.Tx
peer *peer
peer *serverPeer
}
// getSyncPeerMsg is a message type to be sent across the message channel for
// retrieving the current sync peer.
type getSyncPeerMsg struct {
reply chan *peer
reply chan *serverPeer
}
// checkConnectBlockMsg is a message type to be sent across the message channel
@ -192,7 +195,7 @@ type blockManager struct {
receivedLogBlocks int64
receivedLogTx int64
processingReqs bool
syncPeer *peer
syncPeer *serverPeer
msgChan chan interface{}
chainState chainState
wg sync.WaitGroup
@ -289,11 +292,11 @@ func (b *blockManager) startSync(peers *list.List) {
return
}
var bestPeer *peer
var bestPeer *serverPeer
var enext *list.Element
for e := peers.Front(); e != nil; e = enext {
enext = e.Next()
p := e.Value.(*peer)
sp := e.Value.(*serverPeer)
// Remove sync candidate peers that are no longer candidates due
// to passing their latest known block. NOTE: The < is
@ -301,14 +304,14 @@ func (b *blockManager) startSync(peers *list.List) {
// doesn't have a later block when it's equal, it will likely
// have one soon so it is a reasonable choice. It also allows
// the case where both are at 0 such as during regression test.
if p.lastBlock < int32(height) {
if sp.LastBlock() < int32(height) {
peers.Remove(e)
continue
}
// TODO(davec): Use a better algorithm to choose the best peer.
// For now, just pick the first available candidate.
bestPeer = p
bestPeer = sp
}
// Start syncing from the best peer if one was selected.
@ -321,7 +324,7 @@ func (b *blockManager) startSync(peers *list.List) {
}
bmgrLog.Infof("Syncing to block height %d from peer %v",
bestPeer.lastBlock, bestPeer.addr)
bestPeer.LastBlock(), bestPeer.Addr())
// When the current height is less than a known checkpoint we
// can use block headers to learn about which blocks comprise
@ -347,7 +350,7 @@ func (b *blockManager) startSync(peers *list.List) {
b.headersFirstMode = true
bmgrLog.Infof("Downloading headers for blocks %d to "+
"%d from peer %s", height+1,
b.nextCheckpoint.Height, bestPeer.addr)
b.nextCheckpoint.Height, bestPeer.Addr())
} else {
bestPeer.PushGetBlocksMsg(locator, &zeroHash)
}
@ -359,14 +362,14 @@ func (b *blockManager) startSync(peers *list.List) {
// isSyncCandidate returns whether or not the peer is a candidate to consider
// syncing from.
func (b *blockManager) isSyncCandidate(p *peer) bool {
func (b *blockManager) isSyncCandidate(sp *serverPeer) bool {
// Typically a peer is not a candidate for sync if it's not a full node,
// however regression test is special in that the regression tool is
// not a full node and still needs to be considered a sync candidate.
if cfg.RegressionTest {
// The peer is not a candidate if it's not coming from localhost
// or the hostname can't be determined for some reason.
host, _, err := net.SplitHostPort(p.addr)
host, _, err := net.SplitHostPort(sp.Addr())
if err != nil {
return false
}
@ -376,7 +379,7 @@ func (b *blockManager) isSyncCandidate(p *peer) bool {
}
} else {
// The peer is not a candidate for sync if it's not a full node.
if p.services&wire.SFNodeNetwork != wire.SFNodeNetwork {
if sp.Services()&wire.SFNodeNetwork != wire.SFNodeNetwork {
return false
}
}
@ -388,21 +391,21 @@ func (b *blockManager) isSyncCandidate(p *peer) bool {
// handleNewPeerMsg deals with new peers that have signalled they may
// be considered as a sync peer (they have already successfully negotiated). It
// also starts syncing if needed. It is invoked from the syncHandler goroutine.
func (b *blockManager) handleNewPeerMsg(peers *list.List, p *peer) {
func (b *blockManager) handleNewPeerMsg(peers *list.List, sp *serverPeer) {
// Ignore if in the process of shutting down.
if atomic.LoadInt32(&b.shutdown) != 0 {
return
}
bmgrLog.Infof("New valid peer %s (%s)", p, p.userAgent)
bmgrLog.Infof("New valid peer %s (%s)", sp, sp.UserAgent())
// Ignore the peer if it's not a sync candidate.
if !b.isSyncCandidate(p) {
if !b.isSyncCandidate(sp) {
return
}
// Add the peer as a candidate to sync from.
peers.PushBack(p)
peers.PushBack(sp)
// Start syncing by choosing the best candidate if needed.
b.startSync(peers)
@ -412,20 +415,20 @@ func (b *blockManager) handleNewPeerMsg(peers *list.List, p *peer) {
// removes the peer as a candidate for syncing and in the case where it was
// the current sync peer, attempts to select a new best peer to sync from. It
// is invoked from the syncHandler goroutine.
func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer) {
func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *serverPeer) {
// Remove the peer from the list of candidate peers.
for e := peers.Front(); e != nil; e = e.Next() {
if e.Value == p {
if e.Value == sp {
peers.Remove(e)
break
}
}
bmgrLog.Infof("Lost peer %s", p)
bmgrLog.Infof("Lost peer %s", sp)
// Remove requested transactions from the global map so that they will
// be fetched from elsewhere next time we get an inv.
for k := range p.requestedTxns {
for k := range sp.requestedTxns {
delete(b.requestedTxns, k)
}
@ -433,14 +436,14 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer) {
// fetched from elsewhere next time we get an inv.
// TODO(oga) we could possibly here check which peers have these blocks
// and request them now to speed things up a little.
for k := range p.requestedBlocks {
for k := range sp.requestedBlocks {
delete(b.requestedBlocks, k)
}
// Attempt to find a new peer to sync from if the quitting peer is the
// sync peer. Also, reset the headers-first state if in headers-first
// mode so
if b.syncPeer != nil && b.syncPeer == p {
if b.syncPeer != nil && b.syncPeer == sp {
b.syncPeer = nil
if b.headersFirstMode {
// This really shouldn't fail. We have a fairly
@ -472,7 +475,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
// Process the transaction to include validation, insertion in the
// memory pool, orphan handling, etc.
allowOrphans := cfg.MaxOrphanTxs > 0
err := tmsg.peer.server.txMemPool.ProcessTransaction(tmsg.tx,
err := b.server.txMemPool.ProcessTransaction(tmsg.tx,
allowOrphans, true)
// Remove transaction from request maps. Either the mempool/chain
@ -524,7 +527,7 @@ func (b *blockManager) current() bool {
// TODO(oga) we can get chain to return the height of each block when we
// parse an orphan, which would allow us to update the height of peers
// from what it was at initial handshake.
if err != nil || height < b.syncPeer.lastBlock {
if err != nil || height < b.syncPeer.LastBlock() {
return false
}
return true
@ -534,7 +537,7 @@ func (b *blockManager) current() bool {
func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// If we didn't ask for this block then the peer is misbehaving.
blockSha := bmsg.block.Sha()
if _, ok := bmsg.peer.requestedBlocks[*blockSha]; !ok {
if _, exists := bmsg.peer.requestedBlocks[*blockSha]; !exists {
// The regression test intentionally sends some blocks twice
// to test duplicate block insertion fails. Don't disconnect
// the peer or ignore the block when we're in regression test
@ -542,7 +545,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// duplicate blocks.
if !cfg.RegressionTest {
bmgrLog.Warnf("Got unrequested block %v from %s -- "+
"disconnecting", blockSha, bmsg.peer.addr)
"disconnecting", blockSha, bmsg.peer.Addr())
bmsg.peer.Disconnect()
return
}
@ -711,12 +714,12 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
err := bmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash)
if err != nil {
bmgrLog.Warnf("Failed to send getheaders message to "+
"peer %s: %v", bmsg.peer.addr, err)
"peer %s: %v", bmsg.peer.Addr(), err)
return
}
bmgrLog.Infof("Downloading headers for blocks %d to %d from "+
"peer %s", prevHeight+1, b.nextCheckpoint.Height,
b.syncPeer.addr)
b.syncPeer.Addr())
return
}
@ -730,7 +733,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
err = bmsg.peer.PushGetBlocksMsg(locator, &zeroHash)
if err != nil {
bmgrLog.Warnf("Failed to send getblocks message to peer %s: %v",
bmsg.peer.addr, err)
bmsg.peer.Addr(), err)
return
}
}
@ -786,7 +789,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
numHeaders := len(msg.Headers)
if !b.headersFirstMode {
bmgrLog.Warnf("Got %d unrequested headers from %s -- "+
"disconnecting", numHeaders, hmsg.peer.addr)
"disconnecting", numHeaders, hmsg.peer.Addr())
hmsg.peer.Disconnect()
return
}
@ -826,7 +829,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
} else {
bmgrLog.Warnf("Received block header that does not "+
"properly connect to the chain from peer %s "+
"-- disconnecting", hmsg.peer.addr)
"-- disconnecting", hmsg.peer.Addr())
hmsg.peer.Disconnect()
return
}
@ -843,7 +846,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
"%s from peer %s does NOT match "+
"expected checkpoint hash of %s -- "+
"disconnecting", node.height,
node.sha, hmsg.peer.addr,
node.sha, hmsg.peer.Addr(),
b.nextCheckpoint.Hash)
hmsg.peer.Disconnect()
return
@ -874,7 +877,7 @@ func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
err := hmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash)
if err != nil {
bmgrLog.Warnf("Failed to send getheaders message to "+
"peer %s: %v", hmsg.peer.addr, err)
"peer %s: %v", hmsg.peer.Addr(), err)
return
}
}
@ -1270,69 +1273,68 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
}
// NewPeer informs the block manager of a newly active peer.
func (b *blockManager) NewPeer(p *peer) {
func (b *blockManager) NewPeer(sp *serverPeer) {
// Ignore if we are shutting down.
if atomic.LoadInt32(&b.shutdown) != 0 {
return
}
b.msgChan <- &newPeerMsg{peer: p}
b.msgChan <- &newPeerMsg{peer: sp}
}
// QueueTx adds the passed transaction message and peer to the block handling
// queue.
func (b *blockManager) QueueTx(tx *btcutil.Tx, p *peer) {
func (b *blockManager) QueueTx(tx *btcutil.Tx, sp *serverPeer) {
// Don't accept more transactions if we're shutting down.
if atomic.LoadInt32(&b.shutdown) != 0 {
p.txProcessed <- struct{}{}
sp.txProcessed <- struct{}{}
return
}
b.msgChan <- &txMsg{tx: tx, peer: p}
b.msgChan <- &txMsg{tx: tx, peer: sp}
}
// QueueBlock adds the passed block message and peer to the block handling queue.
func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) {
func (b *blockManager) QueueBlock(block *btcutil.Block, sp *serverPeer) {
// Don't accept more blocks if we're shutting down.
if atomic.LoadInt32(&b.shutdown) != 0 {
p.blockProcessed <- struct{}{}
sp.blockProcessed <- struct{}{}
return
}
b.msgChan <- &blockMsg{block: block, peer: p}
b.msgChan <- &blockMsg{block: block, peer: sp}
}
// QueueInv adds the passed inv message and peer to the block handling queue.
func (b *blockManager) QueueInv(inv *wire.MsgInv, p *peer) {
func (b *blockManager) QueueInv(inv *wire.MsgInv, sp *serverPeer) {
// No channel handling here because peers do not need to block on inv
// messages.
if atomic.LoadInt32(&b.shutdown) != 0 {
return
}
b.msgChan <- &invMsg{inv: inv, peer: p}
b.msgChan <- &invMsg{inv: inv, peer: sp}
}
// QueueHeaders adds the passed headers message and peer to the block handling
// queue.
func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, p *peer) {
func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, sp *serverPeer) {
// No channel handling here because peers do not need to block on
// headers messages.
if atomic.LoadInt32(&b.shutdown) != 0 {
return
}
b.msgChan <- &headersMsg{headers: headers, peer: p}
b.msgChan <- &headersMsg{headers: headers, peer: sp}
}
// DonePeer informs the blockmanager that a peer has disconnected.
func (b *blockManager) DonePeer(p *peer) {
func (b *blockManager) DonePeer(sp *serverPeer) {
// Ignore if we are shutting down.
if atomic.LoadInt32(&b.shutdown) != 0 {
return
}
b.msgChan <- &donePeerMsg{peer: p}
b.msgChan <- &donePeerMsg{peer: sp}
}
// Start begins the core block handler which processes block and inv messages.
@ -1363,8 +1365,8 @@ func (b *blockManager) Stop() error {
}
// SyncPeer returns the current sync peer.
func (b *blockManager) SyncPeer() *peer {
reply := make(chan *peer)
func (b *blockManager) SyncPeer() *serverPeer {
reply := make(chan *serverPeer)
b.msgChan <- getSyncPeerMsg{reply: reply}
return <-reply
}

View file

@ -62,6 +62,15 @@ var (
// to parse and execute service commands specified via the -s flag.
var runServiceCommand func(string) error
// minUint32 is a helper function to return the minimum of two uint32s.
// This avoids a math import and the need to cast to floats.
func minUint32(a, b uint32) uint32 {
if a < b {
return a
}
return b
}
// config defines the configuration options for btcd.
//
// See loadConfig for details on the configuration load process.

View file

@ -203,6 +203,8 @@ information.
for the underlying JSON-RPC command and return values
* [wire](https://github.com/btcsuite/btcd/tree/master/wire) - Implements the
Bitcoin wire protocol
* [peer](https://github.com/btcsuite/btcd/tree/master/peer) -
Provides a common base for creating and managing Bitcoin network peers.
* [blockchain](https://github.com/btcsuite/btcd/tree/master/blockchain) -
Implements Bitcoin block handling and chain selection rules
* [txscript](https://github.com/btcsuite/btcd/tree/master/txscript) -

3
log.go
View file

@ -11,9 +11,9 @@ import (
"time"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/database"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog"
@ -118,6 +118,7 @@ func useLogger(subsystemID string, logger btclog.Logger) {
case "PEER":
peerLog = logger
peer.UseLogger(logger)
case "RPCS":
rpcsLog = logger

2083
peer.go

File diff suppressed because it is too large Load diff

38
peer/README.md Normal file
View file

@ -0,0 +1,38 @@
peer
====
[![Build Status](https://travis-ci.org/btcsuite/btcd.png?branch=master)]
(https://travis-ci.org/btcsuite/btcd)
Package peer provides a common base for creating and managing bitcoin network
peers.
## Overview
- Create peers for full nodes, Simplified Payment Verification (SPV) nodes,
proxies etc
- Built-in handlers for common messages like initial message version
negotiation, handling and responding to pings
- Register and manage multiple custom handlers for all messages
## Documentation
[![GoDoc](https://godoc.org/github.com/btcsuite/btcd/peer?status.png)]
(http://godoc.org/github.com/btcsuite/btcd/peer)
Full `go doc` style documentation for the project can be viewed online without
installing this package by using the GoDoc site here:
http://godoc.org/github.com/btcsuite/btcd/peer
You can also view the documentation locally once the package is installed with
the `godoc` tool by running `godoc -http=":6060"` and pointing your browser to
http://localhost:6060/pkg/github.com/btcsuite/btcd/peer
## Installation
```bash
$ go get github.com/btcsuite/btcd/peer
```
Package peer is licensed under the [copyfree](http://copyfree.org) ISC
License.

23
peer/doc.go Normal file
View file

@ -0,0 +1,23 @@
// Copyright (c) 2015 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
/*
Package peer provides a common base for creating and managing bitcoin network
peers for fully validating nodes, Simplified Payment Verification (SPV) nodes,
proxies, etc. It includes basic protocol exchanges like version negotiation,
responding to pings etc.
Inbound peers accept a connection and respond to the version message to begin
version negotiation.
Outbound peers connect and push the initial version message over a given
connection.
Both peers accept a configuration to customize options such as user agent,
service flag, protocol version, chain parameters, and proxy.
To extend the basic peer functionality provided by package peer, listeners can
be configured for all message types using callbacks in the peer configuration.
*/
package peer

113
peer/example_test.go Normal file
View file

@ -0,0 +1,113 @@
// Copyright (c) 2015 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package peer_test
import (
"fmt"
"net"
"time"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
)
// mockRemotePeer creates a basic inbound peer listening on the simnet port for
// use with Example_peerConnection. It does not return until the listner is
// active.
func mockRemotePeer() error {
// Configure peer to act as a simnet node that offers no services.
peerCfg := &peer.Config{
UserAgentName: "peer", // User agent name to advertise.
UserAgentVersion: "1.0.0", // User agent version to advertise.
ChainParams: &chaincfg.SimNetParams,
}
// Accept connections on the simnet port.
listener, err := net.Listen("tcp", "127.0.0.1:18555")
if err != nil {
return err
}
go func() {
conn, err := listener.Accept()
if err != nil {
fmt.Printf("Accept: error %v\n", err)
return
}
// Create and start the inbound peer.
p := peer.NewInboundPeer(peerCfg, conn)
if err := p.Start(); err != nil {
fmt.Printf("Start: error %v\n", err)
return
}
}()
return nil
}
// This example demonstrates the basic process for initializing and creating an
// outbound peer. Peers negotiate by exchanging version and verack messages.
// For demonstration, a simple handler for version message is attached to the
// peer.
func Example_newOutboundPeer() {
// Ordinarily this will not be needed since the outbound peer will be
// connecting to a remote peer, however, since this example is executed
// and tested, a mock remote peer is needed to listen for the outbound
// peer.
if err := mockRemotePeer(); err != nil {
fmt.Printf("mockRemotePeer: unexpected error %v\n", err)
return
}
// Create an outbound peer that is configured to act as a simnet node
// that offers no services and has listeners for the version and verack
// messages. The verack listener is used here to signal the code below
// when the handshake has been finished by signalling a channel.
verack := make(chan struct{})
peerCfg := &peer.Config{
UserAgentName: "peer", // User agent name to advertise.
UserAgentVersion: "1.0.0", // User agent version to advertise.
ChainParams: &chaincfg.SimNetParams,
Services: 0,
Listeners: peer.MessageListeners{
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) {
fmt.Println("outbound: received version")
},
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
},
},
}
p, err := peer.NewOutboundPeer(peerCfg, "127.0.0.1:18555")
if err != nil {
fmt.Printf("NewOutboundPeer: error %v\n", err)
return
}
// Establish the connection to the peer address and mark it connected.
conn, err := net.Dial("tcp", p.Addr())
if err != nil {
fmt.Printf("net.Dial: error %v\n", err)
return
}
if err := p.Connect(conn); err != nil {
fmt.Printf("Connect: error %v\n", err)
return
}
// Wait for the verack message or timeout in case of failure.
select {
case <-verack:
case <-time.After(time.Second * 1):
fmt.Printf("Example_peerConnection: verack timeout")
}
// Shutdown the peer.
p.Shutdown()
// Output:
// outbound: received version
}

18
peer/export_test.go Normal file
View file

@ -0,0 +1,18 @@
// Copyright (c) 2015 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
/*
This test file is part of the peer package rather than than the peer_test
package so it can bridge access to the internals to properly test cases which
are either not possible or can't reliably be tested via the public interface.
The functions are only exported while the tests are being run.
*/
package peer
// TstAllowSelfConns allows the test package to allow self connections by
// disabling the detection logic.
func TstAllowSelfConns() {
allowSelfConns = true
}

241
peer/log.go Normal file
View file

@ -0,0 +1,241 @@
// Copyright (c) 2015 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package peer
import (
"errors"
"fmt"
"io"
"strings"
"time"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog"
)
const (
// maxRejectReasonLen is the maximum length of a sanitized reject reason
// that will be logged.
maxRejectReasonLen = 250
)
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
DisableLog()
}
// DisableLog disables all library log output. Logging output is disabled
// by default until either UseLogger or SetLogWriter are called.
func DisableLog() {
log = btclog.Disabled
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
// SetLogWriter uses a specified io.Writer to output package logging info.
// This allows a caller to direct package logging output without needing a
// dependency on seelog. If the caller is also using btclog, UseLogger should
// be used instead.
func SetLogWriter(w io.Writer, level string) error {
if w == nil {
return errors.New("nil writer")
}
lvl, ok := btclog.LogLevelFromString(level)
if !ok {
return errors.New("invalid log level")
}
l, err := btclog.NewLoggerFromWriter(w, lvl)
if err != nil {
return err
}
UseLogger(l)
return nil
}
// LogClosure is a closure that can be printed with %v to be used to
// generate expensive-to-create data for a detailed log level and avoid doing
// the work if the data isn't printed.
type logClosure func() string
func (c logClosure) String() string {
return c()
}
func newLogClosure(c func() string) logClosure {
return logClosure(c)
}
// directionString is a helper function that returns a string that represents
// the direction of a connection (inbound or outbound).
func directionString(inbound bool) string {
if inbound {
return "inbound"
}
return "outbound"
}
// formatLockTime returns a transaction lock time as a human-readable string.
func formatLockTime(lockTime uint32) string {
// The lock time field of a transaction is either a block height at
// which the transaction is finalized or a timestamp depending on if the
// value is before the lockTimeThreshold. When it is under the
// threshold it is a block height.
if lockTime < txscript.LockTimeThreshold {
return fmt.Sprintf("height %d", lockTime)
}
return time.Unix(int64(lockTime), 0).String()
}
// invSummary returns an inventory message as a human-readable string.
func invSummary(invList []*wire.InvVect) string {
// No inventory.
invLen := len(invList)
if invLen == 0 {
return "empty"
}
// One inventory item.
if invLen == 1 {
iv := invList[0]
switch iv.Type {
case wire.InvTypeError:
return fmt.Sprintf("error %s", iv.Hash)
case wire.InvTypeBlock:
return fmt.Sprintf("block %s", iv.Hash)
case wire.InvTypeTx:
return fmt.Sprintf("tx %s", iv.Hash)
}
return fmt.Sprintf("unknown (%d) %s", uint32(iv.Type), iv.Hash)
}
// More than one inv item.
return fmt.Sprintf("size %d", invLen)
}
// locatorSummary returns a block locator as a human-readable string.
func locatorSummary(locator []*wire.ShaHash, stopHash *wire.ShaHash) string {
if len(locator) > 0 {
return fmt.Sprintf("locator %s, stop %s", locator[0], stopHash)
}
return fmt.Sprintf("no locator, stop %s", stopHash)
}
// sanitizeString strips any characters which are even remotely dangerous, such
// as html control characters, from the passed string. It also limits it to
// the passed maximum size, which can be 0 for unlimited. When the string is
// limited, it will also add "..." to the string to indicate it was truncated.
func sanitizeString(str string, maxLength uint) string {
const safeChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXY" +
"Z01234567890 .,;_/:?@"
// Strip any characters not in the safeChars string removed.
str = strings.Map(func(r rune) rune {
if strings.IndexRune(safeChars, r) >= 0 {
return r
}
return -1
}, str)
// Limit the string to the max allowed length.
if maxLength > 0 && uint(len(str)) > maxLength {
str = str[:maxLength]
str = str + "..."
}
return str
}
// messageSummary returns a human-readable string which summarizes a message.
// Not all messages have or need a summary. This is used for debug logging.
func messageSummary(msg wire.Message) string {
switch msg := msg.(type) {
case *wire.MsgVersion:
return fmt.Sprintf("agent %s, pver %d, block %d",
msg.UserAgent, msg.ProtocolVersion, msg.LastBlock)
case *wire.MsgVerAck:
// No summary.
case *wire.MsgGetAddr:
// No summary.
case *wire.MsgAddr:
return fmt.Sprintf("%d addr", len(msg.AddrList))
case *wire.MsgPing:
// No summary - perhaps add nonce.
case *wire.MsgPong:
// No summary - perhaps add nonce.
case *wire.MsgAlert:
// No summary.
case *wire.MsgMemPool:
// No summary.
case *wire.MsgTx:
return fmt.Sprintf("hash %s, %d inputs, %d outputs, lock %s",
msg.TxSha(), len(msg.TxIn), len(msg.TxOut),
formatLockTime(msg.LockTime))
case *wire.MsgBlock:
header := &msg.Header
return fmt.Sprintf("hash %s, ver %d, %d tx, %s", msg.BlockSha(),
header.Version, len(msg.Transactions), header.Timestamp)
case *wire.MsgInv:
return invSummary(msg.InvList)
case *wire.MsgNotFound:
return invSummary(msg.InvList)
case *wire.MsgGetData:
return invSummary(msg.InvList)
case *wire.MsgGetBlocks:
return locatorSummary(msg.BlockLocatorHashes, &msg.HashStop)
case *wire.MsgGetHeaders:
return locatorSummary(msg.BlockLocatorHashes, &msg.HashStop)
case *wire.MsgHeaders:
return fmt.Sprintf("num %d", len(msg.Headers))
case *wire.MsgReject:
// Ensure the variable length strings don't contain any
// characters which are even remotely dangerous such as HTML
// control characters, etc. Also limit them to sane length for
// logging.
rejCommand := sanitizeString(msg.Cmd, wire.CommandSize)
rejReason := sanitizeString(msg.Reason, maxRejectReasonLen)
summary := fmt.Sprintf("cmd %v, code %v, reason %v", rejCommand,
msg.Code, rejReason)
if rejCommand == wire.CmdBlock || rejCommand == wire.CmdTx {
summary += fmt.Sprintf(", hash %v", msg.Hash)
}
return summary
}
// No summary for other messages.
return ""
}

65
peer/log_test.go Normal file
View file

@ -0,0 +1,65 @@
// Copyright (c) 2015 The btcsuite developers Use of this source code is
// governed by an ISC license that can be found in the LICENSE file.
package peer_test
import (
"bytes"
"errors"
"io"
"testing"
"github.com/btcsuite/btcd/peer"
)
func TestSetLogWriter(t *testing.T) {
tests := []struct {
name string
w io.Writer
level string
expected error
}{
{
name: "nil writer",
w: nil,
level: "trace",
expected: errors.New("nil writer"),
},
{
name: "invalid log level",
w: bytes.NewBuffer(nil),
level: "wrong",
expected: errors.New("invalid log level"),
},
{
name: "use off level",
w: bytes.NewBuffer(nil),
level: "off",
expected: errors.New("min level can't be greater than max. Got min: 6, max: 5"),
},
{
name: "pass",
w: bytes.NewBuffer(nil),
level: "debug",
expected: nil,
},
}
t.Logf("Running %d tests", len(tests))
for i, test := range tests {
err := peer.SetLogWriter(test.w, test.level)
if err != nil {
if err.Error() != test.expected.Error() {
t.Errorf("SetLogWriter #%d (%s) wrong result\n"+
"got: %v\nwant: %v", i, test.name, err,
test.expected)
}
} else {
if test.expected != nil {
t.Errorf("SetLogWriter #%d (%s) wrong result\n"+
"got: %v\nwant: %v", i, test.name, err,
test.expected)
}
}
}
}

View file

@ -2,26 +2,34 @@
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
package peer
import (
"bytes"
"container/list"
"fmt"
"sync"
"github.com/btcsuite/btcd/wire"
)
// MruInventoryMap provides a map that is limited to a maximum number of items
// with eviction for the oldest entry when the limit is exceeded.
// MruInventoryMap provides a concurrency safe map that is limited to a maximum
// number of items with eviction for the oldest entry when the limit is
// exceeded.
type MruInventoryMap struct {
invMtx sync.Mutex
invMap map[wire.InvVect]*list.Element // nearly O(1) lookups
invList *list.List // O(1) insert, update, delete
limit uint
}
// String returns the map as a human-readable string.
func (m MruInventoryMap) String() string {
//
// This function is safe for concurrent access.
func (m *MruInventoryMap) String() string {
m.invMtx.Lock()
defer m.invMtx.Unlock()
lastEntryNum := len(m.invMap) - 1
curEntry := 0
buf := bytes.NewBufferString("[")
@ -38,7 +46,12 @@ func (m MruInventoryMap) String() string {
}
// Exists returns whether or not the passed inventory item is in the map.
//
// This function is safe for concurrent access.
func (m *MruInventoryMap) Exists(iv *wire.InvVect) bool {
m.invMtx.Lock()
defer m.invMtx.Unlock()
if _, exists := m.invMap[*iv]; exists {
return true
}
@ -48,7 +61,12 @@ func (m *MruInventoryMap) Exists(iv *wire.InvVect) bool {
// Add adds the passed inventory to the map and handles eviction of the oldest
// item if adding the new item would exceed the max limit. Adding an existing
// item makes it the most recently used item.
//
// This function is safe for concurrent access.
func (m *MruInventoryMap) Add(iv *wire.InvVect) {
m.invMtx.Lock()
defer m.invMtx.Unlock()
// When the limit is zero, nothing can be added to the map, so just
// return.
if m.limit == 0 {
@ -87,7 +105,12 @@ func (m *MruInventoryMap) Add(iv *wire.InvVect) {
}
// Delete deletes the passed inventory item from the map (if it exists).
//
// This function is safe for concurrent access.
func (m *MruInventoryMap) Delete(iv *wire.InvVect) {
m.invMtx.Lock()
defer m.invMtx.Unlock()
if node, exists := m.invMap[*iv]; exists {
m.invList.Remove(node)
delete(m.invMap, *iv)

View file

@ -2,7 +2,7 @@
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
package peer
import (
"crypto/rand"
@ -51,7 +51,7 @@ testLoop:
// Ensure the limited number of most recent entries in the
// inventory vector list exist.
for j := numInvVects - 1; j >= numInvVects-test.limit; j-- {
for j := numInvVects - test.limit; j < numInvVects; j++ {
if !mruInvMap.Exists(invVects[j]) {
t.Errorf("Exists #%d (%s) entry %s does not "+
"exist", i, test.name, *invVects[j])
@ -61,7 +61,7 @@ testLoop:
// Ensure the entries before the limited number of most recent
// entries in the inventory vector list do not exist.
for j := numInvVects - test.limit - 1; j >= 0; j-- {
for j := 0; j < numInvVects-test.limit; j++ {
if mruInvMap.Exists(invVects[j]) {
t.Errorf("Exists #%d (%s) entry %s exists", i,
test.name, *invVects[j])

129
peer/mrunoncemap.go Normal file
View file

@ -0,0 +1,129 @@
// Copyright (c) 2015 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package peer
import (
"bytes"
"container/list"
"fmt"
"sync"
)
// mruNonceMap provides a concurrency safe map that is limited to a maximum
// number of items with eviction for the oldest entry when the limit is
// exceeded.
type mruNonceMap struct {
mtx sync.Mutex
nonceMap map[uint64]*list.Element // nearly O(1) lookups
nonceList *list.List // O(1) insert, update, delete
limit uint
}
// String returns the map as a human-readable string.
//
// This function is safe for concurrent access.
func (m *mruNonceMap) String() string {
m.mtx.Lock()
defer m.mtx.Unlock()
lastEntryNum := len(m.nonceMap) - 1
curEntry := 0
buf := bytes.NewBufferString("[")
for nonce := range m.nonceMap {
buf.WriteString(fmt.Sprintf("%d", nonce))
if curEntry < lastEntryNum {
buf.WriteString(", ")
}
curEntry++
}
buf.WriteString("]")
return fmt.Sprintf("<%d>%s", m.limit, buf.String())
}
// Exists returns whether or not the passed nonce is in the map.
//
// This function is safe for concurrent access.
func (m *mruNonceMap) Exists(nonce uint64) bool {
m.mtx.Lock()
defer m.mtx.Unlock()
if _, exists := m.nonceMap[nonce]; exists {
return true
}
return false
}
// Add adds the passed nonce to the map and handles eviction of the oldest item
// if adding the new item would exceed the max limit. Adding an existing item
// makes it the most recently used item.
//
// This function is safe for concurrent access.
func (m *mruNonceMap) Add(nonce uint64) {
m.mtx.Lock()
defer m.mtx.Unlock()
// When the limit is zero, nothing can be added to the map, so just
// return.
if m.limit == 0 {
return
}
// When the entry already exists move it to the front of the list
// thereby marking it most recently used.
if node, exists := m.nonceMap[nonce]; exists {
m.nonceList.MoveToFront(node)
return
}
// Evict the least recently used entry (back of the list) if the the new
// entry would exceed the size limit for the map. Also reuse the list
// node so a new one doesn't have to be allocated.
if uint(len(m.nonceMap))+1 > m.limit {
node := m.nonceList.Back()
lru := node.Value.(uint64)
// Evict least recently used item.
delete(m.nonceMap, lru)
// Reuse the list node of the item that was just evicted for the
// new item.
node.Value = nonce
m.nonceList.MoveToFront(node)
m.nonceMap[nonce] = node
return
}
// The limit hasn't been reached yet, so just add the new item.
node := m.nonceList.PushFront(nonce)
m.nonceMap[nonce] = node
return
}
// Delete deletes the passed nonce from the map (if it exists).
//
// This function is safe for concurrent access.
func (m *mruNonceMap) Delete(nonce uint64) {
m.mtx.Lock()
defer m.mtx.Unlock()
if node, exists := m.nonceMap[nonce]; exists {
m.nonceList.Remove(node)
delete(m.nonceMap, nonce)
}
}
// newMruNonceMap returns a new nonce map that is limited to the number of
// entries specified by limit. When the number of entries exceeds the limit,
// the oldest (least recently used) entry will be removed to make room for the
// new entry.
func newMruNonceMap(limit uint) *mruNonceMap {
m := mruNonceMap{
nonceMap: make(map[uint64]*list.Element),
nonceList: list.New(),
limit: limit,
}
return &m
}

152
peer/mrunoncemap_test.go Normal file
View file

@ -0,0 +1,152 @@
// Copyright (c) 2015 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package peer
import (
"fmt"
"testing"
)
// TestMruNonceMap ensures the mruNonceMap behaves as expected including
// limiting, eviction of least-recently used entries, specific entry removal,
// and existence tests.
func TestMruNonceMap(t *testing.T) {
// Create a bunch of fake nonces to use in testing the mru nonce code.
numNonces := 10
nonces := make([]uint64, 0, numNonces)
for i := 0; i < numNonces; i++ {
nonces = append(nonces, uint64(i))
}
tests := []struct {
name string
limit int
}{
{name: "limit 0", limit: 0},
{name: "limit 1", limit: 1},
{name: "limit 5", limit: 5},
{name: "limit 7", limit: 7},
{name: "limit one less than available", limit: numNonces - 1},
{name: "limit all available", limit: numNonces},
}
testLoop:
for i, test := range tests {
// Create a new mru nonce map limited by the specified test
// limit and add all of the test nonces. This will cause
// evicition since there are more test nonces than the limits.
mruNonceMap := newMruNonceMap(uint(test.limit))
for j := 0; j < numNonces; j++ {
mruNonceMap.Add(nonces[j])
}
// Ensure the limited number of most recent entries in the list
// exist.
for j := numNonces - test.limit; j < numNonces; j++ {
if !mruNonceMap.Exists(nonces[j]) {
t.Errorf("Exists #%d (%s) entry %d does not "+
"exist", i, test.name, nonces[j])
continue testLoop
}
}
// Ensure the entries before the limited number of most recent
// entries in the list do not exist.
for j := 0; j < numNonces-test.limit; j++ {
if mruNonceMap.Exists(nonces[j]) {
t.Errorf("Exists #%d (%s) entry %d exists", i,
test.name, nonces[j])
continue testLoop
}
}
// Readd the entry that should currently be the least-recently
// used entry so it becomes the most-recently used entry, then
// force an eviction by adding an entry that doesn't exist and
// ensure the evicted entry is the new least-recently used
// entry.
//
// This check needs at least 2 entries.
if test.limit > 1 {
origLruIndex := numNonces - test.limit
mruNonceMap.Add(nonces[origLruIndex])
mruNonceMap.Add(uint64(numNonces) + 1)
// Ensure the original lru entry still exists since it
// was updated and should've have become the mru entry.
if !mruNonceMap.Exists(nonces[origLruIndex]) {
t.Errorf("MRU #%d (%s) entry %d does not exist",
i, test.name, nonces[origLruIndex])
continue testLoop
}
// Ensure the entry that should've become the new lru
// entry was evicted.
newLruIndex := origLruIndex + 1
if mruNonceMap.Exists(nonces[newLruIndex]) {
t.Errorf("MRU #%d (%s) entry %d exists", i,
test.name, nonces[newLruIndex])
continue testLoop
}
}
// Delete all of the entries in the list, including those that
// don't exist in the map, and ensure they no longer exist.
for j := 0; j < numNonces; j++ {
mruNonceMap.Delete(nonces[j])
if mruNonceMap.Exists(nonces[j]) {
t.Errorf("Delete #%d (%s) entry %d exists", i,
test.name, nonces[j])
continue testLoop
}
}
}
}
// TestMruNonceMapStringer tests the stringized output for the mruNonceMap type.
func TestMruNonceMapStringer(t *testing.T) {
// Create a couple of fake nonces to use in testing the mru nonce
// stringer code.
nonce1 := uint64(10)
nonce2 := uint64(20)
// Create new mru nonce map and add the nonces.
mruNonceMap := newMruNonceMap(uint(2))
mruNonceMap.Add(nonce1)
mruNonceMap.Add(nonce2)
// Ensure the stringer gives the expected result. Since map iteration
// is not ordered, either entry could be first, so account for both
// cases.
wantStr1 := fmt.Sprintf("<%d>[%d, %d]", 2, nonce1, nonce2)
wantStr2 := fmt.Sprintf("<%d>[%d, %d]", 2, nonce2, nonce1)
gotStr := mruNonceMap.String()
if gotStr != wantStr1 && gotStr != wantStr2 {
t.Fatalf("unexpected string representation - got %q, want %q "+
"or %q", gotStr, wantStr1, wantStr2)
}
}
// BenchmarkMruNonceList performs basic benchmarks on the most recently used
// nonce handling.
func BenchmarkMruNonceList(b *testing.B) {
// Create a bunch of fake nonces to use in benchmarking the mru nonce
// code.
b.StopTimer()
numNonces := 100000
nonces := make([]uint64, 0, numNonces)
for i := 0; i < numNonces; i++ {
nonces = append(nonces, uint64(i))
}
b.StartTimer()
// Benchmark the add plus evicition code.
limit := 20000
mruNonceMap := newMruNonceMap(uint(limit))
for i := 0; i < b.N; i++ {
mruNonceMap.Add(nonces[i%numNonces])
}
}

1870
peer/peer.go Normal file

File diff suppressed because it is too large Load diff

659
peer/peer_test.go Normal file
View file

@ -0,0 +1,659 @@
// Copyright (c) 2015 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package peer_test
import (
"errors"
"io"
"net"
"strconv"
"testing"
"time"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/go-socks/socks"
)
// conn mocks a network connection by implementing the net.Conn interface. It
// is used to test peer connection without actually opening a network
// connection.
type conn struct {
io.Reader
io.Writer
io.Closer
// local network, address for the connection.
lnet, laddr string
// remote network, address for the connection.
rnet, raddr string
// mocks socks proxy if true
proxy bool
}
// LocalAddr returns the local address for the connection.
func (c conn) LocalAddr() net.Addr {
return &addr{c.lnet, c.laddr}
}
// Remote returns the remote address for the connection.
func (c conn) RemoteAddr() net.Addr {
if !c.proxy {
return &addr{c.rnet, c.raddr}
}
host, strPort, _ := net.SplitHostPort(c.raddr)
port, _ := strconv.Atoi(strPort)
return &socks.ProxiedAddr{
Net: c.rnet,
Host: host,
Port: port,
}
}
// Close handles closing the connection.
func (c conn) Close() error {
return nil
}
func (c conn) SetDeadline(t time.Time) error { return nil }
func (c conn) SetReadDeadline(t time.Time) error { return nil }
func (c conn) SetWriteDeadline(t time.Time) error { return nil }
// addr mocks a network address
type addr struct {
net, address string
}
func (m addr) Network() string { return m.net }
func (m addr) String() string { return m.address }
// pipe turns two mock connections into a full-duplex connection similar to
// net.Pipe to allow pipe's with (fake) addresses.
func pipe(c1, c2 *conn) (*conn, *conn) {
r1, w1 := io.Pipe()
r2, w2 := io.Pipe()
c1.Writer = w1
c2.Reader = r1
c1.Reader = r2
c2.Writer = w2
return c1, c2
}
// peerStats holds the expected peer stats used for testing peer.
type peerStats struct {
wantUserAgent string
wantServices wire.ServiceFlag
wantProtocolVersion uint32
wantConnected bool
wantVersionKnown bool
wantVerAckReceived bool
wantLastBlock int32
wantStartingHeight int32
wantLastPingTime time.Time
wantLastPingNonce uint64
wantLastPingMicros int64
wantTimeOffset int64
wantBytesSent uint64
wantBytesReceived uint64
}
// testPeer tests the given peer's flags and stats
func testPeer(t *testing.T, p *peer.Peer, s peerStats) {
if p.UserAgent() != s.wantUserAgent {
t.Errorf("testPeer: wrong UserAgent - got %v, want %v", p.UserAgent(), s.wantUserAgent)
return
}
if p.Services() != s.wantServices {
t.Errorf("testPeer: wrong Services - got %v, want %v", p.Services(), s.wantServices)
return
}
if !p.LastPingTime().Equal(s.wantLastPingTime) {
t.Errorf("testPeer: wrong LastPingTime - got %v, want %v", p.LastPingTime(), s.wantLastPingTime)
return
}
if p.LastPingNonce() != s.wantLastPingNonce {
t.Errorf("testPeer: wrong LastPingNonce - got %v, want %v", p.LastPingNonce(), s.wantLastPingNonce)
return
}
if p.LastPingMicros() != s.wantLastPingMicros {
t.Errorf("testPeer: wrong LastPingMicros - got %v, want %v", p.LastPingMicros(), s.wantLastPingMicros)
return
}
if p.VerAckReceived() != s.wantVerAckReceived {
t.Errorf("testPeer: wrong VerAckReceived - got %v, want %v", p.VerAckReceived(), s.wantVerAckReceived)
return
}
if p.VersionKnown() != s.wantVersionKnown {
t.Errorf("testPeer: wrong VersionKnown - got %v, want %v", p.VersionKnown(), s.wantVersionKnown)
return
}
if p.ProtocolVersion() != s.wantProtocolVersion {
t.Errorf("testPeer: wrong ProtocolVersion - got %v, want %v", p.ProtocolVersion(), s.wantProtocolVersion)
return
}
if p.LastBlock() != s.wantLastBlock {
t.Errorf("testPeer: wrong LastBlock - got %v, want %v", p.LastBlock(), s.wantLastBlock)
return
}
if p.TimeOffset() != s.wantTimeOffset {
t.Errorf("testPeer: wrong TimeOffset - got %v, want %v", p.TimeOffset(), s.wantTimeOffset)
return
}
if p.BytesSent() != s.wantBytesSent {
t.Errorf("testPeer: wrong BytesSent - got %v, want %v", p.BytesSent(), s.wantBytesSent)
return
}
if p.BytesReceived() != s.wantBytesReceived {
t.Errorf("testPeer: wrong BytesReceived - got %v, want %v", p.BytesReceived(), s.wantBytesReceived)
return
}
if p.StartingHeight() != s.wantStartingHeight {
t.Errorf("testPeer: wrong StartingHeight - got %v, want %v", p.StartingHeight(), s.wantStartingHeight)
return
}
if p.Connected() != s.wantConnected {
t.Errorf("testPeer: wrong Connected - got %v, want %v", p.Connected(), s.wantConnected)
return
}
stats := p.StatsSnapshot()
if p.ID() != stats.ID {
t.Errorf("testPeer: wrong ID - got %v, want %v", p.ID(), stats.ID)
return
}
if p.Addr() != stats.Addr {
t.Errorf("testPeer: wrong Addr - got %v, want %v", p.Addr(), stats.Addr)
return
}
if p.LastSend() != stats.LastSend {
t.Errorf("testPeer: wrong LastSend - got %v, want %v", p.LastSend(), stats.LastSend)
return
}
if p.LastRecv() != stats.LastRecv {
t.Errorf("testPeer: wrong LastRecv - got %v, want %v", p.LastRecv(), stats.LastRecv)
return
}
}
// TestPeerConnection tests connection between inbound and outbound peers.
func TestPeerConnection(t *testing.T) {
verack := make(chan struct{}, 1)
peerCfg := &peer.Config{
Listeners: peer.MessageListeners{
OnWrite: func(p *peer.Peer, bytesWritten int, msg wire.Message, err error) {
switch msg.(type) {
case *wire.MsgVerAck:
verack <- struct{}{}
}
},
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
ChainParams: &chaincfg.MainNetParams,
Services: 0,
}
wantStats := peerStats{
wantUserAgent: wire.DefaultUserAgent + "peer:1.0/",
wantServices: 0,
wantProtocolVersion: peer.MaxProtocolVersion,
wantConnected: true,
wantVersionKnown: true,
wantVerAckReceived: true,
wantLastPingTime: time.Time{},
wantLastPingNonce: uint64(0),
wantLastPingMicros: int64(0),
wantTimeOffset: int64(0),
wantBytesSent: 158, // 134 version + 24 verack
wantBytesReceived: 158,
}
tests := []struct {
name string
setup func() (*peer.Peer, *peer.Peer, error)
}{
{
"basic handshake",
func() (*peer.Peer, *peer.Peer, error) {
inConn, outConn := pipe(
&conn{raddr: "10.0.0.1:8333"},
&conn{raddr: "10.0.0.2:8333"},
)
inPeer := peer.NewInboundPeer(peerCfg, inConn)
err := inPeer.Start()
if err != nil {
return nil, nil, err
}
outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.2:8333")
if err != nil {
return nil, nil, err
}
if err := outPeer.Connect(outConn); err != nil {
return nil, nil, err
}
for i := 0; i < 2; i++ {
select {
case <-verack:
case <-time.After(time.Second * 1):
return nil, nil, errors.New("verack timeout")
}
}
return inPeer, outPeer, nil
},
},
{
"socks proxy",
func() (*peer.Peer, *peer.Peer, error) {
inConn, outConn := pipe(
&conn{raddr: "10.0.0.1:8333", proxy: true},
&conn{raddr: "10.0.0.2:8333"},
)
inPeer := peer.NewInboundPeer(peerCfg, inConn)
err := inPeer.Start()
if err != nil {
return nil, nil, err
}
outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.2:8333")
if err != nil {
return nil, nil, err
}
if err := outPeer.Connect(outConn); err != nil {
return nil, nil, err
}
for i := 0; i < 2; i++ {
select {
case <-verack:
case <-time.After(time.Second * 1):
return nil, nil, errors.New("verack timeout")
}
}
return inPeer, outPeer, nil
},
},
}
t.Logf("Running %d tests", len(tests))
for i, test := range tests {
inPeer, outPeer, err := test.setup()
if err != nil {
t.Errorf("TestPeerConnection setup #%d: unexpected err %v\n", i, err)
return
}
testPeer(t, inPeer, wantStats)
testPeer(t, outPeer, wantStats)
inPeer.Shutdown()
outPeer.Shutdown()
}
}
// TestPeerListeners tests that the peer listeners are called as expected.
func TestPeerListeners(t *testing.T) {
verack := make(chan struct{}, 1)
ok := make(chan wire.Message, 20)
peerCfg := &peer.Config{
Listeners: peer.MessageListeners{
OnGetAddr: func(p *peer.Peer, msg *wire.MsgGetAddr) {
ok <- msg
},
OnAddr: func(p *peer.Peer, msg *wire.MsgAddr) {
ok <- msg
},
OnPing: func(p *peer.Peer, msg *wire.MsgPing) {
ok <- msg
},
OnPong: func(p *peer.Peer, msg *wire.MsgPong) {
ok <- msg
},
OnAlert: func(p *peer.Peer, msg *wire.MsgAlert) {
ok <- msg
},
OnMemPool: func(p *peer.Peer, msg *wire.MsgMemPool) {
ok <- msg
},
OnTx: func(p *peer.Peer, msg *wire.MsgTx) {
ok <- msg
},
OnBlock: func(p *peer.Peer, msg *wire.MsgBlock, buf []byte) {
ok <- msg
},
OnInv: func(p *peer.Peer, msg *wire.MsgInv) {
ok <- msg
},
OnHeaders: func(p *peer.Peer, msg *wire.MsgHeaders) {
ok <- msg
},
OnNotFound: func(p *peer.Peer, msg *wire.MsgNotFound) {
ok <- msg
},
OnGetData: func(p *peer.Peer, msg *wire.MsgGetData) {
ok <- msg
},
OnGetBlocks: func(p *peer.Peer, msg *wire.MsgGetBlocks) {
ok <- msg
},
OnGetHeaders: func(p *peer.Peer, msg *wire.MsgGetHeaders) {
ok <- msg
},
OnFilterAdd: func(p *peer.Peer, msg *wire.MsgFilterAdd) {
ok <- msg
},
OnFilterClear: func(p *peer.Peer, msg *wire.MsgFilterClear) {
ok <- msg
},
OnFilterLoad: func(p *peer.Peer, msg *wire.MsgFilterLoad) {
ok <- msg
},
OnMerkleBlock: func(p *peer.Peer, msg *wire.MsgMerkleBlock) {
ok <- msg
},
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) {
ok <- msg
},
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
},
OnReject: func(p *peer.Peer, msg *wire.MsgReject) {
ok <- msg
},
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
ChainParams: &chaincfg.MainNetParams,
Services: wire.SFNodeBloom,
}
inConn, outConn := pipe(
&conn{raddr: "10.0.0.1:8333"},
&conn{raddr: "10.0.0.2:8333"},
)
inPeer := peer.NewInboundPeer(peerCfg, inConn)
err := inPeer.Start()
if err != nil {
t.Errorf("TestPeerListeners: unexpected err %v\n", err)
return
}
peerCfg.Listeners = peer.MessageListeners{
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
},
}
outPeer, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
if err != nil {
t.Errorf("NewOutboundPeer: unexpected err %v\n", err)
return
}
if err := outPeer.Connect(outConn); err != nil {
t.Errorf("TestPeerListeners: unexpected err %v\n", err)
return
}
for i := 0; i < 2; i++ {
select {
case <-verack:
case <-time.After(time.Second * 1):
t.Errorf("TestPeerListeners: verack timeout\n")
return
}
}
tests := []struct {
listener string
msg wire.Message
}{
{
"OnGetAddr",
wire.NewMsgGetAddr(),
},
{
"OnAddr",
wire.NewMsgAddr(),
},
{
"OnPing",
wire.NewMsgPing(42),
},
{
"OnPong",
wire.NewMsgPong(42),
},
{
"OnAlert",
wire.NewMsgAlert([]byte("payload"), []byte("signature")),
},
{
"OnMemPool",
wire.NewMsgMemPool(),
},
{
"OnTx",
wire.NewMsgTx(),
},
{
"OnBlock",
wire.NewMsgBlock(wire.NewBlockHeader(&wire.ShaHash{}, &wire.ShaHash{}, 1, 1)),
},
{
"OnInv",
wire.NewMsgInv(),
},
{
"OnHeaders",
wire.NewMsgHeaders(),
},
{
"OnNotFound",
wire.NewMsgNotFound(),
},
{
"OnGetData",
wire.NewMsgGetData(),
},
{
"OnGetBlocks",
wire.NewMsgGetBlocks(&wire.ShaHash{}),
},
{
"OnGetHeaders",
wire.NewMsgGetHeaders(),
},
{
"OnFilterAdd",
wire.NewMsgFilterAdd([]byte{0x01}),
},
{
"OnFilterClear",
wire.NewMsgFilterClear(),
},
{
"OnFilterLoad",
wire.NewMsgFilterLoad([]byte{0x01}, 10, 0, wire.BloomUpdateNone),
},
{
"OnMerkleBlock",
wire.NewMsgMerkleBlock(wire.NewBlockHeader(&wire.ShaHash{}, &wire.ShaHash{}, 1, 1)),
},
// only one version message is allowed
// only one verack message is allowed
{
"OnMsgReject",
wire.NewMsgReject("block", wire.RejectDuplicate, "dupe block"),
},
}
t.Logf("Running %d tests", len(tests))
for _, test := range tests {
// Queue the test message
outPeer.QueueMessage(test.msg, nil)
select {
case <-ok:
case <-time.After(time.Second * 1):
t.Errorf("TestPeerListeners: %s timeout", test.listener)
return
}
}
inPeer.Shutdown()
outPeer.Shutdown()
}
// TestOutboundPeer tests that the outbound peer works as expected.
func TestOutboundPeer(t *testing.T) {
// Use a mock NewestBlock func to test errs
var errBlockNotFound = errors.New("newest block not found")
var mockNewestSha = func() (*wire.ShaHash, int32, error) {
return nil, 0, errBlockNotFound
}
peerCfg := &peer.Config{
NewestBlock: mockNewestSha,
UserAgentName: "peer",
UserAgentVersion: "1.0",
ChainParams: &chaincfg.MainNetParams,
Services: 0,
}
r, w := io.Pipe()
c := &conn{raddr: "10.0.0.1:8333", Writer: w, Reader: r}
p, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
if err != nil {
t.Errorf("NewOutboundPeer: unexpected err - %v\n", err)
return
}
// Test Connect err
wantErr := errBlockNotFound
if err := p.Connect(c); err != wantErr {
t.Errorf("Connect: expected err %v, got %v\n", wantErr, err)
return
}
// Test already connected
if err := p.Connect(c); err != nil {
t.Errorf("Connect: unexpected err %v\n", err)
return
}
// Test already started
if err := p.Start(); err != nil {
t.Errorf("Start: unexpected err %v\n", err)
return
}
// Test Queue Inv
fakeBlockHash := &wire.ShaHash{0x00, 0x01}
fakeInv := wire.NewInvVect(wire.InvTypeBlock, fakeBlockHash)
p.QueueInventory(fakeInv)
p.AddKnownInventory(fakeInv)
p.QueueInventory(fakeInv)
// Test Queue Message
fakeMsg := wire.NewMsgVerAck()
p.QueueMessage(fakeMsg, nil)
done := make(chan struct{}, 5)
p.QueueMessage(fakeMsg, done)
<-done
p.Shutdown()
// Test NewestBlock
var newestBlock = func() (*wire.ShaHash, int32, error) {
hashStr := "14a0810ac680a3eb3f82edc878cea25ec41d6b790744e5daeef"
hash, err := wire.NewShaHashFromStr(hashStr)
if err != nil {
return nil, 0, err
}
return hash, 234439, nil
}
peerCfg.NewestBlock = newestBlock
p1, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
if err != nil {
t.Errorf("NewOutboundPeer: unexpected err - %v\n", err)
return
}
if err := p1.Connect(c); err != nil {
t.Errorf("Connect: unexpected err %v\n", err)
return
}
// Test update latest block
latestBlockSha, err := wire.NewShaHashFromStr("1a63f9cdff1752e6375c8c76e543a71d239e1a2e5c6db1aa679")
if err != nil {
t.Errorf("NewShaHashFromStr: unexpected err %v\n", err)
return
}
p1.UpdateLastAnnouncedBlock(latestBlockSha)
p1.UpdateLastBlockHeight(234440)
if p1.LastAnnouncedBlock() != latestBlockSha {
t.Errorf("LastAnnouncedBlock: wrong block - got %v, want %v",
p1.LastAnnouncedBlock(), latestBlockSha)
return
}
// Test Queue Inv after connection
p1.QueueInventory(fakeInv)
p1.Shutdown()
// Test regression
peerCfg.ChainParams = &chaincfg.RegressionNetParams
peerCfg.Services = wire.SFNodeBloom
p2, err := peer.NewOutboundPeer(peerCfg, "10.0.0.1:8333")
if err != nil {
t.Errorf("NewOutboundPeer: unexpected err - %v\n", err)
return
}
if err := p2.Connect(c); err != nil {
t.Errorf("Connect: unexpected err %v\n", err)
return
}
// Test PushXXX
var addrs []*wire.NetAddress
for i := 0; i < 5; i++ {
na := wire.NetAddress{}
addrs = append(addrs, &na)
}
if _, err := p2.PushAddrMsg(addrs); err != nil {
t.Errorf("PushAddrMsg: unexpected err %v\n", err)
return
}
if err := p2.PushGetBlocksMsg(nil, &wire.ShaHash{}); err != nil {
t.Errorf("PushGetBlocksMsg: unexpected err %v\n", err)
return
}
if err := p2.PushGetHeadersMsg(nil, &wire.ShaHash{}); err != nil {
t.Errorf("PushGetHeadersMsg: unexpected err %v\n", err)
return
}
p2.PushRejectMsg("block", wire.RejectMalformed, "malformed", nil, true)
p2.PushRejectMsg("block", wire.RejectInvalid, "invalid", nil, false)
// Test Queue Messages
p2.QueueMessage(wire.NewMsgGetAddr(), done)
p2.QueueMessage(wire.NewMsgPing(1), done)
p2.QueueMessage(wire.NewMsgMemPool(), done)
p2.QueueMessage(wire.NewMsgGetData(), done)
p2.QueueMessage(wire.NewMsgGetHeaders(), done)
p2.Shutdown()
}
func init() {
// Allow self connection when running the tests.
peer.TstAllowSelfConns()
}

View file

@ -76,6 +76,9 @@ const (
// changed and there have been changes to the available transactions
// in the memory pool.
gbtRegenerateSeconds = 60
// maxProtocolVersion is the max protocol version the server supports.
maxProtocolVersion = 70002
)
var (
@ -415,7 +418,7 @@ func handleNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter
}
}
}
if err != nil && peerExists(s.server.PeerInfo(), addr, int32(nodeID)) {
if err != nil && peerExists(s.server.Peers(), addr, int32(nodeID)) {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCMisc,
Message: "can't disconnect a permanent peer, use remove",
@ -438,7 +441,7 @@ func handleNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter
}
}
}
if err != nil && peerExists(s.server.PeerInfo(), addr, int32(nodeID)) {
if err != nil && peerExists(s.server.Peers(), addr, int32(nodeID)) {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCMisc,
Message: "can't remove a temporary peer, use disconnect",
@ -483,9 +486,9 @@ func handleNode(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter
// peerExists determines if a certain peer is currently connected given
// information about all currently connected peers. Peer existence is
// determined using either a target address or node id.
func peerExists(peerInfos []*btcjson.GetPeerInfoResult, addr string, nodeID int32) bool {
for _, peerInfo := range peerInfos {
if peerInfo.ID == nodeID || peerInfo.Addr == addr {
func peerExists(peers []*serverPeer, addr string, nodeID int32) bool {
for _, p := range peers {
if p.ID() == nodeID || p.Addr() == addr {
return true
}
}
@ -945,7 +948,7 @@ func handleGetAddedNodeInfo(s *rpcServer, cmd interface{}, closeChan <-chan stru
node := *c.Node
found := false
for i, peer := range peers {
if peer.addr == node {
if peer.Addr() == node {
peers = peers[i : i+1]
found = true
}
@ -963,7 +966,7 @@ func handleGetAddedNodeInfo(s *rpcServer, cmd interface{}, closeChan <-chan stru
if !c.DNS {
results := make([]string, 0, len(peers))
for _, peer := range peers {
results = append(results, peer.addr)
results = append(results, peer.Addr())
}
return results, nil
}
@ -975,15 +978,15 @@ func handleGetAddedNodeInfo(s *rpcServer, cmd interface{}, closeChan <-chan stru
// Set the "address" of the peer which could be an ip address
// or a domain name.
var result btcjson.GetAddedNodeInfoResult
result.AddedNode = peer.addr
result.AddedNode = peer.Addr()
result.Connected = btcjson.Bool(peer.Connected())
// Split the address into host and port portions so we can do
// a DNS lookup against the host. When no port is specified in
// the address, just use the address as the host.
host, _, err := net.SplitHostPort(peer.addr)
host, _, err := net.SplitHostPort(peer.Addr())
if err != nil {
host = peer.addr
host = peer.Addr()
}
// Do a DNS lookup for the address. If the lookup fails, just
@ -1007,7 +1010,7 @@ func handleGetAddedNodeInfo(s *rpcServer, cmd interface{}, closeChan <-chan stru
addr.Address = ip
addr.Connected = "false"
if ip == host && peer.Connected() {
addr.Connected = directionString(peer.inbound)
addr.Connected = directionString(peer.Inbound())
}
addrs = append(addrs, addr)
}
@ -2282,7 +2285,38 @@ func handleGetNetworkHashPS(s *rpcServer, cmd interface{}, closeChan <-chan stru
// handleGetPeerInfo implements the getpeerinfo command.
func handleGetPeerInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
return s.server.PeerInfo(), nil
peers := s.server.Peers()
syncPeer := s.server.blockManager.SyncPeer()
infos := make([]*btcjson.GetPeerInfoResult, 0, len(peers))
for _, p := range peers {
statsSnap := p.StatsSnapshot()
info := &btcjson.GetPeerInfoResult{
ID: statsSnap.ID,
Addr: statsSnap.Addr,
Services: fmt.Sprintf("%08d", uint64(statsSnap.Services)),
LastSend: statsSnap.LastSend.Unix(),
LastRecv: statsSnap.LastRecv.Unix(),
BytesSent: statsSnap.BytesSent,
BytesRecv: statsSnap.BytesRecv,
ConnTime: statsSnap.ConnTime.Unix(),
PingTime: float64(statsSnap.LastPingMicros),
TimeOffset: statsSnap.TimeOffset,
Version: statsSnap.Version,
SubVer: statsSnap.UserAgent,
Inbound: statsSnap.Inbound,
StartingHeight: statsSnap.StartingHeight,
CurrentHeight: statsSnap.LastBlock,
BanScore: 0,
SyncNode: p == syncPeer,
}
if p.LastPingNonce() != 0 {
wait := float64(time.Now().Sub(statsSnap.LastPingTime).Nanoseconds())
// We actually want microseconds.
info.PingWait = wait / 1000
}
infos = append(infos, info)
}
return infos, nil
}
// handleGetRawMempool implements the getrawmempool command.

1312
server.go

File diff suppressed because it is too large Load diff