Remove deprecated sqlite3.

The sqlite3 backend has been deprecated for quite some time.  As a result,
it has not been updated with many of the more recent changes which means
the behavior no longer conforms to the interface contract.
This commit is contained in:
Dave Collins 2014-01-19 02:36:54 -06:00
parent 27bc18ba2e
commit 845aedf103
9 changed files with 0 additions and 2730 deletions

View file

@ -1,15 +0,0 @@
// Copyright (c) 2013-2014 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
/*
Package sqlite3 implements a sqlite3 instance of btcdb.
sqlite provides a zero setup, single file database. It requires cgo
and the presence of the sqlite library and headers, but nothing else.
The performance is generally high although it goes down with database
size.
Many of the block or tx specific functions for btcdb are in this subpackage.
*/
package sqlite3

View file

@ -1,140 +0,0 @@
// Copyright (c) 2013-2014 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3_test
import (
"github.com/conformal/btcdb"
"github.com/conformal/btcdb/sqlite3"
"os"
"path/filepath"
"testing"
)
func TestFailOperational(t *testing.T) {
sqlite3.SetTestingT(t)
failtestOperationalMode(t, dbTmDefault)
failtestOperationalMode(t, dbTmNormal)
failtestOperationalMode(t, dbTmFast)
failtestOperationalMode(t, dbTmNoVerify)
}
func failtestOperationalMode(t *testing.T, mode int) {
// simplified basic operation is:
// 1) fetch block from remote server
// 2) look up all txin (except coinbase in db)
// 3) insert block
// Ignore db remove errors since it means we didn't have an old one.
dbname := "tstdbop1"
_ = os.Remove(dbname)
db, err := btcdb.CreateDB("sqlite", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer os.Remove(dbname)
defer db.Close()
switch mode {
case dbTmDefault: // default
// no setup
case dbTmNormal: // explicit normal
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertFast)
if sqldb, ok := db.(*sqlite3.SqliteDb); ok {
sqldb.TempTblMax = 100
} else {
t.Errorf("not right type")
}
case dbTmNoVerify: // validated block
// no point in testing this
return
}
// Since we are dealing with small dataset, reduce cache size
sqlite3.SetBlockCacheSize(db, 2)
sqlite3.SetTxCacheSize(db, 3)
testdatafile := filepath.Join("..", "testdata", "blocks1-256.bz2")
blocks, err := loadBlocks(t, testdatafile)
if err != nil {
t.Errorf("Unable to load blocks from test data for mode %v: %v",
mode, err)
return
}
err = nil
out:
for height := int64(0); height < int64(len(blocks)); height++ {
block := blocks[height]
mblock := block.MsgBlock()
blockname, _ := block.Sha()
if height == 248 {
// time to corrupt the datbase, to see if it leaves the block or tx in the db
if len(mblock.Transactions) != 2 {
t.Errorf("transaction #248 should have two transactions txid %v ?= 828ef3b079f9c23829c56fe86e85b4a69d9e06e5b54ea597eef5fb3ffef509fe", blockname)
return
}
tx := mblock.Transactions[1]
txin := tx.TxIn[0]
origintxsha := &txin.PreviousOutpoint.Hash
sqlite3.KillTx(db, origintxsha)
_, err = db.FetchTxBySha(origintxsha)
if err == nil {
t.Errorf("deleted tx found %v", origintxsha)
}
}
if height == 248 {
}
newheight, err := db.InsertBlock(block)
if err != nil {
if height != 248 {
t.Errorf("failed to insert block %v err %v", height, err)
break out
}
} else {
if height == 248 {
t.Errorf("block insert with missing input tx succeeded block %v err %v", height, err)
break out
}
}
if height == 248 {
for _, tx := range mblock.Transactions {
txsha, err := tx.TxSha()
_, err = db.FetchTxBySha(&txsha)
if err == nil {
t.Errorf("referenced tx found, should not have been %v, ", txsha)
}
}
}
if height == 248 {
exists := db.ExistsSha(blockname)
if exists == true {
t.Errorf("block still present after failed insert")
}
// if we got here with no error, testing was successful
break out
}
if newheight != height {
t.Errorf("height mismatch expect %v returned %v", height, newheight)
break out
}
}
switch mode {
case dbTmDefault: // default
// no cleanup
case dbTmNormal: // explicit normal
// no cleanup
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmNoVerify: // validated block
db.SetDBInsertMode(btcdb.InsertNormal)
}
}

View file

@ -1,209 +0,0 @@
// Copyright (c) 2013-2014 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3_test
import (
"github.com/conformal/btcdb"
"github.com/conformal/btcdb/sqlite3"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"os"
"path/filepath"
"testing"
)
var tstBlocks []*btcutil.Block
func loadblocks(t *testing.T) []*btcutil.Block {
if len(tstBlocks) != 0 {
return tstBlocks
}
testdatafile := filepath.Join("..", "testdata", "blocks1-256.bz2")
blocks, err := loadBlocks(t, testdatafile)
if err != nil {
t.Errorf("Unable to load blocks from test data: %v", err)
return nil
}
tstBlocks = blocks
return blocks
}
func TestUnspentInsert(t *testing.T) {
testUnspentInsert(t, dbTmDefault)
testUnspentInsert(t, dbTmNormal)
testUnspentInsert(t, dbTmFast)
}
// insert every block in the test chain
// after each insert, fetch all the tx affected by the latest
// block and verify that the the tx is spent/unspent
// new tx should be fully unspent, referenced tx should have
// the associated txout set to spent.
func testUnspentInsert(t *testing.T, mode int) {
// Ignore db remove errors since it means we didn't have an old one.
dbname := "tstdbuspnt1"
_ = os.Remove(dbname)
db, err := btcdb.CreateDB("sqlite", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer os.Remove(dbname)
defer db.Close()
switch mode {
case dbTmDefault: // default
// no setup
case dbTmNormal: // explicit normal
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertFast)
if sqldb, ok := db.(*sqlite3.SqliteDb); ok {
sqldb.TempTblMax = 100
} else {
t.Errorf("not right type")
}
case dbTmNoVerify: // validated block
t.Errorf("UnspentInsert test is not valid in NoVerify mode")
}
// Since we are dealing with small dataset, reduce cache size
sqlite3.SetBlockCacheSize(db, 2)
sqlite3.SetTxCacheSize(db, 3)
blocks := loadblocks(t)
endtest:
for height := int64(0); height < int64(len(blocks)); height++ {
block := blocks[height]
// look up inputs to this x
mblock := block.MsgBlock()
var txneededList []*btcwire.ShaHash
var txlookupList []*btcwire.ShaHash
var txOutList []*btcwire.ShaHash
var txInList []*btcwire.OutPoint
for _, tx := range mblock.Transactions {
for _, txin := range tx.TxIn {
if txin.PreviousOutpoint.Index == uint32(4294967295) {
continue
}
origintxsha := &txin.PreviousOutpoint.Hash
txInList = append(txInList, &txin.PreviousOutpoint)
txneededList = append(txneededList, origintxsha)
txlookupList = append(txlookupList, origintxsha)
if !db.ExistsTxSha(origintxsha) {
t.Errorf("referenced tx not found %v ", origintxsha)
}
}
txshaname, _ := tx.TxSha()
txlookupList = append(txlookupList, &txshaname)
txOutList = append(txOutList, &txshaname)
}
txneededmap := map[btcwire.ShaHash]*btcdb.TxListReply{}
txlist := db.FetchUnSpentTxByShaList(txneededList)
for _, txe := range txlist {
if txe.Err != nil {
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
break endtest
}
txneededmap[*txe.Sha] = txe
}
for _, spend := range txInList {
itxe := txneededmap[spend.Hash]
if itxe.TxSpent[spend.Index] == true {
t.Errorf("txin %v:%v is already spent", spend.Hash, spend.Index)
}
}
newheight, err := db.InsertBlock(block)
if err != nil {
t.Errorf("failed to insert block %v err %v", height, err)
break endtest
}
if newheight != height {
t.Errorf("height mismatch expect %v returned %v", height, newheight)
break endtest
}
txlookupmap := map[btcwire.ShaHash]*btcdb.TxListReply{}
txlist = db.FetchUnSpentTxByShaList(txlookupList)
for _, txe := range txlist {
if txe.Err != nil {
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
break endtest
}
txlookupmap[*txe.Sha] = txe
}
for _, spend := range txInList {
itxe := txlookupmap[spend.Hash]
if itxe.TxSpent[spend.Index] == false {
t.Errorf("txin %v:%v is unspent %v", spend.Hash, spend.Index, itxe.TxSpent)
}
}
for _, txo := range txOutList {
itxe := txlookupmap[*txo]
for i, spent := range itxe.TxSpent {
if spent == true {
t.Errorf("freshly inserted tx %v already spent %v", txo, i)
}
}
}
if len(txInList) == 0 {
continue
}
dropblock := blocks[height-1]
dropsha, _ := dropblock.Sha()
err = db.DropAfterBlockBySha(dropsha)
if err != nil {
t.Errorf("failed to drop block %v err %v", height, err)
break endtest
}
txlookupmap = map[btcwire.ShaHash]*btcdb.TxListReply{}
txlist = db.FetchUnSpentTxByShaList(txlookupList)
for _, txe := range txlist {
if txe.Err != nil {
if _, ok := txneededmap[*txe.Sha]; ok {
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
break endtest
}
}
txlookupmap[*txe.Sha] = txe
}
for _, spend := range txInList {
itxe := txlookupmap[spend.Hash]
if itxe.TxSpent[spend.Index] == true {
t.Errorf("txin %v:%v is unspent %v", spend.Hash, spend.Index, itxe.TxSpent)
}
}
newheight, err = db.InsertBlock(block)
if err != nil {
t.Errorf("failed to insert block %v err %v", height, err)
break endtest
}
txlookupmap = map[btcwire.ShaHash]*btcdb.TxListReply{}
txlist = db.FetchUnSpentTxByShaList(txlookupList)
for _, txe := range txlist {
if txe.Err != nil {
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
break endtest
}
txlookupmap[*txe.Sha] = txe
}
for _, spend := range txInList {
itxe := txlookupmap[spend.Hash]
if itxe.TxSpent[spend.Index] == false {
t.Errorf("txin %v:%v is unspent %v", spend.Hash, spend.Index, itxe.TxSpent)
}
}
}
}

View file

@ -1,84 +0,0 @@
// Copyright (c) 2013-2014 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3
import (
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btcwire"
"testing"
)
var t *testing.T
func SetTestingT(t_arg *testing.T) {
t = t_arg
}
// FetchSha returns the datablock and pver for the given ShaHash.
// This is a testing only interface.
func FetchSha(db btcdb.Db, sha *btcwire.ShaHash) (buf []byte, pver uint32,
blkid int64, err error) {
sqldb, ok := db.(*SqliteDb)
if !ok {
err = fmt.Errorf("Invalid data type")
return
}
buf, pver, blkid, err = sqldb.fetchSha(*sha)
return
}
// SetBlockCacheSize configures the maximum number of blocks in the cache to
// be the given size should be made before any fetching.
// This is a testing only interface.
func SetBlockCacheSize(db btcdb.Db, newsize int) {
sqldb, ok := db.(*SqliteDb)
if !ok {
return
}
bc := &sqldb.blockCache
bc.maxcount = newsize
}
// SetTxCacheSize configures the maximum number of tx in the cache to
// be the given size should be made before any fetching.
// This is a testing only interface.
func SetTxCacheSize(db btcdb.Db, newsize int) {
sqldb, ok := db.(*SqliteDb)
if !ok {
return
}
tc := &sqldb.txCache
tc.maxcount = newsize
}
// KillTx is a function that deletes a transaction from the database
// this should only be used for testing purposes to valiate error paths
// in the database. This is _expected_ to leave the database in an
// inconsistant state.
func KillTx(dbarg btcdb.Db, txsha *btcwire.ShaHash) {
db, ok := dbarg.(*SqliteDb)
if !ok {
return
}
db.endTx(false)
db.startTx()
tx := &db.txState
key := txsha.String()
_, err := tx.tx.Exec("DELETE FROM txtmp WHERE key == ?", key)
if err != nil {
log.Warnf("error deleting tx %v from txtmp", txsha)
}
_, err = tx.tx.Exec("DELETE FROM tx WHERE key == ?", key)
if err != nil {
log.Warnf("error deleting tx %v from tx (%v)", txsha, key)
}
err = db.endTx(true)
if err != nil {
// XXX
db.endTx(false)
}
db.InvalidateCache()
}

View file

@ -1,387 +0,0 @@
// Copyright (c) 2013-2014 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3_test
import (
"compress/bzip2"
"encoding/binary"
"github.com/conformal/btcdb"
"github.com/conformal/btcdb/sqlite3"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"io"
"os"
"path/filepath"
"strings"
"testing"
)
var network = btcwire.MainNet
const (
dbTmDefault = iota
dbTmNormal
dbTmFast
dbTmNoVerify
)
func TestOperational(t *testing.T) {
testOperationalMode(t, dbTmDefault)
testOperationalMode(t, dbTmNormal)
testOperationalMode(t, dbTmFast)
testOperationalMode(t, dbTmNoVerify)
}
func testOperationalMode(t *testing.T, mode int) {
// simplified basic operation is:
// 1) fetch block from remote server
// 2) look up all txin (except coinbase in db)
// 3) insert block
// Ignore db remove errors since it means we didn't have an old one.
dbname := "tstdbop1"
_ = os.Remove(dbname)
db, err := btcdb.CreateDB("sqlite", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer os.Remove(dbname)
defer db.Close()
switch mode {
case dbTmDefault: // default
// no setup
case dbTmNormal: // explicit normal
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertFast)
if sqldb, ok := db.(*sqlite3.SqliteDb); ok {
sqldb.TempTblMax = 100
} else {
t.Errorf("not right type")
}
case dbTmNoVerify: // validated block
db.SetDBInsertMode(btcdb.InsertValidatedInput)
}
// Since we are dealing with small dataset, reduce cache size
sqlite3.SetBlockCacheSize(db, 2)
sqlite3.SetTxCacheSize(db, 3)
testdatafile := filepath.Join("..", "testdata", "blocks1-256.bz2")
blocks, err := loadBlocks(t, testdatafile)
if err != nil {
t.Errorf("Unable to load blocks from test data for mode %v: %v",
mode, err)
return
}
err = nil
out:
for height := int64(0); height < int64(len(blocks)); height++ {
block := blocks[height]
if mode != dbTmNoVerify {
// except for NoVerify which does not allow lookups check inputs
mblock := block.MsgBlock()
var txneededList []*btcwire.ShaHash
for _, tx := range mblock.Transactions {
for _, txin := range tx.TxIn {
if txin.PreviousOutpoint.Index == uint32(4294967295) {
continue
}
origintxsha := &txin.PreviousOutpoint.Hash
txneededList = append(txneededList, origintxsha)
if !db.ExistsTxSha(origintxsha) {
t.Errorf("referenced tx not found %v ", origintxsha)
}
_, err = db.FetchTxBySha(origintxsha)
if err != nil {
t.Errorf("referenced tx not found %v err %v ", origintxsha, err)
}
}
}
txlist := db.FetchUnSpentTxByShaList(txneededList)
for _, txe := range txlist {
if txe.Err != nil {
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
break out
}
}
}
newheight, err := db.InsertBlock(block)
if err != nil {
t.Errorf("failed to insert block %v err %v", height, err)
break out
}
if newheight != height {
t.Errorf("height mismatch expect %v returned %v", height, newheight)
break out
}
newSha, blkid, err := db.NewestSha()
if err != nil {
t.Errorf("failed to obtain latest sha %v %v", height, err)
}
if blkid != height {
t.Errorf("height does not match latest block height %v %v", blkid, height)
}
blkSha, _ := block.Sha()
if *newSha != *blkSha {
t.Errorf("Newest block sha does not match freshly inserted one %v %v ", newSha, blkSha)
}
}
// now that db is populated, do some additional test
testFetchRangeHeight(t, db, blocks)
switch mode {
case dbTmDefault: // default
// no cleanup
case dbTmNormal: // explicit normal
// no cleanup
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmNoVerify: // validated block
db.SetDBInsertMode(btcdb.InsertNormal)
}
}
func TestBackout(t *testing.T) {
testBackout(t, dbTmDefault)
testBackout(t, dbTmNormal)
testBackout(t, dbTmFast)
}
func testBackout(t *testing.T, mode int) {
// simplified basic operation is:
// 1) fetch block from remote server
// 2) look up all txin (except coinbase in db)
// 3) insert block
// Ignore db remove errors since it means we didn't have an old one.
dbname := "tstdbop2"
_ = os.Remove(dbname)
db, err := btcdb.CreateDB("sqlite", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer os.Remove(dbname)
defer db.Close()
switch mode {
case dbTmDefault: // default
// no setup
case dbTmNormal: // explicit normal
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertFast)
if sqldb, ok := db.(*sqlite3.SqliteDb); ok {
sqldb.TempTblMax = 100
} else {
t.Errorf("not right type")
}
}
// Since we are dealing with small dataset, reduce cache size
sqlite3.SetBlockCacheSize(db, 2)
sqlite3.SetTxCacheSize(db, 3)
testdatafile := filepath.Join("..", "testdata", "blocks1-256.bz2")
blocks, err := loadBlocks(t, testdatafile)
if len(blocks) < 120 {
t.Errorf("test data too small")
return
}
err = nil
for height := int64(0); height < int64(len(blocks)); height++ {
if height == 100 {
t.Logf("Syncing at block height 100")
db.Sync()
}
if height == 120 {
t.Logf("Simulating unexpected application quit")
// Simulate unexpected application quit
db.RollbackClose()
break
}
block := blocks[height]
newheight, err := db.InsertBlock(block)
if err != nil {
t.Errorf("failed to insert block %v err %v", height, err)
break
}
if newheight != height {
t.Errorf("height mismatch expect %v returned %v", height, newheight)
break
}
}
// db was closed at height 120, so no cleanup is possible.
// reopen db
db, err = btcdb.OpenDB("sqlite", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer db.Close()
sha, err := blocks[99].Sha()
if err != nil {
t.Errorf("failed to get block 99 sha err %v", err)
return
}
_ = db.ExistsSha(sha)
_, err = db.FetchBlockBySha(sha)
if err != nil {
t.Errorf("failed to load block 99 from db %v", err)
}
sha, err = blocks[110].Sha()
if err != nil {
t.Errorf("failed to get block 110 sha err %v", err)
return
}
_ = db.ExistsSha(sha)
_, err = db.FetchBlockBySha(sha)
if err == nil {
t.Errorf("loaded block 110 from db, failure expected")
return
}
block := blocks[110]
mblock := block.MsgBlock()
txsha, err := mblock.Transactions[0].TxSha()
exists := db.ExistsTxSha(&txsha)
if exists {
t.Errorf("tx %v exists in db, failure expected", txsha)
}
_, err = db.FetchTxBySha(&txsha)
_, err = db.FetchTxBySha(&txsha)
}
func loadBlocks(t *testing.T, file string) (blocks []*btcutil.Block, err error) {
testdatafile := filepath.Join("..", "testdata", "blocks1-256.bz2")
var dr io.Reader
var fi io.ReadCloser
fi, err = os.Open(testdatafile)
if err != nil {
t.Errorf("failed to open file %v, err %v", testdatafile, err)
return
}
if strings.HasSuffix(testdatafile, ".bz2") {
z := bzip2.NewReader(fi)
dr = z
} else {
dr = fi
}
defer func() {
if err := fi.Close(); err != nil {
t.Errorf("failed to close file %v %v", testdatafile, err)
}
}()
// Set the first block as the genesis block.
genesis := btcutil.NewBlock(&btcwire.GenesisBlock)
blocks = append(blocks, genesis)
var block *btcutil.Block
err = nil
for height := int64(1); err == nil; height++ {
var rintbuf uint32
err = binary.Read(dr, binary.LittleEndian, &rintbuf)
if err == io.EOF {
// hit end of file at expected offset: no warning
height--
err = nil
break
}
if err != nil {
t.Errorf("failed to load network type, err %v", err)
break
}
if rintbuf != uint32(network) {
t.Errorf("Block doesn't match network: %v expects %v",
rintbuf, network)
break
}
err = binary.Read(dr, binary.LittleEndian, &rintbuf)
blocklen := rintbuf
rbytes := make([]byte, blocklen)
// read block
dr.Read(rbytes)
block, err = btcutil.NewBlockFromBytes(rbytes)
if err != nil {
t.Errorf("failed to parse block %v", height)
return
}
blocks = append(blocks, block)
}
return
}
func testFetchRangeHeight(t *testing.T, db btcdb.Db, blocks []*btcutil.Block) {
var testincrement int64 = 50
var testcnt int64 = 100
shanames := make([]*btcwire.ShaHash, len(blocks))
nBlocks := int64(len(blocks))
for i := range blocks {
blockSha, err := blocks[i].Sha()
if err != nil {
t.Errorf("FetchRangeHeight: unexpected failure computing block sah %v", err)
}
shanames[i] = blockSha
}
for startheight := int64(0); startheight < nBlocks; startheight += testincrement {
endheight := startheight + testcnt
if endheight > nBlocks {
endheight = btcdb.AllShas
}
shalist, err := db.FetchHeightRange(startheight, endheight)
if err != nil {
t.Errorf("FetchRangeHeight: unexpected failure looking up shas %v", err)
}
if endheight == btcdb.AllShas {
if int64(len(shalist)) != nBlocks-startheight {
t.Errorf("FetchRangeHeight: expected A %v shas, got %v", nBlocks-startheight, len(shalist))
}
} else {
if int64(len(shalist)) != testcnt {
t.Errorf("FetchRangeHeight: expected %v shas, got %v", testcnt, len(shalist))
}
}
for i := range shalist {
if *shanames[int64(i)+startheight] != shalist[i] {
t.Errorf("FetchRangeHeight: mismatch sha at %v requested range %v %v ", int64(i)+startheight, startheight, endheight)
}
}
}
}

View file

@ -1,881 +0,0 @@
// Copyright (c) 2013-2014 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3
import (
"database/sql"
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btclog"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
_ "github.com/mattn/go-sqlite3"
"os"
"sync"
)
const (
dbVersion int = 2
dbMaxTransCnt = 20000
dbMaxTransMem = 64 * 1024 * 1024 // 64 MB
)
const (
blkInsertSha = iota
blkFetchSha
blkExistsSha
blkFetchIdx
blkFetchIdxList
)
const (
txInsertStmt = iota
txFetchUsedByShaStmt
txFetchLocationByShaStmt
txFetchLocUsedByShaStmt
txUpdateUsedByShaStmt
txtmpInsertStmt
txtmpFetchUsedByShaStmt
txtmpFetchLocationByShaStmt
txtmpFetchLocUsedByShaStmt
txtmpUpdateUsedByShaStmt
txMigrateCopy
txMigrateClear
txMigratePrep
txMigrateFinish
txMigrateCount
txPragmaVacuumOn
txPragmaVacuumOff
txVacuum
txExistsShaStmt
txtmpExistsShaStmt
)
var blkqueries []string = []string{
blkInsertSha: "INSERT INTO block (key, pver, data) VALUES(?, ?, ?);",
blkFetchSha: "SELECT pver, data, blockid FROM block WHERE key = ?;",
blkExistsSha: "SELECT pver FROM block WHERE key = ?;",
blkFetchIdx: "SELECT key FROM block WHERE blockid = ?;",
blkFetchIdxList: "SELECT key FROM block WHERE blockid >= ? AND blockid < ? ORDER BY blockid ASC LIMIT 500;",
}
var txqueries []string = []string{
txInsertStmt: "INSERT INTO tx (key, blockid, txoff, txlen, data) VALUES(?, ?, ?, ?, ?);",
txFetchUsedByShaStmt: "SELECT data FROM tx WHERE key = ?;",
txFetchLocationByShaStmt: "SELECT blockid, txoff, txlen FROM tx WHERE key = ?;",
txFetchLocUsedByShaStmt: "SELECT blockid, txoff, txlen, data FROM tx WHERE key = ?;",
txUpdateUsedByShaStmt: "UPDATE tx SET data = ? WHERE key = ?;",
txtmpInsertStmt: "INSERT INTO txtmp (key, blockid, txoff, txlen, data) VALUES(?, ?, ?, ?, ?);",
txtmpFetchUsedByShaStmt: "SELECT data FROM txtmp WHERE key = ?;",
txtmpFetchLocationByShaStmt: "SELECT blockid, txoff, txlen FROM txtmp WHERE key = ?;",
txtmpFetchLocUsedByShaStmt: "SELECT blockid, txoff, txlen, data FROM txtmp WHERE key = ?;",
txtmpUpdateUsedByShaStmt: "UPDATE txtmp SET data = ? WHERE key = ?;",
txMigrateCopy: "INSERT INTO tx (key, blockid, txoff, txlen, data) SELECT key, blockid, txoff, txlen, data FROM txtmp;",
txMigrateClear: "DELETE from txtmp;",
txMigratePrep: "DROP index IF EXISTS uniquetx;",
txMigrateFinish: "CREATE UNIQUE INDEX IF NOT EXISTS uniquetx ON tx (key);",
txMigrateCount: "SELECT COUNT(*) FROM txtmp;",
txPragmaVacuumOn: "PRAGMA auto_vacuum = FULL;",
txPragmaVacuumOff: "PRAGMA auto_vacuum = NONE;",
txVacuum: "VACUUM;",
txExistsShaStmt: "SELECT blockid FROM tx WHERE key = ?;",
txtmpExistsShaStmt: "SELECT blockid FROM txtmp WHERE key = ?;",
}
var log = btclog.Disabled
type tBlockInsertData struct {
sha btcwire.ShaHash
pver uint32
buf []byte
}
type tTxInsertData struct {
txsha *btcwire.ShaHash
blockid int64
txoff int
txlen int
usedbuf []byte
}
type txState struct {
tx *sql.Tx
writeCount int
txDataSz int
txInsertList []interface{}
}
type SqliteDb struct {
sqldb *sql.DB
blkStmts []*sql.Stmt
blkBaseStmts []*sql.Stmt
txStmts []*sql.Stmt
txBaseStmts []*sql.Stmt
txState txState
dbLock sync.Mutex
lastBlkShaCached bool
lastBlkSha btcwire.ShaHash
lastBlkIdx int64
txCache txCache
blockCache blockCache
UseTempTX bool
TempTblSz int
TempTblMax int
dbInsertMode btcdb.InsertMode
}
var self = btcdb.DriverDB{DbType: "sqlite", Create: CreateSqliteDB, Open: OpenSqliteDB}
func init() {
btcdb.AddDBDriver(self)
}
// createDB configure the database, setting up all tables to initial state.
func createDB(db *sql.DB) error {
log.Infof("Initializing new block database")
// XXX check for old tables
buildTables := []string{
"CREATE TABLE dbversion (version integer);",
"CREATE TABLE block ( blockid INTEGER PRIMARY KEY, key BLOB UNIQUE, " +
"pver INTEGER NOT NULL, data BLOB NOT NULL);",
"INSERT INTO dbversion (version) VALUES (" + fmt.Sprintf("%d", dbVersion) +
");",
}
buildtxTables := []string{
"CREATE TABLE tx (txidx INTEGER PRIMARY KEY, " +
"key TEXT, " +
"blockid INTEGER NOT NULL, " +
"txoff INTEGER NOT NULL, txlen INTEGER NOT NULL, " +
"data BLOB NOT NULL, " +
"FOREIGN KEY(blockid) REFERENCES block(blockid));",
"CREATE TABLE txtmp (key TEXT PRIMARY KEY, " +
"blockid INTEGER NOT NULL, " +
"txoff INTEGER NOT NULL, txlen INTEGER NOT NULL, " +
"data BLOB NOT NULL, " +
"FOREIGN KEY(blockid) REFERENCES block(blockid));",
"CREATE UNIQUE INDEX uniquetx ON tx (key);",
}
for _, sql := range buildTables {
_, err := db.Exec(sql)
if err != nil {
log.Warnf("sql table op failed %v [%v]", err, sql)
return err
}
}
for _, sql := range buildtxTables {
_, err := db.Exec(sql)
if err != nil {
log.Warnf("sql table op failed %v [%v]", err, sql)
return err
}
}
return nil
}
// OpenSqliteDB opens an existing database for use.
func OpenSqliteDB(filepath string) (pbdb btcdb.Db, err error) {
log = btcdb.GetLog()
return newOrCreateSqliteDB(filepath, false)
}
// CreateSqliteDB creates, initializes and opens a database for use.
func CreateSqliteDB(filepath string) (pbdb btcdb.Db, err error) {
log = btcdb.GetLog()
return newOrCreateSqliteDB(filepath, true)
}
// newOrCreateSqliteDB opens a database, either creating it or opens
// existing database based on flag.
func newOrCreateSqliteDB(filepath string, create bool) (pbdb btcdb.Db, err error) {
var bdb SqliteDb
if create == false {
_, err = os.Stat(filepath)
if err != nil {
return nil, btcdb.DbDoesNotExist
}
}
db, err := sql.Open("sqlite3", filepath)
if err != nil {
log.Warnf("db open failed %v\n", err)
return nil, err
}
db.Exec("PRAGMA page_size=4096;")
db.Exec("PRAGMA foreign_keys=ON;")
db.Exec("PRAGMA journal_mode=WAL;")
dbverstmt, err := db.Prepare("SELECT version FROM dbversion;")
if err != nil {
// about the only reason this would fail is that the database
// is not initialized
if create == false {
return nil, btcdb.DbDoesNotExist
}
err = createDB(db)
if err != nil {
// already warned in the called function
return nil, err
}
dbverstmt, err = db.Prepare("SELECT version FROM dbversion;")
if err != nil {
// if it failed this a second time, fail.
return nil, err
}
}
row := dbverstmt.QueryRow()
var version int
err = row.Scan(&version)
if err != nil {
log.Warnf("unable to find db version: no row\n", err)
}
switch version {
case dbVersion:
// all good
default:
log.Warnf("mismatch db version: %v expected %v\n", version, dbVersion)
return nil, fmt.Errorf("Invalid version in database")
}
bdb.sqldb = db
bdb.blkStmts = make([]*sql.Stmt, len(blkqueries))
bdb.blkBaseStmts = make([]*sql.Stmt, len(blkqueries))
for i := range blkqueries {
stmt, err := db.Prepare(blkqueries[i])
if err != nil {
// XXX log/
return nil, err
}
bdb.blkBaseStmts[i] = stmt
}
for i := range bdb.blkBaseStmts {
bdb.blkStmts[i] = bdb.blkBaseStmts[i]
}
bdb.txBaseStmts = make([]*sql.Stmt, len(txqueries))
for i := range txqueries {
stmt, err := db.Prepare(txqueries[i])
if err != nil {
// XXX log/
return nil, err
}
bdb.txBaseStmts[i] = stmt
}
// NOTE: all array entries in txStmts remain nil'ed
// tx statements are lazy bound
bdb.txStmts = make([]*sql.Stmt, len(txqueries))
bdb.blockCache.maxcount = 150
bdb.blockCache.blockMap = map[btcwire.ShaHash]*blockCacheObj{}
bdb.blockCache.blockMap = map[btcwire.ShaHash]*blockCacheObj{}
bdb.blockCache.blockHeightMap = map[int64]*blockCacheObj{}
bdb.txCache.maxcount = 2000
bdb.txCache.txMap = map[btcwire.ShaHash]*txCacheObj{}
bdb.UseTempTX = true
bdb.TempTblMax = 1000000
return &bdb, nil
}
// Sync verifies that the database is coherent on disk,
// and no outstanding transactions are in flight.
func (db *SqliteDb) Sync() {
db.dbLock.Lock()
defer db.dbLock.Unlock()
db.endTx(true)
}
// syncPoint notifies the db that this is a safe time to sync the database,
// if there are many outstanding transactions.
// Must be called with db lock held.
func (db *SqliteDb) syncPoint() {
tx := &db.txState
if db.TempTblSz > db.TempTblMax {
err := db.migrateTmpTable()
if err != nil {
return
}
} else {
if len(tx.txInsertList) > dbMaxTransCnt || tx.txDataSz > dbMaxTransMem {
db.endTx(true)
}
}
}
// Close cleanly shuts down database, syncing all data.
func (db *SqliteDb) Close() {
db.dbLock.Lock()
defer db.dbLock.Unlock()
db.close()
}
// RollbackClose discards the recent database changes to the previously
// saved data at last Sync.
func (db *SqliteDb) RollbackClose() {
db.dbLock.Lock()
defer db.dbLock.Unlock()
tx := &db.txState
if tx.tx != nil {
err := tx.tx.Rollback()
if err != nil {
log.Debugf("Rollback failed: %v", err)
} else {
tx.tx = nil
}
}
db.close()
}
// close performs the internal shutdown/close operation.
func (db *SqliteDb) close() {
db.endTx(true)
db.InvalidateCache()
for i := range db.blkBaseStmts {
db.blkBaseStmts[i].Close()
}
for i := range db.txBaseStmts {
if db.txBaseStmts[i] != nil {
db.txBaseStmts[i].Close()
db.txBaseStmts[i] = nil
}
}
db.sqldb.Close()
}
// txop returns the appropriately prepared statement, based on
// transaction state of the database.
func (db *SqliteDb) txop(op int) *sql.Stmt {
if db.txStmts[op] != nil {
return db.txStmts[op]
}
if db.txState.tx == nil {
// we are not in a transaction, return the base statement
return db.txBaseStmts[op]
}
if db.txStmts[op] == nil {
db.txStmts[op] = db.txState.tx.Stmt(db.txBaseStmts[op])
}
return db.txStmts[op]
}
// startTx starts a transaction, preparing or scrubbing statements
// for proper operation inside a transaction.
func (db *SqliteDb) startTx() (err error) {
tx := &db.txState
if tx.tx != nil {
// this shouldn't happen...
log.Warnf("Db startTx called while in a transaction")
return
}
tx.tx, err = db.sqldb.Begin()
if err != nil {
log.Warnf("Db startTx: begin failed %v", err)
tx.tx = nil
return
}
for i := range db.blkBaseStmts {
db.blkStmts[i] = tx.tx.Stmt(db.blkBaseStmts[i])
}
for i := range db.txBaseStmts {
db.txStmts[i] = nil // these are lazily prepared
}
return
}
// endTx commits the current active transaction, it zaps all of the prepared
// statements associated with the transaction.
func (db *SqliteDb) endTx(recover bool) (err error) {
tx := &db.txState
if tx.tx == nil {
return
}
err = tx.tx.Commit()
if err != nil && recover {
// XXX - double check that the tx is dead after
// commit failure (rollback?)
log.Warnf("Db endTx: commit failed %v", err)
err = db.rePlayTransaction()
if err != nil {
// We tried, return failure (after zeroing state)
// so the upper level can notice and restart
}
}
for i := range db.blkBaseStmts {
db.blkStmts[i].Close()
db.blkStmts[i] = db.blkBaseStmts[i]
}
for i := range db.txStmts {
if db.txStmts[i] != nil {
db.txStmts[i].Close()
db.txStmts[i] = nil
}
}
tx.tx = nil
var emptyTxList []interface{}
tx.txInsertList = emptyTxList
tx.txDataSz = 0
return
}
// rePlayTransaction will attempt to re-execute inserts performed
// sync the beginning of a transaction. This is to be used after
// a sql Commit operation fails to keep the database from losing data.
func (db *SqliteDb) rePlayTransaction() (err error) {
err = db.startTx()
if err != nil {
return
}
tx := &db.txState
for _, ins := range tx.txInsertList {
switch v := ins.(type) {
case tBlockInsertData:
block := v
_, err = db.blkStmts[blkInsertSha].Exec(block.sha.Bytes(),
block.pver, block.buf)
if err != nil {
break
}
case tTxInsertData:
txd := v
txnamebytes := txd.txsha.Bytes()
txop := db.txop(txInsertStmt)
_, err = txop.Exec(txd.blockid, txnamebytes, txd.txoff,
txd.txlen, txd.usedbuf)
if err != nil {
break
}
}
}
// This function is called even if we have failed.
// We need to clean up so the database can be used again.
// However we want the original error not any new error,
// unless there was no original error but the commit fails.
err2 := db.endTx(false)
if err == nil && err2 != nil {
err = err2
}
return
}
// DropAfterBlockBySha will remove any blocks from the database after the given block.
// It terminates any existing transaction and performs its operations in an
// atomic transaction, it is terminated (committed) before exit.
func (db *SqliteDb) DropAfterBlockBySha(sha *btcwire.ShaHash) (err error) {
var row *sql.Row
db.dbLock.Lock()
defer db.dbLock.Unlock()
// This is a destructive operation and involves multiple requests
// so requires a transaction, terminate any transaction to date
// and start a new transaction
err = db.endTx(true)
if err != nil {
return err
}
err = db.startTx()
if err != nil {
return err
}
var startheight int64
if db.lastBlkShaCached {
startheight = db.lastBlkIdx
} else {
querystr := "SELECT blockid FROM block ORDER BY blockid DESC;"
tx := &db.txState
if tx.tx != nil {
row = tx.tx.QueryRow(querystr)
} else {
row = db.sqldb.QueryRow(querystr)
}
var startblkidx int64
err = row.Scan(&startblkidx)
if err != nil {
log.Warnf("DropAfterBlockBySha:unable to fetch blockheight %v", err)
return err
}
startheight = startblkidx
}
// also drop any cached sha data
db.lastBlkShaCached = false
querystr := "SELECT blockid FROM block WHERE key = ?;"
tx := &db.txState
row = tx.tx.QueryRow(querystr, sha.Bytes())
var keepidx int64
err = row.Scan(&keepidx)
if err != nil {
// XXX
db.endTx(false)
return err
}
for height := startheight; height > keepidx; height = height - 1 {
var blk *btcutil.Block
blkc, ok := db.fetchBlockHeightCache(height)
if ok {
blk = blkc.blk
} else {
// must load the block from the db
sha, err = db.fetchBlockShaByHeight(height - 1)
if err != nil {
return
}
var buf []byte
buf, _, _, err = db.fetchSha(*sha)
if err != nil {
return
}
blk, err = btcutil.NewBlockFromBytes(buf)
if err != nil {
return
}
}
for _, tx := range blk.MsgBlock().Transactions {
err = db.unSpend(tx)
if err != nil {
return
}
}
}
// invalidate the cache after possibly using cached entries for block
// lookup to unspend coins in them
db.InvalidateCache()
return db.delFromDB(keepidx)
}
func (db *SqliteDb) delFromDB(keepidx int64) error {
tx := &db.txState
_, err := tx.tx.Exec("DELETE FROM txtmp WHERE blockid > ?", keepidx)
if err != nil {
// XXX
db.endTx(false)
return err
}
_, err = tx.tx.Exec("DELETE FROM tx WHERE blockid > ?", keepidx)
if err != nil {
// XXX
db.endTx(false)
return err
}
// delete from block last in case of foreign keys
_, err = tx.tx.Exec("DELETE FROM block WHERE blockid > ?", keepidx)
if err != nil {
// XXX
db.endTx(false)
return err
}
err = db.endTx(true)
if err != nil {
return err
}
return err
}
// InsertBlock inserts raw block and transaction data from a block into the
// database. The first block inserted into the database will be treated as the
// genesis block. Every subsequent block insert requires the referenced parent
// block to already exist.
func (db *SqliteDb) InsertBlock(block *btcutil.Block) (int64, error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
blocksha, err := block.Sha()
if err != nil {
log.Warnf("Failed to compute block sha %v", blocksha)
return -1, err
}
mblock := block.MsgBlock()
rawMsg, err := block.Bytes()
if err != nil {
log.Warnf("Failed to obtain raw block sha %v", blocksha)
return -1, err
}
txloc, err := block.TxLoc()
if err != nil {
log.Warnf("Failed to obtain raw block sha %v", blocksha)
return -1, err
}
// Insert block into database
newheight, err := db.insertBlockData(blocksha, &mblock.Header.PrevBlock,
0, rawMsg)
if err != nil {
log.Warnf("Failed to insert block %v %v %v", blocksha,
&mblock.Header.PrevBlock, err)
return -1, err
}
txinsertidx := -1
success := false
defer func() {
if success {
return
}
for txidx := 0; txidx <= txinsertidx; txidx++ {
tx := mblock.Transactions[txidx]
err = db.unSpend(tx)
if err != nil {
log.Warnf("unSpend error during block insert unwind %v %v %v", blocksha, txidx, err)
}
}
err = db.delFromDB(newheight - 1)
if err != nil {
log.Warnf("Error during block insert unwind %v %v", blocksha, err)
}
}()
// At least two blocks in the long past were generated by faulty
// miners, the sha of the transaction exists in a previous block,
// detect this condition and 'accept' the block.
for txidx, tx := range mblock.Transactions {
var txsha btcwire.ShaHash
txsha, err = tx.TxSha()
if err != nil {
log.Warnf("failed to compute tx name block %v idx %v err %v", blocksha, txidx, err)
return -1, err
}
// num tx inserted, thus would need unwind if failure occurs
txinsertidx = txidx
// Some old blocks contain duplicate transactions
// Attempt to cleanly bypass this problem
// http://blockexplorer.com/b/91842
// http://blockexplorer.com/b/91880
if newheight == 91842 {
dupsha, err := btcwire.NewShaHashFromStr("d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599")
if err != nil {
panic("invalid sha string in source")
}
if txsha == *dupsha {
log.Tracef("skipping sha %v %v", dupsha, newheight)
continue
}
}
if newheight == 91880 {
dupsha, err := btcwire.NewShaHashFromStr("e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468")
if err != nil {
panic("invalid sha string in source")
}
if txsha == *dupsha {
log.Tracef("skipping sha %v %v", dupsha, newheight)
continue
}
}
spentbuflen := (len(tx.TxOut) + 7) / 8
spentbuf := make([]byte, spentbuflen, spentbuflen)
if len(tx.TxOut)%8 != 0 {
for i := uint(len(tx.TxOut) % 8); i < 8; i++ {
spentbuf[spentbuflen-1] |= (byte(1) << i)
}
}
err = db.insertTx(&txsha, newheight, txloc[txidx].TxStart, txloc[txidx].TxLen, spentbuf)
if err != nil {
log.Warnf("block %v idx %v failed to insert tx %v %v err %v", blocksha, newheight, &txsha, txidx, err)
var oBlkIdx int64
oBlkIdx, _, _, err = db.fetchLocationBySha(&txsha)
log.Warnf("oblkidx %v err %v", oBlkIdx, err)
return -1, err
}
err = db.doSpend(tx)
if err != nil {
log.Warnf("block %v idx %v failed to spend tx %v %v err %v", blocksha, newheight, &txsha, txidx, err)
return -1, err
}
}
success = true
db.syncPoint()
return newheight, nil
}
// SetDBInsertMode provides hints to the database to how the application
// is running this allows the database to work in optimized modes when the
// database may be very busy.
func (db *SqliteDb) SetDBInsertMode(newmode btcdb.InsertMode) {
oldMode := db.dbInsertMode
switch newmode {
case btcdb.InsertNormal:
// Normal mode inserts tx directly into the tx table
db.UseTempTX = false
db.dbInsertMode = newmode
switch oldMode {
case btcdb.InsertFast:
if db.TempTblSz != 0 {
err := db.migrateTmpTable()
if err != nil {
return
}
}
case btcdb.InsertValidatedInput:
// generate tx indexes
txop := db.txop(txMigrateFinish)
_, err := txop.Exec()
if err != nil {
log.Warnf("Failed to create tx table index - %v", err)
}
}
case btcdb.InsertFast:
// Fast mode inserts tx into txtmp with validation,
// then dumps to tx then rebuilds indexes at thresholds
db.UseTempTX = true
if oldMode != btcdb.InsertNormal {
log.Warnf("switching between invalid DB modes")
break
}
db.dbInsertMode = newmode
case btcdb.InsertValidatedInput:
// ValidatedInput mode inserts into tx table with
// no duplicate checks, then builds index on exit from
// ValidatedInput mode
if oldMode != btcdb.InsertNormal {
log.Warnf("switching between invalid DB modes")
break
}
// remove tx table index
txop := db.txop(txMigratePrep)
_, err := txop.Exec()
if err != nil {
log.Warnf("Failed to clear tx table index - %v", err)
}
db.dbInsertMode = newmode
// XXX
db.UseTempTX = false
}
}
func (db *SqliteDb) doSpend(tx *btcwire.MsgTx) error {
for txinidx := range tx.TxIn {
txin := tx.TxIn[txinidx]
inTxSha := txin.PreviousOutpoint.Hash
inTxidx := txin.PreviousOutpoint.Index
if inTxidx == ^uint32(0) {
continue
}
//log.Infof("spending %v %v", &inTxSha, inTxidx)
err := db.setSpentData(&inTxSha, inTxidx)
if err != nil {
return err
}
}
return nil
}
func (db *SqliteDb) unSpend(tx *btcwire.MsgTx) error {
for txinidx := range tx.TxIn {
txin := tx.TxIn[txinidx]
inTxSha := txin.PreviousOutpoint.Hash
inTxidx := txin.PreviousOutpoint.Index
if inTxidx == ^uint32(0) {
continue
}
err := db.clearSpentData(&inTxSha, inTxidx)
if err != nil {
return err
}
}
return nil
}
func (db *SqliteDb) setSpentData(sha *btcwire.ShaHash, idx uint32) error {
return db.setclearSpentData(sha, idx, true)
}
func (db *SqliteDb) clearSpentData(sha *btcwire.ShaHash, idx uint32) error {
return db.setclearSpentData(sha, idx, false)
}
func (db *SqliteDb) setclearSpentData(txsha *btcwire.ShaHash, idx uint32, set bool) error {
var spentdata []byte
usingtmp := false
txop := db.txop(txFetchUsedByShaStmt)
row := txop.QueryRow(txsha.String())
err := row.Scan(&spentdata)
if err != nil {
// if the error is simply didn't fine continue otherwise
// retun failure
usingtmp = true
txop = db.txop(txtmpFetchUsedByShaStmt)
row := txop.QueryRow(txsha.String())
err := row.Scan(&spentdata)
if err != nil {
log.Warnf("Failed to locate spent data - %v %v", txsha, err)
return err
}
}
byteidx := idx / 8
byteoff := idx % 8
if set {
spentdata[byteidx] |= (byte(1) << byteoff)
} else {
spentdata[byteidx] &= ^(byte(1) << byteoff)
}
txc, cached := db.fetchTxCache(txsha)
if cached {
txc.spent = spentdata
}
if usingtmp {
txop = db.txop(txtmpUpdateUsedByShaStmt)
} else {
txop = db.txop(txUpdateUsedByShaStmt)
}
_, err = txop.Exec(spentdata, txsha.String())
return err
}

View file

@ -1,293 +0,0 @@
// Copyright (c) 2013-2014 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3
import (
"database/sql"
"github.com/conformal/btcdb"
"github.com/conformal/btcwire"
_ "github.com/mattn/go-sqlite3"
)
// insertSha stores a block hash and its associated data block with a
// previous sha of `prevSha' and a version of `pver'.
// insertSha shall be called with db lock held
func (db *SqliteDb) insertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHash, pver uint32, buf []byte) (int64, error) {
tx := &db.txState
if tx.tx == nil {
err := db.startTx()
if err != nil {
return 0, err
}
}
// It is an error if the previous block does not already exist in the
// database, unless there are no blocks at all.
if prevOk := db.blkExistsSha(prevSha); !prevOk {
var numBlocks uint64
querystr := "SELECT COUNT(blockid) FROM block;"
err := tx.tx.QueryRow(querystr).Scan(&numBlocks)
if err != nil {
return 0, err
}
if numBlocks != 0 {
return 0, btcdb.PrevShaMissing
}
}
result, err := db.blkStmts[blkInsertSha].Exec(sha.Bytes(), pver, buf)
if err != nil {
return 0, err
}
blkid, err := result.LastInsertId()
if err != nil {
return 0, err
}
blkid -= 1 // skew between btc blockid and sql
// Because we don't know know what the last idx is, we don't
// cache unless already cached
if db.lastBlkShaCached == true {
db.lastBlkSha = *sha
db.lastBlkIdx++
}
bid := tBlockInsertData{*sha, pver, buf}
tx.txInsertList = append(tx.txInsertList, bid)
tx.txDataSz += len(buf)
return blkid, nil
}
// fetchSha returns the datablock and pver for the given ShaHash.
func (db *SqliteDb) fetchSha(sha btcwire.ShaHash) (buf []byte, pver uint32,
blkid int64, err error) {
row := db.blkStmts[blkFetchSha].QueryRow(sha.Bytes())
var blockidx int64
var databytes []byte
err = row.Scan(&pver, &databytes, &blockidx)
if err == sql.ErrNoRows {
return // no warning
}
if err != nil {
log.Warnf("fail 2 %v", err)
return
}
buf = databytes
blkid = blockidx - 1 // skew between btc blockid and sql
return
}
// ExistsSha looks up the given block hash
// returns true if it is present in the database.
func (db *SqliteDb) ExistsSha(sha *btcwire.ShaHash) (exists bool) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
_, exists = db.fetchBlockCache(sha)
if exists {
return
}
// not in cache, try database
exists = db.blkExistsSha(sha)
return
}
// blkExistsSha looks up the given block hash
// returns true if it is present in the database.
// CALLED WITH LOCK HELD
func (db *SqliteDb) blkExistsSha(sha *btcwire.ShaHash) bool {
var pver uint32
row := db.blkStmts[blkExistsSha].QueryRow(sha.Bytes())
err := row.Scan(&pver)
if err == sql.ErrNoRows {
return false
}
if err != nil {
// ignore real errors?
log.Warnf("blkExistsSha: fail %v", err)
return false
}
return true
}
// FetchBlockShaByHeight returns a block hash based on its height in the
// block chain.
func (db *SqliteDb) FetchBlockShaByHeight(height int64) (sha *btcwire.ShaHash, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
return db.fetchBlockShaByHeight(height)
}
// fetchBlockShaByHeight returns a block hash based on its height in the
// block chain.
func (db *SqliteDb) fetchBlockShaByHeight(height int64) (sha *btcwire.ShaHash, err error) {
var row *sql.Row
blockidx := height + 1 // skew between btc blockid and sql
row = db.blkStmts[blkFetchIdx].QueryRow(blockidx)
var shabytes []byte
err = row.Scan(&shabytes)
if err != nil {
return
}
var shaval btcwire.ShaHash
shaval.SetBytes(shabytes)
return &shaval, nil
}
// FetchHeightRange looks up a range of blocks by the start and ending
// heights. Fetch is inclusive of the start height and exclusive of the
// ending height. To fetch all hashes from the start height until no
// more are present, use the special id `AllShas'.
func (db *SqliteDb) FetchHeightRange(startHeight, endHeight int64) (rshalist []btcwire.ShaHash, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
startidx := startHeight + 1 // skew between btc block height and sql
var endidx int64
if endHeight == btcdb.AllShas {
endidx = btcdb.AllShas // no skew if asking for all
} else {
endidx = endHeight + 1 // skew between btc block height and sql
}
rows, err := db.blkStmts[blkFetchIdxList].Query(startidx, endidx)
if err != nil {
log.Warnf("query failed %v", err)
return
}
var shalist []btcwire.ShaHash
for rows.Next() {
var sha btcwire.ShaHash
var shabytes []byte
err = rows.Scan(&shabytes)
if err != nil {
log.Warnf("wtf? %v", err)
break
}
sha.SetBytes(shabytes)
shalist = append(shalist, sha)
}
rows.Close()
if err == nil {
rshalist = shalist
}
log.Tracef("FetchIdxRange idx %v %v returned %v shas err %v", startHeight, endHeight, len(shalist), err)
return
}
// NewestSha returns the hash and block height of the most recent (end) block of
// the block chain. It will return the zero hash, -1 for the block height, and
// no error (nil) if there are not any blocks in the database yet.
func (db *SqliteDb) NewestSha() (sha *btcwire.ShaHash, blkid int64, err error) {
var row *sql.Row
var blockidx int64
db.dbLock.Lock()
defer db.dbLock.Unlock()
// answer may be cached
if db.lastBlkShaCached == true {
shacopy := db.lastBlkSha
sha = &shacopy
blkid = db.lastBlkIdx - 1 // skew between btc blockid and sql
return
}
querystr := "SELECT key, blockid FROM block ORDER BY blockid DESC;"
tx := &db.txState
if tx.tx != nil {
row = tx.tx.QueryRow(querystr)
} else {
row = db.sqldb.QueryRow(querystr)
}
var shabytes []byte
err = row.Scan(&shabytes, &blockidx)
if err == sql.ErrNoRows {
return &btcwire.ShaHash{}, -1, nil
}
if err == nil {
var retsha btcwire.ShaHash
retsha.SetBytes(shabytes)
sha = &retsha
blkid = blockidx - 1 // skew between btc blockid and sql
db.lastBlkSha = retsha
db.lastBlkIdx = blockidx
db.lastBlkShaCached = true
}
return
}
type SqliteBlockIterator struct {
rows *sql.Rows
stmt *sql.Stmt
db *SqliteDb
}
// NextRow iterates thru all blocks in database.
func (bi *SqliteBlockIterator) NextRow() bool {
return bi.rows.Next()
}
// Row returns row data for block iterator.
func (bi *SqliteBlockIterator) Row() (key *btcwire.ShaHash, pver uint32,
buf []byte, err error) {
var keybytes []byte
err = bi.rows.Scan(&keybytes, &pver, &buf)
if err == nil {
var retkey btcwire.ShaHash
retkey.SetBytes(keybytes)
key = &retkey
}
return
}
// Close shuts down the iterator when done walking blocks in the database.
func (bi *SqliteBlockIterator) Close() {
bi.rows.Close()
bi.stmt.Close()
}
// NewIterateBlocks prepares iterator for all blocks in database.
func (db *SqliteDb) NewIterateBlocks() (btcdb.BlockIterator, error) {
var bi SqliteBlockIterator
db.dbLock.Lock()
defer db.dbLock.Unlock()
stmt, err := db.sqldb.Prepare("SELECT key, pver, data FROM block ORDER BY blockid;")
if err != nil {
return nil, err
}
tx := &db.txState
if tx.tx != nil {
txstmt := tx.tx.Stmt(stmt)
stmt.Close()
stmt = txstmt
}
bi.stmt = stmt
bi.rows, err = bi.stmt.Query()
if err != nil {
return nil, err
}
bi.db = db
return &bi, nil
}

View file

@ -1,394 +0,0 @@
// Copyright (c) 2013-2014 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3
import (
"bytes"
"container/list"
"github.com/conformal/btcdb"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"sync"
)
type txCache struct {
maxcount int
fifo list.List
// NOTE: the key is specifically ShaHash, not *ShaHash
txMap map[btcwire.ShaHash]*txCacheObj
cacheLock sync.RWMutex
}
type txCacheObj struct {
next *txCacheObj
sha btcwire.ShaHash
blksha btcwire.ShaHash
pver uint32
tx *btcwire.MsgTx
height int64
spent []byte
txbuf []byte
}
type blockCache struct {
maxcount int
fifo list.List
blockMap map[btcwire.ShaHash]*blockCacheObj
blockHeightMap map[int64]*blockCacheObj
cacheLock sync.RWMutex
}
type blockCacheObj struct {
next *blockCacheObj
sha btcwire.ShaHash
blk *btcutil.Block
}
// FetchBlockBySha - return a btcutil Block, object may be a cached.
func (db *SqliteDb) FetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
return db.fetchBlockBySha(sha)
}
// fetchBlockBySha - return a btcutil Block, object may be a cached.
// Must be called with db lock held.
func (db *SqliteDb) fetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, err error) {
blkcache, ok := db.fetchBlockCache(sha)
if ok {
return blkcache.blk, nil
}
buf, _, height, err := db.fetchSha(*sha)
if err != nil {
return nil, err
}
blk, err = btcutil.NewBlockFromBytes(buf)
if err != nil {
return
}
blk.SetHeight(height)
db.insertBlockCache(sha, blk)
return
}
// fetchBlockCache check if a block is in the block cache, if so return it.
func (db *SqliteDb) fetchBlockCache(sha *btcwire.ShaHash) (*blockCacheObj, bool) {
db.blockCache.cacheLock.RLock()
defer db.blockCache.cacheLock.RUnlock()
blkobj, ok := db.blockCache.blockMap[*sha]
if !ok { // could this just return the map deref?
return nil, false
}
return blkobj, true
}
// fetchBlockHeightCache check if a block is in the block cache, if so return it.
func (db *SqliteDb) fetchBlockHeightCache(height int64) (*blockCacheObj, bool) {
db.blockCache.cacheLock.RLock()
defer db.blockCache.cacheLock.RUnlock()
blkobj, ok := db.blockCache.blockHeightMap[height]
if !ok { // could this just return the map deref?
return nil, false
}
return blkobj, true
}
// insertBlockCache insert the given sha/block into the cache map.
// If the block cache is determined to be full, it will release
// an old entry in FIFO order.
func (db *SqliteDb) insertBlockCache(sha *btcwire.ShaHash, blk *btcutil.Block) {
bc := &db.blockCache
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
blkObj := blockCacheObj{sha: *sha, blk: blk}
bc.fifo.PushBack(&blkObj)
if bc.fifo.Len() > bc.maxcount {
listobj := bc.fifo.Front()
bc.fifo.Remove(listobj)
tailObj, ok := listobj.Value.(*blockCacheObj)
if ok {
delete(bc.blockMap, tailObj.sha)
delete(bc.blockHeightMap, tailObj.blk.Height())
} else {
panic("invalid type pushed on blockCache list")
}
}
bc.blockHeightMap[blk.Height()] = &blkObj
bc.blockMap[blkObj.sha] = &blkObj
}
// FetchTxByShaList returns the most recent tx of the name fully spent or not
func (db *SqliteDb) FetchTxByShaList(txShaList []*btcwire.ShaHash) []*btcdb.TxListReply {
// until the fully spent separation of tx is complete this is identical
// to FetchUnSpentTxByShaList
return db.FetchUnSpentTxByShaList(txShaList)
}
// FetchUnSpentTxByShaList given a array of ShaHash, look up the transactions
// and return them in a TxListReply array.
func (db *SqliteDb) FetchUnSpentTxByShaList(txShaList []*btcwire.ShaHash) []*btcdb.TxListReply {
db.dbLock.Lock()
defer db.dbLock.Unlock()
var replies []*btcdb.TxListReply
for _, txsha := range txShaList {
tx, _, _, _, height, txspent, err := db.fetchTxDataBySha(txsha)
btxspent := []bool{}
if err == nil {
btxspent = make([]bool, len(tx.TxOut), len(tx.TxOut))
for idx := range tx.TxOut {
byteidx := idx / 8
byteoff := uint(idx % 8)
btxspent[idx] = (txspent[byteidx] & (byte(1) << byteoff)) != 0
}
}
txlre := btcdb.TxListReply{Sha: txsha, Tx: tx, Height: height, TxSpent: btxspent, Err: err}
replies = append(replies, &txlre)
}
return replies
}
// fetchTxDataBySha returns several pieces of data regarding the given sha.
func (db *SqliteDb) fetchTxDataBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rtxbuf []byte, rpver uint32, rblksha *btcwire.ShaHash, rheight int64, rtxspent []byte, err error) {
var pver uint32
var blksha *btcwire.ShaHash
var height int64
var txspent []byte
var toff int
var tlen int
var blk *btcutil.Block
var blkbuf []byte
// Check Tx cache
if txc, ok := db.fetchTxCache(txsha); ok {
if txc.spent != nil {
return txc.tx, txc.txbuf, txc.pver, &txc.blksha, txc.height, txc.spent, nil
}
}
// If not cached load it
height, toff, tlen, txspent, err = db.fetchLocationUsedBySha(txsha)
if err != nil {
return
}
blksha, err = db.fetchBlockShaByHeight(height)
if err != nil {
log.Warnf("block idx lookup %v to %v", height, err)
return
}
log.Tracef("transaction %v is at block %v %v tx %v",
txsha, blksha, height, toff)
blk, err = db.fetchBlockBySha(blksha)
if err != nil {
log.Warnf("unable to fetch block %v %v ",
height, &blksha)
return
}
blkbuf, err = blk.Bytes()
if err != nil {
log.Warnf("unable to decode block %v %v", height, &blksha)
return
}
txbuf := make([]byte, tlen)
copy(txbuf[:], blkbuf[toff:toff+tlen])
rbuf := bytes.NewBuffer(txbuf)
var tx btcwire.MsgTx
err = tx.BtcDecode(rbuf, pver)
if err != nil {
log.Warnf("unable to decode tx block %v %v txoff %v txlen %v",
height, &blksha, toff, tlen)
return
}
// Shove data into TxCache
// XXX -
var txc txCacheObj
txc.sha = *txsha
txc.tx = &tx
txc.txbuf = txbuf
txc.pver = pver
txc.height = height
txc.spent = txspent
txc.blksha = *blksha
db.insertTxCache(&txc)
return &tx, txbuf, pver, blksha, height, txspent, nil
}
// FetchTxBySha returns several pieces of data regarding the given sha.
func (db *SqliteDb) FetchTxBySha(txsha *btcwire.ShaHash) ([]*btcdb.TxListReply, error) {
var pver uint32
var blksha *btcwire.ShaHash
var height int64
var toff int
var tlen int
var txspent []byte
var blk *btcutil.Block
var blkbuf []byte
var err error
// Check Tx cache
if txc, ok := db.fetchTxCache(txsha); ok {
replies := make([]*btcdb.TxListReply, 1)
tx := txc.tx
btxspent := make([]bool, len(tx.TxOut), len(tx.TxOut))
for idx := range tx.TxOut {
byteidx := idx / 8
byteoff := uint(idx % 8)
btxspent[idx] = (txc.spent[byteidx] & (byte(1) << byteoff)) != 0
}
txlre := btcdb.TxListReply{Sha: txsha, Tx: tx, BlkSha: &txc.blksha, Height: txc.height, TxSpent: btxspent, Err: nil}
replies[0] = &txlre
return replies, nil
}
// If not cached load it
height, toff, tlen, txspent, err = db.fetchLocationUsedBySha(txsha)
if err != nil {
return []*btcdb.TxListReply{}, err
}
blksha, err = db.FetchBlockShaByHeight(height)
if err != nil {
log.Warnf("block idx lookup %v to %v", height, err)
return []*btcdb.TxListReply{}, err
}
log.Tracef("transaction %v is at block %v %v tx %v",
txsha, blksha, height, toff)
blk, err = db.FetchBlockBySha(blksha)
if err != nil {
log.Warnf("unable to fetch block %v %v ",
height, &blksha)
return []*btcdb.TxListReply{}, err
}
blkbuf, err = blk.Bytes()
if err != nil {
log.Warnf("unable to decode block %v %v", height, &blksha)
return []*btcdb.TxListReply{}, err
}
txbuf := make([]byte, tlen)
copy(txbuf[:], blkbuf[toff:toff+tlen])
rbuf := bytes.NewBuffer(txbuf)
var tx btcwire.MsgTx
err = tx.BtcDecode(rbuf, pver)
if err != nil {
log.Warnf("unable to decode tx block %v %v txoff %v txlen %v",
height, &blksha, toff, tlen)
return []*btcdb.TxListReply{}, err
}
// Shove data into TxCache
// XXX -
var txc txCacheObj
txc.sha = *txsha
txc.tx = &tx
txc.txbuf = txbuf
txc.pver = pver
txc.height = height
txc.spent = txspent
txc.blksha = *blksha
db.insertTxCache(&txc)
btxspent := make([]bool, len(tx.TxOut), len(tx.TxOut))
for idx := range tx.TxOut {
byteidx := idx / 8
byteoff := uint(idx % 8)
btxspent[idx] = (txspent[byteidx] & (byte(1) << byteoff)) != 0
}
replies := make([]*btcdb.TxListReply, 1)
txlre := btcdb.TxListReply{Sha: txsha, Tx: &tx, BlkSha: blksha, Height: height, TxSpent: btxspent, Err: err}
replies[0] = &txlre
return replies, nil
}
// fetchTxCache look up the given transaction in the Tx cache.
func (db *SqliteDb) fetchTxCache(sha *btcwire.ShaHash) (*txCacheObj, bool) {
tc := &db.txCache
tc.cacheLock.RLock()
defer tc.cacheLock.RUnlock()
txObj, ok := tc.txMap[*sha]
if !ok { // could this just return the map deref?
return nil, false
}
return txObj, true
}
// insertTxCache, insert the given txobj into the cache.
// if the tx cache is determined to be full, it will release
// an old entry in FIFO order.
func (db *SqliteDb) insertTxCache(txObj *txCacheObj) {
tc := &db.txCache
tc.cacheLock.Lock()
defer tc.cacheLock.Unlock()
tc.fifo.PushBack(txObj)
if tc.fifo.Len() >= tc.maxcount {
listobj := tc.fifo.Front()
tc.fifo.Remove(listobj)
tailObj, ok := listobj.Value.(*txCacheObj)
if ok {
delete(tc.txMap, tailObj.sha)
} else {
panic("invalid type pushed on tx list")
}
}
tc.txMap[txObj.sha] = txObj
}
// InvalidateTxCache clear/release all cached transactions.
func (db *SqliteDb) InvalidateTxCache() {
tc := &db.txCache
tc.cacheLock.Lock()
defer tc.cacheLock.Unlock()
tc.txMap = map[btcwire.ShaHash]*txCacheObj{}
tc.fifo = list.List{}
}
// InvalidateTxCache clear/release all cached blocks.
func (db *SqliteDb) InvalidateBlockCache() {
bc := &db.blockCache
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
bc.blockMap = map[btcwire.ShaHash]*blockCacheObj{}
bc.blockHeightMap = map[int64]*blockCacheObj{}
bc.fifo = list.List{}
}
// InvalidateCache clear/release all cached blocks and transactions.
func (db *SqliteDb) InvalidateCache() {
db.InvalidateTxCache()
db.InvalidateBlockCache()
}

View file

@ -1,327 +0,0 @@
// Copyright (c) 2013-2014 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3
import (
"database/sql"
"github.com/conformal/btcdb"
"github.com/conformal/btcwire"
_ "github.com/mattn/go-sqlite3"
)
// insertTx inserts a tx hash and its associated data into the database.
// Must be called with db lock held.
func (db *SqliteDb) insertTx(txsha *btcwire.ShaHash, height int64, txoff int, txlen int, usedbuf []byte) (err error) {
tx := &db.txState
if tx.tx == nil {
err = db.startTx()
if err != nil {
return
}
}
blockid := height + 1
txd := tTxInsertData{txsha: txsha, blockid: blockid, txoff: txoff, txlen: txlen, usedbuf: usedbuf}
log.Tracef("inserting tx %v for block %v off %v len %v",
txsha, blockid, txoff, txlen)
rowBytes := txsha.String()
var op int // which table to insert data into.
if db.UseTempTX {
var tblockid int64
var ttxoff int
var ttxlen int
txop := db.txop(txFetchLocationByShaStmt)
row := txop.QueryRow(rowBytes)
err = row.Scan(&tblockid, &ttxoff, &ttxlen)
if err != sql.ErrNoRows {
// sha already present
err = btcdb.DuplicateSha
return
}
op = txtmpInsertStmt
} else {
op = txInsertStmt
}
txop := db.txop(op)
_, err = txop.Exec(rowBytes, blockid, txoff, txlen, usedbuf)
if err != nil {
log.Warnf("failed to insert %v %v %v", txsha, blockid, err)
return
}
if db.UseTempTX {
db.TempTblSz++
}
// put in insert list for replay
tx.txInsertList = append(tx.txInsertList, txd)
return
}
// ExistsTxSha returns if the given tx sha exists in the database
func (db *SqliteDb) ExistsTxSha(txsha *btcwire.ShaHash) (exists bool) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
if _, ok := db.fetchTxCache(txsha); ok {
return true
}
return db.existsTxSha(txsha)
}
// existsTxSha returns if the given tx sha exists in the database.o
// Must be called with the db lock held.
func (db *SqliteDb) existsTxSha(txsha *btcwire.ShaHash) (exists bool) {
var blockid uint32
txop := db.txop(txExistsShaStmt)
row := txop.QueryRow(txsha.String())
err := row.Scan(&blockid)
if err == sql.ErrNoRows {
txop = db.txop(txtmpExistsShaStmt)
row = txop.QueryRow(txsha.String())
err := row.Scan(&blockid)
if err == sql.ErrNoRows {
return false
}
if err != nil {
log.Warnf("txTmpExistsTxSha: fail %v", err)
return false
}
log.Warnf("txtmpExistsTxSha: success")
return true
}
if err != nil {
// ignore real errors?
log.Warnf("existsTxSha: fail %v", err)
return false
}
return true
}
// FetchLocationBySha looks up the Tx sha information by name.
func (db *SqliteDb) FetchLocationBySha(txsha *btcwire.ShaHash) (blockidx int64, txoff int, txlen int, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
return db.fetchLocationBySha(txsha)
}
// fetchLocationBySha look up the Tx sha information by name.
// Must be called with db lock held.
func (db *SqliteDb) fetchLocationBySha(txsha *btcwire.ShaHash) (height int64, txoff int, txlen int, err error) {
var row *sql.Row
var blockid int64
var ttxoff int
var ttxlen int
rowBytes := txsha.String()
txop := db.txop(txFetchLocationByShaStmt)
row = txop.QueryRow(rowBytes)
err = row.Scan(&blockid, &ttxoff, &ttxlen)
if err == sql.ErrNoRows {
txop = db.txop(txtmpFetchLocationByShaStmt)
row = txop.QueryRow(rowBytes)
err = row.Scan(&blockid, &ttxoff, &ttxlen)
if err == sql.ErrNoRows {
err = btcdb.TxShaMissing
return
}
if err != nil {
log.Warnf("txtmp FetchLocationBySha: fail %v",
err)
return
}
}
if err != nil {
log.Warnf("FetchLocationBySha: fail %v", err)
return
}
height = blockid - 1
txoff = ttxoff
txlen = ttxlen
return
}
// fetchLocationUsedBySha look up the Tx sha information by name.
// Must be called with db lock held.
func (db *SqliteDb) fetchLocationUsedBySha(txsha *btcwire.ShaHash) (rheight int64, rtxoff int, rtxlen int, rspentbuf []byte, err error) {
var row *sql.Row
var blockid int64
var txoff int
var txlen int
var txspent []byte
rowBytes := txsha.String()
txop := db.txop(txFetchLocUsedByShaStmt)
row = txop.QueryRow(rowBytes)
err = row.Scan(&blockid, &txoff, &txlen, &txspent)
if err == sql.ErrNoRows {
txop = db.txop(txtmpFetchLocUsedByShaStmt)
row = txop.QueryRow(rowBytes)
err = row.Scan(&blockid, &txoff, &txlen, &txspent)
if err == sql.ErrNoRows {
err = btcdb.TxShaMissing
return
}
if err != nil {
log.Warnf("txtmp FetchLocationBySha: fail %v",
err)
return
}
}
if err != nil {
log.Warnf("FetchLocationBySha: fail %v", err)
return
}
height := blockid - 1
return height, txoff, txlen, txspent, nil
}
// FetchTxUsedBySha returns the used/spent buffer for a given transaction.
func (db *SqliteDb) FetchTxUsedBySha(txsha *btcwire.ShaHash) (spentbuf []byte, err error) {
var row *sql.Row
db.dbLock.Lock()
defer db.dbLock.Unlock()
rowBytes := txsha.String()
txop := db.txop(txFetchUsedByShaStmt)
row = txop.QueryRow(rowBytes)
var databytes []byte
err = row.Scan(&databytes)
if err == sql.ErrNoRows {
txop := db.txop(txtmpFetchUsedByShaStmt)
row = txop.QueryRow(rowBytes)
err = row.Scan(&databytes)
if err == sql.ErrNoRows {
err = btcdb.TxShaMissing
return
}
if err != nil {
log.Warnf("txtmp FetchLocationBySha: fail %v",
err)
return
}
}
if err != nil {
log.Warnf("FetchUsedBySha: fail %v", err)
return
}
spentbuf = databytes
return
}
var vaccumDbNextMigrate bool
// migrateTmpTable functions to perform internal db optimization when
// performing large numbers of database inserts. When in Fast operation
// mode, it inserts into txtmp, then when that table reaches a certain
// size limit it moves all tx in the txtmp table into the primary tx
// table and recomputes the index on the primary tx table.
func (db *SqliteDb) migrateTmpTable() error {
db.endTx(true)
db.startTx() // ???
db.UseTempTX = false
db.TempTblSz = 0
var doVacuum bool
var nsteps int
if vaccumDbNextMigrate {
nsteps = 6
vaccumDbNextMigrate = false
doVacuum = true
} else {
nsteps = 5
vaccumDbNextMigrate = true
}
log.Infof("db compaction Stage 1/%v: Preparing", nsteps)
txop := db.txop(txMigratePrep)
_, err := txop.Exec()
if err != nil {
log.Warnf("Failed to prepare migrate - %v", err)
return err
}
log.Infof("db compaction Stage 2/%v: Copying", nsteps)
txop = db.txop(txMigrateCopy)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate read failed - %v", err)
return err
}
log.Tracef("db compaction Stage 2a/%v: Enable db vacuum", nsteps)
txop = db.txop(txPragmaVacuumOn)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate error trying to enable vacuum on "+
"temporary transaction table - %v", err)
return err
}
log.Infof("db compaction Stage 3/%v: Clearing old data", nsteps)
txop = db.txop(txMigrateClear)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate error trying to clear temporary "+
"transaction table - %v", err)
return err
}
log.Tracef("db compaction Stage 3a/%v: Disable db vacuum", nsteps)
txop = db.txop(txPragmaVacuumOff)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate error trying to disable vacuum on "+
"temporary transaction table - %v", err)
return err
}
log.Infof("db compaction Stage 4/%v: Rebuilding index", nsteps)
txop = db.txop(txMigrateFinish)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate error trying to clear temporary "+
"transaction table - %v", err)
return err
}
log.Infof("db compaction Stage 5/%v: Finalizing transaction", nsteps)
db.endTx(true) // ???
if doVacuum {
log.Infof("db compaction Stage 6/%v: Optimizing database", nsteps)
txop = db.txop(txVacuum)
_, err = txop.Exec()
if err != nil {
log.Warnf("migrate error trying to clear txtmp tbl %v", err)
return err
}
}
log.Infof("db compaction: Complete")
// TODO(drahn) - determine if this should be turned back on or not
db.UseTempTX = true
return nil
}