Remove resolve code and get things setup for implementing paging / limit

claims per channel manually
This commit is contained in:
Jeffrey Picard 2021-06-06 22:28:13 -04:00
parent 836d517dd0
commit 1cd11d2c73

View file

@ -4,11 +4,10 @@ import (
"context" "context"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt"
"github.com/btcsuite/btcutil/base58" "github.com/btcsuite/btcutil/base58"
"github.com/golang/protobuf/ptypes/wrappers" "github.com/golang/protobuf/ptypes/wrappers"
pb "github.com/lbryio/hub/protobuf/go" pb "github.com/lbryio/hub/protobuf/go"
"github.com/lbryio/hub/schema" //"github.com/lbryio/hub/schema"
"github.com/lbryio/hub/util" "github.com/lbryio/hub/util"
"github.com/olivere/elastic/v7" "github.com/olivere/elastic/v7"
"golang.org/x/text/cases" "golang.org/x/text/cases"
@ -200,141 +199,6 @@ func (s *Server) cleanTags(tags []string) []string {
return cleanedTags return cleanedTags
} }
func (s *Server) fullIdFromShortId(ctx context.Context, channelName string, claimId string) (string, error) {
return "", nil
}
func (s *Server) resolveStream(ctx context.Context, url *schema.URL, channelId string) (string, error) {
return "", nil
}
func (s *Server) resolveChannelId(ctx context.Context, url *schema.URL) (string, error) {
if !url.HasChannel() {
return "", nil
}
if url.Channel.IsFullID() {
return url.Channel.ClaimId, nil
}
if url.Channel.IsShortID() {
channelId, err := s.fullIdFromShortId(ctx, url.Channel.Name, url.Channel.ClaimId)
if err != nil {
return "", err
}
return channelId, nil
}
in := &pb.SearchRequest{}
in.Normalized = []string{util.Normalize(url.Channel.Name)}
if url.Channel.ClaimId == "" && url.Channel.AmountOrder < 0 {
in.IsControlling = &wrappers.BoolValue{Value: true}
} else {
if url.Channel.AmountOrder > 0 {
in.AmountOrder = &wrappers.Int32Value{Value: int32(url.Channel.AmountOrder)}
}
if url.Channel.ClaimId != "" {
in.ClaimId = &pb.InvertibleField{
Invert: false,
Value: []string{url.Channel.ClaimId},
}
}
}
var size = 1
var from = 0
q := elastic.NewBoolQuery()
q = AddTermsField(in.Normalized, "normalized", q)
if in.IsControlling != nil {
q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling.Value))
}
if in.AmountOrder != nil {
in.Limit.Value = 1
in.OrderBy = []string{"effective_amount"}
in.Offset = &wrappers.Int32Value{Value: in.AmountOrder.Value - 1}
}
if in.Limit != nil {
size = int(in.Limit.Value)
}
if in.Offset != nil {
from = int(in.Offset.Value)
}
if in.ClaimId != nil {
searchVals := StrArrToInterface(in.ClaimId.Value)
if len(in.ClaimId.Value) == 1 && len(in.ClaimId.Value[0]) < 20 {
if in.ClaimId.Invert {
q = q.MustNot(elastic.NewPrefixQuery("claim_id.keyword", in.ClaimId.Value[0]))
} else {
q = q.Must(elastic.NewPrefixQuery("claim_id.keyword", in.ClaimId.Value[0]))
}
} else {
if in.ClaimId.Invert {
q = q.MustNot(elastic.NewTermsQuery("claim_id.keyword", searchVals...))
} else {
q = q.Must(elastic.NewTermsQuery("claim_id.keyword", searchVals...))
}
}
}
searchResult, err := s.EsClient.Search().
Query(q). // specify the query
From(from).Size(size).
Do(ctx)
if err != nil {
return "", err
}
var r record
var channelId string
for _, item := range searchResult.Each(reflect.TypeOf(r)) {
if t, ok := item.(record); ok {
channelId = t.ClaimId
}
}
//matches, err := s.Search(ctx, in)
//if err != nil {
// return "", err
//}
return channelId, nil
}
func (s *Server) resolveUrl(ctx context.Context, rawUrl string) *urlResolution {
url := schema.ParseURL(rawUrl)
if url == nil {
return nil
}
channelId, err := s.resolveChannelId(ctx, url)
if err != nil {
return &urlResolution{
resolutionType: errorResolution,
value: fmt.Sprintf("Could not find channel in \"%s\".", url),
}
}
stream, _ := s.resolveStream(ctx, url, channelId)
if url.HasStream() {
return &urlResolution{
resolutionType: streamResolution,
value: stream,
}
} else {
return &urlResolution{
resolutionType: channelResolution,
value: channelId,
}
}
}
func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, error) { func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, error) {
var client *elastic.Client = nil var client *elastic.Client = nil
if s.EsClient == nil { if s.EsClient == nil {
@ -394,7 +258,8 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
} }
var from = 0 var from = 0
var size = 10 var size = 1000
var pageSize = 10
var orderBy []orderField var orderBy []orderField
var ms *multiSorter var ms *multiSorter
@ -422,7 +287,8 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
} }
if in.Limit != nil { if in.Limit != nil {
size = int(in.Limit.Value) pageSize = int(in.Limit.Value)
log.Printf("page size: %d\n", pageSize)
} }
if in.Offset != nil { if in.Offset != nil {
@ -538,19 +404,19 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
q = q.Should(elastic.NewBoolQuery().Must(elastic.NewTermQuery("reposted_claim_type", claimTypes["channel"]))) q = q.Should(elastic.NewBoolQuery().Must(elastic.NewTermQuery("reposted_claim_type", claimTypes["channel"])))
} }
var collapse *elastic.CollapseBuilder //var collapse *elastic.CollapseBuilder
if in.LimitClaimsPerChannel != nil { //if in.LimitClaimsPerChannel != nil {
println(in.LimitClaimsPerChannel.Value) // println(in.LimitClaimsPerChannel.Value)
innerHit := elastic. // innerHit := elastic.
NewInnerHit(). // NewInnerHit().
//From(0). // //From(0).
Size(int(in.LimitClaimsPerChannel.Value)). // Size(int(in.LimitClaimsPerChannel.Value)).
Name("channel_id") // Name("channel_id")
for _, x := range orderBy { // for _, x := range orderBy {
innerHit = innerHit.Sort(x.Field, x.IsAsc) // innerHit = innerHit.Sort(x.Field, x.IsAsc)
} // }
collapse = elastic.NewCollapseBuilder("channel_id.keyword").InnerHit(innerHit) // collapse = elastic.NewCollapseBuilder("channel_id.keyword").InnerHit(innerHit)
} //}
if in.TxNout != nil { if in.TxNout != nil {
q = q.Must(elastic.NewTermQuery("tx_nout", in.TxNout.Value)) q = q.Must(elastic.NewTermQuery("tx_nout", in.TxNout.Value))
@ -643,9 +509,9 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
FetchSourceContext(fsc). FetchSourceContext(fsc).
Query(q). // specify the query Query(q). // specify the query
From(from).Size(size) From(from).Size(size)
if in.LimitClaimsPerChannel != nil { //if in.LimitClaimsPerChannel != nil {
search = search.Collapse(collapse) // search = search.Collapse(collapse)
} //}
for _, x := range orderBy { for _, x := range orderBy {
log.Println(x.Field, x.IsAsc) log.Println(x.Field, x.IsAsc)
search = search.Sort(x.Field, x.IsAsc) search = search.Sort(x.Field, x.IsAsc)
@ -659,22 +525,26 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis) log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis)
var txos []*pb.Output var txos []*pb.Output
var records []record
if in.LimitClaimsPerChannel == nil { //if in.LimitClaimsPerChannel == nil {
txos = make([]*pb.Output, len(searchResult.Hits.Hits)) if true {
txos = make([]*pb.Output, searchResult.TotalHits())
records = make([]record, 0, searchResult.TotalHits())
var r record var r record
for i, item := range searchResult.Each(reflect.TypeOf(r)) { for _, item := range searchResult.Each(reflect.TypeOf(r)) {
if t, ok := item.(record); ok { if t, ok := item.(record); ok {
txos[i] = &pb.Output{ records = append(records, t)
TxHash: util.ToHash(t.Txid), //txos[i] = &pb.Output{
Nout: t.Nout, // TxHash: util.ToHash(t.Txid),
Height: t.Height, // Nout: t.Nout,
} // Height: t.Height,
//}
} }
} }
} else { } else {
records := make([]record, 0, len(searchResult.Hits.Hits) * int(in.LimitClaimsPerChannel.Value)) records = make([]record, 0, len(searchResult.Hits.Hits) * int(in.LimitClaimsPerChannel.Value))
txos = make([]*pb.Output, 0, len(searchResult.Hits.Hits) * int(in.LimitClaimsPerChannel.Value)) txos = make([]*pb.Output, 0, len(searchResult.Hits.Hits) * int(in.LimitClaimsPerChannel.Value))
var i = 0 var i = 0
for _, hit := range searchResult.Hits.Hits { for _, hit := range searchResult.Hits.Hits {
@ -705,6 +575,14 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
} }
} }
for i, t := range records {
txos[i] = &pb.Output{
TxHash: util.ToHash(t.Txid),
Nout: t.Nout,
Height: t.Height,
}
}
// or if you want more control // or if you want more control
//for _, hit := range searchResult.Hits.Hits { //for _, hit := range searchResult.Hits.Hits {
// // hit.Index contains the name of the index // // hit.Index contains the name of the index