added slice backed stack, need to fix tests

Jeffrey Picard 2022-02-28 15:23:06 -05:00
parent 2658bf50ff
commit 3c99296f8b
5 changed files with 422 additions and 20 deletions

db/db.go

@@ -10,6 +10,7 @@ import (
"sort"
"time"
"github.com/lbryio/hub/db/db_stack"
"github.com/lbryio/hub/db/prefixes"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/linxGnu/grocksdb"
@@ -37,11 +38,17 @@ const (
//
type ReadOnlyDBColumnFamily struct {
DB *grocksdb.DB
Handles map[string]*grocksdb.ColumnFamilyHandle
Opts *grocksdb.ReadOptions
TxCounts []uint32
Height uint32
DB *grocksdb.DB
Handles map[string]*grocksdb.ColumnFamilyHandle
Opts *grocksdb.ReadOptions
// TxCountIdx int32 // TODO: slice backed stack
// TxCounts []uint32
TxCounts *db_stack.SliceBackedStack
Height uint32
LastState *prefixes.DBStateValue
// HeaderIdx int32 // TODO: slice backed stack
// Headers []*prefixes.BlockHeaderValue
Headers *db_stack.SliceBackedStack
BlockedStreams map[string][]byte
BlockedChannels map[string][]byte
FilteredStreams map[string][]byte
@@ -476,7 +483,14 @@ func GetDBColumnFamlies(name string, cfNames []string) (*ReadOnlyDBColumnFamily,
FilteredStreams: make(map[string][]byte),
FilteredChannels: make(map[string][]byte),
TxCounts: nil,
LastState: nil,
Height: 0,
Headers: nil,
}
err = ReadDBState(myDB) //TODO: Figure out right place for this
if err != nil {
return nil, err
}
err = InitTxCounts(myDB)
@@ -484,30 +498,286 @@ func GetDBColumnFamlies(name string, cfNames []string) (*ReadOnlyDBColumnFamily,
return nil, err
}
err = InitHeaders(myDB)
if err != nil {
return nil, err
}
RunDetectChanges(myDB)
return myDB, nil
}
// Advance appends the tx count and header for the given height to the in-memory state
func Advance(db *ReadOnlyDBColumnFamily, height uint32) {
/*
def advance(self, height: int):
tx_count = self.db.prefix_db.tx_count.get(height).tx_count
assert len(self.db.tx_counts) == height, f"{len(self.db.tx_counts)} != {height}"
self.db.tx_counts.append(tx_count)
self.db.headers.append(self.db.prefix_db.header.get(height, deserialize_value=False))
*/
// TODO: assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts'
if db.TxCounts.Len() != height {
log.Println("Error: tx count len:", db.TxCounts.Len(), "height:", height)
return
}
txCountObj, err := GetTxCount(db, height)
if err != nil {
log.Println("Error getting tx count:", err)
return
}
txCount := txCountObj.TxCount
db.TxCounts.Push(txCount)
header, err := GetHeader(db, height)
if err != nil {
log.Println("Error getting header:", err)
return
}
db.Headers.Push(&prefixes.BlockHeaderValue{Header: header})
}
func Unwind(db *ReadOnlyDBColumnFamily) {
db.TxCounts.Pop()
db.Headers.Pop()
}
// RunDetectChanges starts a goroutine that runs continuously while the hub is active
// to keep the db readonly view up to date and handle reorgs on the
// blockchain.
func RunDetectChanges(db *ReadOnlyDBColumnFamily) {
go func() {
for {
// FIXME: Figure out best sleep interval
time.Sleep(time.Second)
err := DetectChanges(db)
if err != nil {
log.Printf("Error detecting changes: %#v\n", err)
}
}
}()
}
// DetectChanges keeps the rocksdb db in sync and handles reorgs
func DetectChanges(db *ReadOnlyDBColumnFamily) error {
err := db.DB.TryCatchUpWithPrimary()
if err != nil {
log.Printf("error trying to catch up with primary: %#v", err)
return err
}
state, err := GetDBState(db)
if err != nil {
return err
}
if state == nil || state.Height <= 0 {
return nil
}
if db.LastState != nil && db.LastState.Height > state.Height {
log.Println("reorg detected, waiting until the writer has flushed the new blocks to advance")
return nil
}
var lastHeight uint32 = 0
var rewound bool = false
if db.LastState != nil {
lastHeight = db.LastState.Height
for {
lastHeightHeader, err := GetHeader(db, lastHeight)
if err != nil {
return err
}
curHeader := db.Headers.GetTip().(*prefixes.BlockHeaderValue).Header
if bytes.Equal(curHeader, lastHeightHeader) {
log.Println("connects to block", lastHeight)
break
} else {
log.Println("disconnect block", lastHeight)
Unwind(db)
rewound = true
lastHeight -= 1
}
}
}
if rewound {
//TODO: reorg count metric
}
err = ReadDBState(db)
if err != nil {
return err
}
if db.LastState == nil || lastHeight < state.Height {
for height := lastHeight + 1; height <= state.Height; height++ {
log.Println("advancing to", height)
Advance(db, height)
}
//TODO: ClearCache
db.LastState = state
//TODO: block count metric
//TODO: update blocked streams
//TODO: update filtered streams
}
/*
if self.last_state:
while True:
if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False):
self.log.debug("connects to block %i", last_height)
break
else:
self.log.warning("disconnect block %i", last_height)
self.unwind()
rewound = True
last_height -= 1
if rewound:
self.reorg_count_metric.inc()
self.db.read_db_state()
if not self.last_state or last_height < state.height:
for height in range(last_height + 1, state.height + 1):
self.log.info("advancing to %i", height)
self.advance(height)
self.clear_caches()
self.last_state = state
self.block_count_metric.set(self.last_state.height)
self.db.blocked_streams, self.db.blocked_channels = self.db.get_streams_and_channels_reposted_by_channel_hashes(
self.db.blocking_channel_hashes
)
self.db.filtered_streams, self.db.filtered_channels = self.db.get_streams_and_channels_reposted_by_channel_hashes(
self.db.filtering_channel_hashes
)
*/
return nil
}
/*
def read_db_state(self):
state = self.prefix_db.db_state.get()
if not state:
self.db_height = -1
self.db_tx_count = 0
self.db_tip = b'\0' * 32
self.db_version = max(self.DB_VERSIONS)
self.utxo_flush_count = 0
self.wall_time = 0
self.first_sync = True
self.hist_flush_count = 0
self.hist_comp_flush_count = -1
self.hist_comp_cursor = -1
self.hist_db_version = max(self.DB_VERSIONS)
self.es_sync_height = 0
else:
self.db_version = state.db_version
if self.db_version not in self.DB_VERSIONS:
raise DBError(f'your DB version is {self.db_version} but this '
f'software only handles versions {self.DB_VERSIONS}')
# backwards compat
genesis_hash = state.genesis
if genesis_hash.hex() != self.coin.GENESIS_HASH:
raise DBError(f'DB genesis hash {genesis_hash} does not '
f'match coin {self.coin.GENESIS_HASH}')
self.db_height = state.height
self.db_tx_count = state.tx_count
self.db_tip = state.tip
self.utxo_flush_count = state.utxo_flush_count
self.wall_time = state.wall_time
self.first_sync = state.first_sync
self.hist_flush_count = state.hist_flush_count
self.hist_comp_flush_count = state.comp_flush_count
self.hist_comp_cursor = state.comp_cursor
self.hist_db_version = state.db_version
self.es_sync_height = state.es_sync_height
return state
*/
func ReadDBState(db *ReadOnlyDBColumnFamily) error {
state, err := GetDBState(db)
if err != nil {
return err
}
if state != nil {
db.LastState = state
} else {
db.LastState = prefixes.NewDBStateValue()
}
return nil
/*
state := db.DBState
if state == nil {
db.DBHeight = -1
db.DBTxCount = 0
db.DBTip = make([]byte, 32)
db.DBVersion = max(db.DBVersions)
db.UTXOFlushCount = 0
db.WallTime = 0
db.FirstSync = true
db.HistFlushCount = 0
db.HistCompFlushCount = -1
db.HistCompCursor = -1
db.HistDBVersion = max(db.DBVersions)
db.ESSyncHeight = 0
} else {
db.DBVersion = state.DBVersion
if db.DBVersion != db.DBVersions[0] {
panic(f"DB version {db.DBVersion} not supported")
}
// backwards compat
genesisHash := state.Genesis
if !bytes.Equal(genesisHash, db.Coin.GENESIS_HASH) {
panic(f"DB genesis hash {genesisHash} does not match coin {db.Coin.GENESIS_HASH}")
}
db.DBHeight = state.Height
db.DBTxCount = state.TxCount
db.DBTip = state.Tip
db.UTXOFlushCount = state.UTXOFlushCount
db.WallTime = state.WallTime
db.FirstSync = state.FirstSync
db.HistFlushCount = state.HistFlushCount
db.HistCompFlushCount = state.HistCompFlushCount
db.HistCompCursor = state.HistCompCursor
db.HistDBVersion = state.HistDBVersion
db.ESSyncHeight = state.ESSyncHeight
}
*/
}
func InitHeaders(db *ReadOnlyDBColumnFamily) error {
handle, err := EnsureHandle(db, prefixes.Header)
if err != nil {
return err
}
//TODO: figure out a reasonable default and make it a constant
db.Headers = db_stack.NewSliceBackedStack(12000)
startKey := prefixes.NewHeaderKey(0)
endKey := prefixes.NewHeaderKey(db.LastState.Height)
startKeyRaw := startKey.PackKey()
endKeyRaw := endKey.PackKey()
options := NewIterateOptions().WithPrefix([]byte{prefixes.Header}).WithCfHandle(handle)
options = options.WithIncludeKey(false).WithIncludeValue(true)
options = options.WithStart(startKeyRaw).WithStop(endKeyRaw)
ch := IterCF(db.DB, options)
for header := range ch {
db.Headers.Push(header.Value)
}
return nil
}
// InitTxCounts initializes the tx counts stack
func InitTxCounts(db *ReadOnlyDBColumnFamily) error {
start := time.Now()
handle, ok := db.Handles[string([]byte{prefixes.TxCount})]
if !ok {
return fmt.Errorf("TxCount prefix not found")
handle, err := EnsureHandle(db, prefixes.TxCount)
if err != nil {
return err
}
//TODO: figure out a reasonable default and make it a constant
txCounts := make([]uint32, 0, 1200000)
db.TxCounts = db_stack.NewSliceBackedStack(1200000)
options := NewIterateOptions().WithPrefix([]byte{prefixes.TxCount}).WithCfHandle(handle)
options = options.WithIncludeKey(false).WithIncludeValue(true)
@@ -515,15 +785,15 @@ func InitTxCounts(db *ReadOnlyDBColumnFamily) error {
ch := IterCF(db.DB, options)
for txCount := range ch {
txCounts = append(txCounts, txCount.Value.(*prefixes.TxCountValue).TxCount)
db.TxCounts.Push(txCount.Value.(*prefixes.TxCountValue).TxCount)
}
duration := time.Since(start)
log.Println("len(txCounts), cap(txCounts):", len(txCounts), cap(txCounts))
log.Println("len(db.TxCounts), size(db.TxCounts):", db.TxCounts.Len(), db.TxCounts.Size())
log.Println("Time to get txCounts:", duration)
db.TxCounts = txCounts
db.Height = uint32(len(txCounts))
db.Height = db.TxCounts.Len()
return nil
}


@@ -33,9 +33,25 @@ func EnsureHandle(db *ReadOnlyDBColumnFamily, prefix byte) (*grocksdb.ColumnFami
return handle, nil
}
//
// DB Get functions
//
func GetHeader(db *ReadOnlyDBColumnFamily, height uint32) ([]byte, error) {
handle, err := EnsureHandle(db, prefixes.Header)
if err != nil {
return nil, err
}
key := prefixes.NewHeaderKey(height)
rawKey := key.PackKey()
slice, err := db.DB.GetCF(db.Opts, handle, rawKey)
if err != nil {
return nil, err
} else if slice.Size() == 0 {
return nil, nil
}
rawValue := make([]byte, len(slice.Data()))
copy(rawValue, slice.Data())
return rawValue, nil
}
func GetClaimsInChannelCount(db *ReadOnlyDBColumnFamily, channelHash []byte) (uint32, error) {
handle, err := EnsureHandle(db, prefixes.ChannelCount)
@@ -360,6 +376,28 @@ func FsGetClaimByHash(db *ReadOnlyDBColumnFamily, claimHash []byte) (*ResolveRes
)
}
func GetTxCount(db *ReadOnlyDBColumnFamily, height uint32) (*prefixes.TxCountValue, error) {
handle, err := EnsureHandle(db, prefixes.TxCount)
if err != nil {
return nil, err
}
key := prefixes.NewTxCountKey(height)
rawKey := key.PackKey()
slice, err := db.DB.GetCF(db.Opts, handle, rawKey)
if err != nil {
return nil, err
}
if slice.Size() == 0 {
return nil, nil
}
rawValue := make([]byte, len(slice.Data()))
copy(rawValue, slice.Data())
value := prefixes.TxCountValueUnpack(rawValue)
return value, nil
}
func GetDBState(db *ReadOnlyDBColumnFamily) (*prefixes.DBStateValue, error) {
handle, err := EnsureHandle(db, prefixes.DBState)
if err != nil {


@@ -37,8 +37,11 @@ func PrepareResolveResult(
return nil, err
}
height := BisectRight(db.TxCounts, txNum)
createdHeight := BisectRight(db.TxCounts, rootTxNum)
// The stack stores interface{} values; take the valid prefix and convert to []uint32.
rawCounts := db.TxCounts.GetSlice().([]interface{})[:db.TxCounts.Len()]
txCounts := make([]uint32, len(rawCounts))
for i, v := range rawCounts {
txCounts[i] = v.(uint32)
}
height := BisectRight(txCounts, txNum)
createdHeight := BisectRight(txCounts, rootTxNum)
lastTakeoverHeight := controllingClaim.Height
expirationHeight := GetExpirationHeight(height)
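BisectRight itself is not part of this diff; assuming it keeps Python's bisect.bisect_right semantics over the cumulative tx counts (the index of the first count strictly greater than a tx number is the height of the block containing that tx), a minimal standalone sketch with sort.Search could look like the following. The helper name bisectRight and the sample counts are illustrative only.

package main

import (
	"fmt"
	"sort"
)

// bisectRight mirrors Python's bisect.bisect_right: it returns the index of
// the first element strictly greater than target. Over cumulative tx counts
// that index is the height of the block containing transaction number target.
func bisectRight(txCounts []uint32, target uint32) uint32 {
	return uint32(sort.Search(len(txCounts), func(i int) bool {
		return txCounts[i] > target
	}))
}

func main() {
	// Cumulative tx counts for heights 0..3: block 0 holds tx 0, block 1
	// holds txs 1-3, block 2 holds txs 4-8, block 3 holds txs 9-11.
	txCounts := []uint32{1, 4, 9, 12}
	fmt.Println(bisectRight(txCounts, 0))  // 0
	fmt.Println(bisectRight(txCounts, 4))  // 2
	fmt.Println(bisectRight(txCounts, 10)) // 3
}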

db/db_stack/db_stack.go

@@ -0,0 +1,56 @@
package db_stack
type SliceBackedStack struct {
slice []interface{}
len uint32
}
func NewSliceBackedStack(size int) *SliceBackedStack {
return &SliceBackedStack{
slice: make([]interface{}, size),
len: 0,
}
}
func (s *SliceBackedStack) Push(v interface{}) {
if s.len == uint32(len(s.slice)) {
s.slice = append(s.slice, v)
} else {
s.slice[s.len] = v
}
s.len++
}
func (s *SliceBackedStack) Pop() interface{} {
if s.len == 0 {
return nil
}
s.len--
return s.slice[s.len]
}
func (s *SliceBackedStack) Get(i uint32) interface{} {
if i >= s.len {
return nil
}
return s.slice[i]
}
func (s *SliceBackedStack) GetTip() interface{} {
if s.len == 0 {
return nil
}
return s.slice[s.len-1]
}
func (s *SliceBackedStack) Len() uint32 {
return s.len
}
func (s *SliceBackedStack) Size() int {
return len(s.slice)
}
func (s *SliceBackedStack) GetSlice() interface{} {
return s.slice
}
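The commit message notes the tests still need fixing; a minimal sketch of a test for the new stack, exercising both the preallocated path and the append path of Push, might look like this. The file name db_stack_test.go and the specific cases are assumptions, not part of this commit.

package db_stack

import "testing"

func TestSliceBackedStack(t *testing.T) {
	stack := NewSliceBackedStack(2)

	if stack.Len() != 0 {
		t.Errorf("expected empty stack, got len %d", stack.Len())
	}
	if stack.Pop() != nil || stack.GetTip() != nil {
		t.Error("Pop/GetTip on an empty stack should return nil")
	}

	// Push past the preallocated size to exercise the append path.
	for i := uint32(0); i < 5; i++ {
		stack.Push(i)
	}
	if stack.Len() != 5 {
		t.Errorf("expected len 5, got %d", stack.Len())
	}
	if got := stack.GetTip().(uint32); got != 4 {
		t.Errorf("expected tip 4, got %d", got)
	}
	if got := stack.Get(2).(uint32); got != 2 {
		t.Errorf("expected Get(2) == 2, got %d", got)
	}
	if got := stack.Pop().(uint32); got != 4 {
		t.Errorf("expected Pop() == 4, got %d", got)
	}
	if stack.Len() != 4 {
		t.Errorf("expected len 4 after Pop, got %d", stack.Len())
	}
}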


@@ -137,6 +137,23 @@ type DBStateValue struct {
EsSyncHeight uint32
}
func NewDBStateValue() *DBStateValue {
return &DBStateValue{
Genesis: make([]byte, 32),
Height: 0,
TxCount: 0,
Tip: make([]byte, 32),
UtxoFlushCount: 0,
WallTime: 0,
FirstSync: true,
DDVersion: 0,
HistFlushCount: 0,
CompFlushCount: -1,
CompCursor: -1,
EsSyncHeight: 0,
}
}
func NewDBStateKey() *DBStateKey {
return &DBStateKey{
Prefix: []byte{DBState},
@@ -820,6 +837,13 @@ type TxCountValue struct {
TxCount uint32 `json:"tx_count"`
}
func NewTxCountKey(height uint32) *TxCountKey {
return &TxCountKey{
Prefix: []byte{TxCount},
Height: height,
}
}
func (k *TxCountKey) PackKey() []byte {
prefixLen := 1
// b'>L'
@@ -1223,6 +1247,17 @@ type BlockHeaderValue struct {
Header []byte `json:"header"`
}
func (k *BlockHeaderValue) Equals(v *BlockHeaderValue) bool {
return bytes.Equal(k.Header, v.Header)
}
func NewHeaderKey(height uint32) *BlockHeaderKey {
return &BlockHeaderKey{
Prefix: []byte{Header},
Height: height,
}
}
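The recurring b'>L' comment is Python's struct format for a big-endian uint32; assuming these keys pack as a single prefix byte followed by the height in that encoding, a standalone sketch with encoding/binary would be the following. packHeaderKey and the 'H' prefix byte are illustrative stand-ins, not the verbatim PackKey body.

package main

import (
	"encoding/binary"
	"fmt"
)

// packHeaderKey packs one prefix byte followed by the height as a
// big-endian uint32 (Python's '>L').
func packHeaderKey(prefix byte, height uint32) []byte {
	key := make([]byte, 1+4)
	key[0] = prefix
	binary.BigEndian.PutUint32(key[1:], height)
	return key
}

func main() {
	// 'H' is used here only as an example prefix byte.
	fmt.Printf("% x\n", packHeaderKey('H', 1000)) // 48 00 00 03 e8
}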
func (k *BlockHeaderKey) PackKey() []byte {
prefixLen := 1
// b'>L'