signal handling and cleanup functions
This commit is contained in:
parent
2a998ce850
commit
5402853976
4 changed files with 34 additions and 38 deletions
19
db/db.go
19
db/db.go
|
@ -448,7 +448,7 @@ func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, er
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetProdDB returns a db that is used for production.
|
// GetProdDB returns a db that is used for production.
|
||||||
func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, error) {
|
func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func(), error) {
|
||||||
prefixNames := prefixes.GetPrefixes()
|
prefixNames := prefixes.GetPrefixes()
|
||||||
// additional prefixes that aren't in the code explicitly
|
// additional prefixes that aren't in the code explicitly
|
||||||
cfNames := []string{"default", "e", "d", "c"}
|
cfNames := []string{"default", "e", "d", "c"}
|
||||||
|
@ -458,11 +458,20 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
db, err := GetDBColumnFamlies(name, secondaryPath, cfNames)
|
db, err := GetDBColumnFamlies(name, secondaryPath, cfNames)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
cleanup := func() {
|
||||||
|
db.DB.Close()
|
||||||
|
err = os.RemoveAll(fmt.Sprintf("./%s", secondaryPath))
|
||||||
|
if err != nil {
|
||||||
|
log.Println(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return db, nil
|
if err != nil {
|
||||||
|
return nil, cleanup, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return db, cleanup, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetDBColumnFamlies(name string, secondayPath string, cfNames []string) (*ReadOnlyDBColumnFamily, error) {
|
func GetDBColumnFamlies(name string, secondayPath string, cfNames []string) (*ReadOnlyDBColumnFamily, error) {
|
||||||
|
@ -632,7 +641,7 @@ func DetectChanges(db *ReadOnlyDBColumnFamily) error {
|
||||||
|
|
||||||
if db.LastState == nil || lastHeight < state.Height {
|
if db.LastState == nil || lastHeight < state.Height {
|
||||||
for height := lastHeight + 1; height <= state.Height; height++ {
|
for height := lastHeight + 1; height <= state.Height; height++ {
|
||||||
log.Info("advancing to", height)
|
log.Info("advancing to: ", height)
|
||||||
Advance(db, height)
|
Advance(db, height)
|
||||||
}
|
}
|
||||||
//TODO: ClearCache
|
//TODO: ClearCache
|
||||||
|
|
|
@ -249,14 +249,8 @@ func TestCatFullDB(t *testing.T) {
|
||||||
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
|
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
|
||||||
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
|
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
|
||||||
secondaryPath := "asdf"
|
secondaryPath := "asdf"
|
||||||
db, err := dbpkg.GetProdDB(dbPath, secondaryPath)
|
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath)
|
||||||
toDefer := func() {
|
|
||||||
db.DB.Close()
|
|
||||||
err = os.RemoveAll("./asdf")
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
defer toDefer()
|
defer toDefer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
@ -283,18 +277,11 @@ func TestOpenFullDB(t *testing.T) {
|
||||||
// url := "lbry://@lbry"
|
// url := "lbry://@lbry"
|
||||||
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
|
// url := "lbry://@lbry#3fda836a92faaceedfe398225fb9b2ee2ed1f01a"
|
||||||
// url := "lbry://@lbry$1"
|
// url := "lbry://@lbry$1"
|
||||||
url := "lbry://@lothrop:2/lothrop-livestream-games-and-code:c"
|
url := "https://lbry.tv/@lothrop:2/lothrop-livestream-games-and-code:c"
|
||||||
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
|
dbPath := "/mnt/sda/wallet_server/_data/lbry-rocksdb/"
|
||||||
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
|
// dbPath := "/mnt/d/data/snapshot_1072108/lbry-rocksdb/"
|
||||||
secondaryPath := "asdf"
|
secondaryPath := "asdf"
|
||||||
db, err := dbpkg.GetProdDB(dbPath, secondaryPath)
|
db, toDefer, err := dbpkg.GetProdDB(dbPath, secondaryPath)
|
||||||
toDefer := func() {
|
|
||||||
db.DB.Close()
|
|
||||||
err = os.RemoveAll("./asdf")
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
defer toDefer()
|
defer toDefer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
|
27
main.go
27
main.go
|
@ -33,27 +33,24 @@ func main() {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// TODO: Figure out if / where we need signal handling
|
// TODO: Figure out if / where we need signal handling
|
||||||
// interrupt := make(chan os.Signal, 1)
|
|
||||||
// signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
|
initsignals()
|
||||||
// defer signal.Stop(interrupt)
|
interrupt := interruptListener()
|
||||||
|
|
||||||
s := server.MakeHubServer(ctxWCancel, args)
|
s := server.MakeHubServer(ctxWCancel, args)
|
||||||
s.Run()
|
go s.Run()
|
||||||
|
|
||||||
// select {
|
defer func() {
|
||||||
// case <-interrupt:
|
log.Println("Shutting down server...")
|
||||||
// break
|
|
||||||
// case <-ctx.Done():
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
|
|
||||||
// log.Println("Shutting down server...")
|
s.EsClient.Stop()
|
||||||
|
s.GrpcServer.GracefulStop()
|
||||||
|
s.DBCleanup()
|
||||||
|
|
||||||
// s.EsClient.Stop()
|
log.Println("Returning from main...")
|
||||||
// s.GrpcServer.GracefulStop()
|
}()
|
||||||
|
|
||||||
// log.Println("Returning from main...")
|
|
||||||
|
|
||||||
|
<-interrupt
|
||||||
return
|
return
|
||||||
} else if args.CmdType == server.DBCmd {
|
} else if args.CmdType == server.DBCmd {
|
||||||
options := &db.IterOptions{
|
options := &db.IterOptions{
|
||||||
|
|
|
@ -44,6 +44,7 @@ type Server struct {
|
||||||
PeerSubsMut sync.RWMutex
|
PeerSubsMut sync.RWMutex
|
||||||
NumPeerSubs *int64
|
NumPeerSubs *int64
|
||||||
ExternalIP net.IP
|
ExternalIP net.IP
|
||||||
|
DBCleanup func()
|
||||||
pb.UnimplementedHubServer
|
pb.UnimplementedHubServer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,8 +189,9 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
|
||||||
|
|
||||||
//TODO: is this the right place to load the db?
|
//TODO: is this the right place to load the db?
|
||||||
var myDB *db.ReadOnlyDBColumnFamily
|
var myDB *db.ReadOnlyDBColumnFamily
|
||||||
|
var dbCleanup = func() {}
|
||||||
if !args.DisableResolve {
|
if !args.DisableResolve {
|
||||||
myDB, err = db.GetProdDB(args.DBPath, "readonlytmp")
|
myDB, dbCleanup, err = db.GetProdDB(args.DBPath, "readonlytmp")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Can't load the db, fail loudly
|
// Can't load the db, fail loudly
|
||||||
log.Fatalln(err)
|
log.Fatalln(err)
|
||||||
|
@ -215,6 +217,7 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
|
||||||
PeerSubsMut: sync.RWMutex{},
|
PeerSubsMut: sync.RWMutex{},
|
||||||
NumPeerSubs: numSubs,
|
NumPeerSubs: numSubs,
|
||||||
ExternalIP: net.IPv4(127, 0, 0, 1),
|
ExternalIP: net.IPv4(127, 0, 0, 1),
|
||||||
|
DBCleanup: dbCleanup,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start up our background services
|
// Start up our background services
|
||||||
|
|
Loading…
Reference in a new issue