cleanup and updates

This commit is contained in:
Jeffrey Picard 2021-06-09 20:04:06 -04:00
parent ad21f22f8e
commit 4280edc465
4 changed files with 136 additions and 134 deletions

2
dev.sh
View file

@ -3,4 +3,4 @@
hash reflex 2>/dev/null || go get github.com/cespare/reflex
hash reflex 2>/dev/null || { echo >&2 'Make sure '"$(go env GOPATH)"'/bin is in your $PATH'; exit 1; }
reflex --decoration=none --start-service=true -- sh -c "go run . serve"
reflex --decoration=none --start-service=true -- sh -c "go run . serve --dev"

71
main.go
View file

@ -19,51 +19,43 @@ import (
)
const (
defaultHost = "0.0.0.0"
defaultPort = "50051"
defaultRPCUser = "rpcuser"
defaultRPCPassword = "rpcpassword"
defaultEsHost = "http://localhost"
defaultEsPort = "9200"
)
type loginCreds struct {
Username, Password string
}
func (c *loginCreds) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
return map[string]string{
"username": c.Username,
"password": c.Password,
}, nil
}
func (c *loginCreds) RequireTransportSecurity() bool {
return false
}
func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
getenvironment := func(data []string, getkeyval func(item string) (key, val string)) map[string]string {
items := make(map[string]string)
for _, item := range data {
key, val := getkeyval(item)
items[key] = val
}
return items
func GetEnvironment(data []string, getkeyval func(item string) (key, val string)) map[string]string {
items := make(map[string]string)
for _, item := range data {
key, val := getkeyval(item)
items[key] = val
}
environment := getenvironment(os.Environ(), func(item string) (key, val string) {
return items
}
func GetEnvironmentStandard() map[string]string {
return GetEnvironment(os.Environ(), func(item string) (key, val string) {
splits := strings.Split(item, "=")
key = splits[0]
val = splits[1]
return
})
}
func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
environment := GetEnvironmentStandard()
parser := argparse.NewParser("hub", "hub server and client")
serveCmd := parser.NewCommand("serve", "start the hub server")
host := parser.String("", "rpchost", &argparse.Options{Required: false, Help: "host", Default: defaultHost})
port := parser.String("", "rpcport", &argparse.Options{Required: false, Help: "port", Default: defaultPort})
user := parser.String("", "rpcuser", &argparse.Options{Required: false, Help: "user", Default: defaultRPCUser})
pass := parser.String("", "rpcpassword", &argparse.Options{Required: false, Help: "password", Default: defaultRPCPassword})
esHost := parser.String("", "eshost", &argparse.Options{Required: false, Help: "host", Default: defaultEsHost})
esPort := parser.String("", "esport", &argparse.Options{Required: false, Help: "port", Default: defaultEsPort})
dev := parser.Flag("", "dev", &argparse.Options{Required: false, Help: "port", Default: false})
text := parser.String("", "text", &argparse.Options{Required: false, Help: "text query"})
name := parser.String("", "name", &argparse.Options{Required: false, Help: "name"})
@ -84,11 +76,11 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
args := &server.Args{
Serve: false,
Host: *host,
Port: ":" + *port,
User: *user,
Pass: *pass,
EsHost: defaultEsHost,
EsPort: defaultEsPort,
EsHost: *esHost,
EsPort: *esPort,
Dev: *dev,
}
if esHost, ok := environment["ELASTIC_HOST"]; ok {
@ -143,9 +135,6 @@ func parseArgs(searchRequest *pb.SearchRequest) *server.Args {
func main() {
searchRequest := &pb.SearchRequest{}
//
//res := schema.ParseURL("@abc#1111")
//log.Println(res)
args := parseArgs(searchRequest)
@ -170,10 +159,6 @@ func main() {
conn, err := grpc.Dial("localhost"+args.Port,
grpc.WithInsecure(),
grpc.WithBlock(),
//grpc.WithPerRPCCredentials(&loginCreds{
// Username: args.User,
// Password: args.Pass,
//}),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
@ -182,16 +167,6 @@ func main() {
c := pb.NewHubClient(conn)
/*
var query string
if len(os.Args) > 1 {
query = strings.Join(os.Args[1:], " ")
} else {
log.Printf("error: no search query provided\n")
os.Exit(1)
}
*/
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

View file

@ -7,7 +7,6 @@ import (
"github.com/golang/protobuf/ptypes/wrappers"
pb "github.com/lbryio/hub/protobuf/go"
"math"
//"github.com/lbryio/hub/schema"
"github.com/lbryio/hub/util"
"github.com/olivere/elastic/v7"
@ -20,13 +19,30 @@ import (
)
type record struct {
Txid string `json:"tx_id"`
Nout uint32 `json:"tx_nout"`
Height uint32 `json:"height"`
ClaimId string `json:"claim_id"`
ChannelId string `json:"channel_id"`
CreationHeight uint32 `json:"creation_height"`
RepostedClaimId string `json:"reposted_claim_id"`
Txid string `json:"tx_id"`
Nout uint32 `json:"tx_nout"`
Height uint32 `json:"height"`
ClaimId string `json:"claim_id"`
ChannelId string `json:"channel_id"`
RepostedClaimId string `json:"reposted_claim_id"`
CensorType uint32 `json:"censor_type"`
CensoringChannelHash string `json:"censoring_channel_hash"`
ShortUrl string `json:"short_url"`
CanonicalUrl string `json:"canonical_url"`
IsControlling bool `json:"is_controlling"`
TakeOverHeight uint32 `json:"take_over_height"`
CreationHeight uint32 `json:"creation_height"`
ActivationHeight uint32 `json:"activation_height"`
ExpirationHeight uint32 `json:"expiration_height"`
ClaimsInChannel uint32 `json:"claims_in_channel"`
Reposted uint32 `json:"reposted"`
EffectiveAmount uint64 `json:"effective_amount"`
SupportAmount uint64 `json:"support_amount"`
TrendingGroup uint32 `json:"trending_group"`
TrendingMixed float32 `json:"trending_mixed"`
TrendingLocal float32 `json:"trending_local"`
TrendingGlobal float32 `json:"trending_global"`
Name string `json:"name"`
}
type orderField struct {
@ -379,24 +395,25 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
q = q.Must(textQuery)
}
//TODO make this only happen in dev environment
indices, err := client.IndexNames()
if err != nil {
log.Fatalln(err)
}
var numIndices = 0
if len(indices) > 0 {
numIndices = len(indices) - 1
}
searchIndices := make([]string, numIndices)
j := 0
for i := 0; j < numIndices; i++ {
if indices[i] == "claims" {
continue
var searchIndices = []string{}
if s.Args.Dev {
indices, err := client.IndexNames()
if err != nil {
log.Fatalln(err)
}
var numIndices = 0
if len(indices) > 0 {
numIndices = len(indices) - 1
}
searchIndices = make([]string, numIndices)
j := 0
for i := 0; j < numIndices; i++ {
if indices[i] == "claims" {
continue
}
searchIndices[j] = indices[i]
j = j + 1
}
searchIndices[j] = indices[i]
j = j + 1
}
fsc := elastic.NewFetchSourceContext(true).Exclude("description", "title")//.Include("_id")
@ -419,6 +436,7 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
var txos []*pb.Output
var records []*record
var blocked []*pb.Blocked = make([]*pb.Blocked, 0)
records = make([]*record, 0, searchResult.TotalHits())
@ -429,23 +447,46 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
}
}
records = removeBlocked(records, &blocked)
if in.RemoveDuplicates != nil {
records = removeDuplicates(records)
}
if in.LimitClaimsPerChannel != nil {
if in.LimitClaimsPerChannel != nil && in.LimitClaimsPerChannel.Value > 0 {
records = searchAhead(records, pageSize, int(in.LimitClaimsPerChannel.Value))
}
finalLength := int(math.Min(float64(len(records)), float64(pageSize)))
txos = make([]*pb.Output, 0, finalLength)
j = 0
var j = 0
for i := from; i < from + finalLength && i < len(records) && j < finalLength; i++ {
t := records[i]
res := &pb.Output{
TxHash: util.ToHash(t.Txid),
Nout: t.Nout,
Height: t.Height,
Meta: &pb.Output_Claim{
Claim: &pb.ClaimMeta{
//Channel:
//Repost:
ShortUrl: t.ShortUrl,
CanonicalUrl: t.CanonicalUrl,
IsControlling: t.IsControlling,
TakeOverHeight: t.TakeOverHeight,
CreationHeight: t.CreationHeight,
ActivationHeight: t.ActivationHeight,
ExpirationHeight: t.ExpirationHeight,
ClaimsInChannel: t.ClaimsInChannel,
Reposted: t.Reposted,
EffectiveAmount: t.EffectiveAmount,
SupportAmount: t.SupportAmount,
TrendingGroup: t.TrendingGroup,
TrendingMixed: t.TrendingMixed,
TrendingLocal: t.TrendingLocal,
TrendingGlobal: t.TrendingGlobal,
},
},
}
txos = append(txos, res)
j += 1
@ -472,10 +513,24 @@ func (s *Server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.Outputs,
// //return nil, nil
//}
if in.NoTotals != nil && !in.NoTotals.Value {
return &pb.Outputs{
Txos: txos,
Offset: uint32(int64(from) + searchResult.TotalHits()),
Blocked: blocked,
}, nil
}
var blockedTotal uint32 = 0
for _, b := range blocked {
blockedTotal += b.Count
}
return &pb.Outputs{
Txos: txos,
Total: uint32(searchResult.TotalHits()),
Offset: uint32(int64(from) + searchResult.TotalHits()),
Blocked: blocked,
BlockedTotal: blockedTotal,
}, nil
}
@ -574,3 +629,30 @@ func removeDuplicates(searchHits []*record) []*record {
return deduped
}
func removeBlocked(searchHits []*record, blocked *[]*pb.Blocked) []*record {
newHits := make([]*record, 0, len(searchHits))
blockedChannels := make(map[string]*pb.Blocked)
for _, r := range searchHits {
if r.CensorType != 0 {
if blockedChannels[r.ChannelId] == nil {
blockedObj := &pb.Blocked{
Count: 1,
Channel: &pb.Output{
TxHash: util.ToHash(r.Txid),
Nout: r.Nout,
Height: r.Height,
},
}
*blocked = append(*blocked, blockedObj)
blockedChannels[r.ChannelId] = blockedObj
} else {
blockedChannels[r.ChannelId].Count += 1
}
} else {
newHits = append(newHits, r)
}
}
return newHits
}

View file

@ -1,11 +1,9 @@
package server
import (
"context"
pb "github.com/lbryio/hub/protobuf/go"
"github.com/olivere/elastic/v7"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"log"
"regexp"
)
@ -21,21 +19,11 @@ type Server struct {
type Args struct {
Serve bool
Host string
Port string
User string
Pass string
EsHost string
EsPort string
}
type AccessDeniedErr struct {}
func (AccessDeniedErr) Error() string {
return "Username or password incorrect."
}
type EmptyMetadataErr struct {}
func (EmptyMetadataErr) Error() string {
return "No username or password specified."
Dev bool
}
/*
@ -77,9 +65,7 @@ func (EmptyMetadataErr) Error() string {
'blockchain.address.unsubscribe'
*/
//func MakeHubServer(args Args) *grpc.Server {
func MakeHubServer(args *Args) *Server {
// authorize := makeAuthorizeFunc(args.User, args.Pass)
grpcServer := grpc.NewServer()
multiSpaceRe, err := regexp.Compile("\\s{2,}")
@ -100,45 +86,4 @@ func MakeHubServer(args *Args) *Server {
}
return s
// grpc.StreamInterceptor(makeStreamInterceptor(authorize)),
// grpc.UnaryInterceptor(makeUnaryInterceptor(authorize)),
//)
}
func makeStreamInterceptor(authorize func(context.Context) error) func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if err := authorize(stream.Context()); err != nil {
return err
}
return handler(srv, stream)
}
}
func makeUnaryInterceptor(authorize func(context.Context) error) func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if err := authorize(ctx); err != nil {
return nil, err
}
return handler(ctx, req)
}
}
func makeAuthorizeFunc(username string, password string) func(context.Context) error {
return func(ctx context.Context) error {
if md, ok := metadata.FromIncomingContext(ctx); ok {
log.Println(md)
if len(md["username"]) > 0 && md["username"][0] == username &&
len(md["password"]) > 0 && md["password"][0] == password {
return nil
}
return AccessDeniedErr{}
}
return EmptyMetadataErr{}
}
}
}