Rework leveldb support for duplicate tx.
This separates fully spent tx from tx containing unspent TxOut This handles duplicate tx and Ideally should be faster.
This commit is contained in:
parent
078fb4d38e
commit
13ef0e2be9
6 changed files with 629 additions and 147 deletions
26
ldb/block.go
26
ldb/block.go
|
@ -9,9 +9,35 @@ import (
|
|||
"encoding/binary"
|
||||
"fmt"
|
||||
"github.com/conformal/btcdb"
|
||||
"github.com/conformal/btcutil"
|
||||
"github.com/conformal/btcwire"
|
||||
)
|
||||
|
||||
// FetchBlockBySha - return a btcutil Block
|
||||
func (db *LevelDb) FetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, err error) {
|
||||
db.dbLock.Lock()
|
||||
defer db.dbLock.Unlock()
|
||||
return db.fetchBlockBySha(sha)
|
||||
}
|
||||
|
||||
// fetchBlockBySha - return a btcutil Block
|
||||
// Must be called with db lock held.
|
||||
func (db *LevelDb) fetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, err error) {
|
||||
|
||||
buf, height, err := db.fetchSha(sha)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
blk, err = btcutil.NewBlockFromBytes(buf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
blk.SetHeight(height)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (db *LevelDb) getBlkLoc(sha *btcwire.ShaHash) (int64, error) {
|
||||
var blkHeight int64
|
||||
|
||||
|
|
117
ldb/dbcache.go
117
ldb/dbcache.go
|
@ -5,124 +5,9 @@
|
|||
package ldb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"github.com/conformal/btcdb"
|
||||
"github.com/conformal/btcutil"
|
||||
"github.com/conformal/btcwire"
|
||||
//"fmt"
|
||||
)
|
||||
|
||||
// FetchBlockBySha - return a btcutil Block, object may be a cached.
|
||||
func (db *LevelDb) FetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, err error) {
|
||||
db.dbLock.Lock()
|
||||
defer db.dbLock.Unlock()
|
||||
return db.fetchBlockBySha(sha)
|
||||
}
|
||||
|
||||
// fetchBlockBySha - return a btcutil Block, object may be a cached.
|
||||
// Must be called with db lock held.
|
||||
func (db *LevelDb) fetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, err error) {
|
||||
|
||||
buf, height, err := db.fetchSha(sha)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
blk, err = btcutil.NewBlockFromBytes(buf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
blk.SetHeight(height)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// FetchTxByShaList returns the most recent tx of the name fully spent or not
|
||||
func (db *LevelDb) FetchTxByShaList(txShaList []*btcwire.ShaHash) []*btcdb.TxListReply {
|
||||
// until the fully spent separation of tx is complete this is identical
|
||||
// to FetchUnSpentTxByShaList
|
||||
return db.FetchUnSpentTxByShaList(txShaList)
|
||||
}
|
||||
|
||||
// FetchUnSpentTxByShaList given a array of ShaHash, look up the transactions
|
||||
// and return them in a TxListReply array.
|
||||
func (db *LevelDb) FetchUnSpentTxByShaList(txShaList []*btcwire.ShaHash) []*btcdb.TxListReply {
|
||||
db.dbLock.Lock()
|
||||
defer db.dbLock.Unlock()
|
||||
|
||||
replies := make([]*btcdb.TxListReply, len(txShaList))
|
||||
for i, txsha := range txShaList {
|
||||
tx, blockSha, height, txspent, err := db.fetchTxDataBySha(txsha)
|
||||
btxspent := []bool{}
|
||||
if err == nil {
|
||||
btxspent = make([]bool, len(tx.TxOut), len(tx.TxOut))
|
||||
for idx := range tx.TxOut {
|
||||
byteidx := idx / 8
|
||||
byteoff := uint(idx % 8)
|
||||
btxspent[idx] = (txspent[byteidx] & (byte(1) << byteoff)) != 0
|
||||
}
|
||||
}
|
||||
txlre := btcdb.TxListReply{Sha: txsha, Tx: tx, BlkSha: blockSha, Height: height, TxSpent: btxspent, Err: err}
|
||||
replies[i] = &txlre
|
||||
}
|
||||
return replies
|
||||
}
|
||||
|
||||
// fetchTxDataBySha returns several pieces of data regarding the given sha.
|
||||
func (db *LevelDb) fetchTxDataBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rblksha *btcwire.ShaHash, rheight int64, rtxspent []byte, err error) {
|
||||
var blksha *btcwire.ShaHash
|
||||
var blkHeight int64
|
||||
var txspent []byte
|
||||
var txOff, txLen int
|
||||
var blkbuf []byte
|
||||
|
||||
blkHeight, txOff, txLen, txspent, err = db.getTxData(txsha)
|
||||
if err != nil {
|
||||
err = btcdb.TxShaMissing
|
||||
return
|
||||
}
|
||||
|
||||
blksha, blkbuf, err = db.getBlkByHeight(blkHeight)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
//log.Trace("transaction %v is at block %v %v txoff %v, txlen %v\n",
|
||||
// txsha, blksha, blkHeight, txOff, txLen)
|
||||
|
||||
rbuf := bytes.NewBuffer(blkbuf[txOff : txOff+txLen])
|
||||
|
||||
var tx btcwire.MsgTx
|
||||
err = tx.Deserialize(rbuf)
|
||||
if err != nil {
|
||||
log.Warnf("unable to decode tx block %v %v txoff %v txlen %v",
|
||||
blkHeight, blksha, txOff, txLen)
|
||||
return
|
||||
}
|
||||
|
||||
return &tx, blksha, blkHeight, txspent, nil
|
||||
}
|
||||
|
||||
// FetchTxBySha returns some data for the given Tx Sha.
|
||||
func (db *LevelDb) FetchTxBySha(txsha *btcwire.ShaHash) ([]*btcdb.TxListReply, error) {
|
||||
tx, blksha, height, txspent, err := db.fetchTxDataBySha(txsha)
|
||||
if err != nil {
|
||||
return []*btcdb.TxListReply{}, err
|
||||
}
|
||||
|
||||
replies := make([]*btcdb.TxListReply, 1)
|
||||
|
||||
btxspent := make([]bool, len(tx.TxOut), len(tx.TxOut))
|
||||
for idx := range tx.TxOut {
|
||||
byteidx := idx / 8
|
||||
byteoff := uint(idx % 8)
|
||||
btxspent[idx] = (txspent[byteidx] & (byte(1) << byteoff)) != 0
|
||||
}
|
||||
|
||||
txlre := btcdb.TxListReply{Sha: txsha, Tx: tx, BlkSha: blksha, Height: height, TxSpent: btxspent, Err: err}
|
||||
replies[0] = &txlre
|
||||
return replies, nil
|
||||
}
|
||||
|
||||
// InvalidateTxCache clear/release all cached transactions.
|
||||
func (db *LevelDb) InvalidateTxCache() {
|
||||
}
|
||||
|
|
176
ldb/dup_test.go
Normal file
176
ldb/dup_test.go
Normal file
|
@ -0,0 +1,176 @@
|
|||
// Copyright (c) 2013 Conformal Systems LLC.
|
||||
// Use of this source code is governed by an ISC
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package ldb_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/conformal/btcdb"
|
||||
"github.com/conformal/btcutil"
|
||||
"github.com/conformal/btcwire"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func Test_dupTx(t *testing.T) {
|
||||
|
||||
// Ignore db remove errors since it means we didn't have an old one.
|
||||
dbname := fmt.Sprintf("tstdbdup0")
|
||||
dbnamever := dbname + ".ver"
|
||||
_ = os.RemoveAll(dbname)
|
||||
_ = os.RemoveAll(dbnamever)
|
||||
db, err := btcdb.CreateDB("leveldb", dbname)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to open test database %v", err)
|
||||
return
|
||||
}
|
||||
defer os.RemoveAll(dbname)
|
||||
defer os.RemoveAll(dbnamever)
|
||||
defer db.Close()
|
||||
|
||||
testdatafile := filepath.Join("testdata", "blocks1-256.bz2")
|
||||
blocks, err := loadBlocks(t, testdatafile)
|
||||
if err != nil {
|
||||
t.Errorf("Unable to load blocks from test data for: %v",
|
||||
err)
|
||||
return
|
||||
}
|
||||
|
||||
var lastSha *btcwire.ShaHash
|
||||
|
||||
// Populate with the fisrt 256 blocks, so we have blocks to 'mess with'
|
||||
err = nil
|
||||
out:
|
||||
for height := int64(0); height < int64(len(blocks)); height++ {
|
||||
block := blocks[height]
|
||||
|
||||
// except for NoVerify which does not allow lookups check inputs
|
||||
mblock := block.MsgBlock()
|
||||
var txneededList []*btcwire.ShaHash
|
||||
for _, tx := range mblock.Transactions {
|
||||
for _, txin := range tx.TxIn {
|
||||
if txin.PreviousOutpoint.Index == uint32(4294967295) {
|
||||
continue
|
||||
}
|
||||
origintxsha := &txin.PreviousOutpoint.Hash
|
||||
txneededList = append(txneededList, origintxsha)
|
||||
|
||||
if !db.ExistsTxSha(origintxsha) {
|
||||
t.Errorf("referenced tx not found %v ", origintxsha)
|
||||
}
|
||||
|
||||
_, err = db.FetchTxBySha(origintxsha)
|
||||
if err != nil {
|
||||
t.Errorf("referenced tx not found %v err %v ", origintxsha, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
txlist := db.FetchUnSpentTxByShaList(txneededList)
|
||||
for _, txe := range txlist {
|
||||
if txe.Err != nil {
|
||||
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
|
||||
break out
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
newheight, err := db.InsertBlock(block)
|
||||
if err != nil {
|
||||
t.Errorf("failed to insert block %v err %v", height, err)
|
||||
break out
|
||||
}
|
||||
if newheight != height {
|
||||
t.Errorf("height mismatch expect %v returned %v", height, newheight)
|
||||
break out
|
||||
}
|
||||
|
||||
newSha, blkid, err := db.NewestSha()
|
||||
if err != nil {
|
||||
t.Errorf("failed to obtain latest sha %v %v", height, err)
|
||||
}
|
||||
|
||||
if blkid != height {
|
||||
t.Errorf("height doe not match latest block height %v %v %v", blkid, height, err)
|
||||
}
|
||||
|
||||
blkSha, _ := block.Sha()
|
||||
if *newSha != *blkSha {
|
||||
t.Errorf("Newest block sha does not match freshly inserted one %v %v %v ", newSha, blkSha, err)
|
||||
}
|
||||
lastSha = blkSha
|
||||
}
|
||||
|
||||
// genrate a new block based on the last sha
|
||||
// these block are not verified, so there are a bunch of garbage fields
|
||||
// in the 'generated' block.
|
||||
|
||||
var bh btcwire.BlockHeader
|
||||
|
||||
bh.Version = 2
|
||||
bh.PrevBlock = *lastSha
|
||||
// Bits, Nonce are not filled in
|
||||
|
||||
mblk := btcwire.NewMsgBlock(&bh)
|
||||
|
||||
hash, _ := btcwire.NewShaHashFromStr("df2b060fa2e5e9c8ed5eaf6a45c13753ec8c63282b2688322eba40cd98ea067a")
|
||||
|
||||
po := btcwire.NewOutPoint(hash, 0)
|
||||
txI := btcwire.NewTxIn(po, []byte("garbage"))
|
||||
txO := btcwire.NewTxOut(50000000, []byte("garbageout"))
|
||||
|
||||
var tx btcwire.MsgTx
|
||||
tx.AddTxIn(txI)
|
||||
tx.AddTxOut(txO)
|
||||
|
||||
mblk.AddTransaction(&tx)
|
||||
|
||||
blk := btcutil.NewBlock(mblk)
|
||||
|
||||
fetchList := []*btcwire.ShaHash{hash}
|
||||
listReply := db.FetchUnSpentTxByShaList(fetchList)
|
||||
for _, lr := range listReply {
|
||||
if lr.Err != nil {
|
||||
t.Errorf("sha %v spent %v err %v\n", lr.Sha,
|
||||
lr.TxSpent, lr.Err)
|
||||
}
|
||||
}
|
||||
|
||||
_, err = db.InsertBlock(blk)
|
||||
if err != nil {
|
||||
t.Errorf("failed to insert phony block %v", err)
|
||||
}
|
||||
|
||||
// ok, did it 'spend' the tx ?
|
||||
|
||||
listReply = db.FetchUnSpentTxByShaList(fetchList)
|
||||
for _, lr := range listReply {
|
||||
if lr.Err != btcdb.TxShaMissing {
|
||||
t.Errorf("sha %v spent %v err %v\n", lr.Sha,
|
||||
lr.TxSpent, lr.Err)
|
||||
}
|
||||
}
|
||||
|
||||
txshalist, _ := blk.TxShas()
|
||||
for _, txsha := range txshalist {
|
||||
txReply, err := db.FetchTxBySha(txsha)
|
||||
if err != nil {
|
||||
t.Errorf("fully spent lookup %v err %v\n", hash, err)
|
||||
} else {
|
||||
for _, lr := range txReply {
|
||||
if lr.Err != nil {
|
||||
fmt.Errorf("stx %v spent %v err %v\n", lr.Sha,
|
||||
lr.TxSpent, lr.Err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
t.Logf("Dropping block")
|
||||
|
||||
err = db.DropAfterBlockBySha(lastSha)
|
||||
if err != nil {
|
||||
t.Errorf("failed to drop spending block %v", err)
|
||||
}
|
||||
}
|
|
@ -130,7 +130,7 @@ endtest:
|
|||
}
|
||||
|
||||
txlookupmap := map[btcwire.ShaHash]*btcdb.TxListReply{}
|
||||
txlist = db.FetchUnSpentTxByShaList(txlookupList)
|
||||
txlist = db.FetchTxByShaList(txlookupList)
|
||||
for _, txe := range txlist {
|
||||
if txe.Err != nil {
|
||||
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
|
||||
|
@ -188,7 +188,7 @@ endtest:
|
|||
break endtest
|
||||
}
|
||||
txlookupmap = map[btcwire.ShaHash]*btcdb.TxListReply{}
|
||||
txlist = db.FetchUnSpentTxByShaList(txlookupList)
|
||||
txlist = db.FetchTxByShaList(txlookupList)
|
||||
for _, txe := range txlist {
|
||||
if txe.Err != nil {
|
||||
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
|
||||
|
|
173
ldb/leveldb.go
173
ldb/leveldb.go
|
@ -52,6 +52,7 @@ type LevelDb struct {
|
|||
lastBlkIdx int64
|
||||
|
||||
txUpdateMap map[btcwire.ShaHash]*txUpdateObj
|
||||
txSpentUpdateMap map[btcwire.ShaHash]*spentTxUpdate
|
||||
}
|
||||
|
||||
var self = btcdb.DriverDB{DbType: "leveldb", Create: CreateDB, Open: OpenDB}
|
||||
|
@ -136,6 +137,7 @@ func openDB(dbpath string, flag opt.OptionsFlag) (pbdb btcdb.Db, err error) {
|
|||
db.lDb = tlDb
|
||||
|
||||
db.txUpdateMap = map[btcwire.ShaHash]*txUpdateObj{}
|
||||
db.txSpentUpdateMap = make(map[btcwire.ShaHash]*spentTxUpdate)
|
||||
|
||||
pbdb = &db
|
||||
}
|
||||
|
@ -350,30 +352,6 @@ func (db *LevelDb) InsertBlock(block *btcutil.Block) (height int64, rerr error)
|
|||
log.Warnf("failed to compute tx name block %v idx %v err %v", blocksha, txidx, err)
|
||||
return 0, err
|
||||
}
|
||||
// Some old blocks contain duplicate transactions
|
||||
// Attempt to cleanly bypass this problem
|
||||
// http://blockexplorer.com/b/91842
|
||||
// http://blockexplorer.com/b/91880
|
||||
if newheight == 91842 {
|
||||
dupsha, err := btcwire.NewShaHashFromStr("d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599")
|
||||
if err != nil {
|
||||
panic("invalid sha string in source")
|
||||
}
|
||||
if txsha.IsEqual(dupsha) {
|
||||
//log.Tracef("skipping sha %v %v", dupsha, newheight)
|
||||
continue
|
||||
}
|
||||
}
|
||||
if newheight == 91880 {
|
||||
dupsha, err := btcwire.NewShaHashFromStr("e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468")
|
||||
if err != nil {
|
||||
panic("invalid sha string in source")
|
||||
}
|
||||
if txsha.IsEqual(dupsha) {
|
||||
//log.Tracef("skipping sha %v %v", dupsha, newheight)
|
||||
continue
|
||||
}
|
||||
}
|
||||
spentbuflen := (len(tx.TxOut) + 7) / 8
|
||||
spentbuf := make([]byte, spentbuflen, spentbuflen)
|
||||
if len(tx.TxOut)%8 != 0 {
|
||||
|
@ -384,10 +362,53 @@ func (db *LevelDb) InsertBlock(block *btcutil.Block) (height int64, rerr error)
|
|||
|
||||
err = db.insertTx(txsha, newheight, txloc[txidx].TxStart, txloc[txidx].TxLen, spentbuf)
|
||||
if err != nil {
|
||||
log.Warnf("block %v idx %v failed to insert tx %v %v err %v", blocksha, newheight, txsha, txidx, err)
|
||||
log.Warnf("block %v idx %v failed to insert tx %v %v err %v", blocksha, newheight, &txsha, txidx, err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Some old blocks contain duplicate transactions
|
||||
// Attempt to cleanly bypass this problem by marking the
|
||||
// first as fully spent.
|
||||
// http://blockexplorer.com/b/91812 dup in 91842
|
||||
// http://blockexplorer.com/b/91722 dup in 91880
|
||||
if newheight == 91812 {
|
||||
dupsha, err := btcwire.NewShaHashFromStr("d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599")
|
||||
if err != nil {
|
||||
panic("invalid sha string in source")
|
||||
}
|
||||
if txsha.IsEqual(dupsha) {
|
||||
// marking TxOut[0] as spent
|
||||
po := btcwire.NewOutPoint(dupsha, 0)
|
||||
txI := btcwire.NewTxIn(po, []byte("garbage"))
|
||||
|
||||
var spendtx btcwire.MsgTx
|
||||
spendtx.AddTxIn(txI)
|
||||
err = db.doSpend(&spendtx)
|
||||
if err != nil {
|
||||
log.Warnf("block %v idx %v failed to spend tx %v %v err %v", blocksha, newheight, &txsha, txidx, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if newheight == 91722 {
|
||||
dupsha, err := btcwire.NewShaHashFromStr("e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468")
|
||||
if err != nil {
|
||||
panic("invalid sha string in source")
|
||||
}
|
||||
if txsha.IsEqual(dupsha) {
|
||||
// marking TxOut[0] as spent
|
||||
po := btcwire.NewOutPoint(dupsha, 0)
|
||||
txI := btcwire.NewTxIn(po, []byte("garbage"))
|
||||
|
||||
var spendtx btcwire.MsgTx
|
||||
spendtx.AddTxIn(txI)
|
||||
err = db.doSpend(&spendtx)
|
||||
if err != nil {
|
||||
log.Warnf("block %v idx %v failed to spend tx %v %v err %v", blocksha, newheight, &txsha, txidx, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = db.doSpend(tx)
|
||||
if err != nil {
|
||||
log.Warnf("block %v idx %v failed to spend tx %v %v err %v", blocksha, newheight, txsha, txidx, err)
|
||||
|
@ -467,7 +488,38 @@ func (db *LevelDb) setclearSpentData(txsha *btcwire.ShaHash, idx uint32, set boo
|
|||
var txU txUpdateObj
|
||||
blkHeight, txOff, txLen, spentData, err := db.getTxData(txsha)
|
||||
if err != nil {
|
||||
return err
|
||||
// setting a fully spent tx is an error.
|
||||
if set == true {
|
||||
return err
|
||||
}
|
||||
// if we are clearing a tx and it wasn't found
|
||||
// in the tx table, it could be in the fully spent
|
||||
// (duplicates) table.
|
||||
spentTxList, err := db.getTxFullySpent(txsha)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// need to reslice the list to exclude the most recent.
|
||||
sTx := spentTxList [len(spentTxList) -1]
|
||||
spentTxList [len(spentTxList) -1] = nil
|
||||
if len (spentTxList) == 1 {
|
||||
// write entry to delete tx from spent pool
|
||||
// XXX
|
||||
} else {
|
||||
spentTxList = spentTxList [:len(spentTxList)-1]
|
||||
// XXX format sTxList and set update Table
|
||||
}
|
||||
|
||||
// Create 'new' Tx update data.
|
||||
blkHeight = sTx.blkHeight
|
||||
txOff = sTx.txoff
|
||||
txLen = sTx.txlen
|
||||
spentbuflen := (sTx.numTxO + 7) / 8
|
||||
spentData = make([]byte, spentbuflen, spentbuflen)
|
||||
for i := range spentData {
|
||||
spentData[i] = ^byte(0)
|
||||
}
|
||||
}
|
||||
|
||||
txU.txSha = txsha
|
||||
|
@ -488,7 +540,51 @@ func (db *LevelDb) setclearSpentData(txsha *btcwire.ShaHash, idx uint32, set boo
|
|||
txUo.spentData[byteidx] &= ^(byte(1) << byteoff)
|
||||
}
|
||||
|
||||
db.txUpdateMap[*txsha] = txUo
|
||||
// check for fully spent Tx
|
||||
fullySpent := true
|
||||
for _, val := range txUo.spentData {
|
||||
if val != ^byte(0) {
|
||||
fullySpent = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if fullySpent {
|
||||
var txSu *spentTxUpdate
|
||||
// Look up Tx in fully spent table
|
||||
if txSuOld, ok := db.txSpentUpdateMap[*txsha] ; ok {
|
||||
txSu = txSuOld
|
||||
} else {
|
||||
var txSuStore spentTxUpdate
|
||||
txSu = &txSuStore
|
||||
|
||||
txSuOld, err := db.getTxFullySpent(txsha)
|
||||
if err == nil {
|
||||
txSu.txl = txSuOld
|
||||
}
|
||||
}
|
||||
|
||||
// Fill in spentTx
|
||||
var sTx spentTx
|
||||
sTx.blkHeight = txUo.blkHeight
|
||||
sTx.txoff = txUo.txoff
|
||||
sTx.txlen = txUo.txlen
|
||||
// XXX -- there is no way to comput the real TxOut
|
||||
// from the spent array.
|
||||
sTx.numTxO = 8 * len(txUo.spentData)
|
||||
|
||||
// append this txdata to fully spent txlist
|
||||
txSu.txl = append(txSu.txl, &sTx)
|
||||
|
||||
// mark txsha as deleted in the txUpdateMap
|
||||
log.Tracef("***tx %v is fully spent\n", txsha)
|
||||
|
||||
db.txSpentUpdateMap[*txsha] = txSu
|
||||
|
||||
txUo.delete = true
|
||||
db.txUpdateMap[*txsha] = txUo
|
||||
} else {
|
||||
db.txUpdateMap[*txsha] = txUo
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -509,6 +605,12 @@ func shaTxToKey(sha *btcwire.ShaHash) []byte {
|
|||
return shaB
|
||||
}
|
||||
|
||||
func shaSpentTxToKey(sha *btcwire.ShaHash) []byte {
|
||||
shaB := sha.Bytes()
|
||||
shaB = append(shaB, "sx"...)
|
||||
return shaB
|
||||
}
|
||||
|
||||
func (db *LevelDb) lBatch() *leveldb.Batch {
|
||||
if db.lbatch == nil {
|
||||
db.lbatch = new(leveldb.Batch)
|
||||
|
@ -519,7 +621,7 @@ func (db *LevelDb) lBatch() *leveldb.Batch {
|
|||
func (db *LevelDb) processBatches() error {
|
||||
var err error
|
||||
|
||||
if len(db.txUpdateMap) != 0 || db.lbatch != nil {
|
||||
if len(db.txUpdateMap) != 0 || len(db.txSpentUpdateMap) != 0 || db.lbatch != nil {
|
||||
if db.lbatch == nil {
|
||||
db.lbatch = new(leveldb.Batch)
|
||||
}
|
||||
|
@ -538,6 +640,20 @@ func (db *LevelDb) processBatches() error {
|
|||
db.lbatch.Put(key, txdat)
|
||||
}
|
||||
}
|
||||
for txSha, txSu := range db.txSpentUpdateMap {
|
||||
key := shaSpentTxToKey(&txSha)
|
||||
if txSu.delete {
|
||||
//log.Infof("deleting tx %v", txSha)
|
||||
db.lbatch.Delete(key)
|
||||
} else {
|
||||
//log.Infof("inserting tx %v", txSha)
|
||||
txdat, err := db.formatTxFullySpent(txSu.txl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.lbatch.Put(key, txdat)
|
||||
}
|
||||
}
|
||||
|
||||
err = db.lDb.Write(db.lbatch, db.wo)
|
||||
if err != nil {
|
||||
|
@ -546,6 +662,7 @@ func (db *LevelDb) processBatches() error {
|
|||
}
|
||||
db.lbatch.Reset()
|
||||
db.txUpdateMap = map[btcwire.ShaHash]*txUpdateObj{}
|
||||
db.txSpentUpdateMap = make(map[btcwire.ShaHash]*spentTxUpdate)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
280
ldb/tx.go
280
ldb/tx.go
|
@ -8,7 +8,8 @@ import (
|
|||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
//"github.com/conformal/btcdb"
|
||||
"github.com/conformal/btcdb"
|
||||
"github.com/conformal/goleveldb/leveldb"
|
||||
"github.com/conformal/btcwire"
|
||||
)
|
||||
|
||||
|
@ -17,10 +18,23 @@ type txUpdateObj struct {
|
|||
blkHeight int64
|
||||
txoff int
|
||||
txlen int
|
||||
ntxout int
|
||||
spentData []byte
|
||||
delete bool
|
||||
}
|
||||
|
||||
type spentTx struct {
|
||||
blkHeight int64
|
||||
txoff int
|
||||
txlen int
|
||||
numTxO int
|
||||
delete bool
|
||||
}
|
||||
type spentTxUpdate struct {
|
||||
txl []*spentTx
|
||||
delete bool
|
||||
}
|
||||
|
||||
// InsertTx inserts a tx hash and its associated data into the database.
|
||||
func (db *LevelDb) InsertTx(txsha *btcwire.ShaHash, height int64, txoff int, txlen int, spentbuf []byte) (err error) {
|
||||
db.dbLock.Lock()
|
||||
|
@ -123,6 +137,97 @@ func (db *LevelDb) getTxData(txsha *btcwire.ShaHash) (rblkHeight int64,
|
|||
return blkHeight, int(txOff), int(txLen), spentBuf, nil
|
||||
}
|
||||
|
||||
func (db *LevelDb) getTxFullySpent(txsha *btcwire.ShaHash) ([]*spentTx, error) {
|
||||
|
||||
var badTxList, spentTxList []*spentTx
|
||||
|
||||
key := shaSpentTxToKey(txsha)
|
||||
buf, err := db.lDb.Get(key, db.ro)
|
||||
if err == leveldb.ErrNotFound {
|
||||
return badTxList, btcdb.TxShaMissing
|
||||
} else if err != nil {
|
||||
return badTxList, err
|
||||
}
|
||||
txListLen := len(buf) / 20
|
||||
txR := bytes.NewBuffer(buf)
|
||||
spentTxList = make([]*spentTx, txListLen, txListLen)
|
||||
|
||||
for i := range spentTxList {
|
||||
var sTx spentTx
|
||||
var blkHeight int64
|
||||
var txOff, txLen, numTxO int32
|
||||
|
||||
err := binary.Read(txR, binary.LittleEndian, &blkHeight)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("sTx Read fail 0")
|
||||
return nil, err
|
||||
}
|
||||
sTx.blkHeight = blkHeight
|
||||
|
||||
err = binary.Read(txR, binary.LittleEndian, &txOff)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("sTx Read fail 1")
|
||||
return nil, err
|
||||
}
|
||||
sTx.txoff = int(txOff)
|
||||
|
||||
err = binary.Read(txR, binary.LittleEndian, &txLen)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("sTx Read fail 2")
|
||||
return nil, err
|
||||
}
|
||||
sTx.txlen = int(txLen)
|
||||
|
||||
err = binary.Read(txR, binary.LittleEndian, &numTxO)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("sTx Read fail 3")
|
||||
return nil, err
|
||||
}
|
||||
sTx.numTxO = int(numTxO)
|
||||
|
||||
spentTxList[i] = &sTx
|
||||
}
|
||||
|
||||
return spentTxList, nil
|
||||
}
|
||||
|
||||
func (db *LevelDb) formatTxFullySpent(sTxList []*spentTx) ([]byte, error) {
|
||||
var txW bytes.Buffer
|
||||
|
||||
for _, sTx := range sTxList {
|
||||
blkHeight := sTx.blkHeight
|
||||
txOff := int32(sTx.txoff)
|
||||
txLen := int32(sTx.txlen)
|
||||
numTxO := int32(sTx.numTxO)
|
||||
|
||||
err := binary.Write(&txW, binary.LittleEndian, blkHeight)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Write fail")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = binary.Write(&txW, binary.LittleEndian, txOff)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Write fail")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = binary.Write(&txW, binary.LittleEndian, txLen)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Write fail")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = binary.Write(&txW, binary.LittleEndian, numTxO)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Write fail")
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return txW.Bytes(), nil
|
||||
}
|
||||
|
||||
// ExistsTxSha returns if the given tx sha exists in the database
|
||||
func (db *LevelDb) ExistsTxSha(txsha *btcwire.ShaHash) (exists bool) {
|
||||
db.dbLock.Lock()
|
||||
|
@ -143,3 +248,176 @@ func (db *LevelDb) existsTxSha(txSha *btcwire.ShaHash) (exists bool) {
|
|||
|
||||
return false
|
||||
}
|
||||
|
||||
// FetchTxByShaList returns the most recent tx of the name fully spent or not
|
||||
func (db *LevelDb) FetchTxByShaList(txShaList []*btcwire.ShaHash) []*btcdb.TxListReply {
|
||||
// until the fully spent separation of tx is complete this is identical
|
||||
// to FetchUnSpentTxByShaList
|
||||
replies := make([]*btcdb.TxListReply, len(txShaList))
|
||||
for i, txsha := range txShaList {
|
||||
tx, blockSha, height, txspent, err := db.fetchTxDataBySha(txsha)
|
||||
btxspent := []bool{}
|
||||
if err == nil {
|
||||
btxspent = make([]bool, len(tx.TxOut), len(tx.TxOut))
|
||||
for idx := range tx.TxOut {
|
||||
byteidx := idx / 8
|
||||
byteoff := uint(idx % 8)
|
||||
btxspent[idx] = (txspent[byteidx] & (byte(1) << byteoff)) != 0
|
||||
}
|
||||
}
|
||||
if err == btcdb.TxShaMissing {
|
||||
// if the unspent pool did not have the tx,
|
||||
// look in the fully spent pool (only last instance
|
||||
|
||||
sTxList, fSerr := db.getTxFullySpent(txsha)
|
||||
if fSerr == nil && len(sTxList) != 0 {
|
||||
idx := len(sTxList) - 1
|
||||
stx := sTxList[idx]
|
||||
|
||||
tx, blockSha, _, _, err = db.fetchTxDataByLoc(
|
||||
stx.blkHeight, stx.txoff, stx.txlen, []byte{})
|
||||
if err == nil {
|
||||
btxspent = make([]bool, len(tx.TxOut))
|
||||
for i := range btxspent {
|
||||
btxspent[i] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
txlre := btcdb.TxListReply{Sha: txsha, Tx: tx, BlkSha: blockSha, Height: height, TxSpent: btxspent, Err: err}
|
||||
replies[i] = &txlre
|
||||
}
|
||||
return replies
|
||||
}
|
||||
|
||||
// FetchUnSpentTxByShaList given a array of ShaHash, look up the transactions
|
||||
// and return them in a TxListReply array.
|
||||
func (db *LevelDb) FetchUnSpentTxByShaList(txShaList []*btcwire.ShaHash) []*btcdb.TxListReply {
|
||||
db.dbLock.Lock()
|
||||
defer db.dbLock.Unlock()
|
||||
|
||||
replies := make([]*btcdb.TxListReply, len(txShaList))
|
||||
for i, txsha := range txShaList {
|
||||
tx, blockSha, height, txspent, err := db.fetchTxDataBySha(txsha)
|
||||
btxspent := []bool{}
|
||||
if err == nil {
|
||||
btxspent = make([]bool, len(tx.TxOut), len(tx.TxOut))
|
||||
for idx := range tx.TxOut {
|
||||
byteidx := idx / 8
|
||||
byteoff := uint(idx % 8)
|
||||
btxspent[idx] = (txspent[byteidx] & (byte(1) << byteoff)) != 0
|
||||
}
|
||||
}
|
||||
txlre := btcdb.TxListReply{Sha: txsha, Tx: tx, BlkSha: blockSha, Height: height, TxSpent: btxspent, Err: err}
|
||||
replies[i] = &txlre
|
||||
}
|
||||
return replies
|
||||
}
|
||||
|
||||
// fetchTxDataBySha returns several pieces of data regarding the given sha.
|
||||
func (db *LevelDb) fetchTxDataBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rblksha *btcwire.ShaHash, rheight int64, rtxspent []byte, err error) {
|
||||
var blkHeight int64
|
||||
var txspent []byte
|
||||
var txOff, txLen int
|
||||
|
||||
blkHeight, txOff, txLen, txspent, err = db.getTxData(txsha)
|
||||
if err != nil {
|
||||
if err == leveldb.ErrNotFound {
|
||||
err = btcdb.TxShaMissing
|
||||
}
|
||||
return
|
||||
}
|
||||
return db.fetchTxDataByLoc(blkHeight, txOff, txLen, txspent)
|
||||
}
|
||||
|
||||
// fetchTxDataByLoc returns several pieces of data regarding the given tx
|
||||
// located by the block/offset/size location
|
||||
func (db *LevelDb) fetchTxDataByLoc(blkHeight int64, txOff int, txLen int, txspent []byte) (rtx *btcwire.MsgTx, rblksha *btcwire.ShaHash, rheight int64, rtxspent []byte, err error) {
|
||||
var blksha *btcwire.ShaHash
|
||||
var blkbuf []byte
|
||||
|
||||
blksha, blkbuf, err = db.getBlkByHeight(blkHeight)
|
||||
if err != nil {
|
||||
if err == leveldb.ErrNotFound {
|
||||
err = btcdb.TxShaMissing
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
//log.Trace("transaction %v is at block %v %v txoff %v, txlen %v\n",
|
||||
// txsha, blksha, blkHeight, txOff, txLen)
|
||||
|
||||
rbuf := bytes.NewBuffer(blkbuf[txOff : txOff+txLen])
|
||||
|
||||
var tx btcwire.MsgTx
|
||||
err = tx.Deserialize(rbuf)
|
||||
if err != nil {
|
||||
log.Warnf("unable to decode tx block %v %v txoff %v txlen %v",
|
||||
blkHeight, blksha, txOff, txLen)
|
||||
return
|
||||
}
|
||||
|
||||
return &tx, blksha, blkHeight, txspent, nil
|
||||
}
|
||||
|
||||
// FetchTxBySha returns some data for the given Tx Sha.
|
||||
func (db *LevelDb) FetchTxBySha(txsha *btcwire.ShaHash) ([]*btcdb.TxListReply, error) {
|
||||
replylen := 0
|
||||
replycnt := 0
|
||||
|
||||
tx, blksha, height, txspent, txerr := db.fetchTxDataBySha(txsha)
|
||||
if txerr == nil {
|
||||
replylen++
|
||||
} else {
|
||||
if txerr != btcdb.TxShaMissing {
|
||||
return []*btcdb.TxListReply{}, txerr
|
||||
}
|
||||
replylen++
|
||||
}
|
||||
|
||||
sTxList, fSerr := db.getTxFullySpent(txsha)
|
||||
|
||||
if fSerr != nil {
|
||||
if fSerr != btcdb.TxShaMissing {
|
||||
return []*btcdb.TxListReply{}, fSerr
|
||||
}
|
||||
} else {
|
||||
replylen += len(sTxList)
|
||||
}
|
||||
|
||||
replies := make ([]*btcdb.TxListReply, replylen)
|
||||
|
||||
|
||||
if fSerr == nil {
|
||||
for _, stx := range sTxList {
|
||||
tx, blksha, _, _, err := db.fetchTxDataByLoc(
|
||||
stx.blkHeight, stx.txoff, stx.txlen, []byte{})
|
||||
if err != nil {
|
||||
if err != leveldb.ErrNotFound {
|
||||
return []*btcdb.TxListReply{}, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
btxspent := make([]bool, len(tx.TxOut), len(tx.TxOut))
|
||||
for i := range btxspent {
|
||||
btxspent[i] = true
|
||||
}
|
||||
txlre := btcdb.TxListReply{Sha: txsha, Tx: tx, BlkSha: blksha, Height: stx.blkHeight, TxSpent: btxspent, Err: nil}
|
||||
replies[replycnt] = &txlre
|
||||
replycnt++
|
||||
}
|
||||
}
|
||||
if txerr == nil {
|
||||
btxspent := make([]bool, len(tx.TxOut), len(tx.TxOut))
|
||||
for idx := range tx.TxOut {
|
||||
byteidx := idx / 8
|
||||
byteoff := uint(idx % 8)
|
||||
btxspent[idx] = (txspent[byteidx] & (byte(1) << byteoff)) != 0
|
||||
}
|
||||
txlre := btcdb.TxListReply{Sha: txsha, Tx: tx, BlkSha: blksha, Height: height, TxSpent: btxspent, Err: nil}
|
||||
replies[replycnt] = &txlre
|
||||
replycnt++
|
||||
}
|
||||
return replies, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue