lbcwallet/disksync.go
Josh Rickmar 114bb581f7 Fix hang related to account file writes.
The disk syncer now maintains its own countdown timer, creating a new
timer only when necessary (when there is no timer running, and
something is scheduled to be written).  When the timer expires, the
select loop begins selecting on a grab of the account manager's binary
semaphore, and if read, performs the sync and nils the select channel
to prevent a future grab until a new timer has expired.

Tested with a race-enabled build on Windows.  No lockups or races
related to the disk syncing experienced with constant client requests
and incoming btcd notifications, and scheduled writes run as expected
once the countdown timer expires, locking out all server request and
notifiation handling.
2014-02-05 12:47:33 -05:00

450 lines
11 KiB
Go

/*
* Copyright (c) 2013, 2014 Conformal Systems LLC <info@conformal.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
package main
import (
"fmt"
"github.com/conformal/btcwire"
"io/ioutil"
"os"
"path/filepath"
"time"
)
// networkDir returns the directory name of a network directory to hold account
// files.
func networkDir(net btcwire.BitcoinNet) string {
var netname string
if net == btcwire.MainNet {
netname = "mainnet"
} else {
netname = "testnet"
}
return filepath.Join(cfg.DataDir, netname)
}
// tmpNetworkDir returns the temporary directory name for a given network.
func tmpNetworkDir(net btcwire.BitcoinNet) string {
return networkDir(net) + "_tmp"
}
// freshDir creates a new directory specified by path if it does not
// exist. If the directory already exists, all files contained in the
// directory are removed.
func freshDir(path string) error {
if err := checkCreateDir(path); err != nil {
return err
}
// Remove all files in the directory.
fd, err := os.Open(path)
if err != nil {
return err
}
defer fd.Close()
names, err := fd.Readdirnames(0)
if err != nil {
return err
}
for _, name := range names {
if err := os.RemoveAll(name); err != nil {
return err
}
}
return nil
}
// checkCreateDir checks that the path exists and is a directory.
// If path does not exist, it is created.
func checkCreateDir(path string) error {
if fi, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
// Attempt data directory creation
if err = os.MkdirAll(path, 0700); err != nil {
return fmt.Errorf("cannot create directory: %s", err)
}
} else {
return fmt.Errorf("error checking directory: %s", err)
}
} else {
if !fi.IsDir() {
return fmt.Errorf("path '%s' is not a directory", path)
}
}
return nil
}
// accountFilename returns the filepath of an account file given the
// filename suffix ("wallet.bin", "tx.bin", or "utxo.bin"), account
// name and the network directory holding the file.
func accountFilename(suffix, account, netdir string) string {
if account == "" {
// default account
return filepath.Join(netdir, suffix)
}
// non-default account
return filepath.Join(netdir, fmt.Sprintf("%v-%v", account, suffix))
}
// syncSchedule references the account files which have been
// scheduled to be written and the directory to write to.
type syncSchedule struct {
dir string
wallets map[*Account]struct{}
txs map[*Account]struct{}
utxos map[*Account]struct{}
}
func newSyncSchedule(dir string) *syncSchedule {
s := &syncSchedule{
dir: dir,
wallets: make(map[*Account]struct{}),
txs: make(map[*Account]struct{}),
utxos: make(map[*Account]struct{}),
}
return s
}
// flushAccount writes all scheduled account files to disk for
// a single account and removes them from the schedule.
func (s *syncSchedule) flushAccount(a *Account) error {
if _, ok := s.utxos[a]; ok {
if err := a.writeUtxoStore(s.dir); err != nil {
return err
}
delete(s.utxos, a)
}
if _, ok := s.txs[a]; ok {
if err := a.writeTxStore(s.dir); err != nil {
return err
}
delete(s.txs, a)
}
if _, ok := s.wallets[a]; ok {
if err := a.writeWallet(s.dir); err != nil {
return err
}
delete(s.wallets, a)
}
return nil
}
// flush writes all scheduled account files and removes each
// from the schedule.
func (s *syncSchedule) flush() error {
for a := range s.utxos {
if err := a.writeUtxoStore(s.dir); err != nil {
return err
}
delete(s.utxos, a)
}
for a := range s.txs {
if err := a.writeTxStore(s.dir); err != nil {
return err
}
delete(s.txs, a)
}
for a := range s.wallets {
if err := a.writeWallet(s.dir); err != nil {
return err
}
delete(s.wallets, a)
}
return nil
}
type flushAccountRequest struct {
a *Account
err chan error
}
type writeBatchRequest struct {
a []*Account
err chan error
}
type exportRequest struct {
dir string
a *Account
err chan error
}
// DiskSyncer manages all disk write operations for a collection of accounts.
type DiskSyncer struct {
// Flush scheduled account writes.
flushAccount chan *flushAccountRequest
// Schedule file writes for an account.
scheduleWallet chan *Account
scheduleTxStore chan *Account
scheduleUtxoStore chan *Account
// Write a collection of accounts all at once.
writeBatch chan *writeBatchRequest
// Write an account export.
exportAccount chan *exportRequest
// Account manager for this DiskSyncer. This is only
// needed to grab the account manager semaphore.
am *AccountManager
}
// NewDiskSyncer creates a new DiskSyncer.
func NewDiskSyncer(am *AccountManager) *DiskSyncer {
return &DiskSyncer{
flushAccount: make(chan *flushAccountRequest),
scheduleWallet: make(chan *Account),
scheduleTxStore: make(chan *Account),
scheduleUtxoStore: make(chan *Account),
writeBatch: make(chan *writeBatchRequest),
exportAccount: make(chan *exportRequest),
am: am,
}
}
// Start starts the disk syncer. It manages a set of "dirty" account files
// which must be written to disk, and synchronizes all writes in a single
// goroutine. Periodic flush operations may be signaled by an AccountManager.
//
// This never returns and is should be called from a new goroutine.
func (ds *DiskSyncer) Start() {
netdir := networkDir(cfg.Net())
if err := checkCreateDir(netdir); err != nil {
log.Errorf("Unable to create or write to account directory: %v", err)
}
tmpnetdir := tmpNetworkDir(cfg.Net())
const wait = 10 * time.Second
var timer <-chan time.Time
var sem chan struct{}
schedule := newSyncSchedule(netdir)
for {
select {
case <-sem: // Now have exclusive access of the account manager
err := schedule.flush()
if err != nil {
log.Errorf("Cannot write accounts: %v", err)
}
timer = nil
// Account manager passed ownership of the semaphore;
// Do not grab semaphore again until another flush is needed.
sem = nil
// Release semaphore.
ds.am.bsem <- struct{}{}
case <-timer:
// Grab AccountManager semaphore when ready so flush can occur.
sem = ds.am.bsem
case fr := <-ds.flushAccount:
fr.err <- schedule.flushAccount(fr.a)
case a := <-ds.scheduleWallet:
schedule.wallets[a] = struct{}{}
if timer == nil {
timer = time.After(wait)
}
case a := <-ds.scheduleTxStore:
schedule.txs[a] = struct{}{}
if timer == nil {
timer = time.After(wait)
}
case a := <-ds.scheduleUtxoStore:
schedule.utxos[a] = struct{}{}
if timer == nil {
timer = time.After(wait)
}
case sr := <-ds.writeBatch:
err := batchWriteAccounts(sr.a, tmpnetdir, netdir)
if err == nil {
// All accounts have been synced, old schedule
// can be discarded.
schedule = newSyncSchedule(netdir)
timer = nil
}
sr.err <- err
case er := <-ds.exportAccount:
a := er.a
dir := er.dir
er.err <- a.writeAll(dir)
}
}
}
// FlushAccount writes all scheduled account files to disk for a single
// account.
func (ds *DiskSyncer) FlushAccount(a *Account) error {
err := make(chan error)
ds.flushAccount <- &flushAccountRequest{a: a, err: err}
return <-err
}
// ScheduleWalletWrite schedules an account's wallet to be written to disk.
func (ds *DiskSyncer) ScheduleWalletWrite(a *Account) {
ds.scheduleWallet <- a
}
// ScheduleTxStoreWrite schedules an account's transaction store to be
// written to disk.
func (ds *DiskSyncer) ScheduleTxStoreWrite(a *Account) {
ds.scheduleTxStore <- a
}
// ScheduleUtxoStoreWrite schedules an account's utxo store to be written
// to disk.
func (ds *DiskSyncer) ScheduleUtxoStoreWrite(a *Account) {
ds.scheduleUtxoStore <- a
}
// WriteBatch safely replaces all account files in the network directory
// with new files created from all accounts in a.
func (ds *DiskSyncer) WriteBatch(a []*Account) error {
err := make(chan error)
ds.writeBatch <- &writeBatchRequest{
a: a,
err: err,
}
return <-err
}
// ExportAccount writes all account files for a to a new directory.
func (ds *DiskSyncer) ExportAccount(a *Account, dir string) error {
err := make(chan error)
er := &exportRequest{
dir: dir,
a: a,
err: err,
}
ds.exportAccount <- er
return <-err
}
func batchWriteAccounts(accts []*Account, tmpdir, netdir string) error {
if err := freshDir(tmpdir); err != nil {
return err
}
for _, a := range accts {
if err := a.writeAll(tmpdir); err != nil {
return err
}
}
// This is technically NOT an atomic operation, but at startup, if the
// network directory is missing but the temporary network directory
// exists, the temporary is moved before accounts are opened.
if err := os.RemoveAll(netdir); err != nil {
return err
}
if err := Rename(tmpdir, netdir); err != nil {
return err
}
return nil
}
func (a *Account) writeAll(dir string) error {
if err := a.writeUtxoStore(dir); err != nil {
return err
}
if err := a.writeTxStore(dir); err != nil {
return err
}
if err := a.writeWallet(dir); err != nil {
return err
}
return nil
}
func (a *Account) writeWallet(dir string) error {
wfilepath := accountFilename("wallet.bin", a.name, dir)
_, filename := filepath.Split(wfilepath)
tmpfile, err := ioutil.TempFile(dir, filename)
if err != nil {
return err
}
if _, err = a.Wallet.WriteTo(tmpfile); err != nil {
return err
}
tmppath := tmpfile.Name()
tmpfile.Close()
if err = Rename(tmppath, wfilepath); err != nil {
return err
}
return nil
}
func (a *Account) writeTxStore(dir string) error {
txfilepath := accountFilename("tx.bin", a.name, dir)
_, filename := filepath.Split(txfilepath)
tmpfile, err := ioutil.TempFile(dir, filename)
if err != nil {
return err
}
if _, err = a.TxStore.WriteTo(tmpfile); err != nil {
return err
}
tmppath := tmpfile.Name()
tmpfile.Close()
if err = Rename(tmppath, txfilepath); err != nil {
return err
}
return nil
}
func (a *Account) writeUtxoStore(dir string) error {
utxofilepath := accountFilename("utxo.bin", a.name, dir)
_, filename := filepath.Split(utxofilepath)
tmpfile, err := ioutil.TempFile(dir, filename)
if err != nil {
return err
}
if _, err = a.UtxoStore.WriteTo(tmpfile); err != nil {
return err
}
tmppath := tmpfile.Name()
tmpfile.Close()
if err = Rename(tmppath, utxofilepath); err != nil {
return err
}
return nil
}