From 19b42c3c9b8f0b14c9e2b1638a39bda1a64fb46e Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 11 Aug 2017 13:17:54 -0700 Subject: [PATCH] Allow multiple subscribers to blockchain notifications. The BlockChain struct emits notifications for various events, but it is only possible to register one listener. This changes the interface and implementations to allow multiple listeners. --- blockchain/chain.go | 17 ++++------- blockchain/notifications.go | 19 ++++++++---- blockchain/notifications_test.go | 50 ++++++++++++++++++++++++++++++++ blockmanager.go | 17 ++++++----- 4 files changed, 79 insertions(+), 24 deletions(-) create mode 100644 blockchain/notifications_test.go diff --git a/blockchain/chain.go b/blockchain/chain.go index c302f481..389bcaec 100644 --- a/blockchain/chain.go +++ b/blockchain/chain.go @@ -81,7 +81,6 @@ type BlockChain struct { db database.DB chainParams *chaincfg.Params timeSource MedianTimeSource - notifications NotificationCallback sigCache *txscript.SigCache indexManager IndexManager hashCache *txscript.HashCache @@ -168,6 +167,11 @@ type BlockChain struct { // being mined. unknownRulesWarned bool unknownVersionsWarned bool + + // The notifications field stores a slice of callbacks to be executed on + // certain blockchain events. + notificationsLock sync.RWMutex + notifications []NotificationCallback } // DisableVerify provides a mechanism to disable transaction script validation @@ -1308,15 +1312,6 @@ type Config struct { // time is adjusted to be in agreement with other peers. TimeSource MedianTimeSource - // Notifications defines a callback to which notifications will be sent - // when various events take place. See the documentation for - // Notification and NotificationType for details on the types and - // contents of notifications. - // - // This field can be nil if the caller is not interested in receiving - // notifications. - Notifications NotificationCallback - // SigCache defines a signature cache to use when when validating // signatures. This is typically most useful when individual // transactions are already being validated prior to their inclusion in @@ -1385,7 +1380,7 @@ func New(config *Config) (*BlockChain, error) { db: config.DB, chainParams: params, timeSource: config.TimeSource, - notifications: config.Notifications, + notifications: make([]NotificationCallback, 0), sigCache: config.SigCache, indexManager: config.IndexManager, minRetargetTimespan: targetTimespan / adjustmentFactor, diff --git a/blockchain/notifications.go b/blockchain/notifications.go index e55ed263..728b2224 100644 --- a/blockchain/notifications.go +++ b/blockchain/notifications.go @@ -58,16 +58,25 @@ type Notification struct { Data interface{} } +// Subscribe to block chain notifications. Registers a callback to be executed +// when various events take place. See the documentation on Notification and +// NotificationType for details on the types and contents of notifications. +func (b *BlockChain) Subscribe(callback NotificationCallback) { + b.notificationsLock.Lock() + b.notifications = append(b.notifications, callback) + b.notificationsLock.Unlock() +} + // sendNotification sends a notification with the passed type and data if the // caller requested notifications by providing a callback function in the call // to New. func (b *BlockChain) sendNotification(typ NotificationType, data interface{}) { - // Ignore it if the caller didn't request notifications. - if b.notifications == nil { - return - } + b.notificationsLock.RLock() + defer b.notificationsLock.RUnlock() // Generate and send the notification. n := Notification{Type: typ, Data: data} - b.notifications(&n) + for _, callback := range b.notifications { + callback(&n) + } } diff --git a/blockchain/notifications_test.go b/blockchain/notifications_test.go new file mode 100644 index 00000000..50467a3a --- /dev/null +++ b/blockchain/notifications_test.go @@ -0,0 +1,50 @@ +// Copyright (c) 2017 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package blockchain_test + +import ( + "testing" + "time" + + "github.com/btcsuite/btcd/blockchain" + "github.com/btcsuite/btcd/chaincfg" +) + +// Test that notification callbacks are fired on events. +func TestNotifications(t *testing.T) { + blocks, err := loadBlocks("blk_0_to_4.dat.bz2") + if err != nil { + t.Fatalf("Error loading file: %v\n", err) + } + + // Create a new database and chain instance to run tests against. + chain, teardownFunc, err := + chainSetup("notifications", &chaincfg.MainNetParams) + if err != nil { + t.Fatalf("Failed to setup chain instance: %v", err) + } + defer teardownFunc() + + notifications := make(chan blockchain.Notification) + chain.Subscribe(func(notification *blockchain.Notification) { + go func() { + notifications <- *notification + }() + }) + + _, _, err = chain.ProcessBlock(blocks[1], blockchain.BFNone) + if err != nil { + t.Fatalf("ProcessBlock fail on block 1: %v\n", err) + } + + select { + case notification := <-notifications: + if notification.Type != blockchain.NTBlockAccepted { + t.Errorf("Expected NTBlockAccepted notification, got %v", notification.Type) + } + case <-time.After(time.Second): + t.Error("Expected blockchain notification callback to fire") + } +} diff --git a/blockmanager.go b/blockmanager.go index 50bc7a2d..201ac3ce 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -1466,18 +1466,19 @@ func newBlockManager(s *server, indexManager blockchain.IndexManager) (*blockMan // Create a new block chain instance with the appropriate configuration. var err error bm.chain, err = blockchain.New(&blockchain.Config{ - DB: s.db, - ChainParams: s.chainParams, - Checkpoints: checkpoints, - TimeSource: s.timeSource, - Notifications: bm.handleNotifyMsg, - SigCache: s.sigCache, - IndexManager: indexManager, - HashCache: s.hashCache, + DB: s.db, + ChainParams: s.chainParams, + Checkpoints: checkpoints, + TimeSource: s.timeSource, + SigCache: s.sigCache, + IndexManager: indexManager, + HashCache: s.hashCache, }) if err != nil { return nil, err } + bm.chain.Subscribe(bm.handleNotifyMsg) + best := bm.chain.BestSnapshot() if !cfg.DisableCheckpoints { // Initialize the next checkpoint based on the current height.