Progress of reading rocksdb
This commit is contained in:
parent
01a938487a
commit
a3712f0c02
5 changed files with 453 additions and 6 deletions
259
db/db.go
259
db/db.go
|
@ -1,11 +1,261 @@
|
||||||
package db
|
package db
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/linxGnu/grocksdb"
|
|
||||||
"log"
|
"log"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/lbryio/hub/db/prefixes"
|
||||||
|
"github.com/linxGnu/grocksdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type IterOptions struct {
|
||||||
|
FillCache bool
|
||||||
|
Start []byte //interface{}
|
||||||
|
Stop []byte //interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type PrefixRow struct {
|
||||||
|
//KeyStruct interface{}
|
||||||
|
//ValueStruct interface{}
|
||||||
|
Prefix []byte
|
||||||
|
KeyPackFunc interface{}
|
||||||
|
ValuePackFunc interface{}
|
||||||
|
DB *grocksdb.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
type PrefixRowKV struct {
|
||||||
|
Key []byte
|
||||||
|
Value []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type UTXOKey struct {
|
||||||
|
Prefix []byte
|
||||||
|
HashX []byte
|
||||||
|
TxNum uint32
|
||||||
|
Nout uint16
|
||||||
|
}
|
||||||
|
|
||||||
|
type UTXOValue struct {
|
||||||
|
Amount uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewIterateOptions() *IterOptions {
|
||||||
|
return &IterOptions{
|
||||||
|
FillCache: false,
|
||||||
|
Start: nil,
|
||||||
|
Stop: nil,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *IterOptions) WithFillCache(fillCache bool) *IterOptions {
|
||||||
|
o.FillCache = fillCache
|
||||||
|
return o
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *IterOptions) WithStart(start []byte) *IterOptions {
|
||||||
|
o.Start = start
|
||||||
|
return o
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *IterOptions) WithStop(stop []byte) *IterOptions {
|
||||||
|
o.Stop = stop
|
||||||
|
return o
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *UTXOKey) String() string {
|
||||||
|
return fmt.Sprintf(
|
||||||
|
"%s(hashX=%s, tx_num=%d, nout=%d)",
|
||||||
|
reflect.TypeOf(k),
|
||||||
|
hex.EncodeToString(k.HashX),
|
||||||
|
k.TxNum,
|
||||||
|
k.Nout,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pr *PrefixRow) Iter(options *IterOptions) <-chan *PrefixRowKV {
|
||||||
|
ch := make(chan *PrefixRowKV)
|
||||||
|
|
||||||
|
ro := grocksdb.NewDefaultReadOptions()
|
||||||
|
ro.SetFillCache(options.FillCache)
|
||||||
|
it := pr.DB.NewIterator(ro)
|
||||||
|
|
||||||
|
it.Seek(pr.Prefix)
|
||||||
|
if options.Start != nil {
|
||||||
|
it.Seek(options.Start)
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
def _check_stop_iteration(self, key: bytes):
|
||||||
|
if self.stop is not None and (key.startswith(self.stop) or self.stop < key[:len(self.stop)]):
|
||||||
|
raise StopIteration
|
||||||
|
elif self.start is not None and self.start > key[:len(self.start)]:
|
||||||
|
raise StopIteration
|
||||||
|
elif self.prefix is not None and not key.startswith(self.prefix):
|
||||||
|
raise StopIteration
|
||||||
|
*/
|
||||||
|
terminateFunc := func(key []byte) bool {
|
||||||
|
if key == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if options.Stop != nil &&
|
||||||
|
(bytes.HasPrefix(key, options.Stop) || bytes.Compare(options.Stop, key[:len(options.Stop)]) < 0) {
|
||||||
|
return false
|
||||||
|
} else if options.Start != nil &&
|
||||||
|
bytes.Compare(options.Start, key[:len(options.Start)]) > 0 {
|
||||||
|
return false
|
||||||
|
} else if pr.Prefix != nil && !bytes.HasPrefix(key, pr.Prefix) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
var prevKey []byte = nil
|
||||||
|
go func() {
|
||||||
|
for ; terminateFunc(prevKey); it.Next() {
|
||||||
|
key := it.Key()
|
||||||
|
prevKey = key.Data()
|
||||||
|
value := it.Value()
|
||||||
|
|
||||||
|
ch <- &PrefixRowKV{
|
||||||
|
Key: key.Data(),
|
||||||
|
Value: value.Data(),
|
||||||
|
}
|
||||||
|
|
||||||
|
key.Free()
|
||||||
|
value.Free()
|
||||||
|
}
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *UTXOKey) PackKey() []byte {
|
||||||
|
prefixLen := len(prefixes.UTXO)
|
||||||
|
// b'>11sLH'
|
||||||
|
n := prefixLen + 11 + 4 + 2
|
||||||
|
key := make([]byte, n)
|
||||||
|
copy(key, k.Prefix)
|
||||||
|
copy(key[prefixLen:], k.HashX)
|
||||||
|
binary.BigEndian.PutUint32(key[prefixLen+11:], k.TxNum)
|
||||||
|
binary.BigEndian.PutUint16(key[prefixLen+15:], k.Nout)
|
||||||
|
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
|
||||||
|
// UTXOKeyPackPartial packs a variable number of fields for a UTXOKey into
|
||||||
|
// a byte array.
|
||||||
|
func UTXOKeyPackPartial(k *UTXOKey, nFields int) []byte {
|
||||||
|
// Limit nFields between 0 and number of fields, we always at least need
|
||||||
|
// the prefix and we never need to iterate past the number of fields.
|
||||||
|
if nFields > 3 {
|
||||||
|
nFields = 3
|
||||||
|
}
|
||||||
|
if nFields < 0 {
|
||||||
|
nFields = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// b'>11sLH'
|
||||||
|
prefixLen := len(prefixes.UTXO)
|
||||||
|
var n = prefixLen
|
||||||
|
for i := 0; i <= nFields; i++ {
|
||||||
|
switch i {
|
||||||
|
case 1:
|
||||||
|
n += 11
|
||||||
|
case 2:
|
||||||
|
n += 4
|
||||||
|
case 3:
|
||||||
|
n += 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
key := make([]byte, n)
|
||||||
|
|
||||||
|
for i := 0; i <= nFields; i++ {
|
||||||
|
switch i {
|
||||||
|
case 0:
|
||||||
|
copy(key, k.Prefix)
|
||||||
|
case 1:
|
||||||
|
copy(key[prefixLen:], k.HashX)
|
||||||
|
case 2:
|
||||||
|
binary.BigEndian.PutUint32(key[prefixLen+11:], k.TxNum)
|
||||||
|
case 3:
|
||||||
|
binary.BigEndian.PutUint16(key[prefixLen+15:], k.Nout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
|
||||||
|
func UTXOKeyUnpack(key []byte) *UTXOKey {
|
||||||
|
return &UTXOKey{
|
||||||
|
Prefix: key[:1],
|
||||||
|
HashX: key[1:12],
|
||||||
|
TxNum: binary.BigEndian.Uint32(key[12:]),
|
||||||
|
Nout: binary.BigEndian.Uint16(key[16:]),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *UTXOValue) PackValue() []byte {
|
||||||
|
value := make([]byte, 8)
|
||||||
|
binary.BigEndian.PutUint64(value, k.Amount)
|
||||||
|
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
|
func UTXOValueUnpack(value []byte) *UTXOValue {
|
||||||
|
return &UTXOValue{
|
||||||
|
Amount: binary.BigEndian.Uint64(value),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetDB(name string) (*grocksdb.DB, error) {
|
||||||
|
opts := grocksdb.NewDefaultOptions()
|
||||||
|
db, err := grocksdb.OpenDb(opts, name)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return db, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ReadPrefixN(db *grocksdb.DB, prefix []byte, n int) []*PrefixRowKV {
|
||||||
|
ro := grocksdb.NewDefaultReadOptions()
|
||||||
|
ro.SetFillCache(false)
|
||||||
|
|
||||||
|
it := db.NewIterator(ro)
|
||||||
|
defer it.Close()
|
||||||
|
|
||||||
|
res := make([]*PrefixRowKV, n)
|
||||||
|
|
||||||
|
var i = 0
|
||||||
|
it.Seek(prefix)
|
||||||
|
for ; it.Valid(); it.Next() {
|
||||||
|
key := it.Key()
|
||||||
|
value := it.Value()
|
||||||
|
|
||||||
|
res[i] = &PrefixRowKV{
|
||||||
|
Key: key.Data(),
|
||||||
|
Value: value.Data(),
|
||||||
|
}
|
||||||
|
|
||||||
|
key.Free()
|
||||||
|
value.Free()
|
||||||
|
i++
|
||||||
|
if i >= n {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
func OpenDB(name string) int {
|
func OpenDB(name string) int {
|
||||||
// Read db
|
// Read db
|
||||||
opts := grocksdb.NewDefaultOptions()
|
opts := grocksdb.NewDefaultOptions()
|
||||||
|
@ -23,7 +273,7 @@ func OpenDB(name string) int {
|
||||||
|
|
||||||
var i = 0
|
var i = 0
|
||||||
it.Seek([]byte("foo"))
|
it.Seek([]byte("foo"))
|
||||||
for it = it; it.Valid(); it.Next() {
|
for ; it.Valid(); it.Next() {
|
||||||
key := it.Key()
|
key := it.Key()
|
||||||
value := it.Value()
|
value := it.Value()
|
||||||
|
|
||||||
|
@ -52,6 +302,9 @@ func OpenAndWriteDB(in string, out string) {
|
||||||
// Write db
|
// Write db
|
||||||
opts.SetCreateIfMissing(true)
|
opts.SetCreateIfMissing(true)
|
||||||
db2, err := grocksdb.OpenDb(opts, out)
|
db2, err := grocksdb.OpenDb(opts, out)
|
||||||
|
if err != nil {
|
||||||
|
log.Println(err)
|
||||||
|
}
|
||||||
wo := grocksdb.NewDefaultWriteOptions()
|
wo := grocksdb.NewDefaultWriteOptions()
|
||||||
defer db2.Close()
|
defer db2.Close()
|
||||||
|
|
||||||
|
@ -63,7 +316,7 @@ func OpenAndWriteDB(in string, out string) {
|
||||||
|
|
||||||
var i = 0
|
var i = 0
|
||||||
it.Seek([]byte("foo"))
|
it.Seek([]byte("foo"))
|
||||||
for it = it; it.Valid() && i < 10; it.Next() {
|
for ; it.Valid() && i < 10; it.Next() {
|
||||||
key := it.Key()
|
key := it.Key()
|
||||||
value := it.Value()
|
value := it.Value()
|
||||||
fmt.Printf("Key: %v Value: %v\n", key.Data(), value.Data())
|
fmt.Printf("Key: %v Value: %v\n", key.Data(), value.Data())
|
||||||
|
|
139
db/db_test.go
139
db/db_test.go
|
@ -1,9 +1,111 @@
|
||||||
package db
|
package db
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/lbryio/hub/db/prefixes"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestReadUTXO2(t *testing.T) {
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
want []uint64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Read 10 UTXO Key Values",
|
||||||
|
want: []uint64{2174594, 200000000, 20000000, 100000, 603510, 75000000, 100000, 962984, 25000000, 50000000},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
db, err := GetDB("/mnt/d/data/wallet/lbry-rocksdb")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("err not nil: %+v\n", err)
|
||||||
|
}
|
||||||
|
utxoRow := &PrefixRow{
|
||||||
|
// KeyStruct: UTXOKey{},
|
||||||
|
// ValueStruct: UTXOValue{},
|
||||||
|
Prefix: prefixes.UTXO,
|
||||||
|
KeyPackFunc: nil,
|
||||||
|
ValuePackFunc: nil,
|
||||||
|
DB: db,
|
||||||
|
}
|
||||||
|
b, err := hex.DecodeString("000012b")
|
||||||
|
if err != nil {
|
||||||
|
log.Println(err)
|
||||||
|
}
|
||||||
|
stopKey := &UTXOKey{
|
||||||
|
Prefix: prefixes.UTXO,
|
||||||
|
HashX: b,
|
||||||
|
TxNum: 0,
|
||||||
|
Nout: 0,
|
||||||
|
}
|
||||||
|
stop := UTXOKeyPackPartial(stopKey, 1)
|
||||||
|
|
||||||
|
options := NewIterateOptions().WithFillCache(false).WithStop(stop)
|
||||||
|
log.Println(options)
|
||||||
|
|
||||||
|
ch := utxoRow.Iter(options)
|
||||||
|
|
||||||
|
var i = 0
|
||||||
|
for kv := range ch {
|
||||||
|
log.Println(kv.Key)
|
||||||
|
log.Println(UTXOKeyUnpack(kv.Key))
|
||||||
|
log.Println(UTXOValueUnpack(kv.Value))
|
||||||
|
got := UTXOValueUnpack(kv.Value).Amount
|
||||||
|
if got != tt.want[i] {
|
||||||
|
t.Errorf("got: %d, want: %d\n", got, tt.want)
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
if i >= 10 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadUTXO(t *testing.T) {
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
want int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Read 10 UTXO Key Values",
|
||||||
|
want: 10,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
db, err := GetDB("/mnt/d/data/wallet/lbry-rocksdb")
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("err not nil: %+v\n", err)
|
||||||
|
}
|
||||||
|
data := ReadPrefixN(db, prefixes.UTXO, tt.want)
|
||||||
|
|
||||||
|
got := len(data)
|
||||||
|
|
||||||
|
for _, kv := range data {
|
||||||
|
log.Println(UTXOKeyUnpack(kv.Key))
|
||||||
|
log.Println(UTXOValueUnpack(kv.Value))
|
||||||
|
}
|
||||||
|
|
||||||
|
if got != tt.want {
|
||||||
|
t.Errorf("got: %d, want: %d\n", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// TestOpenDB test to see if we can open a db
|
// TestOpenDB test to see if we can open a db
|
||||||
func TestOpenDB(t *testing.T) {
|
func TestOpenDB(t *testing.T) {
|
||||||
|
|
||||||
|
@ -29,3 +131,40 @@ func TestOpenDB(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUTXOKey_String(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
prefix []byte
|
||||||
|
hashx []byte
|
||||||
|
txnum uint32
|
||||||
|
nout uint16
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Converts to string",
|
||||||
|
prefix: []byte("u"),
|
||||||
|
hashx: []byte("AAAAAAAAAA"),
|
||||||
|
txnum: 0,
|
||||||
|
nout: 0,
|
||||||
|
want: "db.UTXOKey(hashX=41414141414141414141, tx_num=0, nout=0)",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
key := UTXOKey{
|
||||||
|
Prefix: tt.prefix,
|
||||||
|
HashX: tt.hashx,
|
||||||
|
TxNum: tt.txnum,
|
||||||
|
Nout: tt.nout,
|
||||||
|
}
|
||||||
|
|
||||||
|
got := fmt.Sprint(key)
|
||||||
|
log.Println(got)
|
||||||
|
if got != tt.want {
|
||||||
|
t.Errorf("got: %s, want: %s\n", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
41
db/prefixes/prefixes.go
Normal file
41
db/prefixes/prefixes.go
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
package prefixes
|
||||||
|
|
||||||
|
var (
|
||||||
|
ClaimToSupport = []byte("K")
|
||||||
|
SupportToClaim = []byte("L")
|
||||||
|
|
||||||
|
ClaimToTXO = []byte("E")
|
||||||
|
TXOToClaim = []byte("G")
|
||||||
|
|
||||||
|
ClaimToChannel = []byte("I")
|
||||||
|
ChannelToClaim = []byte("J")
|
||||||
|
|
||||||
|
ClaimShortIdPrefix = []byte("F")
|
||||||
|
EffectiveAmount = []byte("D")
|
||||||
|
ClaimExpiration = []byte("O")
|
||||||
|
|
||||||
|
ClaimTakeover = []byte("P")
|
||||||
|
PendingActivation = []byte("Q")
|
||||||
|
ActivatedClaimAndSupport = []byte("R")
|
||||||
|
ActiveAmount = []byte("S")
|
||||||
|
|
||||||
|
Repost = []byte("V")
|
||||||
|
RepostedClaim = []byte("W")
|
||||||
|
|
||||||
|
Undo = []byte("M")
|
||||||
|
ClaimDiff = []byte("Y")
|
||||||
|
|
||||||
|
Tx = []byte("B")
|
||||||
|
BlockHash = []byte("C")
|
||||||
|
Header = []byte("H")
|
||||||
|
TxNum = []byte("N")
|
||||||
|
TxCount = []byte("T")
|
||||||
|
TxHash = []byte("X")
|
||||||
|
UTXO = []byte("u")
|
||||||
|
HashXUTXO = []byte("h")
|
||||||
|
HashXHistory = []byte("x")
|
||||||
|
DBState = []byte("s")
|
||||||
|
ChannelCount = []byte("Z")
|
||||||
|
SupportAmount = []byte("a")
|
||||||
|
BlockTXs = []byte("b")
|
||||||
|
)
|
|
@ -2,11 +2,24 @@ FROM golang:1.16.11-bullseye
|
||||||
|
|
||||||
RUN apt-get update && \
|
RUN apt-get update && \
|
||||||
apt-get upgrade && \
|
apt-get upgrade && \
|
||||||
apt-get install -y dnsutils git libsnappy-dev liblz4-dev libzstd-dev zlib1g-dev
|
apt-get install -y dnsutils git libsnappy-dev liblz4-dev libzstd-dev zlib1g-dev \
|
||||||
RUN git clone https://github.com/facebook/rocksdb.git && \
|
autoconf automake libtool curl make g++
|
||||||
|
RUN cd /tmp && \
|
||||||
|
wget https://github.com/protocolbuffers/protobuf/releases/download/v3.17.1/protobuf-all-3.17.1.tar.gz && \
|
||||||
|
tar xfzv protobuf-all-3.17.1.tar.gz && \
|
||||||
|
cd protobuf-3.17.1 && \
|
||||||
|
./autogen.sh && \
|
||||||
|
./configure && \
|
||||||
|
make && \
|
||||||
|
make install && \
|
||||||
|
ldconfig && \
|
||||||
|
rm -rf /tmp/proto*
|
||||||
|
RUN cd /tmp && \
|
||||||
|
git clone https://github.com/facebook/rocksdb.git && \
|
||||||
cd rocksdb && \
|
cd rocksdb && \
|
||||||
git checkout v6.26.1 && \
|
git checkout v6.26.1 && \
|
||||||
make shared_lib && \
|
make shared_lib && \
|
||||||
make install-shared
|
make install-shared && \
|
||||||
|
rm -rf /tmp/rocksdb
|
||||||
|
|
||||||
CMD ["bash"]
|
CMD ["bash"]
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
|
./protobuf/build.sh
|
||||||
go build .
|
go build .
|
||||||
go test -v -race ./...
|
go test -v -race ./...
|
||||||
|
|
Loading…
Reference in a new issue