More cleanup

This commit is contained in:
Jeffrey Picard 2022-04-11 01:40:29 +00:00
parent e2d1fa8558
commit 167f62c845
9 changed files with 46 additions and 149 deletions

View file

@ -1,11 +1,13 @@
package db
// db.go contains basic functions for representing and accessing the state of
// a read-only version of the rocksdb database.
import (
"bytes"
"encoding/hex"
"fmt"
"os"
"sort"
"time"
"github.com/lbryio/hub/db/db_stack"
@ -305,13 +307,6 @@ func (ps *PathSegment) String() string {
return ps.name
}
// BisectRight returns the index of the first element in the list that is greater than or equal to the value.
// https://stackoverflow.com/questions/29959506/is-there-a-go-analog-of-pythons-bisect-module
func BisectRight(arr []interface{}, val uint32) uint32 {
i := sort.Search(len(arr), func(i int) bool { return arr[i].(uint32) >= val })
return uint32(i)
}
//
// Iterators / db construction functions
//
@ -539,6 +534,11 @@ func GetDBColumnFamlies(name string, secondayPath string, cfNames []string) (*Re
// Advance advance the db to the given height.
func (db *ReadOnlyDBColumnFamily) Advance(height uint32) {
// DB wasn't created when we initialized headers, reinit
if db.TxCounts.Len() == 0 {
db.InitHeaders()
db.InitTxCounts()
}
// TODO: assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts'
if db.TxCounts.Len() != height {
log.Error("tx count len:", db.TxCounts.Len(), "height:", height)
@ -759,15 +759,6 @@ func (db *ReadOnlyDBColumnFamily) InitTxCounts() error {
log.Println("len(db.TxCounts), cap(db.TxCounts):", db.TxCounts.Len(), db.TxCounts.Cap())
log.Println("Time to get txCounts:", duration)
// whjy not needs to be len-1 because we start loading with the zero block
// and the txcounts start at one???
// db.Height = db.TxCounts.Len()
// if db.TxCounts.Len() > 0 {
// db.Height = db.TxCounts.Len() - 1
// } else {
// log.Println("db.TxCounts.Len() == 0 ???")
// }
return nil
}

View file

@ -1,5 +1,7 @@
package db
// db_get.go contains the basic access functions to the database.
import (
"encoding/hex"
"fmt"
@ -311,11 +313,6 @@ func (db *ReadOnlyDBColumnFamily) GetSupportAmount(claimHash []byte) (uint64, er
}
func (db *ReadOnlyDBColumnFamily) GetTxHash(txNum uint32) ([]byte, error) {
/*
if self._cache_all_tx_hashes:
return self.total_transactions[tx_num]
return self.prefix_db.tx_hash.get(tx_num, deserialize_value=False)
*/
// TODO: caching
handle, err := db.EnsureHandle(prefixes.TxHash)
if err != nil {

View file

@ -1,5 +1,7 @@
package db
// db_resolve.go contains functions relevant to resolving a claim.
import (
"bytes"
"encoding/hex"
@ -38,7 +40,7 @@ func PrepareResolveResult(
return nil, err
}
height, createdHeight := db.TxCounts.TxCountsBisectRight(txNum, rootTxNum, BisectRight)
height, createdHeight := db.TxCounts.TxCountsBisectRight(txNum, rootTxNum)
lastTakeoverHeight := controllingClaim.Height
expirationHeight := GetExpirationHeight(height)
@ -84,7 +86,7 @@ func PrepareResolveResult(
return nil, err
}
repostTxPostition = repostTxo.Position
repostHeight, _ = db.TxCounts.TxCountsBisectRight(repostTxo.TxNum, rootTxNum, BisectRight)
repostHeight, _ = db.TxCounts.TxCountsBisectRight(repostTxo.TxNum, rootTxNum)
}
}
@ -120,7 +122,7 @@ func PrepareResolveResult(
return nil, err
}
channelTxPostition = channelVals.Position
channelHeight, _ = db.TxCounts.TxCountsBisectRight(channelVals.TxNum, rootTxNum, BisectRight)
channelHeight, _ = db.TxCounts.TxCountsBisectRight(channelVals.TxNum, rootTxNum)
}
}

View file

@ -1,7 +1,12 @@
package db_stack
// The db_stack package contains the implementation of a generic slice backed stack
// used for tracking various states in the hub, i.e. headers and txcounts
import (
"sync"
"github.com/lbryio/hub/internal"
)
type SliceBackedStack struct {
@ -81,16 +86,13 @@ func (s *SliceBackedStack) GetSlice() []interface{} {
}
// This function is dangerous because it assumes underlying types
func (s *SliceBackedStack) TxCountsBisectRight(
txNum, rootTxNum uint32,
bisectFunc func([]interface{}, uint32) uint32,
) (uint32, uint32) {
func (s *SliceBackedStack) TxCountsBisectRight(txNum, rootTxNum uint32) (uint32, uint32) {
s.mut.RLock()
defer s.mut.RUnlock()
txCounts := s.slice[:s.Len()]
height := bisectFunc(txCounts, txNum)
createdHeight := bisectFunc(txCounts, rootTxNum)
height := internal.BisectRight(txCounts, txNum)
createdHeight := internal.BisectRight(txCounts, rootTxNum)
return height, createdHeight
}

View file

@ -1,5 +1,7 @@
package db
// iterator.go contains the implementation for iterators on rocksdb used by the hub
import (
"bytes"
@ -97,6 +99,7 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions {
return o
}
// ReadRow reads a row from the db, returns nil when no more rows are available.
func (opts *IterOptions) ReadRow(prevKey *[]byte) *prefixes.PrefixRowKV {
it := opts.It
if !it.Valid() {
@ -165,12 +168,12 @@ func (opts *IterOptions) ReadRow(prevKey *[]byte) *prefixes.PrefixRowKV {
return kv
}
// StopIteration returns true if we've hit the criteria to end iteration on this key
func (o *IterOptions) StopIteration(key []byte) bool {
if key == nil {
return false
}
// TODO: Look at not doing floating point conversions for this
maxLenStop := intMin(len(key), len(o.Stop))
maxLenStart := intMin(len(key), len(o.Start))
if o.Stop != nil &&

View file

@ -181,12 +181,7 @@ func (v *DBStateValue) PackValue() []byte {
histFlushCount = (OnesCompTwiddle32 - uint32(v.HistFlushCount))
compFlushCount = (OnesCompTwiddle32 - uint32(v.CompFlushCount))
compCursor = (OnesCompTwiddle32 - uint32(v.CompCursor))
// if v.HistFlushCount < 0 {
// }
// if v.CompFlushCount < 0 {
// }
// if v.CompCursor < 0 {
// }
binary.BigEndian.PutUint32(value[32+4+4+32+4+4+1+1:], histFlushCount)
binary.BigEndian.PutUint32(value[32+4+4+32+4+4+1+1+4:], compFlushCount)
binary.BigEndian.PutUint32(value[32+4+4+32+4+4+1+1+4+4:], compCursor)
@ -689,6 +684,13 @@ type BlockTxsValue struct {
TxHashes [][]byte `json:"tx_hashes"`
}
func (k *BlockTxsKey) NewBlockTxsKey(height uint32) *BlockTxsKey {
return &BlockTxsKey{
Prefix: []byte{BlockTXs},
Height: height,
}
}
func (k *BlockTxsKey) PackKey() []byte {
prefixLen := 1
// b'>L'

10
internal/sort.go Normal file
View file

@ -0,0 +1,10 @@
package internal
import "sort"
// BisectRight returns the index of the first element in the list that is greater than or equal to the value.
// https://stackoverflow.com/questions/29959506/is-there-a-go-analog-of-pythons-bisect-module
func BisectRight(arr []interface{}, val uint32) uint32 {
i := sort.Search(len(arr), func(i int) bool { return arr[i].(uint32) >= val })
return uint32(i)
}

110
main.go
View file

@ -1,15 +1,10 @@
package main
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"time"
"github.com/lbryio/hub/db"
"github.com/lbryio/hub/db/prefixes"
"github.com/lbryio/hub/internal"
pb "github.com/lbryio/hub/protobuf/go"
"github.com/lbryio/hub/server"
"github.com/lbryio/lbry.go/v2/extras/util"
@ -33,8 +28,6 @@ func main() {
ctxWCancel, cancel := context.WithCancel(ctx)
defer cancel()
// TODO: Figure out if / where we need signal handling
initsignals()
interrupt := interruptListener()
@ -53,109 +46,6 @@ func main() {
<-interrupt
return
} else if args.CmdType == server.DBCmd {
options := &db.IterOptions{
FillCache: false,
Prefix: []byte{prefixes.SupportAmount},
Start: nil,
Stop: nil,
IncludeStart: true,
IncludeStop: false,
IncludeKey: true,
IncludeValue: true,
RawKey: true,
RawValue: true,
}
dbVal, err := db.GetDB("/mnt/d/data/wallet/lbry-rocksdb/")
if err != nil {
log.Fatalln(err)
}
db.ReadWriteRawN(dbVal, options, "./testdata/support_amount.csv", 10)
return
} else if args.CmdType == server.DBCmd2 {
pxs := prefixes.GetPrefixes()
for _, prefix := range pxs {
//var rawPrefix byte = prefixes.ClaimExpiration
//prefix := []byte{rawPrefix}
columnFamily := string(prefix)
options := &db.IterOptions{
FillCache: false,
Prefix: prefix,
Start: nil,
Stop: nil,
IncludeStart: true,
IncludeStop: false,
IncludeKey: true,
IncludeValue: true,
RawKey: true,
RawValue: true,
}
dbVal, handles, err := db.GetDBCF("/mnt/d/data/snapshot_1072108/lbry-rocksdb/", columnFamily)
if err != nil {
log.Fatalln(err)
}
options.CfHandle = handles[1]
var n = 10
if bytes.Equal(prefix, []byte{prefixes.Undo}) || bytes.Equal(prefix, []byte{prefixes.DBState}) {
n = 1
}
db.ReadWriteRawNCF(dbVal, options, fmt.Sprintf("./testdata/%s.csv", columnFamily), n)
}
return
} else if args.CmdType == server.DBCmd3 {
// streamHash, _ := hex.DecodeString("9a0ed686ecdad9b6cb965c4d6681c02f0bbc66a6")
channelHash, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bd")
name := internal.NormalizeName("@Styxhexenhammer666")
// txNum := uint32(0x6284e3)
// position := uint16(0x0)
// For stream claim
// txNum := uint32(0x369e2b2)
// position := uint16(0x0)
// typ := uint8(prefixes.ACTIVATED_CLAIM_TXO_TYPE)
var rawPrefix byte = prefixes.ChannelToClaim
var startRaw []byte = nil
prefix := []byte{rawPrefix}
columnFamily := string(prefix)
// start := prefixes.NewClaimTakeoverKey(name)
// start := prefixes.NewActiveAmountKey(channelHash, prefixes.ACTIVATED_SUPPORT_TXO_TYPE, 0)
start := prefixes.NewChannelToClaimKey(channelHash, name)
startRaw = start.PackKey()
// start := &prefixes.ChannelCountKey{
// Prefix: prefix,
// ChannelHash: channelHash,
// }
// startRaw := prefixes.ChannelCountKeyPackPartial(start, 1)
// startRaw := start.PackKey()
options := &db.IterOptions{
FillCache: false,
Prefix: prefix,
Start: startRaw,
Stop: nil,
IncludeStart: true,
IncludeStop: false,
IncludeKey: true,
IncludeValue: true,
RawKey: true,
RawValue: true,
}
dbVal, handles, err := db.GetDBCF("/mnt/d/data/snapshot_1072108/lbry-rocksdb/", columnFamily)
if err != nil {
log.Fatalln(err)
}
options.CfHandle = handles[1]
db.ReadWriteRawNColumnFamilies(dbVal, options, fmt.Sprintf("./testdata/%s_resolve.csv", columnFamily), 1)
return
}
conn, err := grpc.Dial("localhost:"+args.Port,

View file

@ -35,7 +35,7 @@ func TestUDPPing(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
toAddr := "spv17.lbry.com"
toAddr := "spv15.lbry.com"
toPort := "50001"
pong, err := server.UDPPing(toAddr, toPort)