Cleanup after insert failure, do not leave inconsistant db.

Fix error returns in InsertBlock and FetchBlockBySha

Give up on return by name in InsertBlock() and return explicit
err one location in FetchBlockBySha to return proper error value
This commit is contained in:
Dale Rahn 2013-07-29 16:39:48 -04:00
parent d8c3213d99
commit 3b743e4cfc
4 changed files with 222 additions and 11 deletions

141
sqlite3/insertfail_test.go Normal file
View file

@ -0,0 +1,141 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3_test
import (
"github.com/conformal/btcdb"
"github.com/conformal/btcdb/sqlite3"
"os"
"path/filepath"
"testing"
)
func TestFailOperational(t *testing.T) {
sqlite3.SetTestingT(t)
failtestOperationalMode(t, dbTmDefault)
failtestOperationalMode(t, dbTmNormal)
failtestOperationalMode(t, dbTmFast)
failtestOperationalMode(t, dbTmNoVerify)
}
func failtestOperationalMode(t *testing.T, mode int) {
// simplified basic operation is:
// 1) fetch block from remote server
// 2) look up all txin (except coinbase in db)
// 3) insert block
// Ignore db remove errors since it means we didn't have an old one.
dbname := "tstdbop1"
_ = os.Remove(dbname)
db, err := btcdb.CreateDB("sqlite", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer os.Remove(dbname)
defer db.Close()
switch mode {
case dbTmDefault: // default
// no setup
case dbTmNormal: // explicit normal
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertFast)
if sqldb, ok := db.(*sqlite3.SqliteDb); ok {
sqldb.TempTblMax = 100
} else {
t.Errorf("not right type")
}
case dbTmNoVerify: // validated block
// no point in testing this
return
}
// Since we are dealing with small dataset, reduce cache size
sqlite3.SetBlockCacheSize(db, 2)
sqlite3.SetTxCacheSize(db, 3)
testdatafile := filepath.Join("testdata", "blocks1-256.bz2")
blocks, err := loadBlocks(t, testdatafile)
if err != nil {
t.Errorf("Unable to load blocks from test data for mode %v: %v",
mode, err)
return
}
err = nil
out:
for height := int64(0); height < int64(len(blocks)); height++ {
block := blocks[height]
mblock := block.MsgBlock()
blockname, _ := block.Sha()
if height == 248 {
// time to corrupt the datbase, to see if it leaves the block or tx in the db
if len(mblock.Transactions) != 2 {
t.Errorf("transaction #248 should have two transactions txid %v ?= 828ef3b079f9c23829c56fe86e85b4a69d9e06e5b54ea597eef5fb3ffef509fe", blockname)
return
}
tx := mblock.Transactions[1]
txin := tx.TxIn[0]
origintxsha := &txin.PreviousOutpoint.Hash
sqlite3.KillTx(db, origintxsha)
_, _, _, _, err = db.FetchTxAllBySha(origintxsha)
if err == nil {
t.Errorf("deleted tx found %v", origintxsha)
}
}
if height == 248 {
}
newheight, err := db.InsertBlock(block)
if err != nil {
if height != 248 {
t.Errorf("failed to insert block %v err %v", height, err)
break out
}
} else {
if height == 248 {
t.Errorf("block insert with missing input tx succeeded block %v err %v", height, err)
break out
}
}
if height == 248 {
for _, tx := range mblock.Transactions {
txsha, err := tx.TxSha(block.ProtocolVersion())
_, _, _, _, err = db.FetchTxAllBySha(&txsha)
if err == nil {
t.Errorf("referenced tx found, should not have been %v, ", txsha)
}
}
}
if height == 248 {
exists := db.ExistsSha(blockname)
if exists == true {
t.Errorf("block still present after failed insert")
}
// if we got here with no error, testing was successful
break out
}
if newheight != height {
t.Errorf("height mismatch expect %v returned %v", height, newheight)
break out
}
}
switch mode {
case dbTmDefault: // default
// no cleanup
case dbTmNormal: // explicit normal
// no cleanup
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmNoVerify: // validated block
db.SetDBInsertMode(btcdb.InsertNormal)
}
}

View file

@ -8,8 +8,15 @@ import (
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btcwire"
"testing"
)
var t *testing.T
func SetTestingT(t_arg *testing.T) {
t = t_arg
}
// FetchSha returns the datablock and pver for the given ShaHash.
// This is a testing only interface.
func FetchSha(db btcdb.Db, sha *btcwire.ShaHash) (buf []byte, pver uint32,
@ -46,3 +53,32 @@ func SetTxCacheSize(db btcdb.Db, newsize int) {
tc := &sqldb.txCache
tc.maxcount = newsize
}
// KillTx is a function that deletes a transaction from the database
// this should only be used for testing purposes to valiate error paths
// in the database. This is _expected_ to leave the database in an
// inconsistant state.
func KillTx(dbarg btcdb.Db, txsha *btcwire.ShaHash) {
db, ok := dbarg.(*SqliteDb)
if !ok {
return
}
db.endTx(false)
db.startTx()
tx := &db.txState
key := txsha.String()
_, err := tx.tx.Exec("DELETE FROM txtmp WHERE key == ?", key)
if err != nil {
log.Warnf("error deleting tx %v from txtmp", txsha)
}
_, err = tx.tx.Exec("DELETE FROM tx WHERE key == ?", key)
if err != nil {
log.Warnf("error deleting tx %v from tx (%v)", txsha, key)
}
err = db.endTx(true)
if err != nil {
// XXX
db.endTx(false)
}
db.InvalidateCache()
}

View file

@ -575,7 +575,12 @@ func (db *SqliteDb) DropAfterBlockBySha(sha *btcwire.ShaHash) (err error) {
// lookup to unspend coins in them
db.InvalidateCache()
_, err = tx.tx.Exec("DELETE FROM txtmp WHERE blockid > ?", keepidx)
return db.delFromDB(keepidx)
}
func (db *SqliteDb) delFromDB(keepidx int64) (error) {
tx := &db.txState
_, err := tx.tx.Exec("DELETE FROM txtmp WHERE blockid > ?", keepidx)
if err != nil {
// XXX
db.endTx(false)
@ -601,32 +606,33 @@ func (db *SqliteDb) DropAfterBlockBySha(sha *btcwire.ShaHash) (err error) {
if err != nil {
return err
}
return
return err
}
// InsertBlock inserts raw block and transaction data from a block into the
// database. The first block inserted into the database will be treated as the
// genesis block. Every subsequent block insert requires the referenced parent
// block to already exist.
func (db *SqliteDb) InsertBlock(block *btcutil.Block) (height int64, err error) {
func (db *SqliteDb) InsertBlock(block *btcutil.Block) (int64, error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
blocksha, err := block.Sha()
if err != nil {
log.Warnf("Failed to compute block sha %v", blocksha)
return
return -1, err
}
mblock := block.MsgBlock()
rawMsg, pver, err := block.Bytes()
if err != nil {
log.Warnf("Failed to obtain raw block sha %v", blocksha)
return
return -1, err
}
txloc, err := block.TxLoc()
if err != nil {
log.Warnf("Failed to obtain raw block sha %v", blocksha)
return
return -1, err
}
// Insert block into database
@ -635,9 +641,32 @@ func (db *SqliteDb) InsertBlock(block *btcutil.Block) (height int64, err error)
if err != nil {
log.Warnf("Failed to insert block %v %v %v", blocksha,
&mblock.Header.PrevBlock, err)
return
return -1, err
}
txinsertidx := -1
success := false
defer func() {
if success {
return
}
for txidx := 0; txidx <= txinsertidx; txidx++ {
tx := mblock.Transactions[txidx]
err = db.unSpend(tx)
if err != nil {
log.Warnf("unSpend error during block insert unwind %v %v %v", blocksha, txidx, err)
}
}
err = db.delFromDB(newheight -1)
if err != nil {
log.Warnf("Error during block insert unwind %v %v", blocksha, err)
}
}()
// At least two blocks in the long past were generated by faulty
// miners, the sha of the transaction exists in a previous block,
// detect this condition and 'accept' the block.
@ -646,8 +675,12 @@ func (db *SqliteDb) InsertBlock(block *btcutil.Block) (height int64, err error)
txsha, err = tx.TxSha(pver)
if err != nil {
log.Warnf("failed to compute tx name block %v idx %v err %v", blocksha, txidx, err)
return
return -1, err
}
// num tx inserted, thus would need unwind if failure occurs
txinsertidx = txidx
// Some old blocks contain duplicate transactions
// Attempt to cleanly bypass this problem
// http://blockexplorer.com/b/91842
@ -687,15 +720,16 @@ func (db *SqliteDb) InsertBlock(block *btcutil.Block) (height int64, err error)
oBlkIdx, _, _, err = db.fetchLocationBySha(&txsha)
log.Warnf("oblkidx %v err %v", oBlkIdx, err)
return
return -1, err
}
err = db.doSpend(tx)
if err != nil {
log.Warnf("block %v idx %v failed to spend tx %v %v err %v", blocksha, newheight, &txsha, txidx, err)
return
return -1, err
}
}
success = true
db.syncPoint()
return newheight, nil
}

View file

@ -64,7 +64,7 @@ func (db *SqliteDb) fetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, e
buf, pver, height, err := db.fetchSha(*sha)
if err != nil {
return
return nil, err
}
blk, err = btcutil.NewBlockFromBytes(buf, pver)