From 114bb581f711d42f57870fc3539f43ffa3b313b2 Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Tue, 4 Feb 2014 15:48:20 -0500 Subject: [PATCH] Fix hang related to account file writes. The disk syncer now maintains its own countdown timer, creating a new timer only when necessary (when there is no timer running, and something is scheduled to be written). When the timer expires, the select loop begins selecting on a grab of the account manager's binary semaphore, and if read, performs the sync and nils the select channel to prevent a future grab until a new timer has expired. Tested with a race-enabled build on Windows. No lockups or races related to the disk syncing experienced with constant client requests and incoming btcd notifications, and scheduled writes run as expected once the countdown timer expires, locking out all server request and notifiation handling. --- acctmgr.go | 15 +++++---------- disksync.go | 47 ++++++++++++++++++++++++++++++----------------- rpcserver.go | 5 ++--- 3 files changed, 37 insertions(+), 30 deletions(-) diff --git a/acctmgr.go b/acctmgr.go index e58dec7..84c72c5 100644 --- a/acctmgr.go +++ b/acctmgr.go @@ -25,7 +25,6 @@ import ( "github.com/conformal/btcwallet/tx" "github.com/conformal/btcwallet/wallet" "github.com/conformal/btcwire" - "time" ) // Errors relating to accounts. @@ -81,8 +80,6 @@ func (am *AccountManager) Start() { l := list.New() m := make(map[string]*Account) - wait := 10 * time.Second - timer := time.NewTimer(wait) for { select { case access := <-am.accessAccount: @@ -117,20 +114,18 @@ func (am *AccountManager) Start() { } } } - - case <-timer.C: - am.ds.FlushScheduled() - timer = time.NewTimer(wait) } } } -// Grab grabs the account manager's binary semaphore. +// Grab grabs the account manager's binary semaphore. A custom semaphore +// is used instead of a sync.Mutex so the account manager's disk syncer +// can grab the semaphore from a select statement. func (am *AccountManager) Grab() { <-am.bsem } -// Release releases the account manager's binary semaphore. +// Release releases exclusive ownership of the AccountManager. func (am *AccountManager) Release() { am.bsem <- struct{}{} } @@ -476,7 +471,7 @@ func (am *AccountManager) ListSinceBlock(since, curBlockHeight int32, minconf in // GetTransaction(). type accountTx struct { Account string - Tx tx.Tx + Tx tx.Tx } // GetTransaction returns an array of accountTx to fully represent the effect of diff --git a/disksync.go b/disksync.go index d643457..641ddda 100644 --- a/disksync.go +++ b/disksync.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "os" "path/filepath" + "time" ) // networkDir returns the directory name of a network directory to hold account @@ -192,8 +193,7 @@ type exportRequest struct { // DiskSyncer manages all disk write operations for a collection of accounts. type DiskSyncer struct { // Flush scheduled account writes. - flushScheduled chan struct{} - flushAccount chan *flushAccountRequest + flushAccount chan *flushAccountRequest // Schedule file writes for an account. scheduleWallet chan *Account @@ -214,7 +214,6 @@ type DiskSyncer struct { // NewDiskSyncer creates a new DiskSyncer. func NewDiskSyncer(am *AccountManager) *DiskSyncer { return &DiskSyncer{ - flushScheduled: make(chan struct{}, 1), flushAccount: make(chan *flushAccountRequest), scheduleWallet: make(chan *Account), scheduleTxStore: make(chan *Account), @@ -237,28 +236,52 @@ func (ds *DiskSyncer) Start() { } tmpnetdir := tmpNetworkDir(cfg.Net()) + const wait = 10 * time.Second + var timer <-chan time.Time + + var sem chan struct{} schedule := newSyncSchedule(netdir) for { select { - case <-ds.flushScheduled: - ds.am.Grab() + case <-sem: // Now have exclusive access of the account manager err := schedule.flush() - ds.am.Release() if err != nil { log.Errorf("Cannot write accounts: %v", err) } + timer = nil + + // Account manager passed ownership of the semaphore; + // Do not grab semaphore again until another flush is needed. + sem = nil + + // Release semaphore. + ds.am.bsem <- struct{}{} + + case <-timer: + // Grab AccountManager semaphore when ready so flush can occur. + sem = ds.am.bsem + case fr := <-ds.flushAccount: fr.err <- schedule.flushAccount(fr.a) case a := <-ds.scheduleWallet: schedule.wallets[a] = struct{}{} + if timer == nil { + timer = time.After(wait) + } case a := <-ds.scheduleTxStore: schedule.txs[a] = struct{}{} + if timer == nil { + timer = time.After(wait) + } case a := <-ds.scheduleUtxoStore: schedule.utxos[a] = struct{}{} + if timer == nil { + timer = time.After(wait) + } case sr := <-ds.writeBatch: err := batchWriteAccounts(sr.a, tmpnetdir, netdir) @@ -266,6 +289,7 @@ func (ds *DiskSyncer) Start() { // All accounts have been synced, old schedule // can be discarded. schedule = newSyncSchedule(netdir) + timer = nil } sr.err <- err @@ -277,17 +301,6 @@ func (ds *DiskSyncer) Start() { } } -// FlushScheduled writes all scheduled account files to disk. -func (ds *DiskSyncer) FlushScheduled() { - // Schedule a flush if one is not already waiting. This channel - // is buffered so if a request is already waiting, a duplicate - // can be safely dropped. - select { - case ds.flushScheduled <- struct{}{}: - default: - } -} - // FlushAccount writes all scheduled account files to disk for a single // account. func (ds *DiskSyncer) FlushAccount(a *Account) error { diff --git a/rpcserver.go b/rpcserver.go index 0c4c0a5..bc3ad78 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -789,7 +789,7 @@ func GetTransaction(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { return nil, &btcjson.ErrInternal } - accumulatedTxen := AcctMgr.GetTransaction(cmd.Txid) + accumulatedTxen := AcctMgr.GetTransaction(cmd.Txid) if len(accumulatedTxen) == 0 { return nil, &btcjson.ErrNoTxInfo } @@ -825,7 +825,7 @@ func GetTransaction(icmd btcjson.Cmd) (interface{}, *btcjson.Error) { // whether it is an orphan or in the blockchain. "category": "receive", "amount": t.Amount, - "address": hex.EncodeToString(t.ReceiverHash), + "address": hex.EncodeToString(t.ReceiverHash), }) } } @@ -1153,7 +1153,6 @@ func sendPairs(icmd btcjson.Cmd, account string, amounts map[string]int64, return handleSendRawTxReply(icmd, txid, a, createdTx) } - // SendFrom handles a sendfrom RPC request by creating a new transaction // spending unspent transaction outputs for a wallet to another payment // address. Leftover inputs not sent to the payment address or a fee for