Synchronize all account file writes.

Fixes several hangs cased by incorrect locking, by removing the
locking.  Instead, a single goroutine manages all file writes.

The old account 'dirty' boolean flags have been removed.  Instead,
anytime an account structure is modified, the portion that was
modified (wallet, tx store, or utxo store) must be scheduled to be
written.
This commit is contained in:
Josh Rickmar 2014-01-28 23:04:10 -05:00
parent 0b371b09e8
commit 430db140ee
6 changed files with 412 additions and 396 deletions

View file

@ -70,53 +70,17 @@ type Account struct {
*wallet.Wallet *wallet.Wallet
mtx sync.RWMutex mtx sync.RWMutex
name string name string
dirty bool
fullRescan bool fullRescan bool
UtxoStore struct { UtxoStore struct {
sync.RWMutex sync.RWMutex
dirty bool s tx.UtxoStore
s tx.UtxoStore
} }
TxStore struct { TxStore struct {
sync.RWMutex sync.RWMutex
dirty bool s tx.TxStore
s tx.TxStore
} }
} }
// MarkDirtyWallet marks an account's wallet as dirty, and adds the
// account to the list of dirty accounts to be schedule to be synced to
// disk. It is a runtime error to call this without holding the wallet
// writer lock.
func (a *Account) MarkDirtyWallet() {
a.dirty = true
dirtyAccounts.Lock()
dirtyAccounts.m[a] = true
dirtyAccounts.Unlock()
}
// MarkDirtyUtxoStore marks an account's utxo store as dirty, and adds
// the account to the list of dirty accounts to be schedule to be synced to
// disk. It is a runtime error to call this without holding the utxo store
// writer lock.
func (a *Account) MarkDirtyUtxoStore() {
a.UtxoStore.dirty = true
dirtyAccounts.Lock()
dirtyAccounts.m[a] = true
dirtyAccounts.Unlock()
}
// MarkDirtyTxStore marks an account's tx store as dirty, and adds the
// account to the list of dirty accounts to be schedule to be synced to
// disk. It is a runtime error to call this without holding the tx store
// writer lock.
func (a *Account) MarkDirtyTxStore() {
a.TxStore.dirty = true
dirtyAccounts.Lock()
dirtyAccounts.m[a] = true
dirtyAccounts.Unlock()
}
// Lock locks the underlying wallet for an account. // Lock locks the underlying wallet for an account.
func (a *Account) Lock() error { func (a *Account) Lock() error {
a.mtx.Lock() a.mtx.Lock()
@ -155,15 +119,17 @@ func (a *Account) Unlock(passphrase []byte) error {
// that occured on a chain no longer considered to be the main chain. // that occured on a chain no longer considered to be the main chain.
func (a *Account) Rollback(height int32, hash *btcwire.ShaHash) { func (a *Account) Rollback(height int32, hash *btcwire.ShaHash) {
a.UtxoStore.Lock() a.UtxoStore.Lock()
a.UtxoStore.dirty = a.UtxoStore.dirty || a.UtxoStore.s.Rollback(height, hash) modified := a.UtxoStore.s.Rollback(height, hash)
a.UtxoStore.Unlock() a.UtxoStore.Unlock()
if modified {
a.ScheduleUtxoStoreWrite()
}
a.TxStore.Lock() a.TxStore.Lock()
a.TxStore.dirty = a.TxStore.dirty || a.TxStore.s.Rollback(height, hash) modified = a.TxStore.s.Rollback(height, hash)
a.TxStore.Unlock() a.TxStore.Unlock()
if modified {
if err := a.writeDirtyToDisk(); err != nil { a.ScheduleTxStoreWrite()
log.Errorf("cannot sync dirty wallet: %v", err)
} }
} }
@ -442,7 +408,7 @@ func (a *Account) ImportPrivKey(wif string, rescan bool) error {
} }
Rescan(CurrentRPCConn(), bs.Height, addrs) Rescan(CurrentRPCConn(), bs.Height, addrs)
a.writeDirtyToDisk() a.WriteScheduledToDisk()
}() }()
} }
return nil return nil
@ -464,22 +430,16 @@ func (a *Account) ImportWIFPrivateKey(wif string, bs *wallet.BlockStamp) (string
// Attempt to import private key into wallet. // Attempt to import private key into wallet.
a.mtx.Lock() a.mtx.Lock()
addr, err := a.Wallet.ImportPrivateKey(privkey, compressed, bs) addr, err := a.Wallet.ImportPrivateKey(privkey, compressed, bs)
a.mtx.Unlock()
if err != nil { if err != nil {
a.mtx.Unlock()
return "", err return "", err
} }
addrStr := addr.String() addrStr := addr.String()
// Immediately write dirty wallet to disk. // Immediately write wallet to disk.
// a.ScheduleWalletWrite()
// TODO(jrick): change writeDirtyToDisk to not grab the writer lock. if err := a.WriteScheduledToDisk(); err != nil {
// Don't want to let another goroutine waiting on the mutex to grab return "", fmt.Errorf("cannot write account: %v", err)
// the mutex before it is written to disk.
a.dirty = true
a.mtx.Unlock()
if err := a.writeDirtyToDisk(); err != nil {
log.Errorf("cannot write dirty wallet: %v", err)
return "", fmt.Errorf("import failed: cannot write wallet: %v", err)
} }
// Associate the imported address with this account. // Associate the imported address with this account.
@ -600,7 +560,7 @@ func (a *Account) RescanActiveAddresses() {
// Rescan active addresses starting at the determined block height. // Rescan active addresses starting at the determined block height.
Rescan(CurrentRPCConn(), beginBlock, a.ActivePaymentAddresses()) Rescan(CurrentRPCConn(), beginBlock, a.ActivePaymentAddresses())
a.writeDirtyToDisk() a.WriteScheduledToDisk()
} }
// SortedActivePaymentAddresses returns a slice of all active payment // SortedActivePaymentAddresses returns a slice of all active payment
@ -641,20 +601,18 @@ func (a *Account) NewAddress() (btcutil.Address, error) {
return nil, err return nil, err
} }
a.mtx.Lock()
// Get next address from wallet. // Get next address from wallet.
a.mtx.Lock()
addr, err := a.Wallet.NextChainedAddress(&bs, cfg.KeypoolSize) addr, err := a.Wallet.NextChainedAddress(&bs, cfg.KeypoolSize)
a.mtx.Unlock()
if err != nil { if err != nil {
a.mtx.Unlock()
return nil, err return nil, err
} }
// Immediately write updated wallet to disk. // Immediately write updated wallet to disk.
a.dirty = true a.ScheduleWalletWrite()
a.mtx.Unlock() if err := a.WriteScheduledToDisk(); err != nil {
if err = a.writeDirtyToDisk(); err != nil { return nil, fmt.Errorf("account write failed: %v", err)
log.Errorf("cannot sync dirty wallet: %v", err)
} }
// Mark this new address as belonging to this account. // Mark this new address as belonging to this account.

View file

@ -86,34 +86,24 @@ func (store *AccountStore) BlockNotify(bs *wallet.BlockStamp) {
store.RLock() store.RLock()
defer store.RUnlock() defer store.RUnlock()
for _, a := range store.accounts { for name, a := range store.accounts {
// The UTXO store will be dirty if it was modified // TODO: need a flag or check that the utxo store was actually
// from a tx notification. // modified, or this will notify even if there are no balance
if a.UtxoStore.dirty { // changes, or sending these notifications as the utxos are added.
// Notify all frontends of account's new unconfirmed confirmed := a.CalculateBalance(1)
// and confirmed balance. unconfirmed := a.CalculateBalance(0) - confirmed
confirmed := a.CalculateBalance(1) NotifyWalletBalance(frontendNotificationMaster, a.name, confirmed)
unconfirmed := a.CalculateBalance(0) - confirmed NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, a.name,
NotifyWalletBalance(frontendNotificationMaster, unconfirmed)
a.name, confirmed)
NotifyWalletBalanceUnconfirmed(frontendNotificationMaster,
a.name, unconfirmed)
}
// The account is intentionaly not immediately synced to disk. // If this is the default account, update the block all accounts
// If btcd is performing an IBD, writing the wallet file for // are synced with, and schedule a wallet write.
// each newly-connected block would result in too many if name == "" {
// unnecessary disk writes. The UTXO and transaction stores a.mtx.Lock()
// could be written, but in the case of btcwallet closing a.Wallet.SetSyncedWith(bs)
// before writing the dirty wallet, both would have to be a.mtx.Unlock()
// pruned anyways. a.ScheduleWalletWrite()
// }
// Instead, the wallet is queued to be written to disk at the
// next scheduled disk sync.
a.mtx.Lock()
a.Wallet.SetSyncedWith(bs)
a.MarkDirtyWallet()
a.mtx.Unlock()
} }
} }
@ -121,7 +111,7 @@ func (store *AccountStore) BlockNotify(bs *wallet.BlockStamp) {
// sent transaction with the same txid as from a txmined notification. If // sent transaction with the same txid as from a txmined notification. If
// the transaction IDs match, the record in the TxStore is updated with // the transaction IDs match, the record in the TxStore is updated with
// the full information about the newly-mined tx, and the TxStore is // the full information about the newly-mined tx, and the TxStore is
// marked as dirty. // scheduled to be written to disk..
func (store *AccountStore) RecordMinedTx(txid *btcwire.ShaHash, func (store *AccountStore) RecordMinedTx(txid *btcwire.ShaHash,
blkhash *btcwire.ShaHash, blkheight int32, blkindex int, blkhash *btcwire.ShaHash, blkheight int32, blkindex int,
blktime int64) error { blktime int64) error {
@ -141,19 +131,17 @@ func (store *AccountStore) RecordMinedTx(txid *btcwire.ShaHash,
sendtx, ok := account.TxStore.s[i].(*tx.SendTx) sendtx, ok := account.TxStore.s[i].(*tx.SendTx)
if ok { if ok {
if bytes.Equal(txid.Bytes(), sendtx.TxID[:]) { if bytes.Equal(txid.Bytes(), sendtx.TxID[:]) {
// Unlock the held reader lock and wait for
// the writer lock.
account.TxStore.RUnlock() account.TxStore.RUnlock()
account.TxStore.Lock()
account.TxStore.Lock()
copy(sendtx.BlockHash[:], blkhash.Bytes()) copy(sendtx.BlockHash[:], blkhash.Bytes())
sendtx.BlockHeight = blkheight sendtx.BlockHeight = blkheight
sendtx.BlockIndex = int32(blkindex) sendtx.BlockIndex = int32(blkindex)
sendtx.BlockTime = blktime sendtx.BlockTime = blktime
account.MarkDirtyTxStore()
// Release writer lock and return.
account.TxStore.Unlock() account.TxStore.Unlock()
account.ScheduleTxStoreWrite()
return nil return nil
} }
} }
@ -205,10 +193,10 @@ func (store *AccountStore) CreateEncryptedWallet(name, desc string, passphrase [
account := &Account{ account := &Account{
Wallet: wlt, Wallet: wlt,
name: name, name: name,
dirty: true,
} }
account.UtxoStore.dirty = true account.ScheduleWalletWrite()
account.TxStore.dirty = true account.ScheduleTxStoreWrite()
account.ScheduleUtxoStoreWrite()
// Mark all active payment addresses as belonging to this account. // Mark all active payment addresses as belonging to this account.
for addr := range account.ActivePaymentAddresses() { for addr := range account.ActivePaymentAddresses() {
@ -227,8 +215,8 @@ func (store *AccountStore) CreateEncryptedWallet(name, desc string, passphrase [
// TODO(jrick): this should *only* happen if btcd is connected. // TODO(jrick): this should *only* happen if btcd is connected.
account.Track() account.Track()
// Write new wallet to disk. // Ensure that the account is written out to disk.
if err := account.writeDirtyToDisk(); err != nil { if err := account.WriteScheduledToDisk(); err != nil {
return err return err
} }
@ -237,15 +225,30 @@ func (store *AccountStore) CreateEncryptedWallet(name, desc string, passphrase [
// ChangePassphrase unlocks all account wallets with the old // ChangePassphrase unlocks all account wallets with the old
// passphrase, and re-encrypts each using the new passphrase. // passphrase, and re-encrypts each using the new passphrase.
//
// TODO(jrick): this is a perfect example of how awful the account
// locking is. It must be replaced.
func (store *AccountStore) ChangePassphrase(old, new []byte) error { func (store *AccountStore) ChangePassphrase(old, new []byte) error {
store.RLock() // Due to the undefined order of ranging over the accountstore
defer store.RUnlock() // map and how all account wallet writer locks are grabbed
// simultaneously and unlocked with a defer, this function is
// unsafe to call simulateously with other accountstore functions,
// even though the store itself is not modified.
store.Lock()
defer store.Unlock()
if err := store.changePassphrase(old, new); err != nil {
return err
}
// Immediately write out to disk.
return store.WriteAllToDisk()
}
// changePassphrase changes all passphrases for all accounts without grabbing
// any accountstore locks.
func (store *AccountStore) changePassphrase(old, new []byte) error {
// Check that each account can be unlocked with the old passphrase. // Check that each account can be unlocked with the old passphrase.
// Each's account's wallet mutex is unlocked with a defer so they
// will be held for the duration of this function. This prevents
// a wallet from being locked after some timeout after a RPC call
// to walletpassphrase.
for _, a := range store.accounts { for _, a := range store.accounts {
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
@ -267,38 +270,6 @@ func (store *AccountStore) ChangePassphrase(old, new []byte) error {
if err := a.Wallet.ChangePassphrase(new); err != nil { if err := a.Wallet.ChangePassphrase(new); err != nil {
return err return err
} }
a.dirty = true
}
// Immediately write out to disk. Create a new temporary network
// directory to write to, write all account files there, then move
// to the real network directory. This provides an safe
// replacement of all account files and ensures that all wallets
// are using either the old or new passphrase, but never two wallets
// with different passphrases.
netDir := networkDir(cfg.Net())
tmpNetDir := tmpNetworkDir(cfg.Net())
for _, a := range store.accounts {
// Writer locks must be held for the tx and utxo stores as well,
// to unset the dirty flag.
a.UtxoStore.Lock()
defer a.UtxoStore.Unlock()
a.TxStore.Lock()
defer a.TxStore.Unlock()
if err := a.writeAllToFreshDir(tmpNetDir); err != nil {
return err
}
}
// This is technically NOT an atomic operation, but at startup, if the
// network directory is missing but the temporary network directory
// exists, the temporary is moved before accounts are opened.
if err := os.RemoveAll(netDir); err != nil {
return err
}
if err := Rename(tmpNetDir, netDir); err != nil {
return err
} }
return nil return nil

View file

@ -368,8 +368,8 @@ func NtfnProcessedTx(n btcjson.Cmd, marshaled []byte) {
// Record the tx history. // Record the tx history.
a.TxStore.Lock() a.TxStore.Lock()
a.TxStore.s.InsertRecvTx(t) a.TxStore.s.InsertRecvTx(t)
a.MarkDirtyTxStore()
a.TxStore.Unlock() a.TxStore.Unlock()
a.ScheduleTxStoreWrite()
// Notify frontends of tx. If the tx is unconfirmed, it is always // Notify frontends of tx. If the tx is unconfirmed, it is always
// notified and the outpoint is marked as notified. If the outpoint // notified and the outpoint is marked as notified. If the outpoint
@ -404,8 +404,8 @@ func NtfnProcessedTx(n btcjson.Cmd, marshaled []byte) {
copy(u.BlockHash[:], blockHash[:]) copy(u.BlockHash[:], blockHash[:])
a.UtxoStore.Lock() a.UtxoStore.Lock()
a.UtxoStore.s.Insert(u) a.UtxoStore.s.Insert(u)
a.MarkDirtyUtxoStore()
a.UtxoStore.Unlock() a.UtxoStore.Unlock()
a.ScheduleUtxoStoreWrite()
// If this notification came from mempool, notify frontends of // If this notification came from mempool, notify frontends of
// the new unconfirmed balance immediately. Otherwise, wait until // the new unconfirmed balance immediately. Otherwise, wait until

2
cmd.go
View file

@ -160,7 +160,7 @@ func main() {
} }
// Start account disk syncer goroutine. // Start account disk syncer goroutine.
go DirtyAccountSyncer() go AccountDiskSyncer()
go func() { go func() {
s, err := newServer(cfg.SvrListeners) s, err := newServer(cfg.SvrListeners)

View file

@ -278,7 +278,7 @@ func ExportWatchingWallet(icmd btcjson.Cmd) (interface{}, *btcjson.Error) {
} }
// Create export directory, write files there. // Create export directory, write files there.
if err = wa.WriteExport("watchingwallet"); err != nil { if err = wa.ExportToDirectory("watchingwallet"); err != nil {
e := btcjson.Error{ e := btcjson.Error{
Code: btcjson.ErrWallet.Code, Code: btcjson.ErrWallet.Code,
Message: err.Error(), Message: err.Error(),
@ -942,12 +942,16 @@ func SendFrom(icmd btcjson.Cmd) (interface{}, *btcjson.Error) {
// wait until all send history has been written. // wait until all send history has been written.
SendTxHistSyncChans.add <- createdTx.txid SendTxHistSyncChans.add <- createdTx.txid
// If a change address was added, mark wallet as dirty, sync to disk, // If a change address was added, sync wallet to disk and request
// and request updates for change address. // transaction notifications to the change address.
if createdTx.changeAddr != nil { if createdTx.changeAddr != nil {
a.dirty = true a.ScheduleWalletWrite()
if err := a.writeDirtyToDisk(); err != nil { if err := a.WriteScheduledToDisk(); err != nil {
log.Errorf("cannot write dirty wallet: %v", err) e := btcjson.Error{
Code: btcjson.ErrWallet.Code,
Message: "Cannot write account: " + err.Error(),
}
return nil, &e
} }
a.ReqNewTxsForAddress(createdTx.changeAddr) a.ReqNewTxsForAddress(createdTx.changeAddr)
} }
@ -1021,12 +1025,16 @@ func SendMany(icmd btcjson.Cmd) (interface{}, *btcjson.Error) {
// wait until all send history has been written. // wait until all send history has been written.
SendTxHistSyncChans.add <- createdTx.txid SendTxHistSyncChans.add <- createdTx.txid
// If a change address was added, mark wallet as dirty, sync to disk, // If a change address was added, sync wallet to disk and request
// and request updates for change address. // transaction notifications to the change address.
if createdTx.changeAddr != nil { if createdTx.changeAddr != nil {
a.dirty = true a.ScheduleWalletWrite()
if err := a.writeDirtyToDisk(); err != nil { if err := a.WriteScheduledToDisk(); err != nil {
log.Errorf("cannot write dirty wallet: %v", err) e := btcjson.Error{
Code: btcjson.ErrWallet.Code,
Message: "Cannot write account: " + err.Error(),
}
return nil, &e
} }
a.ReqNewTxsForAddress(createdTx.changeAddr) a.ReqNewTxsForAddress(createdTx.changeAddr)
} }
@ -1122,8 +1130,8 @@ func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo *
} }
a.TxStore.Lock() a.TxStore.Lock()
a.TxStore.s = append(a.TxStore.s, sendtx) a.TxStore.s = append(a.TxStore.s, sendtx)
a.TxStore.dirty = true
a.TxStore.Unlock() a.TxStore.Unlock()
a.ScheduleTxStoreWrite()
// Notify frontends of new SendTx. // Notify frontends of new SendTx.
bs, err := GetCurBlock() bs, err := GetCurBlock()
@ -1140,12 +1148,14 @@ func handleSendRawTxReply(icmd btcjson.Cmd, txIDStr string, a *Account, txInfo *
// Remove previous unspent outputs now spent by the tx. // Remove previous unspent outputs now spent by the tx.
a.UtxoStore.Lock() a.UtxoStore.Lock()
modified := a.UtxoStore.s.Remove(txInfo.inputs) modified := a.UtxoStore.s.Remove(txInfo.inputs)
a.UtxoStore.dirty = a.UtxoStore.dirty || modified
a.UtxoStore.Unlock() a.UtxoStore.Unlock()
if modified {
a.ScheduleUtxoStoreWrite()
}
// Disk sync tx and utxo stores. // Disk sync tx and utxo stores.
if err := a.writeDirtyToDisk(); err != nil { if err := a.WriteScheduledToDisk(); err != nil {
log.Errorf("cannot sync dirty wallet: %v", err) log.Errorf("cannot write account: %v", err)
} }
// Notify all frontends of account's new unconfirmed and // Notify all frontends of account's new unconfirmed and

View file

@ -22,20 +22,9 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sync"
"time" "time"
) )
var (
// dirtyAccounts holds a set of accounts that include dirty components.
dirtyAccounts = struct {
sync.Mutex
m map[*Account]bool
}{
m: make(map[*Account]bool),
}
)
// networkDir returns the directory name of a network directory to hold account // networkDir returns the directory name of a network directory to hold account
// files. // files.
func networkDir(net btcwire.BitcoinNet) string { func networkDir(net btcwire.BitcoinNet) string {
@ -53,6 +42,33 @@ func tmpNetworkDir(net btcwire.BitcoinNet) string {
return networkDir(net) + "_tmp" return networkDir(net) + "_tmp"
} }
// freshDir creates a new directory specified by path if it does not
// exist. If the directory already exists, all files contained in the
// directory are removed.
func freshDir(path string) error {
if err := checkCreateDir(path); err != nil {
return err
}
// Remove all files in the directory.
fd, err := os.Open(path)
if err != nil {
return err
}
defer fd.Close()
names, err := fd.Readdirnames(0)
if err != nil {
return err
}
for _, name := range names {
if err := os.RemoveAll(name); err != nil {
return err
}
}
return nil
}
// checkCreateDir checks that the path exists and is a directory. // checkCreateDir checks that the path exists and is a directory.
// If path does not exist, it is created. // If path does not exist, it is created.
func checkCreateDir(path string) error { func checkCreateDir(path string) error {
@ -87,259 +103,320 @@ func accountFilename(suffix, account, netdir string) string {
return filepath.Join(netdir, fmt.Sprintf("%v-%v", account, suffix)) return filepath.Join(netdir, fmt.Sprintf("%v-%v", account, suffix))
} }
// DirtyAccountSyncer synces dirty accounts for cases where the updated // syncSchedule references the account files which have been
// information was not required to be immediately written to disk. Accounts // scheduled to be written and the directory to write to.
// may be added to dirtyAccounts and will be checked and processed every 10 type syncSchedule struct {
// seconds by this function. dir string
wallets map[*Account]struct{}
txs map[*Account]struct{}
utxos map[*Account]struct{}
}
func newSyncSchedule(dir string) *syncSchedule {
s := &syncSchedule{
dir: dir,
wallets: make(map[*Account]struct{}),
txs: make(map[*Account]struct{}),
utxos: make(map[*Account]struct{}),
}
return s
}
// FlushAccount writes all scheduled account files to disk for
// a single account and removes them from the schedule.
func (s *syncSchedule) FlushAccount(a *Account) error {
if _, ok := s.utxos[a]; ok {
if err := a.writeUtxoStore(s.dir); err != nil {
return err
}
delete(s.utxos, a)
}
if _, ok := s.txs[a]; ok {
if err := a.writeTxStore(s.dir); err != nil {
return err
}
delete(s.txs, a)
}
if _, ok := s.wallets[a]; ok {
if err := a.writeWallet(s.dir); err != nil {
return err
}
delete(s.wallets, a)
}
return nil
}
// Flush writes all scheduled account files and removes each
// from the schedule.
func (s *syncSchedule) Flush() error {
for a := range s.utxos {
if err := a.writeUtxoStore(s.dir); err != nil {
return err
}
delete(s.utxos, a)
}
for a := range s.txs {
if err := a.writeTxStore(s.dir); err != nil {
return err
}
delete(s.txs, a)
}
for a := range s.wallets {
if err := a.writeWallet(s.dir); err != nil {
return err
}
delete(s.wallets, a)
}
return nil
}
// Channels for AccountDiskSyncer.
var (
scheduleWalletWrite = make(chan *Account)
scheduleTxStoreWrite = make(chan *Account)
scheduleUtxoStoreWrite = make(chan *Account)
syncBatch = make(chan *syncBatchRequest)
syncAccount = make(chan *syncRequest)
exportAccount = make(chan *exportRequest)
)
type syncRequest struct {
a *Account
err chan error
}
type syncBatchRequest struct {
a []*Account
err chan error
}
type exportRequest struct {
dir string
a *Account
err chan error
}
// AccountDiskSyncer manages a set of "dirty" account files which must
// be written to disk, and synchronizes all writes in a single goroutine.
// After 10 seconds since the latest sync, all unwritten files are written
// and removed. Writes for a single account may be scheduled immediately by
// calling WriteScheduledToDisk.
// //
// This never returns and is meant to be called from a goroutine. // This never returns and is meant to be called from a goroutine.
func DirtyAccountSyncer() { func AccountDiskSyncer() {
netdir := networkDir(cfg.Net())
if err := checkCreateDir(netdir); err != nil {
log.Errorf("Unable to create or write to account directory: %v", err)
}
tmpnetdir := tmpNetworkDir(cfg.Net())
schedule := newSyncSchedule(netdir)
ticker := time.Tick(10 * time.Second) ticker := time.Tick(10 * time.Second)
for { for {
select { select {
case <-ticker: case a := <-scheduleWalletWrite:
dirtyAccounts.Lock() schedule.wallets[a] = struct{}{}
for a := range dirtyAccounts.m {
log.Debugf("Syncing account '%v' to disk", case a := <-scheduleTxStoreWrite:
a.Wallet.Name()) schedule.txs[a] = struct{}{}
if err := a.writeDirtyToDisk(); err != nil {
log.Errorf("cannot sync dirty wallet: %v", case a := <-scheduleUtxoStoreWrite:
err) schedule.utxos[a] = struct{}{}
} else {
delete(dirtyAccounts.m, a) case sr := <-syncAccount:
} sr.err <- schedule.FlushAccount(sr.a)
case sr := <-syncBatch:
err := batchWriteAccounts(sr.a, tmpnetdir, netdir)
if err == nil {
// All accounts have been synced, old schedule
// can be discarded.
schedule = newSyncSchedule(netdir)
}
sr.err <- err
case er := <-exportAccount:
a := er.a
dir := er.dir
er.err <- a.writeAll(dir)
case <-ticker:
if err := schedule.Flush(); err != nil {
log.Errorf("Cannot write account: %v", err)
} }
dirtyAccounts.Unlock()
} }
} }
} }
// freshDir creates a new directory specified by path if it does not // WriteAllToDisk writes all account files for all accounts at once. Unlike
// exist. If the directory already exists, all files contained in the // writing individual account files, this causes each account file to be
// directory are removed. // written to a new network directory to replace the old one. Use this
func freshDir(path string) error { // function when it is needed to ensure an all or nothing write for all
if err := checkCreateDir(path); err != nil { // account files.
return err
}
// Remove all files in the directory.
fd, err := os.Open(path)
if err != nil {
return err
}
defer fd.Close()
names, err := fd.Readdirnames(0)
if err != nil {
return err
}
for _, name := range names {
if err := os.RemoveAll(name); err != nil {
return err
}
}
return nil
}
// writeAllToFreshDir writes all account files to the specified directory.
// If dir already exists, any old files are removed. If dir does not
// exist, it is created.
// //
// It is a runtime error to call this function while not holding each // It is a runtime error to call this without holding the store writer lock.
// wallet, tx store, and utxo store writer lock. func (store *AccountStore) WriteAllToDisk() error {
func (a *Account) writeAllToFreshDir(dir string) error { accts := make([]*Account, 0, len(store.accounts))
if err := freshDir(dir); err != nil { for _, a := range store.accounts {
return err accts = append(accts, a)
} }
err := make(chan error, 1)
syncBatch <- &syncBatchRequest{
a: accts,
err: err,
}
return <-err
}
func batchWriteAccounts(accts []*Account, tmpdir, netdir string) error {
if err := freshDir(tmpdir); err != nil {
return err
}
for _, a := range accts {
if err := a.writeAll(tmpdir); err != nil {
return err
}
}
// This is technically NOT an atomic operation, but at startup, if the
// network directory is missing but the temporary network directory
// exists, the temporary is moved before accounts are opened.
if err := os.RemoveAll(netdir); err != nil {
return err
}
if err := Rename(tmpdir, netdir); err != nil {
return err
}
return nil
}
// WriteScheduledToDisk signals AccountDiskSyncer to write all scheduled
// account files for a to disk now instead of waiting for the next sync
// interval. This function blocks until all the file writes for a have
// finished, and returns a non-nil error if any of the file writes failed.
func (a *Account) WriteScheduledToDisk() error {
err := make(chan error, 1)
syncAccount <- &syncRequest{
a: a,
err: err,
}
return <-err
}
// ScheduleWalletWrite schedules a write of an account's wallet file.
func (a *Account) ScheduleWalletWrite() {
scheduleWalletWrite <- a
}
// ScheduleTxStoreWrite schedules a write of an account's tx store file.
func (a *Account) ScheduleTxStoreWrite() {
scheduleTxStoreWrite <- a
}
// ScheduleUtxoStoreWrite schedules a write of an account's utxo store file.
func (a *Account) ScheduleUtxoStoreWrite() {
scheduleUtxoStoreWrite <- a
}
// ExportToDirectory writes an account to a special export directory. Any
// previous files are overwritten.
func (a *Account) ExportToDirectory(dirBaseName string) error {
dir := filepath.Join(networkDir(cfg.Net()), dirBaseName)
if err := checkCreateDir(dir); err != nil {
return err
}
fmt.Println("exporting to %v", dir)
err := make(chan error)
er := &exportRequest{
dir: dir,
a: a,
err: err,
}
exportAccount <- er
return <-err
}
func (a *Account) writeAll(dir string) error {
if err := a.writeUtxoStore(dir); err != nil {
return err
}
if err := a.writeTxStore(dir); err != nil {
return err
}
if err := a.writeWallet(dir); err != nil {
return err
}
return nil
}
func (a *Account) writeWallet(dir string) error {
wfilepath := accountFilename("wallet.bin", a.name, dir) wfilepath := accountFilename("wallet.bin", a.name, dir)
_, filename := filepath.Split(wfilepath)
tmpfile, err := ioutil.TempFile(dir, filename)
if err != nil {
return err
}
defer tmpfile.Close()
a.mtx.RLock()
_, err = a.Wallet.WriteTo(tmpfile)
a.mtx.RUnlock()
if err != nil {
return err
}
if err = Rename(tmpfile.Name(), wfilepath); err != nil {
return err
}
return nil
}
func (a *Account) writeTxStore(dir string) error {
txfilepath := accountFilename("tx.bin", a.name, dir) txfilepath := accountFilename("tx.bin", a.name, dir)
_, filename := filepath.Split(txfilepath)
tmpfile, err := ioutil.TempFile(dir, filename)
if err != nil {
return err
}
defer tmpfile.Close()
a.TxStore.RLock()
_, err = a.TxStore.s.WriteTo(tmpfile)
a.TxStore.RUnlock()
if err != nil {
return err
}
if err = Rename(tmpfile.Name(), txfilepath); err != nil {
return err
}
return nil
}
func (a *Account) writeUtxoStore(dir string) error {
utxofilepath := accountFilename("utxo.bin", a.name, dir) utxofilepath := accountFilename("utxo.bin", a.name, dir)
_, filename := filepath.Split(utxofilepath)
wfile, err := os.Create(wfilepath) tmpfile, err := ioutil.TempFile(dir, filename)
if err != nil { if err != nil {
return err return err
} }
defer wfile.Close() defer tmpfile.Close()
txfile, err := os.Create(txfilepath)
if err != nil {
return err
}
defer txfile.Close()
utxofile, err := os.Create(utxofilepath)
if err != nil {
return err
}
defer utxofile.Close()
if _, err := a.Wallet.WriteTo(wfile); err != nil {
return err
}
a.dirty = false
if _, err := a.TxStore.s.WriteTo(txfile); err != nil {
return err
}
a.TxStore.dirty = false
if _, err := a.UtxoStore.s.WriteTo(utxofile); err != nil {
return err
}
a.UtxoStore.dirty = false
return nil
}
// writeDirtyToDisk checks for the dirty flag on an account's wallet,
// txstore, and utxostore, writing them to disk if any are dirty.
func (a *Account) writeDirtyToDisk() error {
netdir := networkDir(cfg.Net())
if err := checkCreateDir(netdir); err != nil {
return err
}
wfilepath := accountFilename("wallet.bin", a.name, netdir)
txfilepath := accountFilename("tx.bin", a.name, netdir)
utxofilepath := accountFilename("utxo.bin", a.name, netdir)
// UTXOs and transactions are synced to disk first. This prevents
// any races from saving a wallet marked to be synced with block N
// and btcwallet closing while the UTXO and Tx files are only synced
// with block N-1.
// UTXOs
a.UtxoStore.RLock() a.UtxoStore.RLock()
dirty := a.UtxoStore.dirty _, err = a.UtxoStore.s.WriteTo(tmpfile)
a.UtxoStore.RUnlock()
if dirty {
netdir, filename := filepath.Split(utxofilepath)
tmpfile, err := ioutil.TempFile(netdir, filename)
if err != nil {
return err
}
defer tmpfile.Close()
a.UtxoStore.RLock()
_, err = a.UtxoStore.s.WriteTo(tmpfile)
a.UtxoStore.RUnlock()
if err != nil {
return err
}
if err = Rename(tmpfile.Name(), utxofilepath); err != nil {
return err
}
a.UtxoStore.Lock()
a.UtxoStore.dirty = false
a.UtxoStore.Unlock()
}
// Transactions
a.TxStore.RLock()
dirty = a.TxStore.dirty
a.TxStore.RUnlock()
if dirty {
netdir, filename := filepath.Split(txfilepath)
tmpfile, err := ioutil.TempFile(netdir, filename)
if err != nil {
return err
}
defer tmpfile.Close()
a.TxStore.RLock()
_, err = a.TxStore.s.WriteTo(tmpfile)
a.TxStore.RUnlock()
if err != nil {
return err
}
if err = Rename(tmpfile.Name(), txfilepath); err != nil {
return err
}
a.TxStore.Lock()
a.TxStore.dirty = false
a.TxStore.Unlock()
}
// Wallet
a.mtx.RLock()
dirty = a.dirty
a.mtx.RUnlock()
if dirty {
netdir, filename := filepath.Split(wfilepath)
tmpfile, err := ioutil.TempFile(netdir, filename)
if err != nil {
return err
}
defer tmpfile.Close()
a.mtx.RLock()
_, err = a.Wallet.WriteTo(tmpfile)
a.mtx.RUnlock()
if err != nil {
return err
}
if err = Rename(tmpfile.Name(), wfilepath); err != nil {
return err
}
a.mtx.Lock()
a.dirty = false
a.mtx.Unlock()
}
return nil
}
// WriteExport writes an account to a special export directory named
// by dirName. Any previous files are overwritten.
func (a *Account) WriteExport(dirName string) error {
exportPath := filepath.Join(networkDir(cfg.Net()), dirName)
if err := checkCreateDir(exportPath); err != nil {
return err
}
aname := a.Name()
wfilepath := accountFilename("wallet.bin", aname, exportPath)
txfilepath := accountFilename("tx.bin", aname, exportPath)
utxofilepath := accountFilename("utxo.bin", aname, exportPath)
utxofile, err := os.Create(utxofilepath)
if err != nil {
return err
}
defer utxofile.Close()
a.UtxoStore.RLock()
_, err = a.UtxoStore.s.WriteTo(utxofile)
a.UtxoStore.RUnlock() a.UtxoStore.RUnlock()
if err != nil { if err != nil {
return err return err
} }
txfile, err := os.Create(txfilepath) if err = Rename(tmpfile.Name(), utxofilepath); err != nil {
if err != nil {
return err
}
defer txfile.Close()
a.TxStore.RLock()
_, err = a.TxStore.s.WriteTo(txfile)
a.TxStore.RUnlock()
if err != nil {
return err
}
wfile, err := os.Create(wfilepath)
if err != nil {
return err
}
defer wfile.Close()
a.mtx.RLock()
_, err = a.Wallet.WriteTo(wfile)
a.mtx.RUnlock()
if err != nil {
return err return err
} }