Server endpoints goroutine refactor #69

Merged
jeffreypicard merged 18 commits from server-endpoints-goroutine-refactor into master 2022-10-25 07:48:13 +02:00
6 changed files with 64 additions and 116 deletions
Showing only changes of commit 418d4171b4 - Show all commits

View file

@ -531,8 +531,7 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er
}
// GetProdDB returns a db that is used for production.
func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBColumnFamily, func(), error) {
grp.Add(1)
func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBColumnFamily, error) {
prefixNames := prefixes.GetPrefixes()
// additional prefixes that aren't in the code explicitly
cfNames := []string{"default", "e", "d", "c"}
@ -551,7 +550,8 @@ func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBC
}
if err != nil {
return nil, cleanupFiles, err
cleanupFiles()
return nil, err
}
cleanupDB := func() {
@ -560,7 +560,7 @@ func GetProdDB(name string, secondaryPath string, grp *stop.Group) (*ReadOnlyDBC
}
db.Cleanup = cleanupDB
return db, cleanupDB, nil
return db, nil
}
// GetDBColumnFamilies gets a db with the specified column families and secondary path.
@ -679,6 +679,7 @@ func (db *ReadOnlyDBColumnFamily) Shutdown() {
// to keep the db readonly view up to date and handle reorgs on the
// blockchain.
func (db *ReadOnlyDBColumnFamily) RunDetectChanges(notifCh chan<- interface{}) {
db.Grp.Add(1)
go func() {
lastPrint := time.Now()
for {

View file

@ -21,7 +21,7 @@ import (
////////////////////////////////////////////////////////////////////////////////
// OpenAndFillTmpDBColumnFamlies opens a db and fills it with data from a csv file using the given column family names
func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, func(), error) {
func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFamily, [][]string, error) {
log.Println(filePath)
file, err := os.Open(filePath)
@ -31,7 +31,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
reader := csv.NewReader(file)
records, err := reader.ReadAll()
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
wOpts := grocksdb.NewDefaultWriteOptions()
@ -39,7 +39,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
opts.SetCreateIfMissing(true)
db, err := grocksdb.OpenDb(opts, "tmp")
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
var handleMap map[string]*grocksdb.ColumnFamilyHandle = make(map[string]*grocksdb.ColumnFamilyHandle)
@ -54,7 +54,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
log.Println(cfName)
handle, err := db.CreateColumnFamily(opts, cfName)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
handleMap[cfName] = handle
}
@ -68,16 +68,16 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
for _, record := range records[1:] {
cf := record[0]
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
handle := handleMap[string(cf)]
key, err := hex.DecodeString(record[1])
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
val, err := hex.DecodeString(record[2])
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
db.PutCF(wOpts, handle, key, val)
}
@ -95,6 +95,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
Height: 0,
Headers: nil,
Grp: stop.New(),
Cleanup: toDefer,
}
// err = dbpkg.ReadDBState(myDB) //TODO: Figure out right place for this
@ -104,7 +105,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
err = myDB.InitTxCounts()
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
// err = dbpkg.InitHeaders(myDB)
@ -112,7 +113,7 @@ func OpenAndFillTmpDBColumnFamlies(filePath string) (*dbpkg.ReadOnlyDBColumnFami
// return nil, nil, nil, err
// }
return myDB, records, toDefer, nil
return myDB, records, nil
}
// OpenAndFillTmpDBCF opens a db and fills it with data from a csv file
@ -252,11 +253,9 @@ func TestCatFullDB(t *testing.T) {
dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
secondaryPath := "asdf"
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
defer db.Shutdown()
// defer toDefer()
db.Cleanup = toDefer
if err != nil {
t.Error(err)
return
@ -287,10 +286,8 @@ func TestOpenFullDB(t *testing.T) {
dbPath := "/mnt/sda1/wallet_server/_data/lbry-rocksdb/"
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
secondaryPath := "asdf"
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
db, err := dbpkg.GetProdDB(dbPath, secondaryPath, grp)
defer db.Shutdown()
// defer toDefer()
db.Cleanup = toDefer
if err != nil {
t.Error(err)
return
@ -304,14 +301,13 @@ func TestOpenFullDB(t *testing.T) {
func TestResolve(t *testing.T) {
url := "lbry://@Styxhexenhammer666#2/legacy-media-baron-les-moonves-(cbs#9"
filePath := "../testdata/FULL_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
moodyjon commented 2022-10-19 16:18:32 +02:00 (Migrated from github.com)
Review

Have OpenAndFillTmpDBColumnFamilies() assign db.Cleanup = toDefer inside to simplify this.

...Ditto all the other places calling OpenAndFillTmpDBColumnFamilies().

Have `OpenAndFillTmpDBColumnFamilies()` assign `db.Cleanup = toDefer` inside to simplify this. ...Ditto all the other places calling `OpenAndFillTmpDBColumnFamilies()`.
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
// defer toDefer()
db.Cleanup = toDefer
expandedResolveResult := db.Resolve(url)
log.Printf("%#v\n", expandedResolveResult)
if expandedResolveResult != nil && expandedResolveResult.Channel != nil {
@ -325,13 +321,11 @@ func TestResolve(t *testing.T) {
func TestGetDBState(t *testing.T) {
filePath := "../testdata/s_resolve.csv"
want := uint32(1072108)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
state, err := db.GetDBState()
if err != nil {
t.Error(err)
@ -349,13 +343,11 @@ func TestGetRepostedClaim(t *testing.T) {
// Should be non-existent
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
filePath := "../testdata/W_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
count, err := db.GetRepostedCount(channelHash)
if err != nil {
@ -384,13 +376,11 @@ func TestGetRepostedCount(t *testing.T) {
// Should be non-existent
channelHash2, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bf")
filePath := "../testdata/j_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
count, err := db.GetRepostedCount(channelHash)
if err != nil {
@ -423,13 +413,11 @@ func TestGetRepost(t *testing.T) {
channelHash2, _ := hex.DecodeString("000009ca6e0caaaef16872b4bd4f6f1b8c2363e2")
filePath := "../testdata/V_resolve.csv"
// want := uint32(3670)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
res, err := db.GetRepost(channelHash)
if err != nil {
@ -459,13 +447,12 @@ func TestGetClaimsInChannelCount(t *testing.T) {
channelHash, _ := hex.DecodeString("2556ed1cab9d17f2a9392030a9ad7f5d138f11bd")
filePath := "../testdata/Z_resolve.csv"
want := uint32(3670)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
count, err := db.GetClaimsInChannelCount(channelHash)
if err != nil {
t.Error(err)
@ -490,13 +477,12 @@ func TestGetShortClaimIdUrl(t *testing.T) {
var position uint16 = 0
filePath := "../testdata/F_resolve.csv"
log.Println(filePath)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
shortUrl, err := db.GetShortClaimIdUrl(name, normalName, claimHash, rootTxNum, position)
if err != nil {
t.Error(err)
@ -510,13 +496,11 @@ func TestClaimShortIdIter(t *testing.T) {
filePath := "../testdata/F_cat.csv"
normalName := "cat"
claimId := "0"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
ch := db.ClaimShortIdIter(normalName, claimId)
@ -542,13 +526,12 @@ func TestGetTXOToClaim(t *testing.T) {
var txNum uint32 = 1456296
var position uint16 = 0
filePath := "../testdata/G_2.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
val, err := db.GetCachedClaimHash(txNum, position)
if err != nil {
t.Error(err)
@ -572,13 +555,11 @@ func TestGetClaimToChannel(t *testing.T) {
var val []byte = nil
filePath := "../testdata/I_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
val, err = db.GetChannelForClaim(claimHash, txNum, position)
if err != nil {
@ -603,13 +584,11 @@ func TestGetEffectiveAmountSupportOnly(t *testing.T) {
want := uint64(20000006)
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
claimHash, _ := hex.DecodeString(claimHashStr)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
db.Height = 999999999
amount, err := db.GetEffectiveAmount(claimHash, true)
@ -635,13 +614,12 @@ func TestGetEffectiveAmount(t *testing.T) {
want := uint64(21000006)
claimHashStr := "00000324e40fcb63a0b517a3660645e9bd99244a"
claimHash, _ := hex.DecodeString(claimHashStr)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
db.Height = 999999999
amount, err := db.GetEffectiveAmount(claimHash, false)
@ -674,13 +652,12 @@ func TestGetSupportAmount(t *testing.T) {
t.Error(err)
}
filePath := "../testdata/a_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
res, err := db.GetSupportAmount(claimHash)
if err != nil {
t.Error(err)
@ -696,13 +673,12 @@ func TestGetTxHash(t *testing.T) {
want := "54e14ff0c404c29b3d39ae4d249435f167d5cd4ce5a428ecb745b3df1c8e3dde"
filePath := "../testdata/X_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
resHash, err := db.GetTxHash(txNum)
if err != nil {
t.Error(err)
@ -740,13 +716,12 @@ func TestGetActivation(t *testing.T) {
txNum := uint32(0x6284e3)
position := uint16(0x0)
want := uint32(0xa6b65)
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
}
// defer toDefer()
db.Cleanup = toDefer
activation, err := db.GetActivation(txNum, position)
if err != nil {
t.Error(err)
@ -773,14 +748,13 @@ func TestGetClaimToTXO(t *testing.T) {
return
}
filePath := "../testdata/E_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
// defer toDefer()
db.Cleanup = toDefer
res, err := db.GetCachedClaimTxo(claimHash, true)
if err != nil {
t.Error(err)
@ -804,14 +778,13 @@ func TestGetControllingClaim(t *testing.T) {
claimName := internal.NormalizeName("@Styxhexenhammer666")
claimHash := "2556ed1cab9d17f2a9392030a9ad7f5d138f11bd"
filePath := "../testdata/P_resolve.csv"
db, _, toDefer, err := OpenAndFillTmpDBColumnFamlies(filePath)
db, _, err := OpenAndFillTmpDBColumnFamlies(filePath)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
}
// defer toDefer()
db.Cleanup = toDefer
res, err := db.GetControllingClaim(claimName)
if err != nil {
t.Error(err)

View file

@ -59,8 +59,8 @@ var regTestAddrs = [30]string{
func TestServerGetHeight(t *testing.T) {
secondaryPath := "asdf"
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -90,8 +90,8 @@ func TestServerGetHeight(t *testing.T) {
func TestGetChunk(t *testing.T) {
secondaryPath := "asdf"
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -134,8 +134,8 @@ func TestGetChunk(t *testing.T) {
func TestGetHeader(t *testing.T) {
secondaryPath := "asdf"
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -164,8 +164,8 @@ func TestGetHeader(t *testing.T) {
func TestHeaders(t *testing.T) {
secondaryPath := "asdf"
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -197,11 +197,8 @@ func TestHeadersSubscribe(t *testing.T) {
args := MakeDefaultTestArgs()
grp := stop.New()
secondaryPath := "asdf"
db, _, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
// defer toDefer()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
defer db.Grp.Done()
// db.Cleanup = toDefer
if err != nil {
t.Error(err)
return
@ -292,8 +289,8 @@ func TestHeadersSubscribe(t *testing.T) {
func TestGetBalance(t *testing.T) {
secondaryPath := "asdf"
moodyjon commented 2022-10-19 16:25:36 +02:00 (Migrated from github.com)
Review

defer db.Shutdown() to match style elsewhere.

`defer db.Shutdown()` to match style elsewhere.
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -322,8 +319,8 @@ func TestGetBalance(t *testing.T) {
func TestGetHistory(t *testing.T) {
secondaryPath := "asdf"
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -352,8 +349,8 @@ func TestGetHistory(t *testing.T) {
func TestListUnspent(t *testing.T) {
secondaryPath := "asdf"
grp := stop.New()
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return
@ -383,8 +380,8 @@ func TestAddressSubscribe(t *testing.T) {
args := MakeDefaultTestArgs()
grp := stop.New()
secondaryPath := "asdf"
db, toDefer, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer toDefer()
db, err := db.GetProdDB(regTestDBPath, secondaryPath, grp)
defer db.Shutdown()
if err != nil {
t.Error(err)
return

View file

@ -128,26 +128,6 @@ fail1:
log.Errorf("RegisterTCPService: %v\n", err)
goto fail2
}
// serverFeatureSvc := &ServerFeatureService{s.Args}
// err = s1.RegisterTCPService(serverFeatureSvc, "server_features")
// if err != nil {
// log.Errorf("RegisterTCPService: %v\n", err)
// goto fail2
// }
// serverBannerSvc := &ServerBannerService{s.Args}
// err = s1.RegisterTCPService(serverBannerSvc, "server_banner")
// if err != nil {
// log.Errorf("RegisterTCPService: %v\n", err)
// goto fail2
// }
// serverVersionSvc := &ServerVersionService{s.Args}
// err = s1.RegisterTCPService(serverVersionSvc, "server_version")
// if err != nil {
// log.Errorf("RegisterTCPService: %v\n", err)
// goto fail2
// }
r := gorilla_mux.NewRouter()
r.Handle("/rpc", s1)

View file

@ -161,10 +161,7 @@ func LoadDatabase(args *Args, grp *stop.Group) (*db.ReadOnlyDBColumnFamily, erro
if err != nil {
logrus.Info(err)
}
myDB, _, err := db.GetProdDB(args.DBPath, tmpName, grp)
// dbShutdown = func() {
// db.Shutdown(myDB)
// }
myDB, err := db.GetProdDB(args.DBPath, tmpName, grp)
if err != nil {
// Can't load the db, fail loudly
logrus.Info(err)

View file

@ -206,14 +206,14 @@ func (sm *sessionManager) addSession(conn net.Conn) *session {
serverSvc := &ServerService{sm.args}
err := s1.RegisterName("server", serverSvc)
if err != nil {
log.Errorf("RegisterTCPService: %v\n", err)
log.Errorf("RegisterName: %v\n", err)
}
// Register "blockchain.claimtrie.*"" handlers.
claimtrieSvc := &ClaimtrieService{sm.db}
err = s1.RegisterName("blockchain.claimtrie", claimtrieSvc)
if err != nil {
log.Errorf("RegisterService: %v\n", err)
log.Errorf("RegisterName: %v\n", err)
}
// Register other "blockchain.{block,address,scripthash}.*" handlers.