changes per code review

This commit is contained in:
Jeffrey Picard 2022-04-20 18:29:37 +00:00
parent ca840aea7f
commit ef5f42b199
8 changed files with 89 additions and 73 deletions

View file

@ -10,8 +10,8 @@ import (
"os"
"time"
"github.com/lbryio/hub/db/db_stack"
"github.com/lbryio/hub/db/prefixes"
"github.com/lbryio/hub/db/stack"
"github.com/lbryio/hub/internal"
"github.com/lbryio/hub/internal/metrics"
pb "github.com/lbryio/hub/protobuf/go"
@ -48,10 +48,10 @@ type ReadOnlyDBColumnFamily struct {
DB *grocksdb.DB
Handles map[string]*grocksdb.ColumnFamilyHandle
Opts *grocksdb.ReadOptions
TxCounts *db_stack.SliceBackedStack
TxCounts *stack.SliceBacked
Height uint32
LastState *prefixes.DBStateValue
Headers *db_stack.SliceBackedStack
Headers *stack.SliceBacked
BlockingChannelHashes [][]byte
FilteringChannelHashes [][]byte
BlockedStreams map[string][]byte
@ -410,6 +410,7 @@ func Iter(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
// GetDB functions that open and return a db
//
// GetWriteDBCF opens a db for writing with all columns families opened.
func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, error) {
opts := grocksdb.NewDefaultOptions()
cfOpt := grocksdb.NewDefaultOptions()
@ -461,6 +462,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
return db, cleanup, nil
}
// GetDBColumnFamilies gets a db with the specified column families and secondary path.
func GetDBColumnFamilies(name string, secondayPath string, cfNames []string) (*ReadOnlyDBColumnFamily, error) {
opts := grocksdb.NewDefaultOptions()
roOpts := grocksdb.NewDefaultReadOptions()
@ -565,6 +567,7 @@ func (db *ReadOnlyDBColumnFamily) Unwind() {
db.Headers.Pop()
}
// Shutdown shuts down the db.
func (db *ReadOnlyDBColumnFamily) Shutdown() {
db.ShutdownChan <- struct{}{}
<-db.DoneChan
@ -709,7 +712,7 @@ func (db *ReadOnlyDBColumnFamily) InitHeaders() error {
}
//TODO: figure out a reasonable default and make it a constant
db.Headers = db_stack.NewSliceBackedStack(12000)
db.Headers = stack.NewSliceBacked(12000)
startKey := prefixes.NewHeaderKey(0)
// endKey := prefixes.NewHeaderKey(db.LastState.Height)
@ -736,7 +739,7 @@ func (db *ReadOnlyDBColumnFamily) InitTxCounts() error {
return err
}
db.TxCounts = db_stack.NewSliceBackedStack(InitialTxCountSize)
db.TxCounts = stack.NewSliceBacked(InitialTxCountSize)
options := NewIterateOptions().WithPrefix([]byte{prefixes.TxCount}).WithCfHandle(handle)
options = options.WithIncludeKey(false).WithIncludeValue(true).WithIncludeStop(true)
@ -769,6 +772,7 @@ func (db *ReadOnlyDBColumnFamily) RunGetBlocksAndFilters() {
}()
}
// GetBlocksAndFilters gets the blocked and filtered channels and streams from the database.
func (db *ReadOnlyDBColumnFamily) GetBlocksAndFilters() error {
blockedChannels, blockedStreams, err := db.GetStreamsAndChannelRepostedByChannelHashes(db.BlockingChannelHashes)
if err != nil {
@ -789,6 +793,7 @@ func (db *ReadOnlyDBColumnFamily) GetBlocksAndFilters() error {
return nil
}
// GetDBCF Get the database and open given column families.
func GetDBCF(name string, cf string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, error) {
opts := grocksdb.NewDefaultOptions()
cfOpt := grocksdb.NewDefaultOptions()
@ -808,6 +813,7 @@ func GetDBCF(name string, cf string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHand
return db, handles, nil
}
// GetDB Get the database.
func GetDB(name string) (*grocksdb.DB, error) {
opts := grocksdb.NewDefaultOptions()
db, err := grocksdb.OpenDbAsSecondary(opts, name, "asdf")

View file

@ -11,10 +11,14 @@ import (
"github.com/linxGnu/grocksdb"
)
// GetExpirationHeight returns the expiration height for the given height. Uses
// the original claim expiration time.
func GetExpirationHeight(lastUpdatedHeight uint32) uint32 {
return GetExpirationHeightFull(lastUpdatedHeight, false)
}
// GetExpirationHeightFull returns the expiration height for the given height.
// Takes boolean to indicated whether to use extended or original expiration time.
func GetExpirationHeightFull(lastUpdatedHeight uint32, extended bool) uint32 {
if extended {
return lastUpdatedHeight + ExtendedClaimExpirationTime
@ -35,6 +39,7 @@ func (db *ReadOnlyDBColumnFamily) EnsureHandle(prefix byte) (*grocksdb.ColumnFam
return handle, nil
}
// GetBlockHash returns the block hash for the given height.
func (db *ReadOnlyDBColumnFamily) GetBlockHash(height uint32) ([]byte, error) {
handle, err := db.EnsureHandle(prefixes.BlockHash)
if err != nil {
@ -77,6 +82,7 @@ func (db *ReadOnlyDBColumnFamily) GetHeader(height uint32) ([]byte, error) {
return rawValue, nil
}
// GetStreamsAndChannelRepostedByChannelHashes returns a map of streams and channel hashes that are reposted by the given channel hashes.
func (db *ReadOnlyDBColumnFamily) GetStreamsAndChannelRepostedByChannelHashes(reposterChannelHashes [][]byte) (map[string][]byte, map[string][]byte, error) {
handle, err := db.EnsureHandle(prefixes.ChannelToClaim)
if err != nil {
@ -119,6 +125,7 @@ func (db *ReadOnlyDBColumnFamily) GetStreamsAndChannelRepostedByChannelHashes(re
return streams, channels, nil
}
// GetClaimsInChannelCount returns the number of claims in the given channel.
func (db *ReadOnlyDBColumnFamily) GetClaimsInChannelCount(channelHash []byte) (uint32, error) {
handle, err := db.EnsureHandle(prefixes.ChannelCount)
if err != nil {

View file

@ -1,6 +1,6 @@
package db
// iterator.go contains the implementation for iterators on rocksdb used by the hub
// iteroptions.go contains the implementation for iterators on rocksdb used by the hub
import (
"bytes"

View file

@ -1,4 +1,4 @@
package db_stack
package stack
// The db_stack package contains the implementation of a generic slice backed stack
// used for tracking various states in the hub, i.e. headers and txcounts
@ -9,21 +9,21 @@ import (
"github.com/lbryio/hub/internal"
)
type SliceBackedStack struct {
type SliceBacked struct {
slice []interface{}
len uint32
mut sync.RWMutex
}
func NewSliceBackedStack(size int) *SliceBackedStack {
return &SliceBackedStack{
func NewSliceBacked(size int) *SliceBacked {
return &SliceBacked{
slice: make([]interface{}, size),
len: 0,
mut: sync.RWMutex{},
}
}
func (s *SliceBackedStack) Push(v interface{}) {
func (s *SliceBacked) Push(v interface{}) {
s.mut.Lock()
defer s.mut.Unlock()
@ -35,7 +35,7 @@ func (s *SliceBackedStack) Push(v interface{}) {
s.len++
}
func (s *SliceBackedStack) Pop() interface{} {
func (s *SliceBacked) Pop() interface{} {
s.mut.Lock()
defer s.mut.Unlock()
@ -46,7 +46,7 @@ func (s *SliceBackedStack) Pop() interface{} {
return s.slice[s.len]
}
func (s *SliceBackedStack) Get(i uint32) interface{} {
func (s *SliceBacked) Get(i uint32) interface{} {
s.mut.RLock()
defer s.mut.RUnlock()
@ -56,7 +56,7 @@ func (s *SliceBackedStack) Get(i uint32) interface{} {
return s.slice[i]
}
func (s *SliceBackedStack) GetTip() interface{} {
func (s *SliceBacked) GetTip() interface{} {
s.mut.RLock()
defer s.mut.RUnlock()
@ -66,27 +66,27 @@ func (s *SliceBackedStack) GetTip() interface{} {
return s.slice[s.len-1]
}
func (s *SliceBackedStack) Len() uint32 {
func (s *SliceBacked) Len() uint32 {
s.mut.RLock()
defer s.mut.RUnlock()
return s.len
}
func (s *SliceBackedStack) Cap() int {
func (s *SliceBacked) Cap() int {
s.mut.RLock()
defer s.mut.RUnlock()
return cap(s.slice)
}
func (s *SliceBackedStack) GetSlice() []interface{} {
func (s *SliceBacked) GetSlice() []interface{} {
// This is not thread safe so I won't bother with locking
return s.slice
}
// This function is dangerous because it assumes underlying types
func (s *SliceBackedStack) TxCountsBisectRight(txNum, rootTxNum uint32) (uint32, uint32) {
func (s *SliceBacked) TxCountsBisectRight(txNum, rootTxNum uint32) (uint32, uint32) {
s.mut.RLock()
defer s.mut.RUnlock()

View file

@ -1,16 +1,16 @@
package db_stack_test
package stack_test
import (
"testing"
"time"
"github.com/lbryio/hub/db/db_stack"
"github.com/lbryio/hub/db/stack"
)
func TestPush(t *testing.T) {
var want uint32 = 3
stack := db_stack.NewSliceBackedStack(10)
stack := stack.NewSliceBacked(10)
stack.Push(0)
stack.Push(1)
@ -22,7 +22,7 @@ func TestPush(t *testing.T) {
}
func TestPushPop(t *testing.T) {
stack := db_stack.NewSliceBackedStack(10)
stack := stack.NewSliceBacked(10)
for i := 0; i < 5; i++ {
stack.Push(i)
@ -46,20 +46,20 @@ func TestPushPop(t *testing.T) {
}
}
func doPushes(stack *db_stack.SliceBackedStack, numPushes int) {
func doPushes(stack *stack.SliceBacked, numPushes int) {
for i := 0; i < numPushes; i++ {
stack.Push(i)
}
}
func doPops(stack *db_stack.SliceBackedStack, numPops int) {
func doPops(stack *stack.SliceBacked, numPops int) {
for i := 0; i < numPops; i++ {
stack.Pop()
}
}
func TestMultiThreaded(t *testing.T) {
stack := db_stack.NewSliceBackedStack(100000)
stack := stack.NewSliceBacked(100000)
go doPushes(stack, 100000)
go doPushes(stack, 100000)
@ -83,7 +83,7 @@ func TestMultiThreaded(t *testing.T) {
}
func TestGet(t *testing.T) {
stack := db_stack.NewSliceBackedStack(10)
stack := stack.NewSliceBacked(10)
for i := 0; i < 5; i++ {
stack.Push(i)
@ -107,7 +107,7 @@ func TestGet(t *testing.T) {
}
func TestLenCap(t *testing.T) {
stack := db_stack.NewSliceBackedStack(10)
stack := stack.NewSliceBacked(10)
if got := stack.Len(); got != 0 {
t.Errorf("got %v, want %v", got, 0)

View file

@ -1,5 +1,8 @@
package internal
// internal types that need their own file to avoid circular imports.
// HeightHash struct for the height subscription endpoint.
type HeightHash struct {
Height uint64
BlockHash []byte

View file

@ -13,8 +13,6 @@ const (
ServeCmd = iota
SearchCmd = iota
DBCmd = iota
DBCmd2 = iota
DBCmd3 = iota
)
// Args struct contains the arguments to the hub server.
@ -106,8 +104,6 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
serveCmd := parser.NewCommand("serve", "start the hub server")
searchCmd := parser.NewCommand("search", "claim search")
dbCmd := parser.NewCommand("db", "db testing")
dbCmd2 := parser.NewCommand("db2", "db testing")
dbCmd3 := parser.NewCommand("db3", "db testing")
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost})
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort})
@ -210,10 +206,6 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
args.CmdType = SearchCmd
} else if dbCmd.Happened() {
args.CmdType = DBCmd
} else if dbCmd2.Happened() {
args.CmdType = DBCmd2
} else if dbCmd3.Happened() {
args.CmdType = DBCmd3
}
if *text != "" {

View file

@ -146,6 +146,50 @@ func (s *Server) Run() {
}
}
func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) {
tmpName, err := ioutil.TempDir("", "go-lbry-hub")
if err != nil {
logrus.Info(err)
log.Fatal(err)
}
logrus.Info("tmpName", tmpName)
if err != nil {
logrus.Info(err)
}
myDB, _, err := db.GetProdDB(args.DBPath, tmpName)
// dbShutdown = func() {
// db.Shutdown(myDB)
// }
if err != nil {
// Can't load the db, fail loudly
logrus.Info(err)
log.Fatalln(err)
}
blockingChannelHashes := make([][]byte, 0, 10)
filteringChannelHashes := make([][]byte, 0, 10)
for _, id := range args.BlockingChannelIds {
hash, err := hex.DecodeString(id)
if err != nil {
logrus.Warn("Invalid channel id: ", id)
}
blockingChannelHashes = append(blockingChannelHashes, hash)
}
for _, id := range args.FilteringChannelIds {
hash, err := hex.DecodeString(id)
if err != nil {
logrus.Warn("Invalid channel id: ", id)
}
filteringChannelHashes = append(filteringChannelHashes, hash)
}
myDB.BlockingChannelHashes = blockingChannelHashes
myDB.FilteringChannelHashes = filteringChannelHashes
return myDB, nil
}
// MakeHubServer takes the arguments given to a hub when it's started and
// initializes everything. It loads information about previously known peers,
// creates needed internal data structures, and initializes goroutines.
@ -200,46 +244,10 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
var myDB *db.ReadOnlyDBColumnFamily
// var dbShutdown = func() {}
if !args.DisableResolve {
tmpName, err := ioutil.TempDir("", "go-lbry-hub")
myDB, err = LoadDatabase(args)
if err != nil {
logrus.Info(err)
log.Fatal(err)
logrus.Warning(err)
}
logrus.Info("tmpName", tmpName)
if err != nil {
logrus.Info(err)
}
myDB, _, err = db.GetProdDB(args.DBPath, tmpName)
// dbShutdown = func() {
// db.Shutdown(myDB)
// }
if err != nil {
// Can't load the db, fail loudly
logrus.Info(err)
log.Fatalln(err)
}
blockingChannelHashes := make([][]byte, 0, 10)
filteringChannelHashes := make([][]byte, 0, 10)
for _, id := range args.BlockingChannelIds {
hash, err := hex.DecodeString(id)
if err != nil {
logrus.Warn("Invalid channel id: ", id)
}
blockingChannelHashes = append(blockingChannelHashes, hash)
}
for _, id := range args.FilteringChannelIds {
hash, err := hex.DecodeString(id)
if err != nil {
logrus.Warn("Invalid channel id: ", id)
}
filteringChannelHashes = append(filteringChannelHashes, hash)
}
myDB.BlockingChannelHashes = blockingChannelHashes
myDB.FilteringChannelHashes = filteringChannelHashes
}
s := &Server{