diff --git a/acctmgr.go b/acctmgr.go index e58dec7..84c72c5 100644 --- a/acctmgr.go +++ b/acctmgr.go @@ -25,7 +25,6 @@ import ( "github.com/conformal/btcwallet/tx" "github.com/conformal/btcwallet/wallet" "github.com/conformal/btcwire" - "time" ) // Errors relating to accounts. @@ -81,8 +80,6 @@ func (am *AccountManager) Start() { l := list.New() m := make(map[string]*Account) - wait := 10 * time.Second - timer := time.NewTimer(wait) for { select { case access := <-am.accessAccount: @@ -117,20 +114,18 @@ func (am *AccountManager) Start() { } } } - - case <-timer.C: - am.ds.FlushScheduled() - timer = time.NewTimer(wait) } } } -// Grab grabs the account manager's binary semaphore. +// Grab grabs the account manager's binary semaphore. A custom semaphore +// is used instead of a sync.Mutex so the account manager's disk syncer +// can grab the semaphore from a select statement. func (am *AccountManager) Grab() { <-am.bsem } -// Release releases the account manager's binary semaphore. +// Release releases exclusive ownership of the AccountManager. func (am *AccountManager) Release() { am.bsem <- struct{}{} } @@ -476,7 +471,7 @@ func (am *AccountManager) ListSinceBlock(since, curBlockHeight int32, minconf in // GetTransaction(). type accountTx struct { Account string - Tx tx.Tx + Tx tx.Tx } // GetTransaction returns an array of accountTx to fully represent the effect of diff --git a/disksync.go b/disksync.go index d643457..641ddda 100644 --- a/disksync.go +++ b/disksync.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "os" "path/filepath" + "time" ) // networkDir returns the directory name of a network directory to hold account @@ -192,8 +193,7 @@ type exportRequest struct { // DiskSyncer manages all disk write operations for a collection of accounts. type DiskSyncer struct { // Flush scheduled account writes. - flushScheduled chan struct{} - flushAccount chan *flushAccountRequest + flushAccount chan *flushAccountRequest // Schedule file writes for an account. scheduleWallet chan *Account @@ -214,7 +214,6 @@ type DiskSyncer struct { // NewDiskSyncer creates a new DiskSyncer. func NewDiskSyncer(am *AccountManager) *DiskSyncer { return &DiskSyncer{ - flushScheduled: make(chan struct{}, 1), flushAccount: make(chan *flushAccountRequest), scheduleWallet: make(chan *Account), scheduleTxStore: make(chan *Account), @@ -237,28 +236,52 @@ func (ds *DiskSyncer) Start() { } tmpnetdir := tmpNetworkDir(cfg.Net()) + const wait = 10 * time.Second + var timer <-chan time.Time + + var sem chan struct{} schedule := newSyncSchedule(netdir) for { select { - case <-ds.flushScheduled: - ds.am.Grab() + case <-sem: // Now have exclusive access of the account manager err := schedule.flush() - ds.am.Release() if err != nil { log.Errorf("Cannot write accounts: %v", err) } + timer = nil + + // Account manager passed ownership of the semaphore; + // Do not grab semaphore again until another flush is needed. + sem = nil + + // Release semaphore. + ds.am.bsem <- struct{}{} + + case <-timer: + // Grab AccountManager semaphore when ready so flush can occur. + sem = ds.am.bsem + case fr := <-ds.flushAccount: fr.err <- schedule.flushAccount(fr.a) case a := <-ds.scheduleWallet: schedule.wallets[a] = struct{}{} + if timer == nil { + timer = time.After(wait) + } case a := <-ds.scheduleTxStore: schedule.txs[a] = struct{}{} + if timer == nil { + timer = time.After(wait) + } case a := <-ds.scheduleUtxoStore: schedule.utxos[a] = struct{}{} + if timer == nil { + timer = time.After(wait) + } case sr := <-ds.writeBatch: err := batchWriteAccounts(sr.a, tmpnetdir, netdir) @@ -266,6 +289,7 @@ func (ds *DiskSyncer) Start() { // All accounts have been synced, old schedule // can be discarded. schedule = newSyncSchedule(netdir) + timer = nil } sr.err <- err @@ -277,17 +301,6 @@ func (ds *DiskSyncer) Start() { } } -// FlushScheduled writes all scheduled account files to disk. -func (ds *DiskSyncer) FlushScheduled() { - // Schedule a flush if one is not already waiting. This channel - // is buffered so if a request is already waiting, a duplicate - // can be safely dropped. - select { - case ds.flushScheduled <- struct{}{}: - default: - } -} - // FlushAccount writes all scheduled account files to disk for a single // account. func (ds *DiskSyncer) FlushAccount(a *Account) error { diff --git a/rpcserver.go b/rpcserver.go index 0c4c0a5..bc3ad78 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -789,7 +789,7 @@ func GetTransaction(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - accumulatedTxen := AcctMgr.GetTransaction(cmd.Txid) + accumulatedTxen := AcctMgr.GetTransaction(cmd.Txid) if len(accumulatedTxen) == 0 { return nil, &btcjson.ErrNoTxInfo } @@ -825,7 +825,7 @@ func GetTransaction(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { // whether it is an orphan or in the blockchain. "category": "receive", "amount": t.Amount, - "address": hex.EncodeToString(t.ReceiverHash), + "address": hex.EncodeToString(t.ReceiverHash), }) } } @@ -1153,7 +1153,6 @@ func sendPairs(icmd btcjson.Cmd, account string, amounts map[string]int64, return handleSendRawTxReply(icmd, txid, a, createdTx) } - // SendFrom handles a sendfrom RPC request by creating a new transaction // spending unspent transaction outputs for a wallet to another payment // address. Leftover inputs not sent to the payment address or a fee for