Claim search port #5
2 changed files with 6 additions and 338 deletions
175
schema/url.go
175
schema/url.go
|
@ -1,175 +0,0 @@
|
|||
package schema
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/lbryio/hub/util"
|
||||
"log"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"unicode/utf16"
|
||||
)
|
||||
|
||||
// PathSegment is one component of an lbry:// URL path: a claim (stream) or
// channel name, optionally qualified by a claim-id prefix or an amount order.
type PathSegment struct {
	// Name is the claim or channel name; for channels the "@" prefix is
	// included (the URL regex captures the prefix as part of the name).
	Name string
	// ClaimId is a 1-40 character hex prefix of the claim id; empty if absent.
	ClaimId string
	// AmountOrder selects the n-th claim by amount; ParseURL stores -1 when
	// the "$n" suffix is not present.
	AmountOrder int
}
|
||||
|
||||
// URL is a parsed lbry:// URL: an optional stream segment and an optional
// channel segment. Either pointer may be nil when that half is absent.
type URL struct {
	Stream  *PathSegment
	Channel *PathSegment
}
|
||||
|
||||
// Normalized returns the segment name in the canonical form produced by
// util.Normalize (used for comparisons/lookups; see that helper for the
// exact normalization rules).
func (ps *PathSegment) Normalized() string {
	return util.Normalize(ps.Name)
}
|
||||
|
||||
func (ps *PathSegment) IsShortID() bool {
|
||||
return len(ps.ClaimId) < 40
|
||||
}
|
||||
|
||||
func (ps *PathSegment) IsFullID() bool {
|
||||
return len(ps.ClaimId) == 40
|
||||
}
|
||||
|
||||
func (ps *PathSegment) String() string {
|
||||
if ps == nil {
|
||||
return ""
|
||||
}
|
||||
if ps.ClaimId != "" {
|
||||
return ps.Name + ":" + ps.ClaimId
|
||||
} else if ps.AmountOrder >= 0 {
|
||||
return fmt.Sprintf("%s$%d", ps.Name, ps.AmountOrder)
|
||||
}
|
||||
return ps.Name
|
||||
}
|
||||
|
||||
// HasChannel reports whether the URL carries a channel segment.
func (url *URL) HasChannel() bool {
	return url.Channel != nil
}
|
||||
|
||||
// HasStream reports whether the URL carries a stream segment.
func (url *URL) HasStream() bool {
	return url.Stream != nil
}
|
||||
|
||||
func (url *URL) HasStreamInChannel() bool {
|
||||
return url.HasChannel() && url.HasStream()
|
||||
}
|
||||
|
||||
func (url *URL) GetParts() []*PathSegment {
|
||||
if url.HasStreamInChannel() {
|
||||
return []*PathSegment{url.Channel, url.Stream}
|
||||
}
|
||||
if url.HasChannel() {
|
||||
return []*PathSegment{url.Channel}
|
||||
}
|
||||
return []*PathSegment{url.Stream}
|
||||
}
|
||||
|
||||
func (url *URL) String() string {
|
||||
parts := url.GetParts()
|
||||
stringParts := make([]string, len(parts))
|
||||
for i, x := range parts {
|
||||
stringParts[i] = x.String()
|
||||
}
|
||||
return "lbry://" + strings.Join(stringParts, "/")
|
||||
}
|
||||
|
||||
func ParseURL(url string) *URL {
|
||||
segmentNames := []string{"channel", "stream", "channel_with_stream", "stream_in_channel"}
|
||||
re := createUrlRegex()
|
||||
|
||||
match := re.FindStringSubmatch(url)
|
||||
parts := make(map[string]string)
|
||||
for i, name := range re.SubexpNames() {
|
||||
if i != 0 && name != "" {
|
||||
parts[name] = match[i]
|
||||
}
|
||||
}
|
||||
|
||||
segments := make(map[string]*PathSegment)
|
||||
var amountOrder int
|
||||
for _, segment := range segmentNames {
|
||||
if res, ok := parts[segment + "_name"]; ok && res != ""{
|
||||
x, ok := parts[segment + "_amount_order"]
|
||||
if ok && x != "" {
|
||||
parsedInt, err := strconv.Atoi(x)
|
||||
if err != nil {
|
||||
log.Fatalln("can't parse amount_order")
|
||||
}
|
||||
amountOrder = parsedInt
|
||||
} else {
|
||||
amountOrder = -1
|
||||
}
|
||||
segments[segment] = &PathSegment{
|
||||
Name: parts[segment + "_name"],
|
||||
ClaimId: parts[segment + "_claim_id"],
|
||||
AmountOrder: amountOrder,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var stream *PathSegment = nil
|
||||
var channel *PathSegment = nil
|
||||
if _, ok := segments["channel_with_stream"]; ok {
|
||||
stream = segments["channel_with_stream"]
|
||||
channel = segments["stream_in_channel"]
|
||||
} else {
|
||||
stream = segments["channel"]
|
||||
channel = segments["stream"]
|
||||
}
|
||||
|
||||
return &URL{stream,channel}
|
||||
}
|
||||
|
||||
// createUrlRegex builds the regular expression used to parse lbry:// URLs.
// It recognizes an optional "lbry://" scheme followed by either a channel
// ("@name"), a stream ("name"), or a channel/stream pair, where each path
// segment may carry a ":claimid" / "#claimid" suffix or a "$n" amount order.
//
// NOTE(review): Go string literals cannot contain lone UTF-16 surrogates, so
// the \uD800-\uDFFF exclusion range is produced via utf16.Decode. Decoding a
// lone surrogate yields U+FFFD, so the effective excluded range collapses to
// the replacement character — confirm this matches the intent of the source
// this was ported from. (Debug log.Println calls and dead commented-out
// pattern variants from the original have been removed.)
func createUrlRegex() *regexp.Regexp {
	s1 := string(utf16.Decode([]uint16{0xd800}))
	s2 := string(utf16.Decode([]uint16{0xdfff}))
	// Characters that may NOT appear in a claim/channel name.
	invalidNamesRegex := "[^=&#:$@%?;\"/\\<>%{}|^~`\\[\\]" + "\u0000-\u0020" + s1 + "-" + s2 + "\uFFFE-\uFFFF]+"

	// named wraps regex in a named capture group.
	named := func(name string, regex string) string {
		return "(?P<" + name + ">" + regex + ")"
	}

	// group wraps regex in a non-capturing group.
	group := func(regex string) string {
		return "(?:" + regex + ")"
	}

	// oneof builds an alternation over the given choices.
	oneof := func(choices []string) string {
		return group(strings.Join(choices, "|"))
	}

	// claim matches one path segment: a name carrying the given prefix
	// ("@" for channels), optionally followed by a claim id or amount order.
	claim := func(name string, prefix string) string {
		return group(
			named(name+"_name", prefix+invalidNamesRegex) +
				oneof(
					[]string{
						group("[:#]" + named(name+"_claim_id", "[0-9a-f]{1,40}")),
						group("\\$" + named(name+"_amount_order", "[1-9][0-9]*")),
					},
				) + "?",
		)
	}

	finalStr := "^" +
		named("scheme", "lbry://") + "?" +
		oneof(
			[]string{
				group(claim("channel_with_stream", "@") + "/" + claim("stream_in_channel", "")),
				claim("channel", "@"),
				claim("stream", ""),
			},
		) +
		"$"

	return regexp.MustCompile(finalStr)
}
|
169
server/search.go
169
server/search.go
|
@ -3,7 +3,6 @@ package server
|
|||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"github.com/btcsuite/btcutil/base58"
|
||||
"github.com/golang/protobuf/ptypes/wrappers"
|
||||
pb "github.com/lbryio/hub/protobuf/go"
|
||||
|
@ -17,7 +16,6 @@ import (
|
|||
"gopkg.in/karalabe/cookiejar.v1/collections/deque"
|
||||
"log"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
|
@ -31,89 +29,10 @@ type record struct {
|
|||
RepostedClaimId string `json:"reposted_claim_id"`
|
||||
}
|
||||
|
||||
// compareFunc orders two records, returning a negative, zero, or positive
// value; invert flips the sense of the comparison. The double pointers match
// multiSorter.Less, which passes addresses of slice elements.
type compareFunc func(r1, r2 **record, invert bool) int
|
||||
|
||||
// multiSorter sorts a record slice by applying a sequence of compareFuncs,
// each with its own inversion flag, falling through to the next comparison
// on ties. It implements sort.Interface; compare and invert must have the
// same length.
type multiSorter struct {
	records []*record
	compare []compareFunc
	invert  []bool
}
|
||||
|
||||
var compareFuncs = map[string]compareFunc {
|
||||
"height": func(r1, r2 **record, invert bool) int {
|
||||
var res = 0
|
||||
if (*r1).Height < (*r2).Height {
|
||||
res = -1
|
||||
} else if (*r1).Height > (*r2).Height {
|
||||
res = 1
|
||||
}
|
||||
if invert {
|
||||
res = res * -1
|
||||
}
|
||||
return res
|
||||
},
|
||||
}
|
||||
|
||||
// Sort sorts the argument slice according to the less functions passed to OrderedBy.
|
||||
// Sort sorts the argument slice according to the less functions passed to OrderedBy.
// The sorter is reusable: only the record slice changes between calls.
func (ms *multiSorter) Sort(records []*record) {
	ms.records = records
	sort.Sort(ms)
}
|
||||
|
||||
// OrderedBy returns a Sorter that sorts using the less functions, in order.
|
||||
// Call its Sort method to sort the data.
|
||||
func OrderedBy(compare ...compareFunc) *multiSorter {
|
||||
return &multiSorter{
|
||||
compare: compare,
|
||||
}
|
||||
}
|
||||
|
||||
// Len is part of sort.Interface.
|
||||
// Len is part of sort.Interface. It reports the number of records attached
// via Sort.
func (ms *multiSorter) Len() int {
	return len(ms.records)
}
|
||||
|
||||
// Swap is part of sort.Interface.
|
||||
// Swap is part of sort.Interface. It exchanges the records at indexes i and j.
func (ms *multiSorter) Swap(i, j int) {
	ms.records[i], ms.records[j] = ms.records[j], ms.records[i]
}
|
||||
|
||||
// Less is part of sort.Interface. It is implemented by looping along the
|
||||
// less functions until it finds a comparison that discriminates between
|
||||
// the two items (one is less than the other). Note that it can call the
|
||||
// less functions twice per call. We could change the functions to return
|
||||
// -1, 0, 1 and reduce the number of calls for greater efficiency: an
|
||||
// exercise for the reader.
|
||||
func (ms *multiSorter) Less(i, j int) bool {
|
||||
p, q := &ms.records[i], &ms.records[j]
|
||||
// Try all but the last comparison.
|
||||
var k int
|
||||
for k = 0; k < len(ms.compare)-1; k++ {
|
||||
cmp := ms.compare[k]
|
||||
res := cmp(p, q, ms.invert[k])
|
||||
|
||||
if res != 0 {
|
||||
return res > 0
|
||||
}
|
||||
}
|
||||
// All comparisons to here said "equal", so just return whatever
|
||||
// the final comparison reports.
|
||||
return ms.compare[k](p, q, ms.invert[k]) > 0
|
||||
}
|
||||
|
||||
// orderField is one element of an order-by clause: the record field to sort
// on (a key of compareFuncs) and whether the sort is ascending.
type orderField struct {
	Field string
	IsAsc bool
}
|
||||
// Resolution outcome kinds for urlResolution: an error, a resolved channel,
// or a resolved stream. (Idiom fix: iota continues implicitly after the
// first constant; the original repeated "= iota" on every line. Values are
// unchanged: 0, 1, 2.)
const (
	errorResolution = iota
	channelResolution
	streamResolution
)
|
||||
// urlResolution is the outcome of resolving one URL: a resolutionType
// constant (errorResolution / channelResolution / streamResolution) plus an
// associated string value — presumably a claim id or error message; confirm
// against the (not fully visible) resolution code.
type urlResolution struct {
	resolutionType int
	value          string
}
|
||||
|
||||
func StrArrToInterface(arr []string) []interface{} {
|
||||
searchVals := make([]interface{}, len(arr))
|
||||
|
@ -267,7 +186,6 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
|
|||
var size = 1000
|
||||
var pageSize = 10
|
||||
var orderBy []orderField
|
||||
var ms *multiSorter
|
||||
|
||||
// Ping the Elasticsearch server to get e.g. the version number
|
||||
//_, code, err := client.Ping("http://127.0.0.1:9200").Do(ctx)
|
||||
|
@ -278,8 +196,6 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
|
|||
// return nil, errors.New("ping failed")
|
||||
//}
|
||||
|
||||
// TODO: support all of this https://github.com/lbryio/lbry-sdk/blob/master/lbry/wallet/server/db/elasticsearch/search.py#L385
|
||||
|
||||
q := elastic.NewBoolQuery()
|
||||
|
||||
if in.IsControlling != nil {
|
||||
|
@ -328,15 +244,6 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
|
|||
}
|
||||
orderBy = append(orderBy, orderField{toAppend, isAsc})
|
||||
}
|
||||
|
||||
ms = &multiSorter{
|
||||
invert: make([]bool, len(orderBy)),
|
||||
compare: make([]compareFunc, len(orderBy)),
|
||||
}
|
||||
for i, x := range orderBy {
|
||||
ms.compare[i] = compareFuncs[x.Field]
|
||||
ms.invert[i] = x.IsAsc
|
||||
}
|
||||
}
|
||||
|
||||
if len(in.ClaimType) > 0 {
|
||||
|
@ -410,20 +317,6 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
|
|||
q = q.Should(elastic.NewBoolQuery().Must(elastic.NewTermQuery("reposted_claim_type", claimTypes["channel"])))
|
||||
}
|
||||
|
||||
//var collapse *elastic.CollapseBuilder
|
||||
//if in.LimitClaimsPerChannel != nil {
|
||||
// println(in.LimitClaimsPerChannel.Value)
|
||||
// innerHit := elastic.
|
||||
// NewInnerHit().
|
||||
// //From(0).
|
||||
// Size(int(in.LimitClaimsPerChannel.Value)).
|
||||
// Name("channel_id")
|
||||
// for _, x := range orderBy {
|
||||
// innerHit = innerHit.Sort(x.Field, x.IsAsc)
|
||||
// }
|
||||
// collapse = elastic.NewCollapseBuilder("channel_id.keyword").InnerHit(innerHit)
|
||||
//}
|
||||
|
||||
if in.TxNout != nil {
|
||||
q = q.Must(elastic.NewTermQuery("tx_nout", in.TxNout.Value))
|
||||
}
|
||||
|
@ -515,9 +408,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
|
|||
FetchSourceContext(fsc).
|
||||
Query(q). // specify the query
|
||||
From(0).Size(1000)
|
||||
//if in.LimitClaimsPerChannel != nil {
|
||||
// search = search.Collapse(collapse)
|
||||
//}
|
||||
|
||||
for _, x := range orderBy {
|
||||
log.Println(x.Field, x.IsAsc)
|
||||
search = search.Sort(x.Field, x.IsAsc)
|
||||
|
@ -533,59 +424,14 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
|
|||
var txos []*pb.Output
|
||||
var records []*record
|
||||
|
||||
//if in.LimitClaimsPerChannel == nil {
|
||||
if true {
|
||||
records = make([]*record, 0, searchResult.TotalHits())
|
||||
records = make([]*record, 0, searchResult.TotalHits())
|
||||
|
||||
var r record
|
||||
for _, item := range searchResult.Each(reflect.TypeOf(r)) {
|
||||
if t, ok := item.(record); ok {
|
||||
records = append(records, &t)
|
||||
//txos[i] = &pb.Output{
|
||||
// TxHash: util.ToHash(t.Txid),
|
||||
// Nout: t.Nout,
|
||||
// Height: t.Height,
|
||||
//}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
records = make([]*record, 0, len(searchResult.Hits.Hits) * int(in.LimitClaimsPerChannel.Value))
|
||||
txos = make([]*pb.Output, 0, len(searchResult.Hits.Hits) * int(in.LimitClaimsPerChannel.Value))
|
||||
var i = 0
|
||||
for _, hit := range searchResult.Hits.Hits {
|
||||
if innerHit, ok := hit.InnerHits["channel_id"]; ok {
|
||||
for _, hitt := range innerHit.Hits.Hits {
|
||||
if i >= size {
|
||||
break
|
||||
}
|
||||
var t *record
|
||||
err := json.Unmarshal(hitt.Source, &t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
records = append(records, t)
|
||||
i++
|
||||
}
|
||||
}
|
||||
}
|
||||
ms.Sort(records)
|
||||
log.Println(records)
|
||||
for _, t := range records {
|
||||
res := &pb.Output{
|
||||
TxHash: util.ToHash(t.Txid),
|
||||
Nout: t.Nout,
|
||||
Height: t.Height,
|
||||
}
|
||||
txos = append(txos, res)
|
||||
var r record
|
||||
for _, item := range searchResult.Each(reflect.TypeOf(r)) {
|
||||
if t, ok := item.(record); ok {
|
||||
records = append(records, &t)
|
||||
}
|
||||
}
|
||||
//
|
||||
//for _, rec := range records {
|
||||
// log.Println(*rec)
|
||||
//}
|
||||
//
|
||||
//log.Println("#########################")
|
||||
//
|
||||
|
||||
if in.RemoveDuplicates != nil {
|
||||
records = removeDuplicates(records)
|
||||
|
@ -593,9 +439,6 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
|
|||
|
||||
if in.LimitClaimsPerChannel != nil {
|
||||
records = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value))
|
||||
//for _, rec := range records {
|
||||
// log.Println(*rec)
|
||||
//}
|
||||
}
|
||||
|
||||
finalLength := int(math.Min(float64(len(records)), float64(pageSize)))
|
||||
|
|
Loading…
Reference in a new issue