// Copyright (c) 2013 Conformal Systems LLC. // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. package sqlite3 import ( "database/sql" "fmt" "github.com/conformal/btcdb" "github.com/conformal/btcutil" "github.com/conformal/btcwire" "github.com/conformal/seelog" _ "github.com/mattn/go-sqlite3" "os" "sync" ) const ( dbVersion int = 2 dbMaxTransCnt = 20000 dbMaxTransMem = 64 * 1024 * 1024 // 64 MB ) const ( blkInsertSha = iota blkFetchSha blkExistsSha blkFetchIdx blkFetchIdxList ) const ( txInsertStmt = iota txFetchUsedByShaStmt txFetchLocationByShaStmt txtmpInsertStmt txtmpFetchUsedByShaStmt txtmpFetchLocationByShaStmt txMigrateCopy txMigrateClear txMigratePrep txMigrateFinish txMigrateCount txPragmaVacuumOn txPragmaVacuumOff txVacuum ) var blkqueries []string = []string{ blkInsertSha: "INSERT INTO block (key, pver, data) VALUES(?, ?, ?);", blkFetchSha: "SELECT pver, data, blockid FROM block WHERE key = ?;", blkExistsSha: "SELECT pver FROM block WHERE key = ?;", blkFetchIdx: "SELECT key FROM block WHERE blockid = ?;", blkFetchIdxList: "SELECT key FROM block WHERE blockid >= ? AND blockid < ? ORDER BY blockid ASC LIMIT 500;", } var txqueries []string = []string{ txInsertStmt: "INSERT INTO tx (key, blockid, txoff, txlen, data) VALUES(?, ?, ?, ?, ?);", txFetchUsedByShaStmt: "SELECT data FROM tx WHERE key = ?;", txFetchLocationByShaStmt: "SELECT blockid, txoff, txlen FROM tx WHERE key = ?;", txtmpInsertStmt: "INSERT INTO txtmp (key, blockid, txoff, txlen, data) VALUES(?, ?, ?, ?, ?);", txtmpFetchUsedByShaStmt: "SELECT data FROM txtmp WHERE key = ?;", txtmpFetchLocationByShaStmt: "SELECT blockid, txoff, txlen FROM txtmp WHERE key = ?;", txMigrateCopy: "INSERT INTO tx (key, blockid, txoff, txlen, data) SELECT key, blockid, txoff, txlen, data FROM txtmp;", txMigrateClear: "DELETE from txtmp;", txMigratePrep: "DROP index IF EXISTS uniquetx;", txMigrateFinish: "CREATE UNIQUE INDEX IF NOT EXISTS uniquetx ON tx (key);", txMigrateCount: "SELECT COUNT(*) FROM txtmp;", txPragmaVacuumOn: "PRAGMA auto_vacuum = FULL;", txPragmaVacuumOff: "PRAGMA auto_vacuum = NONE;", txVacuum: "VACUUM;", } var log seelog.LoggerInterface = seelog.Disabled type tBlockInsertData struct { sha btcwire.ShaHash pver uint32 buf []byte } type tTxInsertData struct { txsha *btcwire.ShaHash blockid int64 txoff int txlen int usedbuf []byte } type txState struct { tx *sql.Tx writeCount int txDataSz int txInsertList []interface{} } type SqliteDb struct { sqldb *sql.DB blkStmts []*sql.Stmt blkBaseStmts []*sql.Stmt txStmts []*sql.Stmt txBaseStmts []*sql.Stmt txState txState dbLock sync.Mutex lastBlkShaCached bool lastBlkSha btcwire.ShaHash lastBlkIdx int64 txCache txCache blockCache blockCache UseTempTX bool TempTblSz int TempTblMax int dbInsertMode btcdb.InsertMode } var self = btcdb.DriverDB{DbType: "sqlite", Create: CreateSqliteDB, Open: OpenSqliteDB} func init() { btcdb.AddDBDriver(self) } // createDB configure the database, setting up all tables to initial state. func createDB(db *sql.DB) error { log.Infof("Initializing new block database") // XXX check for old tables buildTables := []string{ "CREATE TABLE dbversion (version integer);", "CREATE TABLE block ( blockid INTEGER PRIMARY KEY, key BLOB UNIQUE, " + "pver INTEGER NOT NULL, data BLOB NOT NULL);", "INSERT INTO dbversion (version) VALUES (" + fmt.Sprintf("%d", dbVersion) + ");", } buildtxTables := []string{ "CREATE TABLE tx (txidx INTEGER PRIMARY KEY, " + "key TEXT, " + "blockid INTEGER NOT NULL, " + "txoff INTEGER NOT NULL, txlen INTEGER NOT NULL, " + "data BLOB NOT NULL, " + "FOREIGN KEY(blockid) REFERENCES block(blockid));", "CREATE TABLE txtmp (key TEXT PRIMARY KEY, " + "blockid INTEGER NOT NULL, " + "txoff INTEGER NOT NULL, txlen INTEGER NOT NULL, " + "data BLOB NOT NULL, " + "FOREIGN KEY(blockid) REFERENCES block(blockid));", "CREATE UNIQUE INDEX uniquetx ON tx (key);", } for _, sql := range buildTables { _, err := db.Exec(sql) if err != nil { log.Warnf("sql table op failed %v [%v]", err, sql) return err } } for _, sql := range buildtxTables { _, err := db.Exec(sql) if err != nil { log.Warnf("sql table op failed %v [%v]", err, sql) return err } } // Insert the genesis block. err := insertGenesisBlock(db) if err != nil { return err } return nil } // OpenSqliteDB opens an existing database for use. func OpenSqliteDB(filepath string) (pbdb btcdb.Db, err error) { log = btcdb.GetLog() return newOrCreateSqliteDB(filepath, false) } // CreateSqliteDB creates, initializes and opens a database for use. func CreateSqliteDB(filepath string) (pbdb btcdb.Db, err error) { log = btcdb.GetLog() return newOrCreateSqliteDB(filepath, true) } // newOrCreateSqliteDB opens a database, either creating it or opens // existing database based on flag. func newOrCreateSqliteDB(filepath string, create bool) (pbdb btcdb.Db, err error) { var bdb SqliteDb if create == false { _, err = os.Stat(filepath) if err != nil { return nil, btcdb.DbDoesNotExist } } db, err := sql.Open("sqlite3", filepath) if err != nil { log.Warnf("db open failed %v\n", err) return nil, err } dbverstmt, err := db.Prepare("SELECT version FROM dbversion;") if err != nil { // about the only reason this would fail is that the database // is not initialized if create == false { return nil, btcdb.DbDoesNotExist } err = createDB(db) if err != nil { // already warned in the called function return nil, err } dbverstmt, err = db.Prepare("SELECT version FROM dbversion;") if err != nil { // if it failed this a second time, fail. return nil, err } } row := dbverstmt.QueryRow() var version int err = row.Scan(&version) if err != nil { log.Warnf("unable to find db version: no row\n", err) } switch version { case dbVersion: // all good default: log.Warnf("mismatch db version: %v expected %v\n", version, dbVersion) return nil, fmt.Errorf("Invalid version in database") } db.Exec("PRAGMA foreign_keys = ON;") db.Exec("PRAGMA journal_mode=WAL;") bdb.sqldb = db bdb.blkStmts = make([]*sql.Stmt, len(blkqueries)) bdb.blkBaseStmts = make([]*sql.Stmt, len(blkqueries)) for i := range blkqueries { stmt, err := db.Prepare(blkqueries[i]) if err != nil { // XXX log/ return nil, err } bdb.blkBaseStmts[i] = stmt } for i := range bdb.blkBaseStmts { bdb.blkStmts[i] = bdb.blkBaseStmts[i] } bdb.txBaseStmts = make([]*sql.Stmt, len(txqueries)) for i := range txqueries { stmt, err := db.Prepare(txqueries[i]) if err != nil { // XXX log/ return nil, err } bdb.txBaseStmts[i] = stmt } // NOTE: all array entries in txStmts remain nil'ed // tx statements are lazy bound bdb.txStmts = make([]*sql.Stmt, len(txqueries)) bdb.blockCache.maxcount = 150 bdb.blockCache.blockMap = map[btcwire.ShaHash]*blockCacheObj{} bdb.txCache.maxcount = 2000 bdb.txCache.txMap = map[btcwire.ShaHash]*txCacheObj{} bdb.UseTempTX = true bdb.TempTblMax = 1000000 return &bdb, nil } // Sync verifies that the database is coherent on disk, // and no outstanding transactions are in flight. func (db *SqliteDb) Sync() { db.dbLock.Lock() defer db.dbLock.Unlock() db.endTx(true) } // syncPoint notifies the db that this is a safe time to sync the database, // if there are many outstanding transactions. // Must be called with db lock held. func (db *SqliteDb) syncPoint() { tx := &db.txState if db.TempTblSz > db.TempTblMax { err := db.migrateTmpTable() if err != nil { return } } else { if len(tx.txInsertList) > dbMaxTransCnt || tx.txDataSz > dbMaxTransMem { db.endTx(true) } } } // Close cleanly shuts down database, syncing all data. func (db *SqliteDb) Close() { db.dbLock.Lock() defer db.dbLock.Unlock() db.close() } // RollbackClose discards the recent database changes to the previously // saved data at last Sync. func (db *SqliteDb) RollbackClose() { db.dbLock.Lock() defer db.dbLock.Unlock() tx := &db.txState if tx.tx != nil { err := tx.tx.Rollback() if err != nil { log.Debugf("Rollback failed: %v", err) } } db.close() } // close performs the internal shutdown/close operation. func (db *SqliteDb) close() { db.endTx(true) db.InvalidateCache() for i := range db.blkBaseStmts { db.blkBaseStmts[i].Close() } for i := range db.txBaseStmts { if db.txBaseStmts[i] != nil { db.txBaseStmts[i].Close() db.txBaseStmts[i] = nil } } db.sqldb.Close() } // txop returns the appropriately prepared statement, based on // transaction state of the database. func (db *SqliteDb) txop(op int) *sql.Stmt { if db.txStmts[op] != nil { return db.txStmts[op] } if db.txState.tx == nil { // we are not in a transaction, return the base statement return db.txBaseStmts[op] } if db.txStmts[op] == nil { db.txStmts[op] = db.txState.tx.Stmt(db.txBaseStmts[op]) } return db.txStmts[op] } // startTx starts a transaction, preparing or scrubbing statements // for proper operation inside a transaction. func (db *SqliteDb) startTx() (err error) { tx := &db.txState if tx.tx != nil { // this shouldn't happen... log.Warnf("Db startTx called while in a transaction") return } tx.tx, err = db.sqldb.Begin() if err != nil { log.Warnf("Db startTx: begin failed %v", err) tx.tx = nil return } for i := range db.blkBaseStmts { db.blkStmts[i] = tx.tx.Stmt(db.blkBaseStmts[i]) } for i := range db.txBaseStmts { db.txStmts[i] = nil // these are lazily prepared } return } // endTx commits the current active transaction, it zaps all of the prepared // statements associated with the transaction. func (db *SqliteDb) endTx(recover bool) (err error) { tx := &db.txState if tx.tx == nil { return } err = tx.tx.Commit() if err != nil && recover { // XXX - double check that the tx is dead after // commit failure (rollback?) log.Warnf("Db endTx: commit failed %v", err) err = db.rePlayTransaction() if err != nil { // We tried, return failure (after zeroing state) // so the upper level can notice and restart } } for i := range db.blkBaseStmts { db.blkStmts[i].Close() db.blkStmts[i] = db.blkBaseStmts[i] } for i := range db.txStmts { if db.txStmts[i] != nil { db.txStmts[i].Close() db.txStmts[i] = nil } } tx.tx = nil var emptyTxList []interface{} tx.txInsertList = emptyTxList tx.txDataSz = 0 return } // rePlayTransaction will attempt to re-execute inserts performed // sync the beginning of a transaction. This is to be used after // a sql Commit operation fails to keep the database from losing data. func (db *SqliteDb) rePlayTransaction() (err error) { err = db.startTx() if err != nil { return } tx := &db.txState for _, ins := range tx.txInsertList { switch v := ins.(type) { case tBlockInsertData: block := v _, err = db.blkStmts[blkInsertSha].Exec(block.sha.Bytes(), block.pver, block.buf) if err != nil { break } case tTxInsertData: txd := v txnamebytes := txd.txsha.Bytes() txop := db.txop(txInsertStmt) _, err = txop.Exec(txd.blockid, txnamebytes, txd.txoff, txd.txlen, txd.usedbuf) if err != nil { break } } } // This function is called even if we have failed. // We need to clean up so the database can be used again. // However we want the original error not any new error, // unless there was no original error but the commit fails. err2 := db.endTx(false) if err == nil && err2 != nil { err = err2 } return } // DropAfterBlockBySha will remove any blocks from the database after the given block. // It terminates any existing transaction and performs its operations in an // atomic transaction, it is terminated (committed) before exit. func (db *SqliteDb) DropAfterBlockBySha(sha btcwire.ShaHash) (err error) { var row *sql.Row db.dbLock.Lock() defer db.dbLock.Unlock() db.InvalidateCache() // This is a destructive operation and involves multiple requests // so requires a transaction, terminate any transaction to date // and start a new transaction err = db.endTx(true) if err != nil { return err } err = db.startTx() if err != nil { return err } // also drop any cached sha data db.lastBlkShaCached = false querystr := "SELECT blockid FROM block WHERE key = ?;" tx := &db.txState row = tx.tx.QueryRow(querystr, sha.Bytes()) var blockidx uint64 err = row.Scan(&blockidx) if err != nil { // XXX db.endTx(false) return err } _, err = tx.tx.Exec("DELETE FROM txtmp WHERE blockid > ?", blockidx) if err != nil { // XXX db.endTx(false) return err } _, err = tx.tx.Exec("DELETE FROM tx WHERE blockid > ?", blockidx) if err != nil { // XXX db.endTx(false) return err } // delete from block last in case of foreign keys _, err = tx.tx.Exec("DELETE FROM block WHERE blockid > ?", blockidx) if err != nil { // XXX db.endTx(false) return err } err = db.endTx(true) if err != nil { return err } return } // InsertBlock inserts the block data and transaction data from a block // into the database. func (db *SqliteDb) InsertBlock(block *btcutil.Block) (height int64, err error) { db.dbLock.Lock() defer db.dbLock.Unlock() blocksha, err := block.Sha() if err != nil { log.Warnf("Failed to compute block sha %v", blocksha) return } mblock := block.MsgBlock() rawMsg, pver, err := block.Bytes() if err != nil { log.Warnf("Failed to obtain raw block sha %v", blocksha) return } txloc, err := block.TxLoc() if err != nil { log.Warnf("Failed to obtain raw block sha %v", blocksha) return } // Insert block into database newheight, err := db.insertBlockData(blocksha, &mblock.Header.PrevBlock, pver, rawMsg) if err != nil { log.Warnf("Failed to insert block %v %v %v", blocksha, &mblock.Header.PrevBlock, err) return } // At least two blocks in the long past were generated by faulty // miners, the sha of the transaction exists in a previous block, // detect this condition and 'accept' the block. for txidx, tx := range mblock.Transactions { var txsha btcwire.ShaHash txsha, err = tx.TxSha(pver) if err != nil { log.Warnf("failed to compute tx name block %v idx %v err %v", blocksha, txidx, err) return } // Some old blocks contain duplicate transactions // Attempt to cleanly bypass this problem // http://blockexplorer.com/b/91842 // http://blockexplorer.com/b/91880 if newheight == 91842 { dupsha, err := btcwire.NewShaHashFromStr("d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599") if err != nil { panic("invalid sha string in source") } if txsha == *dupsha { log.Tracef("skipping sha %v %v", dupsha, newheight) continue } } if newheight == 91880 { dupsha, err := btcwire.NewShaHashFromStr("e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468") if err != nil { panic("invalid sha string in source") } if txsha == *dupsha { log.Tracef("skipping sha %v %v", dupsha, newheight) continue } } spentbuflen := (len(tx.TxOut) + 7) / 8 spentbuf := make([]byte, spentbuflen, spentbuflen) err = db.insertTx(&txsha, newheight, txloc[txidx].TxStart, txloc[txidx].TxLen, spentbuf) if err != nil { log.Warnf("block %v idx %v failed to insert tx %v %v err %v", &blocksha, newheight, &txsha, txidx, err) var oBlkIdx int64 oBlkIdx, _, _, err = db.fetchLocationBySha(&txsha) log.Warnf("oblkidx %v err %v", oBlkIdx, err) return } } db.syncPoint() return newheight, nil } // SetDBInsertMode provides hints to the database to how the application // is running this allows the database to work in optimized modes when the // database may be very busy. func (db *SqliteDb) SetDBInsertMode(newmode btcdb.InsertMode) { oldMode := db.dbInsertMode switch newmode { case btcdb.InsertNormal: // Normal mode inserts tx directly into the tx table db.UseTempTX = false db.dbInsertMode = newmode switch oldMode { case btcdb.InsertFast: if db.TempTblSz != 0 { err := db.migrateTmpTable() if err != nil { return } } case btcdb.InsertValidatedInput: // generate tx indexes txop := db.txop(txMigrateFinish) _, err := txop.Exec() if err != nil { log.Warnf("Failed to create tx table index - %v", err) } } case btcdb.InsertFast: // Fast mode inserts tx into txtmp with validation, // then dumps to tx then rebuilds indexes at thresholds db.UseTempTX = true if oldMode != btcdb.InsertNormal { log.Warnf("switching between invalid DB modes") break } db.dbInsertMode = newmode case btcdb.InsertValidatedInput: // ValidatedInput mode inserts into tx table with // no duplicate checks, then builds index on exit from // ValidatedInput mode if oldMode != btcdb.InsertNormal { log.Warnf("switching between invalid DB modes") break } // remove tx table index txop := db.txop(txMigratePrep) _, err := txop.Exec() if err != nil { log.Warnf("Failed to clear tx table index - %v", err) } db.dbInsertMode = newmode // XXX db.UseTempTX = false } }