diff --git a/db/db.go b/db/db.go index b659ff4..4b2823f 100644 --- a/db/db.go +++ b/db/db.go @@ -16,6 +16,7 @@ import ( "github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/internal/metrics" pb "github.com/lbryio/herald.go/protobuf/go" + "github.com/lbryio/lbry.go/v3/extras/stop" "github.com/linxGnu/grocksdb" log "github.com/sirupsen/logrus" @@ -61,6 +62,7 @@ type ReadOnlyDBColumnFamily struct { FilteredChannels map[string][]byte OpenIterators map[string][]chan struct{} ItMut sync.RWMutex + Grp *stop.Group ShutdownChan chan struct{} DoneChan chan struct{} ShutdownCalled bool @@ -339,17 +341,22 @@ func interruptRequested(interrupted <-chan struct{}) bool { func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { ch := make(chan *prefixes.PrefixRowKV) - iterKey := fmt.Sprintf("%p", opts) + // iterKey := fmt.Sprintf("%p", opts) if opts.DB != nil { - opts.DB.ItMut.Lock() - // There is a tiny chance that we were wating on the above lock while shutdown was - // being called and by the time we get it the db has already notified all active - // iterators to shutdown. In this case we go to the else branch. - if !opts.DB.ShutdownCalled { - opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan} - opts.DB.ItMut.Unlock() - } else { - opts.DB.ItMut.Unlock() + // opts.DB.ItMut.Lock() + // // There is a tiny chance that we were wating on the above lock while shutdown was + // // being called and by the time we get it the db has already notified all active + // // iterators to shutdown. In this case we go to the else branch. + // if !opts.DB.ShutdownCalled { + // opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan} + // opts.DB.ItMut.Unlock() + // } else { + // opts.DB.ItMut.Unlock() + // return ch + // } + if opts.DB.ShutdownCalled && opts.Grp != nil { + // opts.Grp.DoneNamed(iterKey) + opts.Grp.Done() return ch } } @@ -369,11 +376,13 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { it.Close() close(ch) ro.Destroy() - if opts.DB != nil { - opts.DoneChan <- struct{}{} - opts.DB.ItMut.Lock() - delete(opts.DB.OpenIterators, iterKey) - opts.DB.ItMut.Unlock() + if opts.DB != nil && opts.Grp != nil { + // opts.Grp.DoneNamed(iterKey) + opts.Grp.Done() + // opts.DoneChan <- struct{}{} + // opts.DB.ItMut.Lock() + // delete(opts.DB.OpenIterators, iterKey) + // opts.DB.ItMut.Unlock() } }() @@ -394,7 +403,7 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { if kv = opts.ReadRow(&prevKey); kv != nil { ch <- kv } - if interruptRequested(opts.ShutdownChan) { + if opts.Grp != nil && interruptRequested(opts.Grp.Ch()) { return } } @@ -687,24 +696,26 @@ func (db *ReadOnlyDBColumnFamily) Unwind() { // Shutdown shuts down the db. func (db *ReadOnlyDBColumnFamily) Shutdown() { - log.Println("Sending message to ShutdownChan...") - db.ShutdownChan <- struct{}{} - log.Println("Locking iterator mutex...") - db.ItMut.Lock() - log.Println("Setting ShutdownCalled to true...") db.ShutdownCalled = true - log.Println("Notifying iterators to shutdown...") - for _, it := range db.OpenIterators { - it[1] <- struct{}{} - } - log.Println("Waiting for iterators to shutdown...") - for _, it := range db.OpenIterators { - <-it[0] - } - log.Println("Unlocking iterator mutex...") - db.ItMut.Unlock() - log.Println("Sending message to DoneChan...") - <-db.DoneChan + db.Grp.StopAndWait() + // log.Println("Sending message to ShutdownChan...") + // db.ShutdownChan <- struct{}{} + // log.Println("Locking iterator mutex...") + // db.ItMut.Lock() + // log.Println("Setting ShutdownCalled to true...") + // db.ShutdownCalled = true + // log.Println("Notifying iterators to shutdown...") + // for _, it := range db.OpenIterators { + // it[1] <- struct{}{} + // } + // log.Println("Waiting for iterators to shutdown...") + // for _, it := range db.OpenIterators { + // <-it[0] + // } + // log.Println("Unlocking iterator mutex...") + // db.ItMut.Unlock() + // log.Println("Sending message to DoneChan...") + // <-db.DoneChan log.Println("Calling cleanup...") db.Cleanup() log.Println("Leaving Shutdown...") diff --git a/db/iteroptions.go b/db/iteroptions.go index 6d1ea7f..f960a69 100644 --- a/db/iteroptions.go +++ b/db/iteroptions.go @@ -6,6 +6,7 @@ import ( "bytes" "github.com/lbryio/herald.go/db/prefixes" + "github.com/lbryio/lbry.go/v3/extras/stop" "github.com/linxGnu/grocksdb" log "github.com/sirupsen/logrus" @@ -22,12 +23,13 @@ type IterOptions struct { IncludeValue bool RawKey bool RawValue bool - ShutdownChan chan struct{} - DoneChan chan struct{} - DB *ReadOnlyDBColumnFamily - CfHandle *grocksdb.ColumnFamilyHandle - It *grocksdb.Iterator - Serializer *prefixes.SerializationAPI + Grp *stop.Group + // ShutdownChan chan struct{} + // DoneChan chan struct{} + DB *ReadOnlyDBColumnFamily + CfHandle *grocksdb.ColumnFamilyHandle + It *grocksdb.Iterator + Serializer *prefixes.SerializationAPI } // NewIterateOptions creates a defualt options structure for a db iterator. @@ -43,12 +45,13 @@ func NewIterateOptions() *IterOptions { IncludeValue: false, RawKey: false, RawValue: false, - ShutdownChan: make(chan struct{}, 1), - DoneChan: make(chan struct{}, 1), - DB: nil, - CfHandle: nil, - It: nil, - Serializer: prefixes.ProductionAPI, + Grp: nil, + // ShutdownChan: make(chan struct{}, 1), + // DoneChan: make(chan struct{}, 1), + DB: nil, + CfHandle: nil, + It: nil, + Serializer: prefixes.ProductionAPI, } } @@ -109,6 +112,11 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions { func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions { o.DB = db + // o.Grp = stop.NewDebug(db.Grp) + // iterKey := fmt.Sprintf("%p", o) + // o.Grp.AddNamed(1, iterKey) + o.Grp = stop.New(db.Grp) + o.Grp.Add(1) return o } diff --git a/main.go b/main.go index 1e42b45..424e29a 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( "github.com/lbryio/herald.go/internal" pb "github.com/lbryio/herald.go/protobuf/go" "github.com/lbryio/herald.go/server" + "github.com/lbryio/lbry.go/v3/extras/stop" log "github.com/sirupsen/logrus" "google.golang.org/grpc" ) @@ -27,13 +28,16 @@ func main() { if args.CmdType == server.ServeCmd { // This will cancel goroutines with the server finishes. - ctxWCancel, cancel := context.WithCancel(ctx) - defer cancel() + // ctxWCancel, cancel := context.WithCancel(ctx) + // defer cancel() + stopGroup := stop.NewDebug() + // defer stopGroup.Stop() - initsignals() + initsignals(stopGroup.Ch()) interrupt := interruptListener() - s := server.MakeHubServer(ctxWCancel, args) + // s := server.MakeHubServer(ctxWCancel, args) + s := server.MakeHubServer(stopGroup, args) go s.Run() defer func() { diff --git a/server/federation_test.go b/server/federation_test.go index b1064fc..04e74b8 100644 --- a/server/federation_test.go +++ b/server/federation_test.go @@ -13,6 +13,7 @@ import ( "github.com/lbryio/herald.go/internal/metrics" pb "github.com/lbryio/herald.go/protobuf/go" server "github.com/lbryio/herald.go/server" + "github.com/lbryio/lbry.go/v3/extras/stop" dto "github.com/prometheus/client_model/go" "google.golang.org/grpc" ) @@ -46,7 +47,8 @@ func removeFile(fileName string) { // TestAddPeer tests the ability to add peers func TestAddPeer(t *testing.T) { - ctx := context.Background() + // ctx := context.Background() + ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() tests := []struct { @@ -104,7 +106,8 @@ func TestAddPeer(t *testing.T) { // TestPeerWriter tests that peers get written properly func TestPeerWriter(t *testing.T) { - ctx := context.Background() + // ctx := context.Background() + ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() args.DisableWritePeers = false @@ -160,7 +163,8 @@ func TestPeerWriter(t *testing.T) { // TestAddPeerEndpoint tests the ability to add peers func TestAddPeerEndpoint(t *testing.T) { - ctx := context.Background() + // ctx := context.Background() + ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs() args2.Port = "50052" @@ -231,7 +235,8 @@ func TestAddPeerEndpoint(t *testing.T) { // TestAddPeerEndpoint2 tests the ability to add peers func TestAddPeerEndpoint2(t *testing.T) { - ctx := context.Background() + // ctx := context.Background() + ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs() args3 := server.MakeDefaultTestArgs() @@ -312,7 +317,8 @@ func TestAddPeerEndpoint2(t *testing.T) { // TestAddPeerEndpoint3 tests the ability to add peers func TestAddPeerEndpoint3(t *testing.T) { - ctx := context.Background() + // ctx := context.Background() + ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() args2 := server.MakeDefaultTestArgs() args3 := server.MakeDefaultTestArgs() @@ -401,7 +407,8 @@ func TestAddPeerEndpoint3(t *testing.T) { // TestAddPeer tests the ability to add peers func TestUDPServer(t *testing.T) { - ctx := context.Background() + // ctx := context.Background() + ctx := stop.NewDebug() args := server.MakeDefaultTestArgs() args.DisableStartUDP = false args2 := server.MakeDefaultTestArgs() diff --git a/server/notifier_test.go b/server/notifier_test.go index 0e12879..d482b60 100644 --- a/server/notifier_test.go +++ b/server/notifier_test.go @@ -1,7 +1,6 @@ package server_test import ( - "context" "encoding/hex" "fmt" "net" @@ -10,6 +9,7 @@ import ( "github.com/lbryio/herald.go/internal" "github.com/lbryio/herald.go/server" + "github.com/lbryio/lbry.go/v3/extras/stop" "github.com/sirupsen/logrus" ) @@ -48,7 +48,8 @@ func tcpRead(conn net.Conn) ([]byte, error) { func TestNotifierServer(t *testing.T) { args := server.MakeDefaultTestArgs() - ctx := context.Background() + // ctx := context.Background() + ctx := stop.NewDebug() hub := server.MakeHubServer(ctx, args) go hub.NotifierServer() diff --git a/server/search_test.go b/server/search_test.go index fe3a846..cd5a50a 100644 --- a/server/search_test.go +++ b/server/search_test.go @@ -11,6 +11,7 @@ import ( pb "github.com/lbryio/herald.go/protobuf/go" server "github.com/lbryio/herald.go/server" + "github.com/lbryio/lbry.go/v3/extras/stop" "github.com/olivere/elastic/v7" ) @@ -55,13 +56,14 @@ func TestSearch(t *testing.T) { w.Write([]byte(resp)) } - context := context.Background() + ctx := context.Background() + stopGroup := stop.NewDebug() args := server.MakeDefaultTestArgs() - hubServer := server.MakeHubServer(context, args) + hubServer := server.MakeHubServer(stopGroup, args) req := &pb.SearchRequest{ Text: "asdf", } - out, err := hubServer.Search(context, req) + out, err := hubServer.Search(ctx, req) if err != nil { log.Println(err) } diff --git a/server/server.go b/server/server.go index 4bb1bb6..0addc04 100644 --- a/server/server.go +++ b/server/server.go @@ -22,6 +22,7 @@ import ( "github.com/lbryio/herald.go/meta" pb "github.com/lbryio/herald.go/protobuf/go" "github.com/lbryio/lbcd/chaincfg" + "github.com/lbryio/lbry.go/v3/extras/stop" "github.com/olivere/elastic/v7" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -53,6 +54,7 @@ type Server struct { HeightSubs map[net.Addr]net.Conn HeightSubsMut sync.RWMutex NotifierChan chan interface{} + Grp *stop.Group sessionManager *sessionManager pb.UnimplementedHubServer } @@ -217,7 +219,7 @@ func LoadDatabase(args *Args) (*db.ReadOnlyDBColumnFamily, error) { // MakeHubServer takes the arguments given to a hub when it's started and // initializes everything. It loads information about previously known peers, // creates needed internal data structures, and initializes goroutines. -func MakeHubServer(ctx context.Context, args *Args) *Server { +func MakeHubServer(grp *stop.Group, args *Args) *Server { grpcServer := grpc.NewServer(grpc.NumStreamWorkers(0)) multiSpaceRe, err := regexp.Compile(`\s{2,}`) @@ -272,6 +274,7 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { if err != nil { logrus.Warning(err) } + myDB.Grp = stop.NewDebug(grp) } dbChain := (*chaincfg.Params)(nil) @@ -333,6 +336,7 @@ func MakeHubServer(ctx context.Context, args *Args) *Server { HeightSubs: make(map[net.Addr]net.Conn), HeightSubsMut: sync.RWMutex{}, NotifierChan: make(chan interface{}), + Grp: grp, sessionManager: newSessionManager(myDB, &chain, args.MaxSessions, args.SessionTimeout), } diff --git a/signal.go b/signal.go index 7eab8a5..a2561a6 100644 --- a/signal.go +++ b/signal.go @@ -8,12 +8,13 @@ import ( "os" "os/signal" + "github.com/lbryio/lbry.go/v3/extras/stop" log "github.com/sirupsen/logrus" ) // shutdownRequestChannel is used to initiate shutdown from one of the // subsystems using the same code paths as when an interrupt signal is received. -var shutdownRequestChannel = make(chan struct{}) +var shutdownRequestChannel = make(stop.Chan) // interruptSignals defines the default signals to catch in order to do a proper // shutdown. This may be modified during init depending on the platform. diff --git a/signalsigterm.go b/signalsigterm.go index dbd9750..c9435fc 100644 --- a/signalsigterm.go +++ b/signalsigterm.go @@ -10,9 +10,12 @@ package main import ( "os" "syscall" + + "github.com/lbryio/lbry.go/v3/extras/stop" ) // initsignals sets the signals to be caught by the signal handler -func initsignals() { +func initsignals(stopCh stop.Chan) { + shutdownRequestChannel = stopCh interruptSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} }