Merge pull request #10 from lbryio/search_fixes

search fixes
Victor Shyba 2021-08-23 14:01:44 -03:00 committed by GitHub
commit d93fe2ee8d
6 changed files with 757 additions and 748 deletions

main.go

@@ -18,13 +18,13 @@ import (
 )
 const (
     defaultHost = "0.0.0.0"
     defaultPort = "50051"
     defaultEsHost = "http://localhost"
-    defaultEsPort = "9200"
+    defaultEsIndex = "claims"
+    defaultEsPort = "9200"
 )
 func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string {
     items := make(map[string]string)
     for _, item := range data {
@@ -49,12 +49,13 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
     parser := argparse.NewParser("hub", "hub server and client")
     serveCmd := parser.NewCommand("serve", "start the hub server")
-    host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "host", Default: defaultHost})
-    port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "port", Default: defaultPort})
-    esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "host", Default: defaultEsHost})
-    esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "port", Default: defaultEsPort})
-    dev := parser.Flag("", "dev", &argparse.Options{Required: false, Help: "port", Default: false})
+    debug := parser.Flag("", "debug", &argparse.Options{Required: false, Help: "enable debug logging", Default: false})
+    host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "RPC host", Default: defaultHost})
+    port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "RPC port", Default: defaultPort})
+    esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "elasticsearch host", Default: defaultEsHost})
+    esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "elasticsearch port", Default: defaultEsPort})
+    esIndex := parser.String("", "esindex", &argparse.Options{Required: false, Help: "elasticsearch index name", Default: defaultEsIndex})
     text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
     name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
@@ -72,14 +73,14 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
         log.Fatalln(parser.Usage(err))
     }
     args := &server.Args{
-        Serve:  false,
-        Host:   *host,
-        Port:   ":" + *port,
-        EsHost: *esHost,
-        EsPort: *esPort,
-        Dev:    *dev,
+        Serve:   false,
+        Host:    *host,
+        Port:    ":" + *port,
+        EsHost:  *esHost,
+        EsPort:  *esPort,
+        EsIndex: *esIndex,
+        Debug:   *debug,
     }
     if esHost, ok := environment["ELASTIC_HOST"]; ok {
@@ -95,8 +96,8 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
     }
     /*
     Verify no invalid argument combinations
     */
     if len(*channelIds) > 0 && *channelId != "" {
         log.Fatal("Cannot specify both channel_id and channel_ids")
     }
@@ -108,23 +109,23 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
     if *text != "" {
         searchRequest.Text = *text
     }
-    if *name!= "" {
-        searchRequest.Name = []string{*name}
+    if *name != "" {
+        searchRequest.ClaimName = *name
     }
     if *claimType != "" {
         searchRequest.ClaimType = []string{*claimType}
     }
     if *id != "" {
-        searchRequest.XId = [][]byte{[]byte(*id)}
+        searchRequest.ClaimId = &pb.InvertibleField{Invert: false, Value: []string{*id}}
     }
     if *author != "" {
-        searchRequest.Author = []string{*author}
+        searchRequest.Author = *author
     }
     if *title != "" {
-        searchRequest.Title = []string{*title}
+        searchRequest.Title = *title
     }
     if *description != "" {
-        searchRequest.Description = []string{*description}
+        searchRequest.Description = *description
     }
     if *channelId != "" {
         searchRequest.ChannelId = &pb.InvertibleField{Invert: false, Value: []string{*channelId}}
@@ -174,7 +175,6 @@ func main() {
     ctx, cancel := context.WithTimeout(context.Background(), time.Second)
     defer cancel()
-
     r, err := c.Search(ctx, searchRequest)
     if err != nil {
         log.Fatal(err)
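
The client path in main.go now fills scalar request fields directly instead of wrapping them. A minimal client sketch of the new shapes; it assumes the generated pb.NewHubClient constructor and a hub listening on the default port, and the claim id value is a placeholder:

    package main

    import (
        "context"
        "log"
        "time"

        pb "github.com/lbryio/hub/protobuf/go"
        "google.golang.org/grpc"
    )

    func main() {
        conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
        c := pb.NewHubClient(conn)

        searchRequest := &pb.SearchRequest{
            ClaimName: "lbry",                                                        // plain string now, not repeated
            ClaimId:   &pb.InvertibleField{Invert: false, Value: []string{"abc123"}}, // replaces the old XId bytes
            Limit:     20,                                                            // plain uint32, not Int32Value
        }
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        r, err := c.Search(ctx, searchRequest)
        if err != nil {
            log.Fatal(err)
        }
        log.Println(r.Total)
    }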

hub.proto

@@ -1,7 +1,6 @@
 syntax = "proto3";
 option go_package = "github.com/lbryio/hub/protobuf/go/pb";
-import "google/protobuf/wrappers.proto";
 import "result.proto";
 package pb;
@@ -15,6 +14,14 @@ message InvertibleField {
     repeated string value = 2;
 }
+
+message BoolValue {
+    bool value = 1;
+}
+
+message UInt32Value {
+    uint32 value = 1;
+}
 message RangeField {
     enum Op {
         EQ = 0;
@@ -28,69 +35,62 @@ message RangeField {
 }
 message SearchRequest {
-    string text = 1;
-    repeated string name = 2;
-    .google.protobuf.Int32Value amount_order = 3;
-    .google.protobuf.Int32Value limit = 4;
+    InvertibleField claim_id = 1;
+    InvertibleField channel_id = 2;
+    string text = 3;
+    uint32 limit = 4;
     repeated string order_by = 5;
-    .google.protobuf.Int32Value offset = 6;
-    .google.protobuf.BoolValue is_controlling = 7;
-    string last_take_over_height = 19;
-    InvertibleField claim_id = 20;
-    repeated string claim_name = 22;
-    repeated string normalized = 23;
-    RangeField tx_position = 24;
-    RangeField amount = 25;
-    RangeField timestamp = 26;
-    RangeField creation_timestamp = 27;
-    RangeField height = 28;
-    RangeField creation_height = 29;
-    RangeField activation_height = 30;
-    RangeField expiration_height = 31;
-    RangeField release_time = 32;
-    repeated string short_url = 33;
-    repeated string canonical_url = 34;
-    repeated string title = 35;
-    repeated string author = 36;
-    repeated string description = 37;
-    repeated string claim_type = 38;
-    RangeField reposted = 39;
-    repeated string stream_type = 40;
-    repeated string media_type = 41;
-    RangeField fee_amount = 42;
-    repeated string fee_currency = 43;
-    RangeField duration = 44;
-    string reposted_claim_hash = 45;
-    RangeField censor_type = 46;
-    string claims_in_channel = 47;
-    RangeField channel_join = 48;
-    .google.protobuf.BoolValue signature_valid = 49;
-    RangeField effective_amount = 51;
-    RangeField support_amount = 52;
-    RangeField trending_group = 53;
-    RangeField trending_mixed = 54;
-    RangeField trending_local = 55;
-    RangeField trending_global = 56;
-    InvertibleField channel_id = 57;
-    InvertibleField channel_ids = 58;
-    repeated string tx_id = 59;
-    .google.protobuf.Int32Value tx_nout = 60;
-    repeated string signature = 61;
-    repeated string signature_digest = 62;
-    repeated string public_key_bytes = 63;
-    repeated string public_key_hash = 64;
-    string public_key_id = 65;
-    repeated bytes _id = 66;
-    repeated string any_tags = 67;
-    repeated string all_tags = 68;
-    repeated string not_tags = 69;
-    repeated string reposted_claim_id = 70;
-    .google.protobuf.BoolValue has_channel_signature = 71;
-    .google.protobuf.BoolValue has_source = 72;
-    .google.protobuf.Int32Value limit_claims_per_channel = 73;
-    repeated string any_languages = 74;
-    repeated string all_languages = 75;
-    .google.protobuf.BoolValue remove_duplicates = 76;
-    .google.protobuf.BoolValue no_totals = 77;
-    repeated string search_indices = 78;
+    uint32 offset = 6;
+    bool is_controlling = 7;
+    string last_take_over_height = 8;
+    string claim_name = 9;
+    string normalized_name = 10;
+    RangeField tx_position = 11;
+    RangeField amount = 12;
+    RangeField timestamp = 13;
+    RangeField creation_timestamp = 14;
+    RangeField height = 15;
+    RangeField creation_height = 16;
+    RangeField activation_height = 17;
+    RangeField expiration_height = 18;
+    RangeField release_time = 19;
+    string short_url = 20;
+    string canonical_url = 21;
+    string title = 22;
+    string author = 23;
+    string description = 24;
+    repeated string claim_type = 25;
+    RangeField repost_count = 26;
+    repeated string stream_type = 27;
+    repeated string media_type = 28;
+    RangeField fee_amount = 29;
+    string fee_currency = 30;
+    RangeField duration = 31;
+    string reposted_claim_id = 32;
+    RangeField censor_type = 33;
+    string claims_in_channel = 34;
+    RangeField channel_join = 35;
+    BoolValue is_signature_valid = 36;
+    RangeField effective_amount = 37;
+    RangeField support_amount = 38;
+    RangeField trending_group = 39;
+    RangeField trending_mixed = 40;
+    RangeField trending_local = 41;
+    RangeField trending_global = 42;
+    string tx_id = 43;
+    UInt32Value tx_nout = 44;
+    string signature = 45;
+    string signature_digest = 46;
+    string public_key_bytes = 47;
+    string public_key_id = 48;
+    repeated string any_tags = 49;
+    repeated string all_tags = 50;
+    repeated string not_tags = 51;
+    bool has_channel_signature = 52;
+    BoolValue has_source = 53;
+    uint32 limit_claims_per_channel = 54;
+    repeated string any_languages = 55;
+    repeated string all_languages = 56;
+    bool remove_duplicates = 57;
+    bool no_totals = 58;
 }
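
Most fields lose their google.protobuf wrappers and become plain scalars, whose zero value simply means "no filter"; the new local BoolValue and UInt32Value messages survive only where "unset" must stay distinct from false or 0 (is_signature_valid, has_source, tx_nout). A sketch of how the two shapes read on the server side; the has_source term query is illustrative, since its real handling falls in hunks not shown here:

    // Plain bool: the zero value just disables the filter.
    if in.IsControlling {
        q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling))
    }
    // Wrapper message: a nil pointer means "not specified", so an explicit
    // false can still be queried.
    if in.HasSource != nil {
        q = q.Must(elastic.NewTermQuery("has_source", in.HasSource.Value))
    }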

File diff suppressed because it is too large.

result.pb.go (generated from result.proto)

@@ -1,7 +1,7 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
 // 	protoc-gen-go v1.26.0
-// 	protoc        v3.17.1
+// 	protoc        v3.17.3
 // source: result.proto
 package pb

server/search.go

@@ -4,16 +4,17 @@ import (
     "context"
     "encoding/hex"
     "encoding/json"
+    "errors"
     "fmt"
     "log"
     "math"
     "reflect"
     "strings"
+    "time"
     //"github.com/lbryio/hub/schema"
     "github.com/btcsuite/btcutil/base58"
-    "github.com/golang/protobuf/ptypes/wrappers"
     pb "github.com/lbryio/hub/protobuf/go"
     "github.com/lbryio/lbry.go/v2/extras/util"
     "github.com/olivere/elastic/v7"
@@ -25,30 +26,30 @@ import (
 const DefaultSearchSize = 1000
 type record struct {
     Txid                 string  `json:"tx_id"`
     Nout                 uint32  `json:"tx_nout"`
     Height               uint32  `json:"height"`
     ClaimId              string  `json:"claim_id"`
     ChannelId            string  `json:"channel_id"`
     RepostedClaimId      string  `json:"reposted_claim_id"`
     CensorType           uint32  `json:"censor_type"`
-    CensoringChannelHash string  `json:"censoring_channel_hash"`
+    CensoringChannelId   string  `json:"censoring_channel_id"`
     ShortUrl             string  `json:"short_url"`
     CanonicalUrl         string  `json:"canonical_url"`
     IsControlling        bool    `json:"is_controlling"`
     TakeOverHeight       uint32  `json:"last_take_over_height"`
     CreationHeight       uint32  `json:"creation_height"`
     ActivationHeight     uint32  `json:"activation_height"`
     ExpirationHeight     uint32  `json:"expiration_height"`
     ClaimsInChannel      uint32  `json:"claims_in_channel"`
-    Reposted             uint32  `json:"reposted"`
+    Reposted             uint32  `json:"repost_count"`
     EffectiveAmount      uint64  `json:"effective_amount"`
     SupportAmount        uint64  `json:"support_amount"`
     TrendingGroup        uint32  `json:"trending_group"`
     TrendingMixed        float32 `json:"trending_mixed"`
     TrendingLocal        float32 `json:"trending_local"`
     TrendingGlobal       float32 `json:"trending_global"`
     Name                 string  `json:"name"`
 }
 type orderField struct {
@@ -72,6 +73,13 @@ func AddTermsField(q *elastic.BoolQuery, arr []string, name string) *elastic.BoolQuery {
     return q.Must(elastic.NewTermsQuery(name, searchVals...))
 }
+func AddTermField(q *elastic.BoolQuery, value string, name string) *elastic.BoolQuery {
+    if value != "" {
+        return q.Must(elastic.NewTermQuery(name, value))
+    }
+    return q
+}
+
 func AddIndividualTermFields(q *elastic.BoolQuery, arr []string, name string, invert bool) *elastic.BoolQuery {
     for _, x := range arr {
         if invert {
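
AddTermField is the single-value counterpart of AddTermsField, matching the SearchRequest fields that just changed from repeated string to string. A fragment showing both side by side, assuming the surrounding setupEsQuery scope:

    q := elastic.NewBoolQuery()
    q = AddTermField(q, in.Author, "author.keyword")         // scalar field; no-op when empty
    q = AddTermsField(q, in.MediaType, "media_type.keyword") // still used for repeated fields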
@@ -135,49 +143,22 @@ func AddInvertibleField(q *elastic.BoolQuery, field *pb.InvertibleField, name string) *elastic.BoolQuery {
 // 8) return streams referenced by repost and all channel referenced in extra_txos
 //*/
 func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, error) {
-    var client *elastic.Client = nil
-    if s.EsClient == nil {
-        esUrl := s.Args.EsHost + ":" + s.Args.EsPort
-        tmpClient, err := elastic.NewClient(elastic.SetURL(esUrl), elastic.SetSniff(false))
-        if err != nil {
-            log.Println(err)
-            return nil, err
-        }
-        client = tmpClient
-        s.EsClient = client
-    } else {
-        client = s.EsClient
-    }
     var from = 0
     var pageSize = 10
     var orderBy []orderField
     var searchIndices = []string{}
+    client := s.EsClient
+    searchIndices = make([]string, 0, 1)
+    searchIndices = append(searchIndices, s.Args.EsIndex)
     q := elastic.NewBoolQuery()
+    err := s.checkQuery(in)
+    if err != nil {
+        return nil, err
+    }
     q = s.setupEsQuery(q, in, &pageSize, &from, &orderBy)
-    if s.Args.Dev && len(in.SearchIndices) == 0 {
-        // If we're running in dev mode ignore the mainnet claims index
-        indices, err := client.IndexNames()
-        if err != nil {
-            log.Fatalln(err)
-        }
-        var numIndices = len(indices)
-        searchIndices = make([]string, 0, numIndices)
-        for i := 0; i < numIndices; i++ {
-            if indices[i] == "claims" {
-                continue
-            }
-            searchIndices = append(searchIndices, indices[i])
-        }
-    }
-    if len(in.SearchIndices) > 0 {
-        searchIndices = in.SearchIndices
-    }
     fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")
     search := client.Search().
         Index(searchIndices...).
@@ -185,14 +166,17 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, error) {
         Query(q). // specify the query
         From(0).Size(DefaultSearchSize)
     for _, x := range orderBy {
         search = search.Sort(x.Field, x.IsAsc)
     }
     searchResult, err := search.Do(ctx) // execute
-    if err != nil {
-        log.Println(err)
+    if err != nil && elastic.IsNotFound(err) {
+        log.Println("Index returned 404! Check writer. Index: ", searchIndices)
+        return &pb.Outputs{}, nil
+    } else if err != nil {
+        log.Println("Error executing query: ", err)
         return nil, err
     }
@@ -200,12 +184,12 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, error) {
     txos, extraTxos, blocked := s.postProcessResults(ctx, client, searchResult, in, pageSize, from, searchIndices)
-    if in.NoTotals != nil && !in.NoTotals.Value {
+    if in.NoTotals {
         return &pb.Outputs{
             Txos:      txos,
             ExtraTxos: extraTxos,
             Offset:    uint32(int64(from) + searchResult.TotalHits()),
             Blocked:   blocked,
         }, nil
     }
@@ -214,11 +198,11 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, error) {
         blockedTotal += b.Count
     }
     return &pb.Outputs{
         Txos:         txos,
         ExtraTxos:    extraTxos,
         Total:        uint32(searchResult.TotalHits()),
         Offset:       uint32(int64(from) + searchResult.TotalHits()),
         Blocked:      blocked,
         BlockedTotal: blockedTotal,
     }, nil
 }
@@ -234,7 +218,6 @@ func (s *Server) normalizeTag(tag string) string {
     return string(res)
 }
-
 func (s *Server) cleanTags(tags []string) []string {
     cleanedTags := make([]string, len(tags))
     for i, tag := range tags {
@@ -269,18 +252,18 @@ func (s *Server) postProcessResults(
     //printJsonFullResults(searchResult)
     records, blockedRecords, blockedMap = removeBlocked(records)
-    if in.RemoveDuplicates != nil {
+    if in.RemoveDuplicates {
         records = removeDuplicates(records)
     }
-    if in.LimitClaimsPerChannel != nil && in.LimitClaimsPerChannel.Value > 0 {
-        records = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value))
+    if in.LimitClaimsPerChannel > 0 {
+        records = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel))
     }
     finalLength := int(math.Min(float64(len(records)), float64(pageSize)))
     txos = make([]*pb.Output, 0, finalLength)
     var j = 0
-    for i := from; i < from + finalLength && i < len(records) && j < finalLength; i++ {
+    for i := from; i < from+finalLength && i < len(records) && j < finalLength; i++ {
         t := records[i]
         res := t.recordToOutput()
         txos = append(txos, res)
@@ -319,80 +302,91 @@ func (s *Server) postProcessResults(
     return txos, extraTxos, blocked
 }
+func (s *Server) checkQuery(in *pb.SearchRequest) error {
+    limit := 2048
+    checks := map[string]bool{
+        "claim_ids":       in.ClaimId != nil && !in.ClaimId.Invert && len(in.ClaimId.Value) > limit,
+        "not_claim_ids":   in.ClaimId != nil && in.ClaimId.Invert && len(in.ClaimId.Value) > limit,
+        "channel_ids":     in.ChannelId != nil && !in.ChannelId.Invert && len(in.ChannelId.Value) > limit,
+        "not_channel_ids": in.ChannelId != nil && in.ChannelId.Invert && len(in.ChannelId.Value) > limit,
+        "not_tags":        len(in.NotTags) > limit,
+        "all_tags":        len(in.AllTags) > limit,
+        "any_tags":        len(in.AnyTags) > limit,
+        "any_languages":   len(in.AnyLanguages) > limit,
+    }
+    for name, failed := range checks {
+        if failed {
+            time.Sleep(2) // throttle
+            return errors.New(fmt.Sprintf("%s cant have more than %d items.", name, limit))
+        }
+    }
+    return nil
+}
 func (s *Server) setupEsQuery(
     q *elastic.BoolQuery,
     in *pb.SearchRequest,
     pageSize *int,
     from *int,
     orderBy *[]orderField) *elastic.BoolQuery {
-    claimTypes := map[string]int {
+    claimTypes := map[string]int{
         "stream":     1,
         "channel":    2,
         "repost":     3,
         "collection": 4,
     }
-    streamTypes := map[string]int {
+    streamTypes := map[string]int{
         "video":    1,
         "audio":    2,
         "image":    3,
         "document": 4,
         "binary":   5,
         "model":    6,
     }
-    replacements := map[string]string {
-        "name": "normalized",
+    replacements := map[string]string{
+        "name":       "normalized_name",
         "txid":       "tx_id",
         "claim_hash": "_id",
     }
-    textFields := map[string]bool {
+    textFields := map[string]bool{
         "author":            true,
         "canonical_url":     true,
         "channel_id":        true,
         "claim_name":        true,
         "description":       true,
         "claim_id":          true,
         "media_type":        true,
-        "normalized":        true,
+        "normalized_name":   true,
         "public_key_bytes":  true,
-        "public_key_hash":   true,
+        "public_key_id":     true,
         "short_url":         true,
         "signature":         true,
        "signature_digest":  true,
         "stream_type":       true,
         "title":             true,
         "tx_id":             true,
         "fee_currency":      true,
         "reposted_claim_id": true,
         "tags":              true,
     }
-    if in.IsControlling != nil {
-        q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling.Value))
+    if in.IsControlling {
+        q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling))
     }
-    if in.AmountOrder != nil {
-        in.Limit.Value = 1
-        in.OrderBy = []string{"effective_amount"}
-        in.Offset = &wrappers.Int32Value{Value: in.AmountOrder.Value - 1}
-    }
-    if in.Limit != nil {
-        *pageSize = int(in.Limit.Value)
-    }
-    if in.Offset != nil {
-        *from = int(in.Offset.Value)
-    }
-    if len(in.Name) > 0 {
-        normalized := make([]string, len(in.Name))
-        for i := 0; i < len(in.Name); i++ {
-            normalized[i] = util.NormalizeName(in.Name[i])
-        }
-        in.Normalized = normalized
+    if in.Limit > 0 {
+        *pageSize = int(in.Limit)
+    }
+    if in.Offset > 0 {
+        *from = int(in.Offset)
+    }
+    if len(in.ClaimName) > 0 {
+        in.NormalizedName = util.NormalizeName(in.ClaimName)
     }
     if len(in.OrderBy) > 0 {
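
The new checkQuery guard caps every list-valued filter at 2048 items before any query is built, sleeping briefly to throttle abusive callers. A hypothetical request tripping it (the 3000-element slice is made up for illustration):

    req := &pb.SearchRequest{AnyTags: make([]string, 3000)}
    if err := s.checkQuery(req); err != nil {
        log.Println(err) // any_tags cant have more than 2048 items.
    }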
@@ -432,21 +426,6 @@ func (s *Server) setupEsQuery(
         q = q.Must(elastic.NewTermsQuery("stream_type", searchVals...))
     }
-    if len(in.XId) > 0 {
-        searchVals := make([]interface{}, len(in.XId))
-        for i := 0; i < len(in.XId); i++ {
-            util.ReverseBytesInPlace(in.XId[i])
-            searchVals[i] = hex.Dump(in.XId[i])
-        }
-        if len(in.XId) == 1 && len(in.XId[0]) < 20 {
-            q = q.Must(elastic.NewPrefixQuery("_id", string(in.XId[0])))
-        } else {
-            q = q.Must(elastic.NewTermsQuery("_id", searchVals...))
-        }
-    }
     if in.ClaimId != nil {
         searchVals := StrArrToInterface(in.ClaimId.Value)
         if len(in.ClaimId.Value) == 1 && len(in.ClaimId.Value[0]) < 20 {
@@ -466,18 +445,18 @@ func (s *Server) setupEsQuery(
     if in.PublicKeyId != "" {
         value := hex.EncodeToString(base58.Decode(in.PublicKeyId)[1:21])
-        q = q.Must(elastic.NewTermQuery("public_key_hash.keyword", value))
+        q = q.Must(elastic.NewTermQuery("public_key_id.keyword", value))
     }
-    if in.HasChannelSignature != nil && in.HasChannelSignature.Value {
+    if in.HasChannelSignature {
         q = q.Must(elastic.NewExistsQuery("signature_digest"))
-        if in.SignatureValid != nil {
-            q = q.Must(elastic.NewTermQuery("signature_valid", in.SignatureValid.Value))
+        if in.IsSignatureValid != nil {
+            q = q.Must(elastic.NewTermQuery("is_signature_valid", in.IsSignatureValid.Value))
         }
-    } else if in.SignatureValid != nil {
+    } else if in.IsSignatureValid != nil {
         q = q.MinimumNumberShouldMatch(1)
         q = q.Should(elastic.NewBoolQuery().MustNot(elastic.NewExistsQuery("signature_digest")))
-        q = q.Should(elastic.NewTermQuery("signature_valid", in.SignatureValid.Value))
+        q = q.Should(elastic.NewTermQuery("is_signature_valid", in.IsSignatureValid.Value))
     }
     if in.HasSource != nil {
@@ -492,22 +471,20 @@ func (s *Server) setupEsQuery(
         q = q.Must(elastic.NewTermQuery("tx_nout", in.TxNout.Value))
     }
-    q = AddTermsField(q, in.PublicKeyHash, "public_key_hash.keyword")
-    q = AddTermsField(q, in.Author, "author.keyword")
-    q = AddTermsField(q, in.Title, "title.keyword")
-    q = AddTermsField(q, in.CanonicalUrl, "canonical_url.keyword")
-    q = AddTermsField(q, in.ClaimName, "claim_name.keyword")
-    q = AddTermsField(q, in.Description, "description.keyword")
+    q = AddTermField(q, in.Author, "author.keyword")
+    q = AddTermField(q, in.Title, "title.keyword")
+    q = AddTermField(q, in.CanonicalUrl, "canonical_url.keyword")
+    q = AddTermField(q, in.ClaimName, "claim_name.keyword")
+    q = AddTermField(q, in.Description, "description.keyword")
     q = AddTermsField(q, in.MediaType, "media_type.keyword")
-    q = AddTermsField(q, in.Normalized, "normalized.keyword")
-    q = AddTermsField(q, in.PublicKeyBytes, "public_key_bytes.keyword")
-    q = AddTermsField(q, in.ShortUrl, "short_url.keyword")
-    q = AddTermsField(q, in.Signature, "signature.keyword")
-    q = AddTermsField(q, in.SignatureDigest, "signature_digest.keyword")
-    q = AddTermsField(q, in.TxId, "tx_id.keyword")
-    q = AddTermsField(q, in.FeeCurrency, "fee_currency.keyword")
-    q = AddTermsField(q, in.RepostedClaimId, "reposted_claim_id.keyword")
+    q = AddTermField(q, in.NormalizedName, "normalized_name.keyword")
+    q = AddTermField(q, in.PublicKeyBytes, "public_key_bytes.keyword")
+    q = AddTermField(q, in.ShortUrl, "short_url.keyword")
+    q = AddTermField(q, in.Signature, "signature.keyword")
+    q = AddTermField(q, in.SignatureDigest, "signature_digest.keyword")
+    q = AddTermField(q, in.TxId, "tx_id.keyword")
+    q = AddTermField(q, in.FeeCurrency, "fee_currency.keyword")
+    q = AddTermField(q, in.RepostedClaimId, "reposted_claim_id.keyword")
     q = AddTermsField(q, s.cleanTags(in.AnyTags), "tags.keyword")
     q = AddIndividualTermFields(q, s.cleanTags(in.AllTags), "tags.keyword", false)
@@ -516,8 +493,6 @@ func (s *Server) setupEsQuery(
     q = AddIndividualTermFields(q, in.AllLanguages, "languages", false)
     q = AddInvertibleField(q, in.ChannelId, "channel_id.keyword")
-    q = AddInvertibleField(q, in.ChannelIds, "channel_id.keyword")
     q = AddRangeField(q, in.TxPosition, "tx_position")
     q = AddRangeField(q, in.Amount, "amount")
@@ -528,7 +503,7 @@ func (s *Server) setupEsQuery(
     q = AddRangeField(q, in.ActivationHeight, "activation_height")
     q = AddRangeField(q, in.ExpirationHeight, "expiration_height")
     q = AddRangeField(q, in.ReleaseTime, "release_time")
-    q = AddRangeField(q, in.Reposted, "reposted")
+    q = AddRangeField(q, in.RepostCount, "repost_count")
     q = AddRangeField(q, in.FeeAmount, "fee_amount")
     q = AddRangeField(q, in.Duration, "duration")
     q = AddRangeField(q, in.CensorType, "censor_type")
@@ -568,9 +543,9 @@ func getUniqueChannels(records []*record, client *elastic.Client, ctx context.Co
             mget = mget.Add(nmget)
             totalChannels++
         }
-        if r.CensorType != 0 && !channelsSet[r.CensoringChannelHash] {
-            channelsSet[r.CensoringChannelHash] = true
-            nmget := elastic.NewMultiGetItem().Id(r.CensoringChannelHash).Index(searchIndex)
+        if r.CensorType != 0 && !channelsSet[r.CensoringChannelId] {
+            channelsSet[r.CensoringChannelId] = true
+            nmget := elastic.NewMultiGetItem().Id(r.CensoringChannelId).Index(searchIndex)
             mget = mget.Add(nmget)
             totalChannels++
         }
@@ -608,13 +583,13 @@ func getUniqueChannels(records []*record, client *elastic.Client, ctx context.Co
 func getClaimsForReposts(ctx context.Context, client *elastic.Client, records []*record, searchIndices []string) ([]*pb.Output, []*record, map[string]*pb.Output) {
     var totalReposted = 0
-    var mget = client.Mget()//.StoredFields("_id")
+    var mget = client.Mget() //.StoredFields("_id")
     /*
         var nmget = elastic.NewMultiGetItem()
         for _, index := range searchIndices {
             nmget = nmget.Index(index)
         }
     */
     for _, r := range records {
         for _, searchIndex := range searchIndices {
             if r.RepostedClaimId != "" {
@@ -656,7 +631,7 @@ func getClaimsForReposts(ctx context.Context, client *elastic.Client, records []
 }
 func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record {
-    finalHits := make([]*record, 0 , len(searchHits))
+    finalHits := make([]*record, 0, len(searchHits))
     var channelCounters map[string]int
     channelCounters = make(map[string]int)
     nextPageHitsMaybeCheckLater := deque.New()
@@ -665,7 +640,7 @@ func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*r
         searchHitsQ.PushRight(rec)
     }
     for !searchHitsQ.Empty() || !nextPageHitsMaybeCheckLater.Empty() {
-        if len(finalHits) > 0 && len(finalHits) % pageSize == 0 {
+        if len(finalHits) > 0 && len(finalHits)%pageSize == 0 {
             channelCounters = make(map[string]int)
         } else if len(finalHits) != 0 {
             // means last page was incomplete and we are left with bad replacements
@@ -674,7 +649,7 @@ func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*r
         for i := 0; i < nextPageHitsMaybeCheckLater.Size(); i++ {
             rec := nextPageHitsMaybeCheckLater.PopLeft().(*record)
             if perChannelPerPage > 0 && channelCounters[rec.ChannelId] < perChannelPerPage {
                 finalHits = append(finalHits, rec)
                 channelCounters[rec.ChannelId] = channelCounters[rec.ChannelId] + 1
             }
@@ -686,7 +661,7 @@ func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*r
             } else if channelCounters[hit.ChannelId] < perChannelPerPage {
                 finalHits = append(finalHits, hit)
                 channelCounters[hit.ChannelId] = channelCounters[hit.ChannelId] + 1
-                if len(finalHits) % pageSize == 0 {
+                if len(finalHits)%pageSize == 0 {
                     break
                 }
             } else {
@@ -752,13 +727,12 @@ func removeDuplicates(searchHits []*record) []*record {
         hitHeight := hit.Height
         hitId := hit.getHitId()
         if knownIds[hitId] == nil {
             knownIds[hitId] = hit
         } else {
             prevHit := knownIds[hitId]
             if hitHeight < prevHit.Height {
                 knownIds[hitId] = hit
                 dropped[prevHit] = true
             } else {
                 dropped[hit] = true
@@ -766,7 +740,7 @@ func removeDuplicates(searchHits []*record) []*record {
         }
     }
-    deduped := make([]*record, len(searchHits) - len(dropped))
+    deduped := make([]*record, len(searchHits)-len(dropped))
     var i = 0
     for _, hit := range searchHits {
@@ -785,15 +759,15 @@ func removeBlocked(searchHits []*record) ([]*record, []*record, map[string]*pb.Blocked) {
     blockedChannels := make(map[string]*pb.Blocked)
     for _, r := range searchHits {
         if r.CensorType != 0 {
-            if blockedChannels[r.CensoringChannelHash] == nil {
+            if blockedChannels[r.CensoringChannelId] == nil {
                 blockedObj := &pb.Blocked{
                     Count:   1,
                     Channel: nil,
                 }
-                blockedChannels[r.CensoringChannelHash] = blockedObj
+                blockedChannels[r.CensoringChannelId] = blockedObj
                 blockedHits = append(blockedHits, r)
             } else {
-                blockedChannels[r.CensoringChannelHash].Count += 1
+                blockedChannels[r.CensoringChannelId].Count += 1
             }
         } else {
             newHits = append(newHits, r)
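
With the wrappers gone, the pagination and post-processing knobs above are plain scalars. A hypothetical request exercising that path, with illustrative values only:

    req := &pb.SearchRequest{
        Text:                  "science",
        Limit:                 20,   // page size
        Offset:                40,   // start of the third page
        LimitClaimsPerChannel: 2,    // enforced by searchAhead
        RemoveDuplicates:      true, // keeps the oldest copy of a duplicated claim
    }
    out, err := s.Search(ctx, req)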

server/server.go

@@ -2,6 +2,7 @@ package server
 import (
     "log"
+    "os"
     "regexp"
     pb "github.com/lbryio/hub/protobuf/go"
@@ -11,7 +12,7 @@ import (
 type Server struct {
     GrpcServer   *grpc.Server
     Args         *Args
     MultiSpaceRe *regexp.Regexp
     WeirdCharsRe *regexp.Regexp
     EsClient     *elastic.Client
@@ -19,12 +20,13 @@
 }
 type Args struct {
     Serve   bool
     Host    string
     Port    string
     EsHost  string
     EsPort  string
-    Dev     bool
+    EsIndex string
+    Debug   bool
 }
 /*
@@ -79,11 +81,21 @@ func MakeHubServer(args *Args) *Server {
         log.Fatal(err)
     }
-    s := &Server {
-        GrpcServer: grpcServer,
-        Args: args,
+    esUrl := args.EsHost + ":" + args.EsPort
+    opts := []elastic.ClientOptionFunc{elastic.SetSniff(false), elastic.SetURL(esUrl)}
+    if args.Debug {
+        opts = append(opts, elastic.SetTraceLog(log.New(os.Stderr, "[[ELASTIC]]", 0)))
+    }
+    client, err := elastic.NewClient(opts...)
+    if err != nil {
+        log.Fatal(err)
+    }
+    s := &Server{
+        GrpcServer:   grpcServer,
+        Args:         args,
         MultiSpaceRe: multiSpaceRe,
         WeirdCharsRe: weirdCharsRe,
+        EsClient:     client,
     }
     return s
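
MakeHubServer now builds the Elasticsearch client once, up front, from the new Args fields. A hypothetical wiring sketch; the values mirror the defaults in main.go:

    args := &server.Args{
        Serve:   true,
        Host:    "0.0.0.0",
        Port:    ":50051",
        EsHost:  "http://localhost",
        EsPort:  "9200",
        EsIndex: "claims",
        Debug:   true, // enables [[ELASTIC]] trace logging to stderr
    }
    s := server.MakeHubServer(args)
    // s.GrpcServer is then served on args.Port, as main.go's serve command does.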