// Copyright (c) 2013 Conformal Systems LLC. // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. package main import ( "bytes" "errors" "fmt" "github.com/conformal/btcdb" "github.com/conformal/btcutil" "github.com/conformal/btcwire" "github.com/conformal/go-socks" "github.com/davecgh/go-spew/spew" "net" "strconv" "sync" "time" ) const outputBufferSize = 50 // userAgent is the user agent string used to identify ourselves to other // bitcoin peers. var userAgent = fmt.Sprintf("/btcd:%d.%d.%d/", appMajor, appMinor, appPatch) // zeroHash is the zero value hash (all zeros). It is defined as a convenience. var zeroHash btcwire.ShaHash // minUint32 is a helper function to return the minimum of two uint32s. // This avoids a math import and the need to cast to floats. func minUint32(a, b uint32) uint32 { if a < b { return a } return b } // newNetAddress attempts to extract the IP address and port from the passed // net.Addr interface and create a bitcoin NetAddress structure using that // information. func newNetAddress(addr net.Addr, services btcwire.ServiceFlag) (*btcwire.NetAddress, error) { // addr will be a net.TCPAddr when not using a proxy. if tcpAddr, ok := addr.(*net.TCPAddr); ok { ip := tcpAddr.IP port := uint16(tcpAddr.Port) na := btcwire.NewNetAddressIPPort(ip, port, services) return na, nil } // addr will be a socks.ProxiedAddr when using a proxy. if proxiedAddr, ok := addr.(*socks.ProxiedAddr); ok { ip := net.ParseIP(proxiedAddr.Host) if ip == nil { ip = net.ParseIP("0.0.0.0") } port := uint16(proxiedAddr.Port) na := btcwire.NewNetAddressIPPort(ip, port, services) return na, nil } // For the most part, addr should be one of the two above cases, but // to be safe, fall back to trying to parse the information from the // address string as a last resort. host, portStr, err := net.SplitHostPort(addr.String()) if err != nil { return nil, err } ip := net.ParseIP(host) port, err := strconv.ParseUint(portStr, 10, 16) if err != nil { return nil, err } na := btcwire.NewNetAddressIPPort(ip, uint16(port), services) return na, nil } // peer provides a bitcoin peer for handling bitcoin communications. type peer struct { server *server protocolVersion uint32 btcnet btcwire.BitcoinNet services btcwire.ServiceFlag started bool conn net.Conn timeConnected time.Time inbound bool disconnect bool persistent bool versionKnown bool knownAddresses map[string]bool lastBlock int32 wg sync.WaitGroup outputQueue chan btcwire.Message quit chan bool } // pushVersionMsg sends a version message to the connected peer using the // current state. func (p *peer) pushVersionMsg() error { _, blockNum, err := p.server.db.NewestSha() if err != nil { return err } // Create a NetAddress for the local IP. Don't assume any services // until we know otherwise. naMe, err := newNetAddress(p.conn.LocalAddr(), 0) if err != nil { return err } // Create a NetAddress for the remote IP. Don't assume any services // until we know otherwise. naYou, err := newNetAddress(p.conn.RemoteAddr(), 0) if err != nil { return err } // Version message. msg := btcwire.NewMsgVersion(naMe, naYou, p.server.nonce, userAgent, int32(blockNum)) // XXX: bitcoind appears to always enable the full node services flag // of the remote peer netaddress field in the version message regardless // of whether it knows it supports it or not. Also, bitcoind sets // the services field of the local peer to 0 regardless of support. // // Realistically, this should be set as follows: // - For outgoing connections: // - Set the local netaddress services to what the local peer // actually supports // - Set the remote netaddress services to 0 to indicate no services // as they are still unknown // - For incoming connections: // - Set the local netaddress services to what the local peer // actually supports // - Set the remote netaddress services to the what was advertised by // by the remote peer in its version message msg.AddrYou.Services = btcwire.SFNodeNetwork // Advertise that we're a full node. msg.Services = btcwire.SFNodeNetwork p.outputQueue <- msg return nil } // handleVersionMsg is invoked when a peer receives a version bitcoin message // and is used to negotiate the protocol version details as well as kick start // the communications. func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Detect self connections. if msg.Nonce == p.server.nonce { log.Debugf("[PEER] Disconnecting peer connected to self %s", p.conn.RemoteAddr()) p.disconnect = true p.conn.Close() return } // Limit to one version message per peer. if p.versionKnown { log.Errorf("[PEER] Only one version message per peer is allowed %s.", p.conn.RemoteAddr()) p.disconnect = true p.conn.Close() return } // Negotiate the protocol version. p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion)) p.versionKnown = true log.Debugf("[PEER] Negotiated protocol version %d for peer %s", p.protocolVersion, p.conn.RemoteAddr()) p.lastBlock = msg.LastBlock // Inbound connections. if p.inbound { // Set the supported services for the peer to what the remote // peer advertised. p.services = msg.Services // Send version. err := p.pushVersionMsg() if err != nil { log.Errorf("[PEER] %v", err) p.disconnect = true p.conn.Close() return } // Add inbound peer address to the server address manager. na, err := btcwire.NewNetAddress(p.conn.RemoteAddr(), p.services) if err != nil { log.Errorf("[PEER] %v", err) p.disconnect = true p.conn.Close() return } p.server.addrManager.AddAddress(na) } // Send verack. p.outputQueue <- btcwire.NewMsgVerAck() // Outbound connections. if !p.inbound { // TODO: Only do this if we're listening, not doing the initial // block download, and are routable. // Advertise the local address. na, err := newNetAddress(p.conn.LocalAddr(), p.services) if err != nil { log.Errorf("[PEER] %v", err) p.disconnect = true p.conn.Close() return } addresses := map[string]*btcwire.NetAddress{ NetAddressKey(na): na, } p.pushAddrMsg(addresses) // Request known addresses if the server address manager needs // more and the peer has a protocol version new enough to // include a timestamp with addresses. hasTimestamp := p.protocolVersion >= btcwire.NetAddressTimeVersion if p.server.addrManager.NeedMoreAddresses() && hasTimestamp { p.outputQueue <- btcwire.NewMsgGetAddr() } } // Request latest blocks if the peer has blocks we're interested in. // XXX: Ask block manager for latest so we get in-flight too... sha, lastBlock, err := p.server.db.NewestSha() if err != nil { log.Errorf("[PEER] %v", err) p.disconnect = true p.conn.Close() } // If the peer has blocks we're interested in. if p.lastBlock > int32(lastBlock) { stopHash := btcwire.ShaHash{} gbmsg := btcwire.NewMsgGetBlocks(&stopHash) p.server.blockManager.AddBlockLocators(sha, gbmsg) p.outputQueue <- gbmsg } // TODO: Relay alerts. } // pushTxMsg sends a tx message for the provided transaction hash to the // connected peer. An error is returned if the transaction sha is not known. func (p *peer) pushTxMsg(sha btcwire.ShaHash) error { // We dont deal with these for now. return errors.New("Tx fetching not implemented") } // pushBlockMsg sends a block message for the provided block hash to the // connected peer. An error is returned if the block hash is not known. func (p *peer) pushBlockMsg(sha btcwire.ShaHash) error { // What should this function do about the rate limiting the // number of blocks queued for this peer? // Current thought is have a counting mutex in the peer // such that if > N Tx/Block requests are currently in // the tx queue, wait until the mutex clears allowing more to be // sent. This prevents 500 1+MB blocks from being loaded into // memory and sit around until the output queue drains. // Actually the outputQueue has a limit of 50 in its queue // but still 50MB to 1.6GB(50 32MB blocks) just setting // in memory waiting to be sent is pointless. // I would recommend a getdata request limit of about 5 // outstanding objects. // Should the tx complete api be a mutex or channel? blk, err := p.server.db.FetchBlockBySha(&sha) if err != nil { log.Tracef("[PEER] Unable to fetch requested block sha %v: %v", &sha, err) return err } p.QueueMessage(blk.MsgBlock()) return nil } // handleGetData is invoked when a peer receives a getdata bitcoin message and // is used to deliver block and transaction information. func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) { notFound := btcwire.NewMsgNotFound() out: for _, iv := range msg.InvList { var err error switch iv.Type { case btcwire.InvVect_Tx: err = p.pushTxMsg(iv.Hash) case btcwire.InvVect_Block: err = p.pushBlockMsg(iv.Hash) default: log.Warnf("[PEER] Unknown type in inventory request %d", iv.Type) break out } if err != nil { notFound.AddInvVect(iv) } } if len(notFound.InvList) != 0 { p.QueueMessage(notFound) } } // handleGetBlocksMsg is invoked when a peer receives a getdata bitcoin message. func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) { var err error startIdx := int64(0) endIdx := btcdb.AllShas // Return all block hashes to the latest one (up to max per message) if // no stop hash was specified. // Attempt to find the ending index of the stop hash if specified. if !msg.HashStop.IsEqual(&zeroHash) { block, err := p.server.db.FetchBlockBySha(&msg.HashStop) if err != nil { // Fetch all if we dont recognize the stop hash. endIdx = btcdb.AllShas } endIdx = block.Height() } // TODO(davec): This should have some logic to utilize the additional // locator hashes to ensure the proper chain. for _, hash := range msg.BlockLocatorHashes { // TODO(drahn) does using the caching interface make sense // on index lookups ? block, err := p.server.db.FetchBlockBySha(hash) if err == nil { // Start with the next hash since we know this one. startIdx = block.Height() + 1 break } } // Don't attempt to fetch more than we can put into a single message. if endIdx-startIdx > btcwire.MaxInvPerMsg { endIdx = startIdx + btcwire.MaxInvPerMsg } // Fetch the inventory from the block database. hashList, err := p.server.db.FetchHeightRange(startIdx, endIdx) if err != nil { log.Warnf(" lookup returned %v ", err) return } // Nothing to send. if len(hashList) == 0 { return } // Generate inventory vectors and push the inventory message. inv := btcwire.NewMsgInv() for _, hash := range hashList { iv := btcwire.InvVect{Type: btcwire.InvVect_Block, Hash: hash} inv.AddInvVect(&iv) } p.QueueMessage(inv) } // handleGetBlocksMsg is invoked when a peer receives a getheaders bitcoin // message. func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) { var err error startIdx := int64(0) endIdx := btcdb.AllShas // Return all block hashes to the latest one (up to max per message) if // no stop hash was specified. // Attempt to find the ending index of the stop hash if specified. if !msg.HashStop.IsEqual(&zeroHash) { block, err := p.server.db.FetchBlockBySha(&msg.HashStop) if err != nil { // Fetch all if we dont recognize the stop hash. endIdx = btcdb.AllShas } endIdx = block.Height() } // TODO(davec): This should have some logic to utilize the additional // locator hashes to ensure the proper chain. for _, hash := range msg.BlockLocatorHashes { // TODO(drahn) does using the caching interface make sense // on index lookups ? block, err := p.server.db.FetchBlockBySha(hash) if err == nil { // Start with the next hash since we know this one. startIdx = block.Height() + 1 break } } // Don't attempt to fetch more than we can put into a single message. if endIdx-startIdx > btcwire.MaxBlockHeadersPerMsg { endIdx = startIdx + btcwire.MaxBlockHeadersPerMsg } // Fetch the inventory from the block database. hashList, err := p.server.db.FetchHeightRange(startIdx, endIdx) if err != nil { log.Warnf("lookup returned %v ", err) return } // Nothing to send. if len(hashList) == 0 { return } // Generate inventory vectors and push the inventory message. headersMsg := btcwire.NewMsgHeaders() for _, hash := range hashList { block, err := p.server.db.FetchBlockBySha(&hash) if err != nil { log.Warnf("[PEER] badness %v", err) } hdr := block.MsgBlock().Header // copy hdr.TxnCount = 0 headersMsg.AddBlockHeader(&hdr) } p.QueueMessage(headersMsg) } // handleGetAddrMsg is invoked when a peer receives a getaddr bitcoin message // and is used to provide the peer with known addresses from the address // manager. func (p *peer) handleGetAddrMsg(msg *btcwire.MsgGetAddr) { // Get the current known addresses from the address manager. addrCache := p.server.addrManager.AddressCache() // Push the addresses. err := p.pushAddrMsg(addrCache) if err != nil { log.Errorf("[PEER] %v", err) p.disconnect = true p.conn.Close() return } } // pushAddrMsg sends one, or more, addr message(s) to the connected peer using // the provided addresses. func (p *peer) pushAddrMsg(addresses map[string]*btcwire.NetAddress) error { // Nothing to send. if len(addresses) == 0 { return nil } numAdded := 0 msg := btcwire.NewMsgAddr() for _, na := range addresses { // Filter addresses the peer already knows about. if p.knownAddresses[NetAddressKey(na)] { continue } // Add the address to the message. err := msg.AddAddress(na) if err != nil { return err } numAdded++ // Split into multiple messages as needed. if numAdded > 0 && numAdded%btcwire.MaxAddrPerMsg == 0 { p.outputQueue <- msg msg.ClearAddresses() } } // Send message with remaining addresses if needed. if numAdded%btcwire.MaxAddrPerMsg != 0 { p.outputQueue <- msg } return nil } // handleAddrMsg is invoked when a peer receives an addr bitcoin message and // is used to notify the server about advertised addresses. func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) { // Ignore old style addresses which don't include a timestamp. if p.protocolVersion < btcwire.NetAddressTimeVersion { return } // A message that has no addresses is invalid. if len(msg.AddrList) == 0 { log.Errorf("[PEER] Command [%s] from %s does not contain any addresses", msg.Command(), p.conn.RemoteAddr()) p.disconnect = true p.conn.Close() return } for _, na := range msg.AddrList { // Don't add more address if we're disconnecting. if p.disconnect { return } // Set the timestamp to 5 days ago if it's more than 24 hours // in the future so this address is one of the first to be // removed when space is needed. now := time.Now() if na.Timestamp.After(now.Add(time.Minute * 10)) { na.Timestamp = now.Add(-1 * time.Hour * 24 * 5) } // Add address to known addresses for this peer. p.knownAddresses[NetAddressKey(na)] = true } // Add addresses to server address manager. The address manager handles // the details of things such as preventing duplicate addresses, max // addresses, and last seen updates. p.server.addrManager.AddAddresses(msg.AddrList) } // handlePingMsg is invoked when a peer receives a ping bitcoin message. For // recent clients (protocol version > BIP0031Version), it replies with a pong // message. For older clients, it does nothing and anything other than failure // is considered a successful ping. func (p *peer) handlePingMsg(msg *btcwire.MsgPing) { // Only Reply with pong is message comes from a new enough client. if p.protocolVersion > btcwire.BIP0031Version { // Include nonce from ping so pong can be identified. p.outputQueue <- btcwire.NewMsgPong(msg.Nonce) } } // readMessage reads the next bitcoin message from the peer with logging. func (p *peer) readMessage() (msg btcwire.Message, buf []byte, err error) { msg, buf, err = btcwire.ReadMessage(p.conn, p.protocolVersion, p.btcnet) if err != nil { return } log.Debugf("[PEER] Received command [%v] from %s", msg.Command(), p.conn.RemoteAddr()) // Use closures to log expensive operations so they are only run when // the logging level requires it. log.Tracef("%v", newLogClosure(func() string { return "[PEER] " + spew.Sdump(msg) })) log.Tracef("%v", newLogClosure(func() string { return "[PEER] " + spew.Sdump(buf) })) return } // writeMessage sends a bitcoin Message to the peer with logging. func (p *peer) writeMessage(msg btcwire.Message) error { log.Debugf("[PEER] Sending command [%v] to %s", msg.Command(), p.conn.RemoteAddr()) // Use closures to log expensive operations so they are only run when the // logging level requires it. log.Tracef("%v", newLogClosure(func() string { return "[PEER] msg" + spew.Sdump(msg) })) log.Tracef("%v", newLogClosure(func() string { var buf bytes.Buffer err := btcwire.WriteMessage(&buf, msg, p.protocolVersion, p.btcnet) if err != nil { return err.Error() } return "[PEER] " + spew.Sdump(buf.Bytes()) })) // Write the message to the peer. err := btcwire.WriteMessage(p.conn, msg, p.protocolVersion, p.btcnet) if err != nil { return err } return nil } // isAllowedByRegression returns whether or not the passed error is allowed by // regression tests without disconnecting the peer. In particular, regression // tests need to be allowed to send malformed messages without the peer being // disconnected. func (p *peer) isAllowedByRegression(err error) bool { // Don't allow the error if it's not specifically a malformed message // error. if _, ok := err.(*btcwire.MessageError); !ok { return false } // Don't allow the error if it's not coming from localhost or the // hostname can't be determined for some reason. host, _, err := net.SplitHostPort(p.conn.RemoteAddr().String()) if err != nil { return false } if host != "127.0.0.1" && host != "localhost" { return false } // Allowed if all checks passed. return true } // inHandler handles all incoming messages for the peer. It must be run as a // goroutine. func (p *peer) inHandler() { out: for !p.disconnect { rmsg, buf, err := p.readMessage() if err != nil { // In order to allow regression tests with malformed // messages, don't disconnect the peer when we're in // regression test mode and the error is one of the // allowed errors. if cfg.RegressionTest && p.isAllowedByRegression(err) { log.Errorf("[PEER] %v", err) continue } // Only log the error if we're not forcibly disconnecting. if !p.disconnect { log.Errorf("[PEER] %v", err) } break out } // Ensure version message comes first. if _, ok := rmsg.(*btcwire.MsgVersion); !ok && !p.versionKnown { log.Errorf("[PEER] A version message must precede all others") break out } // Some messages are handled directly, while other messages // are sent to a queue to be processed. Directly handling // getdata and getblocks messages makes it impossible for a peer // to spam with requests. However, it means that our getdata // requests to it may not get prompt replies. switch msg := rmsg.(type) { case *btcwire.MsgVersion: p.handleVersionMsg(msg) case *btcwire.MsgVerAck: // Do nothing. case *btcwire.MsgGetAddr: p.handleGetAddrMsg(msg) case *btcwire.MsgAddr: p.handleAddrMsg(msg) case *btcwire.MsgPing: p.handlePingMsg(msg) case *btcwire.MsgPong: // Don't do anything, but could try to work out network // timing or similar. case *btcwire.MsgAlert: p.server.BroadcastMessage(msg, p) case *btcwire.MsgBlock: block := btcutil.NewBlockFromBlockAndBytes(msg, buf) p.server.blockManager.QueueBlock(block) case *btcwire.MsgInv: p.server.blockManager.QueueInv(msg, p) case *btcwire.MsgGetData: p.handleGetDataMsg(msg) case *btcwire.MsgGetBlocks: p.handleGetBlocksMsg(msg) case *btcwire.MsgGetHeaders: p.handleGetHeadersMsg(msg) default: log.Debugf("[PEER] Received unhandled message of type %v: Fix Me", rmsg.Command()) } } // Ensure connection is closed and notify server that the peer is done. p.disconnect = true p.conn.Close() p.server.donePeers <- p p.quit <- true p.wg.Done() log.Tracef("[PEER] Peer input handler done for %s", p.conn.RemoteAddr()) } // outHandler handles all outgoing messages for the peer. It must be run as a // goroutine. It uses a buffered channel to serialize output messages while // allowing the sender to continue running asynchronously. func (p *peer) outHandler() { out: for { select { case msg := <-p.outputQueue: // Don't send anything if we're disconnected. if p.disconnect { continue } err := p.writeMessage(msg) if err != nil { p.disconnect = true log.Errorf("[PEER] %v", err) } case <-p.quit: break out } } p.wg.Done() log.Tracef("[PEER] Peer output handler done for %s", p.conn.RemoteAddr()) } // QueueMessage adds the passed bitcoin message to the peer send queue. It // uses a buffered channel to communicate with the output handler goroutine so // it is automatically rate limited and safe for concurrent access. func (p *peer) QueueMessage(msg btcwire.Message) { p.outputQueue <- msg } // Start begins processing input and output messages. It also sends the initial // version message for outbound connections to start the negotiation process. func (p *peer) Start() error { // Already started? if p.started { return nil } log.Tracef("[PEER] Starting peer %s", p.conn.RemoteAddr()) // Send an initial version message if this is an outbound connection. if !p.inbound { err := p.pushVersionMsg() if err != nil { log.Errorf("[PEER] %v", err) p.conn.Close() return err } } // Start processing input and output. go p.inHandler() go p.outHandler() p.wg.Add(2) p.started = true return nil } // Shutdown gracefully shuts down the peer by signalling the async input and // output handler and waiting for them to finish. func (p *peer) Shutdown() { log.Tracef("[PEER] Shutdown peer %s", p.conn.RemoteAddr()) p.disconnect = true p.conn.Close() p.wg.Wait() } // newPeer returns a new bitcoin peer for the provided server and connection. // Use start to begin processing incoming and outgoing messages. func newPeer(s *server, conn net.Conn, inbound bool, persistent bool) *peer { p := peer{ server: s, protocolVersion: btcwire.ProtocolVersion, btcnet: s.btcnet, services: btcwire.SFNodeNetwork, conn: conn, timeConnected: time.Now(), inbound: inbound, persistent: persistent, knownAddresses: make(map[string]bool), outputQueue: make(chan btcwire.Message, outputBufferSize), quit: make(chan bool), } return &p } type logClosure func() string func (c logClosure) String() string { return c() } func newLogClosure(c func() string) logClosure { return logClosure(c) }