diff --git a/server/search.go b/server/search.go
index bfd97d7..830e297 100644
--- a/server/search.go
+++ b/server/search.go
@@ -3,6 +3,8 @@ package server
 import (
     "context"
     "encoding/hex"
+    "encoding/json"
+    "fmt"
     "github.com/btcsuite/btcutil/base58"
     "github.com/golang/protobuf/ptypes/wrappers"
     pb "github.com/lbryio/hub/protobuf/go"
@@ -140,6 +142,17 @@ func (s *Server) cleanTags(tags []string) []string {
     return cleanedTags
 }
 
+// Search /*
+// Search logic is as follows:
+// 1) Set up the query with the given params
+// 2) Do the query with a limit of 1000
+// 3) Remove blocked content (these are returned separately)
+// 4) Remove duplicates (these are not returned)
+// 5) Apply the limit-claims-per-channel logic
+// 6) Get claims referenced by reposts
+// 7) Get channels referenced by claims and repost claims
+// 8) Return streams referenced by reposts and all referenced channels in extra_txos
+//*/
 func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, error) {
     var client *elastic.Client = nil
     if s.EsClient == nil {
@@ -397,6 +410,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
     var searchIndices = []string{}
     if s.Args.Dev {
+        // If we're running in dev mode, ignore the mainnet claims index
         indices, err := client.IndexNames()
         if err != nil {
             log.Fatalln(err)
         }
@@ -423,6 +437,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
         Query(q). // specify the query
         From(0).Size(1000)
 
+
     for _, x := range orderBy {
         search = search.Sort(x.Field, x.IsAsc)
     }
@@ -467,30 +482,20 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
         j += 1
     }
 
-    //// or if you want more control
-    //for _, hit := range searchResult.Hits.Hits {
-    //    // hit.Index contains the name of the index
-    //
-    //    var t map[string]interface{} // or could be a Record
-    //    err := json.Unmarshal(hit.Source, &t)
-    //    if err != nil {
-    //        return nil, err
-    //    }
-    //
-    //    b, err := json.MarshalIndent(t, "", " ")
-    //    if err != nil {
-    //        fmt.Println("error:", err)
-    //    }
-    //    fmt.Println(string(b))
-    //    //for k := range t {
-    //    //    fmt.Println(k)
-    //    //}
-    //    //return nil, nil
-    //}
+    //printJsonFullResults(searchResult)
+
+    // Get claims for reposts
+    repostClaims, repostRecords := getClaimsForReposts(records, client, ctx, searchIndices)
+    // Get all unique channels
+    channels := getUniqueChannels(append(records, repostRecords...))
+    // Add these to extra txos
+    extraTxos := append(repostClaims, channels...)
+    //extraTxos := repostClaims
 
     if in.NoTotals != nil && !in.NoTotals.Value {
         return &pb.Outputs{
             Txos:    txos,
+            ExtraTxos: extraTxos,
             Offset:  uint32(int64(from) + searchResult.TotalHits()),
             Blocked: blocked,
         }, nil
@@ -502,6 +507,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
     }
     return &pb.Outputs{
         Txos:    txos,
+        ExtraTxos: extraTxos,
         Total:   uint32(searchResult.TotalHits()),
         Offset:  uint32(int64(from) + searchResult.TotalHits()),
         Blocked: blocked,
@@ -509,14 +515,66 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
     }, nil
 }
 
-
-func sumCounters(channelCounters map[string]int) int {
-    var sum int = 0
-    for _, v := range channelCounters {
-        sum += v
+func getUniqueChannels(records []*record) []*pb.Output {
+    channelTxos := make([]*pb.Output, 0, len(records))
+    channels := make(map[string]bool)
+    for _, r := range records {
+        if r.ChannelId != "" && !channels[r.ChannelId] {
+            //txo := &pb.Output{
+            //    TxHash: util.ToHash(r.ChannelId),
+            //}
+            txo := &pb.Output{
+                TxHash: util.ToHash(r.Txid),
+                //Meta: &pb.Output_Claim{
+                //    Claim: &pb.ClaimMeta{
+                //        Channel: r.recordToOutput(),
+                //    },
+                //},
+            }
+            channelTxos = append(channelTxos, txo)
+            channels[r.ChannelId] = true
+        }
     }
-    return sum
+    return channelTxos
+}
+
+func getClaimsForReposts(records []*record, client *elastic.Client, ctx context.Context, searchIndices []string) ([]*pb.Output, []*record) {
+
+    var totalReposted = 0
+    var mget = client.Mget()
+    for _, r := range records {
+        if r.Reposted > 0 {
+            var nmget = elastic.NewMultiGetItem().Id(r.RepostedClaimId)
+            for _, index := range searchIndices {
+                nmget = nmget.Index(index)
+            }
+            mget = mget.Add(nmget)
+            totalReposted++
+        }
+    }
+
+    res, err := mget.Do(ctx)
+    if err != nil {
+        log.Println(err)
+        return []*pb.Output{}, []*record{}
+    }
+
+    claims := make([]*pb.Output, totalReposted)
+    repostedRecords := make([]*record, totalReposted)
+
+    log.Println("reposted records", totalReposted)
+    for i, doc := range res.Docs {
+        var r record
+        err := json.Unmarshal(doc.Source, &r)
+        if err != nil {
+            return []*pb.Output{}, []*record{}
+        }
+        claims[i] = r.recordToOutput()
+        repostedRecords[i] = &r
+    }
+
+    return claims, repostedRecords
 }
 
 func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record {
@@ -656,3 +714,22 @@ func removeBlocked(searchHits []*record, blocked *[]*pb.Blocked) []*record {
 
     return newHits
 }
+
+func printJsonFullResults(searchResult *elastic.SearchResult) {
+    // or if you want more control
+    for _, hit := range searchResult.Hits.Hits {
+        // hit.Index contains the name of the index
+
+        var t map[string]interface{} // or could be a Record
+        err := json.Unmarshal(hit.Source, &t)
+        if err != nil {
+            return
+        }
+
+        b, err := json.MarshalIndent(t, "", " ")
+        if err != nil {
+            fmt.Println("error:", err)
+        }
+        fmt.Println(string(b))
+    }
+}
\ No newline at end of file
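
For reference, the channel gathering added in getUniqueChannels (step 7 of the Search flow described above) boils down to a first-seen-wins pass keyed on ChannelId. Below is a minimal, self-contained sketch of that idea only; the trimmed record struct and the uniqueChannelIds helper are stand-ins for illustration and are not the hub's actual types or API.

package main

import "fmt"

// record is a hypothetical, trimmed stand-in for the server's record type,
// keeping only the fields the de-duplication step cares about.
type record struct {
	Txid      string
	ChannelId string
}

// uniqueChannelIds keeps the first occurrence of each non-empty ChannelId,
// preserving the order in which channels are first seen.
func uniqueChannelIds(records []*record) []string {
	seen := make(map[string]bool)
	out := make([]string, 0, len(records))
	for _, r := range records {
		if r.ChannelId != "" && !seen[r.ChannelId] {
			out = append(out, r.ChannelId)
			seen[r.ChannelId] = true
		}
	}
	return out
}

func main() {
	recs := []*record{
		{Txid: "a", ChannelId: "chan1"},
		{Txid: "b", ChannelId: "chan1"}, // duplicate channel, skipped
		{Txid: "c", ChannelId: "chan2"},
		{Txid: "d", ChannelId: ""}, // no channel, skipped
	}
	fmt.Println(uniqueChannelIds(recs)) // prints: [chan1 chan2]
}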