cleanup some logic around iterators and fix a bug where I was running two detect changes threads

This commit is contained in:
Jeffrey Picard 2022-03-09 21:41:38 +00:00
parent c66d07e8cc
commit b080ed3476

View file

@ -285,8 +285,13 @@ func (o *IterOptions) StopIteration(key []byte) bool {
return false
}
func (opts *IterOptions) ReadRow(ch chan *prefixes.PrefixRowKV, prevKey *[]byte) bool {
func (opts *IterOptions) ReadRow(prevKey *[]byte) *prefixes.PrefixRowKV {
it := opts.It
if !it.Valid() {
log.Trace("ReadRow iterator not valid returning nil")
return nil
}
key := it.Key()
keyData := key.Data()
keyLen := len(keyData)
@ -304,8 +309,8 @@ func (opts *IterOptions) ReadRow(ch chan *prefixes.PrefixRowKV, prevKey *[]byte)
// We need to check the current key if we're not including the stop
// key.
if !opts.IncludeStop && opts.StopIteration(keyData) {
log.Trace("ReadRow returning false")
return false
log.Trace("ReadRow returning nil")
return nil
}
// We have to copy the key no matter what because we need to check
@ -339,13 +344,13 @@ func (opts *IterOptions) ReadRow(ch chan *prefixes.PrefixRowKV, prevKey *[]byte)
key.Free()
value.Free()
ch <- &prefixes.PrefixRowKV{
kv := &prefixes.PrefixRowKV{
Key: outKey,
Value: outValue,
}
*prevKey = newKeyData
return true
return kv
}
func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
@ -365,16 +370,23 @@ func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
defer it.Close()
defer close(ch)
var prevKey []byte = nil
var prevKey []byte
// FIXME: There's messy uses of kv being nil / not nil here.
var kv *prefixes.PrefixRowKV = nil
if !opts.IncludeStart {
kv = opts.ReadRow(&prevKey)
it.Next()
}
if !it.Valid() && opts.IncludeStop {
opts.ReadRow(ch, &prevKey)
if !it.Valid() && opts.IncludeStop && kv != nil {
ch <- kv
}
var continueIter bool = true
for ; continueIter && !opts.StopIteration(prevKey) && it.Valid(); it.Next() {
continueIter = opts.ReadRow(ch, &prevKey)
kv = &prefixes.PrefixRowKV{}
for ; kv != nil && !opts.StopIteration(prevKey) && it.Valid(); it.Next() {
if kv = opts.ReadRow(&prevKey); kv != nil {
ch <- kv
}
}
}()
@ -405,15 +417,21 @@ func Iter(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
defer it.Close()
defer close(ch)
var prevKey []byte = nil
var prevKey []byte
var kv *prefixes.PrefixRowKV = &prefixes.PrefixRowKV{}
if !opts.IncludeStart {
kv = opts.ReadRow(&prevKey)
it.Next()
}
if !it.Valid() && opts.IncludeStop {
opts.ReadRow(ch, &prevKey)
if !it.Valid() && opts.IncludeStop && kv != nil {
ch <- kv
}
for ; !opts.StopIteration(prevKey) && it.Valid(); it.Next() {
opts.ReadRow(ch, &prevKey)
for ; kv != nil && !opts.StopIteration(prevKey) && it.Valid(); it.Next() {
if kv = opts.ReadRow(&prevKey); kv != nil {
ch <- kv
}
}
}()
@ -527,8 +545,6 @@ func GetDBColumnFamlies(name string, secondayPath string, cfNames []string) (*Re
return nil, err
}
RunDetectChanges(myDB)
return myDB, nil
}
@ -573,11 +589,11 @@ func RunDetectChanges(db *ReadOnlyDBColumnFamily) {
go func() {
for {
// FIXME: Figure out best sleep interval
time.Sleep(time.Second)
err := DetectChanges(db)
if err != nil {
log.Printf("Error detecting changes: %#v\n", err)
}
time.Sleep(time.Second)
}
}()
}
@ -793,7 +809,7 @@ func InitTxCounts(db *ReadOnlyDBColumnFamily) error {
// whjy not needs to be len-1 because we start loading with the zero block
// and the txcounts start at one???
db.Height = db.TxCounts.Len()
// db.Height = db.TxCounts.Len()
// if db.TxCounts.Len() > 0 {
// db.Height = db.TxCounts.Len() - 1
// } else {