One DB instad of many.

improved error handling

Fix testing, no point in running tests multiple times

Fix error when no blocks in database, reopen would misreport NewstSha() return
This commit is contained in:
Dale Rahn 2013-08-22 11:40:59 -04:00
parent b66abdf6ba
commit 96f7305c29
7 changed files with 151 additions and 112 deletions

View file

@ -24,9 +24,9 @@ func (db *LevelDb) InsertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHas
func (db *LevelDb) getBlkLoc(sha *btcwire.ShaHash) (int64, error) {
var blkHeight int64
key := sha.Bytes()
key := shaBlkToKey(sha)
data, err := db.bShaDb.Get(key, db.ro)
data, err := db.lDb.Get(key, db.ro)
if err != nil {
return 0, err
@ -48,8 +48,9 @@ func (db *LevelDb) getBlkByHeight(blkHeight int64) (rsha *btcwire.ShaHash, rbuf
key := int64ToKey(blkHeight)
blkVal, err = db.bBlkDb.Get(key, db.ro)
blkVal, err = db.lDb.Get(key, db.ro)
if err != nil {
log.Tracef("failed to find height %v", blkHeight)
return // exists ???
}
@ -89,7 +90,7 @@ func (db *LevelDb) setBlk(sha *btcwire.ShaHash, blkHeight int64, buf []byte) err
err = fmt.Errorf("Write Fail")
return err
}
shaKey := sha.Bytes()
shaKey := shaBlkToKey(sha)
blkKey := int64ToKey(blkHeight)
@ -98,9 +99,9 @@ func (db *LevelDb) setBlk(sha *btcwire.ShaHash, blkHeight int64, buf []byte) err
copy(blkVal[0:], shaB)
copy(blkVal[len(shaB):], buf)
db.bShaBatch().Put(shaKey, lw.Bytes())
db.lBatch().Put(shaKey, lw.Bytes())
db.bBlkBatch().Put(blkKey, blkVal)
db.lBatch().Put(blkKey, blkVal)
return nil
}
@ -110,7 +111,8 @@ func (db *LevelDb) setBlk(sha *btcwire.ShaHash, blkHeight int64, buf []byte) err
// insertSha shall be called with db lock held
func (db *LevelDb) insertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHash, buf []byte) (blockid int64, err error) {
oBlkHeight, err := db.getBlkLoc(prevSha)
var oBlkHeight int64
oBlkHeight, err = db.getBlkLoc(prevSha)
if err != nil {
// check current block count
@ -119,6 +121,9 @@ func (db *LevelDb) insertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHas
// return
// }
oBlkHeight = -1
if db.nextBlock != 0 {
return 0, err
}
}
// TODO(drahn) check curfile filesize, increment curfile if this puts it over
@ -222,7 +227,7 @@ func (db *LevelDb) FetchHeightRange(startHeight, endHeight int64) (rshalist []bt
// TODO(drahn) fix blkFile from height
key := int64ToKey(height)
blkVal, lerr := db.bBlkDb.Get(key, db.ro)
blkVal, lerr := db.lDb.Get(key, db.ro)
if lerr != nil {
break
}
@ -248,6 +253,7 @@ func (db *LevelDb) NewestSha() (rsha *btcwire.ShaHash, rblkid int64, err error)
defer db.dbLock.Unlock()
if db.lastBlkIdx == -1 {
rblkid = db.lastBlkIdx
err = fmt.Errorf("Empty Database")
return
}

44
ldb/boundary_test.go Normal file
View file

@ -0,0 +1,44 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package ldb_test
import (
"github.com/conformal/btcdb"
"os"
"testing"
)
// we need to test for empty databas and make certain it returns proper value
func TestEmptyDB(t *testing.T) {
dbname := "tstdbempty"
_ = os.RemoveAll(dbname)
db, err := btcdb.CreateDB("leveldb", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer os.RemoveAll(dbname)
// This is a reopen test
db.Close()
db, err = btcdb.OpenDB("leveldb", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer db.Close()
sha, height, err := db.NewestSha()
if sha != nil {
t.Errorf("sha not nil")
}
if height != -1 {
t.Errorf("height not -1 %v", height)
}
}

View file

@ -6,7 +6,6 @@ package ldb
import (
"bytes"
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
@ -78,7 +77,6 @@ func (db *LevelDb) fetchTxDataBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx,
blksha, blkbuf, err = db.getBlkByHeight(blkHeight)
if err != nil {
fmt.Printf("failed to get block %v %v\n", blkHeight, err)
return
}

View file

@ -5,6 +5,7 @@
package ldb_test
import (
"fmt"
"github.com/conformal/btcdb"
_ "github.com/conformal/btcdb/ldb"
"github.com/conformal/btcutil"
@ -33,8 +34,8 @@ func loadblocks(t *testing.T) []*btcutil.Block {
func TestUnspentInsert(t *testing.T) {
testUnspentInsert(t, dbTmDefault)
testUnspentInsert(t, dbTmNormal)
testUnspentInsert(t, dbTmFast)
//testUnspentInsert(t, dbTmNormal)
//testUnspentInsert(t, dbTmFast)
}
// insert every block in the test chain
@ -44,7 +45,7 @@ func TestUnspentInsert(t *testing.T) {
// the associated txout set to spent.
func testUnspentInsert(t *testing.T, mode int) {
// Ignore db remove errors since it means we didn't have an old one.
dbname := "tstdbuspnt1"
dbname := fmt.Sprintf("tstdbuspnt1.%d", mode)
_ = os.RemoveAll(dbname)
db, err := btcdb.CreateDB("leveldb", dbname)
if err != nil {

View file

@ -44,14 +44,11 @@ type LevelDb struct {
dbLock sync.Mutex
// leveldb pieces
bShaDb *leveldb.DB
bBlkDb *leveldb.DB
tShaDb *leveldb.DB
lDb *leveldb.DB
ro *opt.ReadOptions
wo *opt.WriteOptions
bShabatch *leveldb.Batch
bBlkbatch *leveldb.Batch
lbatch *leveldb.Batch
nextBlock int64
@ -99,16 +96,20 @@ blockforward:
} else {
if testblock == 0 {
//no blocks in db, odd but ok.
return db, nil
lastknownblock = -1
nextunknownblock = 0
var emptysha btcwire.ShaHash
lastSha = &emptysha
} else {
nextunknownblock = testblock
}
nextunknownblock = testblock
break blockforward
}
}
// narrow search
blocknarrow:
for {
for lastknownblock != -1 {
testblock = (lastknownblock + nextunknownblock) / 2
sha, _, err := ldb.getBlkByHeight(testblock)
if err == nil {
@ -133,12 +134,10 @@ blocknarrow:
func openDB(dbpath string, flag opt.OptionsFlag) (pbdb btcdb.Db, err error) {
var db LevelDb
var tbShaDb, ttShaDb, tbBlkDb *leveldb.DB
var tlDb *leveldb.DB
defer func() {
if err == nil {
db.bShaDb = tbShaDb
db.bBlkDb = tbBlkDb
db.tShaDb = ttShaDb
db.lDb = tlDb
db.txUpdateMap = map[btcwire.ShaHash]*txUpdateObj{}
@ -160,18 +159,8 @@ func openDB(dbpath string, flag opt.OptionsFlag) (pbdb btcdb.Db, err error) {
}
}
bShaName := filepath.Join(dbpath, "bSha.ldb")
tbShaDb, err = leveldb.OpenFile(bShaName, &opt.Options{Flag: flag})
if err != nil {
return
}
bBlkName := filepath.Join(dbpath, "bBlk.ldb")
tbBlkDb, err = leveldb.OpenFile(bBlkName, &opt.Options{Flag: flag})
if err != nil {
return
}
tShaName := filepath.Join(dbpath, "tSha.ldb")
ttShaDb, err = leveldb.OpenFile(tShaName, &opt.Options{Flag: flag})
lDbName := filepath.Join(dbpath, "btcd.ldb")
tlDb, err = leveldb.OpenFile(lDbName, &opt.Options{Flag: flag})
if err != nil {
return
}
@ -188,9 +177,7 @@ func CreateDB(dbpath string) (btcdb.Db, error) {
}
func (db *LevelDb) close() {
db.bShaDb.Close()
db.bBlkDb.Close()
db.tShaDb.Close()
db.lDb.Close()
}
// Sync verifies that the database is coherent on disk,
@ -213,17 +200,22 @@ func (db *LevelDb) Close() {
// DropAfterBlockBySha will remove any blocks from the database after
// the given block.
func (db *LevelDb) DropAfterBlockBySha(sha *btcwire.ShaHash) error {
func (db *LevelDb) DropAfterBlockBySha(sha *btcwire.ShaHash) (rerr error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
defer db.processBatches()
defer func () {
if rerr == nil {
rerr = db.processBatches()
}
} ()
startheight := db.nextBlock - 1
keepidx, err := db.getBlkLoc(sha)
if err != nil {
// should the error here be normalized ?
log.Infof("block loc failed %v ", sha)
log.Tracef("block loc failed %v ", sha)
return err
}
@ -251,8 +243,8 @@ func (db *LevelDb) DropAfterBlockBySha(sha *btcwire.ShaHash) error {
txUo.delete = true
db.txUpdateMap[txSha] = &txUo
}
db.bShaBatch().Delete(shaToKey(blksha))
db.bBlkBatch().Delete(int64ToKey(height))
db.lBatch().Delete(shaBlkToKey(blksha))
db.lBatch().Delete(int64ToKey(height))
}
db.nextBlock = keepidx + 1
@ -264,26 +256,31 @@ func (db *LevelDb) DropAfterBlockBySha(sha *btcwire.ShaHash) error {
// database. The first block inserted into the database will be treated as the
// genesis block. Every subsequent block insert requires the referenced parent
// block to already exist.
func (db *LevelDb) InsertBlock(block *btcutil.Block) (height int64, err error) {
func (db *LevelDb) InsertBlock(block *btcutil.Block) (height int64, rerr error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
defer db.processBatches()
defer func () {
if rerr == nil {
rerr = db.processBatches()
}
} ()
blocksha, err := block.Sha()
if err != nil {
log.Warnf("Failed to compute block sha %v", blocksha)
return
return 0, err
}
mblock := block.MsgBlock()
rawMsg, err := block.Bytes()
if err != nil {
log.Warnf("Failed to obtain raw block sha %v", blocksha)
return
return 0, err
}
txloc, err := block.TxLoc()
if err != nil {
log.Warnf("Failed to obtain raw block sha %v", blocksha)
return
return 0, err
}
// Insert block into database
@ -292,7 +289,7 @@ func (db *LevelDb) InsertBlock(block *btcutil.Block) (height int64, err error) {
if err != nil {
log.Warnf("Failed to insert block %v %v %v", blocksha,
&mblock.Header.PrevBlock, err)
return
return 0, err
}
// At least two blocks in the long past were generated by faulty
@ -303,7 +300,7 @@ func (db *LevelDb) InsertBlock(block *btcutil.Block) (height int64, err error) {
txsha, err = tx.TxSha()
if err != nil {
log.Warnf("failed to compute tx name block %v idx %v err %v", blocksha, txidx, err)
return
return 0, err
}
// Some old blocks contain duplicate transactions
// Attempt to cleanly bypass this problem
@ -448,81 +445,68 @@ func (db *LevelDb) setclearSpentData(txsha *btcwire.ShaHash, idx uint32, set boo
return nil
}
func intToKey(keyint int) []byte {
key := fmt.Sprintf("%d", keyint)
return []byte(key)
}
func int64ToKey(keyint int64) []byte {
key := fmt.Sprintf("%d", keyint)
return []byte(key)
}
func shaToKey(sha *btcwire.ShaHash) []byte {
return sha.Bytes()
func shaBlkToKey(sha *btcwire.ShaHash) []byte {
shaB := sha.Bytes()
return shaB
}
func (db *LevelDb) bShaBatch() *leveldb.Batch {
if db.bShabatch == nil {
db.bShabatch = new(leveldb.Batch)
func shaTxToKey(sha *btcwire.ShaHash) []byte {
shaB := sha.Bytes()
shaB = append(shaB, "tx"...)
return shaB
}
func (db *LevelDb) lBatch() *leveldb.Batch {
if db.lbatch == nil {
db.lbatch = new(leveldb.Batch)
}
return db.bShabatch
return db.lbatch
}
func (db *LevelDb) bBlkBatch() *leveldb.Batch {
if db.bBlkbatch == nil {
db.bBlkbatch = new(leveldb.Batch)
}
return db.bBlkbatch
}
func (db *LevelDb) processBatches() {
func (db *LevelDb) processBatches() error {
var err error
if db.bShabatch != nil {
err = db.bShaDb.Write(db.bShabatch, db.wo)
db.bShabatch.Reset()
db.bShabatch = nil
if err != nil {
return
}
}
if db.bBlkbatch != nil {
err = db.bBlkDb.Write(db.bBlkbatch, db.wo)
db.bBlkbatch.Reset()
db.bBlkbatch = nil
if err != nil {
return
}
}
if len(db.txUpdateMap) != 0 {
tShabatch := new(leveldb.Batch)
if len(db.txUpdateMap) != 0 || db.lbatch != nil {
if db.lbatch == nil {
db.lbatch = new(leveldb.Batch)
}
for txSha, txU := range db.txUpdateMap {
key := shaToKey(&txSha)
key := shaTxToKey(&txSha)
if txU.delete {
//log.Infof("deleting tx %v", txSha)
tShabatch.Delete(key)
db.lbatch.Delete(key)
} else {
//log.Infof("inserting tx %v", txSha)
txdat, err := db.formatTx(txU)
if err != nil {
return
return err
}
tShabatch.Put(key, txdat)
db.lbatch.Put(key, txdat)
}
}
err = db.tShaDb.Write(tShabatch, db.wo)
tShabatch.Reset()
err = db.lDb.Write(db.lbatch, db.wo)
if err != nil {
return
fmt.Printf("batch failed %v\n", err)
return err
}
db.lbatch.Reset()
db.txUpdateMap = map[btcwire.ShaHash]*txUpdateObj{}
runtime.GC()
}
return nil
}
func (db *LevelDb) RollbackClose() {
db.dbLock.Lock()
defer db.dbLock.Unlock()
db.close()
}

View file

@ -7,6 +7,7 @@ package ldb_test
import (
"compress/bzip2"
"encoding/binary"
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
@ -28,9 +29,9 @@ const (
func TestOperational(t *testing.T) {
testOperationalMode(t, dbTmDefault)
testOperationalMode(t, dbTmNormal)
testOperationalMode(t, dbTmFast)
testOperationalMode(t, dbTmNoVerify)
//testOperationalMode(t, dbTmNormal)
//testOperationalMode(t, dbTmFast)
//testOperationalMode(t, dbTmNoVerify)
}
func testOperationalMode(t *testing.T, mode int) {
@ -40,7 +41,7 @@ func testOperationalMode(t *testing.T, mode int) {
// 3) insert block
// Ignore db remove errors since it means we didn't have an old one.
dbname := "tstdbop1"
dbname := fmt.Sprintf("tstdbop1.%d", mode)
_ = os.RemoveAll(dbname)
db, err := btcdb.CreateDB("leveldb", dbname)
if err != nil {
@ -163,8 +164,8 @@ out:
func TestBackout(t *testing.T) {
testBackout(t, dbTmDefault)
testBackout(t, dbTmNormal)
testBackout(t, dbTmFast)
//testBackout(t, dbTmNormal)
//testBackout(t, dbTmFast)
}
func testBackout(t *testing.T, mode int) {
@ -173,8 +174,9 @@ func testBackout(t *testing.T, mode int) {
// 2) look up all txin (except coinbase in db)
// 3) insert block
t.Logf("mode %v", mode)
// Ignore db remove errors since it means we didn't have an old one.
dbname := "tstdbop2"
dbname := fmt.Sprintf("tstdbop2.%d", mode)
_ = os.RemoveAll(dbname)
db, err := btcdb.CreateDB("leveldb", dbname)
if err != nil {
@ -218,11 +220,11 @@ func testBackout(t *testing.T, mode int) {
newheight, err := db.InsertBlock(block)
if err != nil {
t.Errorf("failed to insert block %v err %v", height, err)
break
return
}
if newheight != height {
t.Errorf("height mismatch expect %v returned %v", height, newheight)
break
return
}
}
@ -245,6 +247,7 @@ func testBackout(t *testing.T, mode int) {
_, err = db.FetchBlockBySha(sha)
if err != nil {
t.Errorf("failed to load block 99 from db %v", err)
return
}
sha, err = blocks[119].Sha()
@ -279,7 +282,14 @@ func testBackout(t *testing.T, mode int) {
}
}
var savedblocks []*btcutil.Block
func loadBlocks(t *testing.T, file string) (blocks []*btcutil.Block, err error) {
if len(savedblocks) != 0 {
blocks = savedblocks
return
}
testdatafile := filepath.Join("testdata", "blocks1-256.bz2")
var dr io.Reader
var fi io.ReadCloser
@ -340,6 +350,7 @@ func loadBlocks(t *testing.T, file string) (blocks []*btcutil.Block, err error)
}
blocks = append(blocks, block)
}
savedblocks = blocks
return
}

View file

@ -60,28 +60,24 @@ func (db *LevelDb) formatTx(txu *txUpdateObj) ([]byte, error) {
err := binary.Write(&txW, binary.LittleEndian, blkHeight)
if err != nil {
fmt.Printf("fail encoding blkHeight %v\n", err)
err = fmt.Errorf("Write fail")
return nil, err
}
err = binary.Write(&txW, binary.LittleEndian, txOff)
if err != nil {
fmt.Printf("fail encoding txoff %v\n", err)
err = fmt.Errorf("Write fail")
return nil, err
}
err = binary.Write(&txW, binary.LittleEndian, txLen)
if err != nil {
fmt.Printf("fail encoding txlen %v\n", err)
err = fmt.Errorf("Write fail")
return nil, err
}
err = binary.Write(&txW, binary.LittleEndian, spentbuf)
if err != nil {
fmt.Printf("fail encoding spentbuf %v\n", err)
err = fmt.Errorf("Write fail")
return nil, err
}
@ -93,8 +89,8 @@ func (db *LevelDb) getTxData(txsha *btcwire.ShaHash) (rblkHeight int64,
rtxOff int, rtxLen int, rspentBuf []byte, err error) {
var buf []byte
key := shaToKey(txsha)
buf, err = db.tShaDb.Get(key, db.ro)
key := shaTxToKey(txsha)
buf, err = db.lDb.Get(key, db.ro)
if err != nil {
return
}
@ -121,7 +117,6 @@ func (db *LevelDb) getTxData(txsha *btcwire.ShaHash) (rblkHeight int64,
spentBuf := make([]byte, dr.Len())
err = binary.Read(dr, binary.LittleEndian, spentBuf)
if err != nil {
fmt.Printf("fail encoding spentbuf %v\n", err)
err = fmt.Errorf("Db Corrupt 4")
return
}