diff --git a/db/db.go b/db/db.go index 00cc9a7..1c21f9f 100644 --- a/db/db.go +++ b/db/db.go @@ -8,6 +8,7 @@ import ( "encoding/hex" "fmt" "os" + "sync" "time" "github.com/lbryio/herald.go/db/prefixes" @@ -58,6 +59,8 @@ type ReadOnlyDBColumnFamily struct { BlockedChannels map[string][]byte FilteredStreams map[string][]byte FilteredChannels map[string][]byte + OpenIterators map[string][]chan struct{} + ItMut sync.RWMutex ShutdownChan chan struct{} DoneChan chan struct{} Cleanup func() @@ -318,6 +321,20 @@ func intMin(a, b int) int { return b } +// FIXME: This was copied from the signal.go file, maybe move it to a more common place? +// interruptRequested returns true when the channel returned by +// interruptListener was closed. This simplifies early shutdown slightly since +// the caller can just use an if statement instead of a select. +func interruptRequested(interrupted <-chan struct{}) bool { + select { + case <-interrupted: + return true + default: + } + + return false +} + func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { ch := make(chan *prefixes.PrefixRowKV) @@ -325,14 +342,27 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { ro.SetFillCache(opts.FillCache) it := db.NewIteratorCF(ro, opts.CfHandle) opts.It = it + iterKey := fmt.Sprintf("%p", opts) it.Seek(opts.Prefix) if opts.Start != nil { it.Seek(opts.Start) } + if opts.DB != nil { + opts.DB.ItMut.Lock() + opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan} + opts.DB.ItMut.Unlock() + } + go func() { defer func() { + if opts.DB != nil { + opts.DB.ItMut.Lock() + delete(opts.DB.OpenIterators, iterKey) + opts.DB.ItMut.Unlock() + opts.DoneChan <- struct{}{} + } it.Close() close(ch) ro.Destroy() @@ -355,6 +385,9 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV { if kv = opts.ReadRow(&prevKey); kv != nil { ch <- kv } + if interruptRequested(opts.ShutdownChan) { + return + } } }() @@ -514,6 +547,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func } db, err := GetDBColumnFamilies(name, secondaryPath, cfNames) + db.OpenIterators = make(map[string][]chan struct{}) cleanupFiles := func() { err = os.RemoveAll(secondaryPath) @@ -642,7 +676,16 @@ func (db *ReadOnlyDBColumnFamily) Unwind() { // Shutdown shuts down the db. func (db *ReadOnlyDBColumnFamily) Shutdown() { + // FIXME: Do we need to shutdown the iterators first? db.ShutdownChan <- struct{}{} + db.ItMut.Lock() + for _, it := range db.OpenIterators { + it[0] <- struct{}{} + } + for _, it := range db.OpenIterators { + <-it[1] + } + db.ItMut.Unlock() <-db.DoneChan db.Cleanup() } diff --git a/db/iteroptions.go b/db/iteroptions.go index 4508cec..aa728a3 100644 --- a/db/iteroptions.go +++ b/db/iteroptions.go @@ -22,6 +22,9 @@ type IterOptions struct { IncludeValue bool RawKey bool RawValue bool + ShutdownChan chan struct{} + DoneChan chan struct{} + DB *ReadOnlyDBColumnFamily CfHandle *grocksdb.ColumnFamilyHandle It *grocksdb.Iterator Serializer *prefixes.SerializationAPI @@ -40,6 +43,9 @@ func NewIterateOptions() *IterOptions { IncludeValue: false, RawKey: false, RawValue: false, + ShutdownChan: make(chan struct{}), + DoneChan: make(chan struct{}), + DB: nil, CfHandle: nil, It: nil, Serializer: prefixes.ProductionAPI, @@ -101,6 +107,11 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions { return o } +func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions { + o.DB = db + return o +} + func (o *IterOptions) WithSerializer(serializer *prefixes.SerializationAPI) *IterOptions { o.Serializer = serializer return o