Compare commits

...

3 commits

Author SHA1 Message Date
Patrick O'Grady
46da44d520
Attempt aggressive caching 2020-10-27 22:41:51 -07:00
Patrick O'Grady
311eb6988c
Use rwmutex 2020-10-27 22:20:54 -07:00
Patrick O'Grady
863313103a
In-memory coin cache during sync, first pass 2020-10-27 22:09:30 -07:00

View file

@ -18,6 +18,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/coinbase/rosetta-bitcoin/bitcoin" "github.com/coinbase/rosetta-bitcoin/bitcoin"
@ -95,6 +96,8 @@ type Indexer struct {
workers []storage.BlockWorker workers []storage.BlockWorker
waiter *waitTable waiter *waitTable
coinMapMutex sync.RWMutex
coinMap map[string]*storage.AccountCoin
} }
// CloseDatabase closes a storage.Database. This should be called // CloseDatabase closes a storage.Database. This should be called
@ -198,6 +201,9 @@ func Initialize(
blockStorage: blockStorage, blockStorage: blockStorage,
waiter: newWaitTable(), waiter: newWaitTable(),
asserter: asserter, asserter: asserter,
// TODO: only enable during fast catchup (i.e. far behind chain)
// Delete oldest entries whenever some size: https://stackoverflow.com/questions/60829460/is-there-a-way-to-delete-first-element-from-map
coinMap: map[string]*storage.AccountCoin{},
} }
coinStorage := storage.NewCoinStorage( coinStorage := storage.NewCoinStorage(
@ -317,6 +323,30 @@ func (i *Indexer) Prune(ctx context.Context) error {
func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error { func (i *Indexer) BlockAdded(ctx context.Context, block *types.Block) error {
logger := utils.ExtractLogger(ctx, "indexer") logger := utils.ExtractLogger(ctx, "indexer")
// Update cache
i.coinMapMutex.Lock()
for _, transaction := range block.Transactions {
for _, op := range transaction.Operations {
if op.CoinChange == nil || op.Amount == nil {
continue
}
if op.CoinChange.CoinAction == types.CoinSpent {
delete(i.coinMap, op.CoinChange.CoinIdentifier.Identifier)
continue
}
i.coinMap[op.CoinChange.CoinIdentifier.Identifier] = &storage.AccountCoin{
Account: op.Account,
Coin: &types.Coin{
CoinIdentifier: op.CoinChange.CoinIdentifier,
Amount: op.Amount,
},
}
}
}
i.coinMapMutex.Unlock()
err := i.blockStorage.AddBlock(ctx, block) err := i.blockStorage.AddBlock(ctx, block)
if err != nil { if err != nil {
return fmt.Errorf( return fmt.Errorf(
@ -416,6 +446,22 @@ func (i *Indexer) NetworkStatus(
return i.client.NetworkStatus(ctx) return i.client.NetworkStatus(ctx)
} }
func (i *Indexer) getCoin(
ctx context.Context,
dbTx storage.DatabaseTransaction,
coinIdentifier *types.CoinIdentifier,
) (*types.Coin, *types.AccountIdentifier, error) {
i.coinMapMutex.RLock()
m, ok := i.coinMap[coinIdentifier.Identifier]
i.coinMapMutex.RUnlock()
if ok {
return m.Coin, m.Account, nil
}
// THis is SUPER dangerous (won't survive restart)
return nil, nil, storage.ErrCoinNotFound
}
func (i *Indexer) findCoin( func (i *Indexer) findCoin(
ctx context.Context, ctx context.Context,
btcBlock *bitcoin.Block, btcBlock *bitcoin.Block,
@ -444,7 +490,7 @@ func (i *Indexer) findCoin(
} }
// Attempt to find coin // Attempt to find coin
coin, owner, err := i.coinStorage.GetCoinTransactional( coin, owner, err := i.getCoin(
ctx, ctx,
databaseTransaction, databaseTransaction,
&types.CoinIdentifier{ &types.CoinIdentifier{