Allow multiple subscribers to blockchain notifications.

The BlockChain struct emits notifications for various events, but
it is only possible to register one listener. This changes the
interface and implementations to allow multiple listeners.
This commit is contained in:
Jim Posen 2017-08-11 13:17:54 -07:00 committed by Dave Collins
parent bc36cf51c6
commit 19b42c3c9b
4 changed files with 79 additions and 24 deletions

View file

@ -81,7 +81,6 @@ type BlockChain struct {
db database.DB
chainParams *chaincfg.Params
timeSource MedianTimeSource
notifications NotificationCallback
sigCache *txscript.SigCache
indexManager IndexManager
hashCache *txscript.HashCache
@ -168,6 +167,11 @@ type BlockChain struct {
// being mined.
unknownRulesWarned bool
unknownVersionsWarned bool
// The notifications field stores a slice of callbacks to be executed on
// certain blockchain events.
notificationsLock sync.RWMutex
notifications []NotificationCallback
}
// DisableVerify provides a mechanism to disable transaction script validation
@ -1308,15 +1312,6 @@ type Config struct {
// time is adjusted to be in agreement with other peers.
TimeSource MedianTimeSource
// Notifications defines a callback to which notifications will be sent
// when various events take place. See the documentation for
// Notification and NotificationType for details on the types and
// contents of notifications.
//
// This field can be nil if the caller is not interested in receiving
// notifications.
Notifications NotificationCallback
// SigCache defines a signature cache to use when when validating
// signatures. This is typically most useful when individual
// transactions are already being validated prior to their inclusion in
@ -1385,7 +1380,7 @@ func New(config *Config) (*BlockChain, error) {
db: config.DB,
chainParams: params,
timeSource: config.TimeSource,
notifications: config.Notifications,
notifications: make([]NotificationCallback, 0),
sigCache: config.SigCache,
indexManager: config.IndexManager,
minRetargetTimespan: targetTimespan / adjustmentFactor,

View file

@ -58,16 +58,25 @@ type Notification struct {
Data interface{}
}
// Subscribe to block chain notifications. Registers a callback to be executed
// when various events take place. See the documentation on Notification and
// NotificationType for details on the types and contents of notifications.
func (b *BlockChain) Subscribe(callback NotificationCallback) {
b.notificationsLock.Lock()
b.notifications = append(b.notifications, callback)
b.notificationsLock.Unlock()
}
// sendNotification sends a notification with the passed type and data if the
// caller requested notifications by providing a callback function in the call
// to New.
func (b *BlockChain) sendNotification(typ NotificationType, data interface{}) {
// Ignore it if the caller didn't request notifications.
if b.notifications == nil {
return
}
b.notificationsLock.RLock()
defer b.notificationsLock.RUnlock()
// Generate and send the notification.
n := Notification{Type: typ, Data: data}
b.notifications(&n)
for _, callback := range b.notifications {
callback(&n)
}
}

View file

@ -0,0 +1,50 @@
// Copyright (c) 2017 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package blockchain_test
import (
"testing"
"time"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg"
)
// Test that notification callbacks are fired on events.
func TestNotifications(t *testing.T) {
blocks, err := loadBlocks("blk_0_to_4.dat.bz2")
if err != nil {
t.Fatalf("Error loading file: %v\n", err)
}
// Create a new database and chain instance to run tests against.
chain, teardownFunc, err :=
chainSetup("notifications", &chaincfg.MainNetParams)
if err != nil {
t.Fatalf("Failed to setup chain instance: %v", err)
}
defer teardownFunc()
notifications := make(chan blockchain.Notification)
chain.Subscribe(func(notification *blockchain.Notification) {
go func() {
notifications <- *notification
}()
})
_, _, err = chain.ProcessBlock(blocks[1], blockchain.BFNone)
if err != nil {
t.Fatalf("ProcessBlock fail on block 1: %v\n", err)
}
select {
case notification := <-notifications:
if notification.Type != blockchain.NTBlockAccepted {
t.Errorf("Expected NTBlockAccepted notification, got %v", notification.Type)
}
case <-time.After(time.Second):
t.Error("Expected blockchain notification callback to fire")
}
}

View file

@ -1466,18 +1466,19 @@ func newBlockManager(s *server, indexManager blockchain.IndexManager) (*blockMan
// Create a new block chain instance with the appropriate configuration.
var err error
bm.chain, err = blockchain.New(&blockchain.Config{
DB: s.db,
ChainParams: s.chainParams,
Checkpoints: checkpoints,
TimeSource: s.timeSource,
Notifications: bm.handleNotifyMsg,
SigCache: s.sigCache,
IndexManager: indexManager,
HashCache: s.hashCache,
DB: s.db,
ChainParams: s.chainParams,
Checkpoints: checkpoints,
TimeSource: s.timeSource,
SigCache: s.sigCache,
IndexManager: indexManager,
HashCache: s.hashCache,
})
if err != nil {
return nil, err
}
bm.chain.Subscribe(bm.handleNotifyMsg)
best := bm.chain.BestSnapshot()
if !cfg.DisableCheckpoints {
// Initialize the next checkpoint based on the current height.