From a3712f0c02cf74bf073b41fbc80652e6d290dab8 Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Sat, 11 Dec 2021 17:22:45 -0500 Subject: [PATCH] Progress of reading rocksdb --- db/db.go | 259 +++++++++++++++++++++++++++++++++++++- db/db_test.go | 139 ++++++++++++++++++++ db/prefixes/prefixes.go | 41 ++++++ docker/Dockerfile.github | 19 ++- scripts/build_and_test.sh | 1 + 5 files changed, 453 insertions(+), 6 deletions(-) create mode 100644 db/prefixes/prefixes.go diff --git a/db/db.go b/db/db.go index d483210..4d169a3 100644 --- a/db/db.go +++ b/db/db.go @@ -1,11 +1,261 @@ package db import ( + "bytes" + "encoding/binary" + "encoding/hex" "fmt" - "github.com/linxGnu/grocksdb" "log" + "reflect" + + "github.com/lbryio/hub/db/prefixes" + "github.com/linxGnu/grocksdb" ) +type IterOptions struct { + FillCache bool + Start []byte //interface{} + Stop []byte //interface{} +} + +type PrefixRow struct { + //KeyStruct interface{} + //ValueStruct interface{} + Prefix []byte + KeyPackFunc interface{} + ValuePackFunc interface{} + DB *grocksdb.DB +} + +type PrefixRowKV struct { + Key []byte + Value []byte +} + +type UTXOKey struct { + Prefix []byte + HashX []byte + TxNum uint32 + Nout uint16 +} + +type UTXOValue struct { + Amount uint64 +} + +func NewIterateOptions() *IterOptions { + return &IterOptions{ + FillCache: false, + Start: nil, + Stop: nil, + } +} + +func (o *IterOptions) WithFillCache(fillCache bool) *IterOptions { + o.FillCache = fillCache + return o +} + +func (o *IterOptions) WithStart(start []byte) *IterOptions { + o.Start = start + return o +} + +func (o *IterOptions) WithStop(stop []byte) *IterOptions { + o.Stop = stop + return o +} + +func (k *UTXOKey) String() string { + return fmt.Sprintf( + "%s(hashX=%s, tx_num=%d, nout=%d)", + reflect.TypeOf(k), + hex.EncodeToString(k.HashX), + k.TxNum, + k.Nout, + ) +} + +func (pr *PrefixRow) Iter(options *IterOptions) <-chan *PrefixRowKV { + ch := make(chan *PrefixRowKV) + + ro := grocksdb.NewDefaultReadOptions() + ro.SetFillCache(options.FillCache) + it := pr.DB.NewIterator(ro) + + it.Seek(pr.Prefix) + if options.Start != nil { + it.Seek(options.Start) + } + + /* + def _check_stop_iteration(self, key: bytes): + if self.stop is not None and (key.startswith(self.stop) or self.stop < key[:len(self.stop)]): + raise StopIteration + elif self.start is not None and self.start > key[:len(self.start)]: + raise StopIteration + elif self.prefix is not None and not key.startswith(self.prefix): + raise StopIteration + */ + terminateFunc := func(key []byte) bool { + if key == nil { + return true + } + + if options.Stop != nil && + (bytes.HasPrefix(key, options.Stop) || bytes.Compare(options.Stop, key[:len(options.Stop)]) < 0) { + return false + } else if options.Start != nil && + bytes.Compare(options.Start, key[:len(options.Start)]) > 0 { + return false + } else if pr.Prefix != nil && !bytes.HasPrefix(key, pr.Prefix) { + return false + } + + return true + } + + var prevKey []byte = nil + go func() { + for ; terminateFunc(prevKey); it.Next() { + key := it.Key() + prevKey = key.Data() + value := it.Value() + + ch <- &PrefixRowKV{ + Key: key.Data(), + Value: value.Data(), + } + + key.Free() + value.Free() + } + close(ch) + }() + + return ch +} + +func (k *UTXOKey) PackKey() []byte { + prefixLen := len(prefixes.UTXO) + // b'>11sLH' + n := prefixLen + 11 + 4 + 2 + key := make([]byte, n) + copy(key, k.Prefix) + copy(key[prefixLen:], k.HashX) + binary.BigEndian.PutUint32(key[prefixLen+11:], k.TxNum) + binary.BigEndian.PutUint16(key[prefixLen+15:], k.Nout) + + return key +} + +// UTXOKeyPackPartial packs a variable number of fields for a UTXOKey into +// a byte array. +func UTXOKeyPackPartial(k *UTXOKey, nFields int) []byte { + // Limit nFields between 0 and number of fields, we always at least need + // the prefix and we never need to iterate past the number of fields. + if nFields > 3 { + nFields = 3 + } + if nFields < 0 { + nFields = 0 + } + + // b'>11sLH' + prefixLen := len(prefixes.UTXO) + var n = prefixLen + for i := 0; i <= nFields; i++ { + switch i { + case 1: + n += 11 + case 2: + n += 4 + case 3: + n += 2 + } + } + + key := make([]byte, n) + + for i := 0; i <= nFields; i++ { + switch i { + case 0: + copy(key, k.Prefix) + case 1: + copy(key[prefixLen:], k.HashX) + case 2: + binary.BigEndian.PutUint32(key[prefixLen+11:], k.TxNum) + case 3: + binary.BigEndian.PutUint16(key[prefixLen+15:], k.Nout) + } + } + + return key +} + +func UTXOKeyUnpack(key []byte) *UTXOKey { + return &UTXOKey{ + Prefix: key[:1], + HashX: key[1:12], + TxNum: binary.BigEndian.Uint32(key[12:]), + Nout: binary.BigEndian.Uint16(key[16:]), + } +} + +func (k *UTXOValue) PackValue() []byte { + value := make([]byte, 8) + binary.BigEndian.PutUint64(value, k.Amount) + + return value +} + +func UTXOValueUnpack(value []byte) *UTXOValue { + return &UTXOValue{ + Amount: binary.BigEndian.Uint64(value), + } +} + +func GetDB(name string) (*grocksdb.DB, error) { + opts := grocksdb.NewDefaultOptions() + db, err := grocksdb.OpenDb(opts, name) + if err != nil { + return nil, err + } + + return db, nil +} + +func ReadPrefixN(db *grocksdb.DB, prefix []byte, n int) []*PrefixRowKV { + ro := grocksdb.NewDefaultReadOptions() + ro.SetFillCache(false) + + it := db.NewIterator(ro) + defer it.Close() + + res := make([]*PrefixRowKV, n) + + var i = 0 + it.Seek(prefix) + for ; it.Valid(); it.Next() { + key := it.Key() + value := it.Value() + + res[i] = &PrefixRowKV{ + Key: key.Data(), + Value: value.Data(), + } + + key.Free() + value.Free() + i++ + if i >= n { + break + } + } + + return res +} + func OpenDB(name string) int { // Read db opts := grocksdb.NewDefaultOptions() @@ -23,7 +273,7 @@ func OpenDB(name string) int { var i = 0 it.Seek([]byte("foo")) - for it = it; it.Valid(); it.Next() { + for ; it.Valid(); it.Next() { key := it.Key() value := it.Value() @@ -52,6 +302,9 @@ func OpenAndWriteDB(in string, out string) { // Write db opts.SetCreateIfMissing(true) db2, err := grocksdb.OpenDb(opts, out) + if err != nil { + log.Println(err) + } wo := grocksdb.NewDefaultWriteOptions() defer db2.Close() @@ -63,7 +316,7 @@ func OpenAndWriteDB(in string, out string) { var i = 0 it.Seek([]byte("foo")) - for it = it; it.Valid() && i < 10; it.Next() { + for ; it.Valid() && i < 10; it.Next() { key := it.Key() value := it.Value() fmt.Printf("Key: %v Value: %v\n", key.Data(), value.Data()) diff --git a/db/db_test.go b/db/db_test.go index 9b78de2..41cee3b 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -1,9 +1,111 @@ package db import ( + "encoding/hex" + "fmt" + "log" "testing" + + "github.com/lbryio/hub/db/prefixes" ) +func TestReadUTXO2(t *testing.T) { + + tests := []struct { + name string + want []uint64 + }{ + { + name: "Read 10 UTXO Key Values", + want: []uint64{2174594, 200000000, 20000000, 100000, 603510, 75000000, 100000, 962984, 25000000, 50000000}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db, err := GetDB("/mnt/d/data/wallet/lbry-rocksdb") + if err != nil { + t.Errorf("err not nil: %+v\n", err) + } + utxoRow := &PrefixRow{ + // KeyStruct: UTXOKey{}, + // ValueStruct: UTXOValue{}, + Prefix: prefixes.UTXO, + KeyPackFunc: nil, + ValuePackFunc: nil, + DB: db, + } + b, err := hex.DecodeString("000012b") + if err != nil { + log.Println(err) + } + stopKey := &UTXOKey{ + Prefix: prefixes.UTXO, + HashX: b, + TxNum: 0, + Nout: 0, + } + stop := UTXOKeyPackPartial(stopKey, 1) + + options := NewIterateOptions().WithFillCache(false).WithStop(stop) + log.Println(options) + + ch := utxoRow.Iter(options) + + var i = 0 + for kv := range ch { + log.Println(kv.Key) + log.Println(UTXOKeyUnpack(kv.Key)) + log.Println(UTXOValueUnpack(kv.Value)) + got := UTXOValueUnpack(kv.Value).Amount + if got != tt.want[i] { + t.Errorf("got: %d, want: %d\n", got, tt.want) + } + i++ + if i >= 10 { + break + } + } + }) + } + +} + +func TestReadUTXO(t *testing.T) { + + tests := []struct { + name string + want int + }{ + { + name: "Read 10 UTXO Key Values", + want: 10, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db, err := GetDB("/mnt/d/data/wallet/lbry-rocksdb") + if err != nil { + t.Errorf("err not nil: %+v\n", err) + } + data := ReadPrefixN(db, prefixes.UTXO, tt.want) + + got := len(data) + + for _, kv := range data { + log.Println(UTXOKeyUnpack(kv.Key)) + log.Println(UTXOValueUnpack(kv.Value)) + } + + if got != tt.want { + t.Errorf("got: %d, want: %d\n", got, tt.want) + } + }) + } + +} + // TestOpenDB test to see if we can open a db func TestOpenDB(t *testing.T) { @@ -29,3 +131,40 @@ func TestOpenDB(t *testing.T) { } } + +func TestUTXOKey_String(t *testing.T) { + tests := []struct { + name string + prefix []byte + hashx []byte + txnum uint32 + nout uint16 + want string + }{ + { + name: "Converts to string", + prefix: []byte("u"), + hashx: []byte("AAAAAAAAAA"), + txnum: 0, + nout: 0, + want: "db.UTXOKey(hashX=41414141414141414141, tx_num=0, nout=0)", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + key := UTXOKey{ + Prefix: tt.prefix, + HashX: tt.hashx, + TxNum: tt.txnum, + Nout: tt.nout, + } + + got := fmt.Sprint(key) + log.Println(got) + if got != tt.want { + t.Errorf("got: %s, want: %s\n", got, tt.want) + } + }) + } +} diff --git a/db/prefixes/prefixes.go b/db/prefixes/prefixes.go new file mode 100644 index 0000000..7025d62 --- /dev/null +++ b/db/prefixes/prefixes.go @@ -0,0 +1,41 @@ +package prefixes + +var ( + ClaimToSupport = []byte("K") + SupportToClaim = []byte("L") + + ClaimToTXO = []byte("E") + TXOToClaim = []byte("G") + + ClaimToChannel = []byte("I") + ChannelToClaim = []byte("J") + + ClaimShortIdPrefix = []byte("F") + EffectiveAmount = []byte("D") + ClaimExpiration = []byte("O") + + ClaimTakeover = []byte("P") + PendingActivation = []byte("Q") + ActivatedClaimAndSupport = []byte("R") + ActiveAmount = []byte("S") + + Repost = []byte("V") + RepostedClaim = []byte("W") + + Undo = []byte("M") + ClaimDiff = []byte("Y") + + Tx = []byte("B") + BlockHash = []byte("C") + Header = []byte("H") + TxNum = []byte("N") + TxCount = []byte("T") + TxHash = []byte("X") + UTXO = []byte("u") + HashXUTXO = []byte("h") + HashXHistory = []byte("x") + DBState = []byte("s") + ChannelCount = []byte("Z") + SupportAmount = []byte("a") + BlockTXs = []byte("b") +) diff --git a/docker/Dockerfile.github b/docker/Dockerfile.github index 0ad40a2..8680b43 100644 --- a/docker/Dockerfile.github +++ b/docker/Dockerfile.github @@ -2,11 +2,24 @@ FROM golang:1.16.11-bullseye RUN apt-get update && \ apt-get upgrade && \ - apt-get install -y dnsutils git libsnappy-dev liblz4-dev libzstd-dev zlib1g-dev -RUN git clone https://github.com/facebook/rocksdb.git && \ + apt-get install -y dnsutils git libsnappy-dev liblz4-dev libzstd-dev zlib1g-dev \ + autoconf automake libtool curl make g++ +RUN cd /tmp && \ + wget https://github.com/protocolbuffers/protobuf/releases/download/v3.17.1/protobuf-all-3.17.1.tar.gz && \ + tar xfzv protobuf-all-3.17.1.tar.gz && \ + cd protobuf-3.17.1 && \ + ./autogen.sh && \ + ./configure && \ + make && \ + make install && \ + ldconfig && \ + rm -rf /tmp/proto* +RUN cd /tmp && \ + git clone https://github.com/facebook/rocksdb.git && \ cd rocksdb && \ git checkout v6.26.1 && \ make shared_lib && \ - make install-shared + make install-shared && \ + rm -rf /tmp/rocksdb CMD ["bash"] diff --git a/scripts/build_and_test.sh b/scripts/build_and_test.sh index a267f85..795d032 100755 --- a/scripts/build_and_test.sh +++ b/scripts/build_and_test.sh @@ -1,3 +1,4 @@ #!/bin/bash +./protobuf/build.sh go build . go test -v -race ./...