lbcd/ldb/leveldb.go

564 lines
12 KiB
Go
Raw Normal View History

// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package ldb
import (
"encoding/binary"
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"github.com/conformal/goleveldb/leveldb"
"github.com/conformal/goleveldb/leveldb/cache"
"github.com/conformal/goleveldb/leveldb/opt"
2013-10-13 13:02:45 -05:00
"github.com/conformal/seelog"
"os"
"runtime"
"sync"
)
// Driver-wide constants.
//
// NOTE(review): none of these are referenced in the visible part of this
// file; the descriptions of the two limits are inferred from their names and
// should be confirmed against their users.
const (
	// dbVersion is a version number for this driver.
	dbVersion int = 2

	// dbMaxTransCnt is presumably an upper bound on queued transactions.
	dbMaxTransCnt = 20000

	// dbMaxTransMem is presumably an upper bound, in bytes, on memory used by
	// queued transactions.
	dbMaxTransMem = 64 << 20 // 64 MB
)
// log is the package-level logger. It starts out as seelog.Disabled and is
// replaced with the logger returned by btcdb.GetLog() each time OpenDB or
// CreateDB is called.
var log seelog.LoggerInterface = seelog.Disabled
// tTxInsertData groups the values that describe one stored transaction.
//
// NOTE(review): this type is not referenced in the visible part of this file.
// The field notes below are inferred from the field names and from the
// parallel argument list of insertTx (sha, height, offset, length, spent
// bitmap) and should be confirmed.
type tTxInsertData struct {
	txsha   *btcwire.ShaHash // presumably the transaction hash
	blockid int64            // presumably the height of the containing block
	txoff   int              // presumably the byte offset of the tx inside the raw block
	txlen   int              // presumably the byte length of the serialized tx
	usedbuf []byte           // presumably the spent-output bitmap
}
type LevelDb struct {
// lock preventing multiple entry
dbLock sync.Mutex
// leveldb pieces
lDb *leveldb.DB
2013-09-09 15:50:13 -05:00
ro *opt.ReadOptions
wo *opt.WriteOptions
lbatch *leveldb.Batch
nextBlock int64
lastBlkShaCached bool
lastBlkSha btcwire.ShaHash
lastBlkIdx int64
txUpdateMap map[btcwire.ShaHash]*txUpdateObj
}
// self is the driver descriptor handed to btcdb: it names the backend
// "leveldb" and supplies the functions used to create and open a database.
var self = btcdb.DriverDB{
	DbType: "leveldb",
	Create: CreateDB,
	Open:   OpenDB,
}

// init registers the leveldb driver with btcdb when the package is loaded.
func init() {
	btcdb.AddDBDriver(self)
}
// OpenDB opens an existing database for use.
//
// After opening, the height of the newest stored block is discovered by
// probing getBlkByHeight: first in strides of 100000 starting at height 0
// until a lookup fails, then by bisecting between the last height that was
// found and the first that was not. Any lookup error is treated as "no block
// at this height". The result initialises lastBlkSha, lastBlkIdx and
// nextBlock; for a database without blocks these are the zero hash, -1 and 0.
func OpenDB(dbpath string) (btcdb.Db, error) {
	log = btcdb.GetLog()

	db, err := openDB(dbpath, 0)
	if err != nil {
		return nil, err
	}
	ldb := db.(*LevelDb)

	const stride = int64(100000)
	var (
		found   int64            // highest height known to hold a block
		missing int64            // lowest height known to hold no block
		probe   int64            // height currently being tested
		tipSha  *btcwire.ShaHash // hash of the block at found
	)

	// Coarse forward scan.
	for {
		sha, _, err := ldb.getBlkByHeight(probe)
		if err != nil {
			if probe == 0 {
				// no blocks in db, odd but ok.
				found = -1
				missing = 0
				tipSha = new(btcwire.ShaHash)
			} else {
				missing = probe
			}
			break
		}
		tipSha = sha
		found = probe
		probe += stride
	}

	// Bisect until the found/missing heights are adjacent. Skipped entirely
	// for an empty database.
	for found != -1 && found+1 != missing {
		probe = (found + missing) / 2
		sha, _, err := ldb.getBlkByHeight(probe)
		if err != nil {
			missing = probe
			continue
		}
		found = probe
		tipSha = sha
	}

	ldb.lastBlkSha = *tipSha
	ldb.lastBlkIdx = found
	ldb.nextBlock = found + 1

	return db, nil
}
// CurrentDBVersion is the on-disk format version recorded in the ".ver" file
// of newly created databases.
var CurrentDBVersion = int32(1)
func openDB(dbpath string, flag opt.OptionsFlag) (pbdb btcdb.Db, err error) {
var db LevelDb
var tlDb *leveldb.DB
var dbversion int32
defer func() {
if err == nil {
2013-09-09 15:50:13 -05:00
db.lDb = tlDb
db.txUpdateMap = map[btcwire.ShaHash]*txUpdateObj{}
pbdb = &db
}
}()
if flag&opt.OFCreateIfMissing == opt.OFCreateIfMissing {
err = os.Mkdir(dbpath, 0750)
if err != nil {
log.Errorf("mkdir failed %v %v", dbpath, err)
return
}
} else {
_, err = os.Stat(dbpath)
if err != nil {
err = btcdb.DbDoesNotExist
return
}
}
needVersionFile := false
verfile := dbpath + ".ver"
fi, ferr := os.Open(verfile)
if ferr == nil {
defer fi.Close()
ferr = binary.Read(fi, binary.LittleEndian, &dbversion)
if ferr != nil {
dbversion = ^0
}
} else {
if flag&opt.OFCreateIfMissing != 0 {
needVersionFile = true
dbversion = CurrentDBVersion
}
}
2013-09-26 10:15:14 -05:00
myCache := cache.NewEmptyCache()
opts := &opt.Options{Flag: flag,
2013-09-26 10:15:14 -05:00
BlockCache: myCache,
MaxOpenFiles: 256,
CompressionType: opt.NoCompression,
}
switch dbversion {
case 0:
opts = &opt.Options{Flag: flag}
case 1:
// uses defaults from above
default:
err = fmt.Errorf("unsupported db version %v", dbversion)
return
}
tlDb, err = leveldb.OpenFile(dbpath, opts)
if err != nil {
return
}
// If we opened the database successfully on 'create'
// update the
if needVersionFile {
fo, ferr := os.Create(verfile)
if ferr != nil {
// TODO(design) close and delete database?
err = ferr
return
}
defer fo.Close()
err = binary.Write(fo, binary.LittleEndian, dbversion)
if err != nil {
return
}
}
return
}
// CreateDB creates, initializes and opens a database for use.
//
// The new database starts out empty: lastBlkIdx is -1 and the first inserted
// block gets height 0.
func CreateDB(dbpath string) (btcdb.Db, error) {
	log = btcdb.GetLog()

	// No special setup needed, just openDB with the create flag.
	db, err := openDB(dbpath, opt.OFCreateIfMissing)
	if err != nil {
		return db, err
	}

	ldb := db.(*LevelDb)
	ldb.lastBlkIdx = -1
	ldb.nextBlock = 0

	return db, nil
}
// close shuts down the underlying leveldb handle. It takes no lock itself;
// the exported callers hold dbLock around it.
func (db *LevelDb) close() {
	handle := db.lDb
	handle.Close()
}
// Sync verifies that the database is coherent on disk,
// and no outstanding transactions are in flight.
//
// Although required by the API, this driver has nothing to flush here. It
// only acquires and releases dbLock, so the call does not return until any
// operation currently holding the lock has completed.
func (db *LevelDb) Sync() {
	db.dbLock.Lock()
	db.dbLock.Unlock()
}
// Close cleanly shuts down the database: it takes dbLock and closes the
// underlying leveldb handle.
func (db *LevelDb) Close() {
	db.dbLock.Lock()
	defer db.dbLock.Unlock()

	db.lDb.Close()
}
// DropAfterBlockBySha will remove any blocks from the database after
// the given block.
func (db *LevelDb) DropAfterBlockBySha(sha *btcwire.ShaHash) (rerr error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
2013-09-09 15:50:13 -05:00
defer func() {
if rerr == nil {
rerr = db.processBatches()
}
2013-09-09 15:50:13 -05:00
}()
startheight := db.nextBlock - 1
keepidx, err := db.getBlkLoc(sha)
if err != nil {
// should the error here be normalized ?
log.Tracef("block loc failed %v ", sha)
return err
}
for height := startheight; height > keepidx; height = height - 1 {
var blk *btcutil.Block
blksha, buf, err := db.getBlkByHeight(height)
if err != nil {
return err
}
blk, err = btcutil.NewBlockFromBytes(buf)
if err != nil {
return err
}
for _, tx := range blk.MsgBlock().Transactions {
err = db.unSpend(tx)
if err != nil {
return err
}
}
// rather than iterate the list of tx backward, do it twice.
for _, tx := range blk.MsgBlock().Transactions {
txSha, _ := tx.TxSha()
var txUo txUpdateObj
txUo.delete = true
db.txUpdateMap[txSha] = &txUo
}
db.lBatch().Delete(shaBlkToKey(blksha))
db.lBatch().Delete(int64ToKey(height))
}
db.nextBlock = keepidx + 1
return nil
}
// InsertBlock inserts raw block and transaction data from a block into the
// database. The first block inserted into the database will be treated as the
// genesis block. Every subsequent block insert requires the referenced parent
// block to already exist.
func (db *LevelDb) InsertBlock(block *btcutil.Block) (height int64, rerr error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
2013-09-09 15:50:13 -05:00
defer func() {
if rerr == nil {
rerr = db.processBatches()
}
2013-09-09 15:50:13 -05:00
}()
blocksha, err := block.Sha()
if err != nil {
log.Warnf("Failed to compute block sha %v", blocksha)
return 0, err
}
mblock := block.MsgBlock()
rawMsg, err := block.Bytes()
if err != nil {
log.Warnf("Failed to obtain raw block sha %v", blocksha)
return 0, err
}
txloc, err := block.TxLoc()
if err != nil {
log.Warnf("Failed to obtain raw block sha %v", blocksha)
return 0, err
}
// Insert block into database
newheight, err := db.insertBlockData(blocksha, &mblock.Header.PrevBlock,
rawMsg)
if err != nil {
log.Warnf("Failed to insert block %v %v %v", blocksha,
&mblock.Header.PrevBlock, err)
return 0, err
}
// At least two blocks in the long past were generated by faulty
// miners, the sha of the transaction exists in a previous block,
// detect this condition and 'accept' the block.
for txidx, tx := range mblock.Transactions {
var txsha btcwire.ShaHash
txsha, err = tx.TxSha()
if err != nil {
log.Warnf("failed to compute tx name block %v idx %v err %v", blocksha, txidx, err)
return 0, err
}
// Some old blocks contain duplicate transactions
// Attempt to cleanly bypass this problem
// http://blockexplorer.com/b/91842
// http://blockexplorer.com/b/91880
if newheight == 91842 {
dupsha, err := btcwire.NewShaHashFromStr("d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599")
if err != nil {
panic("invalid sha string in source")
}
if txsha == *dupsha {
//log.Tracef("skipping sha %v %v", dupsha, newheight)
continue
}
}
if newheight == 91880 {
dupsha, err := btcwire.NewShaHashFromStr("e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468")
if err != nil {
panic("invalid sha string in source")
}
if txsha == *dupsha {
//log.Tracef("skipping sha %v %v", dupsha, newheight)
continue
}
}
spentbuflen := (len(tx.TxOut) + 7) / 8
spentbuf := make([]byte, spentbuflen, spentbuflen)
if len(tx.TxOut)%8 != 0 {
for i := uint(len(tx.TxOut) % 8); i < 8; i++ {
spentbuf[spentbuflen-1] |= (byte(1) << i)
}
}
err = db.insertTx(&txsha, newheight, txloc[txidx].TxStart, txloc[txidx].TxLen, spentbuf)
if err != nil {
log.Warnf("block %v idx %v failed to insert tx %v %v err %v", blocksha, newheight, &txsha, txidx, err)
return
}
err = db.doSpend(tx)
if err != nil {
log.Warnf("block %v idx %v failed to spend tx %v %v err %v", blocksha, newheight, &txsha, txidx, err)
return
}
}
return newheight, nil
}
// SetDBInsertMode provides hints to the database about how the application
// is running; this allows the database to work in optimized modes when the
// database may be very busy.
//
// This driver ignores newmode: the body is empty, so the call has no effect.
func (db *LevelDb) SetDBInsertMode(newmode btcdb.InsertMode) {
	// special modes are not supported
}
// doSpend walks every input of tx and marks the output it consumes as spent.
//
// Inputs whose previous-outpoint index is ^uint32(0) (all bits set) are
// skipped (presumably coinbase inputs, which consume no real output —
// confirm). The first error returned by setSpentData stops the walk and is
// returned.
func (db *LevelDb) doSpend(tx *btcwire.MsgTx) error {
	for _, in := range tx.TxIn {
		prevIdx := in.PreviousOutpoint.Index
		if prevIdx == ^uint32(0) {
			continue
		}

		// Work on a private copy of the hash; its address is handed on.
		prevSha := in.PreviousOutpoint.Hash
		//log.Infof("spending %v %v", &prevSha, prevIdx)
		if err := db.setSpentData(&prevSha, prevIdx); err != nil {
			return err
		}
	}
	return nil
}
// unSpend walks every input of tx and marks the output it consumes as
// unspent again.
//
// Inputs whose previous-outpoint index is ^uint32(0) (all bits set) are
// skipped (presumably coinbase inputs — confirm). The first error returned by
// clearSpentData stops the walk and is returned.
func (db *LevelDb) unSpend(tx *btcwire.MsgTx) error {
	for _, in := range tx.TxIn {
		prevIdx := in.PreviousOutpoint.Index
		if prevIdx == ^uint32(0) {
			continue
		}

		// Work on a private copy of the hash; its address is handed on.
		prevSha := in.PreviousOutpoint.Hash
		if err := db.clearSpentData(&prevSha, prevIdx); err != nil {
			return err
		}
	}
	return nil
}
// setSpentData marks output idx of transaction sha as spent by delegating to
// setclearSpentData with set == true.
func (db *LevelDb) setSpentData(sha *btcwire.ShaHash, idx uint32) error {
	const markSpent = true
	return db.setclearSpentData(sha, idx, markSpent)
}
// clearSpentData marks output idx of transaction sha as unspent by delegating
// to setclearSpentData with set == false.
func (db *LevelDb) clearSpentData(sha *btcwire.ShaHash, idx uint32) error {
	const markSpent = false
	return db.setclearSpentData(sha, idx, markSpent)
}
// setclearSpentData sets (set == true) or clears (set == false) the spent bit
// of output idx in the record of transaction txsha.
//
// The record is taken from txUpdateMap when a pending update exists and is
// otherwise loaded with getTxData. The modified record is stored back in
// txUpdateMap so that processBatches writes it out later. Bit idx lives in
// byte idx/8 of spentData at position idx%8.
//
// It returns the getTxData error when the transaction cannot be loaded, and
// an error when idx lies outside the record's spent bitmap. The latter
// includes pending records that carry no bitmap, such as entries that are
// only marked for deletion. In either case txUpdateMap is left unchanged.
func (db *LevelDb) setclearSpentData(txsha *btcwire.ShaHash, idx uint32, set bool) error {
	var txUo *txUpdateObj
	var ok bool

	if txUo, ok = db.txUpdateMap[*txsha]; !ok {
		// not cached, load from db
		var txU txUpdateObj
		blkHeight, txOff, txLen, spentData, err := db.getTxData(txsha)
		if err != nil {
			return err
		}

		txU.txSha = txsha
		txU.blkHeight = blkHeight
		txU.txoff = txOff
		txU.txlen = txLen
		txU.spentData = spentData

		txUo = &txU
	}

	byteidx := idx / 8
	byteoff := idx % 8

	// Indexing past the bitmap would panic; report it as an error instead so
	// a bad outpoint index cannot take the process down.
	if byteidx >= uint32(len(txUo.spentData)) {
		return fmt.Errorf("output index %v out of range for tx %v", idx, txsha)
	}

	if set {
		txUo.spentData[byteidx] |= (byte(1) << byteoff)
	} else {
		txUo.spentData[byteidx] &= ^(byte(1) << byteoff)
	}

	db.txUpdateMap[*txsha] = txUo

	return nil
}
// int64ToKey returns the database key for a block height: the base-10 text
// form of keyint as bytes (for example 42 becomes "42").
func int64ToKey(keyint int64) []byte {
	return []byte(fmt.Sprint(keyint))
}
func shaBlkToKey(sha *btcwire.ShaHash) []byte {
2013-09-09 15:50:13 -05:00
shaB := sha.Bytes()
return shaB
}
func shaTxToKey(sha *btcwire.ShaHash) []byte {
2013-09-09 15:50:13 -05:00
shaB := sha.Bytes()
shaB = append(shaB, "tx"...)
return shaB
}
// lBatch returns the pending write batch, allocating it on first use.
func (db *LevelDb) lBatch() *leveldb.Batch {
	if db.lbatch != nil {
		return db.lbatch
	}
	db.lbatch = new(leveldb.Batch)
	return db.lbatch
}
func (db *LevelDb) processBatches() error {
var err error
if len(db.txUpdateMap) != 0 || db.lbatch != nil {
if db.lbatch == nil {
db.lbatch = new(leveldb.Batch)
}
for txSha, txU := range db.txUpdateMap {
key := shaTxToKey(&txSha)
if txU.delete {
//log.Infof("deleting tx %v", txSha)
db.lbatch.Delete(key)
} else {
//log.Infof("inserting tx %v", txSha)
txdat, err := db.formatTx(txU)
if err != nil {
return err
}
db.lbatch.Put(key, txdat)
}
}
err = db.lDb.Write(db.lbatch, db.wo)
if err != nil {
2013-09-10 11:25:13 -04:00
log.Tracef("batch failed %v\n", err)
return err
}
db.lbatch.Reset()
db.txUpdateMap = map[btcwire.ShaHash]*txUpdateObj{}
runtime.GC()
}
return nil
}
// RollbackClose takes dbLock and closes the underlying leveldb handle. In the
// code visible here it performs the same steps as Close; changes that are
// still queued are not written.
func (db *LevelDb) RollbackClose() {
	db.dbLock.Lock()
	defer db.dbLock.Unlock()

	db.lDb.Close()
}