This commit is contained in:
Jeffrey Picard 2021-12-14 08:14:42 -05:00
parent 946826c25c
commit 9565c94d84
2 changed files with 123 additions and 1 deletions

104
db/db.go
View file

@ -9,6 +9,7 @@ import (
"reflect"
"github.com/lbryio/hub/db/prefixes"
"github.com/lbryio/hub/db/rocksdbwrap"
"github.com/linxGnu/grocksdb"
)
@ -30,7 +31,8 @@ type PrefixRow struct {
ValuePackFunc interface{}
KeyUnpackFunc interface{}
ValueUnpackFunc interface{}
DB *grocksdb.DB
DB *rocksdbwrap.RocksDB
// DB *grocksdb.DB
}
type PrefixRowKV struct {
@ -38,6 +40,11 @@ type PrefixRowKV struct {
Value []byte
}
type PrefixRowKV2 struct {
Key interface{}
Value interface{}
}
type UTXOKey struct {
Prefix []byte
HashX []byte
@ -115,6 +122,101 @@ func (k *UTXOKey) String() string {
)
}
func (pr *PrefixRow) Iter2(options *IterOptions) <-chan *PrefixRowKV2 {
ch := make(chan *PrefixRowKV2)
ro := grocksdb.NewDefaultReadOptions()
ro.SetFillCache(options.FillCache)
it := pr.DB.NewIterator(ro)
it.Seek(pr.Prefix)
if options.Start != nil {
log.Println("Seeking to start")
it.Seek(options.Start)
} else {
log.Println("Not seeking to start")
}
stopIteration := func(key []byte) bool {
if key == nil {
return false
}
if options.Stop != nil &&
(bytes.HasPrefix(key, options.Stop) || bytes.Compare(options.Stop, key[:len(options.Stop)]) < 0) {
return true
} else if options.Start != nil &&
bytes.Compare(options.Start, key[:len(options.Start)]) > 0 {
return true
} else if pr.Prefix != nil && !bytes.HasPrefix(key, pr.Prefix) {
return true
}
return false
}
go func() {
defer it.Close()
defer close(ch)
if !options.IncludeStart {
it.Next()
}
var prevKey []byte = nil
for ; !stopIteration(prevKey); it.Next() {
key := it.Key()
keyData := key.Data()
keyLen := len(keyData)
value := it.Value()
valueData := value.Data()
valueLen := len(valueData)
var unpackedKey interface{} = nil
var unpackedValue interface{} = nil
// We need to check the current key is we're not including the stop
// key.
if !options.IncludeStop && stopIteration(keyData) {
return
}
// We have to copy the key no matter what because we need to check
// it on the next iterations to see if we're going to stop.
newKeyData := make([]byte, keyLen)
copy(newKeyData, keyData)
if options.IncludeKey {
unpackKeyFnValue := reflect.ValueOf(pr.KeyUnpackFunc)
keyArgs := []reflect.Value{reflect.ValueOf(newKeyData)}
unpackKeyFnResult := unpackKeyFnValue.Call(keyArgs)
unpackedKey = unpackKeyFnResult[0].Interface() //.(*UTXOKey)
}
// Value could be quite large, so this setting could be important
// for performance in some cases.
if options.IncludeValue {
newValueData := make([]byte, valueLen)
copy(newValueData, valueData)
unpackValueFnValue := reflect.ValueOf(pr.ValueUnpackFunc)
valueArgs := []reflect.Value{reflect.ValueOf(newValueData)}
unpackValueFnResult := unpackValueFnValue.Call(valueArgs)
unpackedValue = unpackValueFnResult[0].Interface() //.(*UTXOValue)
}
key.Free()
value.Free()
ch <- &PrefixRowKV2{
Key: unpackedKey,
Value: unpackedValue,
}
prevKey = newKeyData
}
}()
return ch
}
func (pr *PrefixRow) Iter(options *IterOptions) <-chan *PrefixRowKV {
ch := make(chan *PrefixRowKV)

View file

@ -0,0 +1,20 @@
package rocksdbwrap
import (
"github.com/linxGnu/grocksdb"
)
type RocksDB struct {
DB *grocksdb.DB
}
type RocksDBIterator struct {
It *grocksdb.Iterator
}
func (db *RocksDB) NewIterator(opts *grocksdb.ReadOptions) *RocksDBIterator {
it := db.DB.NewIterator(opts)
return &RocksDBIterator{
It: it,
}
}