Claim search port #5

Merged
jeffreypicard merged 32 commits from claim_search_port into master 2021-06-23 17:07:41 +02:00
3 changed files with 252 additions and 236 deletions
Showing only changes of commit 3f64655a26 - Show all commits

View file

@ -13,7 +13,6 @@ import (
"github.com/akamensky/argparse"
pb "github.com/lbryio/hub/protobuf/go"
"github.com/lbryio/hub/server"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

View file

@ -5,21 +5,25 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"log"
"math"
"reflect"
"strings"
//"github.com/lbryio/hub/schema"
"github.com/btcsuite/btcutil/base58"
"github.com/golang/protobuf/ptypes/wrappers"
pb "github.com/lbryio/hub/protobuf/go"
"math"
//"github.com/lbryio/hub/schema"
"github.com/lbryio/hub/util"
"github.com/olivere/elastic/v7"
"golang.org/x/text/cases"
"golang.org/x/text/language"
"gopkg.in/karalabe/cookiejar.v1/collections/deque"
"log"
"reflect"
"strings"
)
const DefaultSearchSize = 1000
type record struct {
Txid string `json:"tx_id"`
Nout uint32 `json:"tx_nout"`
@ -60,29 +64,26 @@ func StrArrToInterface(arr []string) []interface{} {
return searchVals
}
func AddTermsField(arr []string, name string, q *elastic.BoolQuery) *elastic.BoolQuery {
if len(arr) > 0 {
searchVals := StrArrToInterface(arr)
return q.Must(elastic.NewTermsQuery(name, searchVals...))
}
return q
}
func AddIndividualTermFields(arr []string, name string, q *elastic.BoolQuery, invert bool) *elastic.BoolQuery {
if len(arr) > 0 {
for _, x := range arr {
if invert {
q = q.MustNot(elastic.NewTermQuery(name, x))
} else {
q = q.Must(elastic.NewTermQuery(name, x))
}
}
func AddTermsField(q *elastic.BoolQuery, arr []string, name string) *elastic.BoolQuery {
if len(arr) == 0 {
return q
}
searchVals := StrArrToInterface(arr)
return q.Must(elastic.NewTermsQuery(name, searchVals...))
}
func AddIndividualTermFields(q *elastic.BoolQuery, arr []string, name string, invert bool) *elastic.BoolQuery {
for _, x := range arr {
if invert {
q = q.MustNot(elastic.NewTermQuery(name, x))
} else {
q = q.Must(elastic.NewTermQuery(name, x))
}
}
return q
}
func AddRangeField(rq *pb.RangeField, name string, q *elastic.BoolQuery) *elastic.BoolQuery {
func AddRangeField(q *elastic.BoolQuery, rq *pb.RangeField, name string) *elastic.BoolQuery {
if rq == nil {
return q
}
@ -91,7 +92,7 @@ func AddRangeField(rq *pb.RangeField, name string, q *elastic.BoolQuery) *elasti
if rq.Op != pb.RangeField_EQ {
return q
}
return AddTermsField(rq.Value, name, q)
return AddTermsField(q, rq.Value, name)
}
if rq.Op == pb.RangeField_EQ {
return q.Must(elastic.NewTermQuery(name, rq.Value[0]))
@ -106,7 +107,7 @@ func AddRangeField(rq *pb.RangeField, name string, q *elastic.BoolQuery) *elasti
}
}
func AddInvertibleField(field *pb.InvertibleField, name string, q *elastic.BoolQuery) *elastic.BoolQuery {
func AddInvertibleField(q *elastic.BoolQuery, field *pb.InvertibleField, name string) *elastic.BoolQuery {
if field == nil {
return q
}
@ -122,26 +123,6 @@ func AddInvertibleField(field *pb.InvertibleField, name string, q *elastic.BoolQ
}
}
func (s *Server) normalizeTag(tag string) string {
c := cases.Lower(language.English)
res := s.MultiSpaceRe.ReplaceAll(
s.WeirdCharsRe.ReplaceAll(
[]byte(strings.TrimSpace(strings.Replace(c.String(tag), "'", "", -1))),
[]byte(" ")),
[]byte(" "))
return string(res)
}
func (s *Server) cleanTags(tags []string) []string {
cleanedTags := make([]string, len(tags))
for i, tag := range tags {
cleanedTags[i] = s.normalizeTag(tag)
}
return cleanedTags
}
// Search /*
// Search logic is as follows:
// 1) Setup query with params given
@ -168,6 +149,182 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
client = s.EsClient
}
var from = 0
var pageSize = 10
var orderBy []orderField
var searchIndices = []string{}
q := elastic.NewBoolQuery()
q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy)
if s.Args.Dev && len(in.SearchIndices) == 0 {
// If we're running in dev mode ignore the mainnet claims index
indices, err := client.IndexNames()
if err != nil {
log.Fatalln(err)
}
var numIndices = len(indices)
searchIndices = make([]string, 0, numIndices)
for i := 0; i < numIndices; i++ {
if indices[i] == "claims" {
continue
}
searchIndices = append(searchIndices, indices[i])
}
}
if len(in.SearchIndices) > 0 {
searchIndices = in.SearchIndices
}
fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")
search := client.Search().
Index(searchIndices...).
FetchSourceContext(fsc).
Query(q). // specify the query
From(0).Size(DefaultSearchSize)
for _, x := range orderBy {
search = search.Sort(x.Field, x.IsAsc)
}
searchResult, err := search.Do(ctx) // execute
if err != nil {
log.Println(err)
return nil, err
}
log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis)
txos, extraTxos, blocked := s.postProcessResults(ctx, client, searchResult, in, pageSize, from, searchIndices)
if in.NoTotals != nil && !in.NoTotals.Value {
return &pb.Outputs{
Txos: txos,
ExtraTxos: extraTxos,
Offset: uint32(int64(from) + searchResult.TotalHits()),
Blocked: blocked,
}, nil
}
var blockedTotal uint32 = 0
for _, b := range blocked {
blockedTotal += b.Count
}
return &pb.Outputs{
Txos: txos,
ExtraTxos: extraTxos,
Total: uint32(searchResult.TotalHits()),
Offset: uint32(int64(from) + searchResult.TotalHits()),
Blocked: blocked,
BlockedTotal: blockedTotal,
}, nil
lyoshenka commented 2021-06-15 23:59:58 +02:00 (Migrated from github.com)
Review

by convention ctx is generally the first param

by convention `ctx` is generally the first param
jeffreypicard commented 2021-06-18 01:31:54 +02:00 (Migrated from github.com)
Review

Makes sense

Makes sense
}
func (s *Server) normalizeTag(tag string) string {
c := cases.Lower(language.English)
res := s.MultiSpaceRe.ReplaceAll(
s.WeirdCharsRe.ReplaceAll(
[]byte(strings.TrimSpace(strings.Replace(c.String(tag), "'", "", -1))),
[]byte(" ")),
[]byte(" "))
return string(res)
}
func (s *Server) cleanTags(tags []string) []string {
cleanedTags := make([]string, len(tags))
for i, tag := range tags {
cleanedTags[i] = s.normalizeTag(tag)
}
return cleanedTags
}
func (s *Server) postProcessResults(
ctx context.Context,
client *elastic.Client,
searchResult *elastic.SearchResult,
in *pb.SearchRequest,
pageSize int,
from int,
searchIndices []string) ([]*pb.Output, []*pb.Output, []*pb.Blocked) {
var txos []*pb.Output
var records []*record
var blockedRecords []*record
var blocked []*pb.Blocked
var blockedMap map[string]*pb.Blocked
records = make([]*record, 0, searchResult.TotalHits())
var r record
for _, item := range searchResult.Each(reflect.TypeOf(r)) {
if t, ok := item.(record); ok {
records = append(records, &t)
}
}
//printJsonFullResults(searchResult)
records, blockedRecords, blockedMap = removeBlocked(records)
if in.RemoveDuplicates != nil {
records = removeDuplicates(records)
}
if in.LimitClaimsPerChannel != nil && in.LimitClaimsPerChannel.Value > 0 {
records = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value))
}
finalLength := int(math.Min(float64(len(records)), float64(pageSize)))
txos = make([]*pb.Output, 0, finalLength)
var j = 0
for i := from; i < from + finalLength && i < len(records) && j < finalLength; i++ {
t := records[i]
res := t.recordToOutput()
txos = append(txos, res)
j += 1
}
//printJsonFullRecords(blockedRecords)
//Get claims for reposts
repostClaims, repostRecords, repostedMap := getClaimsForReposts(ctx, client, records, searchIndices)
//get all unique channels
channels, channelMap := getUniqueChannels(append(append(records, repostRecords...), blockedRecords...), client, ctx, searchIndices)
//add these to extra txos
extraTxos := append(repostClaims, channels...)
//Fill in channel / repost data for txos and blocked
for i, txo := range txos {
channel, cOk := channelMap[records[i].ChannelId]
repostClaim, rOk := repostedMap[records[i].RepostedClaimId]
if cOk {
txo.GetClaim().Channel = channel
}
if rOk {
txo.GetClaim().Repost = repostClaim
}
}
blocked = make([]*pb.Blocked, 0, len(blockedMap))
for k, v := range blockedMap {
if channel, ok := channelMap[k]; ok {
v.Channel = channel
}
blocked = append(blocked, v)
}
return txos, extraTxos, blocked
}
func (s *Server) setupEsQuery(
q *elastic.BoolQuery,
in *pb.SearchRequest,
from *int,
pageSize *int,
orderBy *[]orderField) *elastic.BoolQuery {
claimTypes := map[string]int {
"stream": 1,
"channel": 2,
@ -212,21 +369,6 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
"tags": true,
}
var from = 0
var pageSize = 10
var orderBy []orderField
// Ping the Elasticsearch server to get e.g. the version number
//_, code, err := client.Ping("http://127.0.0.1:9200").Do(ctx)
//if err != nil {
// return nil, err
//}
//if code != 200 {
// return nil, errors.New("ping failed")
//}
q := elastic.NewBoolQuery()
if in.IsControlling != nil {
q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling.Value))
}
@ -238,11 +380,11 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
}
if in.Limit != nil {
pageSize = int(in.Limit.Value)
*pageSize = int(in.Limit.Value)
}
if in.Offset != nil {
from = int(in.Offset.Value)
*from = int(in.Offset.Value)
}
if len(in.Name) > 0 {
@ -270,7 +412,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
if _, ok := textFields[toAppend]; ok {
toAppend = toAppend + ".keyword"
}
orderBy = append(orderBy, orderField{toAppend, isAsc})
*orderBy = append(*orderBy, orderField{toAppend, isAsc})
}
}
@ -349,53 +491,53 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
q = q.Must(elastic.NewTermQuery("tx_nout", in.TxNout.Value))
}
q = AddTermsField(in.PublicKeyHash, "public_key_hash.keyword", q)
q = AddTermsField(in.Author, "author.keyword", q)
q = AddTermsField(in.Title, "title.keyword", q)
q = AddTermsField(in.CanonicalUrl, "canonical_url.keyword", q)
q = AddTermsField(in.ClaimName, "claim_name.keyword", q)
q = AddTermsField(in.Description, "description.keyword", q)
q = AddTermsField(in.MediaType, "media_type.keyword", q)
q = AddTermsField(in.Normalized, "normalized.keyword", q)
q = AddTermsField(in.PublicKeyBytes, "public_key_bytes.keyword", q)
q = AddTermsField(in.ShortUrl, "short_url.keyword", q)
q = AddTermsField(in.Signature, "signature.keyword", q)
q = AddTermsField(in.SignatureDigest, "signature_digest.keyword", q)
q = AddTermsField(in.TxId, "tx_id.keyword", q)
q = AddTermsField(in.FeeCurrency, "fee_currency.keyword", q)
q = AddTermsField(in.RepostedClaimId, "reposted_claim_id.keyword", q)
q = AddTermsField(q, in.PublicKeyHash, "public_key_hash.keyword")
q = AddTermsField(q, in.Author, "author.keyword")
q = AddTermsField(q, in.Title, "title.keyword")
q = AddTermsField(q, in.CanonicalUrl, "canonical_url.keyword")
q = AddTermsField(q, in.ClaimName, "claim_name.keyword")
q = AddTermsField(q, in.Description, "description.keyword")
q = AddTermsField(q, in.MediaType, "media_type.keyword")
q = AddTermsField(q, in.Normalized, "normalized.keyword")
q = AddTermsField(q, in.PublicKeyBytes, "public_key_bytes.keyword")
q = AddTermsField(q, in.ShortUrl, "short_url.keyword")
q = AddTermsField(q, in.Signature, "signature.keyword")
q = AddTermsField(q, in.SignatureDigest, "signature_digest.keyword")
q = AddTermsField(q, in.TxId, "tx_id.keyword")
q = AddTermsField(q, in.FeeCurrency, "fee_currency.keyword")
q = AddTermsField(q, in.RepostedClaimId, "reposted_claim_id.keyword")
q = AddTermsField(s.cleanTags(in.AnyTags), "tags.keyword", q)
q = AddIndividualTermFields(s.cleanTags(in.AllTags), "tags.keyword", q, false)
q = AddIndividualTermFields(s.cleanTags(in.NotTags), "tags.keyword", q, true)
q = AddTermsField(in.AnyLanguages, "languages", q)
q = AddIndividualTermFields(in.AllLanguages, "languages", q, false)
q = AddTermsField(q, s.cleanTags(in.AnyTags), "tags.keyword")
q = AddIndividualTermFields(q, s.cleanTags(in.AllTags), "tags.keyword", false)
q = AddIndividualTermFields(q, s.cleanTags(in.NotTags), "tags.keyword", true)
q = AddTermsField(q, in.AnyLanguages, "languages")
q = AddIndividualTermFields(q, in.AllLanguages, "languages", false)
q = AddInvertibleField(in.ChannelId, "channel_id.keyword", q)
q = AddInvertibleField(in.ChannelIds, "channel_id.keyword", q)
q = AddInvertibleField(q, in.ChannelId, "channel_id.keyword")
q = AddInvertibleField(q, in.ChannelIds, "channel_id.keyword")
q = AddRangeField(in.TxPosition, "tx_position", q)
q = AddRangeField(in.Amount, "amount", q)
q = AddRangeField(in.Timestamp, "timestamp", q)
q = AddRangeField(in.CreationTimestamp, "creation_timestamp", q)
q = AddRangeField(in.Height, "height", q)
q = AddRangeField(in.CreationHeight, "creation_height", q)
q = AddRangeField(in.ActivationHeight, "activation_height", q)
q = AddRangeField(in.ExpirationHeight, "expiration_height", q)
q = AddRangeField(in.ReleaseTime, "release_time", q)
q = AddRangeField(in.Reposted, "reposted", q)
q = AddRangeField(in.FeeAmount, "fee_amount", q)
q = AddRangeField(in.Duration, "duration", q)
q = AddRangeField(in.CensorType, "censor_type", q)
q = AddRangeField(in.ChannelJoin, "channel_join", q)
q = AddRangeField(in.EffectiveAmount, "effective_amount", q)
q = AddRangeField(in.SupportAmount, "support_amount", q)
q = AddRangeField(in.TrendingGroup, "trending_group", q)
q = AddRangeField(in.TrendingMixed, "trending_mixed", q)
q = AddRangeField(in.TrendingLocal, "trending_local", q)
q = AddRangeField(in.TrendingGlobal, "trending_global", q)
q = AddRangeField(q, in.TxPosition, "tx_position")
q = AddRangeField(q, in.Amount, "amount")
q = AddRangeField(q, in.Timestamp, "timestamp")
q = AddRangeField(q, in.CreationTimestamp, "creation_timestamp")
q = AddRangeField(q, in.Height, "height")
q = AddRangeField(q, in.CreationHeight, "creation_height")
q = AddRangeField(q, in.ActivationHeight, "activation_height")
q = AddRangeField(q, in.ExpirationHeight, "expiration_height")
q = AddRangeField(q, in.ReleaseTime, "release_time")
q = AddRangeField(q, in.Reposted, "reposted")
q = AddRangeField(q, in.FeeAmount, "fee_amount")
q = AddRangeField(q, in.Duration, "duration")
q = AddRangeField(q, in.CensorType, "censor_type")
q = AddRangeField(q, in.ChannelJoin, "channel_join")
q = AddRangeField(q, in.EffectiveAmount, "effective_amount")
q = AddRangeField(q, in.SupportAmount, "support_amount")
q = AddRangeField(q, in.TrendingGroup, "trending_group")
q = AddRangeField(q, in.TrendingMixed, "trending_mixed")
q = AddRangeField(q, in.TrendingLocal, "trending_local")
q = AddRangeField(q, in.TrendingGlobal, "trending_global")
if in.Text != "" {
textQuery := elastic.NewSimpleQueryStringQuery(in.Text).
@ -409,133 +551,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
q = q.Must(textQuery)
}
var searchIndices = []string{}
if s.Args.Dev && len(in.SearchIndices) == 0 {
// If we're running in dev mode ignore the mainnet claims index
indices, err := client.IndexNames()
if err != nil {
log.Fatalln(err)
}
var numIndices = len(indices)
searchIndices = make([]string, 0, numIndices)
for i := 0; i < numIndices; i++ {
if indices[i] == "claims" {
continue
}
searchIndices = append(searchIndices, indices[i])
}
}
if len(in.SearchIndices) > 0 {
searchIndices = in.SearchIndices
}
fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")//.Include("_id")
search := client.Search().
Index(searchIndices...).
FetchSourceContext(fsc).
Query(q). // specify the query
From(0).Size(1000)
for _, x := range orderBy {
search = search.Sort(x.Field, x.IsAsc)
}
searchResult, err := search.Do(ctx) // execute
if err != nil {
log.Println(err)
return nil, err
}
log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis)
var txos []*pb.Output
var records []*record
var blockedRecords []*record
var blocked []*pb.Blocked
var blockedMap map[string]*pb.Blocked
records = make([]*record, 0, searchResult.TotalHits())
var r record
for _, item := range searchResult.Each(reflect.TypeOf(r)) {
if t, ok := item.(record); ok {
records = append(records, &t)
}
}
//printJsonFullResults(searchResult)
records, blockedRecords, blockedMap = removeBlocked(records)
if in.RemoveDuplicates != nil {
records = removeDuplicates(records)
}
if in.LimitClaimsPerChannel != nil && in.LimitClaimsPerChannel.Value > 0 {
records = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value))
}
finalLength := int(math.Min(float64(len(records)), float64(pageSize)))
txos = make([]*pb.Output, 0, finalLength)
var j = 0
for i := from; i < from + finalLength && i < len(records) && j < finalLength; i++ {
t := records[i]
res := t.recordToOutput()
txos = append(txos, res)
j += 1
}
//printJsonFullRecords(blockedRecords)
//Get claims for reposts
repostClaims, repostRecords, repostedMap := getClaimsForReposts(records, client, ctx, searchIndices)
//get all unique channels
channels, channelMap := getUniqueChannels(append(append(records, repostRecords...), blockedRecords...), client, ctx, searchIndices)
//add these to extra txos
extraTxos := append(repostClaims, channels...)
//Fill in channel / repost data for txos and blocked
for i, txo := range txos {
channel, cOk := channelMap[records[i].ChannelId]
repostClaim, rOk := repostedMap[records[i].RepostedClaimId]
if cOk {
txo.GetClaim().Channel = channel
}
if rOk {
txo.GetClaim().Repost = repostClaim
}
}
blocked = make([]*pb.Blocked, 0, len(blockedMap))
for k, v := range blockedMap {
if channel, ok := channelMap[k]; ok {
v.Channel = channel
}
blocked = append(blocked, v)
}
if in.NoTotals != nil && !in.NoTotals.Value {
return &pb.Outputs{
Txos: txos,
ExtraTxos: extraTxos,
Offset: uint32(int64(from) + searchResult.TotalHits()),
Blocked: blocked,
}, nil
}
var blockedTotal uint32 = 0
for _, b := range blocked {
blockedTotal += b.Count
}
return &pb.Outputs{
Txos: txos,
ExtraTxos: extraTxos,
Total: uint32(searchResult.TotalHits()),
Offset: uint32(int64(from) + searchResult.TotalHits()),
Blocked: blocked,
BlockedTotal: blockedTotal,
}, nil
return q
}
func getUniqueChannels(records []*record, client *elastic.Client, ctx context.Context, searchIndices []string) ([]*pb.Output, map[string]*pb.Output) {
@ -588,7 +604,7 @@ func getUniqueChannels(records []*record, client *elastic.Client, ctx context.Co
return channelTxos, channels
}
func getClaimsForReposts(records []*record, client *elastic.Client, ctx context.Context, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) {
func getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) {
var totalReposted = 0
var mget = client.Mget()//.StoredFields("_id")

View file

@ -1,11 +1,12 @@
package server
import (
"log"
"regexp"
pb "github.com/lbryio/hub/protobuf/go"
"github.com/olivere/elastic/v7"
"google.golang.org/grpc"
"log"
"regexp"
)
type Server struct {