Better iterator. Need to implement a lot of keys next, and tests, maybe

tests needed.
This commit is contained in:
Jeffrey Picard 2021-12-14 17:57:02 -05:00
parent 9565c94d84
commit 76f56c163a
5 changed files with 292 additions and 235 deletions

366
db/db.go
View file

@ -4,35 +4,38 @@ import (
"bytes"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"reflect"
"github.com/lbryio/hub/db/prefixes"
"github.com/lbryio/hub/db/rocksdbwrap"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/linxGnu/grocksdb"
)
type IterOptions struct {
FillCache bool
Prefix []byte
Start []byte //interface{}
Stop []byte //interface{}
IncludeStart bool
IncludeStop bool
IncludeKey bool
IncludeValue bool
RawKey bool
RawValue bool
}
type PrefixRow struct {
//KeyStruct interface{}
//ValueStruct interface{}
Prefix []byte
KeyPackFunc interface{}
ValuePackFunc interface{}
KeyUnpackFunc interface{}
ValueUnpackFunc interface{}
DB *rocksdbwrap.RocksDB
// DB *grocksdb.DB
//KeyPackFunc interface{}
//ValuePackFunc interface{}
//KeyUnpackFunc interface{}
//ValueUnpackFunc interface{}
DB *grocksdb.DB
}
type PrefixRowKV struct {
@ -46,14 +49,116 @@ type PrefixRowKV2 struct {
}
type UTXOKey struct {
Prefix []byte
HashX []byte
TxNum uint32
Nout uint16
Prefix []byte `json:"prefix"`
HashX []byte `json:"hashx"`
TxNum uint32 `json:"tx_num"`
Nout uint16 `json:"nout"`
}
type UTXOValue struct {
Amount uint64
Amount uint64 `json:"amount"`
}
func UnpackGenericKey(key []byte) (byte, interface{}, error) {
if len(key) == 0 {
return 0x0, nil, errors.Base("key length zero")
}
firstByte := key[0]
switch firstByte {
case prefixes.ClaimToSupport:
case prefixes.SupportToClaim:
case prefixes.ClaimToTXO:
case prefixes.TXOToClaim:
case prefixes.ClaimToChannel:
case prefixes.ChannelToClaim:
case prefixes.ClaimShortIdPrefix:
case prefixes.EffectiveAmount:
case prefixes.ClaimExpiration:
case prefixes.ClaimTakeover:
case prefixes.PendingActivation:
case prefixes.ActivatedClaimAndSupport:
case prefixes.ActiveAmount:
case prefixes.Repost:
case prefixes.RepostedClaim:
case prefixes.Undo:
case prefixes.ClaimDiff:
case prefixes.Tx:
case prefixes.BlockHash:
case prefixes.Header:
case prefixes.TxNum:
case prefixes.TxCount:
case prefixes.TxHash:
return 0x0, nil, errors.Base("key unpack function for %v not implemented", firstByte)
case prefixes.UTXO:
return prefixes.UTXO, UTXOKeyUnpack(key), nil
case prefixes.HashXUTXO:
case prefixes.HashXHistory:
case prefixes.DBState:
case prefixes.ChannelCount:
case prefixes.SupportAmount:
case prefixes.BlockTXs:
}
return 0x0, nil, errors.Base("key unpack function for %v not implemented", firstByte)
}
func UnpackGenericValue(key, value []byte) (byte, interface{}, error) {
if len(key) == 0 {
return 0x0, nil, errors.Base("key length zero")
}
if len(value) == 0 {
return 0x0, nil, errors.Base("value length zero")
}
firstByte := key[0]
switch firstByte {
case prefixes.ClaimToSupport:
case prefixes.SupportToClaim:
case prefixes.ClaimToTXO:
case prefixes.TXOToClaim:
case prefixes.ClaimToChannel:
case prefixes.ChannelToClaim:
case prefixes.ClaimShortIdPrefix:
case prefixes.EffectiveAmount:
case prefixes.ClaimExpiration:
case prefixes.ClaimTakeover:
case prefixes.PendingActivation:
case prefixes.ActivatedClaimAndSupport:
case prefixes.ActiveAmount:
case prefixes.Repost:
case prefixes.RepostedClaim:
case prefixes.Undo:
case prefixes.ClaimDiff:
case prefixes.Tx:
case prefixes.BlockHash:
case prefixes.Header:
case prefixes.TxNum:
case prefixes.TxCount:
case prefixes.TxHash:
return 0x0, nil, nil
case prefixes.UTXO:
return prefixes.UTXO, UTXOValueUnpack(value), nil
case prefixes.HashXUTXO:
case prefixes.HashXHistory:
case prefixes.DBState:
case prefixes.ChannelCount:
case prefixes.SupportAmount:
case prefixes.BlockTXs:
}
return 0x0, nil, nil
}
// NewIterateOptions creates a defualt options structure for a db iterator.
@ -74,6 +179,8 @@ func NewIterateOptions() *IterOptions {
IncludeStop: false,
IncludeKey: true,
IncludeValue: false,
RawKey: false,
RawValue: false,
}
}
@ -122,19 +229,16 @@ func (k *UTXOKey) String() string {
)
}
func (pr *PrefixRow) Iter2(options *IterOptions) <-chan *PrefixRowKV2 {
func Iter(db *grocksdb.DB, opts *IterOptions) <-chan *PrefixRowKV2 {
ch := make(chan *PrefixRowKV2)
ro := grocksdb.NewDefaultReadOptions()
ro.SetFillCache(options.FillCache)
it := pr.DB.NewIterator(ro)
ro.SetFillCache(opts.FillCache)
it := db.NewIterator(ro)
it.Seek(pr.Prefix)
if options.Start != nil {
log.Println("Seeking to start")
it.Seek(options.Start)
} else {
log.Println("Not seeking to start")
it.Seek(opts.Prefix)
if opts.Start != nil {
it.Seek(opts.Start)
}
stopIteration := func(key []byte) bool {
@ -142,13 +246,13 @@ func (pr *PrefixRow) Iter2(options *IterOptions) <-chan *PrefixRowKV2 {
return false
}
if options.Stop != nil &&
(bytes.HasPrefix(key, options.Stop) || bytes.Compare(options.Stop, key[:len(options.Stop)]) < 0) {
if opts.Stop != nil &&
(bytes.HasPrefix(key, opts.Stop) || bytes.Compare(opts.Stop, key[:len(opts.Stop)]) < 0) {
return true
} else if options.Start != nil &&
bytes.Compare(options.Start, key[:len(options.Start)]) > 0 {
} else if opts.Start != nil &&
bytes.Compare(opts.Start, key[:len(opts.Start)]) > 0 {
return true
} else if pr.Prefix != nil && !bytes.HasPrefix(key, pr.Prefix) {
} else if opts.Prefix != nil && !bytes.HasPrefix(key, opts.Prefix) {
return true
}
@ -159,11 +263,11 @@ func (pr *PrefixRow) Iter2(options *IterOptions) <-chan *PrefixRowKV2 {
defer it.Close()
defer close(ch)
if !options.IncludeStart {
if !opts.IncludeStart {
it.Next()
}
var prevKey []byte = nil
for ; !stopIteration(prevKey); it.Next() {
for ; !stopIteration(prevKey) && it.Valid(); it.Next() {
key := it.Key()
keyData := key.Data()
keyLen := len(keyData)
@ -171,12 +275,13 @@ func (pr *PrefixRow) Iter2(options *IterOptions) <-chan *PrefixRowKV2 {
valueData := value.Data()
valueLen := len(valueData)
var unpackedKey interface{} = nil
var unpackedValue interface{} = nil
var outKey interface{} = nil
var outValue interface{} = nil
var err error = nil
// We need to check the current key is we're not including the stop
// key.
if !options.IncludeStop && stopIteration(keyData) {
if !opts.IncludeStop && stopIteration(keyData) {
return
}
@ -184,117 +289,44 @@ func (pr *PrefixRow) Iter2(options *IterOptions) <-chan *PrefixRowKV2 {
// it on the next iterations to see if we're going to stop.
newKeyData := make([]byte, keyLen)
copy(newKeyData, keyData)
if options.IncludeKey {
unpackKeyFnValue := reflect.ValueOf(pr.KeyUnpackFunc)
keyArgs := []reflect.Value{reflect.ValueOf(newKeyData)}
unpackKeyFnResult := unpackKeyFnValue.Call(keyArgs)
unpackedKey = unpackKeyFnResult[0].Interface() //.(*UTXOKey)
if opts.IncludeKey && !opts.RawKey {
//unpackKeyFnValue := reflect.ValueOf(KeyUnpackFunc)
//keyArgs := []reflect.Value{reflect.ValueOf(newKeyData)}
//unpackKeyFnResult := unpackKeyFnValue.Call(keyArgs)
//outKey = unpackKeyFnResult[0].Interface()
_, outKey, err = UnpackGenericKey(newKeyData)
if err != nil {
log.Println(err)
}
} else if opts.IncludeKey {
outKey = newKeyData
}
// Value could be quite large, so this setting could be important
// for performance in some cases.
if options.IncludeValue {
if opts.IncludeValue {
newValueData := make([]byte, valueLen)
copy(newValueData, valueData)
unpackValueFnValue := reflect.ValueOf(pr.ValueUnpackFunc)
valueArgs := []reflect.Value{reflect.ValueOf(newValueData)}
unpackValueFnResult := unpackValueFnValue.Call(valueArgs)
unpackedValue = unpackValueFnResult[0].Interface() //.(*UTXOValue)
//unpackValueFnValue := reflect.ValueOf(ValueUnpackFunc)
//valueArgs := []reflect.Value{reflect.ValueOf(newValueData)}
//unpackValueFnResult := unpackValueFnValue.Call(valueArgs)
//outValue = unpackValueFnResult[0].Interface()
if !opts.RawValue {
_, outValue, err = UnpackGenericValue(newKeyData, newValueData)
if err != nil {
log.Println(err)
}
} else {
outValue = newValueData
}
}
key.Free()
value.Free()
ch <- &PrefixRowKV2{
Key: unpackedKey,
Value: unpackedValue,
}
prevKey = newKeyData
}
}()
return ch
}
func (pr *PrefixRow) Iter(options *IterOptions) <-chan *PrefixRowKV {
ch := make(chan *PrefixRowKV)
ro := grocksdb.NewDefaultReadOptions()
ro.SetFillCache(options.FillCache)
it := pr.DB.NewIterator(ro)
it.Seek(pr.Prefix)
if options.Start != nil {
log.Println("Seeking to start")
it.Seek(options.Start)
} else {
log.Println("Not seeking to start")
}
stopIteration := func(key []byte) bool {
if key == nil {
return false
}
if options.Stop != nil &&
(bytes.HasPrefix(key, options.Stop) || bytes.Compare(options.Stop, key[:len(options.Stop)]) < 0) {
return true
} else if options.Start != nil &&
bytes.Compare(options.Start, key[:len(options.Start)]) > 0 {
return true
} else if pr.Prefix != nil && !bytes.HasPrefix(key, pr.Prefix) {
return true
}
return false
}
go func() {
defer it.Close()
defer close(ch)
if !options.IncludeStart {
it.Next()
}
var prevKey []byte = nil
for ; !stopIteration(prevKey); it.Next() {
key := it.Key()
keyData := key.Data()
keyLen := len(keyData)
value := it.Value()
valueData := value.Data()
valueLen := len(valueData)
// We need to check the current key is we're not including the stop
// key.
if !options.IncludeStop && stopIteration(keyData) {
return
}
var outputKeyData []byte = nil
// We have to copy the key no matter what because we need to check
// it on the next iterations to see if we're going to stop.
newKeyData := make([]byte, keyLen)
copy(newKeyData, keyData)
if options.IncludeKey {
outputKeyData = newKeyData
}
var newValueData []byte = nil
// Value could be quite large, so this setting could be important
// for performance in some cases.
if options.IncludeValue {
newValueData = make([]byte, valueLen)
copy(newValueData, valueData)
}
key.Free()
value.Free()
ch <- &PrefixRowKV{
Key: outputKeyData,
Value: newValueData,
Key: outKey,
Value: outValue,
}
prevKey = newKeyData
@ -305,7 +337,7 @@ func (pr *PrefixRow) Iter(options *IterOptions) <-chan *PrefixRowKV {
}
func (k *UTXOKey) PackKey() []byte {
prefixLen := len(prefixes.UTXO)
prefixLen := 1
// b'>11sLH'
n := prefixLen + 11 + 4 + 2
key := make([]byte, n)
@ -328,7 +360,7 @@ func UTXOKeyPackPartialNFields(nFields int) func(*UTXOKey) []byte {
// a byte array.
func UTXOKeyPackPartial(k *UTXOKey, nFields int) []byte {
// Limit nFields between 0 and number of fields, we always at least need
// the prefix and we never need to iterate past the number of fields.
// the prefix, and we never need to iterate past the number of fields.
if nFields > 3 {
nFields = 3
}
@ -337,7 +369,7 @@ func UTXOKeyPackPartial(k *UTXOKey, nFields int) []byte {
}
// b'>11sLH'
prefixLen := len(prefixes.UTXO)
prefixLen := 1
var n = prefixLen
for i := 0; i <= nFields; i++ {
switch i {
@ -469,43 +501,55 @@ func OpenDB(name string, start string) int {
func OpenAndWriteDB(prIn *PrefixRow, options *IterOptions, out string) {
// Write db
opts := grocksdb.NewDefaultOptions()
opts.SetCreateIfMissing(true)
db, err := grocksdb.OpenDb(opts, out)
if err != nil {
log.Println(err)
}
wo := grocksdb.NewDefaultWriteOptions()
defer db.Close()
//opts := grocksdb.NewDefaultOptions()
//opts.SetCreateIfMissing(true)
//db, err := grocksdb.OpenDb(opts, out)
//if err != nil {
// log.Println(err)
//}
//wo := grocksdb.NewDefaultWriteOptions()
//defer db.Close()
ch := prIn.Iter(options)
options.Prefix = prIn.Prefix
ch := Iter(prIn.DB, options)
var i = 0
for kv := range ch {
key := kv.Key
value := kv.Value
var unpackedKey *UTXOKey = nil
var unpackedValue *UTXOValue = nil
key := kv.Key.(*UTXOKey)
value := kv.Value.(*UTXOValue)
//var unpackedKey *UTXOKey = nil
//var unpackedValue *UTXOValue = nil
if key != nil {
unpackKeyFnValue := reflect.ValueOf(prIn.KeyUnpackFunc)
keyArgs := []reflect.Value{reflect.ValueOf(key)}
unpackKeyFnResult := unpackKeyFnValue.Call(keyArgs)
unpackedKey = unpackKeyFnResult[0].Interface().(*UTXOKey)
}
//if key != nil {
// unpackKeyFnValue := reflect.ValueOf(prIn.KeyUnpackFunc)
// keyArgs := []reflect.Value{reflect.ValueOf(key)}
// unpackKeyFnResult := unpackKeyFnValue.Call(keyArgs)
// unpackedKey = unpackKeyFnResult[0].Interface().(*UTXOKey)
//}
//
//if value != nil {
// unpackValueFnValue := reflect.ValueOf(prIn.ValueUnpackFunc)
// valueArgs := []reflect.Value{reflect.ValueOf(value)}
// unpackValueFnResult := unpackValueFnValue.Call(valueArgs)
// unpackedValue = unpackValueFnResult[0].Interface().(*UTXOValue)
//}
if value != nil {
unpackValueFnValue := reflect.ValueOf(prIn.ValueUnpackFunc)
valueArgs := []reflect.Value{reflect.ValueOf(value)}
unpackValueFnResult := unpackValueFnValue.Call(valueArgs)
unpackedValue = unpackValueFnResult[0].Interface().(*UTXOValue)
}
//log.Println(key, value)
log.Println(unpackedKey, unpackedValue)
if err := db.Put(wo, key, value); err != nil {
keyMarshal, err := json.Marshal(key)
if err != nil {
log.Println(err)
}
valMarshal, err := json.Marshal(value)
if err != nil {
log.Println(err)
}
log.Println(string(keyMarshal), string(valMarshal))
//if err := db.Put(wo, key, value); err != nil {
// log.Println(err)
//}
i++
}
}

View file

@ -30,20 +30,16 @@ func TestReadUTXO2(t *testing.T) {
t.Errorf("err not nil: %+v\n", err)
}
defer db.Close()
utxoRow := &PrefixRow{
// KeyStruct: UTXOKey{},
// ValueStruct: UTXOValue{},
Prefix: prefixes.UTXO,
KeyPackFunc: nil,
ValuePackFunc: nil,
DB: db,
}
//utxoRow := &PrefixRow{
// Prefix: []byte{prefixes.UTXO},
// DB: db,
//}
b, err := hex.DecodeString("000012")
if err != nil {
log.Println(err)
}
stopKey := &UTXOKey{
Prefix: prefixes.UTXO,
Prefix: []byte{prefixes.UTXO},
HashX: b,
TxNum: 0,
Nout: 0,
@ -52,24 +48,25 @@ func TestReadUTXO2(t *testing.T) {
options := &IterOptions{
FillCache: false,
Prefix: []byte{prefixes.UTXO},
Start: nil,
Stop: stop,
IncludeStart: true,
IncludeStop: false,
IncludeKey: true,
IncludeValue: true,
RawKey: false,
RawValue: false,
}
log.Println(options)
ch := utxoRow.Iter(options)
ch := Iter(db, options)
var i = 0
for kv := range ch {
log.Println(kv.Key)
log.Println(UTXOKeyUnpack(kv.Key))
log.Println(UTXOValueUnpack(kv.Value))
got := UTXOValueUnpack(kv.Value).Amount
got := kv.Value.(*UTXOValue).Amount
if got != tt.want[i] {
t.Errorf("got: %d, want: %d\n", got, tt.want[i])
}

View file

@ -1,41 +1,80 @@
package prefixes
var (
ClaimToSupport = []byte("K")
SupportToClaim = []byte("L")
const (
ClaimToTXO = []byte("E")
TXOToClaim = []byte("G")
//ClaimToSupport = []byte("K")
//SupportToClaim = []byte("L")
//
//ClaimToTXO = []byte("E")
//TXOToClaim = []byte("G")
//
//ClaimToChannel = []byte("I")
//ChannelToClaim = []byte("J")
//
//ClaimShortIdPrefix = []byte("F")
//EffectiveAmount = []byte("D")
//ClaimExpiration = []byte("O")
//
//ClaimTakeover = []byte("P")
//PendingActivation = []byte("Q")
//ActivatedClaimAndSupport = []byte("R")
//ActiveAmount = []byte("S")
//
//Repost = []byte("V")
//RepostedClaim = []byte("W")
//
//Undo = []byte("M")
//ClaimDiff = []byte("Y")
//
//Tx = []byte("B")
//BlockHash = []byte("C")
//Header = []byte("H")
//TxNum = []byte("N")
//TxCount = []byte("T")
//TxHash = []byte("X")
//UTXO = []byte("u")
//HashXUTXO = []byte("h")
//HashXHistory = []byte("x")
//DBState = []byte("s")
//ChannelCount = []byte("Z")
//SupportAmount = []byte("a")
//BlockTXs = []byte("b")
ClaimToChannel = []byte("I")
ChannelToClaim = []byte("J")
ClaimToSupport = 'K'
SupportToClaim = 'L'
ClaimShortIdPrefix = []byte("F")
EffectiveAmount = []byte("D")
ClaimExpiration = []byte("O")
ClaimToTXO = 'E'
TXOToClaim = 'G'
ClaimTakeover = []byte("P")
PendingActivation = []byte("Q")
ActivatedClaimAndSupport = []byte("R")
ActiveAmount = []byte("S")
ClaimToChannel = 'I'
ChannelToClaim = 'J'
Repost = []byte("V")
RepostedClaim = []byte("W")
ClaimShortIdPrefix = 'F'
EffectiveAmount = 'D'
ClaimExpiration = 'O'
Undo = []byte("M")
ClaimDiff = []byte("Y")
ClaimTakeover = 'P'
PendingActivation = 'Q'
ActivatedClaimAndSupport = 'R'
ActiveAmount = 'S'
Tx = []byte("B")
BlockHash = []byte("C")
Header = []byte("H")
TxNum = []byte("N")
TxCount = []byte("T")
TxHash = []byte("X")
UTXO = []byte("u")
HashXUTXO = []byte("h")
HashXHistory = []byte("x")
DBState = []byte("s")
ChannelCount = []byte("Z")
SupportAmount = []byte("a")
BlockTXs = []byte("b")
Repost = 'V'
RepostedClaim = 'W'
Undo = 'M'
ClaimDiff = 'Y'
Tx = 'B'
BlockHash = 'C'
Header = 'H'
TxNum = 'N'
TxCount = 'T'
TxHash = 'X'
UTXO = 'u'
HashXUTXO = 'h'
HashXHistory = 'x'
DBState = 's'
ChannelCount = 'Z'
SupportAmount = 'a'
BlockTXs = 'b'
)

View file

@ -1,20 +0,0 @@
package rocksdbwrap
import (
"github.com/linxGnu/grocksdb"
)
type RocksDB struct {
DB *grocksdb.DB
}
type RocksDBIterator struct {
It *grocksdb.Iterator
}
func (db *RocksDB) NewIterator(opts *grocksdb.ReadOptions) *RocksDBIterator {
it := db.DB.NewIterator(opts)
return &RocksDBIterator{
It: it,
}
}

13
main.go
View file

@ -32,17 +32,13 @@ func main() {
return
} else if args.CmdType == server.DBCmd {
dbVal, err := db.GetDB("/mnt/d/data/wallet/lbry-rocksdb/")
dbVal, err := db.GetDB("./resources/asdf.db")
if err != nil {
log.Fatalln(err)
}
pr := &db.PrefixRow{
Prefix: prefixes.UTXO,
KeyPackFunc: nil,
ValuePackFunc: nil,
KeyUnpackFunc: db.UTXOKeyUnpack,
ValueUnpackFunc: db.UTXOValueUnpack,
Prefix: []byte{prefixes.UTXO},
DB: dbVal,
}
@ -51,13 +47,14 @@ func main() {
log.Println(err)
}
stopKey := &db.UTXOKey{
Prefix: prefixes.UTXO,
Prefix: []byte{prefixes.UTXO},
HashX: b,
TxNum: 0,
Nout: 0,
}
stop := db.UTXOKeyPackPartial(stopKey, 1)
log.Println(stop)
log.Print(hex.EncodeToString(stop))
options := &db.IterOptions{
@ -70,7 +67,7 @@ func main() {
IncludeValue: true,
}
db.OpenAndWriteDB(pr, options, "./resources/asdf.db")
db.OpenAndWriteDB(pr, options, "./resources/asdf2.db")
return
}