rosetta-lbry/indexer/indexer.go
Thomas Zarebczan 0581f6bcd3
LBRY changes
2020-12-08 01:29:16 -05:00

821 lines
22 KiB
Go

// Copyright 2020 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexer
import (
"context"
"errors"
"fmt"
"time"
"github.com/lbryio/rosetta-lbry/configuration"
"github.com/lbryio/rosetta-lbry/lbry"
"github.com/lbryio/rosetta-lbry/services"
"github.com/lbryio/rosetta-lbry/utils"
"github.com/coinbase/rosetta-sdk-go/asserter"
"github.com/coinbase/rosetta-sdk-go/storage/database"
storageErrs "github.com/coinbase/rosetta-sdk-go/storage/errors"
"github.com/coinbase/rosetta-sdk-go/storage/modules"
"github.com/coinbase/rosetta-sdk-go/syncer"
"github.com/coinbase/rosetta-sdk-go/types"
sdkUtils "github.com/coinbase/rosetta-sdk-go/utils"
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
)
const (
	// indexPlaceholder is provided to the syncer
	// to indicate we should both start from the
	// last synced block and that we should sync
	// blocks until exit (instead of stopping at
	// a particular height).
	indexPlaceholder = -1

	// retryDelay is how long to sleep between failed attempts
	// to fetch a raw block from lbrycrdd.
	retryDelay = 10 * time.Second

	// retryLimit is the maximum number of failed raw block
	// fetches before giving up on a block.
	retryLimit = 5

	// nodeWaitSleep is the poll interval used while waiting
	// for lbrycrdd to come online.
	nodeWaitSleep = 3 * time.Second

	// missingTransactionDelay is how long to sleep before
	// re-checking storage for a transaction that has not been
	// indexed yet.
	missingTransactionDelay = 200 * time.Millisecond

	// sizeMultiplier is used to multiply the memory
	// estimate for pre-fetching blocks. In other words,
	// this is the estimated memory overhead for each
	// block fetched by the indexer.
	sizeMultiplier = 5

	// zeroValue is 0 as a string
	zeroValue = "0"
)
var (
	// errMissingTransaction is returned by findCoin when the coin's
	// transaction has not been indexed yet; findCoins uses it as a
	// signal to wait for the transaction to appear in a later block.
	errMissingTransaction = errors.New("missing transaction")
)
// Client is used by the indexer to sync blocks.
type Client interface {
	// NetworkStatus returns the current status of the lbry network.
	NetworkStatus(context.Context) (*types.NetworkStatusResponse, error)

	// PruneBlockchain prunes the chain up to the provided height and
	// returns the height actually pruned to.
	PruneBlockchain(context.Context, int64) (int64, error)

	// GetRawBlock fetches a block and the list of coin identifiers
	// spent by its transactions.
	GetRawBlock(context.Context, *types.PartialBlockIdentifier) (*lbry.Block, []string, error)

	// ParseBlock converts a raw block into a Rosetta block, using the
	// provided map of previously-fetched input coins.
	ParseBlock(
		context.Context,
		*lbry.Block,
		map[string]*types.AccountCoin,
	) (*types.Block, error)
}
// Compile-time assertions that *Indexer implements the interfaces
// it is consumed through.
var _ syncer.Handler = (*Indexer)(nil)
var _ syncer.Helper = (*Indexer)(nil)
var _ services.Indexer = (*Indexer)(nil)
// Indexer caches blocks and provides balance query functionality.
type Indexer struct {
	cancel context.CancelFunc

	network       *types.NetworkIdentifier
	pruningConfig *configuration.PruningConfiguration

	client Client

	asserter       *asserter.Asserter
	database       database.Database
	blockStorage   *modules.BlockStorage
	balanceStorage *modules.BalanceStorage
	coinStorage    *modules.CoinStorage
	workers        []modules.BlockWorker

	// waiter tracks callers blocked on transactions that have not
	// been indexed yet (see findCoin/findCoins/BlockAdded).
	waiter *waitTable
}
// CloseDatabase closes a storage.Database. This should be called
// before exiting.
func (i *Indexer) CloseDatabase(ctx context.Context) {
	logger := utils.ExtractLogger(ctx, "")
	if err := i.database.Close(ctx); err != nil {
		// A failed close leaves the database in an unknown state;
		// treat it as fatal.
		logger.Fatalw("unable to close indexer database", "error", err)
	}
	logger.Infow("database closed successfully")
}
// defaultBadgerOptions returns a set of badger.Options optimized
// for running a Rosetta implementation: the settings favor a small
// memory footprint over raw read speed.
func defaultBadgerOptions(
	dir string,
) badger.Options {
	cfg := badger.DefaultOptions(dir)

	// Compression is disabled entirely; enabling it can
	// significantly increase memory usage.
	cfg.Compression = options.None

	// Memory-map both tables and value logs rather than loading
	// them fully into RAM.
	cfg.TableLoadingMode = options.MemoryMap
	cfg.ValueLogLoadingMode = options.MemoryMap

	// Use an extended table size for larger commits.
	cfg.MaxTableSize = database.DefaultMaxTableSize

	// Smaller value log files mean smaller contiguous memory
	// allocations and less RAM used during cleanup.
	cfg.ValueLogFileSize = database.DefaultLogValueSize

	// Keep only a single memtable in memory; with a large memtable
	// size, stacking several explodes memory usage.
	cfg.NumMemtables = 1
	cfg.NumLevelZeroTables = 1
	cfg.NumLevelZeroTablesStall = 2

	// Keeping L0 in memory makes reads faster but pins those tables
	// in RAM, so it stays disabled.
	cfg.KeepL0InMemory = false

	// Skipping L0 compaction on close greatly reduces shutdown time.
	cfg.CompactL0OnClose = false

	// Loading bloom filters on open slows startup and is wasteful
	// with a limited index cache (many loaded filters would be
	// evicted immediately).
	cfg.LoadBloomsOnOpen = false

	return cfg
}
// Initialize returns a new Indexer.
//
// It opens the badger-backed store at config.IndexerPath, builds an
// asserter for validating fetched blocks, and wires up the block,
// coin, and balance storage modules. The caller owns cancel, which
// the syncer uses to stop on fatal errors.
func Initialize(
	ctx context.Context,
	cancel context.CancelFunc,
	config *configuration.Configuration,
	client Client,
) (*Indexer, error) {
	// Open the key-value store shared by all storage modules.
	localStore, err := database.NewBadgerDatabase(
		ctx,
		config.IndexerPath,
		database.WithCompressorEntries(config.Compressors),
		database.WithCustomSettings(defaultBadgerOptions(
			config.IndexerPath,
		)),
	)
	if err != nil {
		return nil, fmt.Errorf("%w: unable to initialize storage", err)
	}

	blockStorage := modules.NewBlockStorage(localStore)

	// Configure the asserter with LBRY's operation types/statuses so
	// parsed blocks can be validated before being stored.
	asserter, err := asserter.NewClientWithOptions(
		config.Network,
		config.GenesisBlockIdentifier,
		lbry.OperationTypes,
		lbry.OperationStatuses,
		services.Errors,
		nil,
	)
	if err != nil {
		return nil, fmt.Errorf("%w: unable to initialize asserter", err)
	}

	i := &Indexer{
		cancel:        cancel,
		network:       config.Network,
		pruningConfig: config.Pruning,
		client:        client,
		database:      localStore,
		blockStorage:  blockStorage,
		waiter:        newWaitTable(),
		asserter:      asserter,
	}

	coinStorage := modules.NewCoinStorage(
		localStore,
		&CoinStorageHelper{blockStorage},
		asserter,
	)
	i.coinStorage = coinStorage

	balanceStorage := modules.NewBalanceStorage(localStore)
	balanceStorage.Initialize(
		&BalanceStorageHelper{asserter},
		&BalanceStorageHandler{},
	)
	i.balanceStorage = balanceStorage

	// workers are invoked by blockStorage on every added/removed
	// block to keep coin and balance state in sync.
	i.workers = []modules.BlockWorker{coinStorage, balanceStorage}

	return i, nil
}
// waitForNode returns once lbrycrdd is ready to serve
// block queries, polling its network status every nodeWaitSleep.
// It returns a non-nil error only if the context is canceled
// while waiting.
func (i *Indexer) waitForNode(ctx context.Context) error {
	logger := utils.ExtractLogger(ctx, "indexer")
	for {
		if _, err := i.client.NetworkStatus(ctx); err == nil {
			return nil
		}
		logger.Infow("waiting for lbrycrdd...")
		if sleepErr := sdkUtils.ContextSleep(ctx, nodeWaitSleep); sleepErr != nil {
			return sleepErr
		}
	}
}
// Sync attempts to index lbry blocks using
// the lbry.Client until stopped.
func (i *Indexer) Sync(ctx context.Context) error {
	// Block until lbrycrdd can serve requests.
	if err := i.waitForNode(ctx); err != nil {
		return fmt.Errorf("%w: failed to wait for node", err)
	}

	i.blockStorage.Initialize(i.workers)

	// Resume from the block after the stored head, if one exists;
	// otherwise start at indexPlaceholder (let the syncer decide).
	startIndex := int64(indexPlaceholder)
	head, err := i.blockStorage.GetHeadBlockIdentifier(ctx)
	if err == nil {
		startIndex = head.Index + 1
	}

	// Load in previous blocks into syncer cache to handle reorgs.
	// If previously processed blocks exist in storage, they are fetched.
	// Otherwise, none are provided to the cache (the syncer will not attempt
	// a reorg if the cache is empty).
	pastBlocks := i.blockStorage.CreateBlockCache(ctx, syncer.DefaultPastBlockLimit)

	syncer := syncer.New(
		i.network,
		i,
		i,
		i.cancel,
		syncer.WithCacheSize(syncer.DefaultCacheSize),
		syncer.WithSizeMultiplier(sizeMultiplier),
		syncer.WithPastBlocks(pastBlocks),
	)

	// indexPlaceholder as the end index means: sync until exit.
	return syncer.Sync(ctx, startIndex, indexPlaceholder)
}
// Prune attempts to prune blocks in lbrycrdd every
// pruneFrequency. It runs until the context is canceled, at which
// point it returns ctx.Err().
func (i *Indexer) Prune(ctx context.Context) error {
	logger := utils.ExtractLogger(ctx, "pruner")

	tc := time.NewTicker(i.pruningConfig.Frequency)
	defer tc.Stop()

	for {
		select {
		case <-ctx.Done():
			logger.Warnw("exiting pruner")
			return ctx.Err()
		case <-tc.C:
			// If there is no head block yet, there is nothing to
			// prune; wait for the next tick.
			head, err := i.blockStorage.GetHeadBlockIdentifier(ctx)
			if err != nil {
				continue
			}

			// Must meet pruning conditions in lbry core
			// Source:
			// https://github.com/lbry/lbry/blob/a63a26f042134fa80356860c109edb25ac567552/src/rpc/blockchain.cpp#L953-L960
			pruneHeight := head.Index - i.pruningConfig.Depth
			if pruneHeight <= i.pruningConfig.MinHeight {
				logger.Infow("waiting to prune", "min prune height", i.pruningConfig.MinHeight)
				continue
			}

			logger.Infow("attempting to prune lbrycrdd", "prune height", pruneHeight)
			prunedHeight, err := i.client.PruneBlockchain(ctx, pruneHeight)
			if err != nil {
				// Pruning failures are logged, not fatal; the next
				// tick will retry.
				logger.Warnw(
					"unable to prune lbrycrdd",
					"prune height", pruneHeight,
					"error", err,
				)
			} else {
				logger.Infow("pruned lbrycrdd", "prune height", prunedHeight)
			}
		}
	}
}
// BlockAdded is called by the syncer when a block is added.
//
// After persisting the block, it wakes any findCoins callers that
// were waiting on transactions included in this block, and aborts
// waiters whose earliest possible block has now been passed.
func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error {
	logger := utils.ExtractLogger(ctx, "indexer")

	err := i.blockStorage.AddBlock(ctx, block)
	if err != nil {
		return fmt.Errorf(
			"%w: unable to add block to storage %s:%d",
			err,
			block.BlockIdentifier.Hash,
			block.BlockIdentifier.Index,
		)
	}

	ops := 0

	// Close channels of all blocks waiting.
	i.waiter.Lock()
	for _, transaction := range block.Transactions {
		ops += len(transaction.Operations)
		txHash := transaction.TransactionIdentifier.Hash
		val, ok := i.waiter.Get(txHash, false)
		if !ok {
			continue
		}

		if val.channelClosed {
			logger.Debugw(
				"channel already closed",
				"hash", block.BlockIdentifier.Hash,
				"index", block.BlockIdentifier.Index,
				"channel", txHash,
			)
			continue
		}

		// Closing channel will cause all listeners to continue
		val.channelClosed = true
		close(val.channel)
	}

	// Look for all remaining waiting transactions associated
	// with the next block that have not yet been closed. We should
	// abort these waits as they will never be closed by a new transaction.
	for txHash, val := range i.waiter.table {
		if val.earliestBlock == block.BlockIdentifier.Index+1 && !val.channelClosed {
			logger.Debugw(
				"aborting channel",
				"hash", block.BlockIdentifier.Hash,
				"index", block.BlockIdentifier.Index,
				"channel", txHash,
			)
			val.channelClosed = true
			val.aborted = true
			close(val.channel)
		}
	}
	i.waiter.Unlock()

	logger.Debugw(
		"block added",
		"hash", block.BlockIdentifier.Hash,
		"index", block.BlockIdentifier.Index,
		"transactions", len(block.Transactions),
		"ops", ops,
	)

	return nil
}
// BlockRemoved is called by the syncer when a block is removed
// (i.e. during a chain reorganization).
func (i *Indexer) BlockRemoved(
	ctx context.Context,
	blockIdentifier *types.BlockIdentifier,
) error {
	logger := utils.ExtractLogger(ctx, "indexer")
	logger.Debugw(
		"block removed",
		"hash", blockIdentifier.Hash,
		"index", blockIdentifier.Index,
	)

	if err := i.blockStorage.RemoveBlock(ctx, blockIdentifier); err != nil {
		return fmt.Errorf(
			"%w: unable to remove block from storage %s:%d",
			err,
			blockIdentifier.Hash,
			blockIdentifier.Index,
		)
	}

	return nil
}
// NetworkStatus is called by the syncer to get the current
// network status. The network argument is ignored; status is
// fetched directly from the configured client.
func (i *Indexer) NetworkStatus(
	ctx context.Context,
	network *types.NetworkIdentifier,
) (*types.NetworkStatusResponse, error) {
	return i.client.NetworkStatus(ctx)
}
// findCoin attempts to locate the coin (and its owning account) for
// the provided coinIdentifier. If the coin's transaction has not been
// indexed yet, the caller is registered in the wait table and
// errMissingTransaction is returned so findCoins can wait on it.
//
// Each attempt runs in its own helper so the read transaction opened
// per attempt is discarded when that attempt ends. (The original code
// registered the defer inside the loop, accumulating open read
// transactions until findCoin returned.)
func (i *Indexer) findCoin(
	ctx context.Context,
	btcBlock *lbry.Block,
	coinIdentifier string,
) (*types.Coin, *types.AccountIdentifier, error) {
	for ctx.Err() == nil {
		coin, owner, retry, err := i.findCoinAttempt(ctx, btcBlock, coinIdentifier)
		if err != nil {
			return nil, nil, err
		}
		if retry {
			continue
		}
		return coin, owner, nil
	}

	return nil, nil, ctx.Err()
}

// findCoinAttempt performs a single lookup attempt. It returns
// retry=true when the caller should try again (no head block stored
// yet, or the head advanced while the read transaction was open).
func (i *Indexer) findCoinAttempt(
	ctx context.Context,
	btcBlock *lbry.Block,
	coinIdentifier string,
) (*types.Coin, *types.AccountIdentifier, bool, error) {
	databaseTransaction := i.database.ReadTransaction(ctx)
	defer databaseTransaction.Discard(ctx)

	coinHeadBlock, err := i.blockStorage.GetHeadBlockIdentifierTransactional(
		ctx,
		databaseTransaction,
	)
	if errors.Is(err, storageErrs.ErrHeadBlockNotFound) {
		// Storage has no head block yet; sleep briefly, then retry.
		if err := sdkUtils.ContextSleep(ctx, missingTransactionDelay); err != nil {
			return nil, nil, false, err
		}
		return nil, nil, true, nil
	}
	if err != nil {
		return nil, nil, false, fmt.Errorf(
			"%w: unable to get transactional head block identifier",
			err,
		)
	}

	// Attempt to find coin
	coin, owner, err := i.coinStorage.GetCoinTransactional(
		ctx,
		databaseTransaction,
		&types.CoinIdentifier{
			Identifier: coinIdentifier,
		},
	)
	if err == nil {
		return coin, owner, false, nil
	}
	if !errors.Is(err, storageErrs.ErrCoinNotFound) {
		return nil, nil, false, fmt.Errorf("%w: unable to lookup coin %s", err, coinIdentifier)
	}

	// Locking here prevents us from adding sending any done
	// signals while we are determining whether or not to add
	// to the WaitTable.
	i.waiter.Lock()

	// Check to see if head block has increased since
	// we created our databaseTransaction.
	currHeadBlock, err := i.blockStorage.GetHeadBlockIdentifier(ctx)
	if err != nil {
		// Release the wait table lock on this error path (the
		// original code returned while still holding it).
		i.waiter.Unlock()
		return nil, nil, false, fmt.Errorf("%w: unable to get head block identifier", err)
	}

	// If the block has changed, we try to look up the transaction
	// again.
	if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) {
		i.waiter.Unlock()
		return nil, nil, true, nil
	}

	// Put Transaction in WaitTable if doesn't already exist (could be
	// multiple listeners)
	transactionHash := lbry.TransactionHash(coinIdentifier)
	val, ok := i.waiter.Get(transactionHash, false)
	if !ok {
		val = &waitTableEntry{
			channel:       make(chan struct{}),
			earliestBlock: btcBlock.Height,
		}
	}
	if val.earliestBlock > btcBlock.Height {
		val.earliestBlock = btcBlock.Height
	}
	val.listeners++
	i.waiter.Set(transactionHash, val, false)
	i.waiter.Unlock()

	return nil, nil, false, errMissingTransaction
}
// checkHeaderMatch verifies that the block about to be processed
// connects to the stored head. If the block is at head.Index+1 but
// does not reference head.Hash as its parent, a reorg has occurred
// and syncer.ErrOrphanHead is returned so the syncer can unwind.
func (i *Indexer) checkHeaderMatch(
	ctx context.Context,
	btcBlock *lbry.Block,
) error {
	headBlock, err := i.blockStorage.GetHeadBlockIdentifier(ctx)
	switch {
	case err != nil && !errors.Is(err, storageErrs.ErrHeadBlockNotFound):
		return fmt.Errorf("%w: unable to lookup head block", err)
	case headBlock == nil:
		// No head yet (genesis sync): nothing to compare against.
		return nil
	case btcBlock.Height != headBlock.Index+1:
		// Not the next block; connectivity is not checked here.
		return nil
	case btcBlock.PreviousBlockHash == headBlock.Hash:
		// Next block and properly connected.
		return nil
	default:
		// Next block but not connected: manually trigger a reorg.
		return syncer.ErrOrphanHead
	}
}
// findCoins resolves every coin identifier in coins to a
// *types.AccountCoin. Coins whose transactions are not yet indexed
// are waited on (via the wait table populated by findCoin) and then
// retried recursively once their blocks arrive. If a waited-on entry
// was aborted by BlockAdded, syncer.ErrOrphanHead is returned so the
// syncer can reorg.
func (i *Indexer) findCoins(
	ctx context.Context,
	btcBlock *lbry.Block,
	coins []string,
) (map[string]*types.AccountCoin, error) {
	if err := i.checkHeaderMatch(ctx, btcBlock); err != nil {
		return nil, fmt.Errorf("%w: check header match failed", err)
	}

	coinMap := map[string]*types.AccountCoin{}
	remainingCoins := []string{}
	for _, coinIdentifier := range coins {
		coin, owner, err := i.findCoin(
			ctx,
			btcBlock,
			coinIdentifier,
		)
		if err == nil {
			coinMap[coinIdentifier] = &types.AccountCoin{
				Account: owner,
				Coin:    coin,
			}
			continue
		}

		// errMissingTransaction means findCoin registered us in the
		// wait table; collect the coin for the wait phase below.
		if errors.Is(err, errMissingTransaction) {
			remainingCoins = append(remainingCoins, coinIdentifier)
			continue
		}

		return nil, fmt.Errorf("%w: unable to find coin %s", err, coinIdentifier)
	}

	if len(remainingCoins) == 0 {
		return coinMap, nil
	}

	// Wait for remaining transactions
	shouldAbort := false
	for _, coinIdentifier := range remainingCoins {
		// Wait on Channel
		txHash := lbry.TransactionHash(coinIdentifier)
		entry, ok := i.waiter.Get(txHash, true)
		if !ok {
			return nil, fmt.Errorf("transaction %s not in waiter", txHash)
		}

		select {
		case <-entry.channel:
		case <-ctx.Done():
			return nil, ctx.Err()
		}

		// Delete Transaction from WaitTable if last listener
		i.waiter.Lock()
		val, ok := i.waiter.Get(txHash, false)
		if !ok {
			return nil, fmt.Errorf("transaction %s not in waiter", txHash)
		}

		// Don't exit right away to make sure
		// we remove all closed entries from the
		// waiter.
		if val.aborted {
			shouldAbort = true
		}

		val.listeners--
		if val.listeners == 0 {
			i.waiter.Delete(txHash, false)
		} else {
			i.waiter.Set(txHash, val, false)
		}
		i.waiter.Unlock()
	}

	// Wait to exit until we have decremented our listeners
	if shouldAbort {
		return nil, syncer.ErrOrphanHead
	}

	// In the case of a reorg, we may still not be able to find
	// the transactions. So, we need to repeat this same process
	// recursively until we find the transactions we are looking for.
	foundCoins, err := i.findCoins(ctx, btcBlock, remainingCoins)
	if err != nil {
		return nil, fmt.Errorf("%w: unable to get remaining transactions", err)
	}

	for k, v := range foundCoins {
		coinMap[k] = v
	}

	return coinMap, nil
}
// Block is called by the syncer to fetch a block.
//
// It retries transient fetch failures up to retryLimit times
// (sleeping retryDelay between attempts), resolves all input coins,
// parses the raw block into a Rosetta block, and validates it with
// the asserter before returning.
func (i *Indexer) Block(
	ctx context.Context,
	network *types.NetworkIdentifier,
	blockIdentifier *types.PartialBlockIdentifier,
) (*types.Block, error) {
	// get raw block
	var btcBlock *lbry.Block
	var coins []string
	var err error

	retries := 0
	for ctx.Err() == nil {
		btcBlock, coins, err = i.client.GetRawBlock(ctx, blockIdentifier)
		if err == nil {
			break
		}

		retries++
		if retries > retryLimit {
			return nil, fmt.Errorf("%w: unable to get raw block %+v", err, blockIdentifier)
		}

		if err := sdkUtils.ContextSleep(ctx, retryDelay); err != nil {
			return nil, err
		}
	}

	// If the context was canceled before any fetch succeeded, btcBlock
	// is nil; bail out here instead of passing a nil block downstream.
	if btcBlock == nil {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		return nil, fmt.Errorf("unable to get raw block %+v", blockIdentifier)
	}

	// determine which coins must be fetched and get from coin storage
	coinMap, err := i.findCoins(ctx, btcBlock, coins)
	if err != nil {
		return nil, fmt.Errorf("%w: unable to find input transactions", err)
	}

	// provide to block parsing
	block, err := i.client.ParseBlock(ctx, btcBlock, coinMap)
	if err != nil {
		return nil, fmt.Errorf("%w: unable to parse block %+v", err, blockIdentifier)
	}

	// ensure block is valid
	if err := i.asserter.Block(block); err != nil {
		return nil, fmt.Errorf("%w: block is not valid %+v", err, blockIdentifier)
	}

	return block, nil
}
// GetScriptPubKeys gets the ScriptPubKey for
// a collection of *types.CoinIdentifier. It also
// confirms that the amount provided with each coin
// is valid.
func (i *Indexer) GetScriptPubKeys(
	ctx context.Context,
	coins []*types.Coin,
) ([]*lbry.ScriptPubKey, error) {
	// A single read transaction is shared across all lookups.
	databaseTransaction := i.database.ReadTransaction(ctx)
	defer databaseTransaction.Discard(ctx)

	scripts := make([]*lbry.ScriptPubKey, len(coins))
	for j, coin := range coins {
		coinIdentifier := coin.CoinIdentifier
		transactionHash, networkIndex, err := lbry.ParseCoinIdentifier(coinIdentifier)
		if err != nil {
			return nil, fmt.Errorf("%w: unable to parse coin identifier", err)
		}

		_, transaction, err := i.blockStorage.FindTransaction(
			ctx,
			&types.TransactionIdentifier{Hash: transactionHash.String()},
			databaseTransaction,
		)
		if err != nil || transaction == nil {
			return nil, fmt.Errorf(
				"%w: unable to find transaction %s",
				err,
				transactionHash.String(),
			)
		}

		// Scan the transaction's operations for the output at the
		// coin's network index.
		for _, op := range transaction.Operations {
			if op.Type != lbry.OutputOpType {
				continue
			}

			if *op.OperationIdentifier.NetworkIndex != int64(networkIndex) {
				continue
			}

			var opMetadata lbry.OperationMetadata
			if err := types.UnmarshalMap(op.Metadata, &opMetadata); err != nil {
				return nil, fmt.Errorf(
					"%w: unable to unmarshal operation metadata %+v",
					err,
					op.Metadata,
				)
			}

			// The coin's claimed currency must match the stored
			// operation's currency.
			if types.Hash(op.Amount.Currency) != types.Hash(coin.Amount.Currency) {
				return nil, fmt.Errorf(
					"currency expected %s does not match coin %s",
					types.PrintStruct(coin.Amount.Currency),
					types.PrintStruct(op.Amount.Currency),
				)
			}

			// The provided coin amount must equal the stored output
			// amount; their sum must be zero (opposite signs).
			addition, err := types.AddValues(op.Amount.Value, coin.Amount.Value)
			if err != nil {
				return nil, fmt.Errorf("%w: unable to add op amount and coin amount", err)
			}

			if addition != "0" {
				return nil, fmt.Errorf(
					"coin amount does not match expected with difference %s",
					addition,
				)
			}

			scripts[j] = opMetadata.ScriptPubKey
			break
		}

		if scripts[j] == nil {
			return nil, fmt.Errorf("unable to find script for coin %s", coinIdentifier.Identifier)
		}
	}

	return scripts, nil
}
// GetBlockLazy returns a *types.BlockResponse from the indexer's block storage.
// All transactions in a block must be fetched individually.
func (i *Indexer) GetBlockLazy(
	ctx context.Context,
	blockIdentifier *types.PartialBlockIdentifier,
) (*types.BlockResponse, error) {
	// Thin delegate to block storage.
	return i.blockStorage.GetBlockLazy(ctx, blockIdentifier)
}
// GetBlockTransaction returns a *types.Transaction if it is in the provided
// *types.BlockIdentifier.
func (i *Indexer) GetBlockTransaction(
	ctx context.Context,
	blockIdentifier *types.BlockIdentifier,
	transactionIdentifier *types.TransactionIdentifier,
) (*types.Transaction, error) {
	// Thin delegate to block storage.
	return i.blockStorage.GetBlockTransaction(
		ctx,
		blockIdentifier,
		transactionIdentifier,
	)
}
// GetCoins returns all unspent coins for a particular *types.AccountIdentifier.
func (i *Indexer) GetCoins(
	ctx context.Context,
	accountIdentifier *types.AccountIdentifier,
) ([]*types.Coin, *types.BlockIdentifier, error) {
	// Thin delegate to coin storage.
	return i.coinStorage.GetCoins(ctx, accountIdentifier)
}
// GetBalance returns the balance of an account
// at a particular *types.PartialBlockIdentifier. Accounts that have
// never been seen return a zero balance rather than an error.
func (i *Indexer) GetBalance(
	ctx context.Context,
	accountIdentifier *types.AccountIdentifier,
	currency *types.Currency,
	blockIdentifier *types.PartialBlockIdentifier,
) (*types.Amount, *types.BlockIdentifier, error) {
	// Resolve the block and read the balance within one read
	// transaction for a consistent view.
	dbTx := i.database.ReadTransaction(ctx)
	defer dbTx.Discard(ctx)

	blockResponse, err := i.blockStorage.GetBlockLazyTransactional(
		ctx,
		blockIdentifier,
		dbTx,
	)
	if err != nil {
		return nil, nil, err
	}

	amount, err := i.balanceStorage.GetBalanceTransactional(
		ctx,
		dbTx,
		accountIdentifier,
		currency,
		blockResponse.Block.BlockIdentifier.Index,
	)
	if errors.Is(err, storageErrs.ErrAccountMissing) {
		// Unknown account: report a zero balance at the resolved block.
		return &types.Amount{
			Value:    zeroValue,
			Currency: currency,
		}, blockResponse.Block.BlockIdentifier, nil
	}
	if err != nil {
		return nil, nil, err
	}

	return amount, blockResponse.Block.BlockIdentifier, nil
}