971 lines
29 KiB
Go
971 lines
29 KiB
Go
// Copyright (c) 2013 Conformal Systems LLC.
|
|
// Use of this source code is governed by an ISC
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"container/list"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/conformal/btcchain"
|
|
"github.com/conformal/btcdb"
|
|
"github.com/conformal/btcutil"
|
|
"github.com/conformal/btcwire"
|
|
"github.com/conformal/go-socks"
|
|
"github.com/davecgh/go-spew/spew"
|
|
"net"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
const outputBufferSize = 50
|
|
|
|
// userAgent is the user agent string used to identify ourselves to other
|
|
// bitcoin peers.
|
|
var userAgent = fmt.Sprintf("/btcd:%d.%d.%d/", appMajor, appMinor, appPatch)
|
|
|
|
// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
|
|
var zeroHash btcwire.ShaHash
|
|
|
|
// minUint32 is a helper function to return the minimum of two uint32s.
|
|
// This avoids a math import and the need to cast to floats.
|
|
func minUint32(a, b uint32) uint32 {
|
|
if a < b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
// newNetAddress attempts to extract the IP address and port from the passed
|
|
// net.Addr interface and create a bitcoin NetAddress structure using that
|
|
// information.
|
|
func newNetAddress(addr net.Addr, services btcwire.ServiceFlag) (*btcwire.NetAddress, error) {
|
|
// addr will be a net.TCPAddr when not using a proxy.
|
|
if tcpAddr, ok := addr.(*net.TCPAddr); ok {
|
|
ip := tcpAddr.IP
|
|
port := uint16(tcpAddr.Port)
|
|
na := btcwire.NewNetAddressIPPort(ip, port, services)
|
|
return na, nil
|
|
}
|
|
|
|
// addr will be a socks.ProxiedAddr when using a proxy.
|
|
if proxiedAddr, ok := addr.(*socks.ProxiedAddr); ok {
|
|
ip := net.ParseIP(proxiedAddr.Host)
|
|
if ip == nil {
|
|
ip = net.ParseIP("0.0.0.0")
|
|
}
|
|
port := uint16(proxiedAddr.Port)
|
|
na := btcwire.NewNetAddressIPPort(ip, port, services)
|
|
return na, nil
|
|
}
|
|
|
|
// For the most part, addr should be one of the two above cases, but
|
|
// to be safe, fall back to trying to parse the information from the
|
|
// address string as a last resort.
|
|
host, portStr, err := net.SplitHostPort(addr.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ip := net.ParseIP(host)
|
|
port, err := strconv.ParseUint(portStr, 10, 16)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
na := btcwire.NewNetAddressIPPort(ip, uint16(port), services)
|
|
return na, nil
|
|
}
|
|
|
|
// peer provides a bitcoin peer for handling bitcoin communications.
|
|
type peer struct {
|
|
server *server
|
|
protocolVersion uint32
|
|
btcnet btcwire.BitcoinNet
|
|
services btcwire.ServiceFlag
|
|
started bool
|
|
conn net.Conn
|
|
timeConnected time.Time
|
|
inbound bool
|
|
disconnect bool
|
|
persistent bool
|
|
versionKnown bool
|
|
knownAddresses map[string]bool
|
|
lastBlock int32
|
|
requestQueue *list.List
|
|
continueHash *btcwire.ShaHash
|
|
wg sync.WaitGroup
|
|
outputQueue chan btcwire.Message
|
|
blockProcessed chan bool
|
|
quit chan bool
|
|
}
|
|
|
|
// pushVersionMsg sends a version message to the connected peer using the
|
|
// current state.
|
|
func (p *peer) pushVersionMsg() error {
|
|
_, blockNum, err := p.server.db.NewestSha()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create a NetAddress for the local IP. Don't assume any services
|
|
// until we know otherwise.
|
|
naMe, err := newNetAddress(p.conn.LocalAddr(), 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create a NetAddress for the remote IP. Don't assume any services
|
|
// until we know otherwise.
|
|
naYou, err := newNetAddress(p.conn.RemoteAddr(), 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Version message.
|
|
msg := btcwire.NewMsgVersion(naMe, naYou, p.server.nonce, userAgent,
|
|
int32(blockNum))
|
|
|
|
// XXX: bitcoind appears to always enable the full node services flag
|
|
// of the remote peer netaddress field in the version message regardless
|
|
// of whether it knows it supports it or not. Also, bitcoind sets
|
|
// the services field of the local peer to 0 regardless of support.
|
|
//
|
|
// Realistically, this should be set as follows:
|
|
// - For outgoing connections:
|
|
// - Set the local netaddress services to what the local peer
|
|
// actually supports
|
|
// - Set the remote netaddress services to 0 to indicate no services
|
|
// as they are still unknown
|
|
// - For incoming connections:
|
|
// - Set the local netaddress services to what the local peer
|
|
// actually supports
|
|
// - Set the remote netaddress services to the what was advertised by
|
|
// by the remote peer in its version message
|
|
msg.AddrYou.Services = btcwire.SFNodeNetwork
|
|
|
|
// Advertise that we're a full node.
|
|
msg.Services = btcwire.SFNodeNetwork
|
|
|
|
p.outputQueue <- msg
|
|
return nil
|
|
}
|
|
|
|
// handleVersionMsg is invoked when a peer receives a version bitcoin message
|
|
// and is used to negotiate the protocol version details as well as kick start
|
|
// the communications.
|
|
func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
|
|
// Detect self connections.
|
|
if msg.Nonce == p.server.nonce {
|
|
log.Debugf("[PEER] Disconnecting peer connected to self %s",
|
|
p.conn.RemoteAddr())
|
|
p.Disconnect()
|
|
return
|
|
}
|
|
|
|
// Limit to one version message per peer.
|
|
if p.versionKnown {
|
|
log.Errorf("[PEER] Only one version message per peer is allowed %s.",
|
|
p.conn.RemoteAddr())
|
|
p.Disconnect()
|
|
return
|
|
}
|
|
|
|
// Negotiate the protocol version.
|
|
p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion))
|
|
p.versionKnown = true
|
|
log.Debugf("[PEER] Negotiated protocol version %d for peer %s",
|
|
p.protocolVersion, p.conn.RemoteAddr())
|
|
p.lastBlock = msg.LastBlock
|
|
|
|
// Set the supported services for the peer to what the remote peer
|
|
// advertised.
|
|
p.services = msg.Services
|
|
|
|
// Inbound connections.
|
|
if p.inbound {
|
|
// Send version.
|
|
err := p.pushVersionMsg()
|
|
if err != nil {
|
|
log.Errorf("[PEER] %v", err)
|
|
p.Disconnect()
|
|
return
|
|
}
|
|
|
|
// Add inbound peer address to the server address manager.
|
|
na, err := btcwire.NewNetAddress(p.conn.RemoteAddr(), p.services)
|
|
if err != nil {
|
|
log.Errorf("[PEER] %v", err)
|
|
p.Disconnect()
|
|
return
|
|
}
|
|
p.server.addrManager.AddAddress(na)
|
|
}
|
|
|
|
// Send verack.
|
|
p.outputQueue <- btcwire.NewMsgVerAck()
|
|
|
|
// Outbound connections.
|
|
if !p.inbound {
|
|
// TODO(davec): Only do this if not doing the initial block
|
|
// download and the local address is routable.
|
|
if !cfg.DisableListen {
|
|
// Advertise the local address.
|
|
na, err := newNetAddress(p.conn.LocalAddr(), p.services)
|
|
if err != nil {
|
|
log.Errorf("[PEER] %v", err)
|
|
p.Disconnect()
|
|
return
|
|
}
|
|
addresses := map[string]*btcwire.NetAddress{
|
|
NetAddressKey(na): na,
|
|
}
|
|
p.pushAddrMsg(addresses)
|
|
}
|
|
|
|
// Request known addresses if the server address manager needs
|
|
// more and the peer has a protocol version new enough to
|
|
// include a timestamp with addresses.
|
|
hasTimestamp := p.protocolVersion >= btcwire.NetAddressTimeVersion
|
|
if p.server.addrManager.NeedMoreAddresses() && hasTimestamp {
|
|
p.outputQueue <- btcwire.NewMsgGetAddr()
|
|
}
|
|
}
|
|
|
|
// Signal the block manager this peer is a new sync candidate.
|
|
p.server.blockManager.newCandidates <- p
|
|
|
|
// TODO: Relay alerts.
|
|
}
|
|
|
|
// pushTxMsg sends a tx message for the provided transaction hash to the
|
|
// connected peer. An error is returned if the transaction sha is not known.
|
|
func (p *peer) pushTxMsg(sha btcwire.ShaHash) error {
|
|
// We dont deal with these for now.
|
|
return errors.New("Tx fetching not implemented")
|
|
}
|
|
|
|
// pushBlockMsg sends a block message for the provided block hash to the
|
|
// connected peer. An error is returned if the block hash is not known.
|
|
func (p *peer) pushBlockMsg(sha btcwire.ShaHash) error {
|
|
// What should this function do about the rate limiting the
|
|
// number of blocks queued for this peer?
|
|
// Current thought is have a counting mutex in the peer
|
|
// such that if > N Tx/Block requests are currently in
|
|
// the tx queue, wait until the mutex clears allowing more to be
|
|
// sent. This prevents 500 1+MB blocks from being loaded into
|
|
// memory and sit around until the output queue drains.
|
|
// Actually the outputQueue has a limit of 50 in its queue
|
|
// but still 50MB to 1.6GB(50 32MB blocks) just setting
|
|
// in memory waiting to be sent is pointless.
|
|
// I would recommend a getdata request limit of about 5
|
|
// outstanding objects.
|
|
// Should the tx complete api be a mutex or channel?
|
|
|
|
blk, err := p.server.db.FetchBlockBySha(&sha)
|
|
if err != nil {
|
|
log.Tracef("[PEER] Unable to fetch requested block sha %v: %v",
|
|
&sha, err)
|
|
return err
|
|
}
|
|
p.QueueMessage(blk.MsgBlock())
|
|
|
|
// When the peer requests the final block that was advertised in
|
|
// response to a getblocks message which requested more blocks than
|
|
// would fit into a single message, send it a new inventory message
|
|
// to trigger it to issue another getblocks message for the next
|
|
// batch of inventory.
|
|
if p.continueHash != nil && p.continueHash.IsEqual(&sha) {
|
|
hash, _, err := p.server.db.NewestSha()
|
|
if err == nil {
|
|
invMsg := btcwire.NewMsgInv()
|
|
iv := btcwire.NewInvVect(btcwire.InvVect_Block, hash)
|
|
invMsg.AddInvVect(iv)
|
|
p.QueueMessage(invMsg)
|
|
p.continueHash = nil
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// pushGetBlocksMsg send a getblocks message for the provided block locator
|
|
// and stop hash.
|
|
func (p *peer) pushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error {
|
|
msg := btcwire.NewMsgGetBlocks(stopHash)
|
|
for _, hash := range locator {
|
|
err := msg.AddBlockLocatorHash(hash)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
p.QueueMessage(msg)
|
|
return nil
|
|
}
|
|
|
|
// handleInvMsg is invoked when a peer receives an inv bitcoin message and is
|
|
// used to examine the inventory being advertised by the remote peer and react
|
|
// accordingly.
|
|
//
|
|
// NOTE: This will need to have tx handling added as well when they are
|
|
// supported.
|
|
func (p *peer) handleInvMsg(msg *btcwire.MsgInv) {
|
|
// Attempt to find the final block in the inventory list. There may
|
|
// not be one.
|
|
lastBlock := -1
|
|
invVects := msg.InvList
|
|
for i := len(invVects) - 1; i >= 0; i-- {
|
|
if invVects[i].Type == btcwire.InvVect_Block {
|
|
lastBlock = i
|
|
break
|
|
}
|
|
}
|
|
|
|
// Request the advertised inventory if we don't already have it. Also,
|
|
// request parent blocks of orphans if we receive one we already have.
|
|
// Finally, attempt to detect potential stalls due to long side chains
|
|
// we already have and request more blocks to prevent them.
|
|
chain := p.server.blockManager.blockChain
|
|
for i, iv := range invVects {
|
|
switch iv.Type {
|
|
case btcwire.InvVect_Block:
|
|
if !chain.HaveInventory(iv) {
|
|
// Add it to the request queue.
|
|
p.requestQueue.PushBack(iv)
|
|
continue
|
|
}
|
|
|
|
// The block is an orphan block that we already have.
|
|
// When the existing orphan was processed, it requested
|
|
// the missing parent blocks. When this scenario
|
|
// happens, it means there were more blocks missing
|
|
// than are allowed into a single inventory message. As
|
|
// a result, once this peer requested the final
|
|
// advertised block, the remote peer noticed and is now
|
|
// resending the orphan block as an available block
|
|
// to signal there are more missing blocks that need to
|
|
// be requested.
|
|
if chain.IsKnownOrphan(&iv.Hash) {
|
|
// Request blocks starting at the latest known
|
|
// up to the root of the orphan that just came
|
|
// in.
|
|
orphanRoot := chain.GetOrphanRoot(&iv.Hash)
|
|
locator, err := chain.LatestBlockLocator()
|
|
if err != nil {
|
|
log.Errorf("[PEER] Failed to get block "+
|
|
"locator for the latest block: "+
|
|
"%v", err)
|
|
continue
|
|
}
|
|
p.pushGetBlocksMsg(locator, orphanRoot)
|
|
continue
|
|
}
|
|
|
|
// We already have the final block advertised by this
|
|
// inventory message, so force a request for more. This
|
|
// should only really happen if we're on a really long
|
|
// side chain.
|
|
if i == lastBlock {
|
|
// Request blocks after this one up to the
|
|
// final one the remote peer knows about (zero
|
|
// stop hash).
|
|
locator := chain.BlockLocatorFromHash(&iv.Hash)
|
|
p.pushGetBlocksMsg(locator, &zeroHash)
|
|
}
|
|
|
|
// Ignore unsupported inventory types.
|
|
default:
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Request as much as possible at once. Anything that won't fit into
|
|
// the request will be requested on the next inv message.
|
|
numRequested := 0
|
|
gdmsg := btcwire.NewMsgGetData()
|
|
for e := p.requestQueue.Front(); e != nil; e = p.requestQueue.Front() {
|
|
iv := e.Value.(*btcwire.InvVect)
|
|
gdmsg.AddInvVect(iv)
|
|
p.requestQueue.Remove(e)
|
|
|
|
numRequested++
|
|
if numRequested >= btcwire.MaxInvPerMsg {
|
|
break
|
|
}
|
|
}
|
|
if len(gdmsg.InvList) > 0 {
|
|
p.QueueMessage(gdmsg)
|
|
}
|
|
}
|
|
|
|
// handleGetData is invoked when a peer receives a getdata bitcoin message and
|
|
// is used to deliver block and transaction information.
|
|
func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) {
|
|
notFound := btcwire.NewMsgNotFound()
|
|
|
|
out:
|
|
for _, iv := range msg.InvList {
|
|
var err error
|
|
switch iv.Type {
|
|
case btcwire.InvVect_Tx:
|
|
err = p.pushTxMsg(iv.Hash)
|
|
case btcwire.InvVect_Block:
|
|
err = p.pushBlockMsg(iv.Hash)
|
|
default:
|
|
log.Warnf("[PEER] Unknown type in inventory request %d",
|
|
iv.Type)
|
|
break out
|
|
}
|
|
if err != nil {
|
|
notFound.AddInvVect(iv)
|
|
}
|
|
}
|
|
if len(notFound.InvList) != 0 {
|
|
p.QueueMessage(notFound)
|
|
}
|
|
}
|
|
|
|
// handleGetBlocksMsg is invoked when a peer receives a getdata bitcoin message.
|
|
func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) {
|
|
// Return all block hashes to the latest one (up to max per message) if
|
|
// no stop hash was specified.
|
|
// Attempt to find the ending index of the stop hash if specified.
|
|
endIdx := btcdb.AllShas
|
|
if !msg.HashStop.IsEqual(&zeroHash) {
|
|
block, err := p.server.db.FetchBlockBySha(&msg.HashStop)
|
|
if err == nil {
|
|
endIdx = block.Height() + 1
|
|
}
|
|
}
|
|
|
|
// Find the most recent known block based on the block locator.
|
|
// Use the block after the genesis block if no other blocks in the
|
|
// provided locator are known. This does mean the client will start
|
|
// over with the genesis block if unknown block locators are provided.
|
|
// This mirrors the behavior in the reference implementation.
|
|
startIdx := int64(1)
|
|
for _, hash := range msg.BlockLocatorHashes {
|
|
block, err := p.server.db.FetchBlockBySha(hash)
|
|
if err == nil {
|
|
// Start with the next hash since we know this one.
|
|
startIdx = block.Height() + 1
|
|
break
|
|
}
|
|
}
|
|
|
|
// Don't attempt to fetch more than we can put into a single message.
|
|
autoContinue := false
|
|
if endIdx-startIdx > btcwire.MaxBlocksPerMsg {
|
|
endIdx = startIdx + btcwire.MaxBlocksPerMsg
|
|
autoContinue = true
|
|
}
|
|
|
|
// Generate inventory message.
|
|
//
|
|
// The FetchBlockBySha call is limited to a maximum number of hashes
|
|
// per invocation. Since the maximum number of inventory per message
|
|
// might be larger, call it multiple times with the appropriate indices
|
|
// as needed.
|
|
invMsg := btcwire.NewMsgInv()
|
|
for start := startIdx; start < endIdx; {
|
|
// Fetch the inventory from the block database.
|
|
hashList, err := p.server.db.FetchHeightRange(start, endIdx)
|
|
if err != nil {
|
|
log.Warnf("[PEER] Block lookup failed: %v", err)
|
|
return
|
|
}
|
|
|
|
// The database did not return any further hashes. Break out of
|
|
// the loop now.
|
|
if len(hashList) == 0 {
|
|
break
|
|
}
|
|
|
|
// Add block inventory to the message.
|
|
for _, hash := range hashList {
|
|
hashCopy := hash
|
|
iv := btcwire.NewInvVect(btcwire.InvVect_Block, &hashCopy)
|
|
invMsg.AddInvVect(iv)
|
|
}
|
|
start += int64(len(hashList))
|
|
}
|
|
|
|
// Send the inventory message if there is anything to send.
|
|
if len(invMsg.InvList) > 0 {
|
|
invListLen := len(invMsg.InvList)
|
|
if autoContinue && invListLen == btcwire.MaxBlocksPerMsg {
|
|
// Intentionally use a copy of the final hash so there
|
|
// is not a reference into the inventory slice which
|
|
// would prevent the entire slice from being eligible
|
|
// for GC as soon as it's sent.
|
|
continueHash := invMsg.InvList[invListLen-1].Hash
|
|
p.continueHash = &continueHash
|
|
}
|
|
p.QueueMessage(invMsg)
|
|
}
|
|
}
|
|
|
|
// handleGetBlocksMsg is invoked when a peer receives a getheaders bitcoin
|
|
// message.
|
|
func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) {
|
|
// Attempt to look up the height of the provided stop hash.
|
|
endIdx := btcdb.AllShas
|
|
block, err := p.server.db.FetchBlockBySha(&msg.HashStop)
|
|
if err == nil {
|
|
endIdx = block.Height() + 1
|
|
}
|
|
|
|
// There are no block locators so a specific header is being requested
|
|
// as identified by the stop hash.
|
|
if len(msg.BlockLocatorHashes) == 0 {
|
|
// No blocks with the stop hash were found so there is nothing
|
|
// to do. Just return. This behavior mirrors the reference
|
|
// implementation.
|
|
if endIdx == btcdb.AllShas {
|
|
return
|
|
}
|
|
|
|
// Send the requested block header.
|
|
headersMsg := btcwire.NewMsgHeaders()
|
|
hdr := block.MsgBlock().Header // copy
|
|
hdr.TxnCount = 0
|
|
headersMsg.AddBlockHeader(&hdr)
|
|
p.QueueMessage(headersMsg)
|
|
return
|
|
}
|
|
|
|
// Find the most recent known block based on the block locator.
|
|
// Use the block after the genesis block if no other blocks in the
|
|
// provided locator are known. This does mean the client will start
|
|
// over with the genesis block if unknown block locators are provided.
|
|
// This mirrors the behavior in the reference implementation.
|
|
startIdx := int64(1)
|
|
for _, hash := range msg.BlockLocatorHashes {
|
|
block, err := p.server.db.FetchBlockBySha(hash)
|
|
if err == nil {
|
|
// Start with the next hash since we know this one.
|
|
startIdx = block.Height() + 1
|
|
break
|
|
}
|
|
}
|
|
|
|
// Don't attempt to fetch more than we can put into a single message.
|
|
if endIdx-startIdx > btcwire.MaxBlockHeadersPerMsg {
|
|
endIdx = startIdx + btcwire.MaxBlockHeadersPerMsg
|
|
}
|
|
|
|
// Generate headers message and send it.
|
|
//
|
|
// The FetchBlockBySha call is limited to a maximum number of hashes
|
|
// per invocation. Since the maximum number of headers per message
|
|
// might be larger, call it multiple times with the appropriate indices
|
|
// as needed.
|
|
headersMsg := btcwire.NewMsgHeaders()
|
|
for start := startIdx; start < endIdx; {
|
|
// Fetch the inventory from the block database.
|
|
hashList, err := p.server.db.FetchHeightRange(start, endIdx)
|
|
if err != nil {
|
|
log.Warnf("[PEER] Header lookup failed: %v", err)
|
|
return
|
|
}
|
|
|
|
// The database did not return any further hashes. Break out of
|
|
// the loop now.
|
|
if len(hashList) == 0 {
|
|
break
|
|
}
|
|
|
|
// Add headers to the message.
|
|
for _, hash := range hashList {
|
|
block, err := p.server.db.FetchBlockBySha(&hash)
|
|
if err != nil {
|
|
log.Warnf("[PEER] Lookup of known block hash "+
|
|
"failed: %v", err)
|
|
continue
|
|
}
|
|
hdr := block.MsgBlock().Header // copy
|
|
hdr.TxnCount = 0
|
|
headersMsg.AddBlockHeader(&hdr)
|
|
}
|
|
|
|
// Start at the next block header after the latest one on the
|
|
// next loop iteration.
|
|
start += int64(len(hashList))
|
|
}
|
|
p.QueueMessage(headersMsg)
|
|
}
|
|
|
|
// handleGetAddrMsg is invoked when a peer receives a getaddr bitcoin message
|
|
// and is used to provide the peer with known addresses from the address
|
|
// manager.
|
|
func (p *peer) handleGetAddrMsg(msg *btcwire.MsgGetAddr) {
|
|
// Get the current known addresses from the address manager.
|
|
addrCache := p.server.addrManager.AddressCache()
|
|
|
|
// Push the addresses.
|
|
err := p.pushAddrMsg(addrCache)
|
|
if err != nil {
|
|
log.Errorf("[PEER] %v", err)
|
|
p.Disconnect()
|
|
return
|
|
}
|
|
}
|
|
|
|
// pushAddrMsg sends one, or more, addr message(s) to the connected peer using
|
|
// the provided addresses.
|
|
func (p *peer) pushAddrMsg(addresses map[string]*btcwire.NetAddress) error {
|
|
// Nothing to send.
|
|
if len(addresses) == 0 {
|
|
return nil
|
|
}
|
|
|
|
numAdded := 0
|
|
msg := btcwire.NewMsgAddr()
|
|
for _, na := range addresses {
|
|
// Filter addresses the peer already knows about.
|
|
if p.knownAddresses[NetAddressKey(na)] {
|
|
continue
|
|
}
|
|
|
|
// Add the address to the message.
|
|
err := msg.AddAddress(na)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
numAdded++
|
|
|
|
// Split into multiple messages as needed.
|
|
if numAdded > 0 && numAdded%btcwire.MaxAddrPerMsg == 0 {
|
|
p.outputQueue <- msg
|
|
msg.ClearAddresses()
|
|
}
|
|
}
|
|
|
|
// Send message with remaining addresses if needed.
|
|
if numAdded%btcwire.MaxAddrPerMsg != 0 {
|
|
p.outputQueue <- msg
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// handleAddrMsg is invoked when a peer receives an addr bitcoin message and
|
|
// is used to notify the server about advertised addresses.
|
|
func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) {
|
|
// Ignore old style addresses which don't include a timestamp.
|
|
if p.protocolVersion < btcwire.NetAddressTimeVersion {
|
|
return
|
|
}
|
|
|
|
// A message that has no addresses is invalid.
|
|
if len(msg.AddrList) == 0 {
|
|
log.Errorf("[PEER] Command [%s] from %s does not contain any addresses",
|
|
msg.Command(), p.conn.RemoteAddr())
|
|
p.Disconnect()
|
|
return
|
|
}
|
|
|
|
for _, na := range msg.AddrList {
|
|
// Don't add more address if we're disconnecting.
|
|
if p.disconnect {
|
|
return
|
|
}
|
|
|
|
// Set the timestamp to 5 days ago if it's more than 24 hours
|
|
// in the future so this address is one of the first to be
|
|
// removed when space is needed.
|
|
now := time.Now()
|
|
if na.Timestamp.After(now.Add(time.Minute * 10)) {
|
|
na.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
|
|
}
|
|
|
|
// Add address to known addresses for this peer.
|
|
p.knownAddresses[NetAddressKey(na)] = true
|
|
}
|
|
|
|
// Add addresses to server address manager. The address manager handles
|
|
// the details of things such as preventing duplicate addresses, max
|
|
// addresses, and last seen updates.
|
|
p.server.addrManager.AddAddresses(msg.AddrList)
|
|
}
|
|
|
|
// handlePingMsg is invoked when a peer receives a ping bitcoin message. For
|
|
// recent clients (protocol version > BIP0031Version), it replies with a pong
|
|
// message. For older clients, it does nothing and anything other than failure
|
|
// is considered a successful ping.
|
|
func (p *peer) handlePingMsg(msg *btcwire.MsgPing) {
|
|
// Only Reply with pong is message comes from a new enough client.
|
|
if p.protocolVersion > btcwire.BIP0031Version {
|
|
// Include nonce from ping so pong can be identified.
|
|
p.outputQueue <- btcwire.NewMsgPong(msg.Nonce)
|
|
}
|
|
}
|
|
|
|
// readMessage reads the next bitcoin message from the peer with logging.
|
|
func (p *peer) readMessage() (msg btcwire.Message, buf []byte, err error) {
|
|
msg, buf, err = btcwire.ReadMessage(p.conn, p.protocolVersion, p.btcnet)
|
|
if err != nil {
|
|
return
|
|
}
|
|
log.Debugf("[PEER] Received command [%v] from %s", msg.Command(),
|
|
p.conn.RemoteAddr())
|
|
|
|
// Use closures to log expensive operations so they are only run when
|
|
// the logging level requires it.
|
|
log.Tracef("%v", newLogClosure(func() string {
|
|
return "[PEER] " + spew.Sdump(msg)
|
|
}))
|
|
log.Tracef("%v", newLogClosure(func() string {
|
|
return "[PEER] " + spew.Sdump(buf)
|
|
}))
|
|
|
|
return
|
|
}
|
|
|
|
// writeMessage sends a bitcoin Message to the peer with logging.
|
|
func (p *peer) writeMessage(msg btcwire.Message) error {
|
|
log.Debugf("[PEER] Sending command [%v] to %s", msg.Command(),
|
|
p.conn.RemoteAddr())
|
|
|
|
// Use closures to log expensive operations so they are only run when the
|
|
// logging level requires it.
|
|
log.Tracef("%v", newLogClosure(func() string {
|
|
return "[PEER] msg" + spew.Sdump(msg)
|
|
}))
|
|
log.Tracef("%v", newLogClosure(func() string {
|
|
var buf bytes.Buffer
|
|
err := btcwire.WriteMessage(&buf, msg, p.protocolVersion, p.btcnet)
|
|
if err != nil {
|
|
return err.Error()
|
|
}
|
|
return "[PEER] " + spew.Sdump(buf.Bytes())
|
|
}))
|
|
|
|
// Write the message to the peer.
|
|
err := btcwire.WriteMessage(p.conn, msg, p.protocolVersion, p.btcnet)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// isAllowedByRegression returns whether or not the passed error is allowed by
|
|
// regression tests without disconnecting the peer. In particular, regression
|
|
// tests need to be allowed to send malformed messages without the peer being
|
|
// disconnected.
|
|
func (p *peer) isAllowedByRegression(err error) bool {
|
|
// Don't allow the error if it's not specifically a malformed message
|
|
// error.
|
|
if _, ok := err.(*btcwire.MessageError); !ok {
|
|
return false
|
|
}
|
|
|
|
// Don't allow the error if it's not coming from localhost or the
|
|
// hostname can't be determined for some reason.
|
|
host, _, err := net.SplitHostPort(p.conn.RemoteAddr().String())
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
if host != "127.0.0.1" && host != "localhost" {
|
|
return false
|
|
}
|
|
|
|
// Allowed if all checks passed.
|
|
return true
|
|
}
|
|
|
|
// inHandler handles all incoming messages for the peer. It must be run as a
|
|
// goroutine.
|
|
func (p *peer) inHandler() {
|
|
out:
|
|
for !p.disconnect {
|
|
rmsg, buf, err := p.readMessage()
|
|
if err != nil {
|
|
// In order to allow regression tests with malformed
|
|
// messages, don't disconnect the peer when we're in
|
|
// regression test mode and the error is one of the
|
|
// allowed errors.
|
|
if cfg.RegressionTest && p.isAllowedByRegression(err) {
|
|
log.Errorf("[PEER] %v", err)
|
|
continue
|
|
}
|
|
|
|
// Only log the error if we're not forcibly disconnecting.
|
|
if !p.disconnect {
|
|
log.Errorf("[PEER] %v", err)
|
|
}
|
|
break out
|
|
}
|
|
|
|
// Ensure version message comes first.
|
|
if _, ok := rmsg.(*btcwire.MsgVersion); !ok && !p.versionKnown {
|
|
log.Errorf("[PEER] A version message must precede all others")
|
|
break out
|
|
}
|
|
|
|
// Handle each supported message type.
|
|
switch msg := rmsg.(type) {
|
|
case *btcwire.MsgVersion:
|
|
p.handleVersionMsg(msg)
|
|
|
|
case *btcwire.MsgVerAck:
|
|
// Do nothing.
|
|
|
|
case *btcwire.MsgGetAddr:
|
|
p.handleGetAddrMsg(msg)
|
|
|
|
case *btcwire.MsgAddr:
|
|
p.handleAddrMsg(msg)
|
|
|
|
case *btcwire.MsgPing:
|
|
p.handlePingMsg(msg)
|
|
|
|
case *btcwire.MsgPong:
|
|
// Don't do anything, but could try to work out network
|
|
// timing or similar.
|
|
|
|
case *btcwire.MsgAlert:
|
|
p.server.BroadcastMessage(msg, p)
|
|
|
|
case *btcwire.MsgBlock:
|
|
// Queue the block up to be handled by the block
|
|
// manager and intentionally block further receives
|
|
// until the bitcoin block is fully processed and known
|
|
// good or bad. This helps prevent a malicious peer
|
|
// from queueing up a bunch of bad blocks before
|
|
// disconnecting (or being disconnected) and wasting
|
|
// memory. Additionally, this behavior is depended on
|
|
// by at least the block acceptance test tool as the
|
|
// reference implementation processes blocks in the same
|
|
// thread and therefore blocks further messages until
|
|
// the bitcoin block has been fully processed.
|
|
block := btcutil.NewBlockFromBlockAndBytes(msg, buf)
|
|
p.server.blockManager.QueueBlock(block, p)
|
|
<-p.blockProcessed
|
|
|
|
case *btcwire.MsgInv:
|
|
p.handleInvMsg(msg)
|
|
|
|
case *btcwire.MsgGetData:
|
|
p.handleGetDataMsg(msg)
|
|
|
|
case *btcwire.MsgGetBlocks:
|
|
p.handleGetBlocksMsg(msg)
|
|
|
|
case *btcwire.MsgGetHeaders:
|
|
p.handleGetHeadersMsg(msg)
|
|
|
|
default:
|
|
log.Debugf("[PEER] Received unhandled message of type %v: Fix Me",
|
|
rmsg.Command())
|
|
}
|
|
}
|
|
|
|
// Ensure connection is closed and notify server and block manager that
|
|
// the peer is done.
|
|
p.Disconnect()
|
|
p.server.donePeers <- p
|
|
p.server.blockManager.donePeers <- p
|
|
p.quit <- true
|
|
|
|
p.wg.Done()
|
|
log.Tracef("[PEER] Peer input handler done for %s", p.conn.RemoteAddr())
|
|
}
|
|
|
|
// outHandler handles all outgoing messages for the peer. It must be run as a
|
|
// goroutine. It uses a buffered channel to serialize output messages while
|
|
// allowing the sender to continue running asynchronously.
|
|
func (p *peer) outHandler() {
|
|
out:
|
|
for {
|
|
select {
|
|
case msg := <-p.outputQueue:
|
|
// Don't send anything if we're disconnected.
|
|
if p.disconnect {
|
|
continue
|
|
}
|
|
err := p.writeMessage(msg)
|
|
if err != nil {
|
|
p.Disconnect()
|
|
log.Errorf("[PEER] %v", err)
|
|
}
|
|
|
|
case <-p.quit:
|
|
break out
|
|
}
|
|
}
|
|
p.wg.Done()
|
|
log.Tracef("[PEER] Peer output handler done for %s", p.conn.RemoteAddr())
|
|
}
|
|
|
|
// QueueMessage adds the passed bitcoin message to the peer send queue. It
|
|
// uses a buffered channel to communicate with the output handler goroutine so
|
|
// it is automatically rate limited and safe for concurrent access.
|
|
func (p *peer) QueueMessage(msg btcwire.Message) {
|
|
p.outputQueue <- msg
|
|
}
|
|
|
|
// Start begins processing input and output messages. It also sends the initial
|
|
// version message for outbound connections to start the negotiation process.
|
|
func (p *peer) Start() error {
|
|
// Already started?
|
|
if p.started {
|
|
return nil
|
|
}
|
|
|
|
log.Tracef("[PEER] Starting peer %s", p.conn.RemoteAddr())
|
|
|
|
// Send an initial version message if this is an outbound connection.
|
|
if !p.inbound {
|
|
err := p.pushVersionMsg()
|
|
if err != nil {
|
|
log.Errorf("[PEER] %v", err)
|
|
p.conn.Close()
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Start processing input and output.
|
|
go p.inHandler()
|
|
go p.outHandler()
|
|
p.wg.Add(2)
|
|
p.started = true
|
|
|
|
return nil
|
|
}
|
|
|
|
// Disconnect disconnects the peer by closing the connection. It also sets
|
|
// a flag so the impending shutdown can be detected.
|
|
func (p *peer) Disconnect() {
|
|
p.disconnect = true
|
|
p.conn.Close()
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the peer by disconnecting it and waiting for
|
|
// all goroutines to finish.
|
|
func (p *peer) Shutdown() {
|
|
log.Tracef("[PEER] Shutdown peer %s", p.conn.RemoteAddr())
|
|
p.Disconnect()
|
|
p.wg.Wait()
|
|
}
|
|
|
|
// newPeer returns a new bitcoin peer for the provided server and connection.
|
|
// Use start to begin processing incoming and outgoing messages.
|
|
func newPeer(s *server, conn net.Conn, inbound bool, persistent bool) *peer {
|
|
p := peer{
|
|
server: s,
|
|
protocolVersion: btcwire.ProtocolVersion,
|
|
btcnet: s.btcnet,
|
|
services: btcwire.SFNodeNetwork,
|
|
conn: conn,
|
|
timeConnected: time.Now(),
|
|
inbound: inbound,
|
|
persistent: persistent,
|
|
knownAddresses: make(map[string]bool),
|
|
requestQueue: list.New(),
|
|
outputQueue: make(chan btcwire.Message, outputBufferSize),
|
|
blockProcessed: make(chan bool, 1),
|
|
quit: make(chan bool),
|
|
}
|
|
return &p
|
|
}
|