implement remove duplicates

This commit is contained in:
Jeffrey Picard 2021-06-08 22:07:59 -04:00
parent d6af17df23
commit 79db1be087
3 changed files with 99 additions and 76 deletions

View file

@ -90,4 +90,5 @@ message SearchRequest {
.google.protobuf.Int32Value limit_claims_per_channel = 73; .google.protobuf.Int32Value limit_claims_per_channel = 73;
repeated string any_languages = 74; repeated string any_languages = 74;
repeated string all_languages = 75; repeated string all_languages = 75;
.google.protobuf.BoolValue remove_duplicates = 76;
} }

View file

@ -253,6 +253,7 @@ type SearchRequest struct {
LimitClaimsPerChannel *wrapperspb.Int32Value `protobuf:"bytes,73,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"` LimitClaimsPerChannel *wrapperspb.Int32Value `protobuf:"bytes,73,opt,name=limit_claims_per_channel,json=limitClaimsPerChannel,proto3" json:"limit_claims_per_channel"`
AnyLanguages []string `protobuf:"bytes,74,rep,name=any_languages,json=anyLanguages,proto3" json:"any_languages"` AnyLanguages []string `protobuf:"bytes,74,rep,name=any_languages,json=anyLanguages,proto3" json:"any_languages"`
AllLanguages []string `protobuf:"bytes,75,rep,name=all_languages,json=allLanguages,proto3" json:"all_languages"` AllLanguages []string `protobuf:"bytes,75,rep,name=all_languages,json=allLanguages,proto3" json:"all_languages"`
RemoveDuplicates *wrapperspb.BoolValue `protobuf:"bytes,76,opt,name=remove_duplicates,json=removeDuplicates,proto3" json:"remove_duplicates"`
} }
func (x *SearchRequest) Reset() { func (x *SearchRequest) Reset() {
@ -721,6 +722,13 @@ func (x *SearchRequest) GetAllLanguages() []string {
return nil return nil
} }
// GetRemoveDuplicates returns the remove_duplicates field, or nil when the
// request (or the field) is unset. Nil-receiver safe, per the protobuf
// generated-getter convention.
func (x *SearchRequest) GetRemoveDuplicates() *wrapperspb.BoolValue {
	if x == nil {
		return nil
	}
	return x.RemoveDuplicates
}
var File_hub_proto protoreflect.FileDescriptor var File_hub_proto protoreflect.FileDescriptor
var file_hub_proto_rawDesc = []byte{ var file_hub_proto_rawDesc = []byte{
@ -739,7 +747,7 @@ var file_hub_proto_rawDesc = []byte{
0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x2e, 0x0a, 0x02, 0x4f, 0x70, 0x12, 0x06, 0x0a, 0x02, 0x45, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x2e, 0x0a, 0x02, 0x4f, 0x70, 0x12, 0x06, 0x0a, 0x02, 0x45,
0x51, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x4c, 0x54, 0x45, 0x10, 0x01, 0x12, 0x07, 0x0a, 0x03, 0x51, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x4c, 0x54, 0x45, 0x10, 0x01, 0x12, 0x07, 0x0a, 0x03,
0x47, 0x54, 0x45, 0x10, 0x02, 0x12, 0x06, 0x0a, 0x02, 0x4c, 0x54, 0x10, 0x03, 0x12, 0x06, 0x0a, 0x47, 0x54, 0x45, 0x10, 0x02, 0x12, 0x06, 0x0a, 0x02, 0x4c, 0x54, 0x10, 0x03, 0x12, 0x06, 0x0a,
0x02, 0x47, 0x54, 0x10, 0x04, 0x22, 0xd0, 0x15, 0x0a, 0x0d, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x02, 0x47, 0x54, 0x10, 0x04, 0x22, 0x99, 0x16, 0x0a, 0x0d, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, 0x18, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x65, 0x78, 0x74, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x65, 0x78, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e,
0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12,
@ -912,13 +920,18 @@ var file_hub_proto_rawDesc = []byte{
0x4a, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6e, 0x79, 0x4c, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x4a, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6e, 0x79, 0x4c, 0x61, 0x6e, 0x67, 0x75, 0x61,
0x67, 0x65, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x6c, 0x6c, 0x5f, 0x6c, 0x61, 0x6e, 0x67, 0x75, 0x67, 0x65, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x61, 0x6c, 0x6c, 0x5f, 0x6c, 0x61, 0x6e, 0x67, 0x75,
0x61, 0x67, 0x65, 0x73, 0x18, 0x4b, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6c, 0x6c, 0x4c, 0x61, 0x67, 0x65, 0x73, 0x18, 0x4b, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x61, 0x6c, 0x6c, 0x4c,
0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x73, 0x32, 0x31, 0x0a, 0x03, 0x48, 0x75, 0x62, 0x12, 0x61, 0x6e, 0x67, 0x75, 0x61, 0x67, 0x65, 0x73, 0x12, 0x47, 0x0a, 0x11, 0x72, 0x65, 0x6d, 0x6f,
0x2a, 0x0a, 0x06, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x12, 0x11, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x76, 0x65, 0x5f, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65, 0x73, 0x18, 0x4c, 0x20,
0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0b, 0x2e, 0x70, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x62, 0x2e, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x73, 0x22, 0x00, 0x42, 0x26, 0x5a, 0x24, 0x67, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x42, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52,
0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x62, 0x72, 0x79, 0x69, 0x6f, 0x10, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x65,
0x2f, 0x68, 0x75, 0x62, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x67, 0x6f, 0x73, 0x32, 0x31, 0x0a, 0x03, 0x48, 0x75, 0x62, 0x12, 0x2a, 0x0a, 0x06, 0x53, 0x65, 0x61, 0x72,
0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x63, 0x68, 0x12, 0x11, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x61, 0x72, 0x63, 0x68, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0b, 0x2e, 0x70, 0x62, 0x2e, 0x4f, 0x75, 0x74, 0x70, 0x75,
0x74, 0x73, 0x22, 0x00, 0x42, 0x26, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
0x6f, 0x6d, 0x2f, 0x6c, 0x62, 0x72, 0x79, 0x69, 0x6f, 0x2f, 0x68, 0x75, 0x62, 0x2f, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
@ -978,13 +991,14 @@ var file_hub_proto_depIdxs = []int32{
5, // 30: pb.SearchRequest.has_channel_signature:type_name -> google.protobuf.BoolValue 5, // 30: pb.SearchRequest.has_channel_signature:type_name -> google.protobuf.BoolValue
5, // 31: pb.SearchRequest.has_source:type_name -> google.protobuf.BoolValue 5, // 31: pb.SearchRequest.has_source:type_name -> google.protobuf.BoolValue
4, // 32: pb.SearchRequest.limit_claims_per_channel:type_name -> google.protobuf.Int32Value 4, // 32: pb.SearchRequest.limit_claims_per_channel:type_name -> google.protobuf.Int32Value
3, // 33: pb.Hub.Search:input_type -> pb.SearchRequest 5, // 33: pb.SearchRequest.remove_duplicates:type_name -> google.protobuf.BoolValue
6, // 34: pb.Hub.Search:output_type -> pb.Outputs 3, // 34: pb.Hub.Search:input_type -> pb.SearchRequest
34, // [34:35] is the sub-list for method output_type 6, // 35: pb.Hub.Search:output_type -> pb.Outputs
33, // [33:34] is the sub-list for method input_type 35, // [35:36] is the sub-list for method output_type
33, // [33:33] is the sub-list for extension type_name 34, // [34:35] is the sub-list for method input_type
33, // [33:33] is the sub-list for extension extendee 34, // [34:34] is the sub-list for extension type_name
0, // [0:33] is the sub-list for field type_name 34, // [34:34] is the sub-list for extension extendee
0, // [0:34] is the sub-list for field type_name
} }
func init() { file_hub_proto_init() } func init() { file_hub_proto_init() }

View file

@ -27,6 +27,8 @@ type record struct {
Height uint32 `json:"height"` Height uint32 `json:"height"`
ClaimId string `json:"claim_id"` ClaimId string `json:"claim_id"`
ChannelId string `json:"channel_id"` ChannelId string `json:"channel_id"`
CreationHeight uint32 `json:"creation_height"`
RepostedClaimId string `json:"reposted_claim_id"`
} }
type compareFunc func(r1, r2 **record, invert bool) int type compareFunc func(r1, r2 **record, invert bool) int
@ -506,7 +508,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
j = j + 1 j = j + 1
} }
fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title") fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")//.Include("_id")
log.Printf("from: %d, size: %d\n", from, size) log.Printf("from: %d, size: %d\n", from, size)
search := client.Search(). search := client.Search().
Index(searchIndices...). Index(searchIndices...).
@ -577,32 +579,30 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
txos = append(txos, res) txos = append(txos, res)
} }
} }
//
//for _, rec := range records {
// log.Println(*rec)
//}
//
//log.Println("#########################")
//
var finalRecords []*record if in.RemoveDuplicates != nil {
for _, rec := range records { records = removeDuplicates(records)
log.Println(*rec)
} }
log.Println("#########################")
if in.LimitClaimsPerChannel != nil { if in.LimitClaimsPerChannel != nil {
finalRecords = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value)) records = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value))
for _, rec := range finalRecords { //for _, rec := range records {
log.Println(*rec) // log.Println(*rec)
} //}
} else {
finalRecords = records
} }
finalLength := int(math.Min(float64(len(finalRecords)), float64(pageSize))) finalLength := int(math.Min(float64(len(records)), float64(pageSize)))
// var start int = from
txos = make([]*pb.Output, 0, finalLength) txos = make([]*pb.Output, 0, finalLength)
//for i, t := range finalRecords {
j = 0 j = 0
for i := from; i < from + finalLength && i < len(finalRecords) && j < finalLength; i++ { for i := from; i < from + finalLength && i < len(records) && j < finalLength; i++ {
t := finalRecords[i] t := records[i]
res := &pb.Output{ res := &pb.Output{
TxHash: util.ToHash(t.Txid), TxHash: util.ToHash(t.Txid),
Nout: t.Nout, Nout: t.Nout,
@ -612,7 +612,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
j += 1 j += 1
} }
// or if you want more control //// or if you want more control
//for _, hit := range searchResult.Hits.Hits { //for _, hit := range searchResult.Hits.Hits {
// // hit.Index contains the name of the index // // hit.Index contains the name of the index
// //
@ -641,40 +641,6 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
}, nil }, nil
} }
/* def __search_ahead(self, search_hits: list, page_size: int, per_channel_per_page: int):
reordered_hits = []
channel_counters = Counter()
next_page_hits_maybe_check_later = deque()
while search_hits or next_page_hits_maybe_check_later:
if reordered_hits and len(reordered_hits) % page_size == 0:
channel_counters.clear()
elif not reordered_hits:
pass
else:
break # means last page was incomplete and we are left with bad replacements
for _ in range(len(next_page_hits_maybe_check_later)):
claim_id, channel_id = next_page_hits_maybe_check_later.popleft()
if per_channel_per_page > 0 and channel_counters[channel_id] < per_channel_per_page:
reordered_hits.append((claim_id, channel_id))
channel_counters[channel_id] += 1
else:
next_page_hits_maybe_check_later.append((claim_id, channel_id))
while search_hits:
hit = search_hits.popleft()
hit_id, hit_channel_id = hit['_id'], hit['_source']['channel_id']
if hit_channel_id is None or per_channel_per_page <= 0:
reordered_hits.append((hit_id, hit_channel_id))
elif channel_counters[hit_channel_id] < per_channel_per_page:
reordered_hits.append((hit_id, hit_channel_id))
channel_counters[hit_channel_id] += 1
if len(reordered_hits) % page_size == 0:
break
else:
next_page_hits_maybe_check_later.append((hit_id, hit_channel_id))
return reordered_hits
*/
func sumCounters(channelCounters map[string]int) int { func sumCounters(channelCounters map[string]int) int {
var sum int = 0 var sum int = 0
@ -702,8 +668,6 @@ func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*r
break break
} }
// log.Printf("searchHitsQ = %d, nextPageHitsMaybeCheckLater = %d\n", searchHitsQ.Size(), nextPageHitsMaybeCheckLater.Size())
for i := 0; i < nextPageHitsMaybeCheckLater.Size(); i++ { for i := 0; i < nextPageHitsMaybeCheckLater.Size(); i++ {
rec := nextPageHitsMaybeCheckLater.PopLeft().(*record) rec := nextPageHitsMaybeCheckLater.PopLeft().(*record)
if perChannelPerPage > 0 && channelCounters[rec.ChannelId] < perChannelPerPage { if perChannelPerPage > 0 && channelCounters[rec.ChannelId] < perChannelPerPage {
@ -728,3 +692,47 @@ func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*r
} }
return finalHits return finalHits
} }
// getHitId returns the identifier used for duplicate detection: the reposted
// claim id when this record is a repost, otherwise the record's own claim id.
func (r *record) getHitId() string {
	if id := r.RepostedClaimId; id != "" {
		return id
	}
	return r.ClaimId
}
// removeDuplicates drops search hits that resolve to the same underlying
// claim, preserving the input order of the survivors. Two hits are duplicates
// when they share the same hit id (see record.getHitId: the reposted claim id
// for reposts, the claim id otherwise). Among duplicates, the hit with the
// lowest creation height is kept; the rest are dropped.
//
// NOTE(review): the original compared hit.Height, but the CreationHeight
// field added to record alongside this function — and the upstream python
// implementation this mirrors — use creation_height for the tiebreak, so
// CreationHeight is used here.
func removeDuplicates(searchHits []*record) []*record {
	// Set of records to omit from the result.
	dropped := make(map[*record]bool)
	// hit id -> earliest-created record seen so far for that id.
	knownIds := make(map[string]*record)

	for _, hit := range searchHits {
		hitId := hit.getHitId()
		prevHit, seen := knownIds[hitId]
		if !seen {
			knownIds[hitId] = hit
			continue
		}
		if hit.CreationHeight < prevHit.CreationHeight {
			// This hit was created earlier: it replaces the previous winner.
			knownIds[hitId] = hit
			dropped[prevHit] = true
		} else {
			dropped[hit] = true
		}
	}

	deduped := make([]*record, 0, len(searchHits)-len(dropped))
	for _, hit := range searchHits {
		if !dropped[hit] {
			deduped = append(deduped, hit)
		}
	}
	return deduped
}