Initial import.

This commit is contained in:
Dave Collins 2013-08-06 16:55:22 -05:00
parent c59a86c467
commit 3f54e4199f
14 changed files with 3227 additions and 0 deletions

13
LICENSE Normal file
View file

@ -0,0 +1,13 @@
Copyright (c) 2013 Conformal Systems LLC.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

View file

@ -2,3 +2,18 @@ btcd
====
btcd is an alternative full node bitcoin implementation written in Go (golang).
This project is currently under active development and is not production ready
yet.
## TODO
The following is a list of major items remaining before production release:
- Implement multi-peer support
- Implement relay
- Complete address manager
- Rework the block syncing code to work with headers
- Documentation
- A lot of code cleanup
- Optimize

183
btcd/addrmanager.go Normal file
View file

@ -0,0 +1,183 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"github.com/conformal/btcwire"
"net"
"strconv"
"sync"
"time"
)
const (
// maxAddresses identifies the maximum number of addresses that the
// address manager will track.
maxAddresses = 2500
newAddressBufferSize = 50
dumpAddressInterval = time.Minute * 2
)
// updateAddress is a helper function to either update an address already known
// to the address manager, or to add the address if not already known.
func updateAddress(a *AddrManager, netAddr *btcwire.NetAddress) {
// Protect concurrent access.
a.addrCacheLock.Lock()
defer a.addrCacheLock.Unlock()
// Update address if it already exists.
addr := NetAddressKey(netAddr)
if na, ok := a.addrCache[addr]; ok {
// Update the last seen time.
if netAddr.Timestamp.After(na.Timestamp) {
na.Timestamp = netAddr.Timestamp
}
// Update services.
na.AddService(na.Services)
log.Tracef("[AMGR] Updated address manager address %s", addr)
return
}
// Enforce max addresses.
if len(a.addrCache)+1 > maxAddresses {
log.Tracef("[AMGR] Max addresses of %d reached", maxAddresses)
return
}
a.addrCache[addr] = netAddr
log.Tracef("[AMGR] Added new address %s for a total of %d addresses",
addr, len(a.addrCache))
}
// AddrManager provides a concurrency safe address manager for caching potential
// peers on the bitcoin network.
type AddrManager struct {
addrCache map[string]*btcwire.NetAddress
addrCacheLock sync.Mutex
started bool
shutdown bool
newAddresses chan []*btcwire.NetAddress
removeAddresses chan []*btcwire.NetAddress
wg sync.WaitGroup
quit chan bool
}
// addressHandler is the main handler for the address manager. It must be run
// as a goroutine.
func (a *AddrManager) addressHandler() {
dumpAddressTicker := time.NewTicker(dumpAddressInterval)
out:
for !a.shutdown {
select {
case addrs := <-a.newAddresses:
for _, na := range addrs {
updateAddress(a, na)
}
case <-dumpAddressTicker.C:
if !a.shutdown {
// TODO: Dump addresses to database.
}
case <-a.quit:
// TODO: Dump addresses to database.
break out
}
}
dumpAddressTicker.Stop()
a.wg.Done()
log.Trace("[AMGR] Address handler done")
}
// Start begins the core address handler which manages a pool of known
// addresses, timeouts, and interval based writes.
func (a *AddrManager) Start() {
// Already started?
if a.started {
return
}
log.Trace("[AMGR] Starting address manager")
go a.addressHandler()
a.wg.Add(1)
a.started = true
}
// Stop gracefully shuts down the address manager by stopping the main handler.
func (a *AddrManager) Stop() error {
if a.shutdown {
log.Warnf("[AMGR] Address manager is already in the process of " +
"shutting down")
return nil
}
log.Infof("[AMGR] Address manager shutting down")
a.shutdown = true
a.quit <- true
a.wg.Wait()
return nil
}
// AddAddresses adds new addresses to the address manager. It enforces a max
// number of addresses and silently ignores duplicate addresses. It is
// safe for concurrent access.
func (a *AddrManager) AddAddresses(addrs []*btcwire.NetAddress) {
a.newAddresses <- addrs
}
// AddAddress adds a new address to the address manager. It enforces a max
// number of addresses and silently ignores duplicate addresses. It is
// safe for concurrent access.
func (a *AddrManager) AddAddress(addr *btcwire.NetAddress) {
addrs := []*btcwire.NetAddress{addr}
a.newAddresses <- addrs
}
// NeedMoreAddresses returns whether or not the address manager needs more
// addresses.
func (a *AddrManager) NeedMoreAddresses() bool {
// Protect concurrent access.
a.addrCacheLock.Lock()
defer a.addrCacheLock.Unlock()
return len(a.addrCache)+1 <= maxAddresses
}
// NumAddresses returns the number of addresses known to the address manager.
func (a *AddrManager) NumAddresses() int {
// Protect concurrent access.
a.addrCacheLock.Lock()
defer a.addrCacheLock.Unlock()
return len(a.addrCache)
}
// AddressCache returns the current address cache. It must be treated as
// read-only.
func (a *AddrManager) AddressCache() map[string]*btcwire.NetAddress {
return a.addrCache
}
// New returns a new bitcoin address manager.
// Use Start to begin processing asynchronous address updates.
func NewAddrManager() *AddrManager {
am := AddrManager{
addrCache: make(map[string]*btcwire.NetAddress),
newAddresses: make(chan []*btcwire.NetAddress, newAddressBufferSize),
quit: make(chan bool),
}
return &am
}
// NetAddressKey returns a string key in the form of ip:port for IPv4 addresses
// or [ip]:port for IPv6 addresses.
func NetAddressKey(na *btcwire.NetAddress) string {
port := strconv.FormatUint(uint64(na.Port), 10)
addr := net.JoinHostPort(na.IP.String(), port)
return addr
}

110
btcd/addrmanager_test.go Normal file
View file

@ -0,0 +1,110 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main_test
import (
"github.com/conformal/btcwire"
"net"
"opensource.conformal.com/go/btcd-internal/addrmgr"
"testing"
"time"
)
// naTest is used to describe a test to be perfomed against the NetAddressKey
// method.
type naTest struct {
in btcwire.NetAddress
want string
}
// naTests houses all of the tests to be performed against the NetAddressKey
// method.
var naTests = make([]naTest, 0)
func addNaTest(ip string, port uint16, want string) {
nip := net.ParseIP(ip)
na := btcwire.NetAddress{
Timestamp: time.Now(),
Services: btcwire.SFNodeNetwork,
IP: nip,
Port: port,
}
test := naTest{na, want}
naTests = append(naTests, test)
}
// addNaTests
func addNaTests() {
// IPv4
// Localhost
addNaTest("127.0.0.1", 8333, "127.0.0.1:8333")
addNaTest("127.0.0.1", 8334, "127.0.0.1:8334")
// Class A
addNaTest("1.0.0.1", 8333, "1.0.0.1:8333")
addNaTest("2.2.2.2", 8334, "2.2.2.2:8334")
addNaTest("27.253.252.251", 8335, "27.253.252.251:8335")
addNaTest("123.3.2.1", 8336, "123.3.2.1:8336")
// Private Class A
addNaTest("10.0.0.1", 8333, "10.0.0.1:8333")
addNaTest("10.1.1.1", 8334, "10.1.1.1:8334")
addNaTest("10.2.2.2", 8335, "10.2.2.2:8335")
addNaTest("10.10.10.10", 8336, "10.10.10.10:8336")
// Class B
addNaTest("128.0.0.1", 8333, "128.0.0.1:8333")
addNaTest("129.1.1.1", 8334, "129.1.1.1:8334")
addNaTest("180.2.2.2", 8335, "180.2.2.2:8335")
addNaTest("191.10.10.10", 8336, "191.10.10.10:8336")
// Private Class B
addNaTest("172.16.0.1", 8333, "172.16.0.1:8333")
addNaTest("172.16.1.1", 8334, "172.16.1.1:8334")
addNaTest("172.16.2.2", 8335, "172.16.2.2:8335")
addNaTest("172.16.172.172", 8336, "172.16.172.172:8336")
// Class C
addNaTest("193.0.0.1", 8333, "193.0.0.1:8333")
addNaTest("200.1.1.1", 8334, "200.1.1.1:8334")
addNaTest("205.2.2.2", 8335, "205.2.2.2:8335")
addNaTest("223.10.10.10", 8336, "223.10.10.10:8336")
// Private Class C
addNaTest("192.168.0.1", 8333, "192.168.0.1:8333")
addNaTest("192.168.1.1", 8334, "192.168.1.1:8334")
addNaTest("192.168.2.2", 8335, "192.168.2.2:8335")
addNaTest("192.168.192.192", 8336, "192.168.192.192:8336")
// IPv6
// Localhost
addNaTest("::1", 8333, "[::1]:8333")
addNaTest("fe80::1", 8334, "[fe80::1]:8334")
// Link-local
addNaTest("fe80::1:1", 8333, "[fe80::1:1]:8333")
addNaTest("fe91::2:2", 8334, "[fe91::2:2]:8334")
addNaTest("fea2::3:3", 8335, "[fea2::3:3]:8335")
addNaTest("feb3::4:4", 8336, "[feb3::4:4]:8336")
// Site-local
addNaTest("fec0::1:1", 8333, "[fec0::1:1]:8333")
addNaTest("fed1::2:2", 8334, "[fed1::2:2]:8334")
addNaTest("fee2::3:3", 8335, "[fee2::3:3]:8335")
addNaTest("fef3::4:4", 8336, "[fef3::4:4]:8336")
}
func TestNetAddressKey(t *testing.T) {
addNaTests()
t.Logf("Running %d tests", len(naTests))
for i, test := range naTests {
key := addrmgr.NetAddressKey(&test.in)
if key != test.want {
t.Errorf("NetAddressKey #%d\n got: %s want: %s", i, key, test.want)
continue
}
}
}

449
btcd/blockmanager.go Normal file
View file

@ -0,0 +1,449 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"container/list"
"github.com/conformal/btcchain"
"github.com/conformal/btcdb"
_ "github.com/conformal/btcdb/sqlite3"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"os"
"path/filepath"
"sync"
"time"
)
const (
chanBufferSize = 50
)
// inventoryItem is used to track known and requested inventory items.
type inventoryItem struct {
invVect *btcwire.InvVect
peers []*peer
}
// blockMsg packages a bitcoin block message and the peer it came from together
// so the block handler has access to that information.
type blockMsg struct {
block *btcutil.Block
}
// invMsg packages a bitcoin inv message and the peer it came from together
// so the block handler has access to that information.
type invMsg struct {
msg *btcwire.MsgInv
peer *peer
}
// txMsg packages a bitcoin tx message and the peer it came from together
// so the block handler has access to that information.
type txMsg struct {
msg *btcwire.MsgTx
peer *peer
}
// blockManager provides a concurrency safe block manager for handling all
// incoming block inventory advertisement as well as issuing requests to
// download needed blocks of the block chain from other peers. It works by
// forcing all incoming block inventory advertisements through a single
// goroutine which then determines whether the block is needed and how the
// requests should be made amongst multiple peers.
type blockManager struct {
server *server
started bool
shutdown bool
blockChain *btcchain.BlockChain
requestQueue *list.List
requestMap map[string]*inventoryItem
outstandingBlocks int
receivedLogBlocks int64
receivedLogTx int64
lastBlockLogTime time.Time
processingReqs bool
newBlocks chan bool
blockQueue chan *blockMsg
invQueue chan *invMsg
chainNotify chan *btcchain.Notification
wg sync.WaitGroup
quit chan bool
}
// logBlockHeight logs a new block height as an information message to show
// progress to the user. In order to prevent spam, it limits logging to one
// message every 10 seconds with duration and totals included.
func (b *blockManager) logBlockHeight(numTx, height int64) {
b.receivedLogBlocks++
b.receivedLogTx += numTx
now := time.Now()
duration := now.Sub(b.lastBlockLogTime)
if b.outstandingBlocks != 0 && duration < time.Second*10 {
return
}
// Log information about new block height.
blockStr := "blocks"
if b.receivedLogBlocks == 1 {
blockStr = "block"
}
txStr := "transactions"
if b.receivedLogTx == 1 {
txStr = "transaction"
}
log.Infof("[BMGR] Processed %d %s (%d %s) in the last %s - Block "+
"height %d", b.receivedLogBlocks, blockStr, b.receivedLogTx,
txStr, duration, height)
b.receivedLogBlocks = 0
b.receivedLogTx = 0
b.lastBlockLogTime = now
}
// handleInvMsg handles inventory messages for all peers. It adds blocks that
// we need along with which peers know about each block to a request queue
// based upon the advertised inventory. It also attempts to strike a balance
// between the number of in-flight blocks and keeping the request queue full
// by issuing more getblocks (MsgGetBlocks) requests as needed.
func (b *blockManager) handleInvMsg(msg *btcwire.MsgInv, p *peer) {
// Find the last block in the inventory list.
invVects := msg.InvList
var lastHash *btcwire.ShaHash
for i := len(invVects) - 1; i >= 0; i-- {
if invVects[i].Type == btcwire.InvVect_Block {
lastHash = &invVects[i].Hash
break
}
}
for _, iv := range invVects {
switch iv.Type {
case btcwire.InvVect_Block:
// Ignore this block if we already have it.
// TODO(davec): Need to check orphans too.
if b.server.db.ExistsSha(&iv.Hash) {
log.Tracef("[BMGR] Ignoring known block %v.", &iv.Hash)
continue
}
// Add the peer to the list of peers which can serve the block if
// it's already queued to be fetched.
if item, ok := b.requestMap[iv.Hash.String()]; ok {
item.peers = append(item.peers, p)
continue
}
// Add the item to the end of the request queue.
item := &inventoryItem{
invVect: iv,
peers: []*peer{p},
}
b.requestMap[item.invVect.Hash.String()] = item
b.requestQueue.PushBack(item)
b.outstandingBlocks++
case btcwire.InvVect_Tx:
// XXX: Handle transactions here.
}
}
// Request more blocks if there aren't enough in-flight blocks.
if lastHash != nil && b.outstandingBlocks < btcwire.MaxBlocksPerMsg*5 {
stopHash := btcwire.ShaHash{}
gbmsg := btcwire.NewMsgGetBlocks(&stopHash)
gbmsg.AddBlockLocatorHash(lastHash)
p.QueueMessage(gbmsg)
}
}
// handleBlockMsg handles block messages from all peers. It is currently
// very simple. It doesn't validate the block or handle orphans and side
// chains. It simply inserts the block into the database after ensuring the
// previous block is already inserted.
func (b *blockManager) handleBlockMsg(block *btcutil.Block) {
b.outstandingBlocks--
msg := block.MsgBlock()
// Process the block to include validation, best chain selection, orphan
// handling, etc.
err := b.blockChain.ProcessBlock(block)
if err != nil {
blockSha, err2 := block.Sha()
if err2 != nil {
log.Errorf("[BMGR] %v", err2)
}
log.Warnf("[BMGR] Failed to process block %v: %v", blockSha, err)
return
}
// Log info about the new block height.
_, height, err := b.server.db.NewestSha()
if err != nil {
log.Warnf("[BMGR] Failed to obtain latest sha - %v", err)
return
}
b.logBlockHeight(int64(len(msg.Transactions)), height)
// Sync the db to disk when there are no more outstanding blocks.
// NOTE: Periodic syncs happen as new data is requested as well.
if b.outstandingBlocks <= 0 {
b.server.db.Sync()
}
}
// blockHandler is the main handler for the block manager. It must be run as a
// goroutine. It processes block and inv messages in a separate goroutine from
// the peer handlers so the block (MsgBlock) and tx (MsgTx) messages are handled
// by a single thread without needing to lock memory data structures. This is
// important because the block manager controls which blocks are needed and how
// the fetching should proceed.
//
// NOTE: Tx messages need to be handled here too.
// (either that or block and tx need to be handled in separate threads)
func (b *blockManager) blockHandler() {
out:
for !b.shutdown {
select {
// Handle new block messages.
case msg := <-b.blockQueue:
b.handleBlockMsg(msg.block)
// Handle new inventory messages.
case msg := <-b.invQueue:
b.handleInvMsg(msg.msg, msg.peer)
// Request the blocks.
if b.requestQueue.Len() > 0 && !b.processingReqs {
b.processingReqs = true
b.newBlocks <- true
}
case <-b.newBlocks:
numRequested := 0
gdmsg := btcwire.NewMsgGetData()
var p *peer
for e := b.requestQueue.Front(); e != nil; e = b.requestQueue.Front() {
item := e.Value.(*inventoryItem)
p = item.peers[0]
gdmsg.AddInvVect(item.invVect)
delete(b.requestMap, item.invVect.Hash.String())
b.requestQueue.Remove(e)
numRequested++
if numRequested >= btcwire.MaxInvPerMsg {
break
}
}
b.server.db.Sync()
if len(gdmsg.InvList) > 0 && p != nil {
p.QueueMessage(gdmsg)
}
b.processingReqs = false
case <-b.quit:
break out
}
}
b.wg.Done()
log.Trace("[BMGR] Block handler done")
}
// handleNotifyMsg handles notifications from btcchain. Currently it doesn't
// respond to any notifications, but the idea is that it requests missing blocks
// in response to orphan notifications and updates the wallet for blocks
// connected and disconnected to the main chain.
func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
switch notification.Type {
case btcchain.NTOrphanBlock:
// TODO(davec): Ask the peer to fill in the missing blocks for the
// orphan root if it's not nil.
orphanRoot := notification.Data.(*btcwire.ShaHash)
_ = orphanRoot
case btcchain.NTBlockAccepted:
// TODO(davec): Relay inventory, but don't relay old inventory
// during initial block download.
}
}
// chainNotificationHandler is the handler for asynchronous notifications from
// btcchain. It must be run as a goroutine.
func (b *blockManager) chainNotificationHandler() {
out:
for !b.shutdown {
select {
case notification := <-b.chainNotify:
b.handleNotifyMsg(notification)
case <-b.quit:
break out
}
}
b.wg.Done()
log.Trace("[BMGR] Chain notification handler done")
}
// QueueBlock adds the passed block message and peer to the block handling queue.
func (b *blockManager) QueueBlock(block *btcutil.Block) {
// Don't accept more blocks if we're shutting down.
if b.shutdown {
return
}
bmsg := blockMsg{block: block}
b.blockQueue <- &bmsg
}
// QueueInv adds the passed inventory message and peer to the inventory handling
// queue.
func (b *blockManager) QueueInv(msg *btcwire.MsgInv, p *peer) {
// Don't accept more inventory if we're shutting down.
if b.shutdown {
return
}
imsg := invMsg{msg: msg, peer: p}
b.invQueue <- &imsg
}
// Start begins the core block handler which processes block and inv messages.
func (b *blockManager) Start() {
// Already started?
if b.started {
return
}
log.Trace("[BMGR] Starting block manager")
go b.blockHandler()
go b.chainNotificationHandler()
b.wg.Add(2)
b.started = true
}
// Stop gracefully shuts down the block manager by stopping all asynchronous
// handlers and waiting for them to finish.
func (b *blockManager) Stop() error {
if b.shutdown {
log.Warnf("[BMGR] Block manager is already in the process of " +
"shutting down")
return nil
}
log.Infof("[BMGR] Block manager shutting down")
b.shutdown = true
close(b.quit)
b.wg.Wait()
return nil
}
// AddBlockLocators adds block locators to a getblocks message starting with
// the passed hash back to the genesis block hash. In order to keep the list
// of locator hashes to a reasonable number of entries, first it adds the
// most recent 10 block hashes (starting with the passed hash), then doubles the
// step each loop iteration to exponentially decrease the number of hashes the
// further away from head and closer to the genesis block it gets.
func (b *blockManager) AddBlockLocators(hash *btcwire.ShaHash, msg *btcwire.MsgGetBlocks) error {
// XXX(davec): This is fetching the block data too.
block, err := b.server.db.FetchBlockBySha(hash)
if err != nil {
log.Warnf("[BMGR] Lookup of known valid index failed %v", hash)
return err
}
blockIndex := block.Height()
// We want inventory after the passed hash.
msg.AddBlockLocatorHash(hash)
// Generate the block locators according to the algorithm described in
// in the function comment and make sure to leave room for the already
// added hash and final genesis hash.
increment := int64(1)
for i := 1; i < btcwire.MaxBlockLocatorsPerMsg-2; i++ {
if i > 10 {
increment *= 2
}
blockIndex -= increment
if blockIndex <= 1 {
break
}
h, err := b.server.db.FetchBlockShaByHeight(blockIndex)
if err != nil {
// This shouldn't happen and it's ok to ignore, so just
// continue to the next.
log.Warnf("[BMGR] Lookup of known valid index failed %v",
blockIndex)
continue
}
msg.AddBlockLocatorHash(h)
}
msg.AddBlockLocatorHash(&btcwire.GenesisHash)
return nil
}
// newBlockManager returns a new bitcoin block manager.
// Use Start to begin processing asynchronous block and inv updates.
func newBlockManager(s *server) *blockManager {
chainNotify := make(chan *btcchain.Notification, chanBufferSize)
bm := blockManager{
server: s,
blockChain: btcchain.New(s.db, s.btcnet, chainNotify),
requestQueue: list.New(),
requestMap: make(map[string]*inventoryItem),
lastBlockLogTime: time.Now(),
newBlocks: make(chan bool, 1),
blockQueue: make(chan *blockMsg, chanBufferSize),
invQueue: make(chan *invMsg, chanBufferSize),
chainNotify: chainNotify,
quit: make(chan bool),
}
bm.blockChain.DisableVerify(cfg.VerifyDisabled)
return &bm
}
// loadBlockDB opens the block database and returns a handle to it.
func loadBlockDB() (btcdb.Db, error) {
dbPath := filepath.Join(cfg.DbDir, activeNetParams.dbName)
log.Infof("[BMGR] Loading block database from '%s'", dbPath)
db, err := btcdb.OpenDB("sqlite", dbPath)
if err != nil {
// Return the error if it's not because the database doesn't
// exist.
if err != btcdb.DbDoesNotExist {
return nil, err
}
// Create the db if it does not exist.
err = os.MkdirAll(cfg.DbDir, 0700)
if err != nil {
return nil, err
}
db, err = btcdb.CreateDB("sqlite", dbPath)
if err != nil {
return nil, err
}
// Insert the appropriate genesis block for the bitcoin network
// being connected to.
genesis := btcutil.NewBlock(activeNetParams.genesisBlock)
_, err := db.InsertBlock(genesis)
if err != nil {
db.Close()
return nil, err
}
log.Infof("[BMGR] Inserted genesis block %v",
activeNetParams.genesisHash)
}
// Get the latest block height from the database.
_, height, err := db.NewestSha()
if err != nil {
db.Close()
return nil, err
}
log.Infof("[BMGR] Block database loaded with block height %d", height)
return db, nil
}

185
btcd/btcd.go Normal file
View file

@ -0,0 +1,185 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"fmt"
"github.com/conformal/btcchain"
"github.com/conformal/btcdb"
"github.com/conformal/btcscript"
"github.com/conformal/btcwire"
"github.com/conformal/seelog"
"math/rand"
"net"
"os"
"runtime"
"strconv"
"time"
)
const userAgent = "/btcd:0.0.1/"
// used by the dns seed code to pick a random last seen time
const (
secondsIn3Days int32 = 24 * 60 * 60 * 3
secondsIn4Days int32 = 24 * 60 * 60 * 4
)
var (
log seelog.LoggerInterface = seelog.Disabled
cfg *config
)
// newLogger creates a new seelog logger using the provided logging level and
// log message prefix.
func newLogger(level string, prefix string) seelog.LoggerInterface {
fmtstring := `
<seelog type="adaptive" mininterval="2000000" maxinterval="100000000"
critmsgcount="500" minlevel="%s">
<outputs formatid="all">
<console/>
</outputs>
<formats>
<format id="all" format="[%%Time %%Date] [%%LEV] [%s] %%Msg%%n" />
</formats>
</seelog>`
config := fmt.Sprintf(fmtstring, level, prefix)
logger, err := seelog.LoggerFromConfigAsString(config)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create logger: %v", err)
os.Exit(1)
}
return logger
}
// useLogger sets the btcd logger to the passed logger.
func useLogger(logger seelog.LoggerInterface) {
log = logger
}
// setLogLevel sets the log level for the logging system. It initialises a
// logger for each subsystem at the provided level.
func setLogLevel(logLevel string) []seelog.LoggerInterface {
var loggers []seelog.LoggerInterface
// Define sub-systems.
subSystems := []struct {
level string
prefix string
useLogger func(seelog.LoggerInterface)
}{
{logLevel, "BTCD", useLogger},
{logLevel, "BCDB", btcdb.UseLogger},
{logLevel, "CHAN", btcchain.UseLogger},
{logLevel, "SCRP", btcscript.UseLogger},
}
// Configure all sub-systems with new loggers while keeping track of
// the created loggers to return so they can be flushed.
for _, s := range subSystems {
newLog := newLogger(s.level, s.prefix)
loggers = append(loggers, newLog)
s.useLogger(newLog)
}
return loggers
}
// btcdMain is the real main function for btcd. It is necessary to work around
// the fact that deferred functions do not run when os.Exit() is called.
func btcdMain() error {
// Initialize logging and setup deferred flushing to ensure all
// outstanding messages are written on shutdown.
loggers := setLogLevel(defaultLogLevel)
defer func() {
for _, logger := range loggers {
logger.Flush()
}
}()
// Load configuration and parse command line.
tcfg, _, err := loadConfig()
if err != nil {
return err
}
cfg = tcfg
// Change the logging level if needed.
if cfg.DebugLevel != defaultLogLevel {
loggers = setLogLevel(cfg.DebugLevel)
}
// Load the block database.
db, err := loadBlockDB()
if err != nil {
log.Errorf("%v", err)
return err
}
defer db.Close()
// Ensure the database is sync'd and closed on Ctrl+C.
addInterruptHandler(func() {
db.RollbackClose()
})
// Create server and start it.
listenAddr := net.JoinHostPort("", cfg.Port)
server, err := newServer(listenAddr, db, activeNetParams.btcnet)
if err != nil {
log.Errorf("Unable to start server on %v", listenAddr)
log.Errorf("%v", err)
return err
}
server.Start()
// only ask dns for peers if we don't have a list of initial seeds.
if !cfg.DisableDNSSeed {
// XXX need a proxy config entry
seedpeers := dnsDiscover(activeNetParams.dnsSeeds, "")
addresses := make([]*btcwire.NetAddress, len(seedpeers))
// if this errors then we have *real* problems
intPort, _ := strconv.Atoi(activeNetParams.peerPort)
for i, peer := range seedpeers {
addresses[i] = new(btcwire.NetAddress)
addresses[i].SetAddress(peer, uint16(intPort))
// bitcoind seeds with addresses from
// a time randomly selected between 3
// and 7 days ago.
addresses[i].Timestamp = time.Now().Add(-1 *
time.Second * time.Duration(secondsIn3Days+
rand.Int31n(secondsIn4Days)))
}
server.addrManager.AddAddresses(addresses)
// XXX if this is empty do we want to use hardcoded
// XXX peers like bitcoind does?
}
peers := cfg.ConnectPeers
if len(peers) == 0 {
peers = cfg.AddPeers
}
// Connect to initial peers.
for _, addr := range peers {
// Connect to peer and add it to the server.
server.ConnectPeerAsync(addr, true)
}
server.WaitForShutdown()
return nil
}
func main() {
// Use all processor cores.
runtime.GOMAXPROCS(runtime.NumCPU())
// Work around defer not working after os.Exit()
err := btcdMain()
if err != nil {
os.Exit(1)
}
}

271
btcd/config.go Normal file
View file

@ -0,0 +1,271 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"errors"
"fmt"
"github.com/conformal/btcwire"
"net"
"opensource.conformal.com/go/go-flags"
"os"
"path/filepath"
"time"
)
const (
defaultConfigFilename = "btcd.conf"
defaultLogLevel = "info"
defaultBtcnet = btcwire.MainNet
defaultMaxPeers = 8
defaultBanDuration = time.Hour * 24
defaultVerifyEnabled = false
)
var (
defaultConfigFile = filepath.Join(btcdHomeDir(), defaultConfigFilename)
defaultDbDir = filepath.Join(btcdHomeDir(), "db")
)
// config defines the configuration options for btcd.
//
// See loadConfig for details on the configuration load process.
type config struct {
DebugLevel string `short:"d" long:"debuglevel" description:"Logging level {trace, debug, info, warn, error, critical}"`
AddPeers []string `short:"a" long:"addpeer" description:"Add a peer to connect with at startup"`
ConnectPeers []string `long:"connect" description:"Connect only to the specified peers at startup"`
SeedPeer string `short:"s" long:"seedpeer" description:"Retrieve peer addresses from this peer and then disconnect"`
Port string `short:"p" long:"port" description:"Listen for connections on this port (default: 8333, testnet: 18333)"`
RpcPort string `short:"r" long:"rpcport" description:"Listen for json/rpc messages on this port"`
MaxPeers int `long:"maxpeers" description:"Max number of inbound and outbound peers"`
BanDuration time.Duration `long:"banduration" description:"How long to ban misbehaving peers. Valid time units are {s, m, h}. Minimum 1 second"`
VerifyDisabled bool `long:"noverify" description:"Disable block/transaction verification -- WARNING: This option can be dangerous and is for development use only"`
ConfigFile string `short:"C" long:"configfile" description:"Path to configuration file"`
DbDir string `short:"b" long:"dbdir" description:"Directory to store database"`
RpcUser string `short:"u" long:"rpcuser" description:"Username for rpc connections"`
RpcPass string `short:"P" long:"rpcpass" description:"Password for rpc connections"`
DisableRpc bool `long:"norpc" description:"Disable built-in RPC server"`
DisableDNSSeed bool `long:"nodnsseed" description:"Disable DNS seeding for peers"`
TestNet3 bool `long:"testnet" description:"Use the test network"`
RegressionTest bool `long:"regtest" description:"Use the regression test network"`
}
// btcdHomeDir returns an OS appropriate home directory for btcd.
func btcdHomeDir() string {
// Search for Windows APPDATA first. This won't exist on POSIX OSes.
appData := os.Getenv("APPDATA")
if appData != "" {
return filepath.Join(appData, "btcd")
}
// Fall back to standard HOME directory that works for most POSIX OSes.
home := os.Getenv("HOME")
if home != "" {
return filepath.Join(home, ".btcd")
}
// In the worst case, use the current directory.
return "."
}
// validLogLevel returns whether or not logLevel is a valid debug log level.
func validLogLevel(logLevel string) bool {
switch logLevel {
case "trace":
fallthrough
case "debug":
fallthrough
case "info":
fallthrough
case "warn":
fallthrough
case "error":
fallthrough
case "critical":
return true
}
return false
}
// normalizePeerAddress returns addr with the default peer port appended if
// there is not already a port specified.
func normalizePeerAddress(addr string) string {
_, _, err := net.SplitHostPort(addr)
if err != nil {
return net.JoinHostPort(addr, activeNetParams.peerPort)
}
return addr
}
// removeDuplicateAddresses returns a new slice with all duplicate entries in
// addrs removed.
func removeDuplicateAddresses(addrs []string) []string {
result := make([]string, 0)
seen := map[string]bool{}
for _, val := range addrs {
if _, ok := seen[val]; !ok {
result = append(result, val)
seen[val] = true
}
}
return result
}
func normalizeAndRemoveDuplicateAddresses(addrs []string) []string {
for i, addr := range addrs {
addrs[i] = normalizePeerAddress(addr)
}
addrs = removeDuplicateAddresses(addrs)
return addrs
}
// updateConfigWithActiveParams update the passed config with parameters
// from the active net params if the relevant options in the passed config
// object are the default so options specified by the user on the command line
// are not overridden.
func updateConfigWithActiveParams(cfg *config) {
if cfg.Port == netParams(defaultBtcnet).listenPort {
cfg.Port = activeNetParams.listenPort
}
if cfg.RpcPort == netParams(defaultBtcnet).rpcPort {
cfg.RpcPort = activeNetParams.rpcPort
}
}
// filesExists reports whether the named file or directory exists.
func fileExists(name string) bool {
if _, err := os.Stat(name); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
// loadConfig initializes and parses the config using a config file and command
// line options.
//
// The configuration proceeds as follows:
// 1) Start with a default config with sane settings
// 2) Pre-parse the command line to check for an alternative config file
// 3) Load configuration file overwriting defaults with any specified options
// 4) Parse CLI options and overwrite/add any specified options
//
// The above results in btcd functioning properly without any config settings
// while still allowing the user to override settings with config files and
// command line options. Command line options always take precedence.
func loadConfig() (*config, []string, error) {
// Default config.
cfg := config{
DebugLevel: defaultLogLevel,
Port: netParams(defaultBtcnet).listenPort,
RpcPort: netParams(defaultBtcnet).rpcPort,
MaxPeers: defaultMaxPeers,
BanDuration: defaultBanDuration,
ConfigFile: defaultConfigFile,
DbDir: defaultDbDir,
}
// A config file in the current directory takes precedence.
if fileExists(defaultConfigFilename) {
cfg.ConfigFile = defaultConfigFilename
}
// Pre-parse the command line options to see if an alternative config
// file was specified.
preCfg := cfg
preParser := flags.NewParser(&preCfg, flags.Default)
_, err := preParser.Parse()
if err != nil {
if e, ok := err.(*flags.Error); !ok || e.Type != flags.ErrHelp {
preParser.WriteHelp(os.Stderr)
}
return nil, nil, err
}
// Load additional config from file.
parser := flags.NewParser(&cfg, flags.Default)
err = parser.ParseIniFile(preCfg.ConfigFile)
if err != nil {
if _, ok := err.(*os.PathError); !ok {
fmt.Fprintln(os.Stderr, err)
parser.WriteHelp(os.Stderr)
return nil, nil, err
}
log.Warnf("%v", err)
}
// Parse command line options again to ensure they take precedence.
remainingArgs, err := parser.Parse()
if err != nil {
if e, ok := err.(*flags.Error); !ok || e.Type != flags.ErrHelp {
parser.WriteHelp(os.Stderr)
}
return nil, nil, err
}
// The two test networks can't be selected simultaneously.
if cfg.TestNet3 && cfg.RegressionTest {
str := "%s: The testnet and regtest params can't be used " +
"together -- choose one of the two"
err := errors.New(fmt.Sprintf(str, "loadConfig"))
fmt.Fprintln(os.Stderr, err)
parser.WriteHelp(os.Stderr)
return nil, nil, err
}
// Choose the active network params based on the testnet and regression
// test net flags.
if cfg.TestNet3 {
activeNetParams = netParams(btcwire.TestNet3)
} else if cfg.RegressionTest {
activeNetParams = netParams(btcwire.TestNet)
}
updateConfigWithActiveParams(&cfg)
// Validate debug log level.
if !validLogLevel(cfg.DebugLevel) {
str := "%s: The specified debug level is invalid -- parsed [%v]"
err := errors.New(fmt.Sprintf(str, "loadConfig", cfg.DebugLevel))
fmt.Fprintln(os.Stderr, err)
parser.WriteHelp(os.Stderr)
return nil, nil, err
}
// Don't allow ban durations that are too short.
if cfg.BanDuration < time.Duration(time.Second) {
str := "%s: The banduration option may not be less than 1s -- parsed [%v]"
err := errors.New(fmt.Sprintf(str, "loadConfig", cfg.BanDuration))
fmt.Fprintln(os.Stderr, err)
parser.WriteHelp(os.Stderr)
return nil, nil, err
}
// --addPeer and --connect do not mix.
if len(cfg.AddPeers) > 0 && len(cfg.ConnectPeers) > 0 {
str := "%s: the --addpeer and --connect options can not be " +
"mixed"
err := errors.New(fmt.Sprintf(str, "loadConfig"))
fmt.Fprintln(os.Stderr, err)
parser.WriteHelp(os.Stderr)
return nil, nil, err
}
// Connect means no seeding or listening.
if len(cfg.ConnectPeers) > 0 {
cfg.DisableDNSSeed = true
// XXX turn off server listening.
}
// Add default port to all added peer addresses if needed and remove
// duplicate addresses.
cfg.AddPeers = normalizeAndRemoveDuplicateAddresses(cfg.AddPeers)
cfg.ConnectPeers =
normalizeAndRemoveDuplicateAddresses(cfg.ConnectPeers)
return &cfg, remainingArgs, nil
}

171
btcd/discovery.go Normal file
View file

@ -0,0 +1,171 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"encoding/binary"
"errors"
"fmt"
"net"
)
const (
torSucceeded = 0x00
torGeneralError = 0x01
torNotAllowed = 0x02
torNetUnreachable = 0x03
torHostUnreachable = 0x04
torConnectionRefused = 0x05
torTtlExpired = 0x06
torCmdNotSupported = 0x07
torAddrNotSupported = 0x08
)
var (
ErrTorInvalidAddressResponse = errors.New("Invalid address response")
ErrTorInvalidProxyResponse = errors.New("Invalid proxy response")
ErrTorUnrecognizedAuthMethod = errors.New("Invalid proxy authentication method")
torStatusErrors = map[byte]error{
torSucceeded: errors.New("Tor succeeded"),
torGeneralError: errors.New("Tor general error"),
torNotAllowed: errors.New("Tor not allowed"),
torNetUnreachable: errors.New("Tor network is unreachable"),
torHostUnreachable: errors.New("Tor host is unreachable"),
torConnectionRefused: errors.New("Tor connection refused"),
torTtlExpired: errors.New("Tor ttl expired"),
torCmdNotSupported: errors.New("Tor command not supported"),
torAddrNotSupported: errors.New("Tor address type not supported"),
}
)
// try individual DNS server return list of strings for responses.
func doDNSLookup(host, proxy string) ([]net.IP, error) {
var err error
var addrs []net.IP
if proxy != "" {
addrs, err = torLookupIP(host, proxy)
} else {
addrs, err = net.LookupIP(host)
}
if err != nil {
return nil, err
}
return addrs, nil
}
// Use Tor to resolve DNS.
/*
TODO:
* this function must be documented internally
* this function does not handle IPv6
*/
func torLookupIP(host, proxy string) ([]net.IP, error) {
conn, err := net.Dial("tcp", proxy)
if err != nil {
return nil, err
}
defer conn.Close()
buf := []byte{'\x05', '\x01', '\x00'}
_, err = conn.Write(buf)
if err != nil {
return nil, err
}
buf = make([]byte, 2)
_, err = conn.Read(buf)
if err != nil {
return nil, err
}
if buf[0] != '\x05' {
return nil, ErrTorInvalidProxyResponse
}
if buf[1] != '\x00' {
return nil, ErrTorUnrecognizedAuthMethod
}
buf = make([]byte, 7+len(host))
buf[0] = 5 // protocol version
buf[1] = '\xF0' // Tor Resolve
buf[2] = 0 // reserved
buf[3] = 3 // Tor Resolve
buf[4] = byte(len(host))
copy(buf[5:], host)
buf[5+len(host)] = 0 // Port 0
_, err = conn.Write(buf)
if err != nil {
return nil, err
}
buf = make([]byte, 4)
_, err = conn.Read(buf)
if err != nil {
return nil, err
}
if buf[0] != 5 {
return nil, ErrTorInvalidProxyResponse
}
if buf[1] != 0 {
if int(buf[1]) > len(torStatusErrors) {
err = ErrTorInvalidProxyResponse
} else {
err := torStatusErrors[buf[1]]
if err == nil {
err = ErrTorInvalidProxyResponse
}
}
return nil, err
}
if buf[3] != 1 {
err := torStatusErrors[torGeneralError]
return nil, err
}
buf = make([]byte, 4)
bytes, err := conn.Read(buf)
if err != nil {
return nil, err
}
if bytes != 4 {
return nil, ErrTorInvalidAddressResponse
}
r := binary.BigEndian.Uint32(buf)
addr := make([]net.IP, 1)
addr[0] = net.IPv4(byte(r>>24), byte(r>>16), byte(r>>8), byte(r))
return addr, nil
}
// dnsDiscover looks up the list of peers resolved by dns for all hosts in
// seeders. If proxy is not "" then this is used as a tor proxy for the
// resolution. If any errors occur then that seeder that errored will not have
// any hosts in the list. Therefore if all hosts failed an empty slice of
// strings will be returned.
func dnsDiscover(seeders []string, proxy string) []net.IP {
peers := []net.IP{}
for _, seeder := range seeders {
log.Debugf("[DISC] Fetching list of seeds from %v", seeder)
newPeers, err := doDNSLookup(seeder, proxy)
if err != nil {
seederPlusProxy := seeder
if proxy != "" {
seederPlusProxy = fmt.Sprintf("%s (proxy %s)",
seeder, proxy)
}
log.Warnf("[DISC] Failed to fetch dns seeds "+
"from %s: %v", seederPlusProxy, err)
continue
}
peers = append(peers, newPeers...)
}
return peers
}

100
btcd/params.go Normal file
View file

@ -0,0 +1,100 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"github.com/conformal/btcchain"
"github.com/conformal/btcwire"
"math/big"
)
// activeNetParams is a pointer to the parameters specific to the
// currently active bitcoin network.
var activeNetParams = netParams(defaultBtcnet)
// params is used to group parameters for various networks such as the main
// network and test networks.
type params struct {
dbName string
btcnet btcwire.BitcoinNet
genesisBlock *btcwire.MsgBlock
genesisHash *btcwire.ShaHash
powLimit *big.Int
powLimitBits uint32
peerPort string
listenPort string
rpcPort string
dnsSeeds []string
}
// mainNetParams contains parameters specific to the main network
// (btcwire.MainNet).
var mainNetParams = params{
dbName: "btcd.db",
btcnet: btcwire.MainNet,
genesisBlock: btcchain.ChainParams(btcwire.MainNet).GenesisBlock,
genesisHash: btcchain.ChainParams(btcwire.MainNet).GenesisHash,
powLimit: btcchain.ChainParams(btcwire.MainNet).PowLimit,
powLimitBits: btcchain.ChainParams(btcwire.MainNet).PowLimitBits,
listenPort: btcwire.MainPort,
peerPort: btcwire.MainPort,
rpcPort: "8332",
dnsSeeds: []string{
"seed.bitcoin.sipa.be",
"dnsseed.bluematt.me",
"dnsseed.bitcoin.dashjr.org",
"bitseed.xf2.org",
},
}
// regressionParams contains parameters specific to the regression test network
// (btcwire.TestNet).
var regressionParams = params{
dbName: "btcd_regtest.db",
btcnet: btcwire.TestNet,
genesisBlock: btcchain.ChainParams(btcwire.TestNet).GenesisBlock,
genesisHash: btcchain.ChainParams(btcwire.TestNet).GenesisHash,
powLimit: btcchain.ChainParams(btcwire.TestNet).PowLimit,
powLimitBits: btcchain.ChainParams(btcwire.TestNet).PowLimitBits,
listenPort: btcwire.RegressionTestPort,
peerPort: btcwire.TestNetPort,
rpcPort: "18332",
dnsSeeds: []string{},
}
// testNet3Params contains parameters specific to the test network (version 3)
// (btcwire.TestNet3).
var testNet3Params = params{
dbName: "btcd_testnet.db",
btcnet: btcwire.TestNet3,
genesisBlock: btcchain.ChainParams(btcwire.TestNet3).GenesisBlock,
genesisHash: btcchain.ChainParams(btcwire.TestNet3).GenesisHash,
powLimit: btcchain.ChainParams(btcwire.TestNet3).PowLimit,
powLimitBits: btcchain.ChainParams(btcwire.TestNet3).PowLimitBits,
listenPort: btcwire.TestNetPort,
peerPort: btcwire.TestNetPort,
rpcPort: "18332",
dnsSeeds: []string{
"testnet-seed.bitcoin.petertodd.org",
"testnet-seed.bluematt.me",
},
}
// netParams returns parameters specific to the passed bitcoin network.
func netParams(btcnet btcwire.BitcoinNet) *params {
switch btcnet {
case btcwire.TestNet:
return &regressionParams
case btcwire.TestNet3:
return &testNet3Params
// Return main net by default.
case btcwire.MainNet:
fallthrough
default:
return &mainNetParams
}
}

752
btcd/peer.go Normal file
View file

@ -0,0 +1,752 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"bytes"
"errors"
"github.com/conformal/btcdb"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"github.com/davecgh/go-spew/spew"
"net"
"opensource.conformal.com/go/btcd-internal/addrmgr"
"sync"
"time"
)
const outputBufferSize = 50
// zeroHash is the zero value hash (all zeros). It is defined as a convenience.
var zeroHash btcwire.ShaHash
// minUint32 is a helper function to return the minimum of two uint32s.
// This avoids a math import and the need to cast to floats.
func minUint32(a, b uint32) uint32 {
if a < b {
return a
}
return b
}
// peer provides a bitcoin peer for handling bitcoin communications.
type peer struct {
server *server
protocolVersion uint32
btcnet btcwire.BitcoinNet
services btcwire.ServiceFlag
started bool
conn net.Conn
timeConnected time.Time
inbound bool
disconnect bool
persistent bool
versionKnown bool
knownAddresses map[string]bool
lastBlock int32
wg sync.WaitGroup
outputQueue chan btcwire.Message
quit chan bool
}
// pushVersionMsg sends a version message to the connected peer using the
// current state.
func (p *peer) pushVersionMsg() error {
_, blockNum, err := p.server.db.NewestSha()
if err != nil {
return err
}
msg, err := btcwire.NewMsgVersionFromConn(p.conn, p.server.nonce,
userAgent, int32(blockNum))
if err != nil {
return err
}
// XXX: bitcoind appears to always enable the full node services flag
// of the remote peer netaddress field in the version message regardless
// of whether it knows it supports it or not. Also, bitcoind sets
// the services field of the local peer to 0 regardless of support.
//
// Realistically, this should be set as follows:
// - For outgoing connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to 0 to indicate no services
// as they are still unknown
// - For incoming connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to the what was advertised by
// by the remote peer in its version message
msg.AddrYou.Services = btcwire.SFNodeNetwork
// Advertise that we're a full node.
msg.Services = btcwire.SFNodeNetwork
p.outputQueue <- msg
return nil
}
// handleVersionMsg is invoked when a peer receives a version bitcoin message
// and is used to negotiate the protocol version details as well as kick start
// the communications.
func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
// Detect self connections.
if msg.Nonce == p.server.nonce {
log.Debugf("[PEER] Disconnecting peer connected to self %s",
p.conn.RemoteAddr())
p.disconnect = true
p.conn.Close()
return
}
// Limit to one version message per peer.
if p.versionKnown {
log.Errorf("[PEER] Only one version message per peer is allowed %s.",
p.conn.RemoteAddr())
p.disconnect = true
p.conn.Close()
return
}
// Negotiate the protocol version.
p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion))
p.versionKnown = true
log.Debugf("[PEER] Negotiated protocol version %d for peer %s",
p.protocolVersion, p.conn.RemoteAddr())
p.lastBlock = msg.LastBlock
// Inbound connections.
if p.inbound {
// Set the supported services for the peer to what the remote
// peer advertised.
p.services = msg.Services
// Send version.
err := p.pushVersionMsg()
if err != nil {
log.Errorf("[PEER] %v", err)
p.disconnect = true
p.conn.Close()
return
}
// Add inbound peer address to the server address manager.
na, err := btcwire.NewNetAddress(p.conn.RemoteAddr(), p.services)
if err != nil {
log.Errorf("[PEER] %v", err)
p.disconnect = true
p.conn.Close()
return
}
p.server.addrManager.AddAddress(na)
}
// Send verack.
p.outputQueue <- btcwire.NewMsgVerAck()
// Outbound connections.
if !p.inbound {
// TODO: Only do this if we're listening, not doing the initial
// block download, and are routable.
// Advertise the local address.
na, err := btcwire.NewNetAddress(p.conn.LocalAddr(), p.services)
if err != nil {
log.Errorf("[PEER] %v", err)
p.disconnect = true
p.conn.Close()
return
}
na.Services = p.services
addresses := map[string]*btcwire.NetAddress{
addrmgr.NetAddressKey(na): na,
}
p.pushAddrMsg(addresses)
// Request known addresses if the server address manager needs
// more and the peer has a protocol version new enough to
// include a timestamp with addresses.
hasTimestamp := p.protocolVersion >= btcwire.NetAddressTimeVersion
if p.server.addrManager.NeedMoreAddresses() && hasTimestamp {
p.outputQueue <- btcwire.NewMsgGetAddr()
}
}
// Request latest blocks if the peer has blocks we're interested in.
// XXX: Ask block manager for latest so we get in-flight too...
sha, lastBlock, err := p.server.db.NewestSha()
if err != nil {
log.Errorf("[PEER] %v", err)
p.disconnect = true
p.conn.Close()
}
// If the peer has blocks we're interested in.
if p.lastBlock > int32(lastBlock) {
stopHash := btcwire.ShaHash{}
gbmsg := btcwire.NewMsgGetBlocks(&stopHash)
p.server.blockManager.AddBlockLocators(sha, gbmsg)
p.outputQueue <- gbmsg
}
// TODO: Relay alerts.
}
// pushTxMsg sends a tx message for the provided transaction hash to the
// connected peer. An error is returned if the transaction sha is not known.
func (p *peer) pushTxMsg(sha btcwire.ShaHash) error {
// We dont deal with these for now.
return errors.New("Tx fetching not implemented")
}
// pushBlockMsg sends a block message for the provided block hash to the
// connected peer. An error is returned if the block hash is not known.
func (p *peer) pushBlockMsg(sha btcwire.ShaHash) error {
// What should this function do about the rate limiting the
// number of blocks queued for this peer?
// Current thought is have a counting mutex in the peer
// such that if > N Tx/Block requests are currently in
// the tx queue, wait until the mutex clears allowing more to be
// sent. This prevents 500 1+MB blocks from being loaded into
// memory and sit around until the output queue drains.
// Actually the outputQueue has a limit of 50 in its queue
// but still 50MB to 1.6GB(50 32MB blocks) just setting
// in memory waiting to be sent is pointless.
// I would recommend a getdata request limit of about 5
// outstanding objects.
// Should the tx complete api be a mutex or channel?
blk, err := p.server.db.FetchBlockBySha(&sha)
if err != nil {
log.Tracef("[PEER] Unable to fetch requested block sha %v: %v",
&sha, err)
return err
}
p.QueueMessage(blk.MsgBlock())
return nil
}
// handleGetData is invoked when a peer receives a getdata bitcoin message and
// is used to deliver block and transaction information.
func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) {
notFound := btcwire.NewMsgNotFound()
out:
for _, iv := range msg.InvList {
var err error
switch iv.Type {
case btcwire.InvVect_Tx:
err = p.pushTxMsg(iv.Hash)
case btcwire.InvVect_Block:
err = p.pushBlockMsg(iv.Hash)
default:
log.Warnf("[PEER] Unknown type in inventory request %d",
iv.Type)
break out
}
if err != nil {
notFound.AddInvVect(iv)
}
}
if len(notFound.InvList) != 0 {
p.QueueMessage(notFound)
}
}
// handleGetBlocksMsg is invoked when a peer receives a getdata bitcoin message.
func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) {
var err error
startIdx := int64(0)
endIdx := btcdb.AllShas
// Return all block hashes to the latest one (up to max per message) if
// no stop hash was specified.
// Attempt to find the ending index of the stop hash if specified.
if !msg.HashStop.IsEqual(&zeroHash) {
block, err := p.server.db.FetchBlockBySha(&msg.HashStop)
if err != nil {
// Fetch all if we dont recognize the stop hash.
endIdx = btcdb.AllShas
}
endIdx = block.Height()
}
// TODO(davec): This should have some logic to utilize the additional
// locator hashes to ensure the proper chain.
for _, hash := range msg.BlockLocatorHashes {
// TODO(drahn) does using the caching interface make sense
// on index lookups ?
block, err := p.server.db.FetchBlockBySha(hash)
if err == nil {
// Start with the next hash since we know this one.
startIdx = block.Height() + 1
break
}
}
// Don't attempt to fetch more than we can put into a single message.
if endIdx-startIdx > btcwire.MaxInvPerMsg {
endIdx = startIdx + btcwire.MaxInvPerMsg
}
// Fetch the inventory from the block database.
hashList, err := p.server.db.FetchHeightRange(startIdx, endIdx)
if err != nil {
log.Warnf(" lookup returned %v ", err)
return
}
// Nothing to send.
if len(hashList) == 0 {
return
}
// Generate inventory vectors and push the inventory message.
inv := btcwire.NewMsgInv()
for _, hash := range hashList {
iv := btcwire.InvVect{Type: btcwire.InvVect_Block, Hash: hash}
inv.AddInvVect(&iv)
}
p.QueueMessage(inv)
}
// handleGetBlocksMsg is invoked when a peer receives a getheaders bitcoin
// message.
func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) {
var err error
startIdx := int64(0)
endIdx := btcdb.AllShas
// Return all block hashes to the latest one (up to max per message) if
// no stop hash was specified.
// Attempt to find the ending index of the stop hash if specified.
if !msg.HashStop.IsEqual(&zeroHash) {
block, err := p.server.db.FetchBlockBySha(&msg.HashStop)
if err != nil {
// Fetch all if we dont recognize the stop hash.
endIdx = btcdb.AllShas
}
endIdx = block.Height()
}
// TODO(davec): This should have some logic to utilize the additional
// locator hashes to ensure the proper chain.
for _, hash := range msg.BlockLocatorHashes {
// TODO(drahn) does using the caching interface make sense
// on index lookups ?
block, err := p.server.db.FetchBlockBySha(hash)
if err == nil {
// Start with the next hash since we know this one.
startIdx = block.Height() + 1
break
}
}
// Don't attempt to fetch more than we can put into a single message.
if endIdx-startIdx > btcwire.MaxBlockHeadersPerMsg {
endIdx = startIdx + btcwire.MaxBlockHeadersPerMsg
}
// Fetch the inventory from the block database.
hashList, err := p.server.db.FetchHeightRange(startIdx, endIdx)
if err != nil {
log.Warnf("lookup returned %v ", err)
return
}
// Nothing to send.
if len(hashList) == 0 {
return
}
// Generate inventory vectors and push the inventory message.
headersMsg := btcwire.NewMsgHeaders()
for _, hash := range hashList {
block, err := p.server.db.FetchBlockBySha(&hash)
if err != nil {
log.Warnf("[PEER] badness %v", err)
}
hdr := block.MsgBlock().Header // copy
hdr.TxnCount = 0
headersMsg.AddBlockHeader(&hdr)
}
p.QueueMessage(headersMsg)
}
// handleGetAddrMsg is invoked when a peer receives a getaddr bitcoin message
// and is used to provide the peer with known addresses from the address
// manager.
func (p *peer) handleGetAddrMsg(msg *btcwire.MsgGetAddr) {
// Get the current known addresses from the address manager.
addrCache := p.server.addrManager.AddressCache()
// Push the addresses.
err := p.pushAddrMsg(addrCache)
if err != nil {
log.Errorf("[PEER] %v", err)
p.disconnect = true
p.conn.Close()
return
}
}
// pushAddrMsg sends one, or more, addr message(s) to the connected peer using
// the provided addresses.
func (p *peer) pushAddrMsg(addresses map[string]*btcwire.NetAddress) error {
// Nothing to send.
if len(addresses) == 0 {
return nil
}
numAdded := 0
msg := btcwire.NewMsgAddr()
for _, na := range addresses {
// Filter addresses the peer already knows about.
if p.knownAddresses[addrmgr.NetAddressKey(na)] {
continue
}
// Add the address to the message.
err := msg.AddAddress(na)
if err != nil {
return err
}
numAdded++
// Split into multiple messages as needed.
if numAdded > 0 && numAdded%btcwire.MaxAddrPerMsg == 0 {
p.outputQueue <- msg
msg.ClearAddresses()
}
}
// Send message with remaining addresses if needed.
if numAdded%btcwire.MaxAddrPerMsg != 0 {
p.outputQueue <- msg
}
return nil
}
// handleAddrMsg is invoked when a peer receives an addr bitcoin message and
// is used to notify the server about advertised addresses.
func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) {
// Ignore old style addresses which don't include a timestamp.
if p.protocolVersion < btcwire.NetAddressTimeVersion {
return
}
// A message that has no addresses is invalid.
if len(msg.AddrList) == 0 {
log.Errorf("[PEER] Command [%s] from %s does not contain any addresses",
msg.Command(), p.conn.RemoteAddr())
p.disconnect = true
p.conn.Close()
return
}
for _, na := range msg.AddrList {
// Don't add more address if we're disconnecting.
if p.disconnect {
return
}
// Set the timestamp to 5 days ago if it's more than 24 hours
// in the future so this address is one of the first to be
// removed when space is needed.
now := time.Now()
if na.Timestamp.After(now.Add(time.Minute * 10)) {
na.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
}
// Add address to known addresses for this peer.
p.knownAddresses[addrmgr.NetAddressKey(na)] = true
}
// Add addresses to server address manager. The address manager handles
// the details of things such as preventing duplicate addresses, max
// addresses, and last seen updates.
p.server.addrManager.AddAddresses(msg.AddrList)
}
// handlePingMsg is invoked when a peer receives a ping bitcoin message. For
// recent clients (protocol version > BIP0031Version), it replies with a pong
// message. For older clients, it does nothing and anything other than failure
// is considered a successful ping.
func (p *peer) handlePingMsg(msg *btcwire.MsgPing) {
// Only Reply with pong is message comes from a new enough client.
if p.protocolVersion > btcwire.BIP0031Version {
// Include nonce from ping so pong can be identified.
p.outputQueue <- btcwire.NewMsgPong(msg.Nonce)
}
}
// readMessage reads the next bitcoin message from the peer with logging.
func (p *peer) readMessage() (msg btcwire.Message, buf []byte, err error) {
msg, buf, err = btcwire.ReadMessage(p.conn, p.protocolVersion, p.btcnet)
if err != nil {
return
}
log.Debugf("[PEER] Received command [%v] from %s", msg.Command(),
p.conn.RemoteAddr())
// Use closures to log expensive operations so they are only run when
// the logging level requires it.
log.Tracef("%v", newLogClosure(func() string {
return "[PEER] " + spew.Sdump(msg)
}))
log.Tracef("%v", newLogClosure(func() string {
return "[PEER] " + spew.Sdump(buf)
}))
return
}
// writeMessage sends a bitcoin Message to the peer with logging.
func (p *peer) writeMessage(msg btcwire.Message) error {
log.Debugf("[PEER] Sending command [%v] to %s", msg.Command(),
p.conn.RemoteAddr())
// Use closures to log expensive operations so they are only run when the
// logging level requires it.
log.Tracef("%v", newLogClosure(func() string {
return "[PEER] msg" + spew.Sdump(msg)
}))
log.Tracef("%v", newLogClosure(func() string {
var buf bytes.Buffer
err := btcwire.WriteMessage(&buf, msg, p.protocolVersion, p.btcnet)
if err != nil {
return err.Error()
}
return "[PEER] " + spew.Sdump(buf.Bytes())
}))
// Write the message to the peer.
err := btcwire.WriteMessage(p.conn, msg, p.protocolVersion, p.btcnet)
if err != nil {
return err
}
return nil
}
// isAllowedByRegression returns whether or not the passed error is allowed by
// regression tests without disconnecting the peer. In particular, regression
// tests need to be allowed to send malformed messages without the peer being
// disconnected.
func (p *peer) isAllowedByRegression(err error) bool {
// Don't allow the error if it's not specifically a malformed message
// error.
if _, ok := err.(*btcwire.MessageError); !ok {
return false
}
// Don't allow the error if it's not coming from localhost or the
// hostname can't be determined for some reason.
host, _, err := net.SplitHostPort(p.conn.RemoteAddr().String())
if err != nil {
return false
}
if host != "127.0.0.1" && host != "localhost" {
return false
}
// Allowed if all checks passed.
return true
}
// inHandler handles all incoming messages for the peer. It must be run as a
// goroutine.
func (p *peer) inHandler() {
out:
for !p.disconnect {
rmsg, buf, err := p.readMessage()
if err != nil {
// In order to allow regression tests with malformed
// messages, don't disconnect the peer when we're in
// regression test mode and the error is one of the
// allowed errors.
if cfg.RegressionTest && p.isAllowedByRegression(err) {
log.Errorf("[PEER] %v", err)
continue
}
// Only log the error if we're not forcibly disconnecting.
if !p.disconnect {
log.Errorf("[PEER] %v", err)
}
break out
}
// Ensure version message comes first.
if _, ok := rmsg.(*btcwire.MsgVersion); !ok && !p.versionKnown {
log.Errorf("[PEER] A version message must precede all others")
break out
}
// Some messages are handled directly, while other messages
// are sent to a queue to be processed. Directly handling
// getdata and getblocks messages makes it impossible for a peer
// to spam with requests. However, it means that our getdata
// requests to it may not get prompt replies.
switch msg := rmsg.(type) {
case *btcwire.MsgVersion:
p.handleVersionMsg(msg)
case *btcwire.MsgVerAck:
// Do nothing.
case *btcwire.MsgGetAddr:
p.handleGetAddrMsg(msg)
case *btcwire.MsgAddr:
p.handleAddrMsg(msg)
case *btcwire.MsgPing:
p.handlePingMsg(msg)
case *btcwire.MsgPong:
// Don't do anything, but could try to work out network
// timing or similar.
case *btcwire.MsgAlert:
p.server.BroadcastMessage(msg, p)
case *btcwire.MsgBlock:
block := btcutil.NewBlockFromBlockAndBytes(msg, buf)
p.server.blockManager.QueueBlock(block)
case *btcwire.MsgInv:
p.server.blockManager.QueueInv(msg, p)
case *btcwire.MsgGetData:
p.handleGetDataMsg(msg)
case *btcwire.MsgGetBlocks:
p.handleGetBlocksMsg(msg)
case *btcwire.MsgGetHeaders:
p.handleGetHeadersMsg(msg)
default:
log.Debugf("[PEER] Received unhandled message of type %v: Fix Me",
rmsg.Command())
}
}
// Ensure connection is closed and notify server that the peer is done.
p.disconnect = true
p.conn.Close()
p.server.donePeers <- p
p.quit <- true
p.wg.Done()
log.Tracef("[PEER] Peer input handler done for %s", p.conn.RemoteAddr())
}
// outHandler handles all outgoing messages for the peer. It must be run as a
// goroutine. It uses a buffered channel to serialize output messages while
// allowing the sender to continue running asynchronously.
func (p *peer) outHandler() {
out:
for {
select {
case msg := <-p.outputQueue:
// Don't send anything if we're disconnected.
if p.disconnect {
continue
}
err := p.writeMessage(msg)
if err != nil {
p.disconnect = true
log.Errorf("[PEER] %v", err)
}
case <-p.quit:
break out
}
}
p.wg.Done()
log.Tracef("[PEER] Peer output handler done for %s", p.conn.RemoteAddr())
}
// QueueMessage adds the passed bitcoin message to the peer send queue. It
// uses a buffered channel to communicate with the output handler goroutine so
// it is automatically rate limited and safe for concurrent access.
func (p *peer) QueueMessage(msg btcwire.Message) {
p.outputQueue <- msg
}
// Start begins processing input and output messages. It also sends the initial
// version message for outbound connections to start the negotiation process.
func (p *peer) Start() error {
// Already started?
if p.started {
return nil
}
log.Tracef("[PEER] Starting peer %s", p.conn.RemoteAddr())
// Send an initial version message if this is an outbound connection.
if !p.inbound {
err := p.pushVersionMsg()
if err != nil {
log.Errorf("[PEER] %v", err)
p.conn.Close()
return err
}
}
// Start processing input and output.
go p.inHandler()
go p.outHandler()
p.wg.Add(2)
p.started = true
// If server is shutting down, don't even start watchdog
if p.server.shutdown {
log.Debug("[PEER] server is shutting down")
return nil
}
return nil
}
// Shutdown gracefully shuts down the peer by signalling the async input and
// output handler and waiting for them to finish.
func (p *peer) Shutdown() {
log.Tracef("[PEER] Shutdown peer %s", p.conn.RemoteAddr())
p.disconnect = true
p.conn.Close()
p.wg.Wait()
}
// newPeer returns a new bitcoin peer for the provided server and connection.
// Use start to begin processing incoming and outgoing messages.
func newPeer(s *server, conn net.Conn, inbound bool, persistent bool) *peer {
p := peer{
server: s,
protocolVersion: btcwire.ProtocolVersion,
btcnet: s.btcnet,
services: btcwire.SFNodeNetwork,
conn: conn,
timeConnected: time.Now(),
inbound: inbound,
persistent: persistent,
knownAddresses: make(map[string]bool),
outputQueue: make(chan btcwire.Message, outputBufferSize),
quit: make(chan bool),
}
return &p
}
type logClosure func() string
func (c logClosure) String() string {
return c()
}
func newLogClosure(c func() string) logClosure {
return logClosure(c)
}

486
btcd/rpcserver.go Normal file
View file

@ -0,0 +1,486 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"encoding/json"
"github.com/conformal/btcchain"
"github.com/conformal/btcjson"
"github.com/conformal/btcscript"
"github.com/conformal/btcwire"
"github.com/davecgh/go-spew/spew"
"math/big"
"net"
"net/http"
"strconv"
"strings"
"sync"
)
// rpcServer holds the items the rpc server may need to access (config,
// shutdown, main server, etc.)
type rpcServer struct {
started bool
shutdown bool
server *server
wg sync.WaitGroup
rpcport string
username string
password string
listener net.Listener
}
// Start is used by server.go to start the rpc listener.
func (s *rpcServer) Start() {
if s.started {
return
}
log.Trace("[RPCS] Starting RPC server")
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
jsonRpcRead(w, r, s)
})
listenAddr := net.JoinHostPort("", s.rpcport)
httpServer := &http.Server{Addr: listenAddr}
go func() {
log.Infof("[RPCS] RPC server listening on %s", s.listener.Addr())
httpServer.Serve(s.listener)
s.wg.Done()
}()
s.wg.Add(1)
s.started = true
}
// Stop is used by server.go to stop the rpc listener.
func (s *rpcServer) Stop() error {
if s.shutdown {
log.Infof("[RPCS] RPC server is already in the process of shutting down")
return nil
}
log.Warnf("[RPCS] RPC server shutting down")
err := s.listener.Close()
if err != nil {
log.Errorf("[RPCS] Problem shutting down rpc: %v", err)
return err
}
log.Infof("[RPCS] RPC server shutdown complete")
s.wg.Wait()
s.shutdown = true
return nil
}
// newRpcServer returns a new instance of the rpcServer struct.
func newRpcServer(s *server) (*rpcServer, error) {
rpc := rpcServer{
server: s,
}
// Get values from config
rpc.rpcport = cfg.RpcPort
rpc.username = cfg.RpcUser
rpc.password = cfg.RpcPass
listenAddr := net.JoinHostPort("", rpc.rpcport)
listener, err := net.Listen("tcp", listenAddr)
if err != nil {
log.Errorf("[RPCS] Couldn't create listener: %v", err)
return nil, err
}
rpc.listener = listener
return &rpc, err
}
// jsonRpcRead is the main function that handles reading messages, getting
// the data the message requests, and writing the reply.
func jsonRpcRead(w http.ResponseWriter, r *http.Request, s *rpcServer) {
_ = spew.Dump
r.Close = true
if s.shutdown == true {
return
}
var rawReply btcjson.Reply
body, err := btcjson.GetRaw(r.Body)
if err != nil {
log.Errorf("[RPCS] Error getting json message: %v", err)
return
}
var message btcjson.Message
err = json.Unmarshal(body, &message)
if err != nil {
log.Errorf("[RPCS] Error unmarshalling json message: %v", err)
jsonError := btcjson.Error{
Code: -32700,
Message: "Parse error",
}
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: nil,
}
log.Tracef("[RPCS] reply: %v", rawReply)
msg, err := btcjson.MarshallAndSend(rawReply, w)
if err != nil {
log.Errorf(msg)
return
}
log.Debugf(msg)
return
}
log.Tracef("[RPCS] received: %v", message)
// Deal with commands
switch message.Method {
case "getblockcount":
_, maxidx, _ := s.server.db.NewestSha()
rawReply = btcjson.Reply{
Result: maxidx,
Error: nil,
Id: &message.Id,
}
// btcd does not do mining so we can hardcode replies here.
case "getgenerate":
rawReply = btcjson.Reply{
Result: false,
Error: nil,
Id: &message.Id,
}
case "setgenerate":
rawReply = btcjson.Reply{
Result: nil,
Error: nil,
Id: &message.Id,
}
case "gethashespersec":
rawReply = btcjson.Reply{
Result: 0,
Error: nil,
Id: &message.Id,
}
case "getblockhash":
var f interface{}
err = json.Unmarshal(body, &f)
m := f.(map[string]interface{})
var idx float64
for _, v := range m {
switch vv := v.(type) {
case []interface{}:
for _, u := range vv {
idx, _ = u.(float64)
}
default:
}
}
sha, err := s.server.db.FetchBlockShaByHeight(int64(idx))
if err != nil {
log.Errorf("[RCPS] Error getting block: %v", err)
jsonError := btcjson.Error{
Code: -1,
Message: "Block number out of range.",
}
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
log.Tracef("[RPCS] reply: %v", rawReply)
break
}
rawReply = btcjson.Reply{
Result: sha.String(),
Error: nil,
Id: &message.Id,
}
case "getblock":
var f interface{}
err = json.Unmarshal(body, &f)
m := f.(map[string]interface{})
var hash string
for _, v := range m {
switch vv := v.(type) {
case []interface{}:
for _, u := range vv {
hash, _ = u.(string)
}
default:
}
}
sha, err := btcwire.NewShaHashFromStr(hash)
if err != nil {
log.Errorf("[RPCS] Error generating sha: %v", err)
jsonError := btcjson.Error{
Code: -5,
Message: "Block not found",
}
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
log.Tracef("[RPCS] reply: %v", rawReply)
break
}
blk, err := s.server.db.FetchBlockBySha(sha)
if err != nil {
log.Errorf("[RPCS] Error fetching sha: %v", err)
jsonError := btcjson.Error{
Code: -5,
Message: "Block not found",
}
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
log.Tracef("[RPCS] reply: %v", rawReply)
break
}
idx := blk.Height()
buf, err := blk.Bytes()
if err != nil {
log.Errorf("[RPCS] Error fetching block: %v", err)
jsonError := btcjson.Error{
Code: -5,
Message: "Block not found",
}
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
log.Tracef("[RPCS] reply: %v", rawReply)
break
}
txList, _ := blk.TxShas()
txNames := make([]string, len(txList))
for i, v := range txList {
txNames[i] = v.String()
}
_, maxidx, err := s.server.db.NewestSha()
if err != nil {
log.Errorf("[RPCS] Cannot get newest sha: %v", err)
return
}
blockHeader := &blk.MsgBlock().Header
blockReply := btcjson.BlockResult{
Hash: hash,
Version: blockHeader.Version,
MerkleRoot: blockHeader.MerkleRoot.String(),
PreviousHash: blockHeader.PrevBlock.String(),
Nonce: blockHeader.Nonce,
Time: blockHeader.Timestamp.Unix(),
Confirmations: uint64(1 + maxidx - idx),
Height: idx,
Tx: txNames,
Size: len(buf),
Bits: strconv.FormatInt(int64(blockHeader.Bits), 16),
Difficulty: getDifficultyRatio(blockHeader.Bits),
}
// Get next block unless we are already at the top.
if idx < maxidx {
shaNext, err := s.server.db.FetchBlockShaByHeight(int64(idx + 1))
if err != nil {
log.Errorf("[RPCS] No next block: %v", err)
} else {
blockReply.NextHash = shaNext.String()
}
}
rawReply = btcjson.Reply{
Result: blockReply,
Error: nil,
Id: &message.Id,
}
case "getrawtransaction":
var f interface{}
err = json.Unmarshal(body, &f)
m := f.(map[string]interface{})
var tx string
var verbose float64
for _, v := range m {
switch vv := v.(type) {
case []interface{}:
for _, u := range vv {
switch uu := u.(type) {
case string:
tx = uu
case float64:
verbose = uu
default:
}
}
default:
}
}
if int(verbose) != 1 {
// Don't return details
// not used yet
} else {
txSha, _ := btcwire.NewShaHashFromStr(tx)
var txS *btcwire.MsgTx
txS, _, blksha, err := s.server.db.FetchTxBySha(txSha)
if err != nil {
log.Errorf("[RPCS] Error fetching tx: %v", err)
jsonError := btcjson.Error{
Code: -5,
Message: "No information available about transaction",
}
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
log.Tracef("[RPCS] reply: %v", rawReply)
break
}
blk, err := s.server.db.FetchBlockBySha(blksha)
if err != nil {
log.Errorf("[RPCS] Error fetching sha: %v", err)
jsonError := btcjson.Error{
Code: -5,
Message: "Block not found",
}
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
log.Tracef("[RPCS] reply: %v", rawReply)
break
}
idx := blk.Height()
txOutList := txS.TxOut
voutList := make([]btcjson.Vout, len(txOutList))
txInList := txS.TxIn
vinList := make([]btcjson.Vin, len(txInList))
for i, v := range txInList {
vinList[i].Sequence = float64(v.Sequence)
disbuf, _ := btcscript.DisasmString(v.SignatureScript)
vinList[i].ScriptSig.Asm = strings.Replace(disbuf, " ", "", -1)
vinList[i].Vout = i + 1
log.Debugf(disbuf)
}
for i, v := range txOutList {
voutList[i].N = i
voutList[i].Value = float64(v.Value) / 100000000
isbuf, _ := btcscript.DisasmString(v.PkScript)
voutList[i].ScriptPubKey.Asm = isbuf
voutList[i].ScriptPubKey.ReqSig = strings.Count(isbuf, "OP_CHECKSIG")
_, addr, err := btcscript.ScriptToAddress(v.PkScript)
if err != nil {
log.Errorf("[RPCS] Error getting address for %v: %v", txSha, err)
} else {
addrList := make([]string, 1)
addrList[0] = addr
voutList[i].ScriptPubKey.Addresses = addrList
}
}
_, maxidx, err := s.server.db.NewestSha()
if err != nil {
log.Errorf("[RPCS] Cannot get newest sha: %v", err)
return
}
confirmations := uint64(1 + maxidx - idx)
blockHeader := &blk.MsgBlock().Header
txReply := btcjson.TxRawResult{
Txid: tx,
Vout: voutList,
Vin: vinList,
Version: txS.Version,
LockTime: txS.LockTime,
// This is not a typo, they are identical in
// bitcoind as well.
Time: blockHeader.Timestamp.Unix(),
Blocktime: blockHeader.Timestamp.Unix(),
BlockHash: blksha.String(),
Confirmations: confirmations,
}
rawReply = btcjson.Reply{
Result: txReply,
Error: nil,
Id: &message.Id,
}
}
case "decoderawtransaction":
var f interface{}
err = json.Unmarshal(body, &f)
m := f.(map[string]interface{})
var hash string
for _, v := range m {
switch vv := v.(type) {
case []interface{}:
for _, u := range vv {
hash, _ = u.(string)
}
default:
}
}
spew.Dump(hash)
txReply := btcjson.TxRawDecodeResult{}
rawReply = btcjson.Reply{
Result: txReply,
Error: nil,
Id: &message.Id,
}
default:
jsonError := btcjson.Error{
Code: -32601,
Message: "Method not found",
}
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
}
msg, err := btcjson.MarshallAndSend(rawReply, w)
if err != nil {
log.Errorf(msg)
return
}
log.Debugf(msg)
return
}
// getDifficultyRatio returns the proof-of-work difficulty as a multiple of the
// minimum difficulty using the passed bits field from the header of a block.
func getDifficultyRatio(bits uint32) float64 {
// The minimum difficulty is the max possible proof-of-work limit bits
// converted back to a number. Note this is not the same as the the
// proof of work limit directly because the block difficulty is encoded
// in a block with the compact form which loses precision.
max := btcchain.CompactToBig(activeNetParams.powLimitBits)
target := btcchain.CompactToBig(bits)
difficulty := new(big.Rat).SetFrac(max, target)
outString := difficulty.FloatString(2)
diff, err := strconv.ParseFloat(outString, 64)
if err != nil {
log.Errorf("[RPCS] Cannot get difficulty: %v", err)
return 0
}
return diff
}

22
btcd/sample-btcd.conf Normal file
View file

@ -0,0 +1,22 @@
[Application Options]
; Debug logging level.
; Valid options are {trace, debug, info, warn, error, critical}
; debuglevel=info
; Use testnet.
; testnet=1
; Add as many specific space separated peers to connect to as desired.
; addpeer=192.168.1.1
; addpeer=10.0.0.2:8333
; addpeer=fe80::1
; addpeer=[fe80::2]:8333
; addpeer=192.168.1.1 10.0.0.2:8333 fe80::1 [fe80::2]:8333
; Maximum number of inbound and outbound peers.
; maxpeers=8
; How long to ban misbehaving peers. Valid time units are {s, m, h}.
; Minimum 1s.
; banduration=24h
; banduration=11h30m15s

434
btcd/server.go Normal file
View file

@ -0,0 +1,434 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"container/list"
"github.com/conformal/btcdb"
"github.com/conformal/btcwire"
"net"
"opensource.conformal.com/go/btcd-internal/addrmgr"
"sync"
"time"
)
// supportedServices describes which services are supported by the server.
const supportedServices = btcwire.SFNodeNetwork
// connectionRetryInterval is the amount of time to wait in between retries
// when connecting to persistent peers.
const connectionRetryInterval = time.Second * 10
// directionString is a helper function that returns a string that represents
// the direction of a connection (inbound or outbound).
func directionString(inbound bool) string {
if inbound {
return "inbound"
}
return "outbound"
}
// broadcastMsg provides the ability to house a bitcoin message to be broadcast
// to all connected peers except specified excluded peers.
type broadcastMsg struct {
message btcwire.Message
excludePeers []*peer
}
// server provides a bitcoin server for handling communications to and from
// bitcoin peers.
type server struct {
nonce uint64
listener net.Listener
btcnet btcwire.BitcoinNet
started bool
shutdown bool
shutdownSched bool
addrManager *addrmgr.AddrManager
rpcServer *rpcServer
blockManager *blockManager
newPeers chan *peer
donePeers chan *peer
banPeers chan *peer
broadcast chan broadcastMsg
wg sync.WaitGroup
quit chan bool
db btcdb.Db
}
// handleAddPeerMsg deals with adding new peers. It is invoked from the
// peerHandler goroutine.
func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, p *peer) {
// Ignore new peers if we're shutting down.
direction := directionString(p.inbound)
if s.shutdown {
log.Infof("[SRVR] New peer %s (%s) ignored - server is "+
"shutting down", p.conn.RemoteAddr(), direction)
p.Shutdown()
return
}
// Disconnect banned peers.
host, _, err := net.SplitHostPort(p.conn.RemoteAddr().String())
if err != nil {
log.Errorf("[SRVR] %v", err)
p.Shutdown()
return
}
if banEnd, ok := banned[host]; ok {
if time.Now().Before(banEnd) {
log.Debugf("[SRVR] Peer %s is banned for another %v - "+
"disconnecting", host, banEnd.Sub(time.Now()))
p.Shutdown()
return
}
log.Infof("[SRVR] Peer %s is no longer banned", host)
delete(banned, host)
}
// TODO: Check for max peers from a single IP.
// Limit max number of total peers.
if peers.Len() >= cfg.MaxPeers {
log.Infof("[SRVR] Max peers reached [%d] - disconnecting "+
"peer %s (%s)", cfg.MaxPeers, p.conn.RemoteAddr(),
direction)
p.Shutdown()
return
}
// Add the new peer and start it.
log.Infof("[SRVR] New peer %s (%s)", p.conn.RemoteAddr(), direction)
peers.PushBack(p)
p.Start()
}
// handleDonePeerMsg deals with peers that have signalled they are done. It is
// invoked from the peerHandler goroutine.
func (s *server) handleDonePeerMsg(peers *list.List, p *peer) {
direction := directionString(p.inbound)
for e := peers.Front(); e != nil; e = e.Next() {
if e.Value == p {
peers.Remove(e)
log.Infof("[SRVR] Removed peer %s (%s)",
p.conn.RemoteAddr(), direction)
// Issue an asynchronous reconnect if the peer was a
// persistent outbound connection.
if !p.inbound && p.persistent {
addr := p.conn.RemoteAddr().String()
s.ConnectPeerAsync(addr, true)
}
return
}
}
}
// handleBanPeerMsg deals with banning peers. It is invoked from the
// peerHandler goroutine.
func (s *server) handleBanPeerMsg(banned map[string]time.Time, p *peer) {
host, _, err := net.SplitHostPort(p.conn.RemoteAddr().String())
if err != nil {
log.Errorf("[SRVR] %v", err)
return
}
direction := directionString(p.inbound)
log.Infof("[SRVR] Banned peer %s (%s) for %v", host, direction,
cfg.BanDuration)
banned[host] = time.Now().Add(cfg.BanDuration)
}
// handleBroadcastMsg deals with broadcasting messages to peers. It is invoked
// from the peerHandler goroutine.
func (s *server) handleBroadcastMsg(peers *list.List, bmsg *broadcastMsg) {
for e := peers.Front(); e != nil; e = e.Next() {
excluded := false
for _, p := range bmsg.excludePeers {
if e.Value == p {
excluded = true
}
}
if !excluded {
p := e.Value.(*peer)
p.QueueMessage(bmsg.message)
}
}
}
// listenHandler is the main listener which accepts incoming connections for the
// server. It must be run as a goroutine.
func (s *server) listenHandler() {
log.Infof("[SRVR] Server listening on %s", s.listener.Addr())
for !s.shutdown {
conn, err := s.listener.Accept()
if err != nil {
// Only log the error if we're not forcibly shutting down.
if !s.shutdown {
log.Errorf("[SRVR] %v", err)
}
continue
}
s.AddPeer(newPeer(s, conn, true, false))
}
s.wg.Done()
log.Tracef("[SRVR] Listener handler done for %s", s.listener.Addr())
}
// peerHandler is used to handle peer operations such as adding and removing
// peers to and from the server, banning peers, and broadcasting messages to
// peers. It must be run a a goroutine.
func (s *server) peerHandler() {
log.Tracef("[SRVR] Starting peer handler for %s", s.listener.Addr())
peers := list.New()
bannedPeers := make(map[string]time.Time)
// Live while we're not shutting down or there are still connected peers.
for !s.shutdown || peers.Len() != 0 {
select {
// New peers connected to the server.
case p := <-s.newPeers:
s.handleAddPeerMsg(peers, bannedPeers, p)
// Disconnected peers.
case p := <-s.donePeers:
s.handleDonePeerMsg(peers, p)
// Peer to ban.
case p := <-s.banPeers:
s.handleBanPeerMsg(bannedPeers, p)
// Message to broadcast to all connected peers except those
// which are excluded by the message.
case bmsg := <-s.broadcast:
s.handleBroadcastMsg(peers, &bmsg)
// Shutdown the peer handler.
case <-s.quit:
// Shutdown peers.
for e := peers.Front(); e != nil; e = e.Next() {
p := e.Value.(*peer)
p.Shutdown()
}
}
}
s.wg.Done()
log.Tracef("[SRVR] Peer handler done on %s", s.listener.Addr())
}
// AddPeer adds a new peer that has already been connected to the server.
func (s *server) AddPeer(p *peer) {
s.newPeers <- p
}
// BanPeer bans a peer that has already been connected to the server by ip.
func (s *server) BanPeer(p *peer) {
s.banPeers <- p
}
// BroadcastMessage sends msg to all peers currently connected to the server
// except those in the passed peers to exclude.
func (s *server) BroadcastMessage(msg btcwire.Message, exclPeers ...*peer) {
// XXX: Need to determine if this is an alert that has already been
// broadcast and refrain from broadcasting again.
bmsg := broadcastMsg{message: msg, excludePeers: exclPeers}
s.broadcast <- bmsg
}
// ConnectPeerAsync attempts to asynchronously connect to addr. If successful,
// a new peer is created and added to the server's peer list.
func (s *server) ConnectPeerAsync(addr string, persistent bool) {
// Don't try to connect to a peer if the server is shutting down.
if s.shutdown {
return
}
go func() {
// Attempt to connect to the peer. If the connection fails and
// this is a persistent connection, retry after the retry
// interval.
for !s.shutdown {
log.Debugf("[SRVR] Attempting to connect to %s", addr)
conn, err := net.Dial("tcp", addr)
if err != nil {
log.Errorf("[SRVR] %v", err)
if !persistent {
return
}
log.Infof("[SRVR] Retrying connection to %s "+
"in %s", addr, connectionRetryInterval)
time.Sleep(connectionRetryInterval)
continue
}
// Connection was successful so log it and create a new
// peer.
log.Infof("[SRVR] Connected to %s", conn.RemoteAddr())
s.AddPeer(newPeer(s, conn, false, persistent))
return
}
}()
}
// Start begins accepting connections from peers.
func (s *server) Start() {
// Already started?
if s.started {
return
}
log.Trace("[SRVR] Starting server")
go s.listenHandler()
go s.peerHandler()
s.wg.Add(2)
s.addrManager.Start()
s.blockManager.Start()
if !cfg.DisableRpc {
s.rpcServer.Start()
}
s.started = true
}
// Stop gracefully shuts down the server by stopping and disconnecting all
// peers and the main listener.
func (s *server) Stop() error {
if s.shutdown {
log.Infof("[SRVR] Server is already in the process of shutting down")
return nil
}
log.Warnf("[SRVR] Server shutting down")
s.shutdown = true
s.quit <- true
if !cfg.DisableRpc {
s.rpcServer.Stop()
}
s.addrManager.Stop()
s.blockManager.Stop()
err := s.listener.Close()
if err != nil {
return err
}
return nil
}
// WaitForShutdown blocks until the main listener and peer handlers are stopped.
func (s *server) WaitForShutdown() {
s.wg.Wait()
log.Infof("[SRVR] Server shutdown complete")
}
// ScheduleShutdown schedules a server shutdown after the specified duration.
// It also dynamically adjusts how often to warn the server is going down based
// on remaining duration.
func (s *server) ScheduleShutdown(duration time.Duration) {
// Don't schedule shutdown more than once.
if s.shutdownSched {
return
}
log.Warnf("[SRVR] Server shutdown in %v", duration)
go func() {
remaining := duration
tickDuration := dynamicTickDuration(remaining)
done := time.After(remaining)
ticker := time.NewTicker(tickDuration)
out:
for {
select {
case <-done:
ticker.Stop()
s.Stop()
break out
case <-ticker.C:
remaining = remaining - tickDuration
if remaining < time.Second {
continue
}
// Change tick duration dynamically based on remaining time.
newDuration := dynamicTickDuration(remaining)
if tickDuration != newDuration {
tickDuration = newDuration
ticker.Stop()
ticker = time.NewTicker(tickDuration)
}
log.Warnf("[SRVR] Server shutdown in %v", remaining)
}
}
}()
s.shutdownSched = true
}
// newServer returns a new btcd server configured to listen on addr for the
// bitcoin network type specified in btcnet. Use start to begin accepting
// connections from peers.
func newServer(addr string, db btcdb.Db, btcnet btcwire.BitcoinNet) (*server, error) {
nonce, err := btcwire.RandomUint64()
if err != nil {
return nil, err
}
listener, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
s := server{
nonce: nonce,
listener: listener,
btcnet: btcnet,
addrManager: addrmgr.New(),
newPeers: make(chan *peer, cfg.MaxPeers),
donePeers: make(chan *peer, cfg.MaxPeers),
banPeers: make(chan *peer, cfg.MaxPeers),
broadcast: make(chan broadcastMsg, cfg.MaxPeers),
quit: make(chan bool),
db: db,
}
s.blockManager = newBlockManager(&s)
log.Infof("[BMGR] Generating initial block node index. This may " +
"take a while...")
err = s.blockManager.blockChain.GenerateInitialIndex()
if err != nil {
return nil, err
}
log.Infof("[BMGR] Block index generation complete")
if !cfg.DisableRpc {
s.rpcServer, err = newRpcServer(&s)
if err != nil {
return nil, err
}
}
return &s, nil
}
// dynamicTickDuration is a convenience function used to dynamically choose a
// tick duration based on remaining time. It is primarily used during
// server shutdown to make shutdown warnings more frequent as the shutdown time
// approaches.
func dynamicTickDuration(remaining time.Duration) time.Duration {
switch {
case remaining <= time.Second*5:
return time.Second
case remaining <= time.Second*15:
return time.Second * 5
case remaining <= time.Minute:
return time.Second * 15
case remaining <= time.Minute*5:
return time.Minute
case remaining <= time.Minute*15:
return time.Minute * 5
case remaining <= time.Hour:
return time.Minute * 15
}
return time.Hour
}

36
btcd/signal.go Normal file
View file

@ -0,0 +1,36 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"os"
"os/signal"
)
// interruptChannel is used to receive SIGINT (Ctrl+C) signals.
var interruptChannel chan os.Signal
// interruptCallbacks is a list of callbacks to invoke when a SIGINT (Ctrl+C) is
// received.
var interruptCallbacks []func()
// addInterruptHandler adds a handler to call when a SIGINT (Ctrl+C) is
// received.
func addInterruptHandler(handler func()) {
// Create the channel and start the main interrupt handler which invokes
// all other callbacks and exits if not already done.
if interruptChannel == nil {
interruptChannel = make(chan os.Signal, 1)
signal.Notify(interruptChannel, os.Interrupt)
go func() {
<-interruptChannel
for _, callback := range interruptCallbacks {
callback()
}
os.Exit(0)
}()
}
interruptCallbacks = append(interruptCallbacks, handler)
}