From 66731c1a1ea36eb7723b9b106772a7e3580338cb Mon Sep 17 00:00:00 2001 From: Dale Rahn Date: Thu, 11 Jul 2013 16:35:50 -0400 Subject: [PATCH] Implement TxOut Spend tracking. Return spent data in TxListReply. Unspend coins when blocks are removed during DropAfterBlock. --- db.go | 8 +- sqlite3/insertremove_test.go | 217 ++++++++++++++++++++++++++ sqlite3/operational_test.go | 4 +- sqlite3/sqlite.go | 220 +++++++++++++++++++++++--- sqlite3/sqliteblock.go | 12 +- sqlite3/sqliteblock_test.go | 292 ----------------------------------- sqlite3/sqlitedbcache.go | 161 ++++++++++++++++--- sqlite3/sqlitetx.go | 50 +++++- 8 files changed, 611 insertions(+), 353 deletions(-) create mode 100644 sqlite3/insertremove_test.go delete mode 100644 sqlite3/sqliteblock_test.go diff --git a/db.go b/db.go index 7f2d0493..81cc9fee 100644 --- a/db.go +++ b/db.go @@ -161,9 +161,11 @@ type DriverDB struct { // TxListReply is used to return individual transaction information when // data about multiple transactions is requested in a single call. type TxListReply struct { - Sha *btcwire.ShaHash - Tx *btcwire.MsgTx - Err error + Sha *btcwire.ShaHash + Tx *btcwire.MsgTx + Height int64 + TxSpent []bool + Err error } // driverList holds all of the registered database backends. diff --git a/sqlite3/insertremove_test.go b/sqlite3/insertremove_test.go new file mode 100644 index 00000000..f639369e --- /dev/null +++ b/sqlite3/insertremove_test.go @@ -0,0 +1,217 @@ +// Copyright (c) 2013 Conformal Systems LLC. +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package sqlite3_test + +import ( + //"compress/bzip2" + //"encoding/binary" + "github.com/conformal/btcdb" + "github.com/conformal/btcdb/sqlite3" + "github.com/conformal/btcutil" + "github.com/conformal/btcwire" + //"io" + "fmt" + "os" + "path/filepath" + //"strings" + "testing" +) + +var tstBlocks []*btcutil.Block + +func loadblocks(t *testing.T) []*btcutil.Block { + if len(tstBlocks) != 0 { + return tstBlocks + } + + testdatafile := filepath.Join("testdata", "blocks1-256.bz2") + blocks, err := loadBlocks(t, testdatafile) + if err != nil { + t.Errorf("Unable to load blocks from test data: %v", err) + return nil + } + tstBlocks = blocks + return blocks +} + +func TestUnspentInsert(t *testing.T) { + testUnspentInsert(t, dbTmDefault) + testUnspentInsert(t, dbTmNormal) + testUnspentInsert(t, dbTmFast) +} + +// insert every block in the test chain +// after each insert, fetch all the tx affected by the latest +// block and verify that the the tx is spent/unspent +// new tx should be fully unspent, referenced tx should have +// the associated txout set to spent. +func testUnspentInsert(t *testing.T, mode int) { + // Ignore db remove errors since it means we didn't have an old one. + dbname := "tstdbuspnt1" + _ = os.Remove(dbname) + db, err := btcdb.CreateDB("sqlite", dbname) + if err != nil { + t.Errorf("Failed to open test database %v", err) + return + } + defer os.Remove(dbname) + defer db.Close() + + switch mode { + case dbTmDefault: // default + // no setup + case dbTmNormal: // explicit normal + db.SetDBInsertMode(btcdb.InsertNormal) + case dbTmFast: // fast mode + db.SetDBInsertMode(btcdb.InsertFast) + if sqldb, ok := db.(*sqlite3.SqliteDb); ok { + sqldb.TempTblMax = 100 + } else { + t.Errorf("not right type") + } + case dbTmNoVerify: // validated block + t.Errorf("UnspentInsert test is not valid in NoVerify mode") + } + + // Since we are dealing with small dataset, reduce cache size + sqlite3.SetBlockCacheSize(db, 2) + sqlite3.SetTxCacheSize(db, 3) + + blocks := loadblocks(t) +endtest: + for height := int64(1); height < int64(len(blocks)); height++ { + + block := blocks[height] + // look up inputs to this x + mblock := block.MsgBlock() + var txneededList []*btcwire.ShaHash + var txlookupList []*btcwire.ShaHash + var txOutList []*btcwire.ShaHash + var txInList []*btcwire.OutPoint + for _, tx := range mblock.Transactions { + for _, txin := range tx.TxIn { + if txin.PreviousOutpoint.Index == uint32(4294967295) { + continue + } + origintxsha := &txin.PreviousOutpoint.Hash + + txInList = append(txInList, &txin.PreviousOutpoint) + txneededList = append(txneededList, origintxsha) + txlookupList = append(txlookupList, origintxsha) + + if !db.ExistsTxSha(origintxsha) { + t.Errorf("referenced tx not found %v ", origintxsha) + } + + } + txshaname, _ := tx.TxSha(block.ProtocolVersion()) + txlookupList = append(txlookupList, &txshaname) + txOutList = append(txOutList, &txshaname) + } + + txneededmap := map[btcwire.ShaHash]*btcdb.TxListReply{} + txlist := db.FetchTxByShaList(txneededList) + for _, txe := range txlist { + if txe.Err != nil { + t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err) + break endtest + } + txneededmap[*txe.Sha] = txe + } + for _, spend := range txInList { + itxe := txneededmap[spend.Hash] + if itxe.TxSpent[spend.Index] == true { + t.Errorf("txin %v:%v is already spent", spend.Hash, spend.Index) + } + } + + newheight, err := db.InsertBlock(block) + if err != nil { + t.Errorf("failed to insert block %v err %v", height, err) + break endtest + } + if newheight != height { + t.Errorf("height mismatch expect %v returned %v", height, newheight) + break endtest + } + + txlookupmap := map[btcwire.ShaHash]*btcdb.TxListReply{} + txlist = db.FetchTxByShaList(txlookupList) + for _, txe := range txlist { + if txe.Err != nil { + t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err) + break endtest + } + txlookupmap[*txe.Sha] = txe + } + for _, spend := range txInList { + itxe := txlookupmap[spend.Hash] + if itxe.TxSpent[spend.Index] == false { + fmt.Printf("?? txin %v:%v is unspent %v\n", spend.Hash, spend.Index, itxe.TxSpent) + t.Errorf("txin %v:%v is unspent %v", spend.Hash, spend.Index, itxe.TxSpent) + } + } + for _, txo := range txOutList { + itxe := txlookupmap[*txo] + for i, spent := range itxe.TxSpent { + if spent == true { + t.Errorf("freshly inserted tx %v already spent %v", txo, i) + } + } + + } + if len(txInList) == 0 { + continue + } + dropblock := blocks[height-1] + dropsha, _ := dropblock.Sha() + + err = db.DropAfterBlockBySha(dropsha) + if err != nil { + t.Errorf("failed to drop block %v err %v", height, err) + break endtest + } + + txlookupmap = map[btcwire.ShaHash]*btcdb.TxListReply{} + txlist = db.FetchTxByShaList(txlookupList) + for _, txe := range txlist { + if txe.Err != nil { + if _, ok := txneededmap[*txe.Sha]; ok { + t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err) + break endtest + } + } + txlookupmap[*txe.Sha] = txe + } + for _, spend := range txInList { + itxe := txlookupmap[spend.Hash] + if itxe.TxSpent[spend.Index] == true { + fmt.Printf("?? txin %v:%v is spent %v\n", spend.Hash, spend.Index, itxe.TxSpent) + t.Errorf("txin %v:%v is unspent %v", spend.Hash, spend.Index, itxe.TxSpent) + } + } + newheight, err = db.InsertBlock(block) + if err != nil { + t.Errorf("failed to insert block %v err %v", height, err) + break endtest + } + txlookupmap = map[btcwire.ShaHash]*btcdb.TxListReply{} + txlist = db.FetchTxByShaList(txlookupList) + for _, txe := range txlist { + if txe.Err != nil { + t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err) + break endtest + } + txlookupmap[*txe.Sha] = txe + } + for _, spend := range txInList { + itxe := txlookupmap[spend.Hash] + if itxe.TxSpent[spend.Index] == false { + fmt.Printf("?? txin %v:%v is unspent %v\n", spend.Hash, spend.Index, itxe.TxSpent) + t.Errorf("txin %v:%v is unspent %v", spend.Hash, spend.Index, itxe.TxSpent) + } + } + } +} diff --git a/sqlite3/operational_test.go b/sqlite3/operational_test.go index e731188d..5ddc0b3a 100644 --- a/sqlite3/operational_test.go +++ b/sqlite3/operational_test.go @@ -246,7 +246,7 @@ func testBackout(t *testing.T, mode int) { _ = db.ExistsSha(sha) _, err = db.FetchBlockBySha(sha) if err != nil { - t.Errorf("failed to load block 99 from db", err) + t.Errorf("failed to load block 99 from db %v", err) } sha, err = blocks[110].Sha() @@ -266,7 +266,7 @@ func testBackout(t *testing.T, mode int) { txsha, err := mblock.Transactions[0].TxSha(block.ProtocolVersion()) exists := db.ExistsTxSha(&txsha) if exists { - t.Errorf("tx %v exists in db, failure expected") + t.Errorf("tx %v exists in db, failure expected", txsha) } _, _, _, err = db.FetchTxBySha(&txsha) diff --git a/sqlite3/sqlite.go b/sqlite3/sqlite.go index e41b7148..69f52b79 100644 --- a/sqlite3/sqlite.go +++ b/sqlite3/sqlite.go @@ -34,9 +34,15 @@ const ( txInsertStmt = iota txFetchUsedByShaStmt txFetchLocationByShaStmt + txFetchLocUsedByShaStmt + txUpdateUsedByShaStmt + txtmpInsertStmt txtmpFetchUsedByShaStmt txtmpFetchLocationByShaStmt + txtmpFetchLocUsedByShaStmt + txtmpUpdateUsedByShaStmt + txMigrateCopy txMigrateClear txMigratePrep @@ -58,22 +64,28 @@ var blkqueries []string = []string{ } var txqueries []string = []string{ - txInsertStmt: "INSERT INTO tx (key, blockid, txoff, txlen, data) VALUES(?, ?, ?, ?, ?);", - txFetchUsedByShaStmt: "SELECT data FROM tx WHERE key = ?;", - txFetchLocationByShaStmt: "SELECT blockid, txoff, txlen FROM tx WHERE key = ?;", + txInsertStmt: "INSERT INTO tx (key, blockid, txoff, txlen, data) VALUES(?, ?, ?, ?, ?);", + txFetchUsedByShaStmt: "SELECT data FROM tx WHERE key = ?;", + txFetchLocationByShaStmt: "SELECT blockid, txoff, txlen FROM tx WHERE key = ?;", + txFetchLocUsedByShaStmt: "SELECT blockid, txoff, txlen, data FROM tx WHERE key = ?;", + txUpdateUsedByShaStmt: "UPDATE tx SET data = ? WHERE key = ?;", + txtmpInsertStmt: "INSERT INTO txtmp (key, blockid, txoff, txlen, data) VALUES(?, ?, ?, ?, ?);", txtmpFetchUsedByShaStmt: "SELECT data FROM txtmp WHERE key = ?;", txtmpFetchLocationByShaStmt: "SELECT blockid, txoff, txlen FROM txtmp WHERE key = ?;", - txMigrateCopy: "INSERT INTO tx (key, blockid, txoff, txlen, data) SELECT key, blockid, txoff, txlen, data FROM txtmp;", - txMigrateClear: "DELETE from txtmp;", - txMigratePrep: "DROP index IF EXISTS uniquetx;", - txMigrateFinish: "CREATE UNIQUE INDEX IF NOT EXISTS uniquetx ON tx (key);", - txMigrateCount: "SELECT COUNT(*) FROM txtmp;", - txPragmaVacuumOn: "PRAGMA auto_vacuum = FULL;", - txPragmaVacuumOff: "PRAGMA auto_vacuum = NONE;", - txVacuum: "VACUUM;", - txExistsShaStmt: "SELECT blockid FROM tx WHERE key = ?;", - txtmpExistsShaStmt: "SELECT blockid FROM txtmp WHERE key = ?;", + txtmpFetchLocUsedByShaStmt: "SELECT blockid, txoff, txlen, data FROM txtmp WHERE key = ?;", + txtmpUpdateUsedByShaStmt: "UPDATE txtmp SET data = ? WHERE key = ?;", + + txMigrateCopy: "INSERT INTO tx (key, blockid, txoff, txlen, data) SELECT key, blockid, txoff, txlen, data FROM txtmp;", + txMigrateClear: "DELETE from txtmp;", + txMigratePrep: "DROP index IF EXISTS uniquetx;", + txMigrateFinish: "CREATE UNIQUE INDEX IF NOT EXISTS uniquetx ON tx (key);", + txMigrateCount: "SELECT COUNT(*) FROM txtmp;", + txPragmaVacuumOn: "PRAGMA auto_vacuum = FULL;", + txPragmaVacuumOff: "PRAGMA auto_vacuum = NONE;", + txVacuum: "VACUUM;", + txExistsShaStmt: "SELECT blockid FROM tx WHERE key = ?;", + txtmpExistsShaStmt: "SELECT blockid FROM txtmp WHERE key = ?;", } var log seelog.LoggerInterface = seelog.Disabled @@ -269,6 +281,8 @@ func newOrCreateSqliteDB(filepath string, create bool) (pbdb btcdb.Db, err error bdb.blockCache.maxcount = 150 bdb.blockCache.blockMap = map[btcwire.ShaHash]*blockCacheObj{} + bdb.blockCache.blockMap = map[btcwire.ShaHash]*blockCacheObj{} + bdb.blockCache.blockHeightMap = map[int64]*blockCacheObj{} bdb.txCache.maxcount = 2000 bdb.txCache.txMap = map[btcwire.ShaHash]*txCacheObj{} @@ -325,6 +339,8 @@ func (db *SqliteDb) RollbackClose() { err := tx.tx.Rollback() if err != nil { log.Debugf("Rollback failed: %v", err) + } else { + tx.tx = nil } } db.close() @@ -477,8 +493,6 @@ func (db *SqliteDb) DropAfterBlockBySha(sha *btcwire.ShaHash) (err error) { db.dbLock.Lock() defer db.dbLock.Unlock() - db.InvalidateCache() - // This is a destructive operation and involves multiple requests // so requires a transaction, terminate any transaction to date // and start a new transaction @@ -491,6 +505,27 @@ func (db *SqliteDb) DropAfterBlockBySha(sha *btcwire.ShaHash) (err error) { return err } + var startheight int64 + + if db.lastBlkShaCached { + startheight = db.lastBlkIdx + } else { + querystr := "SELECT blockid FROM block ORDER BY blockid DESC;" + + tx := &db.txState + if tx.tx != nil { + row = tx.tx.QueryRow(querystr) + } else { + row = db.sqldb.QueryRow(querystr) + } + var startblkidx int64 + err = row.Scan(&startblkidx) + if err != nil { + log.Warnf("DropAfterBlockBySha:unable to fetch blockheight %v", err) + return err + } + startheight = startblkidx + } // also drop any cached sha data db.lastBlkShaCached = false @@ -499,22 +534,61 @@ func (db *SqliteDb) DropAfterBlockBySha(sha *btcwire.ShaHash) (err error) { tx := &db.txState row = tx.tx.QueryRow(querystr, sha.Bytes()) - var blockidx uint64 - err = row.Scan(&blockidx) + var keepidx int64 + err = row.Scan(&keepidx) if err != nil { // XXX db.endTx(false) return err } - _, err = tx.tx.Exec("DELETE FROM txtmp WHERE blockid > ?", blockidx) + for height := startheight; height > keepidx; height = height - 1 { + var blk *btcutil.Block + blkc, ok := db.fetchBlockHeightCache(height) + + if ok { + blk = blkc.blk + } else { + // must load the block from the db + sha, err = db.fetchBlockShaByHeight(height - 1) + if err != nil { + return + } + + var buf []byte + var pver uint32 + + buf, pver, _, err = db.fetchSha(*sha) + if err != nil { + return + } + + blk, err = btcutil.NewBlockFromBytes(buf, pver) + if err != nil { + return + } + } + + for _, tx := range blk.MsgBlock().Transactions { + err = db.unSpend(tx) + if err != nil { + return + } + } + } + + // invalidate the cache after possibly using cached entries for block + // lookup to unspend coins in them + db.InvalidateCache() + + _, err = tx.tx.Exec("DELETE FROM txtmp WHERE blockid > ?", keepidx) if err != nil { // XXX db.endTx(false) return err } - _, err = tx.tx.Exec("DELETE FROM tx WHERE blockid > ?", blockidx) + _, err = tx.tx.Exec("DELETE FROM tx WHERE blockid > ?", keepidx) if err != nil { // XXX db.endTx(false) @@ -522,7 +596,7 @@ func (db *SqliteDb) DropAfterBlockBySha(sha *btcwire.ShaHash) (err error) { } // delete from block last in case of foreign keys - _, err = tx.tx.Exec("DELETE FROM block WHERE blockid > ?", blockidx) + _, err = tx.tx.Exec("DELETE FROM block WHERE blockid > ?", keepidx) if err != nil { // XXX db.endTx(false) @@ -604,6 +678,9 @@ func (db *SqliteDb) InsertBlock(block *btcutil.Block) (height int64, err error) } spentbuflen := (len(tx.TxOut) + 7) / 8 spentbuf := make([]byte, spentbuflen, spentbuflen) + for i := uint(len(tx.TxOut) % 8); i < 8; i++ { + spentbuf[spentbuflen-1] |= (byte(1) << i) + } err = db.insertTx(&txsha, newheight, txloc[txidx].TxStart, txloc[txidx].TxLen, spentbuf) if err != nil { @@ -612,6 +689,12 @@ func (db *SqliteDb) InsertBlock(block *btcutil.Block) (height int64, err error) oBlkIdx, _, _, err = db.fetchLocationBySha(&txsha) log.Warnf("oblkidx %v err %v", oBlkIdx, err) + return + } + err = db.doSpend(tx) + if err != nil { + log.Warnf("block %v idx %v failed to spend tx %v %v err %v", &blocksha, newheight, &txsha, txidx, err) + return } } @@ -675,3 +758,100 @@ func (db *SqliteDb) SetDBInsertMode(newmode btcdb.InsertMode) { db.UseTempTX = false } } +func (db *SqliteDb) doSpend(tx *btcwire.MsgTx) error { + for txinidx := range tx.TxIn { + txin := tx.TxIn[txinidx] + + inTxSha := txin.PreviousOutpoint.Hash + inTxidx := txin.PreviousOutpoint.Index + + if inTxidx == ^uint32(0) { + continue + } + + //log.Infof("spending %v %v", &inTxSha, inTxidx) + + err := db.setSpentData(&inTxSha, inTxidx) + if err != nil { + return err + } + } + return nil +} + +func (db *SqliteDb) unSpend(tx *btcwire.MsgTx) error { + for txinidx := range tx.TxIn { + txin := tx.TxIn[txinidx] + + inTxSha := txin.PreviousOutpoint.Hash + inTxidx := txin.PreviousOutpoint.Index + + if inTxidx == ^uint32(0) { + continue + } + + err := db.clearSpentData(&inTxSha, inTxidx) + if err != nil { + return err + } + } + return nil +} + +func (db *SqliteDb) setSpentData(sha *btcwire.ShaHash, idx uint32) error { + return db.setclearSpentData(sha, idx, true) +} + +func (db *SqliteDb) clearSpentData(sha *btcwire.ShaHash, idx uint32) error { + return db.setclearSpentData(sha, idx, false) +} + +func (db *SqliteDb) setclearSpentData(txsha *btcwire.ShaHash, idx uint32, set bool) error { + var spentdata []byte + usingtmp := false + txop := db.txop(txFetchUsedByShaStmt) + row := txop.QueryRow(txsha.String()) + err := row.Scan(&spentdata) + if err != nil { + // if the error is simply didn't fine continue otherwise + // retun failure + + usingtmp = true + txop = db.txop(txtmpFetchUsedByShaStmt) + row := txop.QueryRow(txsha.String()) + err := row.Scan(&spentdata) + if err != nil { + log.Warnf("Failed to locate spent data - %v %v", txsha, err) + return err + } + } + byteidx := idx / 8 + byteoff := idx % 8 + + interestingsha := txsha.String() == "866504b0d6242c237e484b49c6a5db1c234e9a1547e96b7ba3d690cabccbf6b0" + + if interestingsha { + fmt.Printf("txdata was %v ", spentdata) + } + if set { + spentdata[byteidx] |= (byte(1) << byteoff) + } else { + spentdata[byteidx] &= ^(byte(1) << byteoff) + } + txc, cached := db.fetchTxCache(txsha) + if cached { + txc.spent = spentdata + } + if interestingsha { + fmt.Printf("now %v\n", spentdata) + } + + if usingtmp { + txop = db.txop(txtmpUpdateUsedByShaStmt) + } else { + txop = db.txop(txUpdateUsedByShaStmt) + } + _, err = txop.Exec(spentdata, txsha.String()) + + return err +} diff --git a/sqlite3/sqliteblock.go b/sqlite3/sqliteblock.go index b2c2b84e..5d9f5939 100644 --- a/sqlite3/sqliteblock.go +++ b/sqlite3/sqliteblock.go @@ -94,9 +94,6 @@ func (db *SqliteDb) insertBlockData(sha *btcwire.ShaHash, prevSha *btcwire.ShaHa func (db *SqliteDb) fetchSha(sha btcwire.ShaHash) (buf []byte, pver uint32, blkid int64, err error) { - db.dbLock.Lock() - defer db.dbLock.Unlock() - row := db.blkStmts[blkFetchSha].QueryRow(sha.Bytes()) var blockidx int64 @@ -154,10 +151,17 @@ func (db *SqliteDb) blkExistsSha(sha *btcwire.ShaHash) bool { // FetchBlockShaByHeight returns a block hash based on its height in the // block chain. func (db *SqliteDb) FetchBlockShaByHeight(height int64) (sha *btcwire.ShaHash, err error) { - var row *sql.Row db.dbLock.Lock() defer db.dbLock.Unlock() + return db.fetchBlockShaByHeight(height) +} + +// fetchBlockShaByHeight returns a block hash based on its height in the +// block chain. +func (db *SqliteDb) fetchBlockShaByHeight(height int64) (sha *btcwire.ShaHash, err error) { + var row *sql.Row + blockidx := height + 1 // skew between btc blockid and sql row = db.blkStmts[blkFetchIdx].QueryRow(blockidx) diff --git a/sqlite3/sqliteblock_test.go b/sqlite3/sqliteblock_test.go deleted file mode 100644 index 12bac76d..00000000 --- a/sqlite3/sqliteblock_test.go +++ /dev/null @@ -1,292 +0,0 @@ -// Copyright (c) 2013 Conformal Systems LLC. -// Use of this source code is governed by an ISC -// license that can be found in the LICENSE file. - -package sqlite3_test - -import ( - "bytes" - "fmt" - "github.com/conformal/btcdb" - "github.com/conformal/btcdb/sqlite3" - "github.com/conformal/btcwire" - "os" - "testing" -) - -// array of shas -var testShas []btcwire.ShaHash = []btcwire.ShaHash{ - { - 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, - 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, - 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, - 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, - }, - { - 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, - 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, - 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, - 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, 0x22, - }, - { - 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, - 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, - 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, - 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, - }, - { - 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, - 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, - 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, - 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, 0x44, - }, - { - 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, - 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, - 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, - 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, 0x55, - }, -} - -// Work around stupid go vet bug where any non array should have named -// initializers. Since ShaHash is a glorified array it shouldn't matter. -var badShaArray = [32]byte{ - 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, - 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, - 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, - 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, 0x99, -} -var badSha btcwire.ShaHash = btcwire.ShaHash(badShaArray) -var zeroSha = btcwire.ShaHash{} -var zeroBlock []byte = make([]byte, 32) - -func compareArray(t *testing.T, one, two []btcwire.ShaHash, test string, - sync string) { - if len(one) != len(two) { - t.Errorf("%s: lengths don't match for arrays (%s)", test, sync) - return - } - - for i := range one { - if !one[i].IsEqual(&two[i]) { - t.Errorf("%s: %dth sha doesn't match (%s)", test, i, - sync) - } - } -} - -func testNewestSha(t *testing.T, db btcdb.Db, expSha btcwire.ShaHash, - expBlk int64, situation string) { - - newestsha, blkid, err := db.NewestSha() - if err != nil { - t.Errorf("NewestSha failed %v (%s)", err, situation) - return - } - if blkid != expBlk { - t.Errorf("NewestSha blkid is %d not %d (%s)", blkid, expBlk, - situation) - } - if !newestsha.IsEqual(&expSha) { - t.Errorf("Newestsha isn't the last sha we inserted %v %v (%s)", - newestsha, &expSha, situation) - } -} - -type fetchIdxTest struct { - start int64 - end int64 - exp []btcwire.ShaHash - test string -} - -func testFetch(t *testing.T, db btcdb.Db, shas []btcwire.ShaHash, - sync string) { - - // Test the newest sha is what we expect and call it twice to ensure - // caching is working working properly. - numShas := int64(len(shas)) - newestSha := shas[numShas-1] - newestBlockID := int64(numShas) - testNewestSha(t, db, newestSha, newestBlockID, sync) - testNewestSha(t, db, newestSha, newestBlockID, sync+" cached") - - for i, sha := range shas { - // Add one for genesis block skew. - i = i + 1 - - // Ensure the sha exists in the db as expected. - if !db.ExistsSha(&sha) { - t.Errorf("testSha %d doesn't exists (%s)", i, sync) - break - } - - // Fetch the sha from the db and ensure all fields are expected - // values. - buf, pver, idx, err := sqlite3.FetchSha(db, &sha) - if err != nil { - t.Errorf("Failed to fetch testSha %d (%s)", i, sync) - } - if !bytes.Equal(zeroBlock, buf) { - t.Errorf("testSha %d incorrect block return (%s)", i, - sync) - } - if pver != 1 { - t.Errorf("pver is %d and not 1 for testSha %d (%s)", - pver, i, sync) - } - if idx != int64(i) { - t.Errorf("index isn't as expected %d vs %d (%s)", - idx, i, sync) - } - - // Fetch the sha by index and ensure it matches. - tsha, err := db.FetchBlockShaByHeight(int64(i)) - if err != nil { - t.Errorf("can't fetch sha at index %d: %v", i, err) - continue - } - if !tsha.IsEqual(&sha) { - t.Errorf("sha for index %d isn't shas[%d]", i, i) - } - } - - endBlockID := numShas + 1 - midBlockID := endBlockID / 2 - fetchIdxTests := []fetchIdxTest{ - // All shas. - {1, btcdb.AllShas, shas, "fetch all shas"}, - - //// All shas using known bounds. - {1, endBlockID, shas, "fetch all shas2"}, - - // Partial list starting at beginning. - {1, midBlockID, shas[:midBlockID-1], "fetch first half"}, - - // Partial list ending at end. - {midBlockID, endBlockID, shas[midBlockID-1 : endBlockID-1], - "fetch second half"}, - - // Nonexistent off the end. - {endBlockID, endBlockID * 2, []btcwire.ShaHash{}, - "fetch nonexistent"}, - } - - for _, test := range fetchIdxTests { - t.Logf("numSha: %d - Fetch from %d to %d\n", numShas, test.start, test.end) - if shalist, err := db.FetchHeightRange(test.start, test.end); err == nil { - compareArray(t, shalist, test.exp, test.test, sync) - } else { - t.Errorf("failed to fetch index range for %s (%s)", - test.test, sync) - } - } - - // Try and fetch nonexistent sha. - if db.ExistsSha(&badSha) { - t.Errorf("nonexistent sha exists (%s)!", sync) - } - _, _, _, err := sqlite3.FetchSha(db, &badSha) - if err == nil { - t.Errorf("Success when fetching a bad sha! (%s)", sync) - } - // XXX if not check to see it is the right value? - - testIterator(t, db, shas, sync) -} - -func testIterator(t *testing.T, db btcdb.Db, shas []btcwire.ShaHash, - sync string) { - - // Iterate over the whole list of shas. - iter, err := db.NewIterateBlocks() - if err != nil { - t.Errorf("failed to create iterated blocks") - return - } - - // Skip the genesis block. - _ = iter.NextRow() - - i := 0 - for ; iter.NextRow(); i++ { - key, pver, buf, err := iter.Row() - if err != nil { - t.Errorf("iter.NextRow() failed: %v (%s)", err, sync) - break - } - if i >= len(shas) { - t.Errorf("iterator returned more shas than "+ - "expected - %d (%s)", i, sync) - break - } - if !key.IsEqual(&shas[i]) { - t.Errorf("iterator test: %dth sha doesn't match (%s)", - i, sync) - } - if !bytes.Equal(zeroBlock, buf) { - t.Errorf("iterator test: %d buf incorrect (%s)", i, - sync) - } - if pver != 1 { - t.Errorf("iterator: %dth pver is %d and not 1 (%s)", - i, pver, sync) - } - } - if i < len(shas) { - t.Errorf("iterator got no rows on %dth loop, should have %d "+ - "(%s)", i, len(shas), sync) - } - if _, _, _, err = iter.Row(); err == nil { - t.Errorf("done iterator didn't return failure") - } - iter.Close() -} - -func TestBdb(t *testing.T) { - // Ignore db remove errors since it means we didn't have an old one. - _ = os.Remove("tstdb1") - db, err := btcdb.CreateDB("sqlite", "tstdb1") - if err != nil { - t.Errorf("Failed to open test database %v", err) - return - } - defer os.Remove("tstdb1") - - for i := range testShas { - var previous btcwire.ShaHash - if i == 0 { - previous = btcwire.GenesisHash - } else { - previous = testShas[i-1] - } - _, err := db.InsertBlockData(&testShas[i], &previous, 1, zeroBlock) - if err != nil { - t.Errorf("Failed to insert testSha %d. Error: %v", - i, err) - return - } - - testFetch(t, db, testShas[0:i+1], "pre sync ") - } - - // XXX insert enough so that we hit the transaction limit - // XXX try and insert a with a bad previous - - db.Sync() - - testFetch(t, db, testShas, "post sync") - - for i := len(testShas) - 1; i >= 0; i-- { - err := db.DropAfterBlockBySha(&testShas[i]) - if err != nil { - t.Errorf("drop after %d failed %v", i, err) - break - } - testFetch(t, db, testShas[:i+1], - fmt.Sprintf("post DropAfter for sha %d", i)) - } - - // Just tests that it doesn't crash, no return value - db.Close() -} diff --git a/sqlite3/sqlitedbcache.go b/sqlite3/sqlitedbcache.go index 45de6cee..3875f3a2 100644 --- a/sqlite3/sqlitedbcache.go +++ b/sqlite3/sqlitedbcache.go @@ -7,6 +7,7 @@ package sqlite3 import ( "bytes" "container/list" + "fmt" "github.com/conformal/btcdb" "github.com/conformal/btcutil" "github.com/conformal/btcwire" @@ -27,14 +28,17 @@ type txCacheObj struct { blksha btcwire.ShaHash pver uint32 tx *btcwire.MsgTx + height int64 + spent []byte txbuf []byte } type blockCache struct { - maxcount int - fifo list.List - blockMap map[btcwire.ShaHash]*blockCacheObj - cacheLock sync.RWMutex + maxcount int + fifo list.List + blockMap map[btcwire.ShaHash]*blockCacheObj + blockHeightMap map[int64]*blockCacheObj + cacheLock sync.RWMutex } type blockCacheObj struct { @@ -45,6 +49,9 @@ type blockCacheObj struct { // FetchBlockBySha - return a btcutil Block, object may be a cached. func (db *SqliteDb) FetchBlockBySha(sha *btcwire.ShaHash) (blk *btcutil.Block, err error) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + blkcache, ok := db.fetchBlockCache(sha) if ok { return blkcache.blk, nil @@ -78,6 +85,19 @@ func (db *SqliteDb) fetchBlockCache(sha *btcwire.ShaHash) (*blockCacheObj, bool) return blkobj, true } +// fetchBlockHeightCache check if a block is in the block cache, if so return it. +func (db *SqliteDb) fetchBlockHeightCache(height int64) (*blockCacheObj, bool) { + + db.blockCache.cacheLock.RLock() + defer db.blockCache.cacheLock.RUnlock() + + blkobj, ok := db.blockCache.blockHeightMap[height] + if !ok { // could this just return the map deref? + return nil, false + } + return blkobj, true +} + // insertBlockCache insert the given sha/block into the cache map. // If the block cache is determined to be full, it will release // an old entry in FIFO order. @@ -96,65 +116,85 @@ func (db *SqliteDb) insertBlockCache(sha *btcwire.ShaHash, blk *btcutil.Block) { tailObj, ok := listobj.Value.(*blockCacheObj) if ok { delete(bc.blockMap, tailObj.sha) + delete(bc.blockHeightMap, tailObj.blk.Height()) } else { panic("invalid type pushed on blockCache list") } } + bc.blockHeightMap[blk.Height()] = &blkObj bc.blockMap[blkObj.sha] = &blkObj } -type TxListReply struct { - Sha *btcwire.ShaHash - Tx *btcwire.MsgTx - Err error -} - // FetchTxByShaList given a array of ShaHash, look up the transactions // and return them in a TxListReply array. func (db *SqliteDb) FetchTxByShaList(txShaList []*btcwire.ShaHash) []*btcdb.TxListReply { var replies []*btcdb.TxListReply for _, txsha := range txShaList { - tx, _, _, err := db.FetchTxBySha(txsha) - txlre := btcdb.TxListReply{Sha: txsha, Tx: tx, Err: err} + tx, _, _, _, height, txspent, err := db.fetchTxDataBySha(txsha) + btxspent := []bool{} + if err == nil { + btxspent = make([]bool, len(tx.TxOut), len(tx.TxOut)) + for idx := range tx.TxOut { + byteidx := idx / 8 + byteoff := uint(idx % 8) + btxspent[idx] = (txspent[byteidx] & (byte(1) << byteoff)) != 0 + } + } + interestingsha := txsha.String() == "866504b0d6242c237e484b49c6a5db1c234e9a1547e96b7ba3d690cabccbf6b" + if interestingsha { + fmt.Printf("usage of sha %v is %v\n", txsha, btxspent) + } + fmt.Printf("usage of sha %v is %v\n", txsha, btxspent) + txlre := btcdb.TxListReply{Sha: txsha, Tx: tx, Height: height, TxSpent: btxspent, Err: err} replies = append(replies, &txlre) } return replies } -// FetchTxAllBySha returns several pieces of data regarding the given sha. -func (db *SqliteDb) FetchTxAllBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rtxbuf []byte, rpver uint32, rblksha *btcwire.ShaHash, err error) { +// fetchTxDataBySha returns several pieces of data regarding the given sha. +func (db *SqliteDb) fetchTxDataBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rtxbuf []byte, rpver uint32, rblksha *btcwire.ShaHash, rheight int64, rtxspent []byte, err error) { + + var pver uint32 + var blksha *btcwire.ShaHash + var height int64 + var txspent []byte + var toff int + var tlen int + var blk *btcutil.Block + var blkbuf []byte // Check Tx cache if txc, ok := db.fetchTxCache(txsha); ok { - return txc.tx, txc.txbuf, txc.pver, &txc.blksha, nil + if txc.spent != nil { + return txc.tx, txc.txbuf, txc.pver, &txc.blksha, txc.height, txc.spent, nil + } } // If not cached load it - bidx, toff, tlen, err := db.FetchLocationBySha(txsha) + height, toff, tlen, txspent, err = db.fetchLocationUsedBySha(txsha) if err != nil { - log.Warnf("unable to find location of origin tx %v", txsha) return } - blksha, err := db.FetchBlockShaByHeight(bidx) + blksha, err = db.FetchBlockShaByHeight(height) if err != nil { - log.Warnf("block idx lookup %v to %v", bidx, err) + log.Warnf("block idx lookup %v to %v", height, err) return } log.Tracef("transaction %v is at block %v %v tx %v", - txsha, blksha, bidx, toff) + txsha, blksha, height, toff) - blk, err := db.FetchBlockBySha(blksha) + blk, err = db.FetchBlockBySha(blksha) if err != nil { log.Warnf("unable to fetch block %v %v ", - bidx, &blksha) + height, &blksha) return } - blkbuf, pver, err := blk.Bytes() + blkbuf, pver, err = blk.Bytes() if err != nil { - log.Warnf("unable to decode block %v %v", bidx, &blksha) + log.Warnf("unable to decode block %v %v", height, &blksha) return } @@ -166,7 +206,7 @@ func (db *SqliteDb) FetchTxAllBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, err = tx.BtcDecode(rbuf, pver) if err != nil { log.Warnf("unable to decode tx block %v %v txoff %v txlen %v", - bidx, &blksha, toff, tlen) + height, &blksha, toff, tlen) return } @@ -177,6 +217,76 @@ func (db *SqliteDb) FetchTxAllBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, txc.tx = &tx txc.txbuf = txbuf txc.pver = pver + txc.height = height + txc.spent = txspent + txc.blksha = *blksha + db.insertTxCache(&txc) + + return &tx, txbuf, pver, blksha, height, txspent, nil +} + +// FetchTxAllBySha returns several pieces of data regarding the given sha. +func (db *SqliteDb) FetchTxAllBySha(txsha *btcwire.ShaHash) (rtx *btcwire.MsgTx, rtxbuf []byte, rpver uint32, rblksha *btcwire.ShaHash, err error) { + var pver uint32 + var blksha *btcwire.ShaHash + var height int64 + var toff int + var tlen int + var blk *btcutil.Block + var blkbuf []byte + + // Check Tx cache + if txc, ok := db.fetchTxCache(txsha); ok { + return txc.tx, txc.txbuf, txc.pver, &txc.blksha, nil + } + + // If not cached load it + height, toff, tlen, err = db.FetchLocationBySha(txsha) + if err != nil { + return + } + + blksha, err = db.FetchBlockShaByHeight(height) + if err != nil { + log.Warnf("block idx lookup %v to %v", height, err) + return + } + log.Tracef("transaction %v is at block %v %v tx %v", + txsha, blksha, height, toff) + + blk, err = db.FetchBlockBySha(blksha) + if err != nil { + log.Warnf("unable to fetch block %v %v ", + height, &blksha) + return + } + + blkbuf, pver, err = blk.Bytes() + if err != nil { + log.Warnf("unable to decode block %v %v", height, &blksha) + return + } + + txbuf := make([]byte, tlen) + copy(txbuf[:], blkbuf[toff:toff+tlen]) + rbuf := bytes.NewBuffer(txbuf) + + var tx btcwire.MsgTx + err = tx.BtcDecode(rbuf, pver) + if err != nil { + log.Warnf("unable to decode tx block %v %v txoff %v txlen %v", + height, &blksha, toff, tlen) + return + } + + // Shove data into TxCache + // XXX - + var txc txCacheObj + txc.sha = *txsha + txc.tx = &tx + txc.txbuf = txbuf + txc.pver = pver + txc.height = height txc.blksha = *blksha db.insertTxCache(&txc) @@ -251,6 +361,7 @@ func (db *SqliteDb) InvalidateBlockCache() { bc.cacheLock.Lock() defer bc.cacheLock.Unlock() bc.blockMap = map[btcwire.ShaHash]*blockCacheObj{} + bc.blockHeightMap = map[int64]*blockCacheObj{} bc.fifo = list.List{} } diff --git a/sqlite3/sqlitetx.go b/sqlite3/sqlitetx.go index 1c05aa7e..24a8e4cc 100644 --- a/sqlite3/sqlitetx.go +++ b/sqlite3/sqlitetx.go @@ -12,16 +12,16 @@ import ( ) // InsertTx inserts a tx hash and its associated data into the database. -func (db *SqliteDb) InsertTx(txsha *btcwire.ShaHash, blockidx int64, txoff int, txlen int, usedbuf []byte) (err error) { +func (db *SqliteDb) InsertTx(txsha *btcwire.ShaHash, height int64, txoff int, txlen int, usedbuf []byte) (err error) { db.dbLock.Lock() defer db.dbLock.Unlock() - return db.insertTx(txsha, blockidx, txoff, txlen, usedbuf) + return db.insertTx(txsha, height, txoff, txlen, usedbuf) } // insertTx inserts a tx hash and its associated data into the database. // Must be called with db lock held. -func (db *SqliteDb) insertTx(txsha *btcwire.ShaHash, blockidx int64, txoff int, txlen int, usedbuf []byte) (err error) { +func (db *SqliteDb) insertTx(txsha *btcwire.ShaHash, height int64, txoff int, txlen int, usedbuf []byte) (err error) { tx := &db.txState if tx.tx == nil { @@ -30,7 +30,7 @@ func (db *SqliteDb) insertTx(txsha *btcwire.ShaHash, blockidx int64, txoff int, return } } - blockid := blockidx + 1 + blockid := height + 1 txd := tTxInsertData{txsha: txsha, blockid: blockid, txoff: txoff, txlen: txlen, usedbuf: usedbuf} log.Tracef("inserting tx %v for block %v off %v len %v", @@ -84,7 +84,6 @@ func (db *SqliteDb) ExistsTxSha(txsha *btcwire.ShaHash) (exists bool) { return db.existsTxSha(txsha) } - // existsTxSha returns if the given tx sha exists in the database.o // Must be called with the db lock held. func (db *SqliteDb) existsTxSha(txsha *btcwire.ShaHash) (exists bool) { @@ -126,7 +125,7 @@ func (db *SqliteDb) FetchLocationBySha(txsha *btcwire.ShaHash) (blockidx int64, // fetchLocationBySha look up the Tx sha information by name. // Must be called with db lock held. -func (db *SqliteDb) fetchLocationBySha(txsha *btcwire.ShaHash) (blockidx int64, txoff int, txlen int, err error) { +func (db *SqliteDb) fetchLocationBySha(txsha *btcwire.ShaHash) (height int64, txoff int, txlen int, err error) { var row *sql.Row var blockid int64 var ttxoff int @@ -156,12 +155,49 @@ func (db *SqliteDb) fetchLocationBySha(txsha *btcwire.ShaHash) (blockidx int64, log.Warnf("FetchLocationBySha: fail %v", err) return } - blockidx = blockid - 1 + height = blockid - 1 txoff = ttxoff txlen = ttxlen return } +// fetchLocationUsedBySha look up the Tx sha information by name. +// Must be called with db lock held. +func (db *SqliteDb) fetchLocationUsedBySha(txsha *btcwire.ShaHash) (rheight int64, rtxoff int, rtxlen int, rspentbuf []byte, err error) { + var row *sql.Row + var blockid int64 + var txoff int + var txlen int + var txspent []byte + + rowBytes := txsha.String() + txop := db.txop(txFetchLocUsedByShaStmt) + row = txop.QueryRow(rowBytes) + + err = row.Scan(&blockid, &txoff, &txlen, &txspent) + if err == sql.ErrNoRows { + txop = db.txop(txtmpFetchLocUsedByShaStmt) + row = txop.QueryRow(rowBytes) + + err = row.Scan(&blockid, &txoff, &txlen, &txspent) + if err == sql.ErrNoRows { + err = btcdb.TxShaMissing + return + } + if err != nil { + log.Warnf("txtmp FetchLocationBySha: fail %v", + err) + return + } + } + if err != nil { + log.Warnf("FetchLocationBySha: fail %v", err) + return + } + height := blockid - 1 + return height, txoff, txlen, txspent, nil +} + // FetchTxUsedBySha returns the used/spent buffer for a given transaction. func (db *SqliteDb) FetchTxUsedBySha(txsha *btcwire.ShaHash) (spentbuf []byte, err error) { var row *sql.Row