herald.go/db/db.go
2022-02-08 12:50:37 -05:00

528 lines
11 KiB
Go

package db
import (
"bytes"
"encoding/hex"
"fmt"
"log"
"math"
"os"
"github.com/lbryio/hub/db/prefixes"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/linxGnu/grocksdb"
)
type ResolveResult struct {
Name string
NormalizedName string
ClaimHash []byte
TxNum int
Position int
TxHash []byte
Height int
Amount int
ShortUrl string
IsControlling bool
CanonicalUrl string
CreationHeight int
ActivationHeight int
ExpirationHeight int
EffectiveAmount int
SupportAmount int
Reposted int
LastTakeoverHeight int
ClaimsInChannel int
ChannelHash []byte
RepostedClaimHash []byte
SignatureValid bool
}
type ResolveError struct {
Error error
}
type OptionalResolveResultOrError interface {
GetResult() *ResolveResult
GetError() *ResolveError
}
type optionalResolveResultOrError struct {
res *ResolveResult
err *ResolveError
}
func (x *optionalResolveResultOrError) GetResult() *ResolveResult {
return x.res
}
func (x *optionalResolveResultOrError) GetError() *ResolveError {
return x.err
}
type ExpandedResolveResult struct {
Stream OptionalResolveResultOrError
Channel OptionalResolveResultOrError
Repost OptionalResolveResultOrError
RepostedChannel OptionalResolveResultOrError
}
type IterOptions struct {
FillCache bool
Prefix []byte
Start []byte //interface{}
Stop []byte //interface{}
IncludeStart bool
IncludeStop bool
IncludeKey bool
IncludeValue bool
RawKey bool
RawValue bool
CfHandle *grocksdb.ColumnFamilyHandle
It *grocksdb.Iterator
}
// NewIterateOptions creates a defualt options structure for a db iterator.
func NewIterateOptions() *IterOptions {
return &IterOptions{
FillCache: false,
Prefix: []byte{},
Start: nil,
Stop: nil,
IncludeStart: true,
IncludeStop: false,
IncludeKey: true,
IncludeValue: false,
RawKey: false,
RawValue: false,
CfHandle: nil,
It: nil,
}
}
func (o *IterOptions) WithCfHandle(cfHandle *grocksdb.ColumnFamilyHandle) *IterOptions {
o.CfHandle = cfHandle
return o
}
func (o *IterOptions) WithFillCache(fillCache bool) *IterOptions {
o.FillCache = fillCache
return o
}
func (o *IterOptions) WithPrefix(prefix []byte) *IterOptions {
o.Prefix = prefix
return o
}
func (o *IterOptions) WithStart(start []byte) *IterOptions {
o.Start = start
return o
}
func (o *IterOptions) WithStop(stop []byte) *IterOptions {
o.Stop = stop
return o
}
func (o *IterOptions) WithIncludeStart(includeStart bool) *IterOptions {
o.IncludeStart = includeStart
return o
}
func (o *IterOptions) WithIncludeStop(includeStop bool) *IterOptions {
o.IncludeStop = includeStop
return o
}
func (o *IterOptions) WithIncludeKey(includeKey bool) *IterOptions {
o.IncludeKey = includeKey
return o
}
func (o *IterOptions) WithIncludeValue(includeValue bool) *IterOptions {
o.IncludeValue = includeValue
return o
}
func (o *IterOptions) WithRawKey(rawKey bool) *IterOptions {
o.RawKey = rawKey
return o
}
func (o *IterOptions) WithRawValue(rawValue bool) *IterOptions {
o.RawValue = rawValue
return o
}
func (o *IterOptions) StopIteration(key []byte) bool {
if key == nil {
return false
}
maxLen := int(math.Min(float64(len(key)), float64(len(o.Stop))))
if o.Stop != nil &&
(bytes.HasPrefix(key, o.Stop) || bytes.Compare(o.Stop, key[:maxLen]) < 0) {
return true
} else if o.Start != nil &&
bytes.Compare(o.Start, key[:len(o.Start)]) > 0 {
return true
} else if o.Prefix != nil && !bytes.HasPrefix(key, o.Prefix) {
return true
}
return false
}
type PathSegment struct {
name string
claimId string
amountOrder int
}
func (ps *PathSegment) Normalized() string {
return util.NormalizeName(ps.name)
}
func (ps *PathSegment) IsShortId() bool {
return ps.claimId != "" && len(ps.claimId) < 40
}
func (ps *PathSegment) IsFullId() bool {
return len(ps.claimId) == 40
}
func (ps *PathSegment) String() string {
if ps.claimId != "" {
return fmt.Sprintf("%s:%s", ps.name, ps.claimId)
} else if ps.amountOrder != 0 {
return fmt.Sprintf("%s:%d", ps.name, ps.amountOrder)
}
return ps.name
}
type URL struct {
stream *PathSegment
channel *PathSegment
}
func NewURL() *URL {
return &URL{
stream: nil,
channel: nil,
}
}
func ParseURL(url string) (parsed *URL, err error) {
return NewURL(), nil
}
func Resolve(db *grocksdb.DB, url string) *ExpandedResolveResult {
var res = &ExpandedResolveResult{
Stream: nil,
Channel: nil,
Repost: nil,
RepostedChannel: nil,
}
parsed, err := ParseURL(url)
if err != nil {
res.Stream = &optionalResolveResultOrError{
err: &ResolveError{err},
}
return res
}
log.Printf("parsed: %+v\n", parsed)
return res
}
func (opts *IterOptions) ReadRow(ch chan *prefixes.PrefixRowKV, prevKey *[]byte) bool {
it := opts.It
key := it.Key()
keyData := key.Data()
keyLen := len(keyData)
value := it.Value()
valueData := value.Data()
valueLen := len(valueData)
var outKey interface{} = nil
var outValue interface{} = nil
var err error = nil
// We need to check the current key is we're not including the stop
// key.
if !opts.IncludeStop && opts.StopIteration(keyData) {
return false
}
// We have to copy the key no matter what because we need to check
// it on the next iterations to see if we're going to stop.
newKeyData := make([]byte, keyLen)
copy(newKeyData, keyData)
if opts.IncludeKey && !opts.RawKey {
outKey, err = prefixes.UnpackGenericKey(newKeyData)
if err != nil {
log.Println(err)
}
} else if opts.IncludeKey {
outKey = newKeyData
}
// Value could be quite large, so this setting could be important
// for performance in some cases.
if opts.IncludeValue {
newValueData := make([]byte, valueLen)
copy(newValueData, valueData)
if !opts.RawValue {
outValue, err = prefixes.UnpackGenericValue(newKeyData, newValueData)
if err != nil {
log.Println(err)
}
} else {
outValue = newValueData
}
}
key.Free()
value.Free()
ch <- &prefixes.PrefixRowKV{
Key: outKey,
Value: outValue,
}
*prevKey = newKeyData
return true
}
func IterCF(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
ch := make(chan *prefixes.PrefixRowKV)
ro := grocksdb.NewDefaultReadOptions()
ro.SetFillCache(opts.FillCache)
it := db.NewIteratorCF(ro, opts.CfHandle)
// it := db.NewIterator(ro)
opts.It = it
it.Seek(opts.Prefix)
if opts.Start != nil {
it.Seek(opts.Start)
}
go func() {
defer it.Close()
defer close(ch)
var prevKey []byte = nil
if !opts.IncludeStart {
it.Next()
}
if !it.Valid() && opts.IncludeStop {
opts.ReadRow(ch, &prevKey)
}
for ; !opts.StopIteration(prevKey) && it.Valid(); it.Next() {
opts.ReadRow(ch, &prevKey)
}
}()
return ch
}
func Iter(db *grocksdb.DB, opts *IterOptions) <-chan *prefixes.PrefixRowKV {
ch := make(chan *prefixes.PrefixRowKV)
ro := grocksdb.NewDefaultReadOptions()
ro.SetFillCache(opts.FillCache)
it := db.NewIterator(ro)
opts.It = it
it.Seek(opts.Prefix)
if opts.Start != nil {
it.Seek(opts.Start)
}
go func() {
defer it.Close()
defer close(ch)
var prevKey []byte = nil
if !opts.IncludeStart {
it.Next()
}
if !it.Valid() && opts.IncludeStop {
opts.ReadRow(ch, &prevKey)
}
for ; !opts.StopIteration(prevKey) && it.Valid(); it.Next() {
opts.ReadRow(ch, &prevKey)
}
}()
return ch
}
func GetWriteDBCF(name string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, error) {
opts := grocksdb.NewDefaultOptions()
cfOpt := grocksdb.NewDefaultOptions()
cfNames, err := grocksdb.ListColumnFamilies(opts, name)
if err != nil {
return nil, nil, err
}
cfOpts := make([]*grocksdb.Options, len(cfNames))
for i, _ := range cfNames {
cfOpts[i] = cfOpt
}
db, handles, err := grocksdb.OpenDbColumnFamilies(opts, name, cfNames, cfOpts)
if err != nil {
return nil, nil, err
}
for i, handle := range handles {
log.Printf("%d: %s, %+v\n", i, cfNames[i], handle)
}
return db, handles, nil
}
func GetDBCF(name string, cf string) (*grocksdb.DB, []*grocksdb.ColumnFamilyHandle, error) {
opts := grocksdb.NewDefaultOptions()
cfOpt := grocksdb.NewDefaultOptions()
cfNames := []string{"default", cf}
cfOpts := []*grocksdb.Options{cfOpt, cfOpt}
db, handles, err := grocksdb.OpenDbAsSecondaryColumnFamilies(opts, name, "asdf", cfNames, cfOpts)
for i, handle := range handles {
log.Printf("%d: %+v\n", i, handle)
}
if err != nil {
return nil, nil, err
}
return db, handles, nil
}
func GetDB(name string) (*grocksdb.DB, error) {
opts := grocksdb.NewDefaultOptions()
db, err := grocksdb.OpenDbAsSecondary(opts, name, "asdf")
if err != nil {
return nil, err
}
return db, nil
}
func ReadPrefixN(db *grocksdb.DB, prefix []byte, n int) []*prefixes.PrefixRowKV {
ro := grocksdb.NewDefaultReadOptions()
ro.SetFillCache(false)
it := db.NewIterator(ro)
defer it.Close()
res := make([]*prefixes.PrefixRowKV, n)
var i = 0
it.Seek(prefix)
for ; it.Valid(); it.Next() {
key := it.Key()
value := it.Value()
res[i] = &prefixes.PrefixRowKV{
Key: key.Data(),
Value: value.Data(),
}
key.Free()
value.Free()
i++
if i >= n {
break
}
}
return res
}
func ReadWriteRawNCF(db *grocksdb.DB, options *IterOptions, out string, n int) {
options.RawKey = true
options.RawValue = true
ch := IterCF(db, options)
file, err := os.Create(out)
if err != nil {
log.Println(err)
return
}
defer file.Close()
var i = 0
log.Println(options.Prefix)
file.Write([]byte(fmt.Sprintf("%s,\n", options.Prefix)))
for kv := range ch {
log.Println(i)
if i >= n {
return
}
key := kv.Key.([]byte)
value := kv.Value.([]byte)
keyHex := hex.EncodeToString(key)
valueHex := hex.EncodeToString(value)
//log.Println(keyHex)
//log.Println(valueHex)
file.WriteString(keyHex)
file.WriteString(",")
file.WriteString(valueHex)
file.WriteString("\n")
i++
}
}
func ReadWriteRawN(db *grocksdb.DB, options *IterOptions, out string, n int) {
options.RawKey = true
options.RawValue = true
ch := Iter(db, options)
file, err := os.Create(out)
if err != nil {
log.Println(err)
return
}
defer file.Close()
var i = 0
for kv := range ch {
log.Println(i)
if i >= n {
return
}
key := kv.Key.([]byte)
value := kv.Value.([]byte)
keyHex := hex.EncodeToString(key)
valueHex := hex.EncodeToString(value)
log.Println(keyHex)
log.Println(valueHex)
file.WriteString(keyHex)
file.WriteString(",")
file.WriteString(valueHex)
file.WriteString("\n")
i++
}
}
func GenerateTestData(prefix byte, fileName string) {
dbVal, err := GetDB("/mnt/d/data/wallet/lbry-rocksdb/")
if err != nil {
log.Fatalln(err)
}
options := NewIterateOptions()
options.WithRawKey(true).WithRawValue(true).WithIncludeValue(true)
options.WithPrefix([]byte{prefix})
ReadWriteRawN(dbVal, options, fileName, 10)
}