get claims for reposts and channels with mget

This commit is contained in:
Jeffrey Picard 2021-06-11 08:34:27 -04:00
parent acf7ce671d
commit af9b5de49d

View file

@ -3,6 +3,8 @@ package server
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/btcsuite/btcutil/base58"
"github.com/golang/protobuf/ptypes/wrappers"
pb "github.com/lbryio/hub/protobuf/go"
@ -140,6 +142,17 @@ func (s *Server) cleanTags(tags []string) []string {
return cleanedTags
}
// Search /*
// Search logic is as follows:
// 1) Setup query with params given
// 2) Do query with limit of 1000
// 3) remove blocked content (these are returned separately)
// 4) remove duplicates (these are not returned)
// 5) limit claims per channel logic
// 6) get claims referenced by reposts
// 7) get channels references by claims and repost claims
// 8) return streams referenced by repost and all channel referenced in extra_txos
//*/
func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, error) {
var client *elastic.Client = nil
if s.EsClient == nil {
@ -397,6 +410,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
var searchIndices = []string{}
if s.Args.Dev {
// If we're running in dev mode ignore the mainnet claims index
indices, err := client.IndexNames()
if err != nil {
log.Fatalln(err)
@ -423,6 +437,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
Query(q). // specify the query
From(0).Size(1000)
for _, x := range orderBy {
search = search.Sort(x.Field, x.IsAsc)
}
@ -467,30 +482,20 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
j += 1
}
//// or if you want more control
//for _, hit := range searchResult.Hits.Hits {
// // hit.Index contains the name of the index
//
// var t map[string]interface{} // or could be a Record
// err := json.Unmarshal(hit.Source, &t)
// if err != nil {
// return nil, err
// }
//
// b, err := json.MarshalIndent(t, "", " ")
// if err != nil {
// fmt.Println("error:", err)
// }
// fmt.Println(string(b))
// //for k := range t {
// // fmt.Println(k)
// //}
// //return nil, nil
//}
//printJsonFullResults(searchResult)
//Get claims for reposts
repostClaims, repostRecords := getClaimsForReposts(records, client, ctx, searchIndices)
//get all unique channels
channels := getUniqueChannels(append(records, repostRecords...))
//add these to extra txos
extraTxos := append(repostClaims, channels...)
//extraTxos := repostClaims
if in.NoTotals != nil && !in.NoTotals.Value {
return &pb.Outputs{
Txos: txos,
ExtraTxos: extraTxos,
Offset: uint32(int64(from) + searchResult.TotalHits()),
Blocked: blocked,
}, nil
@ -502,6 +507,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
}
return &pb.Outputs{
Txos: txos,
ExtraTxos: extraTxos,
Total: uint32(searchResult.TotalHits()),
Offset: uint32(int64(from) + searchResult.TotalHits()),
Blocked: blocked,
@ -509,14 +515,66 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
}, nil
}
func sumCounters(channelCounters map[string]int) int {
var sum int = 0
for _, v := range channelCounters {
sum += v
func getUniqueChannels(records []*record) []*pb.Output {
channelTxos := make([]*pb.Output, 0, len(records))
channels := make(map[string]bool)
for _, r := range records {
if r.ChannelId != "" && !channels[r.ChannelId] {
//txo := &pb.Output{
// TxHash: util.ToHash(r.ChannelId),
//}
txo := &pb.Output{
TxHash: util.ToHash(r.Txid),
//Meta: &pb.Output_Claim{
// Claim: &pb.ClaimMeta{
// Channel: r.recordToOutput(),
// },
//},
}
channelTxos = append(channelTxos, txo)
channels[r.ChannelId] = true
}
}
return sum
return channelTxos
}
func getClaimsForReposts(records []*record, client *elastic.Client, ctx context.Context, searchIndices []string) ([]*pb.Output, []*record) {
var totalReposted = 0
var mget = client.Mget()
for _, r := range records {
if r.Reposted > 0 {
var nmget = elastic.NewMultiGetItem().Id(r.RepostedClaimId)
for _, index := range searchIndices {
nmget = nmget.Index(index)
}
mget = mget.Add(nmget)
totalReposted++
}
}
res, err := mget.Do(ctx)
if err != nil {
log.Println(err)
return []*pb.Output{}, []*record{}
}
claims := make([]*pb.Output, totalReposted)
repostedRecords := make([]*record, totalReposted)
log.Println("reposted records", totalReposted)
for i, doc := range res.Docs {
var r record
err := json.Unmarshal(doc.Source, &r)
if err != nil {
return []*pb.Output{}, []*record{}
}
claims[i] = r.recordToOutput()
repostedRecords[i] = &r
}
return claims, repostedRecords
}
func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record {
@ -656,3 +714,22 @@ func removeBlocked(searchHits []*record, blocked *[]*pb.Blocked) []*record {
return newHits
}
func printJsonFullResults(searchResult *elastic.SearchResult) {
// or if you want more control
for _, hit := range searchResult.Hits.Hits {
// hit.Index contains the name of the index
var t map[string]interface{} // or could be a Record
err := json.Unmarshal(hit.Source, &t)
if err != nil {
return
}
b, err := json.MarshalIndent(t, "", " ")
if err != nil {
fmt.Println("error:", err)
}
fmt.Println(string(b))
}
}