Implement getblocktemplate long poll support.

This commit implements the long polling portion of the getblocktemplate
RPC as defined by BIP0022.  Per the specification, each block template is
returned with a longpollid which can be used in a subsequent
getblocktemplate request to keep the connection open until the server
determines the block template associated with the longpollid should be
replaced with a new one.

This is work towards #124.
This commit is contained in:
Dave Collins 2014-06-27 14:12:22 -05:00
parent eb7ecdcc22
commit fc5656894d
3 changed files with 279 additions and 9 deletions

View file

@ -622,6 +622,14 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// a reorg. // a reorg.
newestSha, newestHeight, _ := b.server.db.NewestSha() newestSha, newestHeight, _ := b.server.db.NewestSha()
b.updateChainState(newestSha, newestHeight) b.updateChainState(newestSha, newestHeight)
// Allow any clients performing long polling via the
// getblocktemplate RPC to be notified when the new block causes
// their old block template to become stale.
rpcServer := b.server.rpcServer
if rpcServer != nil {
rpcServer.gbtWorkState.NotifyBlockConnected(blockSha)
}
} }
// Sync the db to disk. // Sync the db to disk.

View file

@ -912,9 +912,13 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool, isNe
txmpLog.Debugf("Accepted transaction %v (pool size: %v)", txHash, txmpLog.Debugf("Accepted transaction %v (pool size: %v)", txHash,
len(mp.pool)) len(mp.pool))
// Notify websocket clients about mempool transactions.
if mp.server.rpcServer != nil { if mp.server.rpcServer != nil {
// Notify websocket clients about mempool transactions.
mp.server.rpcServer.ntfnMgr.NotifyMempoolTx(tx, isNew) mp.server.rpcServer.ntfnMgr.NotifyMempoolTx(tx, isNew)
// Potentially notify any getblocktemplate long poll clients
// about stale block templates due to the new transaction.
mp.server.rpcServer.gbtWorkState.NotifyMempoolTx(mp.lastUpdated)
} }
return nil return nil

View file

@ -21,6 +21,7 @@ import (
"net/http" "net/http"
"os" "os"
"strconv" "strconv"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -228,12 +229,15 @@ type gbtWorkState struct {
prevHash *btcwire.ShaHash prevHash *btcwire.ShaHash
minTimestamp time.Time minTimestamp time.Time
template *BlockTemplate template *BlockTemplate
notifyMap map[btcwire.ShaHash]map[int64]chan struct{}
} }
// newGbtWorkState returns a new instance of a gbtWorkState with all internal // newGbtWorkState returns a new instance of a gbtWorkState with all internal
// fields initialized and ready to use. // fields initialized and ready to use.
func newGbtWorkState() *gbtWorkState { func newGbtWorkState() *gbtWorkState {
return &gbtWorkState{} return &gbtWorkState{
notifyMap: make(map[btcwire.ShaHash]map[int64]chan struct{}),
}
} }
// rpcServer holds the items the rpc server may need to access (config, // rpcServer holds the items the rpc server may need to access (config,
@ -1242,6 +1246,146 @@ func handleGetBlockHash(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan struct{}
return sha.String(), nil return sha.String(), nil
} }
// encodeTemplateID encodes the passed details into an ID that can be used to
// uniquely identify a block template.
func encodeTemplateID(prevHash *btcwire.ShaHash, lastGenerated time.Time) string {
return fmt.Sprintf("%s-%d", prevHash.String(), lastGenerated.Unix())
}
// decodeTemplateID decodes an ID that is used to uniquely identify a block
// template. This is mainly used as a mechanism to track when to update clients
// that are using long polling for block templates. The ID consists of the
// previous block hash for the associated template and the time the associated
// template was generated.
func decodeTemplateID(templateID string) (*btcwire.ShaHash, int64, error) {
fields := strings.Split(templateID, "-")
if len(fields) != 2 {
return nil, 0, errors.New("invalid longpollid format")
}
prevHash, err := btcwire.NewShaHashFromStr(fields[0])
if err != nil {
return nil, 0, errors.New("invalid longpollid format")
}
lastGenerated, err := strconv.Atoi(fields[1])
if err != nil {
return nil, 0, errors.New("invalid longpollid format")
}
return prevHash, int64(lastGenerated), nil
}
// notifyLongPollers notifies any channels that have been registered to be
// notified when block templates are stale.
//
// This function MUST be called with the state locked.
func (state *gbtWorkState) notifyLongPollers(latestHash *btcwire.ShaHash, lastGenerated time.Time) {
// Notify anything that is waiting for a block template update from a
// hash which is not the hash of the tip of the best chain since their
// work is now invalid.
for hash, channels := range state.notifyMap {
if !hash.IsEqual(latestHash) {
for _, c := range channels {
close(c)
}
delete(state.notifyMap, hash)
}
}
// Return now if the provided last generated timestamp has not been
// initialized.
if lastGenerated.IsZero() {
return
}
// Return now if there is nothing registered for updates to the current
// best block hash.
channels, ok := state.notifyMap[*latestHash]
if !ok {
return
}
// Notify anything that is waiting for a block template update from a
// block template generated before the most recently generated block
// template.
lastGeneratedUnix := lastGenerated.Unix()
for lastGen, c := range channels {
if lastGen < lastGeneratedUnix {
close(c)
delete(channels, lastGen)
}
}
// Remove the entry altogether if there are no more registered
// channels.
if len(channels) == 0 {
delete(state.notifyMap, *latestHash)
}
}
// NotifyBlockConnected uses the newly-connected block to notify any long poll
// clients with a new block template when their existing block template is
// stale due to the newly connected block.
func (state *gbtWorkState) NotifyBlockConnected(blockSha *btcwire.ShaHash) {
go func() {
state.Lock()
defer state.Unlock()
state.notifyLongPollers(blockSha, state.lastTxUpdate)
}()
}
// NotifyMempoolTx uses the new last updated time for the transaction memory
// pool to notify any long poll clients with a new block template when their
// existing block template is stale due to enough time passing and the contents
// of the memory pool changing.
func (state *gbtWorkState) NotifyMempoolTx(lastUpdated time.Time) {
go func() {
state.Lock()
defer state.Unlock()
// No need to notify anything if no block templates have been generated
// yet.
if state.prevHash == nil || state.lastGenerated.IsZero() {
return
}
if time.Now().After(state.lastGenerated.Add(time.Second *
gbtRegenerateSeconds)) {
state.notifyLongPollers(state.prevHash, lastUpdated)
}
}()
}
// templateUpdateChan returns a channel that will be closed once the block
// template associated with the passed previous hash and last generated time
// is stale. The function will return existing channels for duplicate
// parameters which allows multiple clients to wait for the same block template
// without requiring a different channel for each client.
//
// This function MUST be called with the state locked.
func (state *gbtWorkState) templateUpdateChan(prevHash *btcwire.ShaHash, lastGenerated int64) chan struct{} {
// Either get the current list of channels waiting for updates about
// changes to block template for the previous hash or create a new one.
channels, ok := state.notifyMap[*prevHash]
if !ok {
m := make(map[int64]chan struct{})
state.notifyMap[*prevHash] = m
channels = m
}
// Get the current channel associated with the time the block template
// was last generated or create a new one.
c, ok := channels[lastGenerated]
if !ok {
c = make(chan struct{})
channels[lastGenerated] = c
}
return c
}
// updateBlockTemplate creates or updates a block template for the work state. // updateBlockTemplate creates or updates a block template for the work state.
// A new block template will be generated when the current best block has // A new block template will be generated when the current best block has
// changed or the transactions in the memory pool have been updated and it has // changed or the transactions in the memory pool have been updated and it has
@ -1332,6 +1476,10 @@ func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bo
"target %s, merkle root %s)", "target %s, merkle root %s)",
msgBlock.Header.Timestamp, targetDifficulty, msgBlock.Header.Timestamp, targetDifficulty,
msgBlock.Header.MerkleRoot) msgBlock.Header.MerkleRoot)
// Notify any clients that are long polling about the new
// template.
state.notifyLongPollers(latestHash, lastTxUpdate)
} else { } else {
// At this point, there is a saved block template and another // At this point, there is a saved block template and another
// request for a template was made, but either the available // request for a template was made, but either the available
@ -1392,7 +1540,7 @@ func (state *gbtWorkState) updateBlockTemplate(s *rpcServer, useCoinbaseValue bo
// and returned to the caller. // and returned to the caller.
// //
// This function MUST be called with the state locked. // This function MUST be called with the state locked.
func (state *gbtWorkState) blockTemplateResult(useCoinbaseValue bool) (*btcjson.GetBlockTemplateResult, error) { func (state *gbtWorkState) blockTemplateResult(useCoinbaseValue bool, submitOld *bool) (*btcjson.GetBlockTemplateResult, error) {
// Convert each transaction in the block template to a template result // Convert each transaction in the block template to a template result
// transaction. The result does not include the coinbase, so notice // transaction. The result does not include the coinbase, so notice
// the adjustments to the various lengths and indices. // the adjustments to the various lengths and indices.
@ -1448,6 +1596,7 @@ func (state *gbtWorkState) blockTemplateResult(useCoinbaseValue bool) (*btcjson.
// Generate the block template reply. // Generate the block template reply.
header := &msgBlock.Header header := &msgBlock.Header
templateID := encodeTemplateID(state.prevHash, state.lastGenerated)
reply := btcjson.GetBlockTemplateResult{ reply := btcjson.GetBlockTemplateResult{
Bits: strconv.FormatInt(int64(header.Bits), 16), Bits: strconv.FormatInt(int64(header.Bits), 16),
CurTime: time.Now().Unix(), CurTime: time.Now().Unix(),
@ -1457,6 +1606,9 @@ func (state *gbtWorkState) blockTemplateResult(useCoinbaseValue bool) (*btcjson.
SizeLimit: btcwire.MaxBlockPayload, SizeLimit: btcwire.MaxBlockPayload,
Transactions: transactions, Transactions: transactions,
Version: header.Version, Version: header.Version,
LongPollID: templateID,
SubmitOld: submitOld,
MinTime: state.minTimestamp.Unix(),
} }
if useCoinbaseValue { if useCoinbaseValue {
reply.CoinbaseAux = gbtCoinbaseAux reply.CoinbaseAux = gbtCoinbaseAux
@ -1499,12 +1651,110 @@ func (state *gbtWorkState) blockTemplateResult(useCoinbaseValue bool) (*btcjson.
return &reply, nil return &reply, nil
} }
// handleGetBlockTemplateLongPoll a helper for handleGetBlockTemplateRequest
// which deals with handling long polling for block templates. When a caller
// sends a request with a long poll ID that was previously returned, a response
// is not sent until the caller should stop working on the previous block
// template in favor of the new one. In particular, this is the case when the
// old block template is no longer valid due to a solution already being found
// and added to the block chain, or new transactions have shown up and some time
// has passed without finding a solution.
//
// See https://en.bitcoin.it/wiki/BIP_0022 for more details.
func handleGetBlockTemplateLongPoll(s *rpcServer, longPollID string, useCoinbaseValue bool, closeChan <-chan struct{}) (interface{}, error) {
state := s.gbtWorkState
state.Lock()
// The state unlock is intentionally not deferred here since it needs to
// be manually unlocked before waiting for a notification about block
// template changes.
if err := state.updateBlockTemplate(s, useCoinbaseValue); err != nil {
state.Unlock()
return nil, err
}
// Just return the current block template if the the long poll ID
// provided by the caller is invalid.
prevHash, lastGenerated, err := decodeTemplateID(longPollID)
if err != nil {
result, err := state.blockTemplateResult(useCoinbaseValue, nil)
if err != nil {
state.Unlock()
return nil, err
}
state.Unlock()
return result, nil
}
// Return the block template now if the specific block template
// identified by the long poll ID no longer matches the current block
// template as this means the provided template is stale.
prevTemplateHash := &state.template.block.Header.PrevBlock
if !prevHash.IsEqual(prevTemplateHash) ||
lastGenerated != state.lastGenerated.Unix() {
// Include whether or not it is valid to submit work against the
// old block template depending on whether or not a solution has
// already been found and added to the block chain.
submitOld := prevHash.IsEqual(prevTemplateHash)
result, err := state.blockTemplateResult(useCoinbaseValue,
&submitOld)
if err != nil {
state.Unlock()
return nil, err
}
state.Unlock()
return result, nil
}
// Register the previous hash and last generated time for notifications
// Get a channel that will be notified when the template associated with
// the provided ID is is stale and a new block template should be
// returned to the caller.
longPollChan := state.templateUpdateChan(prevHash, lastGenerated)
state.Unlock()
select {
// When the client closes before it's time to send a reply, just return
// now so the goroutine doesn't hang around.
case <-closeChan:
return nil, ErrClientQuit
// Wait until signal received to send the reply.
case <-longPollChan:
// Fallthrough
}
// Get the lastest block template
state.Lock()
defer state.Unlock()
if err := state.updateBlockTemplate(s, useCoinbaseValue); err != nil {
return nil, err
}
// Include whether or not it is valid to submit work against the old
// block template depending on whether or not a solution has already
// been found and added to the block chain.
submitOld := prevHash.IsEqual(&state.template.block.Header.PrevBlock)
result, err := state.blockTemplateResult(useCoinbaseValue, &submitOld)
if err != nil {
return nil, err
}
return result, nil
}
// handleGetBlockTemplateRequest is a helper for handleGetBlockTemplate which // handleGetBlockTemplateRequest is a helper for handleGetBlockTemplate which
// deals with generating and returning block templates to the caller. It // deals with generating and returning block templates to the caller. It
// detects the capabilities reported by the caller in regards to whether or not // handles both long poll requests as specified by BIP 0022 as well as regular
// it supports creating its own coinbase (the coinbasetxn and coinbasevalue // requests. In addition, it detects the capabilities reported by the caller
// capabilities) and modifies the returned block template accordingly. // in regards to whether or not it supports creating its own coinbase (the
func handleGetBlockTemplateRequest(s *rpcServer, request *btcjson.TemplateRequest) (interface{}, error) { // coinbasetxn and coinbasevalue capabilities) and modifies the returned block
// template accordingly.
func handleGetBlockTemplateRequest(s *rpcServer, request *btcjson.TemplateRequest, closeChan <-chan struct{}) (interface{}, error) {
// Extract the relevant passed capabilities and restrict the result to // Extract the relevant passed capabilities and restrict the result to
// either a coinbase value or a coinbase transaction object depending on // either a coinbase value or a coinbase transaction object depending on
// the request. Default to only providing a coinbase value. // the request. Default to only providing a coinbase value.
@ -1550,6 +1800,14 @@ func handleGetBlockTemplateRequest(s *rpcServer, request *btcjson.TemplateReques
return nil, btcjson.ErrClientInInitialDownload return nil, btcjson.ErrClientInInitialDownload
} }
// When a long poll ID was provided, this is a long poll request by the
// client to be notified when block template referenced by the ID should
// be replaced with a new one.
if request != nil && request.LongPollID != "" {
return handleGetBlockTemplateLongPoll(s, request.LongPollID,
useCoinbaseValue, closeChan)
}
// Protect concurrent access when updating block templates. // Protect concurrent access when updating block templates.
state := s.gbtWorkState state := s.gbtWorkState
state.Lock() state.Lock()
@ -1564,7 +1822,7 @@ func handleGetBlockTemplateRequest(s *rpcServer, request *btcjson.TemplateReques
if err := state.updateBlockTemplate(s, useCoinbaseValue); err != nil { if err := state.updateBlockTemplate(s, useCoinbaseValue); err != nil {
return nil, err return nil, err
} }
return state.blockTemplateResult(useCoinbaseValue) return state.blockTemplateResult(useCoinbaseValue, nil)
} }
// handleGetBlockTemplate implements the getblocktemplate command. // handleGetBlockTemplate implements the getblocktemplate command.
@ -1584,7 +1842,7 @@ func handleGetBlockTemplate(s *rpcServer, cmd btcjson.Cmd, closeChan <-chan stru
// make other modes easier to implement. // make other modes easier to implement.
switch mode { switch mode {
case "template": case "template":
return handleGetBlockTemplateRequest(s, request) return handleGetBlockTemplateRequest(s, request, closeChan)
} }
return nil, btcjson.Error{ return nil, btcjson.Error{