Merge pull request #567 from wpaulino/wtxmgr-migrations

wallet: add atomic migration logic for sub-buckets
This commit is contained in:
Olaoluwa Osuntokun 2018-11-07 16:36:42 +11:00 committed by GitHub
commit ea4b832693
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 973 additions and 317 deletions

View file

@ -11,20 +11,17 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcwallet/walletdb" "github.com/btcsuite/btcwallet/walletdb"
) )
const (
// LatestMgrVersion is the most recent manager version.
LatestMgrVersion = 5
)
var ( var (
// LatestMgrVersion is the most recent manager version.
LatestMgrVersion = getLatestVersion()
// latestMgrVersion is the most recent manager version as a variable so // latestMgrVersion is the most recent manager version as a variable so
// the tests can change it to force errors. // the tests can change it to force errors.
latestMgrVersion uint32 = LatestMgrVersion latestMgrVersion = LatestMgrVersion
) )
// ObtainUserInputFunc is a function that reads a user input and returns it as // ObtainUserInputFunc is a function that reads a user input and returns it as
@ -2103,213 +2100,3 @@ func createManagerNS(ns walletdb.ReadWriteBucket,
return nil return nil
} }
// upgradeToVersion2 upgrades the database from version 1 to version 2
// 'usedAddrBucketName' a bucket for storing addrs flagged as marked is
// initialized and it will be updated on the next rescan.
func upgradeToVersion2(ns walletdb.ReadWriteBucket) error {
currentMgrVersion := uint32(2)
_, err := ns.CreateBucketIfNotExists(usedAddrBucketName)
if err != nil {
str := "failed to create used addresses bucket"
return managerError(ErrDatabase, str, err)
}
return putManagerVersion(ns, currentMgrVersion)
}
// upgradeManager upgrades the data in the provided manager namespace to newer
// versions as neeeded.
func upgradeManager(db walletdb.DB, namespaceKey []byte, pubPassPhrase []byte,
chainParams *chaincfg.Params, cbs *OpenCallbacks) error {
var version uint32
err := walletdb.View(db, func(tx walletdb.ReadTx) error {
ns := tx.ReadBucket(namespaceKey)
var err error
version, err = fetchManagerVersion(ns)
return err
})
if err != nil {
str := "failed to fetch version for update"
return managerError(ErrDatabase, str, err)
}
if version < 5 {
err := walletdb.Update(db, func(tx walletdb.ReadWriteTx) error {
ns := tx.ReadWriteBucket(namespaceKey)
return upgradeToVersion5(ns, pubPassPhrase)
})
if err != nil {
return err
}
// The manager is now at version 5.
version = 5
}
// Ensure the manager is upraded to the latest version. This check is
// to intentionally cause a failure if the manager version is updated
// without writing code to handle the upgrade.
if version < latestMgrVersion {
str := fmt.Sprintf("the latest manager version is %d, but the "+
"current version after upgrades is only %d",
latestMgrVersion, version)
return managerError(ErrUpgrade, str, nil)
}
return nil
}
// upgradeToVersion5 upgrades the database from version 4 to version 5. After
// this update, the new ScopedKeyManager features cannot be used. This is due
// to the fact that in version 5, we now store the encrypted master private
// keys on disk. However, using the BIP0044 key scope, users will still be able
// to create old p2pkh addresses.
func upgradeToVersion5(ns walletdb.ReadWriteBucket, pubPassPhrase []byte) error {
// First, we'll check if there are any existing segwit addresses, which
// can't be upgraded to the new version. If so, we abort and warn the
// user.
err := ns.NestedReadBucket(addrBucketName).ForEach(
func(k []byte, v []byte) error {
row, err := deserializeAddressRow(v)
if err != nil {
return err
}
if row.addrType > adtScript {
return fmt.Errorf("segwit address exists in " +
"wallet, can't upgrade from v4 to " +
"v5: well, we tried ¯\\_(ツ)_/¯")
}
return nil
})
if err != nil {
return err
}
// Next, we'll write out the new database version.
if err := putManagerVersion(ns, 5); err != nil {
return err
}
// First, we'll need to create the new buckets that are used in the new
// database version.
scopeBucket, err := ns.CreateBucket(scopeBucketName)
if err != nil {
str := "failed to create scope bucket"
return managerError(ErrDatabase, str, err)
}
scopeSchemas, err := ns.CreateBucket(scopeSchemaBucketName)
if err != nil {
str := "failed to create scope schema bucket"
return managerError(ErrDatabase, str, err)
}
// With the buckets created, we can now create the default BIP0044
// scope which will be the only scope usable in the database after this
// update.
scopeKey := scopeToBytes(&KeyScopeBIP0044)
scopeSchema := ScopeAddrMap[KeyScopeBIP0044]
schemaBytes := scopeSchemaToBytes(&scopeSchema)
if err := scopeSchemas.Put(scopeKey[:], schemaBytes); err != nil {
return err
}
if err := createScopedManagerNS(scopeBucket, &KeyScopeBIP0044); err != nil {
return err
}
bip44Bucket := scopeBucket.NestedReadWriteBucket(scopeKey[:])
// With the buckets created, we now need to port over *each* item in
// the prior main bucket, into the new default scope.
mainBucket := ns.NestedReadWriteBucket(mainBucketName)
// First, we'll move over the encrypted coin type private and public
// keys to the new sub-bucket.
encCoinPrivKeys := mainBucket.Get(coinTypePrivKeyName)
encCoinPubKeys := mainBucket.Get(coinTypePubKeyName)
err = bip44Bucket.Put(coinTypePrivKeyName, encCoinPrivKeys)
if err != nil {
return err
}
err = bip44Bucket.Put(coinTypePubKeyName, encCoinPubKeys)
if err != nil {
return err
}
if err := mainBucket.Delete(coinTypePrivKeyName); err != nil {
return err
}
if err := mainBucket.Delete(coinTypePubKeyName); err != nil {
return err
}
// Next, we'll move over everything that was in the meta bucket to the
// meta bucket within the new scope.
metaBucket := ns.NestedReadWriteBucket(metaBucketName)
lastAccount := metaBucket.Get(lastAccountName)
if err := metaBucket.Delete(lastAccountName); err != nil {
return err
}
scopedMetaBucket := bip44Bucket.NestedReadWriteBucket(metaBucketName)
err = scopedMetaBucket.Put(lastAccountName, lastAccount)
if err != nil {
return err
}
// Finally, we'll recursively move over a set of keys which were
// formerly under the main bucket, into the new scoped buckets. We'll
// do so by obtaining a slice of all the keys that we need to modify
// and then recursing through each of them, moving both nested buckets
// and key/value pairs.
keysToMigrate := [][]byte{
acctBucketName, addrBucketName, usedAddrBucketName,
addrAcctIdxBucketName, acctNameIdxBucketName, acctIDIdxBucketName,
}
// Migrate each bucket recursively.
for _, bucketKey := range keysToMigrate {
err := migrateRecursively(ns, bip44Bucket, bucketKey)
if err != nil {
return err
}
}
return nil
}
// migrateRecursively moves a nested bucket from one bucket to another,
// recursing into nested buckets as required.
func migrateRecursively(src, dst walletdb.ReadWriteBucket,
bucketKey []byte) error {
// Within this bucket key, we'll migrate over, then delete each key.
bucketToMigrate := src.NestedReadWriteBucket(bucketKey)
newBucket, err := dst.CreateBucketIfNotExists(bucketKey)
if err != nil {
return err
}
err = bucketToMigrate.ForEach(func(k, v []byte) error {
if nestedBucket := bucketToMigrate.
NestedReadBucket(k); nestedBucket != nil {
// We have a nested bucket, so recurse into it.
return migrateRecursively(bucketToMigrate, newBucket, k)
}
if err := newBucket.Put(k, v); err != nil {
return err
}
return bucketToMigrate.Delete(k)
})
if err != nil {
return err
}
// Finally, we'll delete the bucket itself.
if err := src.DeleteNestedBucket(bucketKey); err != nil {
return err
}
return nil
}

View file

@ -1522,14 +1522,6 @@ func Open(ns walletdb.ReadBucket, pubPassphrase []byte,
return loadManager(ns, pubPassphrase, chainParams) return loadManager(ns, pubPassphrase, chainParams)
} }
// DoUpgrades performs any necessary upgrades to the address manager contained
// in the wallet database, namespaced by the top level bucket key namespaceKey.
func DoUpgrades(db walletdb.DB, namespaceKey []byte, pubPassphrase []byte,
chainParams *chaincfg.Params, cbs *OpenCallbacks) error {
return upgradeManager(db, namespaceKey, pubPassphrase, chainParams, cbs)
}
// createManagerKeyScope creates a new key scoped for a target manager's scope. // createManagerKeyScope creates a new key scoped for a target manager's scope.
// This partitions key derivation for a particular purpose+coin tuple, allowing // This partitions key derivation for a particular purpose+coin tuple, allowing
// multiple address derivation schems to be maintained concurrently. // multiple address derivation schems to be maintained concurrently.

256
waddrmgr/migrations.go Normal file
View file

@ -0,0 +1,256 @@
package waddrmgr
import (
"fmt"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/btcsuite/btcwallet/walletdb/migration"
)
// versions is a list of the different database versions. The last entry should
// reflect the latest database state. If the database happens to be at a version
// number lower than the latest, migrations will be performed in order to catch
// it up.
var versions = []migration.Version{
{
Number: 2,
Migration: upgradeToVersion2,
},
{
Number: 5,
Migration: upgradeToVersion5,
},
}
// getLatestVersion returns the version number of the latest database version.
func getLatestVersion() uint32 {
return versions[len(versions)-1].Number
}
// MigrationManager is an implementation of the migration.Manager interface that
// will be used to handle migrations for the address manager. It exposes the
// necessary parameters required to successfully perform migrations.
type MigrationManager struct {
ns walletdb.ReadWriteBucket
}
// A compile-time assertion to ensure that MigrationManager implements the
// migration.Manager interface.
var _ migration.Manager = (*MigrationManager)(nil)
// NewMigrationManager creates a new migration manager for the address manager.
// The given bucket should reflect the top-level bucket in which all of the
// address manager's data is contained within.
func NewMigrationManager(ns walletdb.ReadWriteBucket) *MigrationManager {
return &MigrationManager{ns: ns}
}
// Name returns the name of the service we'll be attempting to upgrade.
//
// NOTE: This method is part of the migration.Manager interface.
func (m *MigrationManager) Name() string {
return "wallet address manager"
}
// Namespace returns the top-level bucket of the service.
//
// NOTE: This method is part of the migration.Manager interface.
func (m *MigrationManager) Namespace() walletdb.ReadWriteBucket {
return m.ns
}
// CurrentVersion returns the current version of the service's database.
//
// NOTE: This method is part of the migration.Manager interface.
func (m *MigrationManager) CurrentVersion(ns walletdb.ReadBucket) (uint32, error) {
if ns == nil {
ns = m.ns
}
return fetchManagerVersion(ns)
}
// SetVersion sets the version of the service's database.
//
// NOTE: This method is part of the migration.Manager interface.
func (m *MigrationManager) SetVersion(ns walletdb.ReadWriteBucket,
version uint32) error {
if ns == nil {
ns = m.ns
}
return putManagerVersion(m.ns, version)
}
// Versions returns all of the available database versions of the service.
//
// NOTE: This method is part of the migration.Manager interface.
func (m *MigrationManager) Versions() []migration.Version {
return versions
}
// upgradeToVersion2 upgrades the database from version 1 to version 2
// 'usedAddrBucketName' a bucket for storing addrs flagged as marked is
// initialized and it will be updated on the next rescan.
func upgradeToVersion2(ns walletdb.ReadWriteBucket) error {
currentMgrVersion := uint32(2)
_, err := ns.CreateBucketIfNotExists(usedAddrBucketName)
if err != nil {
str := "failed to create used addresses bucket"
return managerError(ErrDatabase, str, err)
}
return putManagerVersion(ns, currentMgrVersion)
}
// upgradeToVersion5 upgrades the database from version 4 to version 5. After
// this update, the new ScopedKeyManager features cannot be used. This is due
// to the fact that in version 5, we now store the encrypted master private
// keys on disk. However, using the BIP0044 key scope, users will still be able
// to create old p2pkh addresses.
func upgradeToVersion5(ns walletdb.ReadWriteBucket) error {
// First, we'll check if there are any existing segwit addresses, which
// can't be upgraded to the new version. If so, we abort and warn the
// user.
err := ns.NestedReadBucket(addrBucketName).ForEach(
func(k []byte, v []byte) error {
row, err := deserializeAddressRow(v)
if err != nil {
return err
}
if row.addrType > adtScript {
return fmt.Errorf("segwit address exists in " +
"wallet, can't upgrade from v4 to " +
"v5: well, we tried ¯\\_(ツ)_/¯")
}
return nil
})
if err != nil {
return err
}
// Next, we'll write out the new database version.
if err := putManagerVersion(ns, 5); err != nil {
return err
}
// First, we'll need to create the new buckets that are used in the new
// database version.
scopeBucket, err := ns.CreateBucket(scopeBucketName)
if err != nil {
str := "failed to create scope bucket"
return managerError(ErrDatabase, str, err)
}
scopeSchemas, err := ns.CreateBucket(scopeSchemaBucketName)
if err != nil {
str := "failed to create scope schema bucket"
return managerError(ErrDatabase, str, err)
}
// With the buckets created, we can now create the default BIP0044
// scope which will be the only scope usable in the database after this
// update.
scopeKey := scopeToBytes(&KeyScopeBIP0044)
scopeSchema := ScopeAddrMap[KeyScopeBIP0044]
schemaBytes := scopeSchemaToBytes(&scopeSchema)
if err := scopeSchemas.Put(scopeKey[:], schemaBytes); err != nil {
return err
}
if err := createScopedManagerNS(scopeBucket, &KeyScopeBIP0044); err != nil {
return err
}
bip44Bucket := scopeBucket.NestedReadWriteBucket(scopeKey[:])
// With the buckets created, we now need to port over *each* item in
// the prior main bucket, into the new default scope.
mainBucket := ns.NestedReadWriteBucket(mainBucketName)
// First, we'll move over the encrypted coin type private and public
// keys to the new sub-bucket.
encCoinPrivKeys := mainBucket.Get(coinTypePrivKeyName)
encCoinPubKeys := mainBucket.Get(coinTypePubKeyName)
err = bip44Bucket.Put(coinTypePrivKeyName, encCoinPrivKeys)
if err != nil {
return err
}
err = bip44Bucket.Put(coinTypePubKeyName, encCoinPubKeys)
if err != nil {
return err
}
if err := mainBucket.Delete(coinTypePrivKeyName); err != nil {
return err
}
if err := mainBucket.Delete(coinTypePubKeyName); err != nil {
return err
}
// Next, we'll move over everything that was in the meta bucket to the
// meta bucket within the new scope.
metaBucket := ns.NestedReadWriteBucket(metaBucketName)
lastAccount := metaBucket.Get(lastAccountName)
if err := metaBucket.Delete(lastAccountName); err != nil {
return err
}
scopedMetaBucket := bip44Bucket.NestedReadWriteBucket(metaBucketName)
err = scopedMetaBucket.Put(lastAccountName, lastAccount)
if err != nil {
return err
}
// Finally, we'll recursively move over a set of keys which were
// formerly under the main bucket, into the new scoped buckets. We'll
// do so by obtaining a slice of all the keys that we need to modify
// and then recursing through each of them, moving both nested buckets
// and key/value pairs.
keysToMigrate := [][]byte{
acctBucketName, addrBucketName, usedAddrBucketName,
addrAcctIdxBucketName, acctNameIdxBucketName, acctIDIdxBucketName,
}
// Migrate each bucket recursively.
for _, bucketKey := range keysToMigrate {
err := migrateRecursively(ns, bip44Bucket, bucketKey)
if err != nil {
return err
}
}
return nil
}
// migrateRecursively moves a nested bucket from one bucket to another,
// recursing into nested buckets as required.
func migrateRecursively(src, dst walletdb.ReadWriteBucket,
bucketKey []byte) error {
// Within this bucket key, we'll migrate over, then delete each key.
bucketToMigrate := src.NestedReadWriteBucket(bucketKey)
newBucket, err := dst.CreateBucketIfNotExists(bucketKey)
if err != nil {
return err
}
err = bucketToMigrate.ForEach(func(k, v []byte) error {
if nestedBucket := bucketToMigrate.
NestedReadBucket(k); nestedBucket != nil {
// We have a nested bucket, so recurse into it.
return migrateRecursively(bucketToMigrate, newBucket, k)
}
if err := newBucket.Put(k, v); err != nil {
return err
}
return bucketToMigrate.Delete(k)
})
if err != nil {
return err
}
// Finally, we'll delete the bucket itself.
if err := src.DeleteNestedBucket(bucketKey); err != nil {
return err
}
return nil
}

View file

@ -4,7 +4,10 @@
package wallet package wallet
import "github.com/btcsuite/btclog" import (
"github.com/btcsuite/btclog"
"github.com/btcsuite/btcwallet/walletdb/migration"
)
// log is a logger that is initialized with no output filters. This // log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller // means the package will not perform any logging by default until the caller
@ -19,7 +22,7 @@ func init() {
// DisableLog disables all library log output. Logging output is disabled // DisableLog disables all library log output. Logging output is disabled
// by default until either UseLogger or SetLogWriter are called. // by default until either UseLogger or SetLogWriter are called.
func DisableLog() { func DisableLog() {
log = btclog.Disabled UseLogger(btclog.Disabled)
} }
// UseLogger uses a specified Logger to output package logging info. // UseLogger uses a specified Logger to output package logging info.
@ -27,6 +30,8 @@ func DisableLog() {
// using btclog. // using btclog.
func UseLogger(logger btclog.Logger) { func UseLogger(logger btclog.Logger) {
log = logger log = logger
migration.UseLogger(logger)
} }
// LogClosure is a closure that can be printed with %v to be used to // LogClosure is a closure that can be printed with %v to be used to

View file

@ -30,6 +30,7 @@ import (
"github.com/btcsuite/btcwallet/wallet/txauthor" "github.com/btcsuite/btcwallet/wallet/txauthor"
"github.com/btcsuite/btcwallet/wallet/txrules" "github.com/btcsuite/btcwallet/wallet/txrules"
"github.com/btcsuite/btcwallet/walletdb" "github.com/btcsuite/btcwallet/walletdb"
"github.com/btcsuite/btcwallet/walletdb/migration"
"github.com/btcsuite/btcwallet/wtxmgr" "github.com/btcsuite/btcwallet/wtxmgr"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
) )
@ -3382,58 +3383,49 @@ func Create(db walletdb.DB, pubPass, privPass, seed []byte, params *chaincfg.Par
func Open(db walletdb.DB, pubPass []byte, cbs *waddrmgr.OpenCallbacks, func Open(db walletdb.DB, pubPass []byte, cbs *waddrmgr.OpenCallbacks,
params *chaincfg.Params, recoveryWindow uint32) (*Wallet, error) { params *chaincfg.Params, recoveryWindow uint32) (*Wallet, error) {
err := walletdb.View(db, func(tx walletdb.ReadTx) error { var (
waddrmgrBucket := tx.ReadBucket(waddrmgrNamespaceKey) addrMgr *waddrmgr.Manager
if waddrmgrBucket == nil { txMgr *wtxmgr.Store
)
// Before attempting to open the wallet, we'll check if there are any
// database upgrades for us to proceed. We'll also create our references
// to the address and transaction managers, as they are backed by the
// database.
err := walletdb.Update(db, func(tx walletdb.ReadWriteTx) error {
addrMgrBucket := tx.ReadWriteBucket(waddrmgrNamespaceKey)
if addrMgrBucket == nil {
return errors.New("missing address manager namespace") return errors.New("missing address manager namespace")
} }
wtxmgrBucket := tx.ReadBucket(wtxmgrNamespaceKey) txMgrBucket := tx.ReadWriteBucket(wtxmgrNamespaceKey)
if wtxmgrBucket == nil { if txMgrBucket == nil {
return errors.New("missing transaction manager namespace") return errors.New("missing transaction manager namespace")
} }
addrMgrUpgrader := waddrmgr.NewMigrationManager(addrMgrBucket)
txMgrUpgrader := wtxmgr.NewMigrationManager(txMgrBucket)
err := migration.Upgrade(txMgrUpgrader, addrMgrUpgrader)
if err != nil {
return err
}
addrMgr, err = waddrmgr.Open(addrMgrBucket, pubPass, params)
if err != nil {
return err
}
txMgr, err = wtxmgr.Open(txMgrBucket, params)
if err != nil {
return err
}
return nil return nil
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Perform upgrades as necessary. Each upgrade is done under its own
// transaction, which is managed by each package itself, so the entire
// DB is passed instead of passing already opened write transaction.
//
// This will need to change later when upgrades in one package depend on
// data in another (such as removing chain synchronization from address
// manager).
err = waddrmgr.DoUpgrades(db, waddrmgrNamespaceKey, pubPass, params, cbs)
if err != nil {
return nil, err
}
err = wtxmgr.DoUpgrades(db, wtxmgrNamespaceKey)
if err != nil {
return nil, err
}
// Open database abstraction instances
var (
addrMgr *waddrmgr.Manager
txMgr *wtxmgr.Store
)
err = walletdb.View(db, func(tx walletdb.ReadTx) error {
addrmgrNs := tx.ReadBucket(waddrmgrNamespaceKey)
txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey)
var err error
addrMgr, err = waddrmgr.Open(addrmgrNs, pubPass, params)
if err != nil {
return err
}
txMgr, err = wtxmgr.Open(txmgrNs, params)
return err
})
if err != nil {
return nil, err
}
log.Infof("Opened wallet") // TODO: log balance? last sync height? log.Infof("Opened wallet") // TODO: log balance? last sync height?
w := &Wallet{ w := &Wallet{
publicPassphrase: pubPass, publicPassphrase: pubPass,
db: db, db: db,
@ -3456,9 +3448,11 @@ func Open(db walletdb.DB, pubPass []byte, cbs *waddrmgr.OpenCallbacks,
chainParams: params, chainParams: params,
quit: make(chan struct{}), quit: make(chan struct{}),
} }
w.NtfnServer = newNotificationServer(w) w.NtfnServer = newNotificationServer(w)
w.TxStore.NotifyUnspent = func(hash *chainhash.Hash, index uint32) { w.TxStore.NotifyUnspent = func(hash *chainhash.Hash, index uint32) {
w.NtfnServer.notifyUnspentOutput(0, hash, index) w.NtfnServer.notifyUnspentOutput(0, hash, index)
} }
return w, nil return w, nil
} }

43
walletdb/migration/log.go Normal file
View file

@ -0,0 +1,43 @@
package migration
import "github.com/btcsuite/btclog"
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
DisableLog()
}
// DisableLog disables all library log output. Logging output is disabled
// by default until either UseLogger or SetLogWriter are called.
func DisableLog() {
UseLogger(btclog.Disabled)
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
// LogClosure is a closure that can be printed with %v to be used to
// generate expensive-to-create data for a detailed log level and avoid doing
// the work if the data isn't printed.
type logClosure func() string
// String invokes the log closure and returns the results string.
func (c logClosure) String() string {
return c()
}
// newLogClosure returns a new closure over the passed function which allows
// it to be used as a parameter in a logging function that is only invoked when
// the logging level is such that the message will actually be logged.
func newLogClosure(c func() string) logClosure {
return logClosure(c)
}

View file

@ -0,0 +1,162 @@
package migration
import (
"errors"
"sort"
"github.com/btcsuite/btcwallet/walletdb"
)
var (
// ErrReversion is an error returned when an attempt to revert to a
// previous version is detected. This is done to provide safety to users
// as some upgrades may not be backwards-compatible.
ErrReversion = errors.New("reverting to a previous version is not " +
"supported")
)
// Version denotes the version number of the database. A migration can be used
// to bring a previous version of the database to a later one.
type Version struct {
// Number represents the number of this version.
Number uint32
// Migration represents a migration function that modifies the database
// state. Care must be taken so that consequent migrations build off of
// the previous one in order to ensure the consistency of the database.
Migration func(walletdb.ReadWriteBucket) error
}
// Manager is an interface that exposes the necessary methods needed in order to
// migrate/upgrade a service. Each service (i.e., an implementation of this
// interface) can then use the Upgrade function to perform any required database
// migrations.
type Manager interface {
// Name returns the name of the service we'll be attempting to upgrade.
Name() string
// Namespace returns the top-level bucket of the service.
Namespace() walletdb.ReadWriteBucket
// CurrentVersion returns the current version of the service's database.
CurrentVersion(walletdb.ReadBucket) (uint32, error)
// SetVersion sets the version of the service's database.
SetVersion(walletdb.ReadWriteBucket, uint32) error
// Versions returns all of the available database versions of the
// service.
Versions() []Version
}
// GetLatestVersion returns the latest version available from the given slice.
func GetLatestVersion(versions []Version) uint32 {
if len(versions) == 0 {
return 0
}
// Before determining the latest version number, we'll sort the slice to
// ensure it reflects the last element.
sort.Slice(versions, func(i, j int) bool {
return versions[i].Number < versions[j].Number
})
return versions[len(versions)-1].Number
}
// VersionsToApply determines which versions should be applied as migrations
// based on the current version.
func VersionsToApply(currentVersion uint32, versions []Version) []Version {
// Assuming the migration versions are in increasing order, we'll apply
// any migrations that have a version number lower than our current one.
var upgradeVersions []Version
for _, version := range versions {
if version.Number > currentVersion {
upgradeVersions = append(upgradeVersions, version)
}
}
// Before returning, we'll sort the slice by its version number to
// ensure the migrations are applied in their intended order.
sort.Slice(upgradeVersions, func(i, j int) bool {
return upgradeVersions[i].Number < upgradeVersions[j].Number
})
return upgradeVersions
}
// Upgrade attempts to upgrade a group of services exposed through the Manager
// interface. Each service will go through its available versions and determine
// whether it needs to apply any.
//
// NOTE: In order to guarantee fault-tolerance, each service upgrade should
// happen within the same database transaction.
func Upgrade(mgrs ...Manager) error {
for _, mgr := range mgrs {
if err := upgrade(mgr); err != nil {
return err
}
}
return nil
}
// upgrade attempts to upgrade a service expose through its implementation of
// the Manager interface. This function will determine whether any new versions
// need to be applied based on the service's current version and latest
// available one.
func upgrade(mgr Manager) error {
// We'll start by fetching the service's current and latest version.
ns := mgr.Namespace()
currentVersion, err := mgr.CurrentVersion(ns)
if err != nil {
return err
}
versions := mgr.Versions()
latestVersion := GetLatestVersion(versions)
switch {
// If the current version is greater than the latest, then the service
// is attempting to revert to a previous version that's possibly
// backwards-incompatible. To prevent this, we'll return an error
// indicating so.
case currentVersion > latestVersion:
return ErrReversion
// If the current version is behind the latest version, we'll need to
// apply all of the newer versions in order to catch up to the latest.
case currentVersion < latestVersion:
versions := VersionsToApply(currentVersion, versions)
mgrName := mgr.Name()
ns := mgr.Namespace()
for _, version := range versions {
log.Infof("Applying %v migration #%d", mgrName,
version.Number)
// We'll only run a migration if there is one available
// for this version.
if version.Migration != nil {
err := version.Migration(ns)
if err != nil {
log.Errorf("Unable to apply %v "+
"migration #%d: %v", mgrName,
version.Number, err)
return err
}
}
}
// With all of the versions applied, we can now reflect the
// latest version upon the service.
if err := mgr.SetVersion(ns, latestVersion); err != nil {
return err
}
// If the current version matches the latest one, there's no upgrade
// needed and we can safely exit.
case currentVersion == latestVersion:
}
return nil
}

View file

@ -0,0 +1,343 @@
package migration_test
import (
"errors"
"fmt"
"reflect"
"testing"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/btcsuite/btcwallet/walletdb/migration"
"github.com/davecgh/go-spew/spew"
)
type mockMigrationManager struct {
currentVersion uint32
versions []migration.Version
}
var _ migration.Manager = (*mockMigrationManager)(nil)
func (m *mockMigrationManager) Name() string {
return "mock"
}
func (m *mockMigrationManager) Namespace() walletdb.ReadWriteBucket {
return nil
}
func (m *mockMigrationManager) CurrentVersion(_ walletdb.ReadBucket) (uint32, error) {
return m.currentVersion, nil
}
func (m *mockMigrationManager) SetVersion(_ walletdb.ReadWriteBucket, version uint32) error {
m.currentVersion = version
return nil
}
func (m *mockMigrationManager) Versions() []migration.Version {
return m.versions
}
// TestGetLatestVersion ensures that we can properly retrieve the latest version
// from a slice of versions.
func TestGetLatestVersion(t *testing.T) {
t.Parallel()
tests := []struct {
versions []migration.Version
latestVersion uint32
}{
{
versions: []migration.Version{},
latestVersion: 0,
},
{
versions: []migration.Version{
{
Number: 1,
Migration: nil,
},
},
latestVersion: 1,
},
{
versions: []migration.Version{
{
Number: 1,
Migration: nil,
},
{
Number: 2,
Migration: nil,
},
},
latestVersion: 2,
},
{
versions: []migration.Version{
{
Number: 2,
Migration: nil,
},
{
Number: 0,
Migration: nil,
},
{
Number: 1,
Migration: nil,
},
},
latestVersion: 2,
},
}
for i, test := range tests {
latestVersion := migration.GetLatestVersion(test.versions)
if latestVersion != test.latestVersion {
t.Fatalf("test %d: expected latest version %d, got %d",
i, test.latestVersion, latestVersion)
}
}
}
// TestVersionsToApply ensures that the proper versions that needs to be applied
// are returned given the current version.
func TestVersionsToApply(t *testing.T) {
t.Parallel()
tests := []struct {
currentVersion uint32
versions []migration.Version
versionsToApply []migration.Version
}{
{
currentVersion: 0,
versions: []migration.Version{
{
Number: 0,
Migration: nil,
},
},
versionsToApply: nil,
},
{
currentVersion: 1,
versions: []migration.Version{
{
Number: 0,
Migration: nil,
},
},
versionsToApply: nil,
},
{
currentVersion: 0,
versions: []migration.Version{
{
Number: 0,
Migration: nil,
},
{
Number: 1,
Migration: nil,
},
{
Number: 2,
Migration: nil,
},
},
versionsToApply: []migration.Version{
{
Number: 1,
Migration: nil,
},
{
Number: 2,
Migration: nil,
},
},
},
{
currentVersion: 0,
versions: []migration.Version{
{
Number: 2,
Migration: nil,
},
{
Number: 0,
Migration: nil,
},
{
Number: 1,
Migration: nil,
},
},
versionsToApply: []migration.Version{
{
Number: 1,
Migration: nil,
},
{
Number: 2,
Migration: nil,
},
},
},
}
for i, test := range tests {
versionsToApply := migration.VersionsToApply(
test.currentVersion, test.versions,
)
if !reflect.DeepEqual(versionsToApply, test.versionsToApply) {
t.Fatalf("test %d: versions to apply mismatch\n"+
"expected: %v\ngot: %v", i,
spew.Sdump(test.versionsToApply),
spew.Sdump(versionsToApply))
}
}
}
// TestUpgradeRevert ensures that we are not able to revert to a previous
// version.
func TestUpgradeRevert(t *testing.T) {
t.Parallel()
m := &mockMigrationManager{
currentVersion: 1,
versions: []migration.Version{
{
Number: 0,
Migration: nil,
},
},
}
if err := migration.Upgrade(m); err != migration.ErrReversion {
t.Fatalf("expected Upgrade to fail with ErrReversion, got %v",
err)
}
}
// TestUpgradeSameVersion ensures that no upgrades happen if the current version
// matches the latest.
func TestUpgradeSameVersion(t *testing.T) {
t.Parallel()
m := &mockMigrationManager{
currentVersion: 1,
versions: []migration.Version{
{
Number: 0,
Migration: nil,
},
{
Number: 1,
Migration: func(walletdb.ReadWriteBucket) error {
return errors.New("migration should " +
"not happen due to already " +
"being on the latest version")
},
},
},
}
if err := migration.Upgrade(m); err != nil {
t.Fatalf("unable to upgrade: %v", err)
}
}
// TestUpgradeNewVersion ensures that we can properly upgrade to a newer version
// if available.
func TestUpgradeNewVersion(t *testing.T) {
t.Parallel()
versions := []migration.Version{
{
Number: 0,
Migration: nil,
},
{
Number: 1,
Migration: func(walletdb.ReadWriteBucket) error {
return nil
},
},
}
m := &mockMigrationManager{
currentVersion: 0,
versions: versions,
}
if err := migration.Upgrade(m); err != nil {
t.Fatalf("unable to upgrade: %v", err)
}
latestVersion := migration.GetLatestVersion(versions)
if m.currentVersion != latestVersion {
t.Fatalf("expected current version to match latest: "+
"current=%d vs latest=%d", m.currentVersion,
latestVersion)
}
}
// TestUpgradeMultipleVersions ensures that we can go through multiple upgrades
// in-order to reach the latest version.
func TestUpgradeMultipleVersions(t *testing.T) {
t.Parallel()
previousVersion := uint32(0)
versions := []migration.Version{
{
Number: previousVersion,
Migration: nil,
},
{
Number: 1,
Migration: func(walletdb.ReadWriteBucket) error {
if previousVersion != 0 {
return fmt.Errorf("expected previous "+
"version to be %d, got %d", 0,
previousVersion)
}
previousVersion = 1
return nil
},
},
{
Number: 2,
Migration: func(walletdb.ReadWriteBucket) error {
if previousVersion != 1 {
return fmt.Errorf("expected previous "+
"version to be %d, got %d", 1,
previousVersion)
}
previousVersion = 2
return nil
},
},
}
m := &mockMigrationManager{
currentVersion: 0,
versions: versions,
}
if err := migration.Upgrade(m); err != nil {
t.Fatalf("unable to upgrade: %v", err)
}
latestVersion := migration.GetLatestVersion(versions)
if m.currentVersion != latestVersion {
t.Fatalf("expected current version to match latest: "+
"current=%d vs latest=%d", m.currentVersion,
latestVersion)
}
}

View file

@ -52,13 +52,6 @@ import (
// keys iterating in order. // keys iterating in order.
var byteOrder = binary.BigEndian var byteOrder = binary.BigEndian
// Database versions. Versions start at 1 and increment for each database
// change.
const (
// LatestVersion is the most recent store version.
LatestVersion = 1
)
// This package makes assumptions that the width of a chainhash.Hash is always // This package makes assumptions that the width of a chainhash.Hash is always
// 32 bytes. If this is ever changed (unlikely for bitcoin, possible for alts), // 32 bytes. If this is ever changed (unlikely for bitcoin, possible for alts),
// offsets have to be rewritten. Use a compile-time assertion that this // offsets have to be rewritten. Use a compile-time assertion that this
@ -1264,39 +1257,25 @@ func deleteRawUnminedInput(ns walletdb.ReadWriteBucket, k []byte) error {
// openStore opens an existing transaction store from the passed namespace. // openStore opens an existing transaction store from the passed namespace.
func openStore(ns walletdb.ReadBucket) error { func openStore(ns walletdb.ReadBucket) error {
v := ns.Get(rootVersion) version, err := fetchVersion(ns)
if len(v) != 4 { if err != nil {
str := "no transaction store exists in namespace" return err
return storeError(ErrNoExists, str, nil)
} }
version := byteOrder.Uint32(v)
if version < LatestVersion { latestVersion := getLatestVersion()
if version < latestVersion {
str := fmt.Sprintf("a database upgrade is required to upgrade "+ str := fmt.Sprintf("a database upgrade is required to upgrade "+
"wtxmgr from recorded version %d to the latest version %d", "wtxmgr from recorded version %d to the latest version %d",
version, LatestVersion) version, latestVersion)
return storeError(ErrNeedsUpgrade, str, nil) return storeError(ErrNeedsUpgrade, str, nil)
} }
if version > LatestVersion { if version > latestVersion {
str := fmt.Sprintf("version recorded version %d is newer that latest "+ str := fmt.Sprintf("version recorded version %d is newer that "+
"understood version %d", version, LatestVersion) "latest understood version %d", version, latestVersion)
return storeError(ErrUnknownVersion, str, nil) return storeError(ErrUnknownVersion, str, nil)
} }
// Upgrade the tx store as needed, one version at a time, until
// LatestVersion is reached. Versions are not skipped when performing
// database upgrades, and each upgrade is done in its own transaction.
//
// No upgrades yet.
//if version < LatestVersion {
// err := scopedUpdate(namespace, func(ns walletdb.Bucket) error {
// })
// if err != nil {
// // Handle err
// }
//}
return nil return nil
} }
@ -1311,26 +1290,22 @@ func createStore(ns walletdb.ReadWriteBucket) error {
} }
// Write the latest store version. // Write the latest store version.
v := make([]byte, 4) if err := putVersion(ns, getLatestVersion()); err != nil {
byteOrder.PutUint32(v, LatestVersion) return err
err := ns.Put(rootVersion, v)
if err != nil {
str := "failed to store latest database version"
return storeError(ErrDatabase, str, err)
} }
// Save the creation date of the store. // Save the creation date of the store.
v = make([]byte, 8) var v [8]byte
byteOrder.PutUint64(v, uint64(time.Now().Unix())) byteOrder.PutUint64(v[:], uint64(time.Now().Unix()))
err = ns.Put(rootCreateDate, v) err := ns.Put(rootCreateDate, v[:])
if err != nil { if err != nil {
str := "failed to store database creation time" str := "failed to store database creation time"
return storeError(ErrDatabase, str, err) return storeError(ErrDatabase, str, err)
} }
// Write a zero balance. // Write a zero balance.
v = make([]byte, 8) byteOrder.PutUint64(v[:], 0)
err = ns.Put(rootMinedBalance, v) err = ns.Put(rootMinedBalance, v[:])
if err != nil { if err != nil {
str := "failed to write zero balance" str := "failed to write zero balance"
return storeError(ErrDatabase, str, err) return storeError(ErrDatabase, str, err)
@ -1387,6 +1362,30 @@ func createStore(ns walletdb.ReadWriteBucket) error {
return nil return nil
} }
// putVersion modifies the version of the store to reflect the given version
// number.
func putVersion(ns walletdb.ReadWriteBucket, version uint32) error {
var v [4]byte
byteOrder.PutUint32(v[:], version)
if err := ns.Put(rootVersion, v[:]); err != nil {
str := "failed to store database version"
return storeError(ErrDatabase, str, err)
}
return nil
}
// fetchVersion fetches the current version of the store.
func fetchVersion(ns walletdb.ReadBucket) (uint32, error) {
v := ns.Get(rootVersion)
if len(v) != 4 {
str := "no transaction store exists in namespace"
return 0, storeError(ErrNoExists, str, nil)
}
return byteOrder.Uint32(v), nil
}
func scopedUpdate(db walletdb.DB, namespaceKey []byte, f func(walletdb.ReadWriteBucket) error) error { func scopedUpdate(db walletdb.DB, namespaceKey []byte, f func(walletdb.ReadWriteBucket) error) error {
tx, err := db.BeginReadWriteTx() tx, err := db.BeginReadWriteTx()
if err != nil { if err != nil {

83
wtxmgr/migrations.go Normal file
View file

@ -0,0 +1,83 @@
package wtxmgr
import (
"github.com/btcsuite/btcwallet/walletdb"
"github.com/btcsuite/btcwallet/walletdb/migration"
)
// versions is a list of the different database versions. The last entry should
// reflect the latest database state. If the database happens to be at a version
// number lower than the latest, migrations will be performed in order to catch
// it up.
var versions = []migration.Version{
{
Number: 1,
Migration: nil,
},
}
// getLatestVersion returns the version number of the latest database version.
func getLatestVersion() uint32 {
return versions[len(versions)-1].Number
}
// MigrationManager is an implementation of the migration.Manager interface that
// will be used to handle migrations for the address manager. It exposes the
// necessary parameters required to successfully perform migrations.
type MigrationManager struct {
ns walletdb.ReadWriteBucket
}
// A compile-time assertion to ensure that MigrationManager implements the
// migration.Manager interface.
var _ migration.Manager = (*MigrationManager)(nil)
// NewMigrationManager creates a new migration manager for the transaction
// manager. The given bucket should reflect the top-level bucket in which all
// of the transaction manager's data is contained within.
func NewMigrationManager(ns walletdb.ReadWriteBucket) *MigrationManager {
return &MigrationManager{ns: ns}
}
// Name returns the name of the service we'll be attempting to upgrade.
//
// NOTE: This method is part of the migration.Manager interface.
func (m *MigrationManager) Name() string {
return "wallet transaction manager"
}
// Namespace returns the top-level bucket of the service.
//
// NOTE: This method is part of the migration.Manager interface.
func (m *MigrationManager) Namespace() walletdb.ReadWriteBucket {
return m.ns
}
// CurrentVersion returns the current version of the service's database.
//
// NOTE: This method is part of the migration.Manager interface.
func (m *MigrationManager) CurrentVersion(ns walletdb.ReadBucket) (uint32, error) {
if ns == nil {
ns = m.ns
}
return fetchVersion(m.ns)
}
// SetVersion sets the version of the service's database.
//
// NOTE: This method is part of the migration.Manager interface.
func (m *MigrationManager) SetVersion(ns walletdb.ReadWriteBucket,
version uint32) error {
if ns == nil {
ns = m.ns
}
return putVersion(m.ns, version)
}
// Versions returns all of the available database versions of the service.
//
// NOTE: This method is part of the migration.Manager interface.
func (m *MigrationManager) Versions() []migration.Version {
return versions
}

View file

@ -140,14 +140,6 @@ type Store struct {
NotifyUnspent func(hash *chainhash.Hash, index uint32) NotifyUnspent func(hash *chainhash.Hash, index uint32)
} }
// DoUpgrades performs any necessary upgrades to the transaction history
// contained in the wallet database, namespaced by the top level bucket key
// namespaceKey.
func DoUpgrades(db walletdb.DB, namespaceKey []byte) error {
// No upgrades
return nil
}
// Open opens the wallet transaction store from a walletdb namespace. If the // Open opens the wallet transaction store from a walletdb namespace. If the
// store does not exist, ErrNoExist is returned. // store does not exist, ErrNoExist is returned.
func Open(ns walletdb.ReadBucket, chainParams *chaincfg.Params) (*Store, error) { func Open(ns walletdb.ReadBucket, chainParams *chaincfg.Params) (*Store, error) {