integration testing scripts #64
2 changed files with 54 additions and 0 deletions
43
db/db.go
43
db/db.go
|
@ -8,6 +8,7 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/herald.go/db/prefixes"
|
"github.com/lbryio/herald.go/db/prefixes"
|
||||||
|
@ -58,6 +59,8 @@ type ReadOnlyDBColumnFamily struct {
|
||||||
BlockedChannels map[string][]byte
|
BlockedChannels map[string][]byte
|
||||||
FilteredStreams map[string][]byte
|
FilteredStreams map[string][]byte
|
||||||
FilteredChannels map[string][]byte
|
FilteredChannels map[string][]byte
|
||||||
|
OpenIterators map[string][]chan struct{}
|
||||||
|
ItMut sync.RWMutex
|
||||||
ShutdownChan chan struct{}
|
ShutdownChan chan struct{}
|
||||||
DoneChan chan struct{}
|
DoneChan chan struct{}
|
||||||
Cleanup func()
|
Cleanup func()
|
||||||
|
@ -318,6 +321,20 @@ func intMin(a, b int) int {
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FIXME: This was copied from the signal.go file, maybe move it to a more common place?
|
||||||
|
// interruptRequested returns true when the channel returned by
|
||||||
|
// interruptListener was closed. This simplifies early shutdown slightly since
|
||||||
|
// the caller can just use an if statement instead of a select.
|
||||||
|
func interruptRequested(interrupted <-chan struct{}) bool {
|
||||||
|
select {
|
||||||
|
case <-interrupted:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
|
func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
|
||||||
ch := make(chan *prefixes.PrefixRowKV)
|
ch := make(chan *prefixes.PrefixRowKV)
|
||||||
|
|
||||||
|
@ -325,14 +342,27 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
|
||||||
ro.SetFillCache(opts.FillCache)
|
ro.SetFillCache(opts.FillCache)
|
||||||
it := db.NewIteratorCF(ro, opts.CfHandle)
|
it := db.NewIteratorCF(ro, opts.CfHandle)
|
||||||
opts.It = it
|
opts.It = it
|
||||||
|
iterKey := fmt.Sprintf("%p", opts)
|
||||||
|
|
||||||
it.Seek(opts.Prefix)
|
it.Seek(opts.Prefix)
|
||||||
if opts.Start != nil {
|
if opts.Start != nil {
|
||||||
it.Seek(opts.Start)
|
it.Seek(opts.Start)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if opts.DB != nil {
|
||||||
|
opts.DB.ItMut.Lock()
|
||||||
|
opts.DB.OpenIterators[iterKey] = []chan struct{}{opts.DoneChan, opts.ShutdownChan}
|
||||||
|
opts.DB.ItMut.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer func() {
|
defer func() {
|
||||||
|
if opts.DB != nil {
|
||||||
|
opts.DB.ItMut.Lock()
|
||||||
|
delete(opts.DB.OpenIterators, iterKey)
|
||||||
|
opts.DB.ItMut.Unlock()
|
||||||
|
opts.DoneChan <- struct{}{}
|
||||||
|
}
|
||||||
it.Close()
|
it.Close()
|
||||||
close(ch)
|
close(ch)
|
||||||
ro.Destroy()
|
ro.Destroy()
|
||||||
|
@ -355,6 +385,9 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
|
||||||
if kv = opts.ReadRow(&prevKey); kv != nil {
|
if kv = opts.ReadRow(&prevKey); kv != nil {
|
||||||
ch <- kv
|
ch <- kv
|
||||||
}
|
}
|
||||||
|
if interruptRequested(opts.ShutdownChan) {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -514,6 +547,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
|
||||||
}
|
}
|
||||||
|
|
||||||
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames)
|
db, err := GetDBColumnFamilies(name, secondaryPath, cfNames)
|
||||||
|
db.OpenIterators = make(map[string][]chan struct{})
|
||||||
|
|
||||||
cleanupFiles := func() {
|
cleanupFiles := func() {
|
||||||
err = os.RemoveAll(secondaryPath)
|
err = os.RemoveAll(secondaryPath)
|
||||||
|
@ -642,7 +676,16 @@ func (db *ReadOnlyDBColumnFamily) Unwind() {
|
||||||
|
|
||||||
// Shutdown shuts down the db.
|
// Shutdown shuts down the db.
|
||||||
func (db *ReadOnlyDBColumnFamily) Shutdown() {
|
func (db *ReadOnlyDBColumnFamily) Shutdown() {
|
||||||
|
// FIXME: Do we need to shutdown the iterators first?
|
||||||
db.ShutdownChan <- struct{}{}
|
db.ShutdownChan <- struct{}{}
|
||||||
|
db.ItMut.Lock()
|
||||||
|
for _, it := range db.OpenIterators {
|
||||||
|
it[0] <- struct{}{}
|
||||||
|
}
|
||||||
|
for _, it := range db.OpenIterators {
|
||||||
|
<-it[1]
|
||||||
|
}
|
||||||
|
db.ItMut.Unlock()
|
||||||
<-db.DoneChan
|
<-db.DoneChan
|
||||||
db.Cleanup()
|
db.Cleanup()
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,9 @@ type IterOptions struct {
|
||||||
IncludeValue bool
|
IncludeValue bool
|
||||||
RawKey bool
|
RawKey bool
|
||||||
RawValue bool
|
RawValue bool
|
||||||
|
ShutdownChan chan struct{}
|
||||||
|
DoneChan chan struct{}
|
||||||
|
DB *ReadOnlyDBColumnFamily
|
||||||
CfHandle *grocksdb.ColumnFamilyHandle
|
CfHandle *grocksdb.ColumnFamilyHandle
|
||||||
It *grocksdb.Iterator
|
It *grocksdb.Iterator
|
||||||
Serializer *prefixes.SerializationAPI
|
Serializer *prefixes.SerializationAPI
|
||||||
|
@ -40,6 +43,9 @@ func NewIterateOptions() *IterOptions {
|
||||||
IncludeValue: false,
|
IncludeValue: false,
|
||||||
RawKey: false,
|
RawKey: false,
|
||||||
RawValue: false,
|
RawValue: false,
|
||||||
|
ShutdownChan: make(chan struct{}),
|
||||||
|
DoneChan: make(chan struct{}),
|
||||||
|
DB: nil,
|
||||||
CfHandle: nil,
|
CfHandle: nil,
|
||||||
It: nil,
|
It: nil,
|
||||||
Serializer: prefixes.ProductionAPI,
|
Serializer: prefixes.ProductionAPI,
|
||||||
|
@ -101,6 +107,11 @@ func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions {
|
||||||
return o
|
return o
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *IterOptions) WithDB(db *ReadOnlyDBColumnFamily) *IterOptions {
|
||||||
|
o.DB = db
|
||||||
|
return o
|
||||||
|
}
|
||||||
|
|
||||||
func (o *IterOptions) WithSerializer(serializer *prefixes.SerializationAPI) *IterOptions {
|
func (o *IterOptions) WithSerializer(serializer *prefixes.SerializationAPI) *IterOptions {
|
||||||
o.Serializer = serializer
|
o.Serializer = serializer
|
||||||
return o
|
return o
|
||||||
|
|
Loading…
Reference in a new issue