more features

This commit is contained in:
Jeffrey Picard 2021-05-18 06:02:55 -04:00
parent a671683fe8
commit e2d71261dd
4 changed files with 762 additions and 500 deletions

View file

@ -25,6 +25,8 @@ func parseArgs(searchRequest *pb.SearchRequest) {
id := flag.String("id", "", "_id")
author := flag.String("author", "", "author")
title := flag.String("title", "", "title")
channelName := flag.String("channelName", "", "channel name")
description := flag.String("description", "", "description")
flag.Parse()
@ -43,6 +45,12 @@ func parseArgs(searchRequest *pb.SearchRequest) {
if *title != "" {
searchRequest.Title = []string{*title}
}
if *channelName != "" {
searchRequest.ChannelId = &pb.InvertibleField{Invert: false, Value: []string{*channelName}}
}
if *description != "" {
searchRequest.Description = []string{*description}
}
}
func main() {

View file

@ -8,7 +8,12 @@ service Hub {
rpc Search (SearchRequest) returns (SearchReply) {}
}
message RangeQuery {
message InvertibleField {
bool invert = 1;
repeated string value = 2;
}
message RangeField {
enum Op {
EQ = 0;
LTE = 1;
@ -25,46 +30,46 @@ message SearchRequest {
string name = 2;
int32 amount_order = 3;
int32 limit = 4;
string order_by = 5;
repeated string order_by = 5;
int32 offset = 6;
bool is_controlling = 7;
string last_take_over_height = 20;
repeated string claim_id = 21;
InvertibleField claim_id = 21;
repeated string claim_name = 22;
repeated string normalized = 23;
RangeQuery tx_position = 24;
RangeQuery amount = 25;
RangeQuery timestamp = 26;
RangeQuery creation_timestamp = 27;
RangeQuery height = 28;
RangeQuery creation_height = 29;
RangeQuery activation_height = 30;
RangeQuery expiration_height = 31;
RangeQuery release_time = 32;
RangeField tx_position = 24;
RangeField amount = 25;
RangeField timestamp = 26;
RangeField creation_timestamp = 27;
RangeField height = 28;
RangeField creation_height = 29;
RangeField activation_height = 30;
RangeField expiration_height = 31;
RangeField release_time = 32;
repeated string short_url = 33;
repeated string canonical_url = 34;
repeated string title = 35;
repeated string author = 36;
repeated string description = 37;
repeated string claim_type = 38;
RangeQuery reposted = 39;
RangeField reposted = 39;
repeated string stream_type = 40;
repeated string media_type = 41;
RangeQuery fee_amount = 42;
RangeField fee_amount = 42;
repeated string fee_currency = 43;
RangeQuery duration = 44;
RangeField duration = 44;
string reposted_claim_hash = 45;
RangeQuery censor_type = 46;
RangeField censor_type = 46;
string claims_in_channel = 47;
RangeQuery channel_join = 48;
string signature_valid = 49;
RangeQuery effective_amount = 50;
RangeQuery support_amount = 51;
RangeQuery trending_group = 52;
RangeQuery trending_mixed = 53;
RangeQuery trending_local = 54;
RangeQuery trending_global = 55;
repeated string channel_id = 56;
RangeField channel_join = 48;
bool signature_valid = 49;
RangeField effective_amount = 50;
RangeField support_amount = 51;
RangeField trending_group = 52;
RangeField trending_mixed = 53;
RangeField trending_local = 54;
RangeField trending_global = 55;
InvertibleField channel_id = 56;
repeated string tx_id = 57;
string tx_nout = 58;
repeated string signature = 59;
@ -73,8 +78,11 @@ message SearchRequest {
repeated string public_key_hash = 62;
string public_key_id = 63;
repeated bytes _id = 64;
repeated string tags = 65;
InvertibleField tags = 65;
repeated string reposted_claim_id = 66;
bool has_channel_signature = 67;
bool has_source = 68;
int32 limit_claims_per_channel = 69;
}
message SearchReply {

File diff suppressed because it is too large Load diff

View file

@ -16,6 +16,11 @@ type record struct {
Nout uint32 `json:"tx_nout"`
}
type orderField struct {
Field string
is_asc bool
}
func ReverseBytes(s []byte) {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
@ -30,7 +35,7 @@ func StrArrToInterface(arr []string) []interface{} {
return searchVals
}
func AddTermsQuery(arr []string, name string, q *elastic.BoolQuery) *elastic.BoolQuery {
func AddTermsField(arr []string, name string, q *elastic.BoolQuery) *elastic.BoolQuery {
if len(arr) > 0 {
searchVals := StrArrToInterface(arr)
return q.Must(elastic.NewTermsQuery(name, searchVals...))
@ -38,30 +43,42 @@ func AddTermsQuery(arr []string, name string, q *elastic.BoolQuery) *elastic.Boo
return q
}
func AddRangeQuery(rq *pb.RangeQuery, name string, q *elastic.BoolQuery) *elastic.BoolQuery {
func AddRangeField(rq *pb.RangeField, name string, q *elastic.BoolQuery) *elastic.BoolQuery {
if rq == nil {
return q
}
if len(rq.Value) > 1 {
if rq.Op != pb.RangeQuery_EQ {
if rq.Op != pb.RangeField_EQ {
return q
}
return AddTermsQuery(rq.Value, name, q)
return AddTermsField(rq.Value, name, q)
}
if rq.Op == pb.RangeQuery_EQ {
return AddTermsQuery(rq.Value, name, q)
} else if rq.Op == pb.RangeQuery_LT {
if rq.Op == pb.RangeField_EQ {
return AddTermsField(rq.Value, name, q)
} else if rq.Op == pb.RangeField_LT {
return q.Must(elastic.NewRangeQuery(name).Lt(rq.Value))
} else if rq.Op == pb.RangeQuery_LTE {
} else if rq.Op == pb.RangeField_LTE {
return q.Must(elastic.NewRangeQuery(name).Lte(rq.Value))
} else if rq.Op == pb.RangeQuery_GT {
} else if rq.Op == pb.RangeField_GT {
return q.Must(elastic.NewRangeQuery(name).Gt(rq.Value))
} else { // pb.RangeQuery_GTE
} else { // pb.RangeField_GTE
return q.Must(elastic.NewRangeQuery(name).Gte(rq.Value))
}
}
func AddInvertibleField(field *pb.InvertibleField, name string, q *elastic.BoolQuery) *elastic.BoolQuery {
if field == nil {
return q
}
searchVals := StrArrToInterface(field.Value)
if field.Invert {
return q.MustNot(elastic.NewTermsQuery(name, searchVals...))
} else {
return q.Must(elastic.NewTermsQuery(name, searchVals...))
}
}
func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.SearchReply, error) {
// TODO: reuse elastic client across requests
client, err := elastic.NewClient(elastic.SetSniff(false))
@ -75,6 +92,48 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.SearchRe
"repost": 3,
"collection": 4,
}
streamTypes := map[string]int {
"video": 1,
"audio": 2,
"image": 3,
"document": 4,
"binary": 5,
"model": 6,
}
replacements := map[string]string {
"name": "normalized",
"txid": "tx_id",
"claim_hash": "_id",
}
textFields := map[string]bool {
"author": true,
"canonical_url": true,
"channel_id": true,
"claim_name": true,
"description": true,
"claim_id": true,
"media_type": true,
"normalized": true,
"public_key_bytes": true,
"public_key_hash": true,
"short_url": true,
"signature": true,
"signature_digest": true,
"stream_type": true,
"title": true,
"tx_id": true,
"fee_currency": true,
"reposted_claim_id": true,
"tags": true,
}
var from = 0
var size = 10
var orderBy []orderField
// Ping the Elasticsearch server to get e.g. the version number
//_, code, err := client.Ping("http://127.0.0.1:9200").Do(ctx)
//if err != nil {
@ -90,10 +149,36 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.SearchRe
if in.AmountOrder > 0 {
in.Limit = 1
in.OrderBy = "effective_amount"
in.OrderBy = []string{"effective_amount"}
in.Offset = in.AmountOrder - 1
}
if in.Limit > 0 {
size = int(in.Limit)
}
if in.Offset > 0 {
from = int(in.Offset)
}
if len(in.OrderBy) > 0 {
for _, x := range in.OrderBy {
var toAppend string
var is_asc = false
if x[0] == '^' {
is_asc = true
x = x[1:]
}
if _, ok := replacements[x]; ok {
toAppend = replacements[x]
}
if _, ok := textFields[x]; ok {
toAppend = x + ".keyword"
}
orderBy = append(orderBy, orderField{toAppend, is_asc})
}
}
if len(in.ClaimType) > 0 {
searchVals := make([]interface{}, len(in.ClaimType))
for i := 0; i < len(in.ClaimType); i++ {
@ -102,6 +187,15 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.SearchRe
q = q.Must(elastic.NewTermsQuery("claim_type", searchVals...))
}
// FIXME is this a text field or not?
if len(in.StreamType) > 0 {
searchVals := make([]interface{}, len(in.StreamType))
for i := 0; i < len(in.StreamType); i++ {
searchVals[i] = streamTypes[in.StreamType[i]]
}
q = q.Must(elastic.NewTermsQuery("stream_type.keyword", searchVals...))
}
if len(in.XId) > 0 {
searchVals := make([]interface{}, len(in.XId))
for i := 0; i < len(in.XId); i++ {
@ -116,12 +210,20 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.SearchRe
}
if len(in.ClaimId) > 0 {
searchVals := StrArrToInterface(in.ClaimId)
if len(in.ClaimId) == 1 && len(in.ClaimId[0]) < 20 {
q = q.Must(elastic.NewPrefixQuery("claim_id.keyword", in.ClaimId[0]))
if in.ClaimId != nil {
searchVals := StrArrToInterface(in.ClaimId.Value)
if len(in.ClaimId.Value) == 1 && len(in.ClaimId.Value[0]) < 20 {
if in.ClaimId.Invert {
q = q.MustNot(elastic.NewPrefixQuery("claim_id.keyword", in.ClaimId.Value[0]))
} else {
q = q.Must(elastic.NewPrefixQuery("claim_id.keyword", in.ClaimId.Value[0]))
}
} else {
q = q.Must(elastic.NewTermsQuery("claim_id.keyword", searchVals...))
if in.ClaimId.Invert {
q = q.MustNot(elastic.NewTermsQuery("claim_id.keyword", searchVals...))
} else {
q = q.Must(elastic.NewTermsQuery("claim_id.keyword", searchVals...))
}
}
}
@ -130,45 +232,73 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.SearchRe
q = q.Must(elastic.NewTermQuery("public_key_hash.keyword", value))
}
q = AddTermsQuery(in.PublicKeyHash, "public_key_hash.keyword", q)
q = AddTermsQuery(in.Author, "author.keyword", q)
q = AddTermsQuery(in.Title, "title.keyword", q)
q = AddTermsQuery(in.CanonicalUrl, "canonical_url.keyword", q)
q = AddTermsQuery(in.ChannelId, "channel_id.keyword", q)
q = AddTermsQuery(in.ClaimName, "claim_name.keyword", q)
q = AddTermsQuery(in.Description, "description.keyword", q)
q = AddTermsQuery(in.MediaType, "media_type.keyword", q)
q = AddTermsQuery(in.Normalized, "normalized.keyword", q)
q = AddTermsQuery(in.PublicKeyBytes, "public_key_bytes.keyword", q)
q = AddTermsQuery(in.ShortUrl, "short_url.keyword", q)
q = AddTermsQuery(in.Signature, "signature.keyword", q)
q = AddTermsQuery(in.SignatureDigest, "signature_digest.keyword", q)
q = AddTermsQuery(in.StreamType, "stream_type.keyword", q)
q = AddTermsQuery(in.TxId, "tx_id.keyword", q)
q = AddTermsQuery(in.FeeCurrency, "fee_currency.keyword", q)
q = AddTermsQuery(in.RepostedClaimId, "reposted_claim_id.keyword", q)
q = AddTermsQuery(in.Tags, "tags.keyword", q)
if in.HasChannelSignature {
q = q.Should(elastic.NewBoolQuery().Must(elastic.NewExistsQuery("signature_digest")))
if in.SignatureValid {
q = q.Should(elastic.NewTermQuery("signature_valid", in.SignatureValid))
}
} else if in.SignatureValid {
//FIXME Might need to abstract this to another message so we can tell if the param is passed
//without relying on it's truth value
q = q.MinimumNumberShouldMatch(1)
q = q.Should(elastic.NewBoolQuery().MustNot(elastic.NewExistsQuery("signature_digest")))
q = q.Should(elastic.NewTermQuery("signature_valid", in.SignatureValid))
}
q = AddRangeQuery(in.TxPosition, "tx_position", q)
q = AddRangeQuery(in.Amount, "amount", q)
q = AddRangeQuery(in.Timestamp, "timestamp", q)
q = AddRangeQuery(in.CreationTimestamp, "creation_timestamp", q)
q = AddRangeQuery(in.Height, "height", q)
q = AddRangeQuery(in.CreationHeight, "creation_height", q)
q = AddRangeQuery(in.ActivationHeight, "activation_height", q)
q = AddRangeQuery(in.ExpirationHeight, "expiration_height", q)
q = AddRangeQuery(in.ReleaseTime, "release_time", q)
q = AddRangeQuery(in.Reposted, "reposted", q)
q = AddRangeQuery(in.FeeAmount, "fee_amount", q)
q = AddRangeQuery(in.Duration, "duration", q)
q = AddRangeQuery(in.CensorType, "censor_type", q)
q = AddRangeQuery(in.ChannelJoin, "channel_join", q)
q = AddRangeQuery(in.EffectiveAmount, "effective_amount", q)
q = AddRangeQuery(in.SupportAmount, "support_amount", q)
q = AddRangeQuery(in.TrendingGroup, "trending_group", q)
q = AddRangeQuery(in.TrendingMixed, "trending_mixed", q)
q = AddRangeQuery(in.TrendingLocal, "trending_local", q)
q = AddRangeQuery(in.TrendingGlobal, "trending_global", q)
if in.HasSource {
q = q.MinimumNumberShouldMatch(1)
isStreamOrReport := elastic.NewTermsQuery("claim_type", claimTypes["stream"], claimTypes["repost"])
q = q.Should(elastic.NewBoolQuery().Must(isStreamOrReport, elastic.NewMatchQuery("has_source", in.HasSource)))
q = q.Should(elastic.NewBoolQuery().MustNot(isStreamOrReport))
q = q.Should(elastic.NewBoolQuery().Must(elastic.NewTermQuery("reposted_claim_type", claimTypes["channel"])))
}
var collapse *elastic.CollapseBuilder
if in.LimitClaimsPerChannel > 0 {
innerHit := elastic.NewInnerHit().Size(int(in.LimitClaimsPerChannel)).Name("channel_id.keyword")
collapse = elastic.NewCollapseBuilder("channel_id.keyword").InnerHit(innerHit)
}
q = AddTermsField(in.PublicKeyHash, "public_key_hash.keyword", q)
q = AddTermsField(in.Author, "author.keyword", q)
q = AddTermsField(in.Title, "title.keyword", q)
q = AddTermsField(in.CanonicalUrl, "canonical_url.keyword", q)
q = AddTermsField(in.ClaimName, "claim_name.keyword", q)
q = AddTermsField(in.Description, "description.keyword", q)
q = AddTermsField(in.MediaType, "media_type.keyword", q)
q = AddTermsField(in.Normalized, "normalized.keyword", q)
q = AddTermsField(in.PublicKeyBytes, "public_key_bytes.keyword", q)
q = AddTermsField(in.ShortUrl, "short_url.keyword", q)
q = AddTermsField(in.Signature, "signature.keyword", q)
q = AddTermsField(in.SignatureDigest, "signature_digest.keyword", q)
q = AddTermsField(in.TxId, "tx_id.keyword", q)
q = AddTermsField(in.FeeCurrency, "fee_currency.keyword", q)
q = AddTermsField(in.RepostedClaimId, "reposted_claim_id.keyword", q)
q = AddInvertibleField(in.ChannelId, "channel_id.keyword", q)
q = AddInvertibleField(in.Tags, "tags.keyword", q)
q = AddRangeField(in.TxPosition, "tx_position", q)
q = AddRangeField(in.Amount, "amount", q)
q = AddRangeField(in.Timestamp, "timestamp", q)
q = AddRangeField(in.CreationTimestamp, "creation_timestamp", q)
q = AddRangeField(in.Height, "height", q)
q = AddRangeField(in.CreationHeight, "creation_height", q)
q = AddRangeField(in.ActivationHeight, "activation_height", q)
q = AddRangeField(in.ExpirationHeight, "expiration_height", q)
q = AddRangeField(in.ReleaseTime, "release_time", q)
q = AddRangeField(in.Reposted, "reposted", q)
q = AddRangeField(in.FeeAmount, "fee_amount", q)
q = AddRangeField(in.Duration, "duration", q)
q = AddRangeField(in.CensorType, "censor_type", q)
q = AddRangeField(in.ChannelJoin, "channel_join", q)
q = AddRangeField(in.EffectiveAmount, "effective_amount", q)
q = AddRangeField(in.SupportAmount, "support_amount", q)
q = AddRangeField(in.TrendingGroup, "trending_group", q)
q = AddRangeField(in.TrendingMixed, "trending_mixed", q)
q = AddRangeField(in.TrendingLocal, "trending_local", q)
q = AddRangeField(in.TrendingGlobal, "trending_global", q)
if in.Query != "" {
textQuery := elastic.NewSimpleQueryStringQuery(in.Query).
@ -182,14 +312,18 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.SearchRe
q = q.Must(textQuery)
}
searchResult, err := client.Search().
search := client.Search().
//Index("twitter"). // search in index "twitter"
Query(q). // specify the query
//Sort("user", true). // sort by "user" field, ascending
From(0).Size(10). // take documents 0-9
//Pretty(true). // pretty print request and response JSON
Do(ctx) // execute
From(from).Size(size)
if in.LimitClaimsPerChannel > 0 {
search = search.Collapse(collapse)
}
for _, x := range orderBy {
search = search.Sort(x.Field, x.is_asc)
}
searchResult, err := search.Do(ctx) // execute
if err != nil {
return nil, err
}