Claim search port #5

Merged
jeffreypicard merged 32 commits from claim_search_port into master 2021-06-23 17:07:41 +02:00
4 changed files with 141 additions and 21 deletions
Showing only changes of commit d6af17df23


@@ -1,4 +1,4 @@
#!/bin/bash
go build .
-docker build . -t lbry/hub:latest
+sudo docker build . -t lbry/hub:latest
lyoshenka commented 2021-06-15 22:13:44 +02:00 (Migrated from github.com)

`sudo` in a build script is not a good idea. I think you can use docker without sudo if you add yourself to the docker group.
jeffreypicard commented 2021-06-18 01:25:01 +02:00 (Migrated from github.com)

You're right, I set this up on a new Ubuntu system and was too lazy to do that.

go.mod

@@ -13,4 +13,5 @@ require (
google.golang.org/genproto v0.0.0-20210524171403-669157292da3 // indirect
google.golang.org/grpc v1.38.0
google.golang.org/protobuf v1.26.0
+gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c
)

go.sum

@@ -173,6 +173,8 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
+gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c h1:4GYkPhjcYLPrPAnoxHVQlH/xcXtWN8pEgqBnHrPAs8c=
+gopkg.in/karalabe/cookiejar.v1 v1.0.0-20141109175019-e1490cae028c/go.mod h1:xd7qpr5uPMNy4hsRJ5JEBXA8tJjTFmUI1soCjlCIgAE=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
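The new dependency pulled in here, gopkg.in/karalabe/cookiejar.v1, provides the double-ended queue that backs the `searchAhead` queues further down in the diff. A minimal sketch of the API as this PR uses it (`New`, `PushRight`, `PopLeft`, `Empty`, and `Size` all appear in the diff; the `main` wrapper is just for illustration, and the deque stores `interface{}` values, so callers type-assert on the way out):

```go
package main

import (
	"fmt"

	"gopkg.in/karalabe/cookiejar.v1/collections/deque"
)

func main() {
	d := deque.New()
	d.PushRight("first") // enqueue at the back
	d.PushRight("second")
	fmt.Println(d.Size()) // 2
	for !d.Empty() {
		// PopLeft returns interface{}; assert back to the stored type.
		fmt.Println(d.PopLeft().(string))
	}
}
```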


@@ -7,11 +7,14 @@ import (
"github.com/btcsuite/btcutil/base58"
"github.com/golang/protobuf/ptypes/wrappers"
pb "github.com/lbryio/hub/protobuf/go"
"math"
//"github.com/lbryio/hub/schema"
"github.com/lbryio/hub/util"
"github.com/olivere/elastic/v7"
"golang.org/x/text/cases"
"golang.org/x/text/language"
"gopkg.in/karalabe/cookiejar.v1/collections/deque"
"log"
"reflect"
"sort"
@@ -19,26 +22,27 @@ import (
)
type record struct {
-Txid string `json:"tx_id"`
-Nout uint32 `json:"tx_nout"`
-Height uint32 `json:"height"`
-ClaimId string `json:"claim_id"`
+Txid string `json:"tx_id"`
+Nout uint32 `json:"tx_nout"`
+Height uint32 `json:"height"`
+ClaimId string `json:"claim_id"`
+ChannelId string `json:"channel_id"`
}
-type compareFunc func(r1, r2 *record, invert bool) int
+type compareFunc func(r1, r2 **record, invert bool) int
type multiSorter struct {
-records []record
+records []*record
compare []compareFunc
invert []bool
}
var compareFuncs = map[string]compareFunc {
"height": func(r1, r2 *record, invert bool) int {
"height": func(r1, r2 **record, invert bool) int {
var res = 0
-if r1.Height < r2.Height {
+if (*r1).Height < (*r2).Height {
res = -1
-} else if r1.Height > r2.Height {
+} else if (*r1).Height > (*r2).Height {
res = 1
}
if invert {
@@ -49,7 +53,7 @@ var compareFuncs = map[string]compareFunc {
}
// Sort sorts the argument slice according to the less functions passed to OrderedBy.
-func (ms *multiSorter) Sort(records []record) {
+func (ms *multiSorter) Sort(records []*record) {
ms.records = records
sort.Sort(ms)
}
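The signature change to `**record` presumably lets the comparators receive addresses of elements of the new `[]*record` slice. For reference, a hedged sketch of how the rest of multiSorter's `sort.Interface` plumbing (not shown in this hunk) would look under these signatures:

```go
// Len and Swap complete sort.Interface over the []*record slice.
func (ms *multiSorter) Len() int { return len(ms.records) }

func (ms *multiSorter) Swap(i, j int) {
	ms.records[i], ms.records[j] = ms.records[j], ms.records[i]
}

// Less walks the comparator list, advancing while two records compare
// equal; ms.invert[k] flips the ordering for that sort key.
func (ms *multiSorter) Less(i, j int) bool {
	r1, r2 := &ms.records[i], &ms.records[j] // **record, matching compareFunc
	for k, cmp := range ms.compare {
		switch cmp(r1, r2, ms.invert[k]) {
		case -1:
			return true
		case 1:
			return false
		}
	}
	return false
}
```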
@@ -494,7 +498,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
}
searchIndices := make([]string, numIndices)
j := 0
-for i := 0; i < len(indices); i++ {
+for i := 0; j < numIndices; i++ {
if indices[i] == "claims" {
continue
}
@@ -508,7 +512,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
Index(searchIndices...).
FetchSourceContext(fsc).
Query(q). // specify the query
-From(from).Size(size)
+From(0).Size(1000)
//if in.LimitClaimsPerChannel != nil {
// search = search.Collapse(collapse)
//}
@@ -525,17 +529,16 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
log.Printf("%s: found %d results in %dms\n", in.Text, len(searchResult.Hits.Hits), searchResult.TookInMillis)
var txos []*pb.Output
-var records []record
+var records []*record
//if in.LimitClaimsPerChannel == nil {
if true {
txos = make([]*pb.Output, searchResult.TotalHits())
-records = make([]record, 0, searchResult.TotalHits())
+records = make([]*record, 0, searchResult.TotalHits())
var r record
for _, item := range searchResult.Each(reflect.TypeOf(r)) {
if t, ok := item.(record); ok {
-records = append(records, t)
+records = append(records, &t)
//txos[i] = &pb.Output{
// TxHash: util.ToHash(t.Txid),
// Nout: t.Nout,
@@ -544,7 +547,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
}
}
} else {
-records = make([]record, 0, len(searchResult.Hits.Hits) * int(in.LimitClaimsPerChannel.Value))
+records = make([]*record, 0, len(searchResult.Hits.Hits) * int(in.LimitClaimsPerChannel.Value))
txos = make([]*pb.Output, 0, len(searchResult.Hits.Hits) * int(in.LimitClaimsPerChannel.Value))
var i = 0
for _, hit := range searchResult.Hits.Hits {
@@ -553,7 +556,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
if i >= size {
break
}
-var t record
+var t *record
err := json.Unmarshal(hitt.Source, &t)
if err != nil {
return nil, err
@@ -575,12 +578,38 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
}
}
-for i, t := range records {
-txos[i] = &pb.Output{
+var finalRecords []*record
+for _, rec := range records {
+log.Println(*rec)
+}
+log.Println("#########################")
+if in.LimitClaimsPerChannel != nil {
+finalRecords = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value))
+for _, rec := range finalRecords {
+log.Println(*rec)
+}
+} else {
+finalRecords = records
+}
+finalLength := int(math.Min(float64(len(finalRecords)), float64(pageSize)))
+// var start int = from
+txos = make([]*pb.Output, 0, finalLength)
+//for i, t := range finalRecords {
+j = 0
+for i := from; i < from + finalLength && i < len(finalRecords) && j < finalLength; i++ {
+t := finalRecords[i]
+res := &pb.Output{
TxHash: util.ToHash(t.Txid),
Nout: t.Nout,
Height: t.Height,
}
+txos = append(txos, res)
+j += 1
}
// or if you want more control
@@ -611,3 +640,91 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
Offset: uint32(int64(from) + searchResult.TotalHits()),
}, nil
}
/* def __search_ahead(self, search_hits: list, page_size: int, per_channel_per_page: int):
reordered_hits = []
channel_counters = Counter()
next_page_hits_maybe_check_later = deque()
while search_hits or next_page_hits_maybe_check_later:
if reordered_hits and len(reordered_hits) % page_size == 0:
channel_counters.clear()
elif not reordered_hits:
pass
else:
break # means last page was incomplete and we are left with bad replacements
for _ in range(len(next_page_hits_maybe_check_later)):
claim_id, channel_id = next_page_hits_maybe_check_later.popleft()
if per_channel_per_page > 0 and channel_counters[channel_id] < per_channel_per_page:
reordered_hits.append((claim_id, channel_id))
channel_counters[channel_id] += 1
else:
next_page_hits_maybe_check_later.append((claim_id, channel_id))
while search_hits:
hit = search_hits.popleft()
hit_id, hit_channel_id = hit['_id'], hit['_source']['channel_id']
if hit_channel_id is None or per_channel_per_page <= 0:
reordered_hits.append((hit_id, hit_channel_id))
elif channel_counters[hit_channel_id] < per_channel_per_page:
reordered_hits.append((hit_id, hit_channel_id))
channel_counters[hit_channel_id] += 1
if len(reordered_hits) % page_size == 0:
break
else:
next_page_hits_maybe_check_later.append((hit_id, hit_channel_id))
return reordered_hits
*/
func sumCounters(channelCounters map[string]int) int {
var sum int = 0
for _, v := range channelCounters {
sum += v
}
return sum
}
func searchAhead(searchHits []*record, pageSize int, perChannelPerPage int) []*record {
finalHits := make([]*record, 0, len(searchHits))
var channelCounters map[string]int
channelCounters = make(map[string]int)
nextPageHitsMaybeCheckLater := deque.New()
searchHitsQ := deque.New()
for _, rec := range searchHits {
searchHitsQ.PushRight(rec)
}
for !searchHitsQ.Empty() || !nextPageHitsMaybeCheckLater.Empty() {
if len(finalHits) > 0 && len(finalHits) % pageSize == 0 {
channelCounters = make(map[string]int)
} else if len(finalHits) != 0 {
// means last page was incomplete and we are left with bad replacements
break
}
// log.Printf("searchHitsQ = %d, nextPageHitsMaybeCheckLater = %d\n", searchHitsQ.Size(), nextPageHitsMaybeCheckLater.Size())
for i := 0; i < nextPageHitsMaybeCheckLater.Size(); i++ {
rec := nextPageHitsMaybeCheckLater.PopLeft().(*record)
if perChannelPerPage > 0 && channelCounters[rec.ChannelId] < perChannelPerPage {
finalHits = append(finalHits, rec)
channelCounters[rec.ChannelId] = channelCounters[rec.ChannelId] + 1
}
}
for !searchHitsQ.Empty() {
hit := searchHitsQ.PopLeft().(*record)
if hit.ChannelId == "" || perChannelPerPage < 0 {
finalHits = append(finalHits, hit)
} else if channelCounters[hit.ChannelId] < perChannelPerPage {
finalHits = append(finalHits, hit)
channelCounters[hit.ChannelId] = channelCounters[hit.ChannelId] + 1
if len(finalHits) % pageSize == 0 {
break
}
} else {
nextPageHitsMaybeCheckLater.PushRight(hit)
}
}
}
return finalHits
}
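To make the reordering concrete, here is a hypothetical walkthrough of searchAhead (only ChannelId matters for the algorithm; the other record fields are elided, and the snippet assumes it lives alongside the code above with fmt imported). With pageSize 3 and perChannelPerPage 1, three consecutive hits from the same channel get spread across pages:

```go
func exampleSearchAhead() {
	hits := []*record{
		{ClaimId: "1", ChannelId: "A"},
		{ClaimId: "2", ChannelId: "A"},
		{ClaimId: "3", ChannelId: "A"},
		{ClaimId: "4", ChannelId: "B"},
		{ClaimId: "5", ChannelId: "C"},
	}
	for _, r := range searchAhead(hits, 3, 1) {
		fmt.Printf("%s:%s ", r.ChannelId, r.ClaimId)
	}
	// Prints "A:1 B:4 C:5 A:2 ": the first page holds one hit per
	// channel, the deferred A:2 starts the (incomplete) second page,
	// and A:3 is left behind when the loop bails on the partial page.
}
```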