Initial implementation.

This commit is contained in:
Dave Collins 2013-05-28 19:07:21 -05:00
parent a02af039b5
commit 752ca5dfbb
14 changed files with 2561 additions and 1 deletions

13
LICENSE Normal file
View file

@ -0,0 +1,13 @@
Copyright (c) 2013 Conformal Systems LLC.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

View file

@ -1,4 +1,60 @@
btcdb
=====
Package btcdb provides a database interface for the bitcoin blockchain.
Package btcdb provides a database interface for the bitcoin block chain and
transactions. There is a test suite which is aiming to reach 100% code coverage
coverage. See `test_coverage.txt` for the current coverage (using gocov). On a
UNIX-like OS, the script `cov_report.sh` can be used to generate the report.
Package btcjson is licensed under the liberal ISC license.
## Sample Use
```Go
db, err := btcdb.CreateDB("sqlite", "dbexample")
newHeight, err := db.InsertBlock(block)
db.Sync()
```
## Documentation
Full `go doc` style documentation for the project can be viewed online without
installing this package by using the GoDoc site
[here](http://godoc.org/github.com/conformal/btcdb).
You can also view the documentation locally once the package is installed with
the `godoc` tool by running `godoc -http=":6060"` and pointing your browser to
http://localhost:6060/pkg/github.com/conformal/btcdb
## Installation
```bash
$ go get github.com/conformal/btcdb
```
## TODO
- Increase test coverage to 100%
- Allow other database backends
## GPG Verification Key
All official release tags are signed by Conformal so users can ensure the code
has not been tampered with and is coming from Conformal. To verify the
signature perform the following:
- Download the public key from the Conformal website at
https://opensource.conformal.com/GIT-GPG-KEY-conformal.txt
- Import the public key into your GPG keyring:
```bash
gpg --import GIT-GPG-KEY-conformal.txt
```
- Verify the release tag with the following command where `TAG_NAME` is a
placeholder for the specific tag:
```bash
git tag -v TAG_NAME
```
## License
Package btcdb is licensed under the liberal ISC License.

176
db.go Normal file
View file

@ -0,0 +1,176 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package btcdb
import (
"errors"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
)
var (
PrevShaMissing = errors.New("Previous sha missing from database")
TxShaMissing = errors.New("Requested Tx does not exist")
DuplicateSha = errors.New("Duplicate insert attempted")
DbDoesNotExist = errors.New("Non-existant database")
DbUnknownType = errors.New("Non-existant database type")
)
// AllShas is a special value that can be used as the final sha when requesting
// a range of shas by height to request them all.
const AllShas = int64(^uint64(0) >> 1)
// InsertMode represents a hint to the database about how much data the
// application is expecting to send to the database in a short period of time.
// This in turn provides the database with the opportunity to work in optimized
// modes when it will be very busy such as during the initial block chain
// download.
type InsertMode int
// Constants used to indicate the database insert mode hint. See InsertMode.
const (
InsertNormal InsertMode = iota
InsertFast
InsertValidatedInput
)
type Db interface {
// Close cleanly shuts down the database and syncs all data.
Close()
// DropAfterBlockBySha will remove any blocks from the database after
// the given block. It terminates any existing transaction and performs
// its operations in an atomic transaction which is commited before
// the function returns.
DropAfterBlockBySha(btcwire.ShaHash) (err error)
// ExistsSha returns whether or not the given block hash is present in
// the database.
ExistsSha(sha *btcwire.ShaHash) (exists bool)
// FetchBlockBySha returns a btcutil Block. The implementation may
// cache the underlying object if desired.
FetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, err error)
// FetchBlockShaByIdx returns a block sha based on its height in the
// blockchain.
FetchBlockShaByIdx(blkid int64) (sha *btcwire.ShaHash, err error)
// FetchIdxRange looks up a range of block by the start and ending ids.
// Fetch is inclusive of the start id and exclusive of the ending id. If
// the special id `AllShas' is provided as endid then FetchIdxRange will
// fetch all shas from startid until no more shas are present.
FetchIdxRange(startid, endid int64) (rshalist []btcwire.ShaHash, err error)
// FetchTxAllBySha returns several pieces of data regarding the given sha.
FetchTxAllBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rtxbuf []byte, rpver uint32, rblksha *btcwire.ShaHash, err error)
// FetchTxBufBySha returns the raw bytes and associated protocol version
// for the transaction with the requested sha.
FetchTxBufBySha(txsha *btcwire.ShaHash) (txbuf []byte, rpver uint32, err error)
// FetchTxBySha returns some data for the given Tx Sha.
FetchTxBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rpver uint32, blksha *btcwire.ShaHash, err error)
// FetchTxByShaList returns a TxListReply given an array of ShaHash, look up the transactions
// and return them in a TxListReply array.
FetchTxByShaList(txShaList []*btcwire.ShaHash) []*TxListReply
// FetchTxUsedBySha returns the used/spent buffer for a given transaction.
FetchTxUsedBySha(txsha *btcwire.ShaHash) (spentbuf []byte, err error)
// InsertBlock inserts the block data and transaction data from a block
// into the database.
InsertBlock(block *btcutil.Block) (blockid int64, err error)
// InsertTx inserts a tx hash and its associated data into the database
InsertTx(txsha *btcwire.ShaHash, blockidx int64, txoff int, txlen int, usedbuf []byte) (err error)
// InvalidateBlockCache releases all cached blocks.
InvalidateBlockCache()
// InvalidateCache releases all cached blocks and transactions.
InvalidateCache()
// InvalidateTxCache releases all cached transactions.
InvalidateTxCache()
// NewIterateBlocks returns an iterator for all blocks in database.
NewIterateBlocks() (pbi BlockIterator, err error)
// NewestSha provides an interface to quickly look up the sha of
// the most recent (end) of the block chain.
NewestSha() (sha *btcwire.ShaHash, blkid int64, err error)
// RollbackClose discards the recent database changes to the previously
// saved data at last Sync and closes the database.
RollbackClose()
// SetDBInsertMode provides hints to the database to how the application
// is running. This allows the database to work in optimized modes when
// the database may be very busy.
SetDBInsertMode(InsertMode)
// Sync verifies that the database is coherent on disk and no
// outstanding transactions are in flight.
Sync()
}
type BlockIterator interface {
// Close shuts down the iterator when done walking blocks in the database.
Close()
// NextRow iterates thru all blocks in database.
NextRow() bool
// Row returns row data for block iterator.
Row() (key *btcwire.ShaHash, pver uint32, buf []byte, err error)
}
type DriverDB struct {
DbType string
Create func(argstr string) (pbdb Db, err error)
Open func(filepath string) (pbdb Db, err error)
}
type TxListReply struct {
Sha *btcwire.ShaHash
Tx *btcwire.MsgTx
Err error
}
// driverList holds all of the registered database backends.
var driverList []DriverDB
// AddDBDriver adds a back end database driver to available interfaces.
func AddDBDriver(instance DriverDB) {
// TODO(drahn) Does this really need to check for duplicate names ?
for _, drv := range driverList {
// TODO(drahn) should duplicates be an error?
if drv.DbType == instance.DbType {
return
}
}
driverList = append(driverList, instance)
}
// CreateDB intializes and opens a database.
func CreateDB(dbtype string, argstr string) (pbdb Db, err error) {
for _, drv := range driverList {
if drv.DbType == dbtype {
return drv.Create(argstr)
}
}
return nil, DbUnknownType
}
// OpenDB opens an existing database.
func OpenDB(dbtype string, argstr string) (pbdb Db, err error) {
for _, drv := range driverList {
if drv.DbType == dbtype {
return drv.Open(argstr)
}
}
return nil, DbUnknownType
}

56
doc.go Normal file
View file

@ -0,0 +1,56 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
/*
Package btcdb provides a database interface for the bitcoin block chain.
As of May 2013, there are over 235,000 blocks in the bitcoin block chain and
and over 17 million transactions (which turns out to be over 11Gb of data).
btcdb provides a database layer to store and retrieve this data in a fairly
simple and efficient manner. The use of this should not require specific
knowledge of the database backend used although currently only db_sqlite is
provided.
Basic Design
The basic design of btcdb is to provide two classes of items in a
database; blocks and transactions (tx) where the block number
increases monotonically. Each transaction belongs to a single block
although a block can have a variable number of transactions. Along
with these two items, several convenience functions for dealing with
the database are provided as well as functions to query specific items
that may be present in a block or tx (although many of these are in
the db_sqlite subpackage).
Usage
At the highest level, the use of this packages just requires that you
import it, setup a database, insert some data into it, and optionally,
query the data back. In a more concrete example:
// Import packages
import (
"github.com/conformal/btcdb"
_ "github.com/conformal/btcdb/db_sqlite"
)
// Create a database
dbname := "dbexample"
db, err := btcdb.CreateDB("sqlite", dbname)
if err != nil {
fmt.Printf("Failed to open database %v", err)
return
}
// Insert a block
newheight, err := db.InsertBlock(block)
if err != nil {
fmt.Printf("failed to insert block %v err %v", height, err)
}
// Sync the database
db.Sync()
*/
package btcdb

56
log.go Normal file
View file

@ -0,0 +1,56 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package btcdb
import (
"errors"
"github.com/conformal/seelog"
"io"
)
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log seelog.LoggerInterface
// The default amount of logging is none.
func init() {
DisableLog()
}
// DisableLog disables all library log output. Logging output is disabled
// by default until either UserLogger or SetLogWriter are called.
func DisableLog() {
log = seelog.Disabled
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using seelog.
func UseLogger(logger seelog.LoggerInterface) {
log = logger
}
// SetLogWriter uses a specified io.Writer to output package logging info.
// This allows a caller to direct package logging output without needing a
// dependency on seelog. If the caller is also using seelog, UseLogger should
// be used instead.
func SetLogWriter(w io.Writer) error {
if w == nil {
return errors.New("nil writer")
}
l, err := seelog.LoggerFromWriterWithMinLevel(w, seelog.TraceLvl)
if err != nil {
return err
}
UseLogger(l)
return nil
}
func GetLog() seelog.LoggerInterface {
return log
}

15
sqlite3/doc.go Normal file
View file

@ -0,0 +1,15 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
/*
Package sqlite3 implements a sqlite3 instance of btcdb.
sqlite provides a zero setup, single file database. It requires cgo
and the presence of the sqlite library and headers, but nothing else.
The performance is generally high although it goes down with database
size.
Many of the block or tx specific functions for btcdb are in this subpackage.
*/
package sqlite3

48
sqlite3/internal_test.go Normal file
View file

@ -0,0 +1,48 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3
import (
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btcwire"
)
// FetchSha returns the datablock and pver for the given ShaHash.
// This is a testing only interface.
func FetchSha(db btcdb.Db, sha *btcwire.ShaHash) (buf []byte, pver uint32,
blkid int64, err error) {
sqldb, ok := db.(*SqliteDb)
if !ok {
err = fmt.Errorf("Invalid data type")
return
}
buf, pver, blkid, err = sqldb.fetchSha(*sha)
return
}
// SetBlockCacheSize configures the maximum number of blocks in the cache to
// be the given size should be made before any fetching.
// This is a testing only interface.
func SetBlockCacheSize(db btcdb.Db, newsize int) {
sqldb, ok := db.(*SqliteDb)
if !ok {
return
}
bc := &sqldb.blockCache
bc.maxcount = newsize
}
// SetTxCacheSize configures the maximum number of tx in the cache to
// be the given size should be made before any fetching.
// This is a testing only interface.
func SetTxCacheSize(db btcdb.Db, newsize int) {
sqldb, ok := db.(*SqliteDb)
if !ok {
return
}
tc := &sqldb.txCache
tc.maxcount = newsize
}

339
sqlite3/operational_test.go Normal file
View file

@ -0,0 +1,339 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3_test
import (
"compress/bzip2"
"encoding/binary"
"github.com/confomral/btcdb"
"github.com/confomral/btcdb/db_sqlite"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"io"
"os"
"path/filepath"
"strings"
"testing"
)
var network = btcwire.MainNet
const (
dbTmDefault = iota
dbTmNormal
dbTmFast
dbTmNoVerify
)
func TestOperational(t *testing.T) {
testOperationalMode(t, dbTmDefault)
testOperationalMode(t, dbTmNormal)
testOperationalMode(t, dbTmFast)
testOperationalMode(t, dbTmNoVerify)
}
func testOperationalMode(t *testing.T, mode int) {
// simplified basic operation is:
// 1) fetch block from remote server
// 2) look up all txin (except coinbase in db)
// 3) insert block
// Ignore db remove errors since it means we didn't have an old one.
dbname := "tstdbop1"
_ = os.Remove(dbname)
db, err := btcdb.CreateDB("sqlite", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer os.Remove(dbname)
switch mode {
case dbTmDefault: // default
// no setup
case dbTmNormal: // explicit normal
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertFast)
if sqldb, ok := db.(*sqlite3.SqliteDb); ok {
sqldb.TempTblMax = 100
} else {
t.Errorf("not right type")
}
case dbTmNoVerify: // validated block
db.SetDBInsertMode(btcdb.InsertValidatedInput)
}
// Since we are dealing with small dataset, reduce cache size
sqlite3.SetBlockCacheSize(db, 2)
sqlite3.SetTxCacheSize(db, 3)
testdatafile := filepath.Join("testdata", "blocks1-256.bz2")
blocks, err := loadBlocks(t, testdatafile)
var height = int64(1)
err = nil
for ; height < int64(len(blocks)); height++ {
block := blocks[height]
if mode != dbTmNoVerify {
// except for NoVerify which does not allow lookups check inputs
mblock := block.MsgBlock()
var txneededList []*btcwire.ShaHash
for _, tx := range mblock.Transactions {
for _, txin := range tx.TxIn {
if txin.PreviousOutpoint.Index == uint32(4294967295) {
continue
}
origintxsha := &txin.PreviousOutpoint.Hash
txneededList = append(txneededList, origintxsha)
_, _, _, _, err := db.FetchTxAllBySha(origintxsha)
if err != nil {
t.Errorf("referenced tx not found %v err %v ", origintxsha, err)
}
_, _, _, _, err = db.FetchTxAllBySha(origintxsha)
if err != nil {
t.Errorf("referenced tx not found %v err %v ", origintxsha, err)
}
_, _, _, err = db.FetchTxBySha(origintxsha)
if err != nil {
t.Errorf("referenced tx not found %v err %v ", origintxsha, err)
}
_, _, err = db.FetchTxBufBySha(origintxsha)
if err != nil {
t.Errorf("referenced tx not found %v err %v ", origintxsha, err)
}
_, err = db.FetchTxUsedBySha(origintxsha)
if err != nil {
t.Errorf("tx used fetch fail %v err %v ", origintxsha, err)
}
}
}
txlist := db.FetchTxByShaList(txneededList)
for _, txe := range txlist {
if txe.Err != nil {
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
}
}
}
t.Logf("Inserting Block %v", height)
newheight, err := db.InsertBlock(block)
if err != nil {
t.Errorf("failed to insert block %v err %v", height, err)
}
if newheight != height {
t.Errorf("height mismatch expect %v returned %v", height, newheight)
}
}
switch mode {
case dbTmDefault: // default
// no cleanup
case dbTmNormal: // explicit normal
// no cleanup
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmNoVerify: // validated block
db.SetDBInsertMode(btcdb.InsertNormal)
}
}
func TestBackout(t *testing.T) {
testBackout(t, dbTmDefault)
testBackout(t, dbTmNormal)
testBackout(t, dbTmFast)
}
func testBackout(t *testing.T, mode int) {
// simplified basic operation is:
// 1) fetch block from remote server
// 2) look up all txin (except coinbase in db)
// 3) insert block
// Ignore db remove errors since it means we didn't have an old one.
dbname := "tstdbop2"
_ = os.Remove(dbname)
db, err := btcdb.CreateDB("sqlite", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer os.Remove(dbname)
switch mode {
case dbTmDefault: // default
// no setup
case dbTmNormal: // explicit normal
db.SetDBInsertMode(btcdb.InsertNormal)
case dbTmFast: // fast mode
db.SetDBInsertMode(btcdb.InsertFast)
if sqldb, ok := db.(*sqlite3.SqliteDb); ok {
sqldb.TempTblMax = 100
} else {
t.Errorf("not right type")
}
}
// Since we are dealing with small dataset, reduce cache size
sqlite3.SetBlockCacheSize(db, 2)
sqlite3.SetTxCacheSize(db, 3)
testdatafile := filepath.Join("testdata", "blocks1-256.bz2")
blocks, err := loadBlocks(t, testdatafile)
if len(blocks) < 120 {
t.Errorf("test data too small")
return
}
var height = int64(1)
err = nil
for ; height < int64(len(blocks)); height++ {
if height == 100 {
t.Logf("sync")
db.Sync()
}
if height == 120 {
t.Logf("wha?")
// Simulate unexpected application quit
db.RollbackClose()
break
}
block := blocks[height]
t.Logf("Inserting Block %v", height)
newheight, err := db.InsertBlock(block)
if err != nil {
t.Errorf("failed to insert block %v err %v", height, err)
}
if newheight != height {
t.Errorf("height mismatch expect %v returned %v", height, newheight)
}
}
// db was closed at height 120, so no cleanup is possible.
// reopen db
db, err = btcdb.NewDB("sqlite", dbname)
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
sha, err := blocks[99].Sha()
if err != nil {
t.Errorf("failed to get block 99 sha err %v", err)
return
}
_ = db.ExistsSha(sha)
_, err = db.FetchBlockBySha(sha)
if err != nil {
t.Errorf("failed to load block 99 from db", err)
}
sha, err = blocks[110].Sha()
if err != nil {
t.Errorf("failed to get block 110 sha err %v", err)
return
}
_ = db.ExistsSha(sha)
_, err = db.FetchBlockBySha(sha)
if err == nil {
t.Errorf("loaded block 110 from db, failure expected")
}
block := blocks[110]
mblock := block.MsgBlock()
txsha, err := mblock.Transactions[0].TxSha(block.ProtocolVersion())
t.Logf("txsha %v", txsha)
_, _, _, err = db.FetchTxBySha(&txsha)
_, err = db.FetchTxUsedBySha(&txsha)
block = blocks[99]
mblock = block.MsgBlock()
txsha, err = mblock.Transactions[0].TxSha(block.ProtocolVersion())
oldused, err := db.FetchTxUsedBySha(&txsha)
err = db.InsertTx(&txsha, 99, 1024, 1048, oldused)
if err == nil {
t.Errorf("dup insert of tx succeeded")
}
}
func loadBlocks(t *testing.T, file string) (blocks []*btcutil.Block, err error) {
testdatafile := filepath.Join("testdata", "blocks1-256.bz2")
var dr io.Reader
var fi io.ReadCloser
fi, err = os.Open(testdatafile)
if err != nil {
t.Errorf("failed to open file %v, err %v", testdatafile, err)
return
}
if strings.HasSuffix(testdatafile, ".bz2") {
z := bzip2.NewReader(fi)
dr = z
} else {
dr = fi
}
defer func() {
if err := fi.Close(); err != nil {
t.Errorf("failed to close file %v %v", testdatafile, err)
}
}()
var block *btcutil.Block
// block 0 isn't really there, put in nil
blocks = append(blocks, block)
var height = int64(1)
err = nil
for ; err == nil; height++ {
var rintbuf uint32
err = binary.Read(dr, binary.LittleEndian, &rintbuf)
if err == io.EOF {
// hit end of file at expected offset: no warning
height--
break
}
if err != nil {
t.Errorf("failed to load network type, err %v", err)
break
}
if rintbuf != uint32(network) {
t.Errorf("Block doesn't match network: %v expects %v",
rintbuf, network)
break
}
err = binary.Read(dr, binary.LittleEndian, &rintbuf)
blocklen := rintbuf
rbytes := make([]byte, blocklen)
// read block
dr.Read(rbytes)
var pver uint32
switch {
case height < 200000:
pver = 1
case height >= 200000:
pver = 2
}
block, err = btcutil.NewBlockFromBytes(rbytes, pver)
if err != nil {
t.Errorf("failed to parse block %v", height)
return
}
blocks = append(blocks, block)
}
return
}

673
sqlite3/sqlite.go Normal file
View file

@ -0,0 +1,673 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3
import (
"database/sql"
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"github.com/conformal/seelog"
_ "github.com/mattn/go-sqlite3"
"os"
"sync"
)
const (
dbVersion int = 2
dbMaxTransCnt = 20000
dbMaxTransMem = 64 * 1024 * 1024 // 64 MB
)
const (
blkInsertSha = iota
blkFetchSha
blkExistsSha
blkFetchIdx
blkFetchIdxList
)
const (
txInsertStmt = iota
txFetchUsedByShaStmt
txFetchLocationByShaStmt
txtmpInsertStmt
txtmpFetchUsedByShaStmt
txtmpFetchLocationByShaStmt
txMigrateCopy
txMigrateClear
txMigratePrep
txMigrateFinish
txMigrateCount
txPragmaVacuumOn
txPragmaVacuumOff
txVacuum
)
var blkqueries []string = []string{
blkInsertSha: "INSERT INTO block (key, pver, data) VALUES(?, ?, ?);",
blkFetchSha: "SELECT pver, data, blockid FROM block WHERE key = ?;",
blkExistsSha: "SELECT pver FROM block WHERE key = ?;",
blkFetchIdx: "SELECT key FROM block WHERE blockid = ?;",
blkFetchIdxList: "SELECT key FROM block WHERE blockid >= ? AND blockid < ? ORDER BY blockid ASC LIMIT 500;",
}
var txqueries []string = []string{
txInsertStmt: "INSERT INTO tx (key, blockid, txoff, txlen, data) VALUES(?, ?, ?, ?, ?);",
txFetchUsedByShaStmt: "SELECT data FROM tx WHERE key = ?;",
txFetchLocationByShaStmt: "SELECT blockid, txoff, txlen FROM tx WHERE key = ?;",
txtmpInsertStmt: "INSERT INTO txtmp (key, blockid, txoff, txlen, data) VALUES(?, ?, ?, ?, ?);",
txtmpFetchUsedByShaStmt: "SELECT data FROM txtmp WHERE key = ?;",
txtmpFetchLocationByShaStmt: "SELECT blockid, txoff, txlen FROM txtmp WHERE key = ?;",
txMigrateCopy: "INSERT INTO tx (key, blockid, txoff, txlen, data) SELECT key, blockid, txoff, txlen, data FROM txtmp;",
txMigrateClear: "DELETE from txtmp;",
txMigratePrep: "DROP index uniquetx;",
txMigrateFinish: "CREATE UNIQUE INDEX IF NOT EXISTS uniquetx ON tx (key);",
txMigrateCount: "SELECT COUNT(*) FROM txtmp;",
txPragmaVacuumOn: "PRAGMA auto_vacuum = FULL;",
txPragmaVacuumOff: "PRAGMA auto_vacuum = NONE;",
txVacuum: "VACUUM;",
}
var log seelog.LoggerInterface = seelog.Disabled
type tBlockInsertData struct {
sha btcwire.ShaHash
pver uint32
buf []byte
}
type tTxInsertData struct {
txsha *btcwire.ShaHash
blockid int64
txoff int
txlen int
usedbuf []byte
}
type txState struct {
tx *sql.Tx
writeCount int
txDataSz int
txInsertList []interface{}
}
type SqliteDb struct {
sqldb *sql.DB
blkStmts []*sql.Stmt
blkBaseStmts []*sql.Stmt
txStmts []*sql.Stmt
txBaseStmts []*sql.Stmt
txState txState
dbLock sync.Mutex
lastBlkShaCached bool
lastBlkSha btcwire.ShaHash
lastBlkIdx int64
txCache txCache
blockCache blockCache
UseTempTX bool
TempTblSz int
TempTblMax int
dbInsertMode btcdb.InsertMode
}
var self = btcdb.DriverDB{DbType: "sqlite", Create: CreateSqliteDB, Open: OpenSqliteDB}
func init() {
btcdb.AddDBDriver(self)
}
// createDB configure the database, setting up all tables to initial state.
func createDB(db *sql.DB) error {
log.Infof("Initializing new block database")
// XXX check for old tables
buildTables := []string{
"CREATE TABLE dbversion (version integer);",
"CREATE TABLE block ( blockid INTEGER PRIMARY KEY, key BLOB UNIQUE, " +
"pver INTEGER NOT NULL, data BLOB NOT NULL);",
"INSERT INTO dbversion (version) VALUES (" + fmt.Sprintf("%d", dbVersion) +
");",
}
buildtxTables := []string{
"CREATE TABLE tx (txidx INTEGER PRIMARY KEY, " +
"key TEXT, " +
"blockid INTEGER NOT NULL, " +
"txoff INTEGER NOT NULL, txlen INTEGER NOT NULL, " +
"data BLOB NOT NULL, " +
"FOREIGN KEY(blockid) REFERENCES block(blockid));",
"CREATE TABLE txtmp (key TEXT PRIMARY KEY, " +
"blockid INTEGER NOT NULL, " +
"txoff INTEGER NOT NULL, txlen INTEGER NOT NULL, " +
"data BLOB NOT NULL, " +
"FOREIGN KEY(blockid) REFERENCES block(blockid));",
"CREATE UNIQUE INDEX uniquetx ON tx (key);",
}
for _, sql := range buildTables {
_, err := db.Exec(sql)
if err != nil {
log.Warnf("sql table op failed %v [%v]", err, sql)
return err
}
}
for _, sql := range buildtxTables {
_, err := db.Exec(sql)
if err != nil {
log.Warnf("sql table op failed %v [%v]", err, sql)
return err
}
}
// Insert the genesis block.
err := insertGenesisBlock(db)
if err != nil {
return err
}
return nil
}
// OpenSqliteDB opens an existing database for use.
func OpenSqliteDB(filepath string) (pbdb btcdb.Db, err error) {
log = btcdb.GetLog()
return newOrCreateSqliteDB(filepath, false)
}
// CreateSqliteDB creates, initializes and opens a database for use.
func CreateSqliteDB(filepath string) (pbdb btcdb.Db, err error) {
log = btcdb.GetLog()
return newOrCreateSqliteDB(filepath, true)
}
// newOrCreateSqliteDB opens a database, either creating it or opens
// existing database based on flag.
func newOrCreateSqliteDB(filepath string, create bool) (pbdb btcdb.Db, err error) {
var bdb SqliteDb
if create == false {
_, err = os.Stat(filepath)
if err != nil {
return nil, btcdb.DbDoesNotExist
}
}
db, err := sql.Open("sqlite3", filepath)
if err != nil {
log.Warnf("db open failed %v\n", err)
return nil, err
}
dbverstmt, err := db.Prepare("SELECT version FROM dbversion;")
if err != nil {
// about the only reason this would fail is that the database
// is not initialized
if create == false {
return nil, btcdb.DbDoesNotExist
}
err = createDB(db)
if err != nil {
// already warned in the called function
return nil, err
}
dbverstmt, err = db.Prepare("SELECT version FROM dbversion;")
if err != nil {
// if it failed this a second time, fail.
return nil, err
}
}
row := dbverstmt.QueryRow()
var version int
err = row.Scan(&version)
if err != nil {
log.Warnf("unable to find db version: no row\n", err)
}
switch version {
case dbVersion:
// all good
default:
log.Warnf("mismatch db version: %v expected %v\n", version, dbVersion)
return nil, fmt.Errorf("Invalid version in database")
}
db.Exec("PRAGMA foreign_keys = ON;")
db.Exec("PRAGMA journal_mode=WAL;")
bdb.sqldb = db
bdb.blkStmts = make([]*sql.Stmt, len(blkqueries))
bdb.blkBaseStmts = make([]*sql.Stmt, len(blkqueries))
for i := range blkqueries {
stmt, err := db.Prepare(blkqueries[i])
if err != nil {
// XXX log/
return nil, err
}
bdb.blkBaseStmts[i] = stmt
}
for i := range bdb.blkBaseStmts {
bdb.blkStmts[i] = bdb.blkBaseStmts[i]
}
bdb.txBaseStmts = make([]*sql.Stmt, len(txqueries))
for i := range txqueries {
stmt, err := db.Prepare(txqueries[i])
if err != nil {
// XXX log/
return nil, err
}
bdb.txBaseStmts[i] = stmt
}
// NOTE: all array entries in txStmts remain nil'ed
// tx statements are lazy bound
bdb.txStmts = make([]*sql.Stmt, len(txqueries))
bdb.blockCache.maxcount = 150
bdb.blockCache.blockMap = map[btcwire.ShaHash]*blockCacheObj{}
bdb.txCache.maxcount = 2000
bdb.txCache.txMap = map[btcwire.ShaHash]*txCacheObj{}
bdb.UseTempTX = true
bdb.TempTblMax = 1000000
return &bdb, nil
}
// Sync verifies that the database is coherent on disk,
// and no outstanding transactions are in flight.
func (db *SqliteDb) Sync() {
db.dbLock.Lock()
defer db.dbLock.Unlock()
db.endTx(true)
}
// syncPoint notifies the db that this is a safe time to sync the database,
// if there are many outstanding transactions.
// Must be called with db lock held.
func (db *SqliteDb) syncPoint() {
tx := &db.txState
if db.TempTblSz > db.TempTblMax {
err := db.migrateTmpTable()
if err != nil {
return
}
} else {
if len(tx.txInsertList) > dbMaxTransCnt || tx.txDataSz > dbMaxTransMem {
db.endTx(true)
}
}
}
// Close cleanly shuts down database, syncing all data.
func (db *SqliteDb) Close() {
db.dbLock.Lock()
defer db.dbLock.Unlock()
db.close()
}
// RollbackClose discards the recent database changes to the previously
// saved data at last Sync.
func (db *SqliteDb) RollbackClose() {
db.dbLock.Lock()
defer db.dbLock.Unlock()
tx := &db.txState
if tx.tx != nil {
err := tx.tx.Rollback()
if err != nil {
log.Debugf("Rollback failed: %v", err)
}
}
db.close()
}
// close performs the internal shutdown/close operation.
func (db *SqliteDb) close() {
db.endTx(true)
db.InvalidateCache()
for i := range db.blkBaseStmts {
db.blkBaseStmts[i].Close()
}
for i := range db.txBaseStmts {
if db.txBaseStmts[i] != nil {
db.txBaseStmts[i].Close()
db.txBaseStmts[i] = nil
}
}
db.sqldb.Close()
}
// txop returns the appropriately prepared statement, based on
// transaction state of the database.
func (db *SqliteDb) txop(op int) *sql.Stmt {
if db.txStmts[op] != nil {
return db.txStmts[op]
}
if db.txState.tx == nil {
// we are not in a transaction, return the base statement
return db.txBaseStmts[op]
}
if db.txStmts[op] == nil {
db.txStmts[op] = db.txState.tx.Stmt(db.txBaseStmts[op])
}
return db.txStmts[op]
}
// startTx starts a transaction, preparing or scrubbing statements
// for proper operation inside a transaction.
func (db *SqliteDb) startTx() (err error) {
tx := &db.txState
if tx.tx != nil {
// this shouldn't happen...
log.Warnf("Db startTx called while in a transaction")
return
}
tx.tx, err = db.sqldb.Begin()
if err != nil {
log.Warnf("Db startTx: begin failed %v", err)
tx.tx = nil
return
}
for i := range db.blkBaseStmts {
db.blkStmts[i] = tx.tx.Stmt(db.blkBaseStmts[i])
}
for i := range db.txBaseStmts {
db.txStmts[i] = nil // these are lazily prepared
}
return
}
// endTx commits the current active transaction, it zaps all of the prepared
// statements associated with the transaction.
func (db *SqliteDb) endTx(recover bool) (err error) {
tx := &db.txState
if tx.tx == nil {
return
}
err = tx.tx.Commit()
if err != nil && recover {
// XXX - double check that the tx is dead after
// commit failure (rollback?)
log.Warnf("Db endTx: commit failed %v", err)
err = db.rePlayTransaction()
if err != nil {
// We tried, return failure (after zeroing state)
// so the upper level can notice and restart
}
}
for i := range db.blkBaseStmts {
db.blkStmts[i].Close()
db.blkStmts[i] = db.blkBaseStmts[i]
}
for i := range db.txStmts {
if db.txStmts[i] != nil {
db.txStmts[i].Close()
db.txStmts[i] = nil
}
}
tx.tx = nil
var emptyTxList []interface{}
tx.txInsertList = emptyTxList
tx.txDataSz = 0
return
}
// rePlayTransaction will attempt to re-execute inserts performed
// sync the beginning of a transaction. This is to be used after
// a sql Commit operation fails to keep the database from losing data.
func (db *SqliteDb) rePlayTransaction() (err error) {
err = db.startTx()
if err != nil {
return
}
tx := &db.txState
for _, ins := range tx.txInsertList {
switch v := ins.(type) {
case tBlockInsertData:
block := v
_, err = db.blkStmts[blkInsertSha].Exec(block.sha.Bytes(),
block.pver, block.buf)
if err != nil {
break
}
case tTxInsertData:
txd := v
txnamebytes := txd.txsha.Bytes()
txop := db.txop(txInsertStmt)
_, err = txop.Exec(txd.blockid, txnamebytes, txd.txoff,
txd.txlen, txd.usedbuf)
if err != nil {
break
}
}
}
// This function is called even if we have failed.
// We need to clean up so the database can be used again.
// However we want the original error not any new error,
// unless there was no original error but the commit fails.
err2 := db.endTx(false)
if err == nil && err2 != nil {
err = err2
}
return
}
// DropAfterBlockBySha will remove any blocks from the database after the given block.
// It terminates any existing transaction and performs its operations in an
// atomic transaction, it is terminated (committed) before exit.
func (db *SqliteDb) DropAfterBlockBySha(sha btcwire.ShaHash) (err error) {
var row *sql.Row
db.dbLock.Lock()
defer db.dbLock.Unlock()
db.InvalidateCache()
// This is a destructive operation and involves multiple requests
// so requires a transaction, terminate any transaction to date
// and start a new transaction
err = db.endTx(true)
if err != nil {
return err
}
err = db.startTx()
if err != nil {
return err
}
// also drop any cached sha data
db.lastBlkShaCached = false
querystr := "SELECT blockid FROM block WHERE key = ?;"
tx := &db.txState
row = tx.tx.QueryRow(querystr, sha.Bytes())
var blockidx uint64
err = row.Scan(&blockidx)
if err != nil {
// XXX
db.endTx(false)
return err
}
_, err = tx.tx.Exec("DELETE FROM txtmp WHERE blockid > ?", blockidx)
if err != nil {
// XXX
db.endTx(false)
return err
}
_, err = tx.tx.Exec("DELETE FROM tx WHERE blockid > ?", blockidx)
if err != nil {
// XXX
db.endTx(false)
return err
}
// delete from block last in case of foreign keys
_, err = tx.tx.Exec("DELETE FROM block WHERE blockid > ?", blockidx)
if err != nil {
// XXX
db.endTx(false)
return err
}
err = db.endTx(true)
if err != nil {
return err
}
return
}
// InsertBlock inserts the block data and transaction data from a block
// into the database.
func (db *SqliteDb) InsertBlock(block *btcutil.Block) (height int64, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
blocksha, err := block.Sha()
if err != nil {
log.Warnf("Failed to compute block sha %v", blocksha)
return
}
mblock := block.MsgBlock()
rawMsg, pver, err := block.Bytes()
if err != nil {
log.Warnf("Failed to obtain raw block sha %v", blocksha)
return
}
txloc, err := block.TxLoc()
if err != nil {
log.Warnf("Failed to obtain raw block sha %v", blocksha)
return
}
// Insert block into database
newheight, err := db.insertBlockData(blocksha, &mblock.Header.PrevBlock,
pver, rawMsg)
if err != nil {
log.Warnf("Failed to insert block %v %v %v", blocksha,
&mblock.Header.PrevBlock, err)
return
}
// At least two blocks in the long past were generated by faulty
// miners, the sha of the transaction exists in a previous block,
// detect this condition and 'accept' the block.
for txidx, tx := range mblock.Transactions {
var txsha btcwire.ShaHash
txsha, err = tx.TxSha(pver)
if err != nil {
log.Warnf("failed to compute tx name block %v idx %v err %v", blocksha, txidx, err)
return
}
// Some old blocks contain duplicate transactions
// Attempt to cleanly bypass this problem
// http://blockexplorer.com/b/91842
// http://blockexplorer.com/b/91880
if newheight == 91842 {
dupsha, err := btcwire.NewShaHashFromStr("d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599")
if err != nil {
panic("invalid sha string in source")
}
if txsha == *dupsha {
log.Tracef("skipping sha %v %v", dupsha, newheight)
continue
}
}
if newheight == 91880 {
dupsha, err := btcwire.NewShaHashFromStr("e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468")
if err != nil {
panic("invalid sha string in source")
}
if txsha == *dupsha {
log.Tracef("skipping sha %v %v", dupsha, newheight)
continue
}
}
spentbuflen := (len(tx.TxOut) + 7) / 8
spentbuf := make([]byte, spentbuflen, spentbuflen)
err = db.insertTx(&txsha, newheight, txloc[txidx].TxStart, txloc[txidx].TxLen, spentbuf)
if err != nil {
log.Warnf("block %v idx %v failed to insert tx %v %v err %v", &blocksha, newheight, &txsha, txidx, err)
var oBlkIdx int64
oBlkIdx, _, _, err = db.fetchLocationBySha(&txsha)
log.Warnf("oblkidx %v err %v", oBlkIdx, err)
return
}
}
db.syncPoint()
return newheight, nil
}
// SetDBInsertMode provides hints to the database to how the application
// is running this allows the database to work in optimized modes when the
// database may be very busy.
func (db *SqliteDb) SetDBInsertMode(newmode btcdb.InsertMode) {
oldMode := db.dbInsertMode
switch newmode {
case btcdb.InsertNormal:
// Normal mode inserts tx directly into the tx table
db.UseTempTX = false
db.dbInsertMode = newmode
switch oldMode {
case btcdb.InsertFast:
if db.TempTblSz != 0 {
err := db.migrateTmpTable()
if err != nil {
return
}
}
case btcdb.InsertValidatedInput:
// generate tx indexes
txop := db.txop(txMigrateFinish)
_, err := txop.Exec()
if err != nil {
log.Warnf("Failed to create tx table index - %v", err)
}
}
case btcdb.InsertFast:
// Fast mode inserts tx into txtmp with validation,
// then dumps to tx then rebuilds indexes at thresholds
db.UseTempTX = true
if oldMode != btcdb.InsertNormal {
log.Warnf("switching between invalid DB modes")
break
}
db.dbInsertMode = newmode
case btcdb.InsertValidatedInput:
// ValidatedInput mode inserts into tx table with
// no duplicate checks, then builds index on exit from
// ValidatedInput mode
if oldMode != btcdb.InsertNormal {
log.Warnf("switching between invalid DB modes")
break
}
// remove tx table index
txop := db.txop(txMigratePrep)
_, err := txop.Exec()
if err != nil {
log.Warnf("Failed to clear tx table index - %v", err)
}
db.dbInsertMode = newmode
// XXX
db.UseTempTX = false
}
}

312
sqlite3/sqliteblock.go Normal file
View file

@ -0,0 +1,312 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3
import (
"bytes"
"database/sql"
"github.com/conformal/btcdb"
"github.com/conformal/btcwire"
_ "github.com/mattn/go-sqlite3"
)
// insertGenesisBlock inserts the genesis block of the block chain into the
// database.
func insertGenesisBlock(db *sql.DB) error {
// Encode the genesis block to raw bytes.
pver := uint32(btcwire.ProtocolVersion)
var buf bytes.Buffer
err := btcwire.GenesisBlock.BtcEncode(&buf, pver)
if err != nil {
return err
}
// Insert the genesis block along with its hash and protocol encoding
// version.
sql := blkqueries[blkInsertSha]
sha := btcwire.GenesisHash
_, err = db.Exec(sql, sha.Bytes(), pver, buf.Bytes())
if err != nil {
return err
}
return nil
}
// InsertBlockData stores a block hash and its associated data block with a
// previous sha of `prevSha' and a version of `pver'.
func (db *SqliteDb) InsertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHash, pver uint32, buf []byte) (blockid int64, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
return db.insertBlockData(sha, prevSha, pver, buf)
}
// insertSha stores a block hash and its associated data block with a
// previous sha of `prevSha' and a version of `pver'.
// insertSha shall be called with db lock held
func (db *SqliteDb) insertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHash, pver uint32, buf []byte) (blockid int64, err error) {
tx := &db.txState
if tx.tx == nil {
err = db.startTx()
if err != nil {
return
}
}
var prevOk bool
var blkid int64
prevOk = db.blkExistsSha(prevSha) // exists -> ok
if !prevOk {
return 0, btcdb.PrevShaMissing
}
result, err := db.blkStmts[blkInsertSha].Exec(sha.Bytes(), pver, buf)
if err != nil {
return
}
blkid, err = result.LastInsertId()
if err != nil {
return 0, err
}
blkid -= 1 // skew between btc blockid and sql
// Because we don't know know what the last idx is, we don't
// cache unless already cached
if db.lastBlkShaCached == true {
db.lastBlkSha = *sha
db.lastBlkIdx++
}
bid := tBlockInsertData{*sha, pver, buf}
tx.txInsertList = append(tx.txInsertList, bid)
tx.txDataSz += len(buf)
blockid = blkid
return
}
// fetchSha returns the datablock and pver for the given ShaHash.
func (db *SqliteDb) fetchSha(sha btcwire.ShaHash) (buf []byte, pver uint32,
blkid int64, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
row := db.blkStmts[blkFetchSha].QueryRow(sha.Bytes())
var blockidx int64
var databytes []byte
err = row.Scan(&pver, &databytes, &blockidx)
if err == sql.ErrNoRows {
return // no warning
}
if err != nil {
log.Warnf("fail 2 %v", err)
return
}
buf = databytes
blkid = blockidx - 1 // skew between btc blockid and sql
return
}
// ExistsSha looks up the given block hash
// returns true if it is present in the database.
func (db *SqliteDb) ExistsSha(sha *btcwire.ShaHash) (exists bool) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
_, exists = db.fetchBlockCache(sha)
if exists {
return
}
// not in cache, try database
exists = db.blkExistsSha(sha)
return
}
// blkExistsSha looks up the given block hash
// returns true if it is present in the database.
// CALLED WITH LOCK HELD
func (db *SqliteDb) blkExistsSha(sha *btcwire.ShaHash) bool {
var pver uint32
row := db.blkStmts[blkExistsSha].QueryRow(sha.Bytes())
err := row.Scan(&pver)
if err == sql.ErrNoRows {
return false
}
if err != nil {
// ignore real errors?
log.Warnf("blkExistsSha: fail %v", err)
return false
}
return true
}
// FetchBlockShaByIdx returns a block sha based on its height in the blockchain.
func (db *SqliteDb) FetchBlockShaByIdx(blkid int64) (sha *btcwire.ShaHash, err error) {
var row *sql.Row
db.dbLock.Lock()
defer db.dbLock.Unlock()
blockidx := blkid + 1 // skew between btc blockid and sql
row = db.blkStmts[blkFetchIdx].QueryRow(blockidx)
var shabytes []byte
err = row.Scan(&shabytes)
if err != nil {
return
}
var shaval btcwire.ShaHash
shaval.SetBytes(shabytes)
return &shaval, nil
}
// FetchIdxRange looks up a range of block by the start and ending ids.
// Fetch is inclusive of the start id and exclusive of the ending id. If the
// special id `AllShas' is provided as endid then FetchIdxRange will fetch all
// shas from startid until no more shas are present.
func (db *SqliteDb) FetchIdxRange(startid, endid int64) (rshalist []btcwire.ShaHash, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
startidx := startid + 1 // skew between btc blockid and sql
var endidx int64
if endid == btcdb.AllShas {
endidx = btcdb.AllShas // no skew if asking for all
} else {
endidx = endid + 1 // skew between btc blockid and sql
}
rows, err := db.blkStmts[blkFetchIdxList].Query(startidx, endidx)
if err != nil {
log.Warnf("query failed %v", err)
return
}
var shalist []btcwire.ShaHash
for rows.Next() {
var sha btcwire.ShaHash
var shabytes []byte
err = rows.Scan(&shabytes)
if err != nil {
log.Warnf("wtf? %v", err)
break
}
sha.SetBytes(shabytes)
shalist = append(shalist, sha)
}
rows.Close()
if err == nil {
rshalist = shalist
}
log.Tracef("FetchIdxRange idx %v %v returned %v shas err %v", startid, endid, len(shalist), err)
return
}
// NewestSha provides an interface to quickly look up the sha of
// the most recent (end) of the block chain.
func (db *SqliteDb) NewestSha() (sha *btcwire.ShaHash, blkid int64, err error) {
var row *sql.Row
var blockidx int64
db.dbLock.Lock()
defer db.dbLock.Unlock()
// answer may be cached
if db.lastBlkShaCached == true {
shacopy := db.lastBlkSha
sha = &shacopy
blkid = db.lastBlkIdx - 1 // skew between btc blockid and sql
return
}
querystr := "SELECT key, blockid FROM block ORDER BY blockid DESC;"
tx := &db.txState
if tx.tx != nil {
row = tx.tx.QueryRow(querystr)
} else {
row = db.sqldb.QueryRow(querystr)
}
var shabytes []byte
err = row.Scan(&shabytes, &blockidx)
if err == nil {
var retsha btcwire.ShaHash
retsha.SetBytes(shabytes)
sha = &retsha
blkid = blockidx - 1 // skew between btc blockid and sql
db.lastBlkSha = retsha
db.lastBlkIdx = blockidx
db.lastBlkShaCached = true
}
return
}
type SqliteBlockIterator struct {
rows *sql.Rows
stmt *sql.Stmt
db *SqliteDb
}
// NextRow iterates thru all blocks in database.
func (bi *SqliteBlockIterator) NextRow() bool {
return bi.rows.Next()
}
// Row returns row data for block iterator.
func (bi *SqliteBlockIterator) Row() (key *btcwire.ShaHash, pver uint32,
buf []byte, err error) {
var keybytes []byte
err = bi.rows.Scan(&keybytes, &pver, &buf)
if err == nil {
var retkey btcwire.ShaHash
retkey.SetBytes(keybytes)
key = &retkey
}
return
}
// Close shuts down the iterator when done walking blocks in the database.
func (bi *SqliteBlockIterator) Close() {
bi.rows.Close()
bi.stmt.Close()
}
// NewIterateBlocks prepares iterator for all blocks in database.
func (db *SqliteDb) NewIterateBlocks() (btcdb.BlockIterator, error) {
var bi SqliteBlockIterator
db.dbLock.Lock()
defer db.dbLock.Unlock()
stmt, err := db.sqldb.Prepare("SELECT key, pver, data FROM block ORDER BY blockid;")
if err != nil {
return nil, err
}
tx := &db.txState
if tx.tx != nil {
txstmt := tx.tx.Stmt(stmt)
stmt.Close()
stmt = txstmt
}
bi.stmt = stmt
bi.rows, err = bi.stmt.Query()
if err != nil {
return nil, err
}
bi.db = db
return &bi, nil
}

302
sqlite3/sqliteblock_test.go Normal file
View file

@ -0,0 +1,302 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3_test
import (
"bytes"
"fmt"
"github.com/conformal/btcdb"
"github.com/conformal/btcdb/db_sqlite"
"github.com/conformal/btcwire"
"github.com/conformal/seelog"
"os"
"testing"
)
// array of shas
var testShas []btcwire.ShaHash = []btcwire.ShaHash{
{
0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11,
0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11,
0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11,
0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11,
},
{
0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22,
0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22,
0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22,
0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22,
},
{
0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33,
0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33,
0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33,
0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33,
},
{
0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44,
0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44,
0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44,
0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44,
},
{
0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55,
0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55,
0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55,
0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55,
},
}
// Work around stupid go vet bug where any non array should have named
// initializers. Since ShaHash is a glorified array it shouldn't matter.
var badShaArray = [32]byte{
0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99,
0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99,
0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99,
0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99,
}
var badSha btcwire.ShaHash = btcwire.ShaHash(badShaArray)
var zeroSha = btcwire.ShaHash{}
var zeroBlock []byte = make([]byte, 32)
func compareArray(t *testing.T, one, two []btcwire.ShaHash, test string,
sync string) {
if len(one) != len(two) {
t.Errorf("%s: lengths don't match for arrays (%s)", test, sync)
return
}
for i := range one {
if !one[i].IsEqual(&two[i]) {
t.Errorf("%s: %dth sha doesn't match (%s)", test, i,
sync)
}
}
}
func testNewestSha(t *testing.T, db btcdb.Db, expSha btcwire.ShaHash,
expBlk int64, situation string) {
newestsha, blkid, err := db.NewestSha()
if err != nil {
t.Errorf("NewestSha failed %v (%s)", err, situation)
return
}
if blkid != expBlk {
t.Errorf("NewestSha blkid is %d not %d (%s)", blkid, expBlk,
situation)
}
if !newestsha.IsEqual(&expSha) {
t.Errorf("Newestsha isn't the last sha we inserted %v %v (%s)",
newestsha, &expSha, situation)
}
}
type fetchIdxTest struct {
start int64
end int64
exp []btcwire.ShaHash
test string
}
func testFetch(t *testing.T, db btcdb.Db, shas []btcwire.ShaHash,
sync string) {
// Test the newest sha is what we expect and call it twice to ensure
// caching is working working properly.
numShas := int64(len(shas))
newestSha := shas[numShas-1]
newestBlockID := int64(numShas)
testNewestSha(t, db, newestSha, newestBlockID, sync)
testNewestSha(t, db, newestSha, newestBlockID, sync+" cached")
for i, sha := range shas {
// Add one for genesis block skew.
i = i + 1
// Ensure the sha exists in the db as expected.
if !db.ExistsSha(&sha) {
t.Errorf("testSha %d doesn't exists (%s)", i, sync)
break
}
// Fetch the sha from the db and ensure all fields are expected
// values.
buf, pver, idx, err := sqlite3.FetchSha(db, &sha)
if err != nil {
t.Errorf("Failed to fetch testSha %d (%s)", i, sync)
}
if !bytes.Equal(zeroBlock, buf) {
t.Errorf("testSha %d incorrect block return (%s)", i,
sync)
}
if pver != 1 {
t.Errorf("pver is %d and not 1 for testSha %d (%s)",
pver, i, sync)
}
if idx != int64(i) {
t.Errorf("index isn't as expected %d vs %d (%s)",
idx, i, sync)
}
// Fetch the sha by index and ensure it matches.
tsha, err := db.FetchBlockShaByIdx(int64(i))
if err != nil {
t.Errorf("can't fetch sha at index %d: %v", i, err)
continue
}
if !tsha.IsEqual(&sha) {
t.Errorf("sha for index %d isn't shas[%d]", i, i)
}
}
endBlockID := numShas + 1
midBlockID := endBlockID / 2
fetchIdxTests := []fetchIdxTest{
// All shas.
{1, btcdb.AllShas, shas, "fetch all shas"},
//// All shas using known bounds.
{1, endBlockID, shas, "fetch all shas2"},
// Partial list starting at beginning.
{1, midBlockID, shas[:midBlockID-1], "fetch first half"},
// Partial list ending at end.
{midBlockID, endBlockID, shas[midBlockID-1 : endBlockID-1],
"fetch second half"},
// Nonexistant off the end.
{endBlockID, endBlockID * 2, []btcwire.ShaHash{},
"fetch nonexistant"},
}
for _, test := range fetchIdxTests {
t.Logf("numSha: %d - Fetch from %d to %d\n", numShas, test.start, test.end)
if shalist, err := db.FetchIdxRange(test.start, test.end); err == nil {
compareArray(t, shalist, test.exp, test.test, sync)
} else {
t.Errorf("failed to fetch index range for %s (%s)",
test.test, sync)
}
}
// Try and fetch nonexistant sha.
if db.ExistsSha(&badSha) {
t.Errorf("non existant sha exists (%s)!", sync)
}
_, _, _, err := sqlite3.FetchSha(db, &badSha)
if err == nil {
t.Errorf("Success when fetching a bad sha! (%s)", sync)
}
// XXX if not check to see it is the right value?
testIterator(t, db, shas, sync)
}
func testIterator(t *testing.T, db btcdb.Db, shas []btcwire.ShaHash,
sync string) {
// Iterate over the whole list of shas.
iter, err := db.NewIterateBlocks()
if err != nil {
t.Errorf("failed to create iterated blocks")
return
}
// Skip the genesis block.
_ = iter.NextRow()
i := 0
for ; iter.NextRow(); i++ {
key, pver, buf, err := iter.Row()
if err != nil {
t.Errorf("iter.NextRow() failed: %v (%s)", err, sync)
break
}
if i >= len(shas) {
t.Errorf("iterator returned more shas than "+
"expected - %d (%s)", i, sync)
break
}
if !key.IsEqual(&shas[i]) {
t.Errorf("iterator test: %dth sha doesn't match (%s)",
i, sync)
}
if !bytes.Equal(zeroBlock, buf) {
t.Errorf("iterator test: %d buf incorrect (%s)", i,
sync)
}
if pver != 1 {
t.Errorf("iterator: %dth pver is %d and not 1 (%s)",
i, pver, sync)
}
}
if i < len(shas) {
t.Errorf("iterator got no rows on %dth loop, should have %d "+
"(%s)", i, len(shas), sync)
}
if _, _, _, err = iter.Row(); err == nil {
t.Errorf("done iterator didn't return failure")
}
iter.Close()
}
func TestBdb(t *testing.T) {
log, err := seelog.LoggerFromWriterWithMinLevel(os.Stdout,
seelog.InfoLvl)
if err != nil {
t.Errorf("failed to create logger: %v", err)
return
}
defer log.Flush()
btcdb.UseLogger(log)
// Ignore db remove errors since it means we didn't have an old one.
_ = os.Remove("tstdb1")
db, err := btcdb.CreateDB("sqlite", "tstdb1")
if err != nil {
t.Errorf("Failed to open test database %v", err)
return
}
defer os.Remove("tstdb1")
for i := range testShas {
var previous btcwire.ShaHash
if i == 0 {
previous = btcwire.GenesisHash
} else {
previous = testShas[i-1]
}
_, err := db.InsertBlockData(&testShas[i], &previous, 1, zeroBlock)
if err != nil {
t.Errorf("Failed to insert testSha %d. Error: %v",
i, err)
return
}
testFetch(t, db, testShas[0:i+1], "pre sync ")
}
// XXX insert enough so that we hit the transaction limit
// XXX try and insert a with a bad previous
db.Sync()
testFetch(t, db, testShas, "post sync")
for i := len(testShas) - 1; i >= 0; i-- {
err := db.DropAfterBlockBySha(testShas[i])
if err != nil {
t.Errorf("drop after %d failed %v", i, err)
break
}
testFetch(t, db, testShas[:i+1],
fmt.Sprintf("post DropAfter for sha %d", i))
}
// Just tests that it doesn't crash, no return value
db.Close()
}

261
sqlite3/sqlitedbcache.go Normal file
View file

@ -0,0 +1,261 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3
import (
"bytes"
"container/list"
"github.com/conformal/btcdb"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"sync"
)
type txCache struct {
maxcount int
fifo list.List
// NOTE: the key is specifically ShaHash, not *ShaHash
txMap map[btcwire.ShaHash]*txCacheObj
cacheLock sync.RWMutex
}
type txCacheObj struct {
next *txCacheObj
sha btcwire.ShaHash
blksha btcwire.ShaHash
pver uint32
tx *btcwire.MsgTx
txbuf []byte
}
type blockCache struct {
maxcount int
fifo list.List
blockMap map[btcwire.ShaHash]*blockCacheObj
cacheLock sync.RWMutex
}
type blockCacheObj struct {
next *blockCacheObj
sha btcwire.ShaHash
blk *btcutil.Block
}
// FetchBlockBySha - return a btcutil Block, object may be a cached.
func (db *SqliteDb) FetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, err error) {
blkcache, ok := db.fetchBlockCache(sha)
if ok {
return blkcache.blk, nil
}
buf, pver, height, err := db.fetchSha(*sha)
if err != nil {
return
}
blk, err = btcutil.NewBlockFromBytes(buf, pver)
if err != nil {
return
}
blk.SetHeight(height)
db.insertBlockCache(sha, blk)
return
}
// fetchBlockCache check if a block is in the block cache, if so return it.
func (db *SqliteDb) fetchBlockCache(sha *btcwire.ShaHash) (*blockCacheObj, bool) {
db.blockCache.cacheLock.RLock()
defer db.blockCache.cacheLock.RUnlock()
blkobj, ok := db.blockCache.blockMap[*sha]
if !ok { // could this just return the map deref?
return nil, false
}
return blkobj, true
}
// insertBlockCache insert the given sha/block into the cache map.
// If the block cache is determined to be full, it will release
// an old entry in FIFO order.
func (db *SqliteDb) insertBlockCache(sha *btcwire.ShaHash, blk *btcutil.Block) {
bc := &db.blockCache
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
blkObj := blockCacheObj{sha: *sha, blk: blk}
bc.fifo.PushBack(&blkObj)
if bc.fifo.Len() > bc.maxcount {
listobj := bc.fifo.Front()
bc.fifo.Remove(listobj)
tailObj, ok := listobj.Value.(*blockCacheObj)
if ok {
delete(bc.blockMap, tailObj.sha)
} else {
panic("invalid type pushed on blockCache list")
}
}
bc.blockMap[blkObj.sha] = &blkObj
}
type TxListReply struct {
Sha *btcwire.ShaHash
Tx *btcwire.MsgTx
Err error
}
// FetchTxByShaList given a array of ShaHash, look up the transactions
// and return them in a TxListReply array.
func (db *SqliteDb) FetchTxByShaList(txShaList []*btcwire.ShaHash) []*btcdb.TxListReply {
var replies []*btcdb.TxListReply
for _, txsha := range txShaList {
tx, _, _, err := db.FetchTxBySha(txsha)
txlre := btcdb.TxListReply{Sha: txsha, Tx: tx, Err: err}
replies = append(replies, &txlre)
}
return replies
}
// FetchTxAllBySha returns several pieces of data regarding the given sha.
func (db *SqliteDb) FetchTxAllBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rtxbuf []byte, rpver uint32, rblksha *btcwire.ShaHash, err error) {
// Check Tx cache
if txc, ok := db.fetchTxCache(txsha); ok {
return txc.tx, txc.txbuf, txc.pver, &txc.blksha, nil
}
// If not cached load it
bidx, toff, tlen, err := db.FetchLocationBySha(txsha)
if err != nil {
log.Warnf("unable to find location of origin tx %v", txsha)
return
}
blksha, err := db.FetchBlockShaByIdx(bidx)
if err != nil {
log.Warnf("block idx lookup %v to %v", bidx, err)
return
}
log.Tracef("transaction %v is at block %v %v tx %v",
txsha, blksha, bidx, toff)
blk, err := db.FetchBlockBySha(blksha)
if err != nil {
log.Warnf("unable to fetch block %v %v ",
bidx, &blksha)
return
}
blkbuf, pver, err := blk.Bytes()
if err != nil {
log.Warnf("unable to decode block %v %v", bidx, &blksha)
return
}
txbuf := make([]byte, tlen)
copy(txbuf[:], blkbuf[toff:toff+tlen])
rbuf := bytes.NewBuffer(txbuf)
var tx btcwire.MsgTx
err = tx.BtcDecode(rbuf, pver)
if err != nil {
log.Warnf("unable to decode tx block %v %v txoff %v txlen %v",
bidx, &blksha, toff, tlen)
return
}
// Shove data into TxCache
// XXX -
var txc txCacheObj
txc.sha = *txsha
txc.tx = &tx
txc.txbuf = txbuf
txc.pver = pver
txc.blksha = *blksha
db.insertTxCache(&txc)
return &tx, txbuf, pver, blksha, nil
}
// FetchTxBySha returns some data for the given Tx Sha.
func (db *SqliteDb) FetchTxBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rpver uint32, blksha *btcwire.ShaHash, err error) {
rtx, _, rpver, blksha, err = db.FetchTxAllBySha(txsha)
return
}
// FetchTxBufBySha return the bytestream data and associated protocol version.
// for the given Tx Sha
func (db *SqliteDb) FetchTxBufBySha(txsha *btcwire.ShaHash) (txbuf []byte, rpver uint32, err error) {
_, txbuf, rpver, _, err = db.FetchTxAllBySha(txsha)
return
}
// fetchTxCache look up the given transaction in the Tx cache.
func (db *SqliteDb) fetchTxCache(sha *btcwire.ShaHash) (*txCacheObj, bool) {
tc := &db.txCache
tc.cacheLock.RLock()
defer tc.cacheLock.RUnlock()
txObj, ok := tc.txMap[*sha]
if !ok { // could this just return the map deref?
return nil, false
}
return txObj, true
}
// insertTxCache, insert the given txobj into the cache.
// if the tx cache is determined to be full, it will release
// an old entry in FIFO order.
func (db *SqliteDb) insertTxCache(txObj *txCacheObj) {
tc := &db.txCache
tc.cacheLock.Lock()
defer tc.cacheLock.Unlock()
tc.fifo.PushBack(txObj)
if tc.fifo.Len() >= tc.maxcount {
listobj := tc.fifo.Front()
tc.fifo.Remove(listobj)
tailObj, ok := listobj.Value.(*txCacheObj)
if ok {
delete(tc.txMap, tailObj.sha)
} else {
panic("invalid type pushed on tx list")
}
}
tc.txMap[txObj.sha] = txObj
}
// InvalidateTxCache clear/release all cached transactions.
func (db *SqliteDb) InvalidateTxCache() {
tc := &db.txCache
tc.cacheLock.Lock()
defer tc.cacheLock.Unlock()
tc.txMap = map[btcwire.ShaHash]*txCacheObj{}
tc.fifo = list.List{}
}
// InvalidateTxCache clear/release all cached blocks.
func (db *SqliteDb) InvalidateBlockCache() {
bc := &db.blockCache
bc.cacheLock.Lock()
defer bc.cacheLock.Unlock()
bc.blockMap = map[btcwire.ShaHash]*blockCacheObj{}
bc.fifo = list.List{}
}
// InvalidateCache clear/release all cached blocks and transactions.
func (db *SqliteDb) InvalidateCache() {
db.InvalidateTxCache()
db.InvalidateBlockCache()
}

253
sqlite3/sqlitetx.go Normal file
View file

@ -0,0 +1,253 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package sqlite3
import (
"database/sql"
"github.com/conformal/btcdb"
"github.com/conformal/btcwire"
_ "github.com/mattn/go-sqlite3"
)
// InsertTx inserts a tx hash and its associated data into the database.
func (db *SqliteDb) InsertTx(txsha *btcwire.ShaHash, blockidx int64, txoff int, txlen int, usedbuf []byte) (err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
return db.insertTx(txsha, blockidx, txoff, txlen, usedbuf)
}
// insertTx inserts a tx hash and its associated data into the database.
// Must be called with db lock held.
func (db *SqliteDb) insertTx(txsha *btcwire.ShaHash, blockidx int64, txoff int, txlen int, usedbuf []byte) (err error) {
tx := &db.txState
if tx.tx == nil {
err = db.startTx()
if err != nil {
return
}
}
blockid := blockidx + 1
txd := tTxInsertData{txsha: txsha, blockid: blockid, txoff: txoff, txlen: txlen, usedbuf: usedbuf}
log.Tracef("inserting tx %v for block %v off %v len %v",
txsha, blockid, txoff, txlen)
rowBytes := txsha.String()
var op int // which table to insert data into.
if db.UseTempTX {
var tblockid int64
var ttxoff int
var ttxlen int
txop := db.txop(txFetchLocationByShaStmt)
row := txop.QueryRow(rowBytes)
err = row.Scan(&tblockid, &ttxoff, &ttxlen)
if err != sql.ErrNoRows {
// sha already present
err = btcdb.DuplicateSha
return
}
op = txtmpInsertStmt
} else {
op = txInsertStmt
}
txop := db.txop(op)
_, err = txop.Exec(rowBytes, blockid, txoff, txlen, usedbuf)
if err != nil {
log.Warnf("failed to insert %v %v %v", txsha, blockid, err)
return
}
if db.UseTempTX {
db.TempTblSz++
}
// put in insert list for replay
tx.txInsertList = append(tx.txInsertList, txd)
return
}
// FetchLocationBySha looks up the Tx sha information by name.
func (db *SqliteDb) FetchLocationBySha(txsha *btcwire.ShaHash) (blockidx int64, txoff int, txlen int, err error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
return db.fetchLocationBySha(txsha)
}
// fetchLocationBySha look up the Tx sha information by name.
// Must be called with db lock held.
func (db *SqliteDb) fetchLocationBySha(txsha *btcwire.ShaHash) (blockidx int64, txoff int, txlen int, err error) {
var row *sql.Row
var blockid int64
var ttxoff int
var ttxlen int
rowBytes := txsha.String()
txop := db.txop(txFetchLocationByShaStmt)
row = txop.QueryRow(rowBytes)
err = row.Scan(&blockid, &ttxoff, &ttxlen)
if err == sql.ErrNoRows {
txop = db.txop(txtmpFetchLocationByShaStmt)
row = txop.QueryRow(rowBytes)
err = row.Scan(&blockid, &ttxoff, &ttxlen)
if err == sql.ErrNoRows {
err = btcdb.TxShaMissing
return
}
if err != nil {
log.Warnf("txtmp FetchLocationBySha: fail %v",
err)
return
}
}
if err != nil {
log.Warnf("FetchLocationBySha: fail %v", err)
return
}
blockidx = blockid - 1
txoff = ttxoff
txlen = ttxlen
return
}
// FetchTxUsedBySha returns the used/spent buffer for a given transaction.
func (db *SqliteDb) FetchTxUsedBySha(txsha *btcwire.ShaHash) (spentbuf []byte, err error) {
var row *sql.Row
db.dbLock.Lock()
defer db.dbLock.Unlock()
rowBytes := txsha.String()
txop := db.txop(txFetchUsedByShaStmt)
row = txop.QueryRow(rowBytes)
var databytes []byte
err = row.Scan(&databytes)
if err == sql.ErrNoRows {
txop := db.txop(txtmpFetchUsedByShaStmt)
row = txop.QueryRow(rowBytes)
err = row.Scan(&databytes)
if err == sql.ErrNoRows {
err = btcdb.TxShaMissing
return
}
if err != nil {
log.Warnf("txtmp FetchLocationBySha: fail %v",
err)
return
}
}
if err != nil {
log.Warnf("FetchUsedBySha: fail %v", err)
return
}
spentbuf = databytes
return
}
var vaccumDbNextMigrate bool
// migrateTmpTable functions to perform internal db optimization when
// performing large numbers of database inserts. When in Fast operation
// mode, it inserts into txtmp, then when that table reaches a certain
// size limit it moves all tx in the txtmp table into the primary tx
// table and recomputes the index on the primary tx table.
func (db *SqliteDb) migrateTmpTable() error {
db.endTx(true)
db.startTx() // ???
db.UseTempTX = false
db.TempTblSz = 0
var doVacuum bool
var nsteps int
if vaccumDbNextMigrate {
nsteps = 6
vaccumDbNextMigrate = false
doVacuum = true
} else {
nsteps = 5
vaccumDbNextMigrate = true
}
log.Infof("db compaction Stage 1/%v: Preparing", nsteps)
txop := db.txop(txMigratePrep)
_, err := txop.Exec()
if err != nil {
log.Warnf("Failed to prepare migrate - %v", err)
return err
}
log.Infof("db compaction Stage 2/%v: Copying", nsteps)
txop = db.txop(txMigrateCopy)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate read failed - %v", err)
return err
}
log.Tracef("db compaction Stage 2a/%v: Enable db vacuum", nsteps)
txop = db.txop(txPragmaVacuumOn)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate error trying to enable vacuum on "+
"temporary transaction table - %v", err)
return err
}
log.Infof("db compaction Stage 3/%v: Clearing old data", nsteps)
txop = db.txop(txMigrateClear)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate error trying to clear temporary "+
"transaction table - %v", err)
return err
}
log.Tracef("db compaction Stage 3a/%v: Disable db vacuum", nsteps)
txop = db.txop(txPragmaVacuumOff)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate error trying to disable vacuum on "+
"temporary transaction table - %v", err)
return err
}
log.Infof("db compaction Stage 4/%v: Rebuilding index", nsteps)
txop = db.txop(txMigrateFinish)
_, err = txop.Exec()
if err != nil {
log.Warnf("Migrate error trying to clear temporary "+
"transaction table - %v", err)
return err
}
log.Infof("db compaction Stage 5/%v: Finalizing transaction", nsteps)
db.endTx(true) // ???
if doVacuum {
log.Infof("db compaction Stage 6/%v: Optimizing database", nsteps)
txop = db.txop(txVacuum)
_, err = txop.Exec()
if err != nil {
log.Warnf("migrate error trying to clear txtmp tbl %v", err)
return err
}
}
log.Infof("db compaction: Complete")
// TODO(drahn) - determine if this should be turned back on or not
db.UseTempTX = true
return nil
}

BIN
sqlite3/testdata/blocks1-256.bz2 vendored Normal file

Binary file not shown.