Implementing url resolving server side

This commit is contained in:
Jeffrey Picard 2021-06-04 01:56:50 -04:00
parent 232025ac8e
commit b557bf8237
5 changed files with 455 additions and 49 deletions

View file

@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"github.com/lbryio/hub/util"
"log"
"net"
"os"
@ -166,6 +167,6 @@ func main() {
log.Printf("found %d results\n", r.GetTotal())
for _, t := range r.Txos {
fmt.Printf("%s:%d\n", server.FromHash(t.TxHash), t.Nout)
fmt.Printf("%s:%d\n", util.FromHash(t.TxHash), t.Nout)
}
}

167
schema/url.go Normal file
View file

@ -0,0 +1,167 @@
package schema
import (
"fmt"
"github.com/lbryio/hub/util"
"log"
"regexp"
"strconv"
"strings"
)
type PathSegment struct {
Name string
ClaimId string
AmountOrder int
}
type URL struct {
Stream *PathSegment
Channel *PathSegment
}
func (ps *PathSegment) Normalized() string {
return util.Normalize(ps.Name)
}
func (ps *PathSegment) IsShortID() bool {
return len(ps.ClaimId) < 40
}
func (ps *PathSegment) IsFullID() bool {
return len(ps.ClaimId) == 40
}
func (ps *PathSegment) String() string {
if ps == nil {
return ""
}
if ps.ClaimId != "" {
return ps.Name + ":" + ps.ClaimId
} else if ps.AmountOrder >= 0 {
return fmt.Sprintf("%s$%d", ps.Name, ps.AmountOrder)
}
return ps.Name
}
func (url *URL) HasChannel() bool {
return url.Channel != nil
}
func (url *URL) HasStream() bool {
return url.Stream != nil
}
func (url *URL) HasStreamInChannel() bool {
return url.HasChannel() && url.HasStream()
}
func (url *URL) GetParts() []*PathSegment {
if url.HasStreamInChannel() {
return []*PathSegment{url.Channel, url.Stream}
}
if url.HasChannel() {
return []*PathSegment{url.Channel}
}
return []*PathSegment{url.Stream}
}
func (url *URL) String() string {
parts := url.GetParts()
stringParts := make([]string, len(parts))
for i, x := range parts {
stringParts[i] = x.String()
}
return "lbry://" + strings.Join(stringParts, "/")
}
func ParseURL(url string) *URL {
segmentNames := []string{"channel", "stream", "channel_with_stream", "stream_in_channel"}
re := createUrlRegex()
match := re.FindStringSubmatch(url)
parts := make(map[string]string)
for i, name := range re.SubexpNames() {
if i != 0 && name != "" {
parts[name] = match[i]
}
}
segments := make(map[string]*PathSegment)
var amountOrder int
for _, segment := range segmentNames {
if res, ok := parts[segment + "_name"]; ok && res != ""{
x, ok := parts[segment + "_amount_order"]
if ok && x != "" {
parsedInt, err := strconv.Atoi(x)
if err != nil {
log.Fatalln("can't parse amount_order")
}
amountOrder = parsedInt
} else {
amountOrder = -1
}
segments[segment] = &PathSegment{
Name: parts[segment + "_name"],
ClaimId: parts[segment + "_claim_id"],
AmountOrder: amountOrder,
}
}
}
var stream *PathSegment = nil
var channel *PathSegment = nil
if _, ok := segments["channel_with_stream"]; ok {
stream = segments["channel_with_stream"]
channel = segments["stream_in_channel"]
} else {
stream = segments["channel"]
channel = segments["stream"]
}
return &URL{stream,channel}
}
func createUrlRegex() *regexp.Regexp {
//invalidNamesRegex := "[^=&#:$@%?;\"/\\<>%{}|^~`\\[\\]" + "\u0000-\u0020\uD800-\uDFFF\uFFFE-\uFFFF]+"
//invalidNamesRegex := "[^=&#:$@%?;\"/\\<>%{}|^~`\\[\\]" + "\u0000-\u0020-\uFFFE-\uFFFF]+"
invalidNamesRegex := "[^=&#:$@%?;\"/\\<>%{}|^~`\\[\\]" + "]+"
named := func (name string, regex string) string {
return "(?P<" + name + ">" + regex + ")"
}
group := func(regex string) string {
return "(?:" + regex + ")"
}
oneof := func(choices []string) string {
return group(strings.Join(choices, "|"))
}
claim := func(name string, prefix string) string {
return group(
named(name+"_name", prefix + invalidNamesRegex) +
oneof(
[]string {
group("[:#]" + named(name+"_claim_id", "[0-9a-f]{1,40}")),
group("\\$" + named(name+"_amount_order", "[1-9][0-9]*")),
},
) + "?",
)
}
finalStr := "^" +
named("scheme", "lbry://") + "?" +
oneof(
[]string {
group(claim("channel_with_stream", "@") + "/" + claim("stream_in_channel", "")),
claim("channel", "@"),
claim("stream", ""),
},
) +
"$"
re := regexp.MustCompile(finalStr)
return re
}

View file

@ -8,10 +8,11 @@ import (
"github.com/btcsuite/btcutil/base58"
"github.com/golang/protobuf/ptypes/wrappers"
pb "github.com/lbryio/hub/protobuf/go"
"github.com/lbryio/hub/schema"
"github.com/lbryio/hub/util"
"github.com/olivere/elastic/v7"
"golang.org/x/text/cases"
"golang.org/x/text/language"
"golang.org/x/text/unicode/norm"
"log"
"reflect"
"strings"
@ -21,17 +22,21 @@ type record struct {
Txid string `json:"tx_id"`
Nout uint32 `json:"tx_nout"`
Height uint32 `json:"height"`
ClaimId string `json:"claim_id"`
}
type orderField struct {
Field string
is_asc bool
}
func ReverseBytes(s []byte) {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
const (
errorResolution = iota
channelResolution = iota
streamResolution = iota
)
type urlResolution struct {
resolutionType int
value string
}
func StrArrToInterface(arr []string) []interface{} {
@ -104,11 +109,6 @@ func AddInvertibleField(field *pb.InvertibleField, name string, q *elastic.BoolQ
}
}
func normalize(s string) string {
c := cases.Fold()
return c.String(norm.NFD.String(s))
}
func (s *Server) normalizeTag(tag string) string {
c := cases.Lower(language.English)
res := s.MultiSpaceRe.ReplaceAll(
@ -129,12 +129,230 @@ func (s *Server) cleanTags(tags []string) []string {
return cleanedTags
}
func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, error) {
// TODO: reuse elastic client across requests
client, err := elastic.NewClient(elastic.SetSniff(false))
if err != nil {
return nil, err
func (s *Server) fullIdFromShortId(ctx context.Context, channelName string, claimId string) (string, error) {
return "", nil
}
func (s *Server) resolveStream(ctx context.Context, url *schema.URL, channelId string) (string, error) {
return "", nil
}
func (s *Server) resolveChannelId(ctx context.Context, url *schema.URL) (string, error) {
if !url.HasChannel() {
return "", nil
}
if url.Channel.IsFullID() {
return url.Channel.ClaimId, nil
}
if url.Channel.IsShortID() {
channelId, err := s.fullIdFromShortId(ctx, url.Channel.Name, url.Channel.ClaimId)
if err != nil {
return "", err
}
return channelId, nil
}
in := &pb.SearchRequest{}
in.Normalized = []string{util.Normalize(url.Channel.Name)}
if url.Channel.ClaimId == "" && url.Channel.AmountOrder < 0 {
in.IsControlling = &wrappers.BoolValue{Value: true}
} else {
if url.Channel.AmountOrder > 0 {
in.AmountOrder = &wrappers.Int32Value{Value: int32(url.Channel.AmountOrder)}
}
if url.Channel.ClaimId != "" {
in.ClaimId = &pb.InvertibleField{
Invert: false,
Value: []string{url.Channel.ClaimId},
}
}
}
var size = 1
var from = 0
q := elastic.NewBoolQuery()
q = AddTermsField(in.Normalized, "normalized", q)
if in.IsControlling != nil {
q = q.Must(elastic.NewTermQuery("is_controlling", in.IsControlling.Value))
}
if in.AmountOrder != nil {
in.Limit.Value = 1
in.OrderBy = []string{"effective_amount"}
in.Offset = &wrappers.Int32Value{Value: in.AmountOrder.Value - 1}
}
if in.Limit != nil {
size = int(in.Limit.Value)
}
if in.Offset != nil {
from = int(in.Offset.Value)
}
if in.ClaimId != nil {
searchVals := StrArrToInterface(in.ClaimId.Value)
if len(in.ClaimId.Value) == 1 && len(in.ClaimId.Value[0]) < 20 {
if in.ClaimId.Invert {
q = q.MustNot(elastic.NewPrefixQuery("claim_id.keyword", in.ClaimId.Value[0]))
} else {
q = q.Must(elastic.NewPrefixQuery("claim_id.keyword", in.ClaimId.Value[0]))
}
} else {
if in.ClaimId.Invert {
q = q.MustNot(elastic.NewTermsQuery("claim_id.keyword", searchVals...))
} else {
q = q.Must(elastic.NewTermsQuery("claim_id.keyword", searchVals...))
}
}
}
searchResult, err := s.EsClient.Search().
Query(q). // specify the query
From(from).Size(size).
Do(ctx)
if err != nil {
return "", err
}
var r record
var channelId string
for _, item := range searchResult.Each(reflect.TypeOf(r)) {
if t, ok := item.(record); ok {
channelId = t.ClaimId
}
}
//matches, err := s.Search(ctx, in)
//if err != nil {
// return "", err
//}
return channelId, nil
}
func (s *Server) resolveUrl(ctx context.Context, rawUrl string) *urlResolution {
url := schema.ParseURL(rawUrl)
if url == nil {
return nil
}
channelId, err := s.resolveChannelId(ctx, url)
if err != nil {
return &urlResolution{
resolutionType: errorResolution,
value: fmt.Sprintf("Could not find channel in \"%s\".", url),
}
}
stream, _ := s.resolveStream(ctx, url, channelId)
if url.HasStream() {
return &urlResolution{
resolutionType: streamResolution,
value: stream,
}
} else {
return &urlResolution{
resolutionType: channelResolution,
value: channelId,
}
}
}
/*
async def resolve_url(self, raw_url):
if raw_url not in self.resolution_cache:
self.resolution_cache[raw_url] = await self._resolve_url(raw_url)
return self.resolution_cache[raw_url]
async def _resolve_url(self, raw_url):
try:
url = URL.parse(raw_url)
except ValueError as e:
return e
stream = LookupError(f'Could not find claim at "{raw_url}".')
channel_id = await self.resolve_channel_id(url)
if isinstance(channel_id, LookupError):
return channel_id
stream = (await self.resolve_stream(url, channel_id if isinstance(channel_id, str) else None)) or stream
if url.has_stream:
return StreamResolution(stream)
else:
return ChannelResolution(channel_id)
async def resolve_channel_id(self, url: URL):
if not url.has_channel:
return
if url.channel.is_fullid:
return url.channel.claim_id
if url.channel.is_shortid:
channel_id = await self.full_id_from_short_id(url.channel.name, url.channel.claim_id)
if not channel_id:
return LookupError(f'Could not find channel in "{url}".')
return channel_id
query = url.channel.to_dict()
if set(query) == {'name'}:
query['is_controlling'] = True
else:
query['order_by'] = ['^creation_height']
matches, _, _ = await self.search(**query, limit=1)
if matches:
channel_id = matches[0]['claim_id']
else:
return LookupError(f'Could not find channel in "{url}".')
return channel_id
async def resolve_stream(self, url: URL, channel_id: str = None):
if not url.has_stream:
return None
if url.has_channel and channel_id is None:
return None
query = url.stream.to_dict()
if url.stream.claim_id is not None:
if url.stream.is_fullid:
claim_id = url.stream.claim_id
else:
claim_id = await self.full_id_from_short_id(query['name'], query['claim_id'], channel_id)
return claim_id
if channel_id is not None:
if set(query) == {'name'}:
# temporarily emulate is_controlling for claims in channel
query['order_by'] = ['effective_amount', '^height']
else:
query['order_by'] = ['^channel_join']
query['channel_id'] = channel_id
query['signature_valid'] = True
elif set(query) == {'name'}:
query['is_controlling'] = True
matches, _, _ = await self.search(**query, limit=1)
if matches:
return matches[0]['claim_id']
*/
func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs, error) {
var client *elastic.Client = nil
if s.EsClient == nil {
tmpClient, err := elastic.NewClient(elastic.SetSniff(false))
if err != nil {
return nil, err
}
client = tmpClient
s.EsClient = client
} else {
client = s.EsClient
}
res := s.resolveUrl(ctx, "@abc#111")
log.Println(res)
claimTypes := map[string]int {
"stream": 1,
@ -218,7 +436,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
if len(in.Name) > 0 {
normalized := make([]string, len(in.Name))
for i := 0; i < len(in.Name); i++ {
normalized[i] = normalize(in.Name[i])
normalized[i] = util.Normalize(in.Name[i])
}
in.Normalized = normalized
}
@ -264,7 +482,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
if len(in.XId) > 0 {
searchVals := make([]interface{}, len(in.XId))
for i := 0; i < len(in.XId); i++ {
ReverseBytes(in.XId[i])
util.ReverseBytes(in.XId[i])
searchVals[i] = hex.Dump(in.XId[i])
}
if len(in.XId) == 1 && len(in.XId[0]) < 20 {
@ -432,7 +650,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
for i, item := range searchResult.Each(reflect.TypeOf(r)) {
if t, ok := item.(record); ok {
txos[i] = &pb.Output{
TxHash: toHash(t.Txid),
TxHash: util.ToHash(t.Txid),
Nout: t.Nout,
Height: t.Height,
}
@ -466,32 +684,3 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
Offset: uint32(int64(from) + searchResult.TotalHits()),
}, nil
}
// convert txid to txHash
func toHash(txid string) []byte {
t, err := hex.DecodeString(txid)
if err != nil {
return nil
}
// reverse the bytes. thanks, Satoshi 😒
for i, j := 0, len(t)-1; i < j; i, j = i+1, j-1 {
t[i], t[j] = t[j], t[i]
}
return t
}
// convert txHash to txid
func FromHash(txHash []byte) string {
t := make([]byte, len(txHash))
copy(t, txHash)
// reverse the bytes. thanks, Satoshi 😒
for i, j := 0, len(txHash)-1; i < j; i, j = i+1, j-1 {
txHash[i], txHash[j] = txHash[j], txHash[i]
}
return hex.EncodeToString(t)
}

View file

@ -3,6 +3,7 @@ package server
import (
"context"
pb "github.com/lbryio/hub/protobuf/go"
"github.com/olivere/elastic/v7"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"log"
@ -14,6 +15,7 @@ type Server struct {
Args *Args
MultiSpaceRe *regexp.Regexp
WeirdCharsRe *regexp.Regexp
EsClient *elastic.Client
pb.UnimplementedHubServer
}

47
util/util.go Normal file
View file

@ -0,0 +1,47 @@
package util
import (
"encoding/hex"
"golang.org/x/text/cases"
"golang.org/x/text/unicode/norm"
)
func Normalize(s string) string {
c := cases.Fold()
return c.String(norm.NFD.String(s))
}
func ReverseBytes(s []byte) {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
}
// convert txid to txHash
func ToHash(txid string) []byte {
t, err := hex.DecodeString(txid)
if err != nil {
return nil
}
// reverse the bytes. thanks, Satoshi 😒
for i, j := 0, len(t)-1; i < j; i, j = i+1, j-1 {
t[i], t[j] = t[j], t[i]
}
return t
}
// convert txHash to txid
func FromHash(txHash []byte) string {
t := make([]byte, len(txHash))
copy(t, txHash)
// reverse the bytes. thanks, Satoshi 😒
for i, j := 0, len(txHash)-1; i < j; i, j = i+1, j-1 {
txHash[i], txHash[j] = txHash[j], txHash[i]
}
return hex.EncodeToString(t)
}