821 lines
22 KiB
Go
821 lines
22 KiB
Go
// Copyright 2020 Coinbase, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package indexer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/lbryio/rosetta-lbry/configuration"
|
|
"github.com/lbryio/rosetta-lbry/lbry"
|
|
"github.com/lbryio/rosetta-lbry/services"
|
|
"github.com/lbryio/rosetta-lbry/utils"
|
|
|
|
"github.com/coinbase/rosetta-sdk-go/asserter"
|
|
"github.com/coinbase/rosetta-sdk-go/storage/database"
|
|
storageErrs "github.com/coinbase/rosetta-sdk-go/storage/errors"
|
|
"github.com/coinbase/rosetta-sdk-go/storage/modules"
|
|
"github.com/coinbase/rosetta-sdk-go/syncer"
|
|
"github.com/coinbase/rosetta-sdk-go/types"
|
|
sdkUtils "github.com/coinbase/rosetta-sdk-go/utils"
|
|
"github.com/dgraph-io/badger/v2"
|
|
"github.com/dgraph-io/badger/v2/options"
|
|
)
|
|
|
|
const (
|
|
// indexPlaceholder is provided to the syncer
|
|
// to indicate we should both start from the
|
|
// last synced block and that we should sync
|
|
// blocks until exit (instead of stopping at
|
|
// a particular height).
|
|
indexPlaceholder = -1
|
|
|
|
retryDelay = 10 * time.Second
|
|
retryLimit = 5
|
|
|
|
nodeWaitSleep = 3 * time.Second
|
|
missingTransactionDelay = 200 * time.Millisecond
|
|
|
|
// sizeMultiplier is used to multiply the memory
|
|
// estimate for pre-fetching blocks. In other words,
|
|
// this is the estimated memory overhead for each
|
|
// block fetched by the indexer.
|
|
sizeMultiplier = 5
|
|
|
|
// zeroValue is 0 as a string
|
|
zeroValue = "0"
|
|
)
|
|
|
|
var (
|
|
errMissingTransaction = errors.New("missing transaction")
|
|
)
|
|
|
|
// Client is used by the indexer to sync blocks.
|
|
type Client interface {
|
|
NetworkStatus(context.Context) (*types.NetworkStatusResponse, error)
|
|
PruneBlockchain(context.Context, int64) (int64, error)
|
|
GetRawBlock(context.Context, *types.PartialBlockIdentifier) (*lbry.Block, []string, error)
|
|
ParseBlock(
|
|
context.Context,
|
|
*lbry.Block,
|
|
map[string]*types.AccountCoin,
|
|
) (*types.Block, error)
|
|
}
|
|
|
|
var _ syncer.Handler = (*Indexer)(nil)
|
|
var _ syncer.Helper = (*Indexer)(nil)
|
|
var _ services.Indexer = (*Indexer)(nil)
|
|
|
|
// Indexer caches blocks and provides balance query functionality.
|
|
type Indexer struct {
|
|
cancel context.CancelFunc
|
|
|
|
network *types.NetworkIdentifier
|
|
pruningConfig *configuration.PruningConfiguration
|
|
|
|
client Client
|
|
|
|
asserter *asserter.Asserter
|
|
database database.Database
|
|
blockStorage *modules.BlockStorage
|
|
balanceStorage *modules.BalanceStorage
|
|
coinStorage *modules.CoinStorage
|
|
workers []modules.BlockWorker
|
|
|
|
waiter *waitTable
|
|
}
|
|
|
|
// CloseDatabase closes a storage.Database. This should be called
|
|
// before exiting.
|
|
func (i *Indexer) CloseDatabase(ctx context.Context) {
|
|
logger := utils.ExtractLogger(ctx, "")
|
|
err := i.database.Close(ctx)
|
|
if err != nil {
|
|
logger.Fatalw("unable to close indexer database", "error", err)
|
|
}
|
|
|
|
logger.Infow("database closed successfully")
|
|
}
|
|
|
|
// defaultBadgerOptions returns a set of badger.Options optimized
|
|
// for running a Rosetta implementation.
|
|
func defaultBadgerOptions(
|
|
dir string,
|
|
) badger.Options {
|
|
opts := badger.DefaultOptions(dir)
|
|
|
|
// By default, we do not compress the table at all. Doing so can
|
|
// significantly increase memory usage.
|
|
opts.Compression = options.None
|
|
|
|
// Load tables into memory and memory map value logs.
|
|
opts.TableLoadingMode = options.MemoryMap
|
|
opts.ValueLogLoadingMode = options.MemoryMap
|
|
|
|
// Use an extended table size for larger commits.
|
|
opts.MaxTableSize = database.DefaultMaxTableSize
|
|
|
|
// Smaller value log sizes means smaller contiguous memory allocations
|
|
// and less RAM usage on cleanup.
|
|
opts.ValueLogFileSize = database.DefaultLogValueSize
|
|
|
|
// To allow writes at a faster speed, we create a new memtable as soon as
|
|
// an existing memtable is filled up. This option determines how many
|
|
// memtables should be kept in memory.
|
|
opts.NumMemtables = 1
|
|
|
|
// Don't keep multiple memtables in memory. With larger
|
|
// memtable size, this explodes memory usage.
|
|
opts.NumLevelZeroTables = 1
|
|
opts.NumLevelZeroTablesStall = 2
|
|
|
|
// This option will have a significant effect the memory. If the level is kept
|
|
// in-memory, read are faster but the tables will be kept in memory. By default,
|
|
// this is set to false.
|
|
opts.KeepL0InMemory = false
|
|
|
|
// We don't compact L0 on close as this can greatly delay shutdown time.
|
|
opts.CompactL0OnClose = false
|
|
|
|
// LoadBloomsOnOpen=false will improve the db startup speed. This is also
|
|
// a waste to enable with a limited index cache size (as many of the loaded bloom
|
|
// filters will be immediately discarded from the cache).
|
|
opts.LoadBloomsOnOpen = false
|
|
|
|
return opts
|
|
}
|
|
|
|
// Initialize returns a new Indexer.
|
|
func Initialize(
|
|
ctx context.Context,
|
|
cancel context.CancelFunc,
|
|
config *configuration.Configuration,
|
|
client Client,
|
|
) (*Indexer, error) {
|
|
localStore, err := database.NewBadgerDatabase(
|
|
ctx,
|
|
config.IndexerPath,
|
|
database.WithCompressorEntries(config.Compressors),
|
|
database.WithCustomSettings(defaultBadgerOptions(
|
|
config.IndexerPath,
|
|
)),
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%w: unable to initialize storage", err)
|
|
}
|
|
|
|
blockStorage := modules.NewBlockStorage(localStore)
|
|
asserter, err := asserter.NewClientWithOptions(
|
|
config.Network,
|
|
config.GenesisBlockIdentifier,
|
|
lbry.OperationTypes,
|
|
lbry.OperationStatuses,
|
|
services.Errors,
|
|
nil,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%w: unable to initialize asserter", err)
|
|
}
|
|
|
|
i := &Indexer{
|
|
cancel: cancel,
|
|
network: config.Network,
|
|
pruningConfig: config.Pruning,
|
|
client: client,
|
|
database: localStore,
|
|
blockStorage: blockStorage,
|
|
waiter: newWaitTable(),
|
|
asserter: asserter,
|
|
}
|
|
|
|
coinStorage := modules.NewCoinStorage(
|
|
localStore,
|
|
&CoinStorageHelper{blockStorage},
|
|
asserter,
|
|
)
|
|
i.coinStorage = coinStorage
|
|
|
|
balanceStorage := modules.NewBalanceStorage(localStore)
|
|
balanceStorage.Initialize(
|
|
&BalanceStorageHelper{asserter},
|
|
&BalanceStorageHandler{},
|
|
)
|
|
i.balanceStorage = balanceStorage
|
|
|
|
i.workers = []modules.BlockWorker{coinStorage, balanceStorage}
|
|
|
|
return i, nil
|
|
}
|
|
|
|
// waitForNode returns once lbrycrdd is ready to serve
|
|
// block queries.
|
|
func (i *Indexer) waitForNode(ctx context.Context) error {
|
|
logger := utils.ExtractLogger(ctx, "indexer")
|
|
for {
|
|
_, err := i.client.NetworkStatus(ctx)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
logger.Infow("waiting for lbrycrdd...")
|
|
if err := sdkUtils.ContextSleep(ctx, nodeWaitSleep); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sync attempts to index lbry blocks using
|
|
// the lbry.Client until stopped.
|
|
func (i *Indexer) Sync(ctx context.Context) error {
|
|
if err := i.waitForNode(ctx); err != nil {
|
|
return fmt.Errorf("%w: failed to wait for node", err)
|
|
}
|
|
|
|
i.blockStorage.Initialize(i.workers)
|
|
|
|
startIndex := int64(indexPlaceholder)
|
|
head, err := i.blockStorage.GetHeadBlockIdentifier(ctx)
|
|
if err == nil {
|
|
startIndex = head.Index + 1
|
|
}
|
|
|
|
// Load in previous blocks into syncer cache to handle reorgs.
|
|
// If previously processed blocks exist in storage, they are fetched.
|
|
// Otherwise, none are provided to the cache (the syncer will not attempt
|
|
// a reorg if the cache is empty).
|
|
pastBlocks := i.blockStorage.CreateBlockCache(ctx, syncer.DefaultPastBlockLimit)
|
|
|
|
syncer := syncer.New(
|
|
i.network,
|
|
i,
|
|
i,
|
|
i.cancel,
|
|
syncer.WithCacheSize(syncer.DefaultCacheSize),
|
|
syncer.WithSizeMultiplier(sizeMultiplier),
|
|
syncer.WithPastBlocks(pastBlocks),
|
|
)
|
|
|
|
return syncer.Sync(ctx, startIndex, indexPlaceholder)
|
|
}
|
|
|
|
// Prune attempts to prune blocks in lbrycrdd every
|
|
// pruneFrequency.
|
|
func (i *Indexer) Prune(ctx context.Context) error {
|
|
logger := utils.ExtractLogger(ctx, "pruner")
|
|
|
|
tc := time.NewTicker(i.pruningConfig.Frequency)
|
|
defer tc.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Warnw("exiting pruner")
|
|
return ctx.Err()
|
|
case <-tc.C:
|
|
head, err := i.blockStorage.GetHeadBlockIdentifier(ctx)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// Must meet pruning conditions in lbry core
|
|
// Source:
|
|
// https://github.com/lbry/lbry/blob/a63a26f042134fa80356860c109edb25ac567552/src/rpc/blockchain.cpp#L953-L960
|
|
pruneHeight := head.Index - i.pruningConfig.Depth
|
|
if pruneHeight <= i.pruningConfig.MinHeight {
|
|
logger.Infow("waiting to prune", "min prune height", i.pruningConfig.MinHeight)
|
|
continue
|
|
}
|
|
|
|
logger.Infow("attempting to prune lbrycrdd", "prune height", pruneHeight)
|
|
prunedHeight, err := i.client.PruneBlockchain(ctx, pruneHeight)
|
|
if err != nil {
|
|
logger.Warnw(
|
|
"unable to prune lbrycrdd",
|
|
"prune height", pruneHeight,
|
|
"error", err,
|
|
)
|
|
} else {
|
|
logger.Infow("pruned lbrycrdd", "prune height", prunedHeight)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// BlockAdded is called by the syncer when a block is added.
|
|
func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error {
|
|
logger := utils.ExtractLogger(ctx, "indexer")
|
|
|
|
err := i.blockStorage.AddBlock(ctx, block)
|
|
if err != nil {
|
|
return fmt.Errorf(
|
|
"%w: unable to add block to storage %s:%d",
|
|
err,
|
|
block.BlockIdentifier.Hash,
|
|
block.BlockIdentifier.Index,
|
|
)
|
|
}
|
|
|
|
ops := 0
|
|
|
|
// Close channels of all blocks waiting.
|
|
i.waiter.Lock()
|
|
for _, transaction := range block.Transactions {
|
|
ops += len(transaction.Operations)
|
|
txHash := transaction.TransactionIdentifier.Hash
|
|
val, ok := i.waiter.Get(txHash, false)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
if val.channelClosed {
|
|
logger.Debugw(
|
|
"channel already closed",
|
|
"hash", block.BlockIdentifier.Hash,
|
|
"index", block.BlockIdentifier.Index,
|
|
"channel", txHash,
|
|
)
|
|
continue
|
|
}
|
|
|
|
// Closing channel will cause all listeners to continue
|
|
val.channelClosed = true
|
|
close(val.channel)
|
|
}
|
|
|
|
// Look for all remaining waiting transactions associated
|
|
// with the next block that have not yet been closed. We should
|
|
// abort these waits as they will never be closed by a new transaction.
|
|
for txHash, val := range i.waiter.table {
|
|
if val.earliestBlock == block.BlockIdentifier.Index+1 && !val.channelClosed {
|
|
logger.Debugw(
|
|
"aborting channel",
|
|
"hash", block.BlockIdentifier.Hash,
|
|
"index", block.BlockIdentifier.Index,
|
|
"channel", txHash,
|
|
)
|
|
val.channelClosed = true
|
|
val.aborted = true
|
|
close(val.channel)
|
|
}
|
|
}
|
|
i.waiter.Unlock()
|
|
|
|
logger.Debugw(
|
|
"block added",
|
|
"hash", block.BlockIdentifier.Hash,
|
|
"index", block.BlockIdentifier.Index,
|
|
"transactions", len(block.Transactions),
|
|
"ops", ops,
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
// BlockRemoved is called by the syncer when a block is removed.
|
|
func (i *Indexer) BlockRemoved(
|
|
ctx context.Context,
|
|
blockIdentifier *types.BlockIdentifier,
|
|
) error {
|
|
logger := utils.ExtractLogger(ctx, "indexer")
|
|
logger.Debugw(
|
|
"block removed",
|
|
"hash", blockIdentifier.Hash,
|
|
"index", blockIdentifier.Index,
|
|
)
|
|
err := i.blockStorage.RemoveBlock(ctx, blockIdentifier)
|
|
if err != nil {
|
|
return fmt.Errorf(
|
|
"%w: unable to remove block from storage %s:%d",
|
|
err,
|
|
blockIdentifier.Hash,
|
|
blockIdentifier.Index,
|
|
)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// NetworkStatus is called by the syncer to get the current
|
|
// network status.
|
|
func (i *Indexer) NetworkStatus(
|
|
ctx context.Context,
|
|
network *types.NetworkIdentifier,
|
|
) (*types.NetworkStatusResponse, error) {
|
|
return i.client.NetworkStatus(ctx)
|
|
}
|
|
|
|
func (i *Indexer) findCoin(
|
|
ctx context.Context,
|
|
btcBlock *lbry.Block,
|
|
coinIdentifier string,
|
|
) (*types.Coin, *types.AccountIdentifier, error) {
|
|
for ctx.Err() == nil {
|
|
databaseTransaction := i.database.ReadTransaction(ctx)
|
|
defer databaseTransaction.Discard(ctx)
|
|
|
|
coinHeadBlock, err := i.blockStorage.GetHeadBlockIdentifierTransactional(
|
|
ctx,
|
|
databaseTransaction,
|
|
)
|
|
if errors.Is(err, storageErrs.ErrHeadBlockNotFound) {
|
|
if err := sdkUtils.ContextSleep(ctx, missingTransactionDelay); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
continue
|
|
}
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf(
|
|
"%w: unable to get transactional head block identifier",
|
|
err,
|
|
)
|
|
}
|
|
|
|
// Attempt to find coin
|
|
coin, owner, err := i.coinStorage.GetCoinTransactional(
|
|
ctx,
|
|
databaseTransaction,
|
|
&types.CoinIdentifier{
|
|
Identifier: coinIdentifier,
|
|
},
|
|
)
|
|
if err == nil {
|
|
return coin, owner, nil
|
|
}
|
|
|
|
if !errors.Is(err, storageErrs.ErrCoinNotFound) {
|
|
return nil, nil, fmt.Errorf("%w: unable to lookup coin %s", err, coinIdentifier)
|
|
}
|
|
|
|
// Locking here prevents us from adding sending any done
|
|
// signals while we are determining whether or not to add
|
|
// to the WaitTable.
|
|
i.waiter.Lock()
|
|
|
|
// Check to see if head block has increased since
|
|
// we created our databaseTransaction.
|
|
currHeadBlock, err := i.blockStorage.GetHeadBlockIdentifier(ctx)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("%w: unable to get head block identifier", err)
|
|
}
|
|
|
|
// If the block has changed, we try to look up the transaction
|
|
// again.
|
|
if types.Hash(currHeadBlock) != types.Hash(coinHeadBlock) {
|
|
i.waiter.Unlock()
|
|
continue
|
|
}
|
|
|
|
// Put Transaction in WaitTable if doesn't already exist (could be
|
|
// multiple listeners)
|
|
transactionHash := lbry.TransactionHash(coinIdentifier)
|
|
val, ok := i.waiter.Get(transactionHash, false)
|
|
if !ok {
|
|
val = &waitTableEntry{
|
|
channel: make(chan struct{}),
|
|
earliestBlock: btcBlock.Height,
|
|
}
|
|
}
|
|
if val.earliestBlock > btcBlock.Height {
|
|
val.earliestBlock = btcBlock.Height
|
|
}
|
|
val.listeners++
|
|
i.waiter.Set(transactionHash, val, false)
|
|
i.waiter.Unlock()
|
|
|
|
return nil, nil, errMissingTransaction
|
|
}
|
|
|
|
return nil, nil, ctx.Err()
|
|
}
|
|
|
|
func (i *Indexer) checkHeaderMatch(
|
|
ctx context.Context,
|
|
btcBlock *lbry.Block,
|
|
) error {
|
|
headBlock, err := i.blockStorage.GetHeadBlockIdentifier(ctx)
|
|
if err != nil && !errors.Is(err, storageErrs.ErrHeadBlockNotFound) {
|
|
return fmt.Errorf("%w: unable to lookup head block", err)
|
|
}
|
|
|
|
// If block we are trying to process is next but it is not connected, we
|
|
// should return syncer.ErrOrphanHead to manually trigger a reorg.
|
|
if headBlock != nil &&
|
|
btcBlock.Height == headBlock.Index+1 &&
|
|
btcBlock.PreviousBlockHash != headBlock.Hash {
|
|
return syncer.ErrOrphanHead
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (i *Indexer) findCoins(
|
|
ctx context.Context,
|
|
btcBlock *lbry.Block,
|
|
coins []string,
|
|
) (map[string]*types.AccountCoin, error) {
|
|
if err := i.checkHeaderMatch(ctx, btcBlock); err != nil {
|
|
return nil, fmt.Errorf("%w: check header match failed", err)
|
|
}
|
|
|
|
coinMap := map[string]*types.AccountCoin{}
|
|
remainingCoins := []string{}
|
|
for _, coinIdentifier := range coins {
|
|
coin, owner, err := i.findCoin(
|
|
ctx,
|
|
btcBlock,
|
|
coinIdentifier,
|
|
)
|
|
if err == nil {
|
|
coinMap[coinIdentifier] = &types.AccountCoin{
|
|
Account: owner,
|
|
Coin: coin,
|
|
}
|
|
continue
|
|
}
|
|
|
|
if errors.Is(err, errMissingTransaction) {
|
|
remainingCoins = append(remainingCoins, coinIdentifier)
|
|
continue
|
|
}
|
|
|
|
return nil, fmt.Errorf("%w: unable to find coin %s", err, coinIdentifier)
|
|
}
|
|
|
|
if len(remainingCoins) == 0 {
|
|
return coinMap, nil
|
|
}
|
|
|
|
// Wait for remaining transactions
|
|
shouldAbort := false
|
|
for _, coinIdentifier := range remainingCoins {
|
|
// Wait on Channel
|
|
txHash := lbry.TransactionHash(coinIdentifier)
|
|
entry, ok := i.waiter.Get(txHash, true)
|
|
if !ok {
|
|
return nil, fmt.Errorf("transaction %s not in waiter", txHash)
|
|
}
|
|
|
|
select {
|
|
case <-entry.channel:
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
|
|
// Delete Transaction from WaitTable if last listener
|
|
i.waiter.Lock()
|
|
val, ok := i.waiter.Get(txHash, false)
|
|
if !ok {
|
|
return nil, fmt.Errorf("transaction %s not in waiter", txHash)
|
|
}
|
|
|
|
// Don't exit right away to make sure
|
|
// we remove all closed entries from the
|
|
// waiter.
|
|
if val.aborted {
|
|
shouldAbort = true
|
|
}
|
|
|
|
val.listeners--
|
|
if val.listeners == 0 {
|
|
i.waiter.Delete(txHash, false)
|
|
} else {
|
|
i.waiter.Set(txHash, val, false)
|
|
}
|
|
i.waiter.Unlock()
|
|
}
|
|
|
|
// Wait to exit until we have decremented our listeners
|
|
if shouldAbort {
|
|
return nil, syncer.ErrOrphanHead
|
|
}
|
|
|
|
// In the case of a reorg, we may still not be able to find
|
|
// the transactions. So, we need to repeat this same process
|
|
// recursively until we find the transactions we are looking for.
|
|
foundCoins, err := i.findCoins(ctx, btcBlock, remainingCoins)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%w: unable to get remaining transactions", err)
|
|
}
|
|
|
|
for k, v := range foundCoins {
|
|
coinMap[k] = v
|
|
}
|
|
|
|
return coinMap, nil
|
|
}
|
|
|
|
// Block is called by the syncer to fetch a block.
|
|
func (i *Indexer) Block(
|
|
ctx context.Context,
|
|
network *types.NetworkIdentifier,
|
|
blockIdentifier *types.PartialBlockIdentifier,
|
|
) (*types.Block, error) {
|
|
// get raw block
|
|
var btcBlock *lbry.Block
|
|
var coins []string
|
|
var err error
|
|
|
|
retries := 0
|
|
for ctx.Err() == nil {
|
|
btcBlock, coins, err = i.client.GetRawBlock(ctx, blockIdentifier)
|
|
if err == nil {
|
|
break
|
|
}
|
|
|
|
retries++
|
|
if retries > retryLimit {
|
|
return nil, fmt.Errorf("%w: unable to get raw block %+v", err, blockIdentifier)
|
|
}
|
|
|
|
if err := sdkUtils.ContextSleep(ctx, retryDelay); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// determine which coins must be fetched and get from coin storage
|
|
coinMap, err := i.findCoins(ctx, btcBlock, coins)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%w: unable to find input transactions", err)
|
|
}
|
|
|
|
// provide to block parsing
|
|
block, err := i.client.ParseBlock(ctx, btcBlock, coinMap)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%w: unable to parse block %+v", err, blockIdentifier)
|
|
}
|
|
|
|
// ensure block is valid
|
|
if err := i.asserter.Block(block); err != nil {
|
|
return nil, fmt.Errorf("%w: block is not valid %+v", err, blockIdentifier)
|
|
}
|
|
|
|
return block, nil
|
|
}
|
|
|
|
// GetScriptPubKeys gets the ScriptPubKey for
|
|
// a collection of *types.CoinIdentifier. It also
|
|
// confirms that the amount provided with each coin
|
|
// is valid.
|
|
func (i *Indexer) GetScriptPubKeys(
|
|
ctx context.Context,
|
|
coins []*types.Coin,
|
|
) ([]*lbry.ScriptPubKey, error) {
|
|
databaseTransaction := i.database.ReadTransaction(ctx)
|
|
defer databaseTransaction.Discard(ctx)
|
|
|
|
scripts := make([]*lbry.ScriptPubKey, len(coins))
|
|
for j, coin := range coins {
|
|
coinIdentifier := coin.CoinIdentifier
|
|
transactionHash, networkIndex, err := lbry.ParseCoinIdentifier(coinIdentifier)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%w: unable to parse coin identifier", err)
|
|
}
|
|
|
|
_, transaction, err := i.blockStorage.FindTransaction(
|
|
ctx,
|
|
&types.TransactionIdentifier{Hash: transactionHash.String()},
|
|
databaseTransaction,
|
|
)
|
|
if err != nil || transaction == nil {
|
|
return nil, fmt.Errorf(
|
|
"%w: unable to find transaction %s",
|
|
err,
|
|
transactionHash.String(),
|
|
)
|
|
}
|
|
|
|
for _, op := range transaction.Operations {
|
|
if op.Type != lbry.OutputOpType {
|
|
continue
|
|
}
|
|
|
|
if *op.OperationIdentifier.NetworkIndex != int64(networkIndex) {
|
|
continue
|
|
}
|
|
|
|
var opMetadata lbry.OperationMetadata
|
|
if err := types.UnmarshalMap(op.Metadata, &opMetadata); err != nil {
|
|
return nil, fmt.Errorf(
|
|
"%w: unable to unmarshal operation metadata %+v",
|
|
err,
|
|
op.Metadata,
|
|
)
|
|
}
|
|
|
|
if types.Hash(op.Amount.Currency) != types.Hash(coin.Amount.Currency) {
|
|
return nil, fmt.Errorf(
|
|
"currency expected %s does not match coin %s",
|
|
types.PrintStruct(coin.Amount.Currency),
|
|
types.PrintStruct(op.Amount.Currency),
|
|
)
|
|
}
|
|
|
|
addition, err := types.AddValues(op.Amount.Value, coin.Amount.Value)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%w: unable to add op amount and coin amount", err)
|
|
}
|
|
|
|
if addition != "0" {
|
|
return nil, fmt.Errorf(
|
|
"coin amount does not match expected with difference %s",
|
|
addition,
|
|
)
|
|
}
|
|
|
|
scripts[j] = opMetadata.ScriptPubKey
|
|
break
|
|
}
|
|
|
|
if scripts[j] == nil {
|
|
return nil, fmt.Errorf("unable to find script for coin %s", coinIdentifier.Identifier)
|
|
}
|
|
}
|
|
|
|
return scripts, nil
|
|
}
|
|
|
|
// GetBlockLazy returns a *types.BlockResponse from the indexer's block storage.
|
|
// All transactions in a block must be fetched individually.
|
|
func (i *Indexer) GetBlockLazy(
|
|
ctx context.Context,
|
|
blockIdentifier *types.PartialBlockIdentifier,
|
|
) (*types.BlockResponse, error) {
|
|
return i.blockStorage.GetBlockLazy(ctx, blockIdentifier)
|
|
}
|
|
|
|
// GetBlockTransaction returns a *types.Transaction if it is in the provided
|
|
// *types.BlockIdentifier.
|
|
func (i *Indexer) GetBlockTransaction(
|
|
ctx context.Context,
|
|
blockIdentifier *types.BlockIdentifier,
|
|
transactionIdentifier *types.TransactionIdentifier,
|
|
) (*types.Transaction, error) {
|
|
return i.blockStorage.GetBlockTransaction(
|
|
ctx,
|
|
blockIdentifier,
|
|
transactionIdentifier,
|
|
)
|
|
}
|
|
|
|
// GetCoins returns all unspent coins for a particular *types.AccountIdentifier.
|
|
func (i *Indexer) GetCoins(
|
|
ctx context.Context,
|
|
accountIdentifier *types.AccountIdentifier,
|
|
) ([]*types.Coin, *types.BlockIdentifier, error) {
|
|
return i.coinStorage.GetCoins(ctx, accountIdentifier)
|
|
}
|
|
|
|
// GetBalance returns the balance of an account
|
|
// at a particular *types.PartialBlockIdentifier.
|
|
func (i *Indexer) GetBalance(
|
|
ctx context.Context,
|
|
accountIdentifier *types.AccountIdentifier,
|
|
currency *types.Currency,
|
|
blockIdentifier *types.PartialBlockIdentifier,
|
|
) (*types.Amount, *types.BlockIdentifier, error) {
|
|
dbTx := i.database.ReadTransaction(ctx)
|
|
defer dbTx.Discard(ctx)
|
|
|
|
blockResponse, err := i.blockStorage.GetBlockLazyTransactional(
|
|
ctx,
|
|
blockIdentifier,
|
|
dbTx,
|
|
)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
amount, err := i.balanceStorage.GetBalanceTransactional(
|
|
ctx,
|
|
dbTx,
|
|
accountIdentifier,
|
|
currency,
|
|
blockResponse.Block.BlockIdentifier.Index,
|
|
)
|
|
if errors.Is(err, storageErrs.ErrAccountMissing) {
|
|
return &types.Amount{
|
|
Value: zeroValue,
|
|
Currency: currency,
|
|
}, blockResponse.Block.BlockIdentifier, nil
|
|
}
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return amount, blockResponse.Block.BlockIdentifier, nil
|
|
}
|