some cleanup and adding arguments and db load / refresh to server command

This commit is contained in:
Jeffrey Picard 2022-03-02 10:39:06 -05:00
parent cff780bc74
commit 44fb309a7b
5 changed files with 56 additions and 68 deletions

View file

@ -249,13 +249,6 @@ func (ps *PathSegment) String() string {
return ps.name
}
// // BisectRight returns the index of the first element in the list that is greater than or equal to the value.
// // https://stackoverflow.com/questions/29959506/is-there-a-go-analog-of-pythons-bisect-module
// func BisectRight(arr []uint32, val uint32) uint32 {
// i := sort.Search(len(arr), func(i int) bool { return arr[i] >= val })
// return uint32(i)
// }
// BisectRight returns the index of the first element in the list that is greater than or equal to the value.
// https://stackoverflow.com/questions/29959506/is-there-a-go-analog-of-pythons-bisect-module
func BisectRight(arr []interface{}, val uint32) uint32 {
@ -307,7 +300,7 @@ func (opts *IterOptions) ReadRow(ch chan *prefixes.PrefixRowKV, prevKey *[]byte)
// We need to check the current key if we're not including the stop
// key.
if !opts.IncludeStop && opts.StopIteration(keyData) {
log.Println("returning false")
log.Println("ReadRow returning false")
return false
}
@ -359,12 +352,10 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
ro := grocksdb.NewDefaultReadOptions()
ro.SetFillCache(opts.FillCache)
it := db.NewIteratorCF(ro, opts.CfHandle)
// it := db.NewIterator(ro)
opts.It = it
it.Seek(opts.Prefix)
if opts.Start != nil {
log.Println("Seeking to start")
it.Seek(opts.Start)
}
@ -374,16 +365,13 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
var prevKey []byte = nil
if !opts.IncludeStart {
log.Println("Not including start")
it.Next()
}
if !it.Valid() && opts.IncludeStop {
log.Println("Not valid, but including stop")
opts.ReadRow(ch, &prevKey)
}
var continueIter bool = true
for ; continueIter && !opts.StopIteration(prevKey) && it.Valid(); it.Next() {
//log.Println("Main loop")
continueIter = opts.ReadRow(ch, &prevKey)
}
}()
@ -457,7 +445,25 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er
return db, handles, nil
}
func GetDBColumnFamlies(name string, cfNames []string) (*ReadOnlyDBColumnFamily, error) {
// GetProdDB returns a db that is used for production.
func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, error) {
prefixNames := prefixes.GetPrefixes()
// additional prefixes that aren't in the code explicitly
cfNames := []string{"default", "e", "d", "c"}
for _, prefix := range prefixNames {
cfName := string(prefix)
cfNames = append(cfNames, cfName)
}
db, err := GetDBColumnFamlies(name, secondaryPath, cfNames)
if err != nil {
return nil, err
}
return db, nil
}
func GetDBColumnFamlies(name string, secondayPath string, cfNames []string) (*ReadOnlyDBColumnFamily, error) {
opts := grocksdb.NewDefaultOptions()
roOpts := grocksdb.NewDefaultReadOptions()
cfOpt := grocksdb.NewDefaultOptions()
@ -468,7 +474,7 @@ func GetDBColumnFamlies(name string, cfNames []string) (*ReadOnlyDBColumnFamily,
cfOpts[i] = cfOpt
}
db, handles, err := grocksdb.OpenDbAsSecondaryColumnFamilies(opts, name, "asdf", cfNames, cfOpts)
db, handles, err := grocksdb.OpenDbAsSecondaryColumnFamilies(opts, name, secondayPath, cfNames, cfOpts)
// db, handles, err := grocksdb.OpenDbColumnFamilies(opts, name, cfNames, cfOpts)
if err != nil {
@ -706,45 +712,6 @@ func ReadDBState(db *ReadOnlyDBColumnFamily) error {
}
return nil
/*
state := db.DBState
if state == nil {
db.DBHeight = -1
db.DBTxCount = 0
db.DBTip = make([]byte, 32)
db.DBVersion = max(db.DBVersions)
db.UTXOFlushCount = 0
db.WallTime = 0
db.FirstSync = true
db.HistFlushCount = 0
db.HistCompFlushCount = -1
db.HistCompCursor = -1
db.HistDBVersion = max(db.DBVersions)
db.ESSyncHeight = 0
} else {
db.DBVersion = state.DBVersion
if db.DBVersion != db.DBVersions[0] {
panic(f"DB version {db.DBVersion} not supported")
}
// backwards compat
genesisHash := state.Genesis
if !bytes.Equal(genesisHash, db.Coin.GENESIS_HASH) {
panic(f"DB genesis hash {genesisHash} does not match coin {db.Coin.GENESIS_HASH}")
}
db.DBHeight = state.Height
db.DBTxCount = state.TxCount
db.DBTip = state.Tip
db.UTXOFlushCount = state.UTXOFlushCount
db.WallTime = state.WallTime
db.FirstSync = state.FirstSync
db.HistFlushCount = state.HistFlushCount
db.HistCompFlushCount = state.HistCompFlushCount
db.HistCompCursor = state.HistCompCursor
db.HistDBVersion = state.HistDBVersion
db.ESSyncHeight = state.ESSyncHeight
}
*/
}
func InitHeaders(db *ReadOnlyDBColumnFamily) error {

View file

@ -247,13 +247,8 @@ func TestCatFullDB(t *testing.T) {
// url := "lbry://@lbry"
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
prefixNames := prefixes.GetPrefixes()
cfNames := []string{"default", "e", "d", "c"}
for _, prefix := range prefixNames {
cfName := string(prefix)
cfNames = append(cfNames, cfName)
}
db, err := dbpkg.GetDBColumnFamlies(dbPath, cfNames)
secondaryPath := "asdf"
db, err := dbpkg.GetProdDB(dbPath, secondaryPath)
toDefer := func() {
db.DB.Close()
err = os.RemoveAll("./asdf")
@ -288,13 +283,8 @@ func TestOpenFullDB(t *testing.T) {
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
// url := "lbry://@lbry$1"
dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
prefixNames := prefixes.GetPrefixes()
cfNames := []string{"default", "e", "d", "c"}
for _, prefix := range prefixNames {
cfName := string(prefix)
cfNames = append(cfNames, cfName)
}
db, err := dbpkg.GetDBColumnFamlies(dbPath, cfNames)
secondaryPath := "asdf"
db, err := dbpkg.GetProdDB(dbPath, secondaryPath)
toDefer := func() {
db.DB.Close()
err = os.RemoveAll("./asdf")

View file

@ -22,6 +22,7 @@ type Args struct {
CmdType int
Host string
Port string
DBPath string
EsHost string
EsPort string
PrometheusPort string
@ -37,11 +38,14 @@ type Args struct {
DisableStartUDP bool
DisableWritePeers bool
DisableFederation bool
DisableRocksDBRefresh bool
DisableResolve bool
}
const (
DefaultHost = "0.0.0.0"
DefaultPort = "50051"
DefaultDBPath = "/mnt/d/data/snapshot_1072108/lbry-rocksdb/" // FIXME
DefaultEsHost = "http://localhost"
DefaultEsIndex = "claims"
DefaultEsPort = "9200"
@ -55,6 +59,8 @@ const (
DefaultDisableStartUDP = false
DefaultDisableWritePeers = false
DefaultDisableFederation = false
DefaultDisableRockDBRefresh = true
DefaultDisableResolve = true
)
// GetEnvironment takes the environment variables as an array of strings
@ -92,6 +98,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: DefaultHost})
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: DefaultPort})
dbPath := parser.String("", "db-path", &argparse.Options{Required: false, Help: "RocksDB path", Default: DefaultDBPath})
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: DefaultEsHost})
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: DefaultEsPort})
prometheusPort := parser.String("", "prometheus-port", &argparse.Options{Required: false, Help: "prometheus port", Default: DefaultPrometheusPort})
@ -108,6 +115,8 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
disableStartUdp := parser.Flag("", "disable-start-udp", &argparse.Options{Required: false, Help: "Disable start UDP ping server", Default: DefaultDisableStartUDP})
disableWritePeers := parser.Flag("", "disable-write-peers", &argparse.Options{Required: false, Help: "Disable write peer to disk as we learn about them", Default: DefaultDisableWritePeers})
disableFederation := parser.Flag("", "disable-federation", &argparse.Options{Required: false, Help: "Disable server federation", Default: DefaultDisableFederation})
disableRocksDBRefresh := parser.Flag("", "disable-rocksdb-refresh", &argparse.Options{Required: false, Help: "Disable rocksdb refreshing", Default: DefaultDisableRockDBRefresh})
disableResolve := parser.Flag("", "disable-resolve", &argparse.Options{Required: false, Help: "Disable resolve endpoint (and rocksdb loading)", Default: DefaultDisableRockDBRefresh})
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
@ -129,6 +138,7 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
CmdType: SearchCmd,
Host: *host,
Port: *port,
DBPath: *dbPath,
EsHost: *esHost,
EsPort: *esPort,
PrometheusPort: *prometheusPort,
@ -144,6 +154,8 @@ func ParseArgs(searchRequest *pb.SearchRequest) *Args {
DisableStartUDP: *disableStartUdp,
DisableWritePeers: *disableWritePeers,
DisableFederation: *disableFederation,
DisableRocksDBRefresh: *disableRocksDBRefresh,
DisableResolve: *disableResolve,
}
if esHost, ok := environment["ELASTIC_HOST"]; ok {

View file

@ -49,6 +49,7 @@ func makeDefaultArgs() *server.Args {
CmdType: server.ServeCmd,
Host: server.DefaultHost,
Port: server.DefaultPort,
DBPath: server.DefaultDBPath,
EsHost: server.DefaultEsHost,
EsPort: server.DefaultEsPort,
PrometheusPort: server.DefaultPrometheusPort,
@ -63,6 +64,8 @@ func makeDefaultArgs() *server.Args {
DisableStartPrometheus: true,
DisableStartUDP: true,
DisableWritePeers: true,
DisableRocksDBRefresh: true,
DisableResolve: true,
}
return args

View file

@ -14,6 +14,7 @@ import (
"time"
"github.com/ReneKroon/ttlcache/v2"
"github.com/lbryio/hub/db"
"github.com/lbryio/hub/internal/metrics"
"github.com/lbryio/hub/meta"
pb "github.com/lbryio/hub/protobuf/go"
@ -29,6 +30,7 @@ type Server struct {
Args *Args
MultiSpaceRe *regexp.Regexp
WeirdCharsRe *regexp.Regexp
DB *db.ReadOnlyDBColumnFamily
EsClient *elastic.Client
QueryCache *ttlcache.Cache
S256 *hash.Hash
@ -184,11 +186,22 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
numSubs := new(int64)
*numSubs = 0
//TODO: is this the right place to load the db?
var myDB *db.ReadOnlyDBColumnFamily
if !args.DisableResolve {
myDB, err = db.GetProdDB(args.DBPath, "readonlytmp")
if err != nil {
// Can't load the db, fail loudly
log.Fatalln(err)
}
}
s := &Server{
GrpcServer: grpcServer,
Args: args,
MultiSpaceRe: multiSpaceRe,
WeirdCharsRe: weirdCharsRe,
DB: myDB,
EsClient: client,
QueryCache: cache,
S256: &s256,
@ -205,6 +218,9 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
}
// Start up our background services
if !args.DisableResolve && !args.DisableRocksDBRefresh {
db.RunDetectChanges(myDB)
}
if !args.DisableStartPrometheus {
go s.prometheusEndpoint(s.Args.PrometheusPort, "metrics")
}