block converted.

This commit is contained in:
Dale Rahn 2013-08-02 18:31:21 -04:00
parent ead14e5a12
commit 85d97d7436
9 changed files with 2493 additions and 0 deletions

293
leveldb/block.go Normal file
View file

@ -0,0 +1,293 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package ldb
import (
"bytes"
"fmt"
"encoding/binary"
"errors"
"github.com/conformal/btcdb"
"github.com/conformal/btcwire"
)
// InsertBlockData stores a block hash and its associated data block with a
// previous sha of `prevSha' and a version of `pver'.
func (db *LevelDb) InsertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHash, pver uint32, buf []byte) (blockid int64, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
return db.insertBlockData(sha, prevSha, pver, buf)
}
func (db *LevelDb) getBlkLoc(sha *btcwire.ShaHash) (int64, int, error) {
var blkHeight int64
var blkFile int
key := sha.Bytes()
data, err := db.bShaDb.Get(key, db.ro)
if err != nil {
return 0, 0, err
}
// deserialize
dr := bytes.NewBuffer(data)
err = binary.Read(dr, binary.LittleEndian, &blkHeight)
if err != nil {
err = errors.New("Db Corrupt")
return 0, 0, err
}
err = binary.Read(dr, binary.LittleEndian, &blkFile)
if err != nil {
err = errors.New("Db Corrupt")
return 0, 0, err
}
return blkHeight, blkFile, nil
}
func (db *LevelDb) getBlkByHeight(blkHeight int64, blkFile int) (rsha *btcwire.ShaHash, rbuf []byte, err error) {
var blkVal []byte
key := fmt.Sprintf("%d",blkHeight)
blkVal, err = db.bBlkDb[blkFile].Get([]byte(key), db.ro)
if err != nil {
return // exists ???
}
var sha btcwire.ShaHash
sha.SetBytes(blkVal[0:31])
return &sha, blkVal[32:], nil
}
func (db *LevelDb) getBlk(sha *btcwire.ShaHash) (rblkHeight int64, rblkFile int, rbuf []byte, err error) {
var blkHeight int64
var blkFile int
blkHeight, blkFile, err = db.getBlkLoc(sha)
if err != nil {
return
}
var buf []byte
_, buf, err = db.getBlkByHeight(blkHeight, blkFile)
if err != nil {
return
}
return blkHeight, blkFile, buf, nil
}
func (db *LevelDb) setBlk(sha *btcwire.ShaHash, blkHeight int64, blkFile int, buf []byte) (error) {
// serialize
var lw bytes.Buffer
err := binary.Write(&lw, binary.LittleEndian, &blkHeight)
if err != nil {
err = errors.New("Write fail")
return err
}
err = binary.Write(&lw, binary.LittleEndian, &blkFile)
if err != nil {
err = errors.New("Write fail")
return err
}
key := sha.Bytes()
err = db.bShaDb.Put(key, lw.Bytes(), db.wo)
if err != nil {
return err
}
key = []byte(fmt.Sprintf("%d",blkHeight))
shaB := sha.Bytes()
blkVal := make([]byte, len(shaB) + len(buf))
copy (blkVal[0:], shaB)
copy (blkVal[len(shaB):], buf)
err = db.bBlkDb[blkFile].Put(key, blkVal, db.wo)
return nil
}
// insertSha stores a block hash and its associated data block with a
// previous sha of `prevSha' and a version of `pver'.
// insertSha shall be called with db lock held
func (db *LevelDb) insertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHash, pver uint32, buf []byte) (blockid int64, err error) {
tx := &db.txState
if tx.tx == nil {
err = db.startTx()
if err != nil {
return
}
}
oBlkHeight, _, err:= db.getBlkLoc(prevSha)
if err != nil {
// check current block count
// if count != 0 {
// err = btcdb.PrevShaMissing
// return
// }
oBlkHeight = -1
}
// TODO(drahn) check curfile filesize, increment curfile if this puts it over
curFile := 0
blkHeight := oBlkHeight - 1
err = db.setBlk(sha, blkHeight, curFile, buf)
if err != nil {
return
}
// update the last block cache
db.lastBlkShaCached = true
db.lastBlkSha = *sha
db.lastBlkIdx = blkHeight
return blkHeight, nil
}
// fetchSha returns the datablock and pver for the given ShaHash.
func (db *LevelDb) fetchSha(sha *btcwire.ShaHash) (rbuf []byte, rpver uint32,
rblkHeight int64, err error) {
var blkHeight int64
var buf []byte
blkHeight, _, buf, err = db.getBlk(sha)
if err != nil {
return
}
fakepver := uint32(1)
return buf, fakepver, blkHeight, nil
}
// ExistsSha looks up the given block hash
// returns true if it is present in the database.
func (db *LevelDb) ExistsSha(sha *btcwire.ShaHash) (exists bool) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
_, exists = db.fetchBlockCache(sha)
if exists {
return
}
// not in cache, try database
exists = db.blkExistsSha(sha)
return
}
// blkExistsSha looks up the given block hash
// returns true if it is present in the database.
// CALLED WITH LOCK HELD
func (db *LevelDb) blkExistsSha(sha *btcwire.ShaHash) bool {
var pver uint32
oBlkHeight, _, err:= db.getBlkLoc(sha)
if err != nil {
/*
should this warn if the failure is something besides does not exist ?
log.Warnf("blkExistsSha: fail %v", err)
*/
return false
}
return true
}
// FetchBlockShaByHeight returns a block hash based on its height in the
// block chain.
func (db *LevelDb) FetchBlockShaByHeight(height int64) (sha *btcwire.ShaHash, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
return db.fetchBlockShaByHeight(height)
}
// fetchBlockShaByHeight returns a block hash based on its height in the
// block chain.
func (db *LevelDb) fetchBlockShaByHeight(height int64) (sha *btcwire.ShaHash, err error) {
// TODO(drahn) figure out which file blkHeight is located
blkFile := 0
var buf []byte
_, buf, err = db.getBlkByHeight(height, blkFile)
var shaval btcwire.ShaHash
shaval.SetBytes(buf[0:31])
return &shaval, nil
}
// FetchHeightRange looks up a range of blocks by the start and ending
// heights. Fetch is inclusive of the start height and exclusive of the
// ending height. To fetch all hashes from the start height until no
// more are present, use the special id `AllShas'.
func (db *LevelDb) FetchHeightRange(startHeight, endHeight int64) (rshalist []btcwire.ShaHash, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
var endidx int64
if endHeight == btcdb.AllShas {
endidx = startHeight + 500
} else {
endidx = endHeight
}
var shalist []btcwire.ShaHash
for height := startHeight; height < endidx; height++ {
// TODO(drahn) fix blkFile from height
blkFile := 0
key := fmt.Sprintf("%d", height)
blkVal, lerr := db.bBlkDb[blkFile].Get([]byte(key), db.ro)
if lerr != nil {
break
}
var sha btcwire.ShaHash
sha.SetBytes(blkVal[0:31])
shalist = append(shalist, sha)
}
if err == nil {
return
}
log.Tracef("FetchIdxRange idx %v %v returned %v shas err %v", startHeight, endHeight, len(shalist), err)
return shalist, nil
}
// NewestSha returns the hash and block height of the most recent (end) block of
// the block chain. It will return the zero hash, -1 for the block height, and
// no error (nil) if there are not any blocks in the database yet.
func (db *LevelDb) NewestSha() (rsha *btcwire.ShaHash, rblkid int64, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
if db.lastBlkIdx == -1 {
err = errors.New("Empty Database")
return
}
sha := db.lastBlkSha
return &sha, db.lastBlkIdx, nil
}
func (db *LevelDb) NewIterateBlocks() (rbogus btcdb.BlockIterator, err error) {
err = errors.New("Not implemented")
return
}

375
leveldb/dbcache.go Normal file
View file

@ -0,0 +1,375 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package ldb
import (
"bytes"
"container/list"
"github.com/conformal/btcdb"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"sync"
)
type txCache struct {
maxcount int
fifo list.List
// NOTE: the key is specifically ShaHash, not *ShaHash
txMap map[btcwire.ShaHash]*txCacheObj
cacheLock sync.RWMutex
}
type txCacheObj struct {
next *txCacheObj
sha btcwire.ShaHash
blksha btcwire.ShaHash
pver uint32
tx *btcwire.MsgTx
height int64
spent []byte
txbuf []byte
}
type blockCache struct {
maxcount int
fifo list.List
blockMap map[btcwire.ShaHash]*blockCacheObj
blockHeightMap map[int64]*blockCacheObj
cacheLock sync.RWMutex
}
type blockCacheObj struct {
next *blockCacheObj
sha btcwire.ShaHash
blk *btcutil.Block
}
// FetchBlockBySha - return a btcutil Block, object may be a cached.
func (db *LevelDb) FetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
return db.fetchBlockBySha(sha)
}
// fetchBlockBySha - return a btcutil Block, object may be a cached.
// Must be called with db lock held.
func (db *LevelDb) fetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, err error) {
blkcache, ok := db.fetchBlockCache(sha)
if ok {
return blkcache.blk, nil
}
buf, pver, height, err := db.fetchSha(sha)
if err != nil {
return
}
blk, err = btcutil.NewBlockFromBytes(buf, pver)
if err != nil {
return
}
blk.SetHeight(height)
db.insertBlockCache(sha, blk)
return
}
// fetchBlockCache check if a block is in the block cache, if so return it.
func (db *LevelDb) fetchBlockCache(sha *btcwire.ShaHash) (*blockCacheObj, bool) {
db.blockCache.cacheLock.RLock()
defer db.blockCache.cacheLock.RUnlock()
blkobj, ok := db.blockCache.blockMap[*sha]
if !ok { // could this just return the map deref?
return nil, false
}
return blkobj, true
}
// fetchBlockHeightCache check if a block is in the block cache, if so return it.
func (db *LevelDb) fetchBlockHeightCache(height int64) (*blockCacheObj, bool) {
db.blockCache.cacheLock.RLock()
defer db.blockCache.cacheLock.RUnlock()
blkobj, ok := db.blockCache.blockHeightMap[height]
if !ok { // could this just return the map deref?
return nil, false
}
return blkobj, true
}
// insertBlockCache insert the given sha/block into the cache map.
// If the block cache is determined to be full, it will release
// an old entry in FIFO order.
func (db *LevelDb) insertBlockCache(sha *btcwire.ShaHash, blk *btcutil.Block) {
bc := &db.blockCache
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
blkObj := blockCacheObj{sha: *sha, blk: blk}
bc.fifo.PushBack(&blkObj)
if bc.fifo.Len() > bc.maxcount {
listobj := bc.fifo.Front()
bc.fifo.Remove(listobj)
tailObj, ok := listobj.Value.(*blockCacheObj)
if ok {
delete(bc.blockMap, tailObj.sha)
delete(bc.blockHeightMap, tailObj.blk.Height())
} else {
panic("invalid type pushed on blockCache list")
}
}
bc.blockHeightMap[blk.Height()] = &blkObj
bc.blockMap[blkObj.sha] = &blkObj
}
// FetchTxByShaList given a array of ShaHash, look up the transactions
// and return them in a TxListReply array.
func (db *LevelDb) FetchTxByShaList(txShaList []*btcwire.ShaHash) []*btcdb.TxListReply {
db.dbLock.Lock()
defer db.dbLock.Unlock()
var replies []*btcdb.TxListReply
for _, txsha := range txShaList {
tx, _, _, _, height, txspent, err := db.fetchTxDataBySha(txsha)
btxspent := []bool{}
if err == nil {
btxspent = make([]bool, len(tx.TxOut), len(tx.TxOut))
for idx := range tx.TxOut {
byteidx := idx / 8
byteoff := uint(idx % 8)
btxspent[idx] = (txspent[byteidx] & (byte(1) << byteoff)) != 0
}
}
txlre := btcdb.TxListReply{Sha: txsha, Tx: tx, Height: height, TxSpent: btxspent, Err: err}
replies = append(replies, &txlre)
}
return replies
}
// fetchTxDataBySha returns several pieces of data regarding the given sha.
func (db *LevelDb) fetchTxDataBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rtxbuf []byte, rpver uint32, rblksha *btcwire.ShaHash, rheight int64, rtxspent []byte, err error) {
var pver uint32
var blksha *btcwire.ShaHash
var height int64
var txspent []byte
var toff int
var tlen int
var blk *btcutil.Block
var blkbuf []byte
// Check Tx cache
if txc, ok := db.fetchTxCache(txsha); ok {
if txc.spent != nil {
return txc.tx, txc.txbuf, txc.pver, &txc.blksha, txc.height, txc.spent, nil
}
}
// If not cached load it
height, toff, tlen, txspent, err = db.fetchLocationUsedBySha(txsha)
if err != nil {
return
}
blksha, err = db.fetchBlockShaByHeight(height)
if err != nil {
log.Warnf("block idx lookup %v to %v", height, err)
return
}
log.Tracef("transaction %v is at block %v %v tx %v",
txsha, blksha, height, toff)
blk, err = db.fetchBlockBySha(blksha)
if err != nil {
log.Warnf("unable to fetch block %v %v ",
height, &blksha)
return
}
blkbuf, pver, err = blk.Bytes()
if err != nil {
log.Warnf("unable to decode block %v %v", height, &blksha)
return
}
txbuf := make([]byte, tlen)
copy(txbuf[:], blkbuf[toff:toff+tlen])
rbuf := bytes.NewBuffer(txbuf)
var tx btcwire.MsgTx
err = tx.BtcDecode(rbuf, pver)
if err != nil {
log.Warnf("unable to decode tx block %v %v txoff %v txlen %v",
height, &blksha, toff, tlen)
return
}
// Shove data into TxCache
// XXX -
var txc txCacheObj
txc.sha = *txsha
txc.tx = &tx
txc.txbuf = txbuf
txc.pver = pver
txc.height = height
txc.spent = txspent
txc.blksha = *blksha
db.insertTxCache(&txc)
return &tx, txbuf, pver, blksha, height, txspent, nil
}
// FetchTxAllBySha returns several pieces of data regarding the given sha.
func (db *LevelDb) FetchTxAllBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rtxbuf []byte, rpver uint32, rblksha *btcwire.ShaHash, err error) {
var pver uint32
var blksha *btcwire.ShaHash
var height int64
var toff int
var tlen int
var blk *btcutil.Block
var blkbuf []byte
// Check Tx cache
if txc, ok := db.fetchTxCache(txsha); ok {
return txc.tx, txc.txbuf, txc.pver, &txc.blksha, nil
}
// If not cached load it
height, toff, tlen, err = db.FetchLocationBySha(txsha)
if err != nil {
return
}
blksha, err = db.FetchBlockShaByHeight(height)
if err != nil {
log.Warnf("block idx lookup %v to %v", height, err)
return
}
log.Tracef("transaction %v is at block %v %v tx %v",
txsha, blksha, height, toff)
blk, err = db.FetchBlockBySha(blksha)
if err != nil {
log.Warnf("unable to fetch block %v %v ",
height, &blksha)
return
}
blkbuf, pver, err = blk.Bytes()
if err != nil {
log.Warnf("unable to decode block %v %v", height, &blksha)
return
}
txbuf := make([]byte, tlen)
copy(txbuf[:], blkbuf[toff:toff+tlen])
rbuf := bytes.NewBuffer(txbuf)
var tx btcwire.MsgTx
err = tx.BtcDecode(rbuf, pver)
if err != nil {
log.Warnf("unable to decode tx block %v %v txoff %v txlen %v",
height, &blksha, toff, tlen)
return
}
// Shove data into TxCache
// XXX -
var txc txCacheObj
txc.sha = *txsha
txc.tx = &tx
txc.txbuf = txbuf
txc.pver = pver
txc.height = height
txc.blksha = *blksha
db.insertTxCache(&txc)
return &tx, txbuf, pver, blksha, nil
}
// FetchTxBySha returns some data for the given Tx Sha.
func (db *LevelDb) FetchTxBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rpver uint32, blksha *btcwire.ShaHash, err error) {
rtx, _, rpver, blksha, err = db.FetchTxAllBySha(txsha)
return
}
// FetchTxBufBySha return the bytestream data and associated protocol version.
// for the given Tx Sha
func (db *LevelDb) FetchTxBufBySha(txsha *btcwire.ShaHash) (txbuf []byte, rpver uint32, err error) {
_, txbuf, rpver, _, err = db.FetchTxAllBySha(txsha)
return
}
// fetchTxCache look up the given transaction in the Tx cache.
func (db *LevelDb) fetchTxCache(sha *btcwire.ShaHash) (*txCacheObj, bool) {
tc := &db.txCache
tc.cacheLock.RLock()
defer tc.cacheLock.RUnlock()
txObj, ok := tc.txMap[*sha]
if !ok { // could this just return the map deref?
return nil, false
}
return txObj, true
}
// insertTxCache, insert the given txobj into the cache.
// if the tx cache is determined to be full, it will release
// an old entry in FIFO order.
func (db *LevelDb) insertTxCache(txObj *txCacheObj) {
tc := &db.txCache
tc.cacheLock.Lock()
defer tc.cacheLock.Unlock()
tc.fifo.PushBack(txObj)
if tc.fifo.Len() >= tc.maxcount {
listobj := tc.fifo.Front()
tc.fifo.Remove(listobj)
tailObj, ok := listobj.Value.(*txCacheObj)
if ok {
delete(tc.txMap, tailObj.sha)
} else {
panic("invalid type pushed on tx list")
}
}
tc.txMap[txObj.sha] = txObj
}
// InvalidateTxCache clear/release all cached transactions.
func (db *LevelDb) InvalidateTxCache() {
tc := &db.txCache
tc.cacheLock.Lock()
defer tc.cacheLock.Unlock()
tc.txMap = map[btcwire.ShaHash]*txCacheObj{}
tc.fifo = list.List{}
}
// InvalidateTxCache clear/release all cached blocks.
func (db *LevelDb) InvalidateBlockCache() {
bc := &db.blockCache
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
bc.blockMap = map[btcwire.ShaHash]*blockCacheObj{}
bc.blockHeightMap = map[int64]*blockCacheObj{}
bc.fifo = list.List{}
}
// InvalidateCache clear/release all cached blocks and transactions.
func (db *LevelDb) InvalidateCache() {
db.InvalidateTxCache()
db.InvalidateBlockCache()
}

15
leveldb/doc.go Normal file
View file

@ -0,0 +1,15 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
/*
Package sqlite3 implements a sqlite3 instance of btcdb.
sqlite provides a zero setup, single file database. It requires cgo
and the presence of the sqlite library and headers, but nothing else.
The performance is generally high although it goes down with database
size.
Many of the block or tx specific functions for btcdb are in this subpackage.
*/
package ldb

View file

@ -0,0 +1,209 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package ldb_test
import (
"github.com/conformal/btcdb"
"github.com/conformal/btcdb/sqlite3"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"os"
"path/filepath"
"testing"
)
var tstBlocks []*btcutil.Block
func loadblocks(t *testing.T) []*btcutil.Block {
if len(tstBlocks) != 0 {
return tstBlocks
}
testdatafile := filepath.Join("testdata", "blocks1-256.bz2")
blocks, err := loadBlocks(t, testdatafile)
if err != nil {
t.Errorf("Unable to load blocks from test data: %v", err)
return nil
}
tstBlocks = blocks
return blocks
}
func TestUnspentInsert(t *testing.T) {
testUnspentInsert(t, dbTmDefault)
testUnspentInsert(t, dbTmNormal)
testUnspentInsert(t, dbTmFast)
}
// insert every block in the test chain
// after each insert, fetch all the tx affected by the latest
// block and verify that the the tx is spent/unspent
// new tx should be fully unspent, referenced tx should have
// the associated txout set to spent.
func testUnspentInsert(t *testing.T, mode int) {
// Ignore db remove errors since it means we didn't have an old one.
dbname := "tstdbuspnt1"
_ = os.Remove(dbname)
db, err := btcdb.CreateDB("sqlite", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer os.Remove(dbname)
defer db.Close()
switch mode {
case dbTmDefault: // default
// no setup
case dbTmNormal: // explicit normal
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertFast)
if sqldb, ok := db.(*sqlite3.LevelDb); ok {
sqldb.TempTblMax = 100
} else {
t.Errorf("not right type")
}
case dbTmNoVerify: // validated block
t.Errorf("UnspentInsert test is not valid in NoVerify mode")
}
// Since we are dealing with small dataset, reduce cache size
sqlite3.SetBlockCacheSize(db, 2)
sqlite3.SetTxCacheSize(db, 3)
blocks := loadblocks(t)
endtest:
for height := int64(0); height < int64(len(blocks)); height++ {
block := blocks[height]
// look up inputs to this x
mblock := block.MsgBlock()
var txneededList []*btcwire.ShaHash
var txlookupList []*btcwire.ShaHash
var txOutList []*btcwire.ShaHash
var txInList []*btcwire.OutPoint
for _, tx := range mblock.Transactions {
for _, txin := range tx.TxIn {
if txin.PreviousOutpoint.Index == uint32(4294967295) {
continue
}
origintxsha := &txin.PreviousOutpoint.Hash
txInList = append(txInList, &txin.PreviousOutpoint)
txneededList = append(txneededList, origintxsha)
txlookupList = append(txlookupList, origintxsha)
if !db.ExistsTxSha(origintxsha) {
t.Errorf("referenced tx not found %v ", origintxsha)
}
}
txshaname, _ := tx.TxSha(block.ProtocolVersion())
txlookupList = append(txlookupList, &txshaname)
txOutList = append(txOutList, &txshaname)
}
txneededmap := map[btcwire.ShaHash]*btcdb.TxListReply{}
txlist := db.FetchTxByShaList(txneededList)
for _, txe := range txlist {
if txe.Err != nil {
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
break endtest
}
txneededmap[*txe.Sha] = txe
}
for _, spend := range txInList {
itxe := txneededmap[spend.Hash]
if itxe.TxSpent[spend.Index] == true {
t.Errorf("txin %v:%v is already spent", spend.Hash, spend.Index)
}
}
newheight, err := db.InsertBlock(block)
if err != nil {
t.Errorf("failed to insert block %v err %v", height, err)
break endtest
}
if newheight != height {
t.Errorf("height mismatch expect %v returned %v", height, newheight)
break endtest
}
txlookupmap := map[btcwire.ShaHash]*btcdb.TxListReply{}
txlist = db.FetchTxByShaList(txlookupList)
for _, txe := range txlist {
if txe.Err != nil {
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
break endtest
}
txlookupmap[*txe.Sha] = txe
}
for _, spend := range txInList {
itxe := txlookupmap[spend.Hash]
if itxe.TxSpent[spend.Index] == false {
t.Errorf("txin %v:%v is unspent %v", spend.Hash, spend.Index, itxe.TxSpent)
}
}
for _, txo := range txOutList {
itxe := txlookupmap[*txo]
for i, spent := range itxe.TxSpent {
if spent == true {
t.Errorf("freshly inserted tx %v already spent %v", txo, i)
}
}
}
if len(txInList) == 0 {
continue
}
dropblock := blocks[height-1]
dropsha, _ := dropblock.Sha()
err = db.DropAfterBlockBySha(dropsha)
if err != nil {
t.Errorf("failed to drop block %v err %v", height, err)
break endtest
}
txlookupmap = map[btcwire.ShaHash]*btcdb.TxListReply{}
txlist = db.FetchTxByShaList(txlookupList)
for _, txe := range txlist {
if txe.Err != nil {
if _, ok := txneededmap[*txe.Sha]; ok {
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
break endtest
}
}
txlookupmap[*txe.Sha] = txe
}
for _, spend := range txInList {
itxe := txlookupmap[spend.Hash]
if itxe.TxSpent[spend.Index] == true {
t.Errorf("txin %v:%v is unspent %v", spend.Hash, spend.Index, itxe.TxSpent)
}
}
newheight, err = db.InsertBlock(block)
if err != nil {
t.Errorf("failed to insert block %v err %v", height, err)
break endtest
}
txlookupmap = map[btcwire.ShaHash]*btcdb.TxListReply{}
txlist = db.FetchTxByShaList(txlookupList)
for _, txe := range txlist {
if txe.Err != nil {
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
break endtest
}
txlookupmap[*txe.Sha] = txe
}
for _, spend := range txInList {
itxe := txlookupmap[spend.Hash]
if itxe.TxSpent[spend.Index] == false {
t.Errorf("txin %v:%v is unspent %v", spend.Hash, spend.Index, itxe.TxSpent)
}
}
}
}

48
leveldb/internal_test.go Normal file
View file

@ -0,0 +1,48 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package ldb
import (
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btcwire"
)
// FetchSha returns the datablock and pver for the given ShaHash.
// This is a testing only interface.
func FetchSha(db btcdb.Db, sha *btcwire.ShaHash) (buf []byte, pver uint32,
blkid int64, err error) {
sqldb, ok := db.(*LevelDb)
if !ok {
err = fmt.Errorf("Invalid data type")
return
}
buf, pver, blkid, err = sqldb.fetchSha(*sha)
return
}
// SetBlockCacheSize configures the maximum number of blocks in the cache to
// be the given size should be made before any fetching.
// This is a testing only interface.
func SetBlockCacheSize(db btcdb.Db, newsize int) {
sqldb, ok := db.(*LevelDb)
if !ok {
return
}
bc := &sqldb.blockCache
bc.maxcount = newsize
}
// SetTxCacheSize configures the maximum number of tx in the cache to
// be the given size should be made before any fetching.
// This is a testing only interface.
func SetTxCacheSize(db btcdb.Db, newsize int) {
sqldb, ok := db.(*LevelDb)
if !ok {
return
}
tc := &sqldb.txCache
tc.maxcount = newsize
}

798
leveldb/leveldb.go Normal file
View file

@ -0,0 +1,798 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package ldb
import (
"database/sql"
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"github.com/conformal/seelog"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
_ "github.com/mattn/go-sqlite3"
"os"
"sync"
)
const (
dbVersion int = 2
dbMaxTransCnt = 20000
dbMaxTransMem = 64 * 1024 * 1024 // 64 MB
)
var log seelog.LoggerInterface = seelog.Disabled
type tBlockInsertData struct {
sha btcwire.ShaHash
pver uint32
buf []byte
}
type tTxInsertData struct {
txsha *btcwire.ShaHash
blockid int64
txoff int
txlen int
usedbuf []byte
}
type txState struct {
tx *sql.Tx
writeCount int
txDataSz int
txInsertList []interface{}
}
type LevelDb struct {
// to be removed
sqldb *sql.DB
blkStmts []*sql.Stmt
blkBaseStmts []*sql.Stmt
txStmts []*sql.Stmt
txBaseStmts []*sql.Stmt
txState txState
// lock preventing multiple entry
dbLock sync.Mutex
// leveldb pieces
bShaDb *leveldb.DB
bBlkDb []*leveldb.DB
tShaDb *leveldb.DB
tLocDb []*leveldb.DB
tSpentDb []*leveldb.DB
blkOpen int
txOpen int
txSpentOpen int
ro *opt.ReadOptions
wo *opt.WriteOptions
lastBlkShaCached bool
lastBlkSha btcwire.ShaHash
lastBlkIdx int64
txCache txCache
blockCache blockCache
UseTempTX bool
TempTblSz int
TempTblMax int
dbInsertMode btcdb.InsertMode
}
var self = btcdb.DriverDB{DbType: "sqlite", Create: CreateSqliteDB, Open: OpenSqliteDB}
func init() {
btcdb.AddDBDriver(self)
}
// createDB configure the database, setting up all tables to initial state.
func createDB(db *sql.DB) error {
log.Infof("Initializing new block database")
// XXX check for old tables
buildTables := []string{
"CREATE TABLE dbversion (version integer);",
"CREATE TABLE block ( blockid INTEGER PRIMARY KEY, key BLOB UNIQUE, " +
"pver INTEGER NOT NULL, data BLOB NOT NULL);",
"INSERT INTO dbversion (version) VALUES (" + fmt.Sprintf("%d", dbVersion) +
");",
}
buildtxTables := []string{
"CREATE TABLE tx (txidx INTEGER PRIMARY KEY, " +
"key TEXT, " +
"blockid INTEGER NOT NULL, " +
"txoff INTEGER NOT NULL, txlen INTEGER NOT NULL, " +
"data BLOB NOT NULL, " +
"FOREIGN KEY(blockid) REFERENCES block(blockid));",
"CREATE TABLE txtmp (key TEXT PRIMARY KEY, " +
"blockid INTEGER NOT NULL, " +
"txoff INTEGER NOT NULL, txlen INTEGER NOT NULL, " +
"data BLOB NOT NULL, " +
"FOREIGN KEY(blockid) REFERENCES block(blockid));",
"CREATE UNIQUE INDEX uniquetx ON tx (key);",
}
for _, sql := range buildTables {
_, err := db.Exec(sql)
if err != nil {
log.Warnf("sql table op failed %v [%v]", err, sql)
return err
}
}
for _, sql := range buildtxTables {
_, err := db.Exec(sql)
if err != nil {
log.Warnf("sql table op failed %v [%v]", err, sql)
return err
}
}
return nil
}
// OpenSqliteDB opens an existing database for use.
func OpenSqliteDB(filepath string) (pbdb btcdb.Db, err error) {
log = btcdb.GetLog()
return newOrCreateSqliteDB(filepath, false)
}
// CreateSqliteDB creates, initializes and opens a database for use.
func CreateSqliteDB(filepath string) (pbdb btcdb.Db, err error) {
log = btcdb.GetLog()
return newOrCreateSqliteDB(filepath, true)
}
// newOrCreateSqliteDB opens a database, either creating it or opens
// existing database based on flag.
func newOrCreateSqliteDB(filepath string, create bool) (pbdb btcdb.Db, err error) {
var bdb LevelDb
if create == false {
_, err = os.Stat(filepath)
if err != nil {
return nil, btcdb.DbDoesNotExist
}
}
db, err := sql.Open("sqlite3", filepath)
if err != nil {
log.Warnf("db open failed %v\n", err)
return nil, err
}
dbverstmt, err := db.Prepare("SELECT version FROM dbversion;")
if err != nil {
// about the only reason this would fail is that the database
// is not initialized
if create == false {
return nil, btcdb.DbDoesNotExist
}
err = createDB(db)
if err != nil {
// already warned in the called function
return nil, err
}
dbverstmt, err = db.Prepare("SELECT version FROM dbversion;")
if err != nil {
// if it failed this a second time, fail.
return nil, err
}
}
row := dbverstmt.QueryRow()
var version int
err = row.Scan(&version)
if err != nil {
log.Warnf("unable to find db version: no row\n", err)
}
switch version {
case dbVersion:
// all good
default:
log.Warnf("mismatch db version: %v expected %v\n", version, dbVersion)
return nil, fmt.Errorf("Invalid version in database")
}
db.Exec("PRAGMA foreign_keys = ON;")
db.Exec("PRAGMA journal_mode=WAL;")
bdb.sqldb = db
bdb.blkStmts = make([]*sql.Stmt, len(blkqueries))
bdb.blkBaseStmts = make([]*sql.Stmt, len(blkqueries))
for i := range blkqueries {
stmt, err := db.Prepare(blkqueries[i])
if err != nil {
// XXX log/
return nil, err
}
bdb.blkBaseStmts[i] = stmt
}
for i := range bdb.blkBaseStmts {
bdb.blkStmts[i] = bdb.blkBaseStmts[i]
}
bdb.txBaseStmts = make([]*sql.Stmt, len(txqueries))
for i := range txqueries {
stmt, err := db.Prepare(txqueries[i])
if err != nil {
// XXX log/
return nil, err
}
bdb.txBaseStmts[i] = stmt
}
// NOTE: all array entries in txStmts remain nil'ed
// tx statements are lazy bound
bdb.txStmts = make([]*sql.Stmt, len(txqueries))
bdb.blockCache.maxcount = 150
bdb.blockCache.blockMap = map[btcwire.ShaHash]*blockCacheObj{}
bdb.blockCache.blockMap = map[btcwire.ShaHash]*blockCacheObj{}
bdb.blockCache.blockHeightMap = map[int64]*blockCacheObj{}
bdb.txCache.maxcount = 2000
bdb.txCache.txMap = map[btcwire.ShaHash]*txCacheObj{}
bdb.UseTempTX = true
bdb.TempTblMax = 1000000
return &bdb, nil
}
// Sync verifies that the database is coherent on disk,
// and no outstanding transactions are in flight.
func (db *LevelDb) Sync() {
db.dbLock.Lock()
defer db.dbLock.Unlock()
db.endTx(true)
}
// syncPoint notifies the db that this is a safe time to sync the database,
// if there are many outstanding transactions.
// Must be called with db lock held.
func (db *LevelDb) syncPoint() {
tx := &db.txState
if db.TempTblSz > db.TempTblMax {
err := db.migrateTmpTable()
if err != nil {
return
}
} else {
if len(tx.txInsertList) > dbMaxTransCnt || tx.txDataSz > dbMaxTransMem {
db.endTx(true)
}
}
}
// Close cleanly shuts down database, syncing all data.
func (db *LevelDb) Close() {
db.dbLock.Lock()
defer db.dbLock.Unlock()
db.close()
}
// RollbackClose discards the recent database changes to the previously
// saved data at last Sync.
func (db *LevelDb) RollbackClose() {
db.dbLock.Lock()
defer db.dbLock.Unlock()
tx := &db.txState
if tx.tx != nil {
err := tx.tx.Rollback()
if err != nil {
log.Debugf("Rollback failed: %v", err)
} else {
tx.tx = nil
}
}
db.close()
}
// close performs the internal shutdown/close operation.
func (db *LevelDb) close() {
db.endTx(true)
db.InvalidateCache()
for i := range db.blkBaseStmts {
db.blkBaseStmts[i].Close()
}
for i := range db.txBaseStmts {
if db.txBaseStmts[i] != nil {
db.txBaseStmts[i].Close()
db.txBaseStmts[i] = nil
}
}
db.sqldb.Close()
}
// txop returns the appropriately prepared statement, based on
// transaction state of the database.
func (db *LevelDb) txop(op int) *sql.Stmt {
if db.txStmts[op] != nil {
return db.txStmts[op]
}
if db.txState.tx == nil {
// we are not in a transaction, return the base statement
return db.txBaseStmts[op]
}
if db.txStmts[op] == nil {
db.txStmts[op] = db.txState.tx.Stmt(db.txBaseStmts[op])
}
return db.txStmts[op]
}
// startTx starts a transaction, preparing or scrubbing statements
// for proper operation inside a transaction.
func (db *LevelDb) startTx() (err error) {
tx := &db.txState
if tx.tx != nil {
// this shouldn't happen...
log.Warnf("Db startTx called while in a transaction")
return
}
tx.tx, err = db.sqldb.Begin()
if err != nil {
log.Warnf("Db startTx: begin failed %v", err)
tx.tx = nil
return
}
for i := range db.blkBaseStmts {
db.blkStmts[i] = tx.tx.Stmt(db.blkBaseStmts[i])
}
for i := range db.txBaseStmts {
db.txStmts[i] = nil // these are lazily prepared
}
return
}
// endTx commits the current active transaction, it zaps all of the prepared
// statements associated with the transaction.
func (db *LevelDb) endTx(recover bool) (err error) {
tx := &db.txState
if tx.tx == nil {
return
}
err = tx.tx.Commit()
if err != nil && recover {
// XXX - double check that the tx is dead after
// commit failure (rollback?)
log.Warnf("Db endTx: commit failed %v", err)
err = db.rePlayTransaction()
if err != nil {
// We tried, return failure (after zeroing state)
// so the upper level can notice and restart
}
}
for i := range db.blkBaseStmts {
db.blkStmts[i].Close()
db.blkStmts[i] = db.blkBaseStmts[i]
}
for i := range db.txStmts {
if db.txStmts[i] != nil {
db.txStmts[i].Close()
db.txStmts[i] = nil
}
}
tx.tx = nil
var emptyTxList []interface{}
tx.txInsertList = emptyTxList
tx.txDataSz = 0
return
}
// rePlayTransaction will attempt to re-execute inserts performed
// sync the beginning of a transaction. This is to be used after
// a sql Commit operation fails to keep the database from losing data.
func (db *LevelDb) rePlayTransaction() (err error) {
err = db.startTx()
if err != nil {
return
}
tx := &db.txState
for _, ins := range tx.txInsertList {
switch v := ins.(type) {
case tBlockInsertData:
block := v
_, err = db.blkStmts[blkInsertSha].Exec(block.sha.Bytes(),
block.pver, block.buf)
if err != nil {
break
}
case tTxInsertData:
txd := v
txnamebytes := txd.txsha.Bytes()
txop := db.txop(txInsertStmt)
_, err = txop.Exec(txd.blockid, txnamebytes, txd.txoff,
txd.txlen, txd.usedbuf)
if err != nil {
break
}
}
}
// This function is called even if we have failed.
// We need to clean up so the database can be used again.
// However we want the original error not any new error,
// unless there was no original error but the commit fails.
err2 := db.endTx(false)
if err == nil && err2 != nil {
err = err2
}
return
}
// DropAfterBlockBySha will remove any blocks from the database after the given block.
// It terminates any existing transaction and performs its operations in an
// atomic transaction, it is terminated (committed) before exit.
func (db *LevelDb) DropAfterBlockBySha(sha *btcwire.ShaHash) (err error) {
var row *sql.Row
db.dbLock.Lock()
defer db.dbLock.Unlock()
// This is a destructive operation and involves multiple requests
// so requires a transaction, terminate any transaction to date
// and start a new transaction
err = db.endTx(true)
if err != nil {
return err
}
err = db.startTx()
if err != nil {
return err
}
var startheight int64
if db.lastBlkShaCached {
startheight = db.lastBlkIdx
} else {
querystr := "SELECT blockid FROM block ORDER BY blockid DESC;"
tx := &db.txState
if tx.tx != nil {
row = tx.tx.QueryRow(querystr)
} else {
row = db.sqldb.QueryRow(querystr)
}
var startblkidx int64
err = row.Scan(&startblkidx)
if err != nil {
log.Warnf("DropAfterBlockBySha:unable to fetch blockheight %v", err)
return err
}
startheight = startblkidx
}
// also drop any cached sha data
db.lastBlkShaCached = false
querystr := "SELECT blockid FROM block WHERE key = ?;"
tx := &db.txState
row = tx.tx.QueryRow(querystr, sha.Bytes())
var keepidx int64
err = row.Scan(&keepidx)
if err != nil {
// XXX
db.endTx(false)
return err
}
for height := startheight; height > keepidx; height = height - 1 {
var blk *btcutil.Block
blkc, ok := db.fetchBlockHeightCache(height)
if ok {
blk = blkc.blk
} else {
// must load the block from the db
sha, err = db.fetchBlockShaByHeight(height - 1)
if err != nil {
return
}
var buf []byte
var pver uint32
buf, pver, _, err = db.fetchSha(*sha)
if err != nil {
return
}
blk, err = btcutil.NewBlockFromBytes(buf, pver)
if err != nil {
return
}
}
for _, tx := range blk.MsgBlock().Transactions {
err = db.unSpend(tx)
if err != nil {
return
}
}
}
// invalidate the cache after possibly using cached entries for block
// lookup to unspend coins in them
db.InvalidateCache()
_, err = tx.tx.Exec("DELETE FROM txtmp WHERE blockid > ?", keepidx)
if err != nil {
// XXX
db.endTx(false)
return err
}
_, err = tx.tx.Exec("DELETE FROM tx WHERE blockid > ?", keepidx)
if err != nil {
// XXX
db.endTx(false)
return err
}
// delete from block last in case of foreign keys
_, err = tx.tx.Exec("DELETE FROM block WHERE blockid > ?", keepidx)
if err != nil {
// XXX
db.endTx(false)
return err
}
err = db.endTx(true)
if err != nil {
return err
}
return
}
// InsertBlock inserts raw block and transaction data from a block into the
// database. The first block inserted into the database will be treated as the
// genesis block. Every subsequent block insert requires the referenced parent
// block to already exist.
func (db *LevelDb) InsertBlock(block *btcutil.Block) (height int64, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
blocksha, err := block.Sha()
if err != nil {
log.Warnf("Failed to compute block sha %v", blocksha)
return
}
mblock := block.MsgBlock()
rawMsg, pver, err := block.Bytes()
if err != nil {
log.Warnf("Failed to obtain raw block sha %v", blocksha)
return
}
txloc, err := block.TxLoc()
if err != nil {
log.Warnf("Failed to obtain raw block sha %v", blocksha)
return
}
// Insert block into database
newheight, err := db.insertBlockData(blocksha, &mblock.Header.PrevBlock,
pver, rawMsg)
if err != nil {
log.Warnf("Failed to insert block %v %v %v", blocksha,
&mblock.Header.PrevBlock, err)
return
}
// At least two blocks in the long past were generated by faulty
// miners, the sha of the transaction exists in a previous block,
// detect this condition and 'accept' the block.
for txidx, tx := range mblock.Transactions {
var txsha btcwire.ShaHash
txsha, err = tx.TxSha(pver)
if err != nil {
log.Warnf("failed to compute tx name block %v idx %v err %v", blocksha, txidx, err)
return
}
// Some old blocks contain duplicate transactions
// Attempt to cleanly bypass this problem
// http://blockexplorer.com/b/91842
// http://blockexplorer.com/b/91880
if newheight == 91842 {
dupsha, err := btcwire.NewShaHashFromStr("d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599")
if err != nil {
panic("invalid sha string in source")
}
if txsha == *dupsha {
log.Tracef("skipping sha %v %v", dupsha, newheight)
continue
}
}
if newheight == 91880 {
dupsha, err := btcwire.NewShaHashFromStr("e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468")
if err != nil {
panic("invalid sha string in source")
}
if txsha == *dupsha {
log.Tracef("skipping sha %v %v", dupsha, newheight)
continue
}
}
spentbuflen := (len(tx.TxOut) + 7) / 8
spentbuf := make([]byte, spentbuflen, spentbuflen)
if len(tx.TxOut)%8 != 0 {
for i := uint(len(tx.TxOut) % 8); i < 8; i++ {
spentbuf[spentbuflen-1] |= (byte(1) << i)
}
}
err = db.insertTx(&txsha, newheight, txloc[txidx].TxStart, txloc[txidx].TxLen, spentbuf)
if err != nil {
log.Warnf("block %v idx %v failed to insert tx %v %v err %v", blocksha, newheight, &txsha, txidx, err)
var oBlkIdx int64
oBlkIdx, _, _, err = db.fetchLocationBySha(&txsha)
log.Warnf("oblkidx %v err %v", oBlkIdx, err)
return
}
err = db.doSpend(tx)
if err != nil {
log.Warnf("block %v idx %v failed to spend tx %v %v err %v", blocksha, newheight, &txsha, txidx, err)
return
}
}
db.syncPoint()
return newheight, nil
}
// SetDBInsertMode provides hints to the database to how the application
// is running this allows the database to work in optimized modes when the
// database may be very busy.
func (db *LevelDb) SetDBInsertMode(newmode btcdb.InsertMode) {
oldMode := db.dbInsertMode
switch newmode {
case btcdb.InsertNormal:
// Normal mode inserts tx directly into the tx table
db.UseTempTX = false
db.dbInsertMode = newmode
switch oldMode {
case btcdb.InsertFast:
if db.TempTblSz != 0 {
err := db.migrateTmpTable()
if err != nil {
return
}
}
case btcdb.InsertValidatedInput:
// generate tx indexes
txop := db.txop(txMigrateFinish)
_, err := txop.Exec()
if err != nil {
log.Warnf("Failed to create tx table index - %v", err)
}
}
case btcdb.InsertFast:
// Fast mode inserts tx into txtmp with validation,
// then dumps to tx then rebuilds indexes at thresholds
db.UseTempTX = true
if oldMode != btcdb.InsertNormal {
log.Warnf("switching between invalid DB modes")
break
}
db.dbInsertMode = newmode
case btcdb.InsertValidatedInput:
// ValidatedInput mode inserts into tx table with
// no duplicate checks, then builds index on exit from
// ValidatedInput mode
if oldMode != btcdb.InsertNormal {
log.Warnf("switching between invalid DB modes")
break
}
// remove tx table index
txop := db.txop(txMigratePrep)
_, err := txop.Exec()
if err != nil {
log.Warnf("Failed to clear tx table index - %v", err)
}
db.dbInsertMode = newmode
// XXX
db.UseTempTX = false
}
}
func (db *LevelDb) doSpend(tx *btcwire.MsgTx) error {
for txinidx := range tx.TxIn {
txin := tx.TxIn[txinidx]
inTxSha := txin.PreviousOutpoint.Hash
inTxidx := txin.PreviousOutpoint.Index
if inTxidx == ^uint32(0) {
continue
}
//log.Infof("spending %v %v", &inTxSha, inTxidx)
err := db.setSpentData(&inTxSha, inTxidx)
if err != nil {
return err
}
}
return nil
}
func (db *LevelDb) unSpend(tx *btcwire.MsgTx) error {
for txinidx := range tx.TxIn {
txin := tx.TxIn[txinidx]
inTxSha := txin.PreviousOutpoint.Hash
inTxidx := txin.PreviousOutpoint.Index
if inTxidx == ^uint32(0) {
continue
}
err := db.clearSpentData(&inTxSha, inTxidx)
if err != nil {
return err
}
}
return nil
}
func (db *LevelDb) setSpentData(sha *btcwire.ShaHash, idx uint32) error {
return db.setclearSpentData(sha, idx, true)
}
func (db *LevelDb) clearSpentData(sha *btcwire.ShaHash, idx uint32) error {
return db.setclearSpentData(sha, idx, false)
}
func (db *LevelDb) setclearSpentData(txsha *btcwire.ShaHash, idx uint32, set bool) error {
var spentdata []byte
usingtmp := false
txop := db.txop(txFetchUsedByShaStmt)
row := txop.QueryRow(txsha.String())
err := row.Scan(&spentdata)
if err != nil {
// if the error is simply didn't fine continue otherwise
// retun failure
usingtmp = true
txop = db.txop(txtmpFetchUsedByShaStmt)
row := txop.QueryRow(txsha.String())
err := row.Scan(&spentdata)
if err != nil {
log.Warnf("Failed to locate spent data - %v %v", txsha, err)
return err
}
}
byteidx := idx / 8
byteoff := idx % 8
if set {
spentdata[byteidx] |= (byte(1) << byteoff)
} else {
spentdata[byteidx] &= ^(byte(1) << byteoff)
}
txc, cached := db.fetchTxCache(txsha)
if cached {
txc.spent = spentdata
}
if usingtmp {
txop = db.txop(txtmpUpdateUsedByShaStmt)
} else {
txop = db.txop(txUpdateUsedByShaStmt)
}
_, err = txop.Exec(spentdata, txsha.String())
return err
}

420
leveldb/operational_test.go Normal file
View file

@ -0,0 +1,420 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package ldb_test
import (
"compress/bzip2"
"encoding/binary"
"github.com/conformal/btcdb"
"github.com/conformal/btcdb/sqlite3"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"io"
"os"
"path/filepath"
"strings"
"testing"
)
var network = btcwire.MainNet
const (
dbTmDefault = iota
dbTmNormal
dbTmFast
dbTmNoVerify
)
func TestOperational(t *testing.T) {
testOperationalMode(t, dbTmDefault)
testOperationalMode(t, dbTmNormal)
testOperationalMode(t, dbTmFast)
testOperationalMode(t, dbTmNoVerify)
}
func testOperationalMode(t *testing.T, mode int) {
// simplified basic operation is:
// 1) fetch block from remote server
// 2) look up all txin (except coinbase in db)
// 3) insert block
// Ignore db remove errors since it means we didn't have an old one.
dbname := "tstdbop1"
_ = os.Remove(dbname)
db, err := btcdb.CreateDB("sqlite", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer os.Remove(dbname)
defer db.Close()
switch mode {
case dbTmDefault: // default
// no setup
case dbTmNormal: // explicit normal
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertFast)
if sqldb, ok := db.(*sqlite3.LevelDb); ok {
sqldb.TempTblMax = 100
} else {
t.Errorf("not right type")
}
case dbTmNoVerify: // validated block
db.SetDBInsertMode(btcdb.InsertValidatedInput)
}
// Since we are dealing with small dataset, reduce cache size
sqlite3.SetBlockCacheSize(db, 2)
sqlite3.SetTxCacheSize(db, 3)
testdatafile := filepath.Join("testdata", "blocks1-256.bz2")
blocks, err := loadBlocks(t, testdatafile)
if err != nil {
t.Errorf("Unable to load blocks from test data for mode %v: %v",
mode, err)
return
}
err = nil
out:
for height := int64(0); height < int64(len(blocks)); height++ {
block := blocks[height]
if mode != dbTmNoVerify {
// except for NoVerify which does not allow lookups check inputs
mblock := block.MsgBlock()
var txneededList []*btcwire.ShaHash
for _, tx := range mblock.Transactions {
for _, txin := range tx.TxIn {
if txin.PreviousOutpoint.Index == uint32(4294967295) {
continue
}
origintxsha := &txin.PreviousOutpoint.Hash
txneededList = append(txneededList, origintxsha)
if !db.ExistsTxSha(origintxsha) {
t.Errorf("referenced tx not found %v ", origintxsha)
}
_, _, _, _, err := db.FetchTxAllBySha(origintxsha)
if err != nil {
t.Errorf("referenced tx not found %v err %v ", origintxsha, err)
}
_, _, _, _, err = db.FetchTxAllBySha(origintxsha)
if err != nil {
t.Errorf("referenced tx not found %v err %v ", origintxsha, err)
}
_, _, _, err = db.FetchTxBySha(origintxsha)
if err != nil {
t.Errorf("referenced tx not found %v err %v ", origintxsha, err)
}
_, _, err = db.FetchTxBufBySha(origintxsha)
if err != nil {
t.Errorf("referenced tx not found %v err %v ", origintxsha, err)
}
_, err = db.FetchTxUsedBySha(origintxsha)
if err != nil {
t.Errorf("tx used fetch fail %v err %v ", origintxsha, err)
}
}
}
txlist := db.FetchTxByShaList(txneededList)
for _, txe := range txlist {
if txe.Err != nil {
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
break out
}
}
}
newheight, err := db.InsertBlock(block)
if err != nil {
t.Errorf("failed to insert block %v err %v", height, err)
break out
}
if newheight != height {
t.Errorf("height mismatch expect %v returned %v", height, newheight)
break out
}
newSha, blkid, err := db.NewestSha()
if err != nil {
t.Errorf("failed to obtain latest sha %v %v", height, err)
}
if blkid != height {
t.Errorf("height doe not match latest block height %v %v", blkid, height, err)
}
blkSha, _ := block.Sha()
if *newSha != *blkSha {
t.Errorf("Newest block sha does not match freshly inserted one %v %v ", newSha, blkSha, err)
}
}
// now that db is populated, do some additional test
testFetchRangeHeight(t, db, blocks)
switch mode {
case dbTmDefault: // default
// no cleanup
case dbTmNormal: // explicit normal
// no cleanup
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmNoVerify: // validated block
db.SetDBInsertMode(btcdb.InsertNormal)
}
}
func TestBackout(t *testing.T) {
testBackout(t, dbTmDefault)
testBackout(t, dbTmNormal)
testBackout(t, dbTmFast)
}
func testBackout(t *testing.T, mode int) {
// simplified basic operation is:
// 1) fetch block from remote server
// 2) look up all txin (except coinbase in db)
// 3) insert block
// Ignore db remove errors since it means we didn't have an old one.
dbname := "tstdbop2"
_ = os.Remove(dbname)
db, err := btcdb.CreateDB("sqlite", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer os.Remove(dbname)
defer db.Close()
switch mode {
case dbTmDefault: // default
// no setup
case dbTmNormal: // explicit normal
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertFast)
if sqldb, ok := db.(*sqlite3.LevelDb); ok {
sqldb.TempTblMax = 100
} else {
t.Errorf("not right type")
}
}
// Since we are dealing with small dataset, reduce cache size
sqlite3.SetBlockCacheSize(db, 2)
sqlite3.SetTxCacheSize(db, 3)
testdatafile := filepath.Join("testdata", "blocks1-256.bz2")
blocks, err := loadBlocks(t, testdatafile)
if len(blocks) < 120 {
t.Errorf("test data too small")
return
}
err = nil
for height := int64(0); height < int64(len(blocks)); height++ {
if height == 100 {
t.Logf("Syncing at block height 100")
db.Sync()
}
if height == 120 {
t.Logf("Simulating unexpected application quit")
// Simulate unexpected application quit
db.RollbackClose()
break
}
block := blocks[height]
newheight, err := db.InsertBlock(block)
if err != nil {
t.Errorf("failed to insert block %v err %v", height, err)
break
}
if newheight != height {
t.Errorf("height mismatch expect %v returned %v", height, newheight)
break
}
}
// db was closed at height 120, so no cleanup is possible.
// reopen db
db, err = btcdb.OpenDB("sqlite", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer db.Close()
sha, err := blocks[99].Sha()
if err != nil {
t.Errorf("failed to get block 99 sha err %v", err)
return
}
_ = db.ExistsSha(sha)
_, err = db.FetchBlockBySha(sha)
if err != nil {
t.Errorf("failed to load block 99 from db %v", err)
}
sha, err = blocks[110].Sha()
if err != nil {
t.Errorf("failed to get block 110 sha err %v", err)
return
}
_ = db.ExistsSha(sha)
_, err = db.FetchBlockBySha(sha)
if err == nil {
t.Errorf("loaded block 110 from db, failure expected")
return
}
block := blocks[110]
mblock := block.MsgBlock()
txsha, err := mblock.Transactions[0].TxSha(block.ProtocolVersion())
exists := db.ExistsTxSha(&txsha)
if exists {
t.Errorf("tx %v exists in db, failure expected", txsha)
}
_, _, _, err = db.FetchTxBySha(&txsha)
_, err = db.FetchTxUsedBySha(&txsha)
block = blocks[99]
mblock = block.MsgBlock()
txsha, err = mblock.Transactions[0].TxSha(block.ProtocolVersion())
oldused, err := db.FetchTxUsedBySha(&txsha)
err = db.InsertTx(&txsha, 99, 1024, 1048, oldused)
if err == nil {
t.Errorf("dup insert of tx succeeded")
return
}
}
func loadBlocks(t *testing.T, file string) (blocks []*btcutil.Block, err error) {
testdatafile := filepath.Join("testdata", "blocks1-256.bz2")
var dr io.Reader
var fi io.ReadCloser
fi, err = os.Open(testdatafile)
if err != nil {
t.Errorf("failed to open file %v, err %v", testdatafile, err)
return
}
if strings.HasSuffix(testdatafile, ".bz2") {
z := bzip2.NewReader(fi)
dr = z
} else {
dr = fi
}
defer func() {
if err := fi.Close(); err != nil {
t.Errorf("failed to close file %v %v", testdatafile, err)
}
}()
// Set the first block as the genesis block.
genesis := btcutil.NewBlock(&btcwire.GenesisBlock, btcwire.ProtocolVersion)
blocks = append(blocks, genesis)
var block *btcutil.Block
err = nil
for height := int64(1); err == nil; height++ {
var rintbuf uint32
err = binary.Read(dr, binary.LittleEndian, &rintbuf)
if err == io.EOF {
// hit end of file at expected offset: no warning
height--
err = nil
break
}
if err != nil {
t.Errorf("failed to load network type, err %v", err)
break
}
if rintbuf != uint32(network) {
t.Errorf("Block doesn't match network: %v expects %v",
rintbuf, network)
break
}
err = binary.Read(dr, binary.LittleEndian, &rintbuf)
blocklen := rintbuf
rbytes := make([]byte, blocklen)
// read block
dr.Read(rbytes)
var pver uint32
switch {
case height < 200000:
pver = 1
case height >= 200000:
pver = 2
}
block, err = btcutil.NewBlockFromBytes(rbytes, pver)
if err != nil {
t.Errorf("failed to parse block %v", height)
return
}
blocks = append(blocks, block)
}
return
}
func testFetchRangeHeight(t *testing.T, db btcdb.Db, blocks []*btcutil.Block) {
var testincrement int64 = 50
var testcnt int64 = 100
shanames := make([]*btcwire.ShaHash, len(blocks))
nBlocks := int64(len(blocks))
for i := range blocks {
blockSha, err := blocks[i].Sha()
if err != nil {
t.Errorf("FetchRangeHeight: unexpected failure computing block sah %v", err)
}
shanames[i] = blockSha
}
for startheight := int64(0); startheight < nBlocks; startheight += testincrement {
endheight := startheight + testcnt
if endheight > nBlocks {
endheight = btcdb.AllShas
}
shalist, err := db.FetchHeightRange(startheight, endheight)
if err != nil {
t.Errorf("FetchRangeHeight: unexpected failure looking up shas %v", err)
}
if endheight == btcdb.AllShas {
if int64(len(shalist)) != nBlocks-startheight {
t.Errorf("FetchRangeHeight: expected A %v shas, got %v", nBlocks-startheight, len(shalist))
}
} else {
if int64(len(shalist)) != testcnt {
t.Errorf("FetchRangeHeight: expected %v shas, got %v", testcnt, len(shalist))
}
}
for i := range shalist {
if *shanames[int64(i)+startheight] != shalist[i] {
t.Errorf("FetchRangeHeight: mismatch sha at %v requested range %v %v ", int64(i)+startheight, startheight, endheight)
}
}
}
}

BIN
leveldb/testdata/blocks1-256.bz2 vendored Normal file

Binary file not shown.

335
leveldb/tx.go Normal file
View file

@ -0,0 +1,335 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package ldb
import (
"database/sql"
"github.com/conformal/btcdb"
"github.com/conformal/btcwire"
_ "github.com/mattn/go-sqlite3"
)
// InsertTx inserts a tx hash and its associated data into the database.
func (db *LevelDb) InsertTx(txsha *btcwire.ShaHash, height int64, txoff int, txlen int, usedbuf []byte) (err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
return db.insertTx(txsha, height, txoff, txlen, usedbuf)
}
// insertTx inserts a tx hash and its associated data into the database.
// Must be called with db lock held.
func (db *LevelDb) insertTx(txsha *btcwire.ShaHash, height int64, txoff int, txlen int, usedbuf []byte) (err error) {
tx := &db.txState
if tx.tx == nil {
err = db.startTx()
if err != nil {
return
}
}
blockid := height + 1
txd := tTxInsertData{txsha: txsha, blockid: blockid, txoff: txoff, txlen: txlen, usedbuf: usedbuf}
log.Tracef("inserting tx %v for block %v off %v len %v",
txsha, blockid, txoff, txlen)
rowBytes := txsha.String()
var op int // which table to insert data into.
if db.UseTempTX {
var tblockid int64
var ttxoff int
var ttxlen int
txop := db.txop(txFetchLocationByShaStmt)
row := txop.QueryRow(rowBytes)
err = row.Scan(&tblockid, &ttxoff, &ttxlen)
if err != sql.ErrNoRows {
// sha already present
err = btcdb.DuplicateSha
return
}
op = txtmpInsertStmt
} else {
op = txInsertStmt
}
txop := db.txop(op)
_, err = txop.Exec(rowBytes, blockid, txoff, txlen, usedbuf)
if err != nil {
log.Warnf("failed to insert %v %v %v", txsha, blockid, err)
return
}
if db.UseTempTX {
db.TempTblSz++
}
// put in insert list for replay
tx.txInsertList = append(tx.txInsertList, txd)
return
}
// ExistsTxSha returns if the given tx sha exists in the database
func (db *LevelDb) ExistsTxSha(txsha *btcwire.ShaHash) (exists bool) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
if _, ok := db.fetchTxCache(txsha); ok {
return true
}
return db.existsTxSha(txsha)
}
// existsTxSha returns if the given tx sha exists in the database.o
// Must be called with the db lock held.
func (db *LevelDb) existsTxSha(txsha *btcwire.ShaHash) (exists bool) {
var blockid uint32
txop := db.txop(txExistsShaStmt)
row := txop.QueryRow(txsha.String())
err := row.Scan(&blockid)
if err == sql.ErrNoRows {
txop = db.txop(txtmpExistsShaStmt)
row = txop.QueryRow(txsha.String())
err := row.Scan(&blockid)
if err == sql.ErrNoRows {
return false
}
if err != nil {
log.Warnf("txTmpExistsTxSha: fail %v", err)
return false
}
log.Warnf("txtmpExistsTxSha: success")
return true
}
if err != nil {
// ignore real errors?
log.Warnf("existsTxSha: fail %v", err)
return false
}
return true
}
// FetchLocationBySha looks up the Tx sha information by name.
func (db *LevelDb) FetchLocationBySha(txsha *btcwire.ShaHash) (blockidx int64, txoff int, txlen int, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
return db.fetchLocationBySha(txsha)
}
// fetchLocationBySha look up the Tx sha information by name.
// Must be called with db lock held.
func (db *LevelDb) fetchLocationBySha(txsha *btcwire.ShaHash) (height int64, txoff int, txlen int, err error) {
var row *sql.Row
var blockid int64
var ttxoff int
var ttxlen int
rowBytes := txsha.String()
txop := db.txop(txFetchLocationByShaStmt)
row = txop.QueryRow(rowBytes)
err = row.Scan(&blockid, &ttxoff, &ttxlen)
if err == sql.ErrNoRows {
txop = db.txop(txtmpFetchLocationByShaStmt)
row = txop.QueryRow(rowBytes)
err = row.Scan(&blockid, &ttxoff, &ttxlen)
if err == sql.ErrNoRows {
err = btcdb.TxShaMissing
return
}
if err != nil {
log.Warnf("txtmp FetchLocationBySha: fail %v",
err)
return
}
}
if err != nil {
log.Warnf("FetchLocationBySha: fail %v", err)
return
}
height = blockid - 1
txoff = ttxoff
txlen = ttxlen
return
}
// fetchLocationUsedBySha look up the Tx sha information by name.
// Must be called with db lock held.
func (db *LevelDb) fetchLocationUsedBySha(txsha *btcwire.ShaHash) (rheight int64, rtxoff int, rtxlen int, rspentbuf []byte, err error) {
var row *sql.Row
var blockid int64
var txoff int
var txlen int
var txspent []byte
rowBytes := txsha.String()
txop := db.txop(txFetchLocUsedByShaStmt)
row = txop.QueryRow(rowBytes)
err = row.Scan(&blockid, &txoff, &txlen, &txspent)
if err == sql.ErrNoRows {
txop = db.txop(txtmpFetchLocUsedByShaStmt)
row = txop.QueryRow(rowBytes)
err = row.Scan(&blockid, &txoff, &txlen, &txspent)
if err == sql.ErrNoRows {
err = btcdb.TxShaMissing
return
}
if err != nil {
log.Warnf("txtmp FetchLocationBySha: fail %v",
err)
return
}
}
if err != nil {
log.Warnf("FetchLocationBySha: fail %v", err)
return
}
height := blockid - 1
return height, txoff, txlen, txspent, nil
}
// FetchTxUsedBySha returns the used/spent buffer for a given transaction.
func (db *LevelDb) FetchTxUsedBySha(txsha *btcwire.ShaHash) (spentbuf []byte, err error) {
var row *sql.Row
db.dbLock.Lock()
defer db.dbLock.Unlock()
rowBytes := txsha.String()
txop := db.txop(txFetchUsedByShaStmt)
row = txop.QueryRow(rowBytes)
var databytes []byte
err = row.Scan(&databytes)
if err == sql.ErrNoRows {
txop := db.txop(txtmpFetchUsedByShaStmt)
row = txop.QueryRow(rowBytes)
err = row.Scan(&databytes)
if err == sql.ErrNoRows {
err = btcdb.TxShaMissing
return
}
if err != nil {
log.Warnf("txtmp FetchLocationBySha: fail %v",
err)
return
}
}
if err != nil {
log.Warnf("FetchUsedBySha: fail %v", err)
return
}
spentbuf = databytes
return
}
var vaccumDbNextMigrate bool
// migrateTmpTable functions to perform internal db optimization when
// performing large numbers of database inserts. When in Fast operation
// mode, it inserts into txtmp, then when that table reaches a certain
// size limit it moves all tx in the txtmp table into the primary tx
// table and recomputes the index on the primary tx table.
func (db *LevelDb) migrateTmpTable() error {
db.endTx(true)
db.startTx() // ???
db.UseTempTX = false
db.TempTblSz = 0
var doVacuum bool
var nsteps int
if vaccumDbNextMigrate {
nsteps = 6
vaccumDbNextMigrate = false
doVacuum = true
} else {
nsteps = 5
vaccumDbNextMigrate = true
}
log.Infof("db compaction Stage 1/%v: Preparing", nsteps)
txop := db.txop(txMigratePrep)
_, err := txop.Exec()
if err != nil {
log.Warnf("Failed to prepare migrate - %v", err)
return err
}
log.Infof("db compaction Stage 2/%v: Copying", nsteps)
txop = db.txop(txMigrateCopy)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate read failed - %v", err)
return err
}
log.Tracef("db compaction Stage 2a/%v: Enable db vacuum", nsteps)
txop = db.txop(txPragmaVacuumOn)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate error trying to enable vacuum on "+
"temporary transaction table - %v", err)
return err
}
log.Infof("db compaction Stage 3/%v: Clearing old data", nsteps)
txop = db.txop(txMigrateClear)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate error trying to clear temporary "+
"transaction table - %v", err)
return err
}
log.Tracef("db compaction Stage 3a/%v: Disable db vacuum", nsteps)
txop = db.txop(txPragmaVacuumOff)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate error trying to disable vacuum on "+
"temporary transaction table - %v", err)
return err
}
log.Infof("db compaction Stage 4/%v: Rebuilding index", nsteps)
txop = db.txop(txMigrateFinish)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate error trying to clear temporary "+
"transaction table - %v", err)
return err
}
log.Infof("db compaction Stage 5/%v: Finalizing transaction", nsteps)
db.endTx(true) // ???
if doVacuum {
log.Infof("db compaction Stage 6/%v: Optimizing database", nsteps)
txop = db.txop(txVacuum)
_, err = txop.Exec()
if err != nil {
log.Warnf("migrate error trying to clear txtmp tbl %v", err)
return err
}
}
log.Infof("db compaction: Complete")
// TODO(drahn) - determine if this should be turned back on or not
db.UseTempTX = true
return nil
}