updates and fixes for integration tests

This commit is contained in:
Jeffrey Picard 2022-03-17 10:48:11 -04:00
parent d68a67133b
commit 05d7c5656c
6 changed files with 283 additions and 37 deletions

View file

@ -55,6 +55,9 @@ type ReadOnlyDBColumnFamily struct {
BlockedChannels map[string][]byte
FilteredStreams map[string][]byte
FilteredChannels map[string][]byte
ShutdownChan chan struct{}
DoneChan chan struct{}
Cleanup func()
}
type ResolveResult struct {
@ -80,10 +83,17 @@ type ResolveResult struct {
ChannelHash []byte
RepostedClaimHash []byte
SignatureValid bool
RepostTxHash []byte
RepostTxPostition uint16
RepostHeight uint32
ChannelTxHash []byte
ChannelTxPostition uint16
ChannelHeight uint32
}
type ResolveError struct {
Error error
ErrorType uint8
}
type OptionalResolveResultOrError interface {
@ -496,6 +506,7 @@ func GetProdDB(name string, secondaryPath string) (*ReadOnlyDBColumnFamily, func
log.Println(err)
}
}
db.Cleanup = cleanup
if err != nil {
return nil, cleanup, err
@ -540,6 +551,8 @@ func GetDBColumnFamlies(name string, secondayPath string, cfNames []string) (*Re
LastState: nil,
Height: 0,
Headers: nil,
ShutdownChan: make(chan struct{}),
DoneChan: make(chan struct{}),
}
err = ReadDBState(myDB) //TODO: Figure out right place for this
@ -599,6 +612,12 @@ func Unwind(db *ReadOnlyDBColumnFamily) {
db.Headers.Pop()
}
func Shutdown(db *ReadOnlyDBColumnFamily) {
db.ShutdownChan <- struct{}{}
<-db.DoneChan
db.Cleanup()
}
// RunDetectChanges Go routine the runs continuously while the hub is active
// to keep the db readonly view up to date and handle reorgs on the
// blockchain.
@ -610,7 +629,12 @@ func RunDetectChanges(db *ReadOnlyDBColumnFamily) {
if err != nil {
log.Printf("Error detecting changes: %#v\n", err)
}
time.Sleep(time.Second)
select {
case <-db.ShutdownChan:
db.DoneChan <- struct{}{}
return
case <-time.After(time.Millisecond * 10):
}
}
}()
}

View file

@ -401,15 +401,35 @@ func GetCachedClaimTxo(db *ReadOnlyDBColumnFamily, claim []byte, useCache bool)
return value, nil
}
func ControllingClaimIter(db *ReadOnlyDBColumnFamily) <-chan *prefixes.PrefixRowKV {
handle, err := EnsureHandle(db, prefixes.ClaimTakeover)
if err != nil {
return nil
}
key := prefixes.NewClaimTakeoverKey("")
var rawKeyPrefix []byte = nil
rawKeyPrefix = prefixes.ClaimTakeoverKeyPackPartial(key, 0)
options := NewIterateOptions().WithCfHandle(handle).WithPrefix(rawKeyPrefix)
options = options.WithIncludeValue(true) //.WithIncludeStop(true)
ch := IterCF(db.DB, options)
return ch
}
func GetControllingClaim(db *ReadOnlyDBColumnFamily, name string) (*prefixes.ClaimTakeoverValue, error) {
handle, err := EnsureHandle(db, prefixes.ClaimTakeover)
if err != nil {
return nil, err
}
log.Println(name)
key := prefixes.NewClaimTakeoverKey(name)
rawKey := key.PackKey()
log.Println(hex.EncodeToString(rawKey))
slice, err := db.DB.GetCF(db.Opts, handle, rawKey)
log.Printf("slice: %#v", slice)
log.Printf("err: %#v", err)
if err != nil {
return nil, err
}
@ -491,6 +511,21 @@ func GetDBState(db *ReadOnlyDBColumnFamily) (*prefixes.DBStateValue, error) {
return value, nil
}
func EffectiveAmountNameIter(db *ReadOnlyDBColumnFamily, normalizedName string) <-chan *prefixes.PrefixRowKV {
handle, err := EnsureHandle(db, prefixes.EffectiveAmount)
if err != nil {
return nil
}
key := prefixes.NewEffectiveAmountKey(normalizedName)
var rawKeyPrefix []byte = nil
rawKeyPrefix = prefixes.EffectiveAmountKeyPackPartial(key, 1)
options := NewIterateOptions().WithCfHandle(handle).WithPrefix(rawKeyPrefix)
options = options.WithIncludeValue(true) //.WithIncludeStop(true)
ch := IterCF(db.DB, options)
return ch
}
func ClaimShortIdIter(db *ReadOnlyDBColumnFamily, normalizedName string, claimId string) <-chan *prefixes.PrefixRowKV {
handle, err := EnsureHandle(db, prefixes.ClaimShortIdPrefix)
if err != nil {

View file

@ -9,6 +9,7 @@ import (
"strings"
"github.com/lbryio/hub/db/prefixes"
pb "github.com/lbryio/hub/protobuf/go"
"github.com/lbryio/lbry.go/v2/extras/util"
lbryurl "github.com/lbryio/lbry.go/v2/url"
log "github.com/sirupsen/logrus"
@ -68,6 +69,25 @@ func PrepareResolveResult(
return nil, err
}
var repostTxHash []byte
var repostTxPostition uint16
var repostHeight uint32
if repostedClaimHash != nil {
repostTxo, err := GetCachedClaimTxo(db, repostedClaimHash, true)
if err != nil {
return nil, err
}
if repostTxo != nil {
repostTxHash, err = GetTxHash(db, repostTxo.TxNum)
if err != nil {
return nil, err
}
repostTxPostition = repostTxo.Position
repostHeight, _ = db.TxCounts.TxCountsBisectRight(repostTxo.TxNum, rootTxNum, BisectRight)
}
}
shortUrl, err := GetShortClaimIdUrl(db, name, normalizedName, claimHash, txNum, rootPosition)
if err != nil {
return nil, err
@ -79,6 +99,10 @@ func PrepareResolveResult(
return nil, err
}
var channelTxHash []byte
var channelTxPostition uint16
var channelHeight uint32
if channelHash != nil {
// Ignore error because we already have this set if this doesn't work
channelVals, _ := GetCachedClaimTxo(db, channelHash, true)
@ -92,6 +116,12 @@ func PrepareResolveResult(
channelVals.RootPosition,
)
canonicalUrl = fmt.Sprintf("%s/%s", channelShortUrl, shortUrl)
channelTxHash, err = GetTxHash(db, channelVals.TxNum)
if err != nil {
return nil, err
}
channelTxPostition = channelVals.Position
channelHeight, _ = db.TxCounts.TxCountsBisectRight(channelVals.TxNum, rootTxNum, BisectRight)
}
}
@ -125,13 +155,29 @@ func PrepareResolveResult(
ChannelHash: channelHash,
RepostedClaimHash: repostedClaimHash,
SignatureValid: signatureValid,
RepostTxHash: repostTxHash,
RepostTxPostition: repostTxPostition,
RepostHeight: repostHeight,
ChannelTxHash: channelTxHash,
ChannelTxPostition: channelTxPostition,
ChannelHeight: channelHeight,
}, nil
}
func ResolveParsedUrl(db *ReadOnlyDBColumnFamily, parsed *PathSegment) (*ResolveResult, error) {
normalizedName := util.NormalizeName(parsed.name)
if (parsed.amountOrder == -1 && parsed.claimId == "") || parsed.amountOrder == 1 {
log.Warn("Resolving claim by name")
ch := ControllingClaimIter(db)
for kv := range ch {
key := kv.Key.(*prefixes.ClaimTakeoverKey)
val := kv.Value.(*prefixes.ClaimTakeoverValue)
log.Warnf("ClaimTakeoverKey: %#v", key)
log.Warnf("ClaimTakeoverValue: %#v", val)
}
controlling, err := GetControllingClaim(db, normalizedName)
log.Warnf("controlling: %#v", controlling)
log.Warnf("err: %#v", err)
if err != nil {
return nil, err
}
@ -145,6 +191,7 @@ func ResolveParsedUrl(db *ReadOnlyDBColumnFamily, parsed *PathSegment) (*Resolve
log.Println("amountOrder:", amountOrder)
// Resolve by claimId
if parsed.claimId != "" {
if len(parsed.claimId) == 40 {
claimHash, err := hex.DecodeString(parsed.claimId)
@ -167,6 +214,8 @@ func ResolveParsedUrl(db *ReadOnlyDBColumnFamily, parsed *PathSegment) (*Resolve
return nil, err
}
log.Warn("claimTxo.ChannelSignatureIsValid:", claimTxo.ChannelSignatureIsValid)
return PrepareResolveResult(
db,
claimTxo.TxNum,
@ -189,6 +238,10 @@ func ResolveParsedUrl(db *ReadOnlyDBColumnFamily, parsed *PathSegment) (*Resolve
ch := ClaimShortIdIter(db, normalizedName, parsed.claimId[:j])
row := <-ch
if row == nil {
return nil, nil
}
key := row.Key.(*prefixes.ClaimShortIDKey)
claimTxo := row.Value.(*prefixes.ClaimShortIDValue)
@ -210,6 +263,8 @@ func ResolveParsedUrl(db *ReadOnlyDBColumnFamily, parsed *PathSegment) (*Resolve
return nil, err
}
log.Warn("signatureIsValid:", signatureIsValid)
return PrepareResolveResult(
db,
claimTxo.TxNum,
@ -223,6 +278,51 @@ func ResolveParsedUrl(db *ReadOnlyDBColumnFamily, parsed *PathSegment) (*Resolve
)
}
// Resolve by amount ordering
/*
for idx, (key, claim_val) in enumerate(self.prefix_db.effective_amount.iterate(prefix=(normalized_name,))):
if amount_order > idx + 1:
continue
claim_txo = self.get_cached_claim_txo(claim_val.claim_hash)
activation = self.get_activation(key.tx_num, key.position)
return self._prepare_resolve_result(
key.tx_num, key.position, claim_val.claim_hash, key.normalized_name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
*/
log.Warn("resolving by amount ordering")
ch := EffectiveAmountNameIter(db, normalizedName)
var i = 0
for kv := range ch {
if i+1 < amountOrder {
i++
continue
}
key := kv.Key.(*prefixes.EffectiveAmountKey)
claimVal := kv.Value.(*prefixes.EffectiveAmountValue)
claimTxo, err := GetCachedClaimTxo(db, claimVal.ClaimHash, true)
if err != nil {
return nil, err
}
activation, err := GetActivation(db, key.TxNum, key.Position)
if err != nil {
return nil, err
}
return PrepareResolveResult(
db,
key.TxNum,
key.Position,
claimVal.ClaimHash,
key.NormalizedName,
claimTxo.RootTxNum,
claimTxo.RootPosition,
activation,
claimTxo.ChannelSignatureIsValid,
)
}
return nil, nil
}
@ -283,17 +383,18 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
var stream *PathSegment = nil
parsed, err := lbryurl.Parse(url, false)
log.Printf("parsed: %#v", parsed)
log.Warnf("parsed: %#v", parsed)
if err != nil {
log.Warn("lbryurl.Parse:", err)
res.Stream = &optionalResolveResultOrError{
err: &ResolveError{err},
err: &ResolveError{Error: err},
}
return res
}
// has stream in channel
if strings.Compare(parsed.StreamName, "") != 0 && strings.Compare(parsed.ClaimName, "") != 0 {
if strings.Compare(parsed.StreamName, "") != 0 && strings.Compare(parsed.ChannelName, "") != 0 {
channel = &PathSegment{
name: parsed.ClaimName,
claimId: parsed.ChannelClaimId,
@ -304,7 +405,7 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
claimId: parsed.StreamClaimId,
amountOrder: parsed.SecondaryBidPosition,
}
} else if strings.Compare(parsed.ClaimName, "") != 0 {
} else if parsed.IsChannelUrl() {
channel = &PathSegment{
name: parsed.ClaimName,
claimId: parsed.ChannelClaimId,
@ -314,7 +415,7 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
stream = &PathSegment{
name: parsed.StreamName,
claimId: parsed.StreamClaimId,
amountOrder: parsed.SecondaryBidPosition,
amountOrder: parsed.PrimaryBidPosition,
}
}
@ -327,21 +428,25 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
resolvedChannel, err = ResolveParsedUrl(db, channel)
if err != nil {
res.Channel = &optionalResolveResultOrError{
err: &ResolveError{err},
err: &ResolveError{Error: err},
}
return res
} else if resolvedChannel == nil {
res.Channel = &optionalResolveResultOrError{
err: &ResolveError{fmt.Errorf("could not find channel in \"%s\"", url)},
err: &ResolveError{
Error: fmt.Errorf("Could not find claim at \"%s\".", url),
ErrorType: uint8(pb.Error_NOT_FOUND),
},
}
return res
}
}
if resolvedChannel != nil {
log.Printf("resolvedChannel: %#v\n", resolvedChannel)
log.Printf("resolvedChannel.TxHash: %s\n", hex.EncodeToString(resolvedChannel.TxHash))
log.Printf("resolvedChannel.ClaimHash: %s\n", hex.EncodeToString(resolvedChannel.ClaimHash))
log.Printf("resolvedChannel.ChannelHash: %s\n", hex.EncodeToString(resolvedChannel.ChannelHash))
log.Printf("stream %#v\n", stream)
}
if stream != nil {
if resolvedChannel != nil {
streamClaim, err := ResolveClaimInChannel(db, resolvedChannel.ClaimHash, stream.Normalized())
@ -353,7 +458,7 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
// TODO: Confirm error case
if err != nil {
res.Stream = &optionalResolveResultOrError{
err: &ResolveError{err},
err: &ResolveError{Error: err},
}
return res
}
@ -363,7 +468,7 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
// TODO: Confirm error case
if err != nil {
res.Stream = &optionalResolveResultOrError{
err: &ResolveError{err},
err: &ResolveError{Error: err},
}
return res
}
@ -373,7 +478,7 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
// TODO: Confirm error case
if err != nil {
res.Stream = &optionalResolveResultOrError{
err: &ResolveError{err},
err: &ResolveError{Error: err},
}
return res
}
@ -382,7 +487,7 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
// TODO: Confirm error case
if err != nil {
res.Channel = &optionalResolveResultOrError{
err: &ResolveError{err},
err: &ResolveError{Error: err},
}
return res
}
@ -390,7 +495,10 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
}
if resolvedStream == nil {
res.Stream = &optionalResolveResultOrError{
err: &ResolveError{fmt.Errorf("could not find stream in \"%s\"", url)},
err: &ResolveError{
Error: fmt.Errorf("Could not find claim at \"%s\".", url),
ErrorType: uint8(pb.Error_NOT_FOUND),
},
}
return res
}
@ -399,7 +507,9 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
// Getting blockers and filters
var repost *ResolveResult = nil
var repostedChannel *ResolveResult = nil
if resolvedChannel != nil && resolvedStream != nil {
log.Printf("about to get blockers and filters: %#v, %#v\n", resolvedChannel, resolvedStream)
}
if resolvedStream != nil || resolvedChannel != nil {
var claim *ResolveResult = nil
@ -418,7 +528,7 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
log.Printf("blockerHash: %s\n", hex.EncodeToString(blockerHash))
if err != nil {
res.Channel = &optionalResolveResultOrError{
err: &ResolveError{err},
err: &ResolveError{Error: err},
}
return res
}
@ -426,12 +536,12 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
reasonRow, err := FsGetClaimByHash(db, blockerHash)
if err != nil {
res.Channel = &optionalResolveResultOrError{
err: &ResolveError{err},
err: &ResolveError{Error: err},
}
return res
}
res.Channel = &optionalResolveResultOrError{
err: &ResolveError{fmt.Errorf("%s, %v, %v", url, blockerHash, reasonRow)},
err: &ResolveError{Error: fmt.Errorf("%s, %v, %v", url, blockerHash, reasonRow)},
}
return res
}
@ -439,7 +549,7 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
repost, err = FsGetClaimByHash(db, claim.RepostedClaimHash)
if err != nil {
res.Channel = &optionalResolveResultOrError{
err: &ResolveError{err},
err: &ResolveError{Error: err},
}
return res
}
@ -447,7 +557,7 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
repostedChannel, err = FsGetClaimByHash(db, repost.ChannelHash)
if err != nil {
res.Channel = &optionalResolveResultOrError{
err: &ResolveError{err},
err: &ResolveError{Error: err},
}
return res
}
@ -468,6 +578,7 @@ func Resolve(db *ReadOnlyDBColumnFamily, url string) *ExpandedResolveResult {
res: repostedChannel,
}
log.Printf("parsed: %#v\n", parsed)
log.Warnf("leaving Resolve, parsed: %#v\n", parsed)
log.Warnf("leaving Resolve, res: %s\n", res)
return res
}

View file

@ -3129,6 +3129,13 @@ type EffectiveAmountValue struct {
ClaimHash []byte `json:"claim_hash"`
}
func NewEffectiveAmountKey(normalizedName string) *EffectiveAmountKey {
return &EffectiveAmountKey{
Prefix: []byte{EffectiveAmount},
NormalizedName: normalizedName,
}
}
func (k *EffectiveAmountKey) PackKey() []byte {
prefixLen := 1
// 2 byte length field, plus number of bytes in name

View file

@ -45,7 +45,7 @@ func main() {
s.EsClient.Stop()
s.GrpcServer.GracefulStop()
s.DBCleanup()
db.Shutdown(s.DB)
log.Println("Returning from main...")
}()

View file

@ -46,7 +46,6 @@ type Server struct {
PeerSubsMut sync.RWMutex
NumPeerSubs *int64
ExternalIP net.IP
DBCleanup func()
pb.UnimplementedHubServer
}
@ -191,9 +190,14 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
//TODO: is this the right place to load the db?
var myDB *db.ReadOnlyDBColumnFamily
var dbCleanup = func() {}
// var dbShutdown = func() {}
if !args.DisableResolve {
myDB, dbCleanup, err = db.GetProdDB(args.DBPath, "readonlytmp")
tmpName := fmt.Sprintf("/tmp/%d", time.Now().Nanosecond())
logrus.Warn("tmpName", tmpName)
myDB, _, err = db.GetProdDB(args.DBPath, tmpName)
// dbShutdown = func() {
// db.Shutdown(myDB)
// }
if err != nil {
// Can't load the db, fail loudly
log.Fatalln(err)
@ -241,7 +245,6 @@ func MakeHubServer(ctx context.Context, args *Args) *Server {
PeerSubsMut: sync.RWMutex{},
NumPeerSubs: numSubs,
ExternalIP: net.IPv4(127, 0, 0, 1),
DBCleanup: dbCleanup,
}
// Start up our background services
@ -404,13 +407,48 @@ func (s *Server) Version(ctx context.Context, args *pb.EmptyMessage) (*pb.String
self.session_manager.resolved_url_count_metric.inc(len(sorted_urls))
*/
// type OutputWType struct {
// Output *pb.Output
// OutputType byte
// }
// const (
// OutputChannelType = iota
// OutputRepostType = iota
// OutputErrorType = iota
// )
func ResolveResultToOutput(res *db.ResolveResult) *pb.Output {
// func ResolveResultToOutput(res *db.ResolveResult, outputType byte) *OutputWType {
// res.ClaimHash
var channelOutput *pb.Output
var repostOutput *pb.Output
if res.ChannelTxHash != nil {
channelOutput = &pb.Output{
TxHash: res.ChannelTxHash,
Nout: uint32(res.ChannelTxPostition),
Height: res.ChannelHeight,
}
}
if res.RepostTxHash != nil {
repostOutput = &pb.Output{
TxHash: res.RepostTxHash,
Nout: uint32(res.RepostTxPostition),
Height: res.RepostHeight,
}
}
claimMeta := &pb.ClaimMeta{
Channel: channelOutput,
Repost: repostOutput,
ShortUrl: res.ShortUrl,
Reposted: uint32(res.Reposted),
IsControlling: res.IsControlling,
CreationHeight: res.CreationHeight,
ExpirationHeight: res.ExpirationHeight,
ClaimsInChannel: res.ClaimsInChannel,
EffectiveAmount: res.EffectiveAmount,
SupportAmount: res.SupportAmount,
}
@ -426,34 +464,52 @@ func ResolveResultToOutput(res *db.ResolveResult) *pb.Output {
Meta: claim,
}
// outputWType := &OutputWType{
// Output: output,
// OutputType: outputType,
// }
return output
}
func ExpandedResolveResultToOutput(res *db.ExpandedResolveResult) ([]*pb.Output, []*pb.Output, error) {
// func ExpandedResolveResultToOutput(res *db.ExpandedResolveResult) ([]*OutputWType, []*OutputWType, error) {
// FIXME: Set references in extraTxos properly
// FIXME: figure out the handling of rows and extra properly
// FIXME: want to return empty list or nil when extraTxos is empty?
txos := make([]*pb.Output, 0)
extraTxos := make([]*pb.Output, 0)
// txos := make([]*OutputWType, 0)
// extraTxos := make([]*OutputWType, 0)
// Errors
if x := res.Channel.GetError(); x != nil {
logrus.Warn("Channel error: ", x)
outputErr := &pb.Output_Error{
Error: &pb.Error{
Text: x.Error.Error(),
Code: 0, //FIXME
Code: pb.Error_Code(x.ErrorType), //FIXME
},
}
// res := &OutputWType{
// Output: &pb.Output{Meta: outputErr},
// OutputType: OutputErrorType,
// }
res := &pb.Output{Meta: outputErr}
txos = append(txos, res)
return txos, nil, nil
}
if x := res.Stream.GetError(); x != nil {
logrus.Warn("Stream error: ", x)
outputErr := &pb.Output_Error{
Error: &pb.Error{
Text: x.Error.Error(),
Code: 0, //FIXME
Code: pb.Error_Code(x.ErrorType), //FIXME
},
}
// res := &OutputWType{
// Output: &pb.Output{Meta: outputErr},
// OutputType: OutputErrorType,
// }
res := &pb.Output{Meta: outputErr}
txos = append(txos, res)
return txos, nil, nil
@ -483,10 +539,10 @@ func ExpandedResolveResultToOutput(res *db.ExpandedResolveResult) ([]*pb.Output,
return txos, extraTxos, nil
} else if stream != nil {
output := ResolveResultToOutput(channel)
output := ResolveResultToOutput(stream)
txos = append(txos, output)
if channel != nil {
output := ResolveResultToOutput(stream)
output := ResolveResultToOutput(channel)
extraTxos = append(extraTxos, output)
}
if repost != nil {
@ -521,12 +577,25 @@ func (s *Server) Resolve(ctx context.Context, args *pb.StringArray) (*pb.Outputs
allExtraTxos = append(allExtraTxos, extraTxos...)
}
return &pb.Outputs{
// for _, row := range allExtraTxos {
// for _, txo := range allExtraTxos {
// if txo.TxHash == row.TxHash && txo.Nout == row.Nout {
// txo.Extra = row.Extra
// break
// }
// }
// }
res := &pb.Outputs{
Txos: allTxos,
ExtraTxos: allExtraTxos,
Total: uint32(len(allTxos) + len(allExtraTxos)),
Offset: 0, //TODO
Blocked: nil, //TODO
BlockedTotal: 0, //TODO
}, nil
}
logrus.Warn(res)
return res, nil
}