Redo account locking and RPC request processing.

This change removes the three separate mutexes which used to lock an
account's wallet, tx store, and utxo store.  Accounts no longer
contain any locking mechanism and rely on go's other synchronization
constructs (goroutines and channels) for correct access.

All accounts are now managed as a collection through the new
AccountManager, rather than the old AccountStore.  AccountManager runs
as its own goroutine to provide access to accounts.

RPC requests are now queued for handling, being denied if the queue
buffer is exhausted.  Notifications are also queued (instead of being
sent from their own goroutine after being received, in which order is
undefined), however, notifications are never dropped and will
potentially grow a queue of infinite size if unhandled.
This commit is contained in:
Josh Rickmar 2014-01-30 10:14:02 -05:00
parent 20e8201125
commit 6a08c7de07
12 changed files with 1508 additions and 1433 deletions

View file

@ -22,7 +22,6 @@ import (
"io/ioutil"
"os"
"path/filepath"
"time"
)
// networkDir returns the directory name of a network directory to hold account
@ -122,9 +121,9 @@ func newSyncSchedule(dir string) *syncSchedule {
return s
}
// FlushAccount writes all scheduled account files to disk for
// flushAccount writes all scheduled account files to disk for
// a single account and removes them from the schedule.
func (s *syncSchedule) FlushAccount(a *Account) error {
func (s *syncSchedule) flushAccount(a *Account) error {
if _, ok := s.utxos[a]; ok {
if err := a.writeUtxoStore(s.dir); err != nil {
return err
@ -147,9 +146,9 @@ func (s *syncSchedule) FlushAccount(a *Account) error {
return nil
}
// Flush writes all scheduled account files and removes each
// flush writes all scheduled account files and removes each
// from the schedule.
func (s *syncSchedule) Flush() error {
func (s *syncSchedule) flush() error {
for a := range s.utxos {
if err := a.writeUtxoStore(s.dir); err != nil {
return err
@ -174,22 +173,16 @@ func (s *syncSchedule) Flush() error {
return nil
}
// Channels for AccountDiskSyncer.
var (
scheduleWalletWrite = make(chan *Account)
scheduleTxStoreWrite = make(chan *Account)
scheduleUtxoStoreWrite = make(chan *Account)
syncBatch = make(chan *syncBatchRequest)
syncAccount = make(chan *syncRequest)
exportAccount = make(chan *exportRequest)
)
type flushScheduledRequest struct {
err chan error
}
type syncRequest struct {
type flushAccountRequest struct {
a *Account
err chan error
}
type syncBatchRequest struct {
type writeBatchRequest struct {
a []*Account
err chan error
}
@ -200,14 +193,48 @@ type exportRequest struct {
err chan error
}
// AccountDiskSyncer manages a set of "dirty" account files which must
// be written to disk, and synchronizes all writes in a single goroutine.
// After 10 seconds since the latest sync, all unwritten files are written
// and removed. Writes for a single account may be scheduled immediately by
// calling WriteScheduledToDisk.
// DiskSyncer manages all disk write operations for a collection of accounts.
type DiskSyncer struct {
// Flush scheduled account writes.
flushScheduled chan *flushScheduledRequest
flushAccount chan *flushAccountRequest
// Schedule file writes for an account.
scheduleWallet chan *Account
scheduleTxStore chan *Account
scheduleUtxoStore chan *Account
// Write a collection of accounts all at once.
writeBatch chan *writeBatchRequest
// Write an account export.
exportAccount chan *exportRequest
// Account manager for this DiskSyncer. This is only
// needed to grab the account manager semaphore.
am *AccountManager
}
// NewDiskSyncer creates a new DiskSyncer.
func NewDiskSyncer(am *AccountManager) *DiskSyncer {
return &DiskSyncer{
flushScheduled: make(chan *flushScheduledRequest),
flushAccount: make(chan *flushAccountRequest),
scheduleWallet: make(chan *Account),
scheduleTxStore: make(chan *Account),
scheduleUtxoStore: make(chan *Account),
writeBatch: make(chan *writeBatchRequest),
exportAccount: make(chan *exportRequest),
am: am,
}
}
// Start starts the disk syncer. It manages a set of "dirty" account files
// which must be written to disk, and synchronizes all writes in a single
// goroutine. Periodic flush operations may be signaled by an AccountManager.
//
// This never returns and is meant to be called from a goroutine.
func AccountDiskSyncer() {
// This never returns and is should be called from a new goroutine.
func (ds *DiskSyncer) Start() {
netdir := networkDir(cfg.Net())
if err := checkCreateDir(netdir); err != nil {
log.Errorf("Unable to create or write to account directory: %v", err)
@ -215,22 +242,24 @@ func AccountDiskSyncer() {
tmpnetdir := tmpNetworkDir(cfg.Net())
schedule := newSyncSchedule(netdir)
ticker := time.Tick(10 * time.Second)
for {
select {
case a := <-scheduleWalletWrite:
case fr := <-ds.flushScheduled:
fr.err <- schedule.flush()
case fr := <-ds.flushAccount:
fr.err <- schedule.flushAccount(fr.a)
case a := <-ds.scheduleWallet:
schedule.wallets[a] = struct{}{}
case a := <-scheduleTxStoreWrite:
case a := <-ds.scheduleTxStore:
schedule.txs[a] = struct{}{}
case a := <-scheduleUtxoStoreWrite:
case a := <-ds.scheduleUtxoStore:
schedule.utxos[a] = struct{}{}
case sr := <-syncAccount:
sr.err <- schedule.FlushAccount(sr.a)
case sr := <-syncBatch:
case sr := <-ds.writeBatch:
err := batchWriteAccounts(sr.a, tmpnetdir, netdir)
if err == nil {
// All accounts have been synced, old schedule
@ -239,40 +268,71 @@ func AccountDiskSyncer() {
}
sr.err <- err
case er := <-exportAccount:
case er := <-ds.exportAccount:
a := er.a
dir := er.dir
er.err <- a.writeAll(dir)
case <-ticker:
if err := schedule.Flush(); err != nil {
log.Errorf("Cannot write account: %v", err)
}
}
}
}
// WriteAllToDisk writes all account files for all accounts at once. Unlike
// writing individual account files, this causes each account file to be
// written to a new network directory to replace the old one. Use this
// function when it is needed to ensure an all or nothing write for all
// account files.
//
// It is a runtime error to call this without holding the store writer lock.
func (store *AccountStore) WriteAllToDisk() error {
accts := make([]*Account, 0, len(store.accounts))
for _, a := range store.accounts {
accts = append(accts, a)
}
// FlushScheduled writes all scheduled account files to disk.
func (ds *DiskSyncer) FlushScheduled() error {
ds.am.Grab()
err := make(chan error)
ds.flushScheduled <- &flushScheduledRequest{err}
ds.am.Release()
return <-err
}
err := make(chan error, 1)
syncBatch <- &syncBatchRequest{
a: accts,
// FlushAccount writes all scheduled account files to disk for a single
// account.
func (ds *DiskSyncer) FlushAccount(a *Account) error {
err := make(chan error)
ds.flushAccount <- &flushAccountRequest{a: a, err: err}
return <-err
}
// ScheduleWalletWrite schedules an account's wallet to be written to disk.
func (ds *DiskSyncer) ScheduleWalletWrite(a *Account) {
ds.scheduleWallet <- a
}
// ScheduleTxStoreWrite schedules an account's transaction store to be
// written to disk.
func (ds *DiskSyncer) ScheduleTxStoreWrite(a *Account) {
ds.scheduleTxStore <- a
}
// ScheduleUtxoStoreWrite schedules an account's utxo store to be written
// to disk.
func (ds *DiskSyncer) ScheduleUtxoStoreWrite(a *Account) {
ds.scheduleUtxoStore <- a
}
// WriteBatch safely replaces all account files in the network directory
// with new files created from all accounts in a.
func (ds *DiskSyncer) WriteBatch(a []*Account) error {
err := make(chan error)
ds.writeBatch <- &writeBatchRequest{
a: a,
err: err,
}
return <-err
}
// ExportAccount writes all account files for a to a new directory.
func (ds *DiskSyncer) ExportAccount(a *Account, dir string) error {
err := make(chan error)
er := &exportRequest{
dir: dir,
a: a,
err: err,
}
ds.exportAccount <- er
return <-err
}
func batchWriteAccounts(accts []*Account, tmpdir, netdir string) error {
if err := freshDir(tmpdir); err != nil {
return err
@ -294,52 +354,6 @@ func batchWriteAccounts(accts []*Account, tmpdir, netdir string) error {
return nil
}
// WriteScheduledToDisk signals AccountDiskSyncer to write all scheduled
// account files for a to disk now instead of waiting for the next sync
// interval. This function blocks until all the file writes for a have
// finished, and returns a non-nil error if any of the file writes failed.
func (a *Account) WriteScheduledToDisk() error {
err := make(chan error, 1)
syncAccount <- &syncRequest{
a: a,
err: err,
}
return <-err
}
// ScheduleWalletWrite schedules a write of an account's wallet file.
func (a *Account) ScheduleWalletWrite() {
scheduleWalletWrite <- a
}
// ScheduleTxStoreWrite schedules a write of an account's tx store file.
func (a *Account) ScheduleTxStoreWrite() {
scheduleTxStoreWrite <- a
}
// ScheduleUtxoStoreWrite schedules a write of an account's utxo store file.
func (a *Account) ScheduleUtxoStoreWrite() {
scheduleUtxoStoreWrite <- a
}
// ExportToDirectory writes an account to a special export directory. Any
// previous files are overwritten.
func (a *Account) ExportToDirectory(dirBaseName string) error {
dir := filepath.Join(networkDir(cfg.Net()), dirBaseName)
if err := checkCreateDir(dir); err != nil {
return err
}
err := make(chan error)
er := &exportRequest{
dir: dir,
a: a,
err: err,
}
exportAccount <- er
return <-err
}
func (a *Account) writeAll(dir string) error {
if err := a.writeUtxoStore(dir); err != nil {
return err
@ -362,10 +376,7 @@ func (a *Account) writeWallet(dir string) error {
}
defer tmpfile.Close()
a.mtx.RLock()
_, err = a.Wallet.WriteTo(tmpfile)
a.mtx.RUnlock()
if err != nil {
if _, err = a.Wallet.WriteTo(tmpfile); err != nil {
return err
}
@ -385,10 +396,7 @@ func (a *Account) writeTxStore(dir string) error {
}
defer tmpfile.Close()
a.TxStore.RLock()
_, err = a.TxStore.s.WriteTo(tmpfile)
a.TxStore.RUnlock()
if err != nil {
if _, err = a.TxStore.WriteTo(tmpfile); err != nil {
return err
}
@ -408,10 +416,7 @@ func (a *Account) writeUtxoStore(dir string) error {
}
defer tmpfile.Close()
a.UtxoStore.RLock()
_, err = a.UtxoStore.s.WriteTo(tmpfile)
a.UtxoStore.RUnlock()
if err != nil {
if _, err = a.UtxoStore.WriteTo(tmpfile); err != nil {
return err
}