From 96f7305c292ccbea7006d0bfd1b9118535e271f6 Mon Sep 17 00:00:00 2001 From: Dale Rahn Date: Thu, 22 Aug 2013 11:40:59 -0400 Subject: [PATCH] One DB instad of many. improved error handling Fix testing, no point in running tests multiple times Fix error when no blocks in database, reopen would misreport NewstSha() return --- ldb/block.go | 24 ++++--- ldb/boundary_test.go | 44 ++++++++++++ ldb/dbcache.go | 2 - ldb/insertremove_test.go | 7 +- ldb/leveldb.go | 148 +++++++++++++++++---------------------- ldb/operational_test.go | 29 +++++--- ldb/tx.go | 9 +-- 7 files changed, 151 insertions(+), 112 deletions(-) create mode 100644 ldb/boundary_test.go diff --git a/ldb/block.go b/ldb/block.go index 8d211e75..8a5a9523 100644 --- a/ldb/block.go +++ b/ldb/block.go @@ -24,9 +24,9 @@ func (db *LevelDb) InsertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHas func (db *LevelDb) getBlkLoc(sha *btcwire.ShaHash) (int64, error) { var blkHeight int64 - key := sha.Bytes() + key := shaBlkToKey(sha) - data, err := db.bShaDb.Get(key, db.ro) + data, err := db.lDb.Get(key, db.ro) if err != nil { return 0, err @@ -48,8 +48,9 @@ func (db *LevelDb) getBlkByHeight(blkHeight int64) (rsha *btcwire.ShaHash, rbuf key := int64ToKey(blkHeight) - blkVal, err = db.bBlkDb.Get(key, db.ro) + blkVal, err = db.lDb.Get(key, db.ro) if err != nil { + log.Tracef("failed to find height %v", blkHeight) return // exists ??? } @@ -89,7 +90,7 @@ func (db *LevelDb) setBlk(sha *btcwire.ShaHash, blkHeight int64, buf []byte) err err = fmt.Errorf("Write Fail") return err } - shaKey := sha.Bytes() + shaKey := shaBlkToKey(sha) blkKey := int64ToKey(blkHeight) @@ -98,9 +99,9 @@ func (db *LevelDb) setBlk(sha *btcwire.ShaHash, blkHeight int64, buf []byte) err copy(blkVal[0:], shaB) copy(blkVal[len(shaB):], buf) - db.bShaBatch().Put(shaKey, lw.Bytes()) + db.lBatch().Put(shaKey, lw.Bytes()) - db.bBlkBatch().Put(blkKey, blkVal) + db.lBatch().Put(blkKey, blkVal) return nil } @@ -110,7 +111,8 @@ func (db *LevelDb) setBlk(sha *btcwire.ShaHash, blkHeight int64, buf []byte) err // insertSha shall be called with db lock held func (db *LevelDb) insertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHash, buf []byte) (blockid int64, err error) { - oBlkHeight, err := db.getBlkLoc(prevSha) + var oBlkHeight int64 + oBlkHeight, err = db.getBlkLoc(prevSha) if err != nil { // check current block count @@ -119,6 +121,9 @@ func (db *LevelDb) insertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHas // return // } oBlkHeight = -1 + if db.nextBlock != 0 { + return 0, err + } } // TODO(drahn) check curfile filesize, increment curfile if this puts it over @@ -222,7 +227,7 @@ func (db *LevelDb) FetchHeightRange(startHeight, endHeight int64) (rshalist []bt // TODO(drahn) fix blkFile from height key := int64ToKey(height) - blkVal, lerr := db.bBlkDb.Get(key, db.ro) + blkVal, lerr := db.lDb.Get(key, db.ro) if lerr != nil { break } @@ -248,8 +253,9 @@ func (db *LevelDb) NewestSha() (rsha *btcwire.ShaHash, rblkid int64, err error) defer db.dbLock.Unlock() if db.lastBlkIdx == -1 { + rblkid = db.lastBlkIdx err = fmt.Errorf("Empty Database") - return + return } sha := db.lastBlkSha diff --git a/ldb/boundary_test.go b/ldb/boundary_test.go new file mode 100644 index 00000000..3e48f8f0 --- /dev/null +++ b/ldb/boundary_test.go @@ -0,0 +1,44 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package ldb_test + +import ( + "github.com/conformal/btcdb" + "os" + "testing" +) + +// we need to test for empty databas and make certain it returns proper value + +func TestEmptyDB(t *testing.T) { + + dbname := "tstdbempty" + _ = os.RemoveAll(dbname) + db, err := btcdb.CreateDB("leveldb", dbname) + if err != nil { + t.Errorf("Failed to open test database %v", err) + return + } + defer os.RemoveAll(dbname) + + // This is a reopen test + db.Close() + + db, err = btcdb.OpenDB("leveldb", dbname) + if err != nil { + t.Errorf("Failed to open test database %v", err) + return + } + defer db.Close() + + sha, height, err := db.NewestSha() + + if sha != nil { + t.Errorf("sha not nil") + } + if height != -1 { + t.Errorf("height not -1 %v", height) + } +} diff --git a/ldb/dbcache.go b/ldb/dbcache.go index c5f74c90..6a9b824a 100644 --- a/ldb/dbcache.go +++ b/ldb/dbcache.go @@ -6,7 +6,6 @@ package ldb import ( "bytes" - "fmt" "github.com/conformal/btcdb" "github.com/conformal/btcutil" "github.com/conformal/btcwire" @@ -78,7 +77,6 @@ func (db *LevelDb) fetchTxDataBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, blksha, blkbuf, err = db.getBlkByHeight(blkHeight) if err != nil { - fmt.Printf("failed to get block %v %v\n", blkHeight, err) return } diff --git a/ldb/insertremove_test.go b/ldb/insertremove_test.go index c6b6cafa..5de70aba 100644 --- a/ldb/insertremove_test.go +++ b/ldb/insertremove_test.go @@ -5,6 +5,7 @@ package ldb_test import ( + "fmt" "github.com/conformal/btcdb" _ "github.com/conformal/btcdb/ldb" "github.com/conformal/btcutil" @@ -33,8 +34,8 @@ func loadblocks(t *testing.T) []*btcutil.Block { func TestUnspentInsert(t *testing.T) { testUnspentInsert(t, dbTmDefault) - testUnspentInsert(t, dbTmNormal) - testUnspentInsert(t, dbTmFast) + //testUnspentInsert(t, dbTmNormal) + //testUnspentInsert(t, dbTmFast) } // insert every block in the test chain @@ -44,7 +45,7 @@ func TestUnspentInsert(t *testing.T) { // the associated txout set to spent. func testUnspentInsert(t *testing.T, mode int) { // Ignore db remove errors since it means we didn't have an old one. - dbname := "tstdbuspnt1" + dbname := fmt.Sprintf("tstdbuspnt1.%d", mode) _ = os.RemoveAll(dbname) db, err := btcdb.CreateDB("leveldb", dbname) if err != nil { diff --git a/ldb/leveldb.go b/ldb/leveldb.go index 6605b368..488be28f 100644 --- a/ldb/leveldb.go +++ b/ldb/leveldb.go @@ -44,14 +44,11 @@ type LevelDb struct { dbLock sync.Mutex // leveldb pieces - bShaDb *leveldb.DB - bBlkDb *leveldb.DB - tShaDb *leveldb.DB + lDb *leveldb.DB ro *opt.ReadOptions wo *opt.WriteOptions - bShabatch *leveldb.Batch - bBlkbatch *leveldb.Batch + lbatch *leveldb.Batch nextBlock int64 @@ -99,16 +96,20 @@ blockforward: } else { if testblock == 0 { //no blocks in db, odd but ok. - return db, nil + lastknownblock = -1 + nextunknownblock = 0 + var emptysha btcwire.ShaHash + lastSha = &emptysha + } else { + nextunknownblock = testblock } - nextunknownblock = testblock break blockforward } } // narrow search blocknarrow: - for { + for lastknownblock != -1 { testblock = (lastknownblock + nextunknownblock) / 2 sha, _, err := ldb.getBlkByHeight(testblock) if err == nil { @@ -133,12 +134,10 @@ blocknarrow: func openDB(dbpath string, flag opt.OptionsFlag) (pbdb btcdb.Db, err error) { var db LevelDb - var tbShaDb, ttShaDb, tbBlkDb *leveldb.DB + var tlDb *leveldb.DB defer func() { if err == nil { - db.bShaDb = tbShaDb - db.bBlkDb = tbBlkDb - db.tShaDb = ttShaDb + db.lDb = tlDb db.txUpdateMap = map[btcwire.ShaHash]*txUpdateObj{} @@ -160,18 +159,8 @@ func openDB(dbpath string, flag opt.OptionsFlag) (pbdb btcdb.Db, err error) { } } - bShaName := filepath.Join(dbpath, "bSha.ldb") - tbShaDb, err = leveldb.OpenFile(bShaName, &opt.Options{Flag: flag}) - if err != nil { - return - } - bBlkName := filepath.Join(dbpath, "bBlk.ldb") - tbBlkDb, err = leveldb.OpenFile(bBlkName, &opt.Options{Flag: flag}) - if err != nil { - return - } - tShaName := filepath.Join(dbpath, "tSha.ldb") - ttShaDb, err = leveldb.OpenFile(tShaName, &opt.Options{Flag: flag}) + lDbName := filepath.Join(dbpath, "btcd.ldb") + tlDb, err = leveldb.OpenFile(lDbName, &opt.Options{Flag: flag}) if err != nil { return } @@ -188,9 +177,7 @@ func CreateDB(dbpath string) (btcdb.Db, error) { } func (db *LevelDb) close() { - db.bShaDb.Close() - db.bBlkDb.Close() - db.tShaDb.Close() + db.lDb.Close() } // Sync verifies that the database is coherent on disk, @@ -213,17 +200,22 @@ func (db *LevelDb) Close() { // DropAfterBlockBySha will remove any blocks from the database after // the given block. -func (db *LevelDb) DropAfterBlockBySha(sha *btcwire.ShaHash) error { +func (db *LevelDb) DropAfterBlockBySha(sha *btcwire.ShaHash) (rerr error) { db.dbLock.Lock() defer db.dbLock.Unlock() - defer db.processBatches() + defer func () { + if rerr == nil { + rerr = db.processBatches() + + } + } () startheight := db.nextBlock - 1 keepidx, err := db.getBlkLoc(sha) if err != nil { // should the error here be normalized ? - log.Infof("block loc failed %v ", sha) + log.Tracef("block loc failed %v ", sha) return err } @@ -251,8 +243,8 @@ func (db *LevelDb) DropAfterBlockBySha(sha *btcwire.ShaHash) error { txUo.delete = true db.txUpdateMap[txSha] = &txUo } - db.bShaBatch().Delete(shaToKey(blksha)) - db.bBlkBatch().Delete(int64ToKey(height)) + db.lBatch().Delete(shaBlkToKey(blksha)) + db.lBatch().Delete(int64ToKey(height)) } db.nextBlock = keepidx + 1 @@ -264,26 +256,31 @@ func (db *LevelDb) DropAfterBlockBySha(sha *btcwire.ShaHash) error { // database. The first block inserted into the database will be treated as the // genesis block. Every subsequent block insert requires the referenced parent // block to already exist. -func (db *LevelDb) InsertBlock(block *btcutil.Block) (height int64, err error) { +func (db *LevelDb) InsertBlock(block *btcutil.Block) (height int64, rerr error) { db.dbLock.Lock() defer db.dbLock.Unlock() - defer db.processBatches() + defer func () { + if rerr == nil { + rerr = db.processBatches() + } + } () + blocksha, err := block.Sha() if err != nil { log.Warnf("Failed to compute block sha %v", blocksha) - return + return 0, err } mblock := block.MsgBlock() rawMsg, err := block.Bytes() if err != nil { log.Warnf("Failed to obtain raw block sha %v", blocksha) - return + return 0, err } txloc, err := block.TxLoc() if err != nil { log.Warnf("Failed to obtain raw block sha %v", blocksha) - return + return 0, err } // Insert block into database @@ -292,7 +289,7 @@ func (db *LevelDb) InsertBlock(block *btcutil.Block) (height int64, err error) { if err != nil { log.Warnf("Failed to insert block %v %v %v", blocksha, &mblock.Header.PrevBlock, err) - return + return 0, err } // At least two blocks in the long past were generated by faulty @@ -303,7 +300,7 @@ func (db *LevelDb) InsertBlock(block *btcutil.Block) (height int64, err error) { txsha, err = tx.TxSha() if err != nil { log.Warnf("failed to compute tx name block %v idx %v err %v", blocksha, txidx, err) - return + return 0, err } // Some old blocks contain duplicate transactions // Attempt to cleanly bypass this problem @@ -448,81 +445,68 @@ func (db *LevelDb) setclearSpentData(txsha *btcwire.ShaHash, idx uint32, set boo return nil } -func intToKey(keyint int) []byte { - key := fmt.Sprintf("%d", keyint) - return []byte(key) -} func int64ToKey(keyint int64) []byte { key := fmt.Sprintf("%d", keyint) return []byte(key) } -func shaToKey(sha *btcwire.ShaHash) []byte { - return sha.Bytes() +func shaBlkToKey(sha *btcwire.ShaHash) []byte { + shaB := sha.Bytes() + return shaB } -func (db *LevelDb) bShaBatch() *leveldb.Batch { - if db.bShabatch == nil { - db.bShabatch = new(leveldb.Batch) +func shaTxToKey(sha *btcwire.ShaHash) []byte { + shaB := sha.Bytes() + shaB = append(shaB, "tx"...) + return shaB +} + +func (db *LevelDb) lBatch() *leveldb.Batch { + if db.lbatch == nil { + db.lbatch = new(leveldb.Batch) } - return db.bShabatch + return db.lbatch } -func (db *LevelDb) bBlkBatch() *leveldb.Batch { - if db.bBlkbatch == nil { - db.bBlkbatch = new(leveldb.Batch) - } - return db.bBlkbatch -} - -func (db *LevelDb) processBatches() { +func (db *LevelDb) processBatches() error { var err error - if db.bShabatch != nil { - err = db.bShaDb.Write(db.bShabatch, db.wo) - db.bShabatch.Reset() - db.bShabatch = nil - if err != nil { - return - } - } - if db.bBlkbatch != nil { - err = db.bBlkDb.Write(db.bBlkbatch, db.wo) - db.bBlkbatch.Reset() - db.bBlkbatch = nil - if err != nil { - return - } - } - if len(db.txUpdateMap) != 0 { - tShabatch := new(leveldb.Batch) + if len(db.txUpdateMap) != 0 || db.lbatch != nil { + if db.lbatch == nil { + db.lbatch = new(leveldb.Batch) + } for txSha, txU := range db.txUpdateMap { - key := shaToKey(&txSha) + key := shaTxToKey(&txSha) if txU.delete { //log.Infof("deleting tx %v", txSha) - tShabatch.Delete(key) + db.lbatch.Delete(key) } else { //log.Infof("inserting tx %v", txSha) txdat, err := db.formatTx(txU) if err != nil { - return + return err } - tShabatch.Put(key, txdat) + db.lbatch.Put(key, txdat) } } - err = db.tShaDb.Write(tShabatch, db.wo) - tShabatch.Reset() + err = db.lDb.Write(db.lbatch, db.wo) if err != nil { - return + fmt.Printf("batch failed %v\n", err) + return err } + db.lbatch.Reset() db.txUpdateMap = map[btcwire.ShaHash]*txUpdateObj{} runtime.GC() } + return nil } func (db *LevelDb) RollbackClose() { + db.dbLock.Lock() + defer db.dbLock.Unlock() + db.close() } diff --git a/ldb/operational_test.go b/ldb/operational_test.go index 1000bd66..eb52db46 100644 --- a/ldb/operational_test.go +++ b/ldb/operational_test.go @@ -7,6 +7,7 @@ package ldb_test import ( "compress/bzip2" "encoding/binary" + "fmt" "github.com/conformal/btcdb" "github.com/conformal/btcutil" "github.com/conformal/btcwire" @@ -28,9 +29,9 @@ const ( func TestOperational(t *testing.T) { testOperationalMode(t, dbTmDefault) - testOperationalMode(t, dbTmNormal) - testOperationalMode(t, dbTmFast) - testOperationalMode(t, dbTmNoVerify) + //testOperationalMode(t, dbTmNormal) + //testOperationalMode(t, dbTmFast) + //testOperationalMode(t, dbTmNoVerify) } func testOperationalMode(t *testing.T, mode int) { @@ -40,7 +41,7 @@ func testOperationalMode(t *testing.T, mode int) { // 3) insert block // Ignore db remove errors since it means we didn't have an old one. - dbname := "tstdbop1" + dbname := fmt.Sprintf("tstdbop1.%d", mode) _ = os.RemoveAll(dbname) db, err := btcdb.CreateDB("leveldb", dbname) if err != nil { @@ -163,8 +164,8 @@ out: func TestBackout(t *testing.T) { testBackout(t, dbTmDefault) - testBackout(t, dbTmNormal) - testBackout(t, dbTmFast) + //testBackout(t, dbTmNormal) + //testBackout(t, dbTmFast) } func testBackout(t *testing.T, mode int) { @@ -173,8 +174,9 @@ func testBackout(t *testing.T, mode int) { // 2) look up all txin (except coinbase in db) // 3) insert block + t.Logf("mode %v", mode) // Ignore db remove errors since it means we didn't have an old one. - dbname := "tstdbop2" + dbname := fmt.Sprintf("tstdbop2.%d", mode) _ = os.RemoveAll(dbname) db, err := btcdb.CreateDB("leveldb", dbname) if err != nil { @@ -218,11 +220,11 @@ func testBackout(t *testing.T, mode int) { newheight, err := db.InsertBlock(block) if err != nil { t.Errorf("failed to insert block %v err %v", height, err) - break + return } if newheight != height { t.Errorf("height mismatch expect %v returned %v", height, newheight) - break + return } } @@ -245,6 +247,7 @@ func testBackout(t *testing.T, mode int) { _, err = db.FetchBlockBySha(sha) if err != nil { t.Errorf("failed to load block 99 from db %v", err) + return } sha, err = blocks[119].Sha() @@ -279,7 +282,14 @@ func testBackout(t *testing.T, mode int) { } } + +var savedblocks []*btcutil.Block + func loadBlocks(t *testing.T, file string) (blocks []*btcutil.Block, err error) { + if len(savedblocks) != 0 { + blocks = savedblocks + return + } testdatafile := filepath.Join("testdata", "blocks1-256.bz2") var dr io.Reader var fi io.ReadCloser @@ -340,6 +350,7 @@ func loadBlocks(t *testing.T, file string) (blocks []*btcutil.Block, err error) } blocks = append(blocks, block) } + savedblocks = blocks return } diff --git a/ldb/tx.go b/ldb/tx.go index 300f599f..7bc00300 100644 --- a/ldb/tx.go +++ b/ldb/tx.go @@ -60,28 +60,24 @@ func (db *LevelDb) formatTx(txu *txUpdateObj) ([]byte, error) { err := binary.Write(&txW, binary.LittleEndian, blkHeight) if err != nil { - fmt.Printf("fail encoding blkHeight %v\n", err) err = fmt.Errorf("Write fail") return nil, err } err = binary.Write(&txW, binary.LittleEndian, txOff) if err != nil { - fmt.Printf("fail encoding txoff %v\n", err) err = fmt.Errorf("Write fail") return nil, err } err = binary.Write(&txW, binary.LittleEndian, txLen) if err != nil { - fmt.Printf("fail encoding txlen %v\n", err) err = fmt.Errorf("Write fail") return nil, err } err = binary.Write(&txW, binary.LittleEndian, spentbuf) if err != nil { - fmt.Printf("fail encoding spentbuf %v\n", err) err = fmt.Errorf("Write fail") return nil, err } @@ -93,8 +89,8 @@ func (db *LevelDb) getTxData(txsha *btcwire.ShaHash) (rblkHeight int64, rtxOff int, rtxLen int, rspentBuf []byte, err error) { var buf []byte - key := shaToKey(txsha) - buf, err = db.tShaDb.Get(key, db.ro) + key := shaTxToKey(txsha) + buf, err = db.lDb.Get(key, db.ro) if err != nil { return } @@ -121,7 +117,6 @@ func (db *LevelDb) getTxData(txsha *btcwire.ShaHash) (rblkHeight int64, spentBuf := make([]byte, dr.Len()) err = binary.Read(dr, binary.LittleEndian, spentBuf) if err != nil { - fmt.Printf("fail encoding spentbuf %v\n", err) err = fmt.Errorf("Db Corrupt 4") return }