blockchain/indexers: Reduce duplication in cfindex.

The index will hold three types of entries for each filter type, block
pair: filter, header, and hash. Since they all have similar methods
and implementations, refactor to reduce duplication.
This commit is contained in:
Jim Posen 2018-01-31 23:38:10 -08:00 committed by Olaoluwa Osuntokun
parent 185577f4c2
commit d07fd2f333

View file

@ -14,7 +14,6 @@ import (
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcutil/gcs"
"github.com/btcsuite/btcutil/gcs/builder"
"github.com/btcsuite/fastsha256"
"github.com/roasbeef/btcd/wire"
)
@ -48,49 +47,21 @@ var (
maxFilterType = uint8(len(cfHeaderKeys) - 1)
)
// dbFetchFilter retrieves a block's basic or extended filter. A filter's
// absence is not considered an error.
func dbFetchFilter(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byte, error) {
// dbFetchFilterIdxEntry retrieves a data blob from the filter index database.
// An entry's absence is not considered an error.
func dbFetchFilterIdxEntry(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byte, error) {
idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
return idx.Get(h[:]), nil
}
// dbFetchFilterHeader retrieves a block's basic or extended filter header.
// A filter's absence is not considered an error.
func dbFetchFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash) ([]byte, error) {
idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
fh := idx.Get(h[:])
if len(fh) != fastsha256.Size {
return nil, errors.New("invalid filter header length")
}
return fh, nil
}
// dbStoreFilter stores a block's basic or extended filter.
func dbStoreFilter(dbTx database.Tx, key []byte, h *chainhash.Hash, f []byte) error {
// dbStoreFilterIdxEntry stores a data blob in the filter index database.
func dbStoreFilterIdxEntry(dbTx database.Tx, key []byte, h *chainhash.Hash, f []byte) error {
idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
return idx.Put(h[:], f)
}
// dbStoreFilterHeader stores a block's basic or extended filter header.
func dbStoreFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash, fh []byte) error {
if len(fh) != fastsha256.Size {
return errors.New("invalid filter header length")
}
idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
return idx.Put(h[:], fh)
}
// dbDeleteFilter deletes a filter's basic or extended filter.
func dbDeleteFilter(dbTx database.Tx, key []byte, h *chainhash.Hash) error {
idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
return idx.Delete(h[:])
}
// dbDeleteFilterHeader deletes a filter's basic or extended filter header.
func dbDeleteFilterHeader(dbTx database.Tx, key []byte, h *chainhash.Hash) error {
// dbDeleteFilterIdxEntry deletes a data blob from the filter index database.
func dbDeleteFilterIdxEntry(dbTx database.Tx, key []byte, h *chainhash.Hash) error {
idx := dbTx.Metadata().Bucket(cfIndexParentBucketKey).Bucket(key)
return idx.Delete(h[:])
}
@ -148,7 +119,7 @@ func (idx *CfIndex) Create(dbTx database.Tx) error {
}
firstHeader := make([]byte, chainhash.HashSize)
err = dbStoreFilterHeader(
err = dbStoreFilterIdxEntry(
dbTx,
cfHeaderKeys[wire.GCSFilterRegular],
&idx.chainParams.GenesisBlock.Header.PrevBlock,
@ -158,7 +129,7 @@ func (idx *CfIndex) Create(dbTx database.Tx) error {
return err
}
return dbStoreFilterHeader(
return dbStoreFilterIdxEntry(
dbTx,
cfHeaderKeys[wire.GCSFilterExtended],
&idx.chainParams.GenesisBlock.Header.PrevBlock,
@ -184,14 +155,14 @@ func storeFilter(dbTx database.Tx, block *btcutil.Block, f *gcs.Filter,
if f != nil {
basicFilterBytes = f.NBytes()
}
err := dbStoreFilter(dbTx, fkey, h, basicFilterBytes)
err = dbStoreFilterIdxEntry(dbTx, fkey, h, filterBytes)
if err != nil {
return err
}
// Then fetch the previous block's filter header.
ph := &block.MsgBlock().Header.PrevBlock
pfh, err := dbFetchFilterHeader(dbTx, hkey, ph)
pfh, err := dbFetchFilterIdxEntry(dbTx, hkey, ph)
if err != nil {
return err
}
@ -201,8 +172,11 @@ func storeFilter(dbTx database.Tx, block *btcutil.Block, f *gcs.Filter,
if err != nil {
return err
}
fh := builder.MakeHeaderForFilter(f, *prevHeader)
return dbStoreFilterHeader(dbTx, hkey, h, fh[:])
fh, err := builder.MakeHeaderForFilter(f, *prevHeader)
if err != nil {
return err
}
return dbStoreFilterIdxEntry(dbTx, hkey, h, fh[:])
}
// ConnectBlock is invoked by the index manager when a new block has been
@ -236,14 +210,14 @@ func (idx *CfIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block,
view *blockchain.UtxoViewpoint) error {
for _, key := range cfIndexKeys {
err := dbDeleteFilter(dbTx, key, block.Hash())
err := dbDeleteFilterIdxEntry(dbTx, key, block.Hash())
if err != nil {
return err
}
}
for _, key := range cfHeaderKeys {
err := dbDeleteFilterHeader(dbTx, key, block.Hash())
err := dbDeleteFilterIdxEntry(dbTx, key, block.Hash())
if err != nil {
return err
}
@ -252,39 +226,37 @@ func (idx *CfIndex) DisconnectBlock(dbTx database.Tx, block *btcutil.Block,
return nil
}
// entryByBlockHash fetches a filter index entry of a particular type
// (eg. filter, filter header, etc) for a filter type and block hash.
func (idx *CfIndex) entryByBlockHash(filterTypeKeys [][]byte,
filterType wire.FilterType, h *chainhash.Hash) ([]byte, error) {
if uint8(filterType) > maxFilterType {
return nil, errors.New("unsupported filter type")
}
key := filterTypeKeys[filterType]
var entry []byte
err := idx.db.View(func(dbTx database.Tx) error {
var err error
entry, err = dbFetchFilterIdxEntry(dbTx, key, h)
return err
})
return entry, err
}
// FilterByBlockHash returns the serialized contents of a block's basic or
// extended committed filter.
func (idx *CfIndex) FilterByBlockHash(h *chainhash.Hash,
filterType wire.FilterType) ([]byte, error) {
var f []byte
err := idx.db.View(func(dbTx database.Tx) error {
if uint8(filterType) > maxFilterType {
return errors.New("unsupported filter type")
}
var err error
f, err = dbFetchFilter(dbTx, cfIndexKeys[filterType], h)
return err
})
return f, err
return idx.entryByBlockHash(cfIndexKeys, filterType, h)
}
// FilterHeaderByBlockHash returns the serialized contents of a block's basic
// or extended committed filter header.
func (idx *CfIndex) FilterHeaderByBlockHash(h *chainhash.Hash,
filterType wire.FilterType) ([]byte, error) {
var fh []byte
err := idx.db.View(func(dbTx database.Tx) error {
if uint8(filterType) > maxFilterType {
return errors.New("unsupported filter type")
}
var err error
fh, err = dbFetchFilterHeader(dbTx,
cfHeaderKeys[filterType], h)
return err
})
return fh, err
return idx.entryByBlockHash(cfHeaderKeys, filterType, h)
}
// NewCfIndex returns a new instance of an indexer that is used to create a